|
|
import os |
|
|
import sys |
|
|
import time |
|
|
import traceback |
|
|
import inspect |
|
|
import logging |
|
|
import shutil |
|
|
import subprocess |
|
|
import numpy as np |
|
|
import soundfile as sf |
|
|
import librosa |
|
|
import gradio as gr |
|
|
import scipy.signal as signal |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(">>> System Startup: RVC Pro Max...")

# Hard requirements: ffmpeg helpers and the RVC inference engine.
# Abort immediately if any are missing — nothing below can work without them.
try:
    import imageio_ffmpeg
    import static_ffmpeg
    from rvc_python.infer import RVCInference
    print("Libraries loaded successfully.")
except ImportError as e:
    print(f"Import Error: {e}")
    sys.exit(1)

# Best-effort ffmpeg setup: register static_ffmpeg's binaries on PATH and
# also append the directory of imageio's bundled ffmpeg executable.
# Failure here is non-fatal (system ffmpeg may already be available).
try:
    static_ffmpeg.add_paths()
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    os.environ["PATH"] += os.pathsep + os.path.dirname(ffmpeg_exe)
except Exception as e:
    print(f"FFmpeg Warning: {e}")

# Dedicated scratch directory for intermediate wav files; also redirect the
# generic temp-dir env vars there so downstream libraries use the same place.
TEMP_DIR = "/tmp/rvc_temp"
os.makedirs(TEMP_DIR, exist_ok=True)
os.environ["TEMP"] = TEMP_DIR
os.environ["TMPDIR"] = TEMP_DIR
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_message(message):
    """Return *message* prefixed with a [HH:MM:SS] wall-clock timestamp."""
    stamp = datetime.now().strftime("%H:%M:%S")
    return f"[{stamp}] {message}"
|
|
|
|
|
def apply_clarity_eq(y, sr):
    """Apply a gentle three-stage "clarity" EQ to a mono signal.

    Stages:
      1. 4th-order 60 Hz high-pass — removes DC offset and sub-bass rumble.
      2. 800-1200 Hz band-stop blended 30/70 with the dry signal — softens
         the nasal "honk" region typical of raw RVC output.
      3. 15% of the >5 kHz band added back on top — restores "air".

    Parameters:
        y:  1-D numpy array of audio samples (mono assumed — TODO confirm
            callers never pass multi-channel).
        sr: sample rate in Hz.

    Returns:
        The processed signal, or the ORIGINAL signal unchanged if any
        filter stage fails.

    Bug fixed: the original returned whatever ``y`` held at the moment of
    the exception, so a failure after stage 1 leaked a half-processed
    buffer to the caller. We now keep a reference to the untouched input
    and fall back to it on any error.
    """
    original = y
    try:
        # Stage 1: rumble/DC removal below 60 Hz.
        sos_hp = signal.butter(4, 60, 'hp', fs=sr, output='sos')
        y = signal.sosfilt(sos_hp, y)

        # Stage 2: notch the nasal 800-1200 Hz band, mixed in at 30%.
        sos_mid = signal.butter(2, [800, 1200], 'bandstop', fs=sr, output='sos')
        y_filtered = signal.sosfilt(sos_mid, y)
        y = (y * 0.7) + (y_filtered * 0.3)

        # Stage 3: add presence — 15% of the content above 5 kHz.
        sos_high = signal.butter(2, 5000, 'hp', fs=sr, output='sos')
        y_high = signal.sosfilt(sos_high, y)
        y = y + (y_high * 0.15)

        return y
    except Exception as e:
        # Fail safe: report and return the untouched input, never a
        # partially processed buffer.
        print(f"EQ Error: {e}")
        return original
