tggtg commited on
Commit
b5b9c45
·
verified ·
1 Parent(s): 2723ab7

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +201 -0
app.py ADDED
@@ -0,0 +1,201 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+ # AI Video Enhancer 4K - Gradio app for Hugging Face Spaces
3
+
4
+ import os
5
+ import shutil
6
+ import subprocess
7
+ import tempfile
8
+ import time
9
+ from pathlib import Path
10
+ from typing import Tuple
11
+
12
+ import gradio as gr
13
+ from PIL import Image
14
+
15
+ # Optional: advanced AI enhancers (Real-ESRGAN, GFPGAN)
16
+ try:
17
+ import torch
18
+ from gfpgan import GFPGANer # type: ignore
19
+ from realesrgan import RealESRGAN # type: ignore
20
+ HAVE_ENHANCERS = True
21
+ except Exception:
22
+ HAVE_ENHANCERS = False
23
+
24
+ # Config
25
+ MAX_SECONDS = 30
26
+ TEMP_DIR = Path(tempfile.gettempdir()) / "hf_video_enhancer"
27
+ TEMP_DIR.mkdir(parents=True, exist_ok=True)
28
+
29
+
30
+ def run_cmd(cmd):
31
+ p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
32
+ if p.returncode != 0:
33
+ raise RuntimeError(f"Command failed: {cmd}\n{p.stderr.decode()}")
34
+ return p.stdout.decode()
35
+
36
+
37
+ def probe_video(video_path: str) -> Tuple[float, int, int]:
38
+ cmd = [
39
+ "ffprobe", "-v", "error",
40
+ "-select_streams", "v:0",
41
+ "-show_entries", "stream=width,height,duration",
42
+ "-of", "default=noprint_wrappers=1:nokey=0",
43
+ video_path
44
+ ]
45
+ p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
46
+ out = p.stdout.decode()
47
+ width = height = 0
48
+ duration = 0.0
49
+ for line in out.splitlines():
50
+ if line.startswith("width="):
51
+ width = int(line.split("=")[1])
52
+ if line.startswith("height="):
53
+ height = int(line.split("=")[1])
54
+ if line.startswith("duration="):
55
+ try:
56
+ duration = float(line.split("=")[1])
57
+ except:
58
+ duration = 0.0
59
+ return duration, width, height
60
+
61
+
62
+ def extract_frames(video_path: str, frames_dir: Path):
63
+ frames_dir.mkdir(parents=True, exist_ok=True)
64
+ cmd = [
65
+ "ffmpeg", "-y", "-i", video_path,
66
+ "-vsync", "0",
67
+ str(frames_dir / "%06d.png")
68
+ ]
69
+ run_cmd(cmd)
70
+
71
+
72
+ def reassemble_video(frames_dir: Path, audio_src: str, out_path: str):
73
+ tmp_video = str(frames_dir.parent / "tmp_video.mp4")
74
+ cmd_encode = [
75
+ "ffmpeg", "-y", "-framerate", "30",
76
+ "-i", str(frames_dir / "%06d.png"),
77
+ "-c:v", "libx264", "-preset", "veryfast", "-pix_fmt", "yuv420p",
78
+ tmp_video
79
+ ]
80
+ run_cmd(cmd_encode)
81
+
82
+ # Check if source has audio
83
+ p = subprocess.run(
84
+ ["ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1", audio_src],
85
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE
86
+ )
87
+ has_audio = bool(p.stdout.decode().strip())
88
+
89
+ if has_audio:
90
+ cmd_mux = [
91
+ "ffmpeg", "-y",
92
+ "-i", tmp_video,
93
+ "-i", audio_src,
94
+ "-c:v", "copy", "-c:a", "aac",
95
+ "-map", "0:v:0", "-map", "1:a:0",
96
+ out_path
97
+ ]
98
+ run_cmd(cmd_mux)
99
+ os.remove(tmp_video)
100
+ else:
101
+ shutil.move(tmp_video, out_path)
102
+
103
+
104
+ def simple_upscale_with_ffmpeg(frames_dir: Path, scale_factor: int = 2):
105
+ for p in sorted(frames_dir.glob("*.png")):
106
+ tmp = str(p) + ".tmp.png"
107
+ cmd = [
108
+ "ffmpeg", "-y", "-i", str(p),
109
+ "-vf", f"scale=iw*{scale_factor}:ih*{scale_factor}:flags=lanczos",
110
+ tmp
111
+ ]
112
+ run_cmd(cmd)
113
+ os.replace(tmp, p)
114
+
115
+
116
+ def enhance_frames(frames_dir: Path):
117
+ # fallback to simple upscale for now (Real-ESRGAN requires GPU + weights)
118
+ simple_upscale_with_ffmpeg(frames_dir, scale_factor=2)
119
+
120
+
121
+ def process_video(video_file) -> Tuple[str, str]:
122
+ """
123
+ Accepts uploaded video path from Gradio,
124
+ processes it and returns (message, path_to_result_video)
125
+ """
126
+ ts = int(time.time() * 1000)
127
+ base_dir = TEMP_DIR / f"job_{ts}"
128
+ base_dir.mkdir(parents=True, exist_ok=True)
129
+ in_path = base_dir / "input_video"
130
+
131
+ try:
132
+ shutil.copy(video_file, in_path) # ✅ FIXED: treat input as path, not .read()
133
+ except Exception as e:
134
+ return f"Error saving uploaded file: {e}", ""
135
+
136
+ try:
137
+ duration, w, h = probe_video(str(in_path))
138
+ except Exception as e:
139
+ shutil.rmtree(base_dir, ignore_errors=True)
140
+ return f"Error probing video: {e}", ""
141
+
142
+ if duration > MAX_SECONDS:
143
+ shutil.rmtree(base_dir, ignore_errors=True)
144
+ return f"Video too long: {duration:.1f}s (limit {MAX_SECONDS}s).", ""
145
+
146
+ frames_dir = base_dir / "frames"
147
+ try:
148
+ extract_frames(str(in_path), frames_dir)
149
+ except Exception as e:
150
+ shutil.rmtree(base_dir, ignore_errors=True)
151
+ return f"Failed extracting frames: {e}", ""
152
+
153
+ try:
154
+ enhance_frames(frames_dir)
155
+ except Exception as e:
156
+ print(f"Enhancement failed: {e}")
157
+
158
+ out_video = base_dir / "enhanced_output.mp4"
159
+ try:
160
+ reassemble_video(frames_dir, str(in_path), str(out_video))
161
+ except Exception as e:
162
+ shutil.rmtree(base_dir, ignore_errors=True)
163
+ return f"Failed to reassemble video: {e}", ""
164
+
165
+ try:
166
+ shutil.rmtree(frames_dir)
167
+ except Exception:
168
+ pass
169
+
170
+ return "Processing complete. Download below.", str(out_video)
171
+
172
+
173
+ # Gradio UI
174
+ with gr.Blocks(title="AI Video Enhancer 4K") as demo:
175
+ gr.Markdown("# AI Video Enhancer 4K")
176
+ gr.Markdown("Upload a short video (<= 30s). It will be upscaled using AI/ffmpeg.")
177
+
178
+ with gr.Row():
179
+ with gr.Column(scale=2):
180
+ video_in = gr.File(label="Upload video (mp4/avi/mov)", file_types=[".mp4", ".avi", ".mov"])
181
+ btn = gr.Button("Enhance Video")
182
+ status = gr.Textbox(label="Status", interactive=False)
183
+ with gr.Column(scale=1):
184
+ out_video = gr.Video(label="Enhanced video")
185
+
186
+ def on_click_process(file_obj):
187
+ if not file_obj:
188
+ return "Please upload a video file.", None
189
+ try:
190
+ msg, path = process_video(file_obj)
191
+ if path:
192
+ return msg, path
193
+ else:
194
+ return msg, None
195
+ except Exception as e:
196
+ return f"Unexpected error: {e}", None
197
+
198
+ btn.click(fn=on_click_process, inputs=[video_in], outputs=[status, out_video])
199
+
200
+ if __name__ == "__main__":
201
+ demo.launch()