TransferRapid/CommonVoices20_ro
Viewer • Updated • 41.4k • 67 • 4
More details

from transformers import WhisperProcessor, WhisperForConditionalGeneration
import torchaudio
import torch
# Checkpoint identifier handed to both `from_pretrained` calls below.
model_name = "TransferRapid/whisper-large-v3-turbo_ro"

# The processor and the model are loaded from the same checkpoint.
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name)

# Prefer CUDA when torch reports it as available; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Place the model on the chosen device and switch it to evaluation mode.
model.to(device)
model.eval()
def preprocess_audio(audio_path, processor):
    """Load an audio file and convert it into model inputs.

    The file is read with ``torchaudio.load``, down-mixed to one channel,
    resampled to 16 kHz when its native rate differs, and then handed to
    ``processor``.

    Args:
        audio_path: Path of the audio file to read.
        processor: Callable invoked as
            ``processor(samples, sampling_rate=16000, return_tensors="pt")``;
            its result must provide ``.items()`` yielding values that have a
            ``.to(device)`` method.

    Returns:
        A dict with the processor's outputs, each moved to the module-level
        ``device``.
    """
    target_rate = 16000
    waveform, sample_rate = torchaudio.load(audio_path)

    # The loaded tensor is laid out as (channels, frames). Averaging over the
    # channel axis always yields a 1-D mono signal; a plain squeeze() would
    # leave a stereo file 2-D, so it would not reach the processor as a single
    # utterance. For a one-channel file the mean is the signal itself.
    waveform = waveform.mean(dim=0)

    # Resample to 16 kHz only when the file uses a different rate.
    if sample_rate != target_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_rate)
        waveform = resampler(waveform)

    # Convert the raw samples into the processor's tensor format.
    inputs = processor(waveform.numpy(), sampling_rate=target_rate, return_tensors="pt")

    # Move every produced tensor to the device selected at module level.
    return {key: val.to(device) for key, val in inputs.items()}
def transcribe(audio_path, model, processor, language="romanian", task="transcribe"):
    """Return the text that ``model`` generates for one audio file.

    Args:
        audio_path: Path of the audio file, forwarded to ``preprocess_audio``.
        model: Object whose ``generate`` method produces token ids.
        processor: Object exposing a ``tokenizer`` and accepted by
            ``preprocess_audio``.
        language: Language passed to ``get_decoder_prompt_ids``.
        task: Task passed to ``get_decoder_prompt_ids``.

    Returns:
        The first string of the decoded batch.
    """
    model_inputs = preprocess_audio(audio_path, processor)
    tokenizer = processor.tokenizer
    prompt_ids = tokenizer.get_decoder_prompt_ids(language=language, task=task)

    # Generation runs with gradient tracking disabled.
    with torch.no_grad():
        token_ids = model.generate(
            model_inputs["input_features"],
            forced_decoder_ids=prompt_ids,
        )

    decoded = tokenizer.batch_decode(token_ids, skip_special_tokens=True)
    return decoded[0]
# Recording to transcribe.
audio_file = "audio.wav"

# Transcribe it with the default language/task of `transcribe` and show the text.
transcription = transcribe(audio_path=audio_file, model=model, processor=processor)
print("Transcription:", transcription)
import os
import torchaudio
import numpy as np
import librosa
import webrtcvad
import soundfile as sf
from pydub import AudioSegment
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import torch
# Checkpoint on the Hugging Face Hub; the processor and the model both come from it.
model_name = "TransferRapid/whisper-large-v3-turbo_ro"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name)

# Run on the GPU when torch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Place the model on that device and switch it to evaluation mode.
model.to(device)
model.eval()
def convert_mp3_to_wav(mp3_file_path):
    """Convert an MP3 file to a WAV file with a 16 kHz frame rate.

    Args:
        mp3_file_path: Path of the MP3 file to decode.

    Returns:
        Path of the WAV file that was written: the input path with its final
        extension replaced by ``_16k.wav`` (``audio.mp3`` -> ``audio_16k.wav``).
    """
    # Swap only the trailing extension. A plain str.replace(".mp3", ...) would
    # also rewrite ".mp3" inside directory names and, when the suffix is absent
    # or spelled differently (e.g. ".MP3"), would leave the path unchanged so
    # the export below would overwrite the source file.
    stem, _ext = os.path.splitext(mp3_file_path)
    wav_16k_file_path = stem + "_16k.wav"

    audio = AudioSegment.from_mp3(mp3_file_path)
    audio.set_frame_rate(16000).export(wav_16k_file_path, format="wav")
    return wav_16k_file_path
def extract_audio_channels(wav_file_path):
    """Load an audio file and write each of its first two channels to a WAV file.

    The file is loaded at its native sample rate without down-mixing. A
    one-dimensional result is treated as mono and written to ``<stem>_mono.wav``;
    otherwise the first two channels are written to ``<stem>_left.wav`` and
    ``<stem>_right.wav``. Any further channels are ignored.

    Args:
        wav_file_path: Path of the audio file to load.

    Returns:
        A 5-tuple ``(left_channel, right_channel, sr, left_file, right_file)``.
        For mono input the only channel and its file are returned in the
        "left" positions and both "right" positions are ``None``.
    """
    y, sr = librosa.load(wav_file_path, sr=None, mono=False)

    # Build output names from the path without its final extension. Using
    # str.replace(".wav", ...) would rewrite every ".wav" in the path and, for
    # an input without that exact suffix, would return the input path itself,
    # so the write below would overwrite the source file.
    stem, _ext = os.path.splitext(wav_file_path)

    if y.ndim == 1:
        mono_file = stem + "_mono.wav"
        sf.write(mono_file, y, sr)
        return y, None, sr, mono_file, None

    left_channel, right_channel = y[0], y[1]
    left_file = stem + "_left.wav"
    right_file = stem + "_right.wav"
    sf.write(left_file, left_channel, sr)
    sf.write(right_file, right_channel, sr)
    return left_channel, right_channel, sr, left_file, right_file
def detect_speech_intervals(channel_data, sr, vad_level=3):
    """Return the 30 ms frames of a signal that the VAD classifies as speech.

    The signal is cut into consecutive, non-overlapping 30 ms frames (a
    trailing partial frame is dropped), each frame is converted to 16-bit PCM
    and passed to ``webrtcvad``.

    Args:
        channel_data: One-dimensional sequence of float samples. Samples are
            scaled by the int16 maximum, so they are assumed to be nominally
            in [-1.0, 1.0]; values outside that range are clipped.
        sr: Sample rate in Hz, forwarded to ``Vad.is_speech``.
            NOTE(review): py-webrtcvad documents support for 8000, 16000,
            32000 and 48000 Hz only - confirm callers resample accordingly.
        vad_level: Aggressiveness mode passed to ``webrtcvad.Vad``.

    Returns:
        A list of ``(start_time, end_time)`` tuples in seconds, one per frame
        flagged as speech, in chronological order. Empty when the signal is
        shorter than one frame.
    """
    frame_duration = 30  # milliseconds
    frame_length = int(sr * frame_duration / 1000)

    samples = np.asarray(channel_data)
    n_frames = samples.size // frame_length if frame_length > 0 else 0
    if n_frames == 0:
        # Not even one full frame: nothing to classify.
        return []

    vad = webrtcvad.Vad(vad_level)

    # Convert once for the whole signal. Clipping before the cast prevents
    # out-of-range samples from wrapping around in int16.
    int16_info = np.iinfo(np.int16)
    scaled = samples[: n_frames * frame_length] * int16_info.max
    pcm = np.clip(scaled, int16_info.min, int16_info.max).astype(np.int16)
    frames = pcm.reshape(n_frames, frame_length)

    speech_intervals = []
    for i, frame in enumerate(frames):
        if vad.is_speech(frame.tobytes(), sr):
            start_time = (i * frame_duration) / 1000
            end_time = ((i + 1) * frame_duration) / 1000
            speech_intervals.append((start_time, end_time))
    return speech_intervals
def merge_intervals(intervals, merge_threshold=1):
    """Merge intervals whose gap is at most ``merge_threshold``.

    Intervals are processed in order of their start value. An interval is
    folded into the previous merged one when it starts no more than
    ``merge_threshold`` after that one ends (overlapping and contained
    intervals included); otherwise it opens a new merged interval.

    Args:
        intervals: Sequence of ``(start, end)`` pairs. It is not modified.
        merge_threshold: Largest gap, in the same unit as the interval
            bounds, that is still bridged.

    Returns:
        A list of ``[start, end]`` lists ordered by start; ``[]`` for empty
        input.
    """
    if not intervals:
        return []

    # Sorting makes the single pass below valid for input in any order; input
    # that is already sorted is left in the same order.
    ordered = sorted(intervals, key=lambda interval: interval[0])

    merged = [list(ordered[0])]
    for start, end in ordered[1:]:
        current = merged[-1]
        if start - current[1] <= merge_threshold:
            # Keep the larger end so an interval contained in the current one
            # cannot shrink it.
            current[1] = max(current[1], end)
        else:
            merged.append([start, end])
    return merged
