Spaces:
Running
Running
| # app.py | |
| # AI Video Enhancer 4K - Gradio app for Hugging Face Spaces | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from typing import Tuple | |
| import gradio as gr | |
| from PIL import Image | |
# Optional: advanced AI enhancers (Real-ESRGAN, GFPGAN).
# Any failure while importing them just switches the advanced path off.
try:
    import torch
    from gfpgan import GFPGANer  # type: ignore
    from realesrgan import RealESRGAN  # type: ignore
except Exception:
    HAVE_ENHANCERS = False
else:
    HAVE_ENHANCERS = True

# Config
MAX_SECONDS = 30  # clips longer than this many seconds are rejected
TEMP_DIR = Path(tempfile.gettempdir()).joinpath("hf_video_enhancer")
TEMP_DIR.mkdir(parents=True, exist_ok=True)
def run_cmd(cmd):
    """Run an external command and return its captured standard output.

    Args:
        cmd: Argument list passed to ``subprocess.run`` (no shell is used).

    Returns:
        The command's stdout decoded as text.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The
            message contains the command and its decoded stderr.
    """
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        # Tool output is not guaranteed to be valid UTF-8; a strict decode
        # here would raise UnicodeDecodeError and hide the real failure.
        stderr_text = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"Command failed: {cmd}\n{stderr_text}")
    return proc.stdout.decode(errors="replace")
def probe_video(video_path: str) -> Tuple[float, int, int]:
    """Read duration and frame size of the first video stream via ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        ``(duration_seconds, width, height)``. A value ffprobe does not
        report (or reports as non-numeric, e.g. ``N/A``) is returned as 0.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        # Request the container-level duration as well: ffprobe can report
        # the per-stream duration as N/A, which would otherwise read as 0.
        "-show_entries", "stream=width,height,duration:format=duration",
        "-of", "default=noprint_wrappers=1:nokey=0",
        video_path,
    ]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        stderr_text = proc.stderr.decode(errors="replace").strip()
        raise RuntimeError(f"ffprobe failed for {video_path}: {stderr_text}")

    width = height = 0
    duration = 0.0
    for line in proc.stdout.decode(errors="replace").splitlines():
        key, sep, value = line.strip().partition("=")
        if not sep:
            continue
        if key in ("width", "height"):
            try:
                pixels = int(value)
            except ValueError:
                continue
            if key == "width":
                width = pixels
            else:
                height = pixels
        elif key == "duration" and duration <= 0.0:
            # The first usable duration wins, so the stream value takes
            # precedence and the container value is only a fallback.
            try:
                seconds = float(value)
            except ValueError:
                continue
            if seconds > 0.0:
                duration = seconds
    return duration, width, height
def extract_frames(video_path: str, frames_dir: Path):
    """Dump the frames of *video_path* into *frames_dir* as numbered PNGs.

    Args:
        video_path: Source video handed to ffmpeg.
        frames_dir: Output directory (created if missing); frames are
            written as ``000001.png``-style six-digit names.

    Raises:
        RuntimeError: Propagated from ``run_cmd`` when ffmpeg fails.
    """
    frames_dir.mkdir(parents=True, exist_ok=True)
    frame_pattern = str(frames_dir / "%06d.png")
    run_cmd(["ffmpeg", "-y", "-i", video_path, "-vsync", "0", frame_pattern])
def _source_frame_rate(video_path: str, default: str = "30") -> str:
    """Return the first video stream's average frame rate as ffmpeg rational text, or *default*."""
    proc = subprocess.run(
        [
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=avg_frame_rate",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_path,
        ],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    for line in proc.stdout.decode(errors="replace").splitlines():
        rate = line.strip()
        numerator, _, denominator = rate.partition("/")
        try:
            # Only accept a strictly positive rate; anything else (such as
            # "0/0" or an empty line) falls through to the default.
            if int(numerator) > 0 and int(denominator or "1") > 0:
                return rate
        except ValueError:
            continue
    return default


def reassemble_video(frames_dir: Path, audio_src: str, out_path: str):
    """Encode the PNG frames into an H.264 MP4 and carry over the source audio.

    Args:
        frames_dir: Directory holding the ``%06d.png`` frame sequence. The
            intermediate ``tmp_video.mp4`` is written next to it.
        audio_src: Media file whose first audio stream (if any) is muxed
            into the result. It is also probed for the frame rate, so the
            rebuilt video keeps the source's speed and stays in sync with
            the audio; 30 fps is used when no rate can be read.
        out_path: Destination path of the final video.

    Raises:
        RuntimeError: Propagated from ``run_cmd`` when an ffmpeg step fails.
    """
    tmp_video = str(frames_dir.parent / "tmp_video.mp4")
    cmd_encode = [
        "ffmpeg", "-y", "-framerate", _source_frame_rate(audio_src),
        "-i", str(frames_dir / "%06d.png"),
        "-c:v", "libx264", "-preset", "veryfast", "-pix_fmt", "yuv420p",
        tmp_video,
    ]
    run_cmd(cmd_encode)

    # Check if source has audio: ffprobe prints one entry per audio stream,
    # so empty output means there is nothing to mux.
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error", "-select_streams", "a",
            "-show_entries", "stream=codec_type",
            "-of", "default=noprint_wrappers=1",
            audio_src,
        ],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    has_audio = bool(probe.stdout.decode(errors="replace").strip())

    if has_audio:
        cmd_mux = [
            "ffmpeg", "-y",
            "-i", tmp_video,
            "-i", audio_src,
            "-c:v", "copy", "-c:a", "aac",
            "-map", "0:v:0", "-map", "1:a:0",
            out_path,
        ]
        run_cmd(cmd_mux)
        os.remove(tmp_video)
    else:
        shutil.move(tmp_video, out_path)
