#!/usr/bin/env python3 """ Lyrics transcription + word-level alignment pipeline. Pipeline: Demucs (isolate vocals) -> WhisperX transcribe (large-v3) -> forced word alignment. Emits a JSON file with line- and word-level timestamps that the web player overlay and the ASS subtitle burner both consume. Usage: transcribe.py --audio /abs/song.mp3 --out /abs/lyrics.json \ [--language en] [--gpu 0] [--model large-v3] [--no-demucs] All heavy logs go to stderr; stdout stays clean. Exit code 0 on success. The output JSON shape is: { "version": 1, "language": "en", "source": "whisperx", "model": "large-v3", "demucs": true, "lines": [ {"start": 12.30, "end": 16.80, "text": "...", "words": [{"start": 12.30, "end": 12.55, "text": "..."}]} ] } """ import argparse import json import os import subprocess import sys import tempfile from pathlib import Path def log(*a): print(*a, file=sys.stderr, flush=True) # Progress file path, set from --progress. The web layer polls a status endpoint # that reads this file to drive a live progress bar. _PROGRESS_PATH = None def write_progress(pct: int, stage: str): if not _PROGRESS_PATH: return try: tmp = _PROGRESS_PATH + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump({"status": "processing", "pct": int(pct), "stage": stage}, f) os.replace(tmp, _PROGRESS_PATH) except Exception: pass # progress is best-effort, never fail the run over it def isolate_vocals(audio_path: str, gpu: int | None) -> str | None: """Run Demucs two-stem separation and return the path to vocals.wav. Returns None if separation fails so the caller can fall back to the raw mix. """ tmp_dir = tempfile.mkdtemp(prefix="demucs_") cmd = [ sys.executable, "-m", "demucs", "--two-stems", "vocals", "-n", "htdemucs", "-o", tmp_dir, audio_path, ] env = dict(os.environ) if gpu is not None: env["CUDA_VISIBLE_DEVICES"] = str(gpu) cmd += ["-d", "cuda"] else: cmd += ["-d", "cpu"] log(f"[demucs] separating vocals -> {tmp_dir}") try: # Stream stderr so demucs' tqdm percentage drives live progress (8→38%). import re proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, bufsize=0) buf = b"" last = -1 while True: chunk = proc.stderr.read(64) if not chunk: break buf += chunk # tqdm overwrites with \r; scan the tail for the newest "NN%". text = buf[-200:].decode("utf-8", "ignore") m = re.findall(r"(\d{1,3})%", text) if m: p = int(m[-1]) if 0 <= p <= 100 and p != last: last = p write_progress(8 + int(p * 0.30), "Separating vocals") proc.wait() if proc.returncode != 0: log(f"[demucs] exited {proc.returncode}; falling back to raw mix") return None except Exception as e: log(f"[demucs] failed ({e}); falling back to raw mix") return None stem = Path(audio_path).stem vocals = Path(tmp_dir) / "htdemucs" / stem / "vocals.wav" if vocals.exists(): log(f"[demucs] vocals at {vocals}") return str(vocals) log("[demucs] vocals.wav not found; falling back to raw mix") return None # Karaoke display lines are short — we re-split a segment's words on natural # pauses, a soft word cap, and (for spaced scripts) clause punctuation / new # capitalised lines. LINE_GAP = 0.65 # seconds of silence that ends a display line LINE_MAX_WORDS = 12 # hard cap so Latin-script lines never overflow LINE_MAX_CHARS = 30 # char cap for spaceless scripts (Thai/CJK/…) LINE_MIN_WORDS = 3 # don't break on punctuation before this many words PUNCT_END = (".", ",", "!", "?", ";", ":", "—") # Scripts written without spaces between words — join tokens directly and split # by character count instead of word count. SPACELESS = {"th", "zh", "ja", "lo", "my", "km", "yue", "wuu"} # Languages that use a non-Latin script — used to detect a mis-forced pass (a # Thai/Arabic/… pass that produced Latin text is really a misheard English part). NONLATIN_LANGS = { "th", "zh", "ja", "ko", "ar", "he", "ru", "uk", "bg", "sr", "mk", "el", "hi", "bn", "ta", "te", "kn", "ml", "mr", "ne", "si", "my", "km", "lo", "ka", "am", "fa", "ur", "ps", "yue", "wuu", "yi", } def _emit(words: list, lang: str) -> dict | None: if not words: return None sep = "" if lang in SPACELESS else " " return { "start": words[0]["start"], "end": words[-1]["end"], "text": sep.join(w["text"] for w in words), "lang": lang, "words": words, } def _norm_for_match(s: str) -> str: """Normalize text for similarity comparison (lowercase, keep letters/numbers including non-ASCII scripts; drop everything else).""" out = [] for c in s or "": if c.isalnum(): out.append(c.lower()) return "".join(out) def _guess_lang_from_script(text: str) -> str: """Best-effort language guess from a line's Unicode script (used when we have no whisper anchor to inherit the language from).""" for c in text or "": co = ord(c) if 0x3040 <= co <= 0x30FF or 0x4E00 <= co <= 0x9FFF: return "ja" if 0x0E00 <= co <= 0x0E7F: return "th" if 0xAC00 <= co <= 0xD7AF: return "ko" if 0x0600 <= co <= 0x06FF: return "ar" if 0x0400 <= co <= 0x04FF: return "ru" return "en" def _redistribute_words(start: float, end: float, text: str, lang: str) -> list: """Evenly distribute the line's [start,end] across its tokens — words for spaced languages, characters for spaceless scripts (Thai/CJK/…).""" if not text or end <= start: return [] tokens = list(text) if lang in SPACELESS else text.split() tokens = [t for t in tokens if t.strip()] n = len(tokens) if n == 0: return [] slot = (end - start) / n return [{"start": round(start + i * slot, 3), "end": round(start + (i + 1) * slot, 3), "text": t} for i, t in enumerate(tokens)] def _distribute_in_vocal_regions(lines: list, regions: list, gap_start: float, gap_end: float) -> list: """Place each line at a moment within [gap_start, gap_end] where vocals are actually active. `regions` is a list of (start, end) seconds covering the whole song. Falls back to even spread if no vocal activity is detected in the gap (e.g. instrumental break with no vocals at all).""" gap_regions = [] for s, e in regions: s_clip = max(s, gap_start) e_clip = min(e, gap_end) if e_clip - s_clip >= 0.3: gap_regions.append((s_clip, e_clip)) N = len(lines) if N == 0: return [] if not gap_regions or gap_end <= gap_start: # No vocals in the gap — last-resort even spread so coverage isn't lost. if gap_end <= gap_start: return [] slot = (gap_end - gap_start) / N out = [] for k, ul in enumerate(lines): s = gap_start + k * slot e = gap_start + (k + 1) * slot lang = _guess_lang_from_script(ul) out.append({"start": round(s, 3), "end": round(e, 3), "text": ul, "lang": lang, "words": _redistribute_words(s, e, ul, lang)}) return out M = len(gap_regions) out = [] if N <= M: # Fewer lines than vocal regions — pick N regions roughly evenly spaced # and start each line at its region's start. Each line ends at the next # selected region's start (or its own region's end if last). chosen = [int(round(i * (M - 1) / max(1, N - 1))) if N > 1 else 0 for i in range(N)] # Ensure strictly increasing for i in range(1, len(chosen)): if chosen[i] <= chosen[i - 1]: chosen[i] = min(M - 1, chosen[i - 1] + 1) for i, ul in enumerate(lines): rs, re = gap_regions[chosen[i]] if i + 1 < N: nxt_rs = gap_regions[chosen[i + 1]][0] line_end = min(re, nxt_rs - 0.05) else: line_end = re line_end = max(rs + 0.4, line_end) lang = _guess_lang_from_script(ul) out.append({"start": round(rs, 3), "end": round(line_end, 3), "text": ul, "lang": lang, "words": _redistribute_words(rs, line_end, ul, lang)}) else: # More lines than vocal regions — assign multiple lines per region, # divided proportionally to each region's duration so longer regions # take more lines. total = sum(e - s for s, e in gap_regions) line_idx = 0 consumed = 0.0 for ri, (rs, re) in enumerate(gap_regions): # Lines that should land in this region: proportional to its share # of total vocal time, rounded so the last region takes the rest. if ri == M - 1: n_here = N - line_idx else: consumed += re - rs target = int(round(consumed / total * N)) n_here = max(0, target - line_idx) if n_here <= 0: continue slot = (re - rs) / n_here for k in range(n_here): if line_idx >= N: break s = rs + k * slot e = rs + (k + 1) * slot ul = lines[line_idx] lang = _guess_lang_from_script(ul) out.append({"start": round(s, 3), "end": round(e, 3), "text": ul, "lang": lang, "words": _redistribute_words(s, e, ul, lang)}) line_idx += 1 return out def correct_whisper_with_description(whisper_lines: list, user_lines: list, audio_duration: float = 0.0, vocal_regions: list = None) -> list: """Description-first alignment, with whisper used only as structural anchors: 1. Find HIGH-confidence whisper-to-description matches (sim ≥ STRONG). Weak/spurious matches are ignored — they cause downstream skips and misplacements (e.g. line #5 anchored at 30s because of a loose match, making line #4 disappear). 2. The strong anchors partition the description into segments. Each segment of description lines is distributed across the vocal regions in its time window — so every line lands on actual singing and every line appears exactly once, in order. 3. No description line is ever skipped; no weak match consumes the wrong slot; every output line carries description text (never whisper). Falls back to pure vocal-region distribution if no strong anchors exist. """ from difflib import SequenceMatcher if not user_lines: return whisper_lines or [] U = [u for u in user_lines if u.strip()] if not U: return whisper_lines or [] vocal_regions = vocal_regions or [] audio_end = max(audio_duration, 10.0) if vocal_regions: audio_end = max(audio_end, vocal_regions[-1][1]) # ── Find strong anchors ──────────────────────────────────────────────── # Only matches at STRONG similarity (0.55+) count as anchors. Anything # less confident than that has historically misled the alignment. user_script = [_guess_lang_from_script(u) for u in U] user_norm = [_norm_for_match(u) for u in U] LATIN = {"en", "es", "pt", "it", "fr", "de", "nl", "ca", "ro", "tr", "vi", "id", "ms"} def same_script(a: str, b: str) -> bool: if a in LATIN and b in LATIN: return True return a == b STRONG = 0.55 SKIP_AHEAD = 10 anchors = [] # list of (user_idx, whisper_start, whisper_end) next_u = 0 for w in (whisper_lines or []): w_text = (w.get("text") or "").strip() if not w_text: continue w_lang = w.get("lang") or _guess_lang_from_script(w_text) w_norm = _norm_for_match(w_text) if not w_norm: continue best_u = -1; best_sim = 0.0 end = min(next_u + SKIP_AHEAD + 1, len(U)) for ui in range(next_u, end): if not same_script(user_script[ui], w_lang): continue if not user_norm[ui]: continue sim = SequenceMatcher(None, user_norm[ui], w_norm).ratio() if sim > best_sim: best_sim = sim; best_u = ui if best_u >= 0 and best_sim >= STRONG: anchors.append((best_u, float(w["start"]), float(w["end"]))) next_u = best_u + 1 # ── Build output ─────────────────────────────────────────────────────── out = [] if not anchors: # No reliable whisper structure — distribute all description lines # across the vocal regions in order. Best-effort but never skips. return _distribute_in_vocal_regions(U, vocal_regions, 0.5, audio_end - 0.3) # Segment 0: description lines BEFORE the first anchor go in the time # window [0, anchor[0].start], aligned to vocal regions there. first_u, first_start, _ = anchors[0] if first_u > 0 and first_start > 0.6: out.extend(_distribute_in_vocal_regions( U[0:first_u], vocal_regions, 0.0, first_start )) # The anchor line itself uses whisper timing. out.append(_build_line(U[first_u], first_start, anchors[0][2])) # Middle segments: between each pair of anchors, distribute the lines # between them across vocal regions in that window. for i in range(1, len(anchors)): prev_u, _, prev_end_t = anchors[i - 1] cur_u, cur_start_t, cur_end_t = anchors[i] gap_start = prev_end_t gap_end = cur_start_t between_lines = U[prev_u + 1 : cur_u] if between_lines and gap_end - gap_start > 0.6: out.extend(_distribute_in_vocal_regions( between_lines, vocal_regions, gap_start, gap_end )) out.append(_build_line(U[cur_u], cur_start_t, cur_end_t)) # Trailing segment: description lines after the last anchor distributed # across the audio's remaining vocal regions. last_u, _, last_end_t = anchors[-1] trailing = U[last_u + 1:] if trailing: end_time = max(audio_end - 0.3, last_end_t + 2.0) if end_time > last_end_t + 0.6: out.extend(_distribute_in_vocal_regions( trailing, vocal_regions, last_end_t, end_time )) return out def _build_line(text: str, start: float, end: float) -> dict: """Construct an output line dict with redistributed word timings.""" lang = _guess_lang_from_script(text) s = round(float(start), 3) e = round(max(float(end), s + 0.4), 3) return {"start": s, "end": e, "text": text, "lang": lang, "words": _redistribute_words(s, e, text, lang)} def _spread_lines_evenly(lines: list, start: float, end: float) -> list: """Distribute `lines` evenly between [start, end]. Used as a last-resort fallback when whisper produced no usable anchors at all.""" if not lines or end <= start: return [] slot = (end - start) / len(lines) out = [] for k, ul in enumerate(lines): s = start + k * slot e = start + (k + 1) * slot lang = _guess_lang_from_script(ul) out.append({ "start": round(s, 3), "end": round(e, 3), "text": ul, "lang": lang, "words": _redistribute_words(s, e, ul, lang), }) return out def align_user_lyrics(user_lines: list, whisper_lines: list) -> list: """Legacy: project user lines onto whisper anchors with N-W DP. Kept for reference; the active pipeline uses correct_whisper_with_description() because it preserves whisper's natural timing instead of squeezing all description lines into whatever anchors were found.""" from difflib import SequenceMatcher if not user_lines: return whisper_lines if not whisper_lines: return [] U = [u for u in user_lines if u.strip()] W = whisper_lines nU, nW = len(U), len(W) if nU == 0: return [] user_norm = [_norm_for_match(u) for u in U] whisper_norm = [_norm_for_match(w.get("text", "")) for w in W] # Script of each user line and each whisper line. For multilingual songs # an English user line MUST anchor to an English whisper segment and a Thai # user line MUST anchor to a Thai whisper segment — otherwise the DP forces # a Thai user line onto an English anchor (or vice-versa) and the whole # block of mismatched-language user lines collapses into the wrong region. user_script = [_guess_lang_from_script(u) for u in U] whisper_script = [(w.get("lang") or _guess_lang_from_script(w.get("text", ""))) for w in W] def _same_script(a: str, b: str) -> bool: # Coarse equivalence — collapse all Latin-script European languages # together, all CJK together, etc. so e.g. an English user line still # matches a Spanish whisper anchor if that's all we have. LATIN = {"en", "es", "pt", "it", "fr", "de", "nl", "ca", "ro", "tr", "vi", "id", "ms"} if a in LATIN and b in LATIN: return True return a == b # Similarity matrix (cached lookups via SequenceMatcher). Cross-script # pairs are zeroed so the DP can never anchor across languages. sim = [[0.0] * nW for _ in range(nU)] for i in range(nU): if not user_norm[i]: continue sm = SequenceMatcher(None, user_norm[i], "") sm.set_seq1(user_norm[i]) for j in range(nW): if not whisper_norm[j]: continue if not _same_script(user_script[i], whisper_script[j]): continue # different script → can't be the same line sm.set_seq2(whisper_norm[j]) sim[i][j] = sm.ratio() # Higher threshold prevents the DP from anchoring a user line to a weakly- # similar whisper segment in the wrong region of the song. Weak matches get # interpolated between confident anchors instead, which spreads lyric lines # over the right time window. MATCH_THRESHOLD = 0.35 GAP_USER = -0.10 # cost of leaving a user line unmatched GAP_WHISPER = -0.04 # cost of skipping a whisper line SOFT_DIAG = -0.04 # diagonal move with too-low similarity (no match credit) # DP table: dp[i][j] = best score aligning U[:i] vs W[:j]. dp = [[0.0] * (nW + 1) for _ in range(nU + 1)] for i in range(1, nU + 1): dp[i][0] = dp[i - 1][0] + GAP_USER for j in range(1, nW + 1): dp[0][j] = dp[0][j - 1] + GAP_WHISPER for i in range(1, nU + 1): for j in range(1, nW + 1): s = sim[i - 1][j - 1] match_score = dp[i - 1][j - 1] + (s if s >= MATCH_THRESHOLD else SOFT_DIAG) user_gap = dp[i - 1][j] + GAP_USER whisper_gap = dp[i][j - 1] + GAP_WHISPER dp[i][j] = max(match_score, user_gap, whisper_gap) # Traceback to recover the matched pairs (user_idx → whisper_idx). matches = {} i, j = nU, nW while i > 0 and j > 0: s = sim[i - 1][j - 1] eff = (s if s >= MATCH_THRESHOLD else SOFT_DIAG) if abs(dp[i][j] - (dp[i - 1][j - 1] + eff)) < 1e-9: if s >= MATCH_THRESHOLD: matches[i - 1] = j - 1 i -= 1; j -= 1 elif abs(dp[i][j] - (dp[i - 1][j] + GAP_USER)) < 1e-9: i -= 1 else: j -= 1 # Build aligned output: matched lines get the whisper timing; unmatched user # lines get evenly interpolated between their nearest matched neighbours. out = [] pending = [] last_end = 0.0 def flush(next_start): if not pending: return n = len(pending) span = max(0.0, next_start - last_end) slot = (span / (n + 1)) if span > 0 else 0.6 for k, (pt, pl) in enumerate(pending): s = last_end + (k + 0.5) * slot e = last_end + (k + 1.5) * slot out.append({"start": round(s, 3), "end": round(e, 3), "text": pt, "lang": pl, "words": _redistribute_words(s, e, pt, pl)}) pending.clear() for ui, u in enumerate(U): if ui in matches: wl = W[matches[ui]] start = float(wl["start"]) end = float(wl["end"]) lang = wl.get("lang") or _guess_lang_from_script(u) flush(start) out.append({"start": round(start, 3), "end": round(end, 3), "text": u, "lang": lang, "words": _redistribute_words(start, end, u, lang)}) last_end = end else: pending.append((u, _guess_lang_from_script(u))) if pending: anchor_end = max(last_end + 1.0, float(W[-1]["end"])) flush(anchor_end) return out def merge_fragments(lines: list) -> list: """Stitch tiny leftover fragments (e.g. a lone 'The' or a 1-char Thai token) into an adjacent same-language line when they're close in time.""" def tiny(ln): if ln["lang"] in SPACELESS: return len(ln["text"]) < 4 return len(ln["text"].split()) < 2 out = [] for ln in lines: if out and out[-1]["lang"] == ln["lang"]: prev = out[-1] gap = ln["start"] - prev["end"] if gap < 1.0 and (tiny(ln) or tiny(prev)): sep = "" if ln["lang"] in SPACELESS else " " prev["text"] = (prev["text"] + sep + ln["text"]).strip() prev["end"] = ln["end"] prev["words"] = (prev.get("words") or []) + (ln.get("words") or []) continue out.append(ln) return out def split_into_lines(words: list, lang: str) -> list: """Split one (single-language) segment's timed words into short karaoke lines.""" if not words: return [] spaced = lang not in SPACELESS lines, cur = [], [words[0]] for prev, w in zip(words, words[1:]): brk = (w["start"] - prev["end"]) >= LINE_GAP if not brk and spaced and len(cur) >= LINE_MAX_WORDS: brk = True if not brk and not spaced and sum(len(x["text"]) for x in cur) >= LINE_MAX_CHARS: brk = True if not brk and spaced and len(cur) >= LINE_MIN_WORDS: if prev["text"].endswith(PUNCT_END): brk = True else: head = w["text"][:1] if (head.isupper() and not head.isdigit() and w["text"] not in ("I", "I'm", "I'll", "I've", "I'd", "I’m", "I’ll", "I’ve", "I’d")): brk = True if brk: line = _emit(cur, lang) if line: lines.append(line) cur = [w] else: cur.append(w) line = _emit(cur, lang) if line: lines.append(line) return lines def main(): ap = argparse.ArgumentParser() ap.add_argument("--audio", required=True) ap.add_argument("--out", required=True) ap.add_argument("--language", default=None) ap.add_argument("--gpu", type=int, default=None) ap.add_argument("--model", default="large-v3") ap.add_argument("--no-demucs", action="store_true") ap.add_argument("--no-vad", action="store_true", help="disable Silero VAD filter inside Whisper (transcribe full audio)") ap.add_argument("--no-vocal-gapfill", action="store_true", help="distribute gap-filled description lines evenly instead of snapping them " "to vocal-active regions detected by Silero VAD") ap.add_argument("--progress", default=None, help="path to write live progress JSON") ap.add_argument("--user-lyrics", default=None, help="path to a text file with one lyric line per line; the pipeline will " "ALIGN these exact lines to the audio instead of producing its own text") args = ap.parse_args() global _PROGRESS_PATH _PROGRESS_PATH = args.progress if not os.path.isfile(args.audio): log(f"audio not found: {args.audio}") sys.exit(2) write_progress(3, "Starting") # GPU pinning must happen before torch is imported by whisperx. if args.gpu is not None: os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu) vocals_path = args.audio used_demucs = False if not args.no_demucs: write_progress(8, "Separating vocals") sep = isolate_vocals(args.audio, args.gpu) if sep: vocals_path = sep used_demucs = True write_progress(40, "Loading model") from faster_whisper import WhisperModel, decode_audio from faster_whisper.vad import get_speech_timestamps, VadOptions from collections import defaultdict import gc SR = 16000 audio = decode_audio(vocals_path, sampling_rate=SR) def is_oom(e): s = str(e).lower() return "out of memory" in s or "cuda failed" in s or "cublas" in s def overlap_ratio(a, b): o = min(a["end"], b["end"]) - max(a["start"], b["start"]) if o <= 0: return 0.0 return o / max(1e-6, min(a["end"] - a["start"], b["end"] - b["start"])) # Full multilingual transcription on a given device/precision. Raises on OOM # so the caller can retry on a lighter config (cuda/fp16 → cuda/int8 → cpu). # # Strategy that handles bilingual duets WITHOUT skipping verses: transcribe the # WHOLE song once per candidate language (full recall + sentence context), then # for every time region keep whichever language's transcription is the most # confident. English regions win in the English pass, Thai regions win in the # Thai pass — nothing is dropped and each part is in its own script. def transcribe_all(dev, ct): log(f"[fw] loading {args.model} on {dev}/{ct}") model = WhisperModel(args.model, device=dev, compute_type=ct) try: # ── Candidate languages: detect across several windows of the song ── write_progress(46, "Detecting languages") if args.language: cands = [args.language] else: votes = defaultdict(float) win = 30 * SR positions = list(range(0, max(1, len(audio) - win + 1), max(win // 2, 1)))[:12] or [0] for pos in positions: sl = audio[pos:pos + win] if len(sl) < SR: continue try: lang, prob, _ = model.detect_language(sl, language_detection_segments=1) except Exception as e: if is_oom(e): raise lang, prob = None, 0.0 if lang and prob >= 0.5: votes[lang] += prob if not votes: cands = ["en"] else: ranked = sorted(votes, key=votes.get, reverse=True) top = votes[ranked[0]] # Keep languages with ≥25% of the top vote mass (drops flukes). cands = [l for l in ranked if votes[l] >= 0.25 * top][:3] log(f"[lang] candidates={cands}") # ── One full-song pass per candidate language ────────────────────── # Loose VAD pass: drops obvious instrumental stretches but keeps soft # sung vocals (threshold 0.20 vs default 0.5). Without it, Whisper # invents lyrics over the intro/outro music. With it tuned too high # it drops legitimate quiet singing — we erred on the loose side after # users reported missing verses in the middle of long songs. VAD_PARAMS = { "threshold": 0.20, "min_speech_duration_ms": 200, "min_silence_duration_ms": 350, "speech_pad_ms": 250, } # Common Whisper hallucinations on silence / music. If a segment IS # one of these phrases (no extra content), it's a hallucination # regardless of how confident the model was. HALLUCINATIONS = { "thank you", "thanks for watching", "thank you for watching", "subscribe", "please subscribe", "like and subscribe", "music", "[music]", "(music)", "♪", "♫", "you", ".", "..", "...", "thank you.", } segs_all = [] for ci, L in enumerate(cands): write_progress(50 + int(40 * ci / max(1, len(cands))), "Transcribing") seg_iter, _ = model.transcribe( audio, language=L, word_timestamps=True, beam_size=5, vad_filter=(not args.no_vad), vad_parameters=VAD_PARAMS, condition_on_previous_text=False, no_speech_threshold=0.70, log_prob_threshold=-1.4, ) for s in seg_iter: # Drop clear non-speech and low-confidence hallucinations on # instrumental sections, but keep genuinely-sung (lower-conf) lines. if getattr(s, "no_speech_prob", 0.0) > 0.70: continue if getattr(s, "avg_logprob", 0.0) < -1.4: continue text = (s.text or "").strip() if not text: continue # Drop the well-known Whisper boilerplate hallucinations. if text.lower().strip(".,!? ") in HALLUCINATIONS: continue # Drop "compression ratio" gibberish — pathological repeats. if getattr(s, "compression_ratio", 1.0) > 2.4: continue segs_all.append({ "start": float(s.start), "end": float(s.end), "lang": L, "score": float(getattr(s, "avg_logprob", -5.0)), "text": text, "words": list(s.words or []), }) # ── Resolve overlaps using OUTPUT SCRIPT as the language signal ───── # avg_logprob alone is unreliable (the Thai pass can "win" English # regions yet output Latin). The script actually produced is the # truth: a non-Latin-language pass that emitted Latin text is a # mis-forced English region — drop it. Native non-Latin script wins # overlaps so Thai regions never get the romanised English version. def nonlatin_frac(t): letters = [c for c in t if c.isalpha()] if not letters: return 0.0 return sum(1 for c in letters if not ("a" <= c.lower() <= "z")) / len(letters) kept = [] for s in segs_all: nl = nonlatin_frac(s["text"]) s["native"] = 1 if nl >= 0.5 else 0 if s["lang"] in NONLATIN_LANGS and nl < 0.3: continue # Thai (etc.) pass that produced Latin = mis-forced English kept.append(s) kept.sort(key=lambda x: (x["native"], x["score"]), reverse=True) accepted = [] for s in kept: if any(overlap_ratio(s, a) > 0.4 for a in accepted): continue accepted.append(s) accepted.sort(key=lambda x: x["start"]) dur = defaultdict(float) for s in accepted: dur[s["lang"]] += s["end"] - s["start"] dominant = max(dur, key=dur.get) if dur else (cands[0] if cands else "en") trusted = set(dur.keys()) or set(cands) # ── Build karaoke lines ──────────────────────────────────────────── lines = [] for s in accepted: compact = s["text"].replace(" ", "") if len(compact) >= 8 and len(set(compact)) <= 1: # degenerate "ㄷㄷㄷ" continue words = [] for w in s["words"]: if w.start is None or w.end is None: continue tok = (w.word or "").strip() if not tok: continue words.append({"start": round(float(w.start), 3), "end": round(float(w.end), 3), "text": tok}) if words: lines += split_into_lines(words, s["lang"]) else: lines.append({"start": round(s["start"], 3), "end": round(s["end"], 3), "text": s["text"], "lang": s["lang"], "words": []}) return lines, dominant, trusted finally: del model gc.collect() all_lines, dominant, trusted, last_err = [], "en", set(), None for dev, ct in [("cuda", "float16"), ("cuda", "int8"), ("cpu", "int8")]: try: all_lines, dominant, trusted = transcribe_all(dev, ct) break except Exception as e: last_err = e if is_oom(e): log(f"[fw] {dev}/{ct} ran out of memory; retrying lighter") continue raise else: raise last_err if last_err else RuntimeError("transcription failed") all_lines.sort(key=lambda ln: ln["start"]) all_lines = merge_fragments(all_lines) # If the uploader provided lyrics in the song description, ALIGN those exact # lines to the audio (using the whisper timing) instead of using the noisier # whisper text. The transcription pass still ran — it's what provides the # anchoring timestamps the user lines snap to. source = "faster-whisper" if args.user_lyrics and os.path.isfile(args.user_lyrics): write_progress(92, "Syncing description lyrics") try: user_lines = [l.strip() for l in open(args.user_lyrics, encoding="utf-8") .read().splitlines() if l.strip()] except Exception as e: log(f"[user-lyrics] read failed ({e})") user_lines = [] if user_lines: # Hybrid alignment: whisper-anchored where whisper heard the song, # description-filled where whisper missed. Gap-filled lines snap # to vocal-active moments detected by Silero VAD so they sit on # actual singing instead of drifting across instrumental beats. audio_duration = len(audio) / SR vocal_regions = [] if not args.no_vocal_gapfill: try: vad_opts = VadOptions(threshold=0.20, min_speech_duration_ms=400, min_silence_duration_ms=500, speech_pad_ms=120) raw = get_speech_timestamps(audio, vad_opts) vocal_regions = [(r["start"] / SR, r["end"] / SR) for r in raw] log(f"[vad] {len(vocal_regions)} vocal regions detected") except Exception as e: log(f"[vad] failed ({e}); falling back to even spread in gaps") else: log("[vad] vocal-region gap-fill disabled by admin toggle") corrected = correct_whisper_with_description( all_lines, user_lines, audio_duration, vocal_regions ) if corrected: all_lines = corrected source = "description-aligned" log(f"[user-lyrics] aligned: description={len(user_lines)} " f"output={len(all_lines)} duration={audio_duration:.1f}s") write_progress(95, "Finishing") payload = { "version": 1, "language": dominant, "source": source, "model": args.model, "demucs": used_demucs, "multilingual": True, "lines": all_lines, } out_dir = os.path.dirname(args.out) if out_dir: os.makedirs(out_dir, exist_ok=True) with open(args.out, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False) log(f"[done] wrote {len(payload['lines'])} lines ({sorted(trusted)}) -> {args.out}") if __name__ == "__main__": main()