|
|
|
|
|
def preprocess_audio(input_path):
    """Load audio at its native sample rate, fold to mono, peak-normalize,
    and write the result into TEMP_DIR.

    Returns a ``(path, status_message)`` tuple; on any failure the original
    path is handed back untouched together with the error text, so the
    caller can still attempt inference on the raw file.
    """
    try:
        samples, rate = librosa.load(input_path, sr=None)
        # Defensive: librosa.load usually yields mono already, but fold
        # down any remaining channels just in case.
        if samples.ndim > 1:
            samples = librosa.to_mono(samples)
        samples = librosa.util.normalize(samples)
        out_path = os.path.join(TEMP_DIR, "preprocessed.wav")
        sf.write(out_path, samples, rate)
        return out_path, f"Pre-process OK (SR: {rate}Hz)"
    except Exception as exc:
        return input_path, f"Pre-process Error: {exc}"
|
|
|
|
|
def post_process_audio(input_path, clarity_boost=True):
    """Finalize a rendered wav: optionally run the clarity EQ, then
    peak-normalize with a little headroom, and save as ``*_final.wav``.

    Returns the path of the written file, or *input_path* untouched if
    anything goes wrong (best-effort by design).
    """
    try:
        audio, rate = librosa.load(input_path, sr=None)
        if clarity_boost:
            audio = apply_clarity_eq(audio, rate)
        # 0.95 leaves ~0.45 dB of headroom below full scale.
        audio = librosa.util.normalize(audio) * 0.95
        final_path = input_path.replace(".wav", "_final.wav")
        sf.write(final_path, audio, rate)
        return final_path
    except Exception:
        return input_path
|
|
|
|
|
def cleanup_temp():
    """Best-effort removal of every file in TEMP_DIR.

    Bug fixed: the original wrapped the whole loop in a single try/except,
    so the first entry that could not be removed (a subdirectory, or a
    file still held open) silently aborted the entire sweep and left all
    remaining temp files behind. Each entry is now deleted independently.
    """
    try:
        entries = os.listdir(TEMP_DIR)
    except Exception:
        # TEMP_DIR missing or unreadable: nothing to clean.
        return
    for name in entries:
        try:
            os.remove(os.path.join(TEMP_DIR, name))
        except Exception:
            # Skip stubborn entries (directories, files in use) and
            # keep sweeping the rest.
            pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rvc_process_pipeline(
    audio_path, model_file, index_file,
    pitch_change, f0_method, index_rate,
    protect_val, filter_radius, resample_sr,
    envelope_mix, hop_length,
    enable_clarity
):
    """Run the full RVC voice-conversion pipeline for the Gradio UI.

    Parameters mirror the UI widgets: the input audio filepath, the
    uploaded .pth model / .index file objects (gradio File — read via
    ``.name``), the pitch shift in semitones, the F0 extraction method
    name, and the remaining quality knobs passed through to
    ``RVCInference.infer_file``.

    Returns:
        ``(output_path_or_None, log_text)`` — exactly what the two Gradio
        output components (Audio, Textbox) expect. On error the first
        element is None and the log carries the full traceback.
    """
    logs = [log_message("Starting conversion...")]

    # Fast-fail validation of the two required uploads.
    if not audio_path:
        return None, "Error: No audio file."
    if not model_file:
        return None, "Error: No model file."

    try:
        cleanup_temp()
        model_path = model_file.name
        index_path = index_file.name if index_file else None

        # Mono-fold + normalize the input before inference.
        clean_audio, msg = preprocess_audio(audio_path)
        logs.append(log_message(msg))

        logs.append(log_message(f"Model: {os.path.basename(model_path)}"))
        rvc = RVCInference(device="cpu")
        rvc.load_model(model_path)

        # Unique output name per run to avoid clobbering earlier results.
        output_temp = os.path.join(TEMP_DIR, f"rvc_out_{int(time.time())}.wav")

        kwargs = {
            "input_path": clean_audio,
            "output_path": output_temp,
            "pitch": int(pitch_change),
            "method": f0_method,
            "index_path": index_path,
            "index_rate": float(index_rate),
            "protect": float(protect_val),
            "filter_radius": int(filter_radius),
            "resample_sr": int(resample_sr),
            "rms_mix_rate": float(envelope_mix),
            "hop_length": int(hop_length)
        }

        # rvc_python's infer_file signature differs between releases:
        # keep only the parameters this installation accepts, and map the
        # two known renames (pitch -> f0_up_key, method -> f0_method).
        valid_keys = inspect.signature(rvc.infer_file).parameters.keys()
        final_kwargs = {}
        for k, v in kwargs.items():
            if k in valid_keys:
                final_kwargs[k] = v
            elif k == "pitch" and "f0_up_key" in valid_keys:
                final_kwargs["f0_up_key"] = v
            elif k == "method" and "f0_method" in valid_keys:
                final_kwargs["f0_method"] = v

        logs.append(log_message(f"Method: {f0_method}"))

        start_time = time.time()
        rvc.infer_file(**final_kwargs)

        # Optional post-EQ pass to tame the nasal quality of raw output.
        final_output = output_temp
        if enable_clarity and os.path.exists(output_temp):
            logs.append(log_message("Applying clarity filter..."))
            final_output = post_process_audio(output_temp, clarity_boost=True)

        duration = time.time() - start_time
        logs.append(log_message(f"Done! ({duration:.2f}s)"))

        return final_output, "\n".join(logs)

    except Exception:
        # Surface the full traceback in the UI log box for debugging.
        err_msg = f"Error: {traceback.format_exc()}"
        print(err_msg)
        return None, err_msg
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Minimal CSS override: orange gradient for the run button only.
custom_css = """
#run_btn {background: linear-gradient(90deg, #FF5722 0%, #FF8A65 100%); color: white; border: none;}
"""

# --- Gradio UI layout -------------------------------------------------------
with gr.Blocks(title="RVC Pro Persian", theme=gr.themes.Soft(), css=custom_css) as demo:
    gr.Markdown("## RVC Pro: Professional Voice Converter")

    with gr.Row():
        # Left column: inputs and the main run controls.
        with gr.Column():
            audio_input = gr.Audio(label="Input Audio", type="filepath")
            with gr.Row():
                model_input = gr.File(label="Model (.pth)", file_types=[".pth"])
                index_input = gr.File(label="Index (.index)", file_types=[".index"])

            # F0 (pitch-tracking) algorithm choice; rmvpe as the default.
            algo_dropdown = gr.Dropdown(
                choices=["rmvpe", "fcpe", "crepe", "harvest", "pm"],
                value="rmvpe",
                label="Algorithm"
            )
            pitch_slider = gr.Slider(-24, 24, value=0, step=1, label="Pitch Change")
            btn_run = gr.Button("Start Conversion", elem_id="run_btn", variant="primary")

        # Right column: quality knobs (collapsible) plus the outputs.
        with gr.Column():
            with gr.Accordion("Quality Settings", open=True):
                enable_clarity = gr.Checkbox(value=True, label="Fix Nasal Sound (Clarity)")
                index_rate = gr.Slider(0, 1, value=0.4, step=0.05, label="Index Rate")
                envelope_mix = gr.Slider(0, 1, value=0.25, step=0.05, label="Volume Mix")
                protect_val = gr.Slider(0, 0.5, value=0.33, step=0.01, label="Protect")
                filter_radius = gr.Slider(0, 7, value=3, step=1, label="Filter Radius")
                resample_sr = gr.Slider(0, 48000, value=0, step=1000, label="Resample SR")
                hop_len = gr.Slider(1, 512, value=128, step=1, label="Hop Length")

            output_audio = gr.Audio(label="Final Output", type="filepath")
            logs = gr.Textbox(label="Logs", lines=5)

    # Wire the button to the pipeline. The inputs list order must match the
    # rvc_process_pipeline parameter order exactly.
    btn_run.click(
        rvc_process_pipeline,
        inputs=[
            audio_input, model_input, index_input,
            pitch_slider, algo_dropdown, index_rate,
            protect_val, filter_radius, resample_sr,
            envelope_mix, hop_len, enable_clarity
        ],
        outputs=[output_audio, logs]
    )
|
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; queue() serializes long-running conversions
    # so concurrent users don't run inference simultaneously on CPU.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)