def simple_upscale_with_ffmpeg(frames_dir: Path, scale_factor: int = 2):
    """Upscale every PNG in *frames_dir* in place with ffmpeg's Lanczos scaler.

    Args:
        frames_dir: Directory whose ``*.png`` files are resized.
        scale_factor: Multiplier applied to both width and height.

    Raises:
        RuntimeError: Propagated from ``run_cmd`` when ffmpeg fails on a frame.
    """
    scale_filter = f"scale=iw*{scale_factor}:ih*{scale_factor}:flags=lanczos"
    # sorted() snapshots the listing first, so the scratch files created
    # below are never picked up by this loop.
    for frame in sorted(frames_dir.glob("*.png")):
        scratch = f"{frame}.tmp.png"
        run_cmd(["ffmpeg", "-y", "-i", str(frame), "-vf", scale_filter, scratch])
        # Swap the resized image over the original once it is fully written.
        os.replace(scratch, frame)
def enhance_frames(frames_dir: Path):
    """Enhance all frames in *frames_dir* in place.

    For now this is only the plain ffmpeg 2x upscale; the Real-ESRGAN path
    is not wired in because it requires a GPU and model weights.
    """
    upscale_by = 2
    simple_upscale_with_ffmpeg(frames_dir, scale_factor=upscale_by)
def process_video(video_file) -> Tuple[str, str]:
    """Upscale an uploaded video and return a status message with the result path.

    Args:
        video_file: The upload as handed over by Gradio: either a filesystem
            path or an object exposing the path as ``.name``.

    Returns:
        ``(message, path_to_result_video)``. On any failure the path is an
        empty string and the message describes what went wrong.
    """
    # mkdtemp guarantees a fresh directory, so two uploads arriving in the
    # same millisecond can never share (and clobber) one job folder.
    TEMP_DIR.mkdir(parents=True, exist_ok=True)
    base_dir = Path(tempfile.mkdtemp(prefix="job_", dir=str(TEMP_DIR)))
    in_path = base_dir / "input_video"

    if isinstance(video_file, (str, os.PathLike)):
        src_path = video_file
    else:
        # Accept upload wrappers that carry the path in `.name`.
        src_path = getattr(video_file, "name", video_file)

    try:
        shutil.copy(src_path, in_path)  # treat the input as a path, not .read()
    except Exception as e:
        shutil.rmtree(base_dir, ignore_errors=True)
        return f"Error saving uploaded file: {e}", ""

    try:
        duration, _width, _height = probe_video(str(in_path))
    except Exception as e:
        shutil.rmtree(base_dir, ignore_errors=True)
        return f"Error probing video: {e}", ""

    if duration > MAX_SECONDS:
        shutil.rmtree(base_dir, ignore_errors=True)
        return f"Video too long: {duration:.1f}s (limit {MAX_SECONDS}s).", ""

    frames_dir = base_dir / "frames"
    try:
        extract_frames(str(in_path), frames_dir)
    except Exception as e:
        shutil.rmtree(base_dir, ignore_errors=True)
        return f"Failed extracting frames: {e}", ""

    try:
        enhance_frames(frames_dir)
    except Exception as e:
        # Best effort: if enhancement fails, continue with the frames as
        # they are rather than failing the whole job.
        print(f"Enhancement failed: {e}")

    out_video = base_dir / "enhanced_output.mp4"
    try:
        reassemble_video(frames_dir, str(in_path), str(out_video))
    except Exception as e:
        shutil.rmtree(base_dir, ignore_errors=True)
        return f"Failed to reassemble video: {e}", ""

    # Only the finished video has to outlive the job; drop the intermediates.
    shutil.rmtree(frames_dir, ignore_errors=True)
    try:
        os.remove(in_path)
    except OSError:
        pass

    return "Processing complete. Download below.", str(out_video)
# Gradio UI
with gr.Blocks(title="AI Video Enhancer 4K") as demo:
    gr.Markdown("# AI Video Enhancer 4K")
    # The advertised limit is derived from MAX_SECONDS so the text cannot
    # drift from the limit process_video actually enforces.
    gr.Markdown(f"Upload a short video (<= {MAX_SECONDS}s). It will be upscaled using AI/ffmpeg.")

    with gr.Row():
        with gr.Column(scale=2):
            video_in = gr.File(label="Upload video (mp4/avi/mov)", file_types=[".mp4", ".avi", ".mov"])
            btn = gr.Button("Enhance Video")
            status = gr.Textbox(label="Status", interactive=False)
        with gr.Column(scale=1):
            out_video = gr.Video(label="Enhanced video")

    def on_click_process(file_obj):
        """Button handler: return (status text, video path or None) for the two outputs."""
        if not file_obj:
            return "Please upload a video file.", None
        try:
            msg, path = process_video(file_obj)
        except Exception as e:
            return f"Unexpected error: {e}", None
        # process_video signals failure with an empty path; the video
        # component is given None in that case.
        return msg, (path or None)

    btn.click(fn=on_click_process, inputs=[video_in], outputs=[status, out_video])

if __name__ == "__main__":
    demo.launch()