def save_segments(channel_data, sr, intervals, output_dir="segments", prefix="segment"):
    """Write one WAV file per interval and describe each file that was written.

    Args:
        channel_data: Sliceable sequence of samples for one channel.
        sr: Sample rate, used to convert interval bounds to sample indices
            and passed to the writer.
        intervals: Iterable of ``(start, end)`` pairs; multiplied by ``sr``
            to obtain sample indices.
        output_dir: Directory for the files; created if it does not exist.
        prefix: File-name prefix, also returned with every entry.

    Returns:
        A list of ``(start, end, segment_path, prefix)`` tuples in the order
        of ``intervals``. Files are numbered from 1.
    """
    os.makedirs(output_dir, exist_ok=True)

    segment_paths = []
    for idx, (start, end) in enumerate(intervals):
        # Interval bounds are truncated to whole sample indices.
        sample_range = slice(int(start * sr), int(end * sr))
        segment_path = os.path.join(output_dir, f"{prefix}_{idx+1}.wav")
        sf.write(segment_path, channel_data[sample_range], sr)

        record = (start, end, segment_path, prefix)
        segment_paths.append(record)

    return segment_paths
def preprocess_audio(audio_path, processor, device):
    """Load an audio file and convert it into model inputs on ``device``.

    The file is read with ``torchaudio.load``, down-mixed to one channel,
    resampled to 16 kHz when its native rate differs, and then handed to
    ``processor``.

    Args:
        audio_path: Path of the audio file to read.
        processor: Callable invoked as
            ``processor(samples, sampling_rate=16000, return_tensors="pt")``;
            its result must provide ``.items()`` yielding values that have a
            ``.to(device)`` method.
        device: Target passed to each value's ``.to``.

    Returns:
        A dict with the processor's outputs, each moved to ``device``.
    """
    target_rate = 16000
    waveform, sample_rate = torchaudio.load(audio_path)

    # The loaded tensor is laid out as (channels, frames). Averaging over the
    # channel axis always yields a 1-D mono signal; a plain squeeze() would
    # leave a multi-channel file 2-D, so it would not reach the processor as a
    # single utterance. For a one-channel file the mean is the signal itself.
    waveform = waveform.mean(dim=0)

    # Resample to 16 kHz only when the file uses a different rate.
    if sample_rate != target_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_rate)
        waveform = resampler(waveform)

    inputs = processor(waveform.numpy(), sampling_rate=target_rate, return_tensors="pt")
    return {key: val.to(device) for key, val in inputs.items()}
def transcribe(audio_path, model, processor, device, language="romanian", task="transcribe"):
    """Return the text that ``model`` generates for one audio file.

    Args:
        audio_path: Path of the audio file, forwarded to ``preprocess_audio``.
        model: Object whose ``generate`` method produces token ids.
        processor: Object exposing a ``tokenizer`` and accepted by
            ``preprocess_audio``.
        device: Device forwarded to ``preprocess_audio``.
        language: Language passed to ``get_decoder_prompt_ids``.
        task: Task passed to ``get_decoder_prompt_ids``.

    Returns:
        The first string of the decoded batch.
    """
    model_inputs = preprocess_audio(audio_path, processor, device)
    tokenizer = processor.tokenizer
    prompt_ids = tokenizer.get_decoder_prompt_ids(language=language, task=task)

    # Generation runs with gradient tracking disabled.
    with torch.no_grad():
        token_ids = model.generate(
            model_inputs["input_features"],
            forced_decoder_ids=prompt_ids,
        )

    decoded = tokenizer.batch_decode(token_ids, skip_special_tokens=True)
    return decoded[0]
# Input recording (MP3 or WAV).
audio_file = "audio.mp3"

# An ".mp3" input is converted to WAV first; any other path is used as it is.
wav_file = convert_mp3_to_wav(audio_file) if audio_file.endswith(".mp3") else audio_file

# Split into channels; for a mono file the right-hand values come back as None.
left_channel, right_channel, sr, left_file, right_file = extract_audio_channels(wav_file)

# Left channel (or the only channel of a mono file): detect, merge, save.
if left_channel is None:
    left_segments = []
else:
    left_intervals = detect_speech_intervals(left_channel, sr)
    merged_left_intervals = merge_intervals(left_intervals)
    left_segments = save_segments(
        left_channel,
        sr,
        merged_left_intervals,
        output_dir="left_segments",
        prefix="Left",
    )

# Right channel, present only for stereo input.
if right_channel is None:
    right_segments = []
else:
    right_intervals = detect_speech_intervals(right_channel, sr)
    merged_right_intervals = merge_intervals(right_intervals)
    right_segments = save_segments(
        right_channel,
        sr,
        merged_right_intervals,
        output_dir="right_segments",
        prefix="Right",
    )

# Interleave the segments of both channels in order of start time.
all_segments = sorted(left_segments + right_segments, key=lambda x: x[0])

# Transcribe every segment and print one numbered line per segment.
for idx, (start, end, segment_path, channel) in enumerate(all_segments, start=1):
    transcription = transcribe(segment_path, model, processor, device)
    print(f"{idx}. {start:.2f}s → {end:.2f}s | {channel}: {transcription}")
Base model
openai/whisper-large-v3