Lyrics pipeline (Whisper + Demucs + description alignment):
- New GenerateLyricsJob runs WhisperX with VAD filtering and forced word
alignment, writes per-track JSON to NAS.
- New DecorateLyricsJob calls the active LLM provider to bake one to
several emojis into each line (heavy decoration prompt).
- LyricsDescriptionParser strips heading content, section markers, and
emoji-decoration from a song's description while preserving every
language verbatim.
- correct_whisper_with_description aligner: strong-match anchors only,
vocal-region-aware gap-fill so missing verses land on actual singing.
- Owner UI for generate/regenerate/edit/delete in the player gear.
Admin pages:
- /admin/lyrics toggles for VAD, vocal gap-fill, Demucs, master
- /admin/gpu extracted GPU section, encoder picker, FFmpeg path
- /admin/backup extracted users-and-settings export/import
- /admin/settings now AI/LLM only with provider list and Test button
- /admin/nas-storage hosts NAS settings, repair, disable flow, browser
- Shared partials/settings-styles for a uniform look across pages.
Playlist view tracking:
- Migration adds playlists.view_count and playlist_views dedup table.
- Playlist::bumpViewIfNew increments per device with a one-hour window.
- Tracked from /playlists/{id}, /playlists/share/{token}, /ps/{token},
and /videos/{id}?playlist={token}. Dispatched after-response so it
never blocks the page render.
- Loading a playlist on the video page now runs one query instead of
the four the old getNextVideo/getPreviousVideo path triggered.
- View counts shown on every playlist card and the playlist hero.
Player polish:
- Floating mini-player is draggable, persists its position in
localStorage, clamps to viewport on resize.
- Mini-player disabled entirely on mobile (viewports narrower than 768px).
- New gear-menu Mini Player toggle (persists in localStorage) lets the
user disable both scroll-activation and SPA-nav-activation.
- Close button keeps media playing when used on the player's own page.
- SPA navigator now swaps a #page-scripts container so per-page JS
(channel tabs, etc.) gets re-executed after content swaps.
Storage layout:
- Runtime data moved from /storage/* to /data/* and gitignored.
- /ml/venv, /ml/cache, /ml/__pycache__ excluded.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
896 lines
37 KiB
Python
896 lines
37 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Lyrics transcription + word-level alignment pipeline.
|
||
|
||
Pipeline: Demucs (isolate vocals) -> WhisperX transcribe (large-v3) -> forced
|
||
word alignment. Emits a JSON file with line- and word-level timestamps that the
|
||
web player overlay and the ASS subtitle burner both consume.
|
||
|
||
Usage:
|
||
transcribe.py --audio /abs/song.mp3 --out /abs/lyrics.json \
|
||
[--language en] [--gpu 0] [--model large-v3] [--no-demucs]
|
||
|
||
All heavy logs go to stderr; stdout stays clean. Exit code 0 on success.
|
||
The output JSON shape is:
|
||
{
|
||
"version": 1,
|
||
"language": "en",
|
||
"source": "whisperx",
|
||
"model": "large-v3",
|
||
"demucs": true,
|
||
"lines": [
|
||
{"start": 12.30, "end": 16.80, "text": "...",
|
||
"words": [{"start": 12.30, "end": 12.55, "text": "..."}]}
|
||
]
|
||
}
|
||
"""
|
||
import argparse
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
from pathlib import Path
|
||
|
||
|
||
def log(*parts):
    """Write ``parts`` to stderr as one space-separated line and flush.

    Diagnostics go to stderr so that stdout stays clean.
    """
    print(*parts, sep=" ", end="\n", file=sys.stderr, flush=True)
|
||
|
||
|
||
# Progress file path, set from --progress. The web layer polls a status endpoint
|
||
# that reads this file to drive a live progress bar.
|
||
_PROGRESS_PATH = None
|
||
|
||
|
||
def write_progress(pct: int, stage: str) -> None:
    """Publish a progress snapshot to the file named by ``_PROGRESS_PATH``.

    Does nothing when no progress path is configured. The snapshot is
    written to ``<path>.tmp`` and then moved over the target with
    ``os.replace`` so the target is swapped in a single rename.

    Args:
        pct: Completion percentage; coerced with ``int()``.
        stage: Human-readable label for the current stage.
    """
    target = _PROGRESS_PATH
    if target:
        try:
            scratch = target + ".tmp"
            with open(scratch, "w", encoding="utf-8") as fh:
                json.dump({"status": "processing", "pct": int(pct), "stage": stage}, fh)
            os.replace(scratch, target)
        except Exception:
            # Progress is best-effort: never fail the run over it.
            pass
|
||
|
||
|
||
def isolate_vocals(audio_path: str, gpu: int | None) -> str | None:
    """Run Demucs two-stem separation and return the path to vocals.wav.

    Demucs is launched as ``python -m demucs`` and writes into a fresh
    temporary directory. Its stderr is streamed so the newest tqdm
    percentage can be mapped onto the 8-38% band of the progress file.

    Args:
        audio_path: Audio file handed to Demucs.
        gpu: CUDA device index to pin through ``CUDA_VISIBLE_DEVICES``, or
            None to run Demucs with ``-d cpu``.

    Returns:
        Path to the separated ``vocals.wav``, or None if separation fails so
        the caller can fall back to the raw mix. On success the temporary
        directory is left in place because it contains the returned file; on
        every failure path it is removed.
    """
    import re
    import shutil

    tmp_dir = tempfile.mkdtemp(prefix="demucs_")

    def give_up(reason: str) -> None:
        # Log the fallback and remove the now-useless output directory.
        log(f"[demucs] {reason}; falling back to raw mix")
        shutil.rmtree(tmp_dir, ignore_errors=True)
        return None

    cmd = [
        sys.executable, "-m", "demucs",
        "--two-stems", "vocals",
        "-n", "htdemucs",
        "-o", tmp_dir,
        audio_path,
    ]
    env = dict(os.environ)
    if gpu is not None:
        env["CUDA_VISIBLE_DEVICES"] = str(gpu)
        cmd += ["-d", "cuda"]
    else:
        cmd += ["-d", "cpu"]

    log(f"[demucs] separating vocals -> {tmp_dir}")
    proc = None
    try:
        # Stream stderr so demucs' tqdm percentage drives live progress (8→38%).
        proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL,
                                stderr=subprocess.PIPE, bufsize=0)
        tail = b""
        last = -1
        while True:
            chunk = proc.stderr.read(64)
            if not chunk:
                break
            # Only the newest 200 bytes are ever inspected, so keep just those
            # instead of accumulating the whole stderr stream in memory.
            tail = (tail + chunk)[-200:]
            # tqdm overwrites with \r; scan the tail for the newest "NN%".
            text = tail.decode("utf-8", "ignore")
            m = re.findall(r"(\d{1,3})%", text)
            if m:
                p = int(m[-1])
                if 0 <= p <= 100 and p != last:
                    last = p
                    write_progress(8 + int(p * 0.30), "Separating vocals")
        proc.wait()
    except Exception as e:
        # Don't leave a half-finished Demucs process running in the background.
        if proc is not None and proc.poll() is None:
            proc.kill()
            proc.wait()
        return give_up(f"failed ({e})")
    finally:
        if proc is not None and proc.stderr is not None:
            proc.stderr.close()

    if proc.returncode != 0:
        return give_up(f"exited {proc.returncode}")

    stem = Path(audio_path).stem
    vocals = Path(tmp_dir) / "htdemucs" / stem / "vocals.wav"
    if vocals.exists():
        log(f"[demucs] vocals at {vocals}")
        return str(vocals)
    return give_up("vocals.wav not found")
|
||
|
||
|
||
# Karaoke display lines are short — we re-split a segment's words on natural
# pauses, a word-count cap, and (for spaced scripts) clause punctuation / new
# capitalised lines.
LINE_GAP = 0.65          # seconds of silence that ends a display line
LINE_MAX_WORDS = 12      # hard cap so Latin-script lines never overflow
LINE_MAX_CHARS = 30      # char cap for spaceless scripts (Thai/CJK/…)
LINE_MIN_WORDS = 3       # no punctuation/capital-letter break before this many words
# Word endings that count as a clause boundary (tested with str.endswith).
PUNCT_END = (".", ",", "!", "?", ";", ":", "—")
# Scripts written without spaces between words — join tokens directly and split
# by character count instead of word count.
SPACELESS = {"th", "zh", "ja", "lo", "my", "km", "yue", "wuu"}
# Languages that use a non-Latin script — used to detect a mis-forced pass (a
# Thai/Arabic/… pass that produced Latin text is really a misheard English part).
NONLATIN_LANGS = {
    "th", "zh", "ja", "ko", "ar", "he", "ru", "uk", "bg", "sr", "mk", "el",
    "hi", "bn", "ta", "te", "kn", "ml", "mr", "ne", "si", "my", "km", "lo",
    "ka", "am", "fa", "ur", "ps", "yue", "wuu", "yi",
}
|
||
|
||
|
||
def _emit(words: list, lang: str) -> dict | None:
    """Bundle a run of timed words into one display-line dict.

    The line spans from the first word's start to the last word's end. Word
    texts are joined with a space, or with nothing for languages listed in
    ``SPACELESS``. Returns None when ``words`` is empty.
    """
    if words:
        joiner = " " if lang not in SPACELESS else ""
        line_text = joiner.join(item["text"] for item in words)
        return {
            "start": words[0]["start"],
            "end": words[-1]["end"],
            "text": line_text,
            "lang": lang,
            "words": words,
        }
    return None
|
||
|
||
|
||
def _norm_for_match(s: str) -> str:
|
||
"""Normalize text for similarity comparison (lowercase, keep letters/numbers
|
||
including non-ASCII scripts; drop everything else)."""
|
||
out = []
|
||
for c in s or "":
|
||
if c.isalnum():
|
||
out.append(c.lower())
|
||
return "".join(out)
|
||
|
||
|
||
def _guess_lang_from_script(text: str) -> str:
|
||
"""Best-effort language guess from a line's Unicode script (used when we have
|
||
no whisper anchor to inherit the language from)."""
|
||
for c in text or "":
|
||
co = ord(c)
|
||
if 0x3040 <= co <= 0x30FF or 0x4E00 <= co <= 0x9FFF:
|
||
return "ja"
|
||
if 0x0E00 <= co <= 0x0E7F:
|
||
return "th"
|
||
if 0xAC00 <= co <= 0xD7AF:
|
||
return "ko"
|
||
if 0x0600 <= co <= 0x06FF:
|
||
return "ar"
|
||
if 0x0400 <= co <= 0x04FF:
|
||
return "ru"
|
||
return "en"
|
||
|
||
|
||
def _redistribute_words(start: float, end: float, text: str, lang: str) -> list:
    """Split ``[start, end]`` into equal slots, one per token of ``text``.

    Tokens are individual characters when ``lang`` is in ``SPACELESS`` and
    whitespace-separated words otherwise; whitespace-only tokens are
    discarded. Returns a list of ``{"start", "end", "text"}`` dicts with
    times rounded to milliseconds, or an empty list when the text is empty,
    the interval is not positive, or no tokens remain.
    """
    if not text or end <= start:
        return []
    pieces = list(text) if lang in SPACELESS else text.split()
    pieces = [piece for piece in pieces if piece.strip()]
    count = len(pieces)
    if count == 0:
        return []
    slot = (end - start) / count
    timed = []
    for i, piece in enumerate(pieces):
        timed.append({
            "start": round(start + i * slot, 3),
            "end": round(start + (i + 1) * slot, 3),
            "text": piece,
        })
    return timed
|
||
|
||
|
||
def _distribute_in_vocal_regions(lines: list, regions: list,
                                 gap_start: float, gap_end: float) -> list:
    """Time each text line inside ``[gap_start, gap_end]`` where vocals are active.

    ``regions`` is a list of ``(start, end)`` second pairs. Each region is
    clipped to the gap and kept only if at least 0.3 s of it remains.

    * No usable region: the lines are spread evenly over the gap (or nothing
      is returned if the gap is empty or inverted).
    * No more lines than regions: a roughly evenly spaced subset of regions
      is chosen and each line starts at its region's start.
    * More lines than regions: lines are shared out in proportion to region
      duration and each region is divided evenly among its lines.

    Returns a list of line dicts (``start``, ``end``, ``text``, ``lang``,
    ``words``) in the same order as ``lines``.
    """
    def make_line(t0, t1, text):
        # Language is guessed from the text; word timings are spread evenly.
        lang = _guess_lang_from_script(text)
        return {"start": round(t0, 3), "end": round(t1, 3),
                "text": text, "lang": lang,
                "words": _redistribute_words(t0, t1, text, lang)}

    clipped = []
    for reg_start, reg_end in regions:
        lo = max(reg_start, gap_start)
        hi = min(reg_end, gap_end)
        if hi - lo >= 0.3:
            clipped.append((lo, hi))

    n_lines = len(lines)
    if n_lines == 0:
        return []

    gap_is_empty = gap_end <= gap_start
    if gap_is_empty:
        return []
    if not clipped:
        # No vocals in the gap — last-resort even spread so coverage isn't lost.
        slot = (gap_end - gap_start) / n_lines
        return [make_line(gap_start + k * slot, gap_start + (k + 1) * slot, ul)
                for k, ul in enumerate(lines)]

    n_regions = len(clipped)
    placed = []

    if n_lines <= n_regions:
        # Pick one region per line, roughly evenly spaced across the gap.
        if n_lines > 1:
            picks = [int(round(i * (n_regions - 1) / max(1, n_lines - 1)))
                     for i in range(n_lines)]
        else:
            picks = [0]
        # Force the picks to be strictly increasing (capped at the last region).
        for i in range(1, len(picks)):
            if picks[i] <= picks[i - 1]:
                picks[i] = min(n_regions - 1, picks[i - 1] + 1)
        for i, ul in enumerate(lines):
            reg_start, reg_end = clipped[picks[i]]
            if i + 1 < n_lines:
                # End just before the next chosen region begins.
                line_end = min(reg_end, clipped[picks[i + 1]][0] - 0.05)
            else:
                line_end = reg_end
            line_end = max(reg_start + 0.4, line_end)  # minimum 0.4 s on screen
            placed.append(make_line(reg_start, line_end, ul))
        return placed

    # More lines than regions: hand out lines in proportion to region length;
    # the final region absorbs whatever is left.
    total = sum(hi - lo for lo, hi in clipped)
    cursor = 0
    consumed = 0.0
    for ri, (reg_start, reg_end) in enumerate(clipped):
        if ri == n_regions - 1:
            quota = n_lines - cursor
        else:
            consumed += reg_end - reg_start
            target = int(round(consumed / total * n_lines))
            quota = max(0, target - cursor)
        if quota <= 0:
            continue
        slot = (reg_end - reg_start) / quota
        batch = lines[cursor:cursor + quota]
        for k, ul in enumerate(batch):
            placed.append(make_line(reg_start + k * slot,
                                    reg_start + (k + 1) * slot, ul))
        cursor += len(batch)
    return placed
|
||
|
||
|
||
def correct_whisper_with_description(whisper_lines: list, user_lines: list,
                                     audio_duration: float = 0.0,
                                     vocal_regions: list | None = None) -> list:
    """Description-first alignment, with whisper used only as structural anchors.

    1. Find HIGH-confidence whisper-to-description matches (sim ≥ STRONG).
       Each whisper line is compared against at most SKIP_AHEAD + 1
       description lines starting after the previous anchor. Weak/spurious
       matches are ignored — they cause downstream skips and misplacements
       (e.g. line #5 anchored at 30s because of a loose match, making
       line #4 disappear).
    2. The strong anchors partition the description into segments. Each
       segment of description lines is distributed across the vocal regions
       in its time window, in order. Anchor lines use whisper's timing.
    3. Every output line carries description text (never whisper text).

    Limitation: lines before the first anchor, or between two anchors, are
    omitted when their time window is 0.6 s or shorter — there is no room to
    place them. Lines after the last anchor are always placed.

    Args:
        whisper_lines: Line dicts with ``start``, ``end``, ``text`` and
            optionally ``lang``.
        user_lines: Description lyric lines; blank lines are ignored.
        audio_duration: Song length in seconds (treated as at least 10).
        vocal_regions: ``(start, end)`` second pairs of vocal activity, or
            None for no region information.

    Returns:
        Aligned line dicts. If there are no usable description lines the
        whisper lines are returned unchanged (or ``[]``). If no strong
        anchor exists, all description lines are distributed over the vocal
        regions of the whole song.
    """
    from difflib import SequenceMatcher

    if not user_lines:
        return whisper_lines or []

    U = [u for u in user_lines if u.strip()]
    if not U:
        return whisper_lines or []

    vocal_regions = vocal_regions or []
    audio_end = max(audio_duration, 10.0)
    if vocal_regions:
        audio_end = max(audio_end, vocal_regions[-1][1])

    # ── Find strong anchors ────────────────────────────────────────────────
    # Only matches at STRONG similarity (0.55+) count as anchors. Anything
    # less confident than that has historically misled the alignment.
    user_script = [_guess_lang_from_script(u) for u in U]
    user_norm = [_norm_for_match(u) for u in U]

    LATIN = {"en", "es", "pt", "it", "fr", "de", "nl", "ca", "ro", "tr", "vi", "id", "ms"}

    def same_script(a: str, b: str) -> bool:
        if a in LATIN and b in LATIN:
            return True
        return a == b

    STRONG = 0.55
    SKIP_AHEAD = 10

    anchors = []  # list of (user_idx, whisper_start, whisper_end)
    next_u = 0
    for w in (whisper_lines or []):
        w_text = (w.get("text") or "").strip()
        if not w_text:
            continue
        # Script guessed from the whisper text itself. The description side is
        # always a guessed script code ("ja" for any Han text, "ru" for any
        # Cyrillic, "en" for scripts the guesser doesn't know), so comparing it
        # only against whisper's language code ("zh", "uk", "el", …) would
        # reject genuine same-script pairs. Accept either signal.
        w_script = _guess_lang_from_script(w_text)
        w_lang = w.get("lang") or w_script
        w_norm = _norm_for_match(w_text)
        if not w_norm:
            continue
        # SequenceMatcher caches its analysis of the second sequence, so build
        # it once per whisper line and only swap the first sequence.
        matcher = SequenceMatcher(None, "", w_norm)
        best_u, best_sim = -1, 0.0
        window_end = min(next_u + SKIP_AHEAD + 1, len(U))
        for ui in range(next_u, window_end):
            if not user_norm[ui]:
                continue
            if not (same_script(user_script[ui], w_lang)
                    or user_script[ui] == w_script):
                continue
            matcher.set_seq1(user_norm[ui])
            sim = matcher.ratio()
            if sim > best_sim:
                best_sim, best_u = sim, ui
        if best_u >= 0 and best_sim >= STRONG:
            anchors.append((best_u, float(w["start"]), float(w["end"])))
            next_u = best_u + 1

    # ── Build output ───────────────────────────────────────────────────────
    out = []

    if not anchors:
        # No reliable whisper structure — distribute all description lines
        # across the vocal regions in order.
        return _distribute_in_vocal_regions(U, vocal_regions, 0.5, audio_end - 0.3)

    # Segment 0: description lines BEFORE the first anchor go in the time
    # window [0, anchor[0].start], aligned to vocal regions there.
    first_u, first_start, first_end = anchors[0]
    if first_u > 0 and first_start > 0.6:
        out.extend(_distribute_in_vocal_regions(
            U[0:first_u], vocal_regions, 0.0, first_start
        ))

    # The anchor line itself uses whisper timing.
    out.append(_build_line(U[first_u], first_start, first_end))

    # Middle segments: between each pair of anchors, distribute the lines
    # between them across vocal regions in that window.
    for i in range(1, len(anchors)):
        prev_u, _, prev_end_t = anchors[i - 1]
        cur_u, cur_start_t, cur_end_t = anchors[i]
        gap_start = prev_end_t
        gap_end = cur_start_t
        between_lines = U[prev_u + 1:cur_u]
        if between_lines and gap_end - gap_start > 0.6:
            out.extend(_distribute_in_vocal_regions(
                between_lines, vocal_regions, gap_start, gap_end
            ))
        out.append(_build_line(U[cur_u], cur_start_t, cur_end_t))

    # Trailing segment: description lines after the last anchor distributed
    # across the audio's remaining vocal regions. The window is always at
    # least 2 s long, so these lines are never dropped for lack of room.
    last_u, _, last_end_t = anchors[-1]
    trailing = U[last_u + 1:]
    if trailing:
        end_time = max(audio_end - 0.3, last_end_t + 2.0)
        out.extend(_distribute_in_vocal_regions(
            trailing, vocal_regions, last_end_t, end_time
        ))

    return out
|
||
|
||
|
||
def _build_line(text: str, start: float, end: float) -> dict:
    """Build one output line dict whose word timings are spread evenly.

    The language is guessed from the text's script. Times are rounded to
    milliseconds and the line is stretched to last at least 0.4 s.
    """
    lang = _guess_lang_from_script(text)
    begin = round(float(start), 3)
    finish = round(max(float(end), begin + 0.4), 3)
    words = _redistribute_words(begin, finish, text, lang)
    return {"start": begin, "end": finish, "text": text, "lang": lang,
            "words": words}
|
||
|
||
|
||
def _spread_lines_evenly(lines: list, start: float, end: float) -> list:
    """Give each of ``lines`` an equal share of ``[start, end]``, in order.

    Last-resort fallback for when whisper produced no usable anchors at all.
    Returns an empty list if there are no lines or the interval is empty.
    """
    if not lines or end <= start:
        return []

    def timed(text, t0, t1):
        lang = _guess_lang_from_script(text)
        return {
            "start": round(t0, 3), "end": round(t1, 3),
            "text": text, "lang": lang,
            "words": _redistribute_words(t0, t1, text, lang),
        }

    slot = (end - start) / len(lines)
    return [timed(ul, start + k * slot, start + (k + 1) * slot)
            for k, ul in enumerate(lines)]
|
||
|
||
|
||
def align_user_lyrics(user_lines: list, whisper_lines: list) -> list:
    """Legacy: project user lines onto whisper anchors with N-W DP. Kept for
    reference; the active pipeline uses correct_whisper_with_description()
    because it preserves whisper's natural timing instead of squeezing all
    description lines into whatever anchors were found.

    A Needleman-Wunsch style table aligns the non-blank user lines against
    the whisper lines using text similarity. User lines matched to a whisper
    line take that line's start/end; unmatched user lines are interpolated
    between the surrounding matches.

    Args:
        user_lines: Lyric text lines; blank lines are ignored.
        whisper_lines: Line dicts with ``start``, ``end``, ``text`` and
            optionally ``lang``.

    Returns:
        ``whisper_lines`` unchanged when ``user_lines`` is empty; ``[]`` when
        there are no whisper lines or no non-blank user lines; otherwise one
        line dict per non-blank user line, in order.
    """
    from difflib import SequenceMatcher
    if not user_lines:
        return whisper_lines
    if not whisper_lines:
        return []

    U = [u for u in user_lines if u.strip()]
    W = whisper_lines
    nU, nW = len(U), len(W)
    if nU == 0:
        return []

    user_norm = [_norm_for_match(u) for u in U]
    whisper_norm = [_norm_for_match(w.get("text", "")) for w in W]

    # Script of each user line and each whisper line. For multilingual songs
    # an English user line MUST anchor to an English whisper segment and a Thai
    # user line MUST anchor to a Thai whisper segment — otherwise the DP forces
    # a Thai user line onto an English anchor (or vice-versa) and the whole
    # block of mismatched-language user lines collapses into the wrong region.
    user_script = [_guess_lang_from_script(u) for u in U]
    whisper_script = [(w.get("lang") or _guess_lang_from_script(w.get("text", ""))) for w in W]

    def _same_script(a: str, b: str) -> bool:
        # Coarse equivalence — only the Latin-script languages listed below are
        # collapsed together, so e.g. an English user line still matches a
        # Spanish whisper anchor if that's all we have. Every other code must
        # match exactly.
        # NOTE(review): user codes come from _guess_lang_from_script, whisper
        # codes from the segment's "lang"; codes for the same script can differ
        # (e.g. guessed "ja" vs whisper "zh") and will not match here.
        LATIN = {"en", "es", "pt", "it", "fr", "de", "nl", "ca", "ro", "tr", "vi", "id", "ms"}
        if a in LATIN and b in LATIN: return True
        return a == b

    # Similarity matrix: one SequenceMatcher is reused per user line, with the
    # whisper text swapped in as the second sequence. Cross-script pairs are
    # left at zero so the DP can never anchor across languages.
    sim = [[0.0] * nW for _ in range(nU)]
    for i in range(nU):
        if not user_norm[i]:
            continue
        sm = SequenceMatcher(None, user_norm[i], "")
        sm.set_seq1(user_norm[i])
        for j in range(nW):
            if not whisper_norm[j]:
                continue
            if not _same_script(user_script[i], whisper_script[j]):
                continue  # different script → can't be the same line
            sm.set_seq2(whisper_norm[j])
            sim[i][j] = sm.ratio()

    # Higher threshold prevents the DP from anchoring a user line to a weakly-
    # similar whisper segment in the wrong region of the song. Weak matches get
    # interpolated between confident anchors instead, which spreads lyric lines
    # over the right time window.
    MATCH_THRESHOLD = 0.35
    GAP_USER = -0.10     # cost of leaving a user line unmatched
    GAP_WHISPER = -0.04  # cost of skipping a whisper line
    SOFT_DIAG = -0.04    # diagonal move with too-low similarity (no match credit)

    # DP table: dp[i][j] = best score aligning U[:i] vs W[:j].
    dp = [[0.0] * (nW + 1) for _ in range(nU + 1)]
    for i in range(1, nU + 1):
        dp[i][0] = dp[i - 1][0] + GAP_USER
    for j in range(1, nW + 1):
        dp[0][j] = dp[0][j - 1] + GAP_WHISPER

    for i in range(1, nU + 1):
        for j in range(1, nW + 1):
            s = sim[i - 1][j - 1]
            match_score = dp[i - 1][j - 1] + (s if s >= MATCH_THRESHOLD else SOFT_DIAG)
            user_gap = dp[i - 1][j] + GAP_USER
            whisper_gap = dp[i][j - 1] + GAP_WHISPER
            dp[i][j] = max(match_score, user_gap, whisper_gap)

    # Traceback to recover the matched pairs (user_idx → whisper_idx). Each
    # step recomputes the three candidate scores and follows the first one
    # that reproduces dp[i][j] (diagonal preferred, then user gap).
    matches = {}
    i, j = nU, nW
    while i > 0 and j > 0:
        s = sim[i - 1][j - 1]
        eff = (s if s >= MATCH_THRESHOLD else SOFT_DIAG)
        if abs(dp[i][j] - (dp[i - 1][j - 1] + eff)) < 1e-9:
            # A diagonal step only counts as a match above the threshold.
            if s >= MATCH_THRESHOLD:
                matches[i - 1] = j - 1
            i -= 1; j -= 1
        elif abs(dp[i][j] - (dp[i - 1][j] + GAP_USER)) < 1e-9:
            i -= 1
        else:
            j -= 1

    # Build aligned output: matched lines get the whisper timing; unmatched user
    # lines get evenly interpolated between their nearest matched neighbours.
    out = []
    pending = []      # (text, lang) of unmatched user lines awaiting a slot
    last_end = 0.0    # end time of the most recent matched line

    def flush(next_start):
        # Place all pending lines between last_end (read from the enclosing
        # scope at call time) and next_start, then empty the queue. With no
        # room at all each line gets a fixed 0.6 s slot.
        if not pending:
            return
        n = len(pending)
        span = max(0.0, next_start - last_end)
        slot = (span / (n + 1)) if span > 0 else 0.6
        for k, (pt, pl) in enumerate(pending):
            s = last_end + (k + 0.5) * slot
            e = last_end + (k + 1.5) * slot
            out.append({"start": round(s, 3), "end": round(e, 3),
                        "text": pt, "lang": pl,
                        "words": _redistribute_words(s, e, pt, pl)})
        pending.clear()

    for ui, u in enumerate(U):
        if ui in matches:
            wl = W[matches[ui]]
            start = float(wl["start"])
            end = float(wl["end"])
            lang = wl.get("lang") or _guess_lang_from_script(u)
            flush(start)
            out.append({"start": round(start, 3), "end": round(end, 3),
                        "text": u, "lang": lang,
                        "words": _redistribute_words(start, end, u, lang)})
            last_end = end
        else:
            pending.append((u, _guess_lang_from_script(u)))

    if pending:
        # Trailing unmatched lines: spread them up to the last whisper line's
        # end, or at least one second past the last match.
        anchor_end = max(last_end + 1.0, float(W[-1]["end"]))
        flush(anchor_end)

    return out
|
||
|
||
|
||
def merge_fragments(lines: list) -> list:
    """Stitch tiny leftover fragments (e.g. a lone 'The' or a 1-char Thai token)
    into an adjacent same-language line when they're close in time.

    A line is "tiny" when it has fewer than 4 characters (spaceless scripts)
    or fewer than 2 words (spaced scripts). A line is merged into the
    previous output line when both share a language, the gap between them is
    under one second, and at least one of the two is tiny.

    Args:
        lines: Line dicts with ``start``, ``end``, ``text``, ``lang`` and
            optionally ``words``, in time order.

    Returns:
        A new list of line dicts. The input dicts are not modified: each
        kept line is a shallow copy, so merging never rewrites the caller's
        data.
    """
    def tiny(ln):
        if ln["lang"] in SPACELESS:
            return len(ln["text"]) < 4
        return len(ln["text"].split()) < 2

    out = []
    for ln in lines:
        if out and out[-1]["lang"] == ln["lang"]:
            prev = out[-1]
            gap = ln["start"] - prev["end"]
            if gap < 1.0 and (tiny(ln) or tiny(prev)):
                sep = "" if ln["lang"] in SPACELESS else " "
                prev["text"] = (prev["text"] + sep + ln["text"]).strip()
                prev["end"] = ln["end"]
                prev["words"] = (prev.get("words") or []) + (ln.get("words") or [])
                continue
        # Copy so later merges into this entry don't mutate the caller's dict.
        out.append(dict(ln))
    return out
|
||
|
||
|
||
def split_into_lines(words: list, lang: str) -> list:
    """Split one (single-language) segment's timed words into short karaoke lines.

    A new line starts before a word when any of these holds:

    * the silence since the previous word is at least ``LINE_GAP``;
    * spaced script: the current line already has ``LINE_MAX_WORDS`` words;
    * spaceless script: the current line already has ``LINE_MAX_CHARS``
      characters;
    * spaced script with at least ``LINE_MIN_WORDS`` words so far: the
      previous word ends with clause punctuation, or the next word starts
      with a capital letter and is not a form of "I".

    Returns the list of line dicts produced by ``_emit``.
    """
    if not words:
        return []

    spaced = lang not in SPACELESS
    first_person = ("I", "I'm", "I'll", "I've", "I'd", "I’m", "I’ll", "I’ve", "I’d")

    def starts_new_line(before, after, current):
        # A long enough pause always breaks, whatever the script.
        if (after["start"] - before["end"]) >= LINE_GAP:
            return True
        if not spaced:
            return sum(len(x["text"]) for x in current) >= LINE_MAX_CHARS
        if len(current) >= LINE_MAX_WORDS:
            return True
        if len(current) < LINE_MIN_WORDS:
            return False
        if before["text"].endswith(PUNCT_END):
            return True
        head = after["text"][:1]
        return (head.isupper() and not head.isdigit()
                and after["text"] not in first_person)

    result = []
    current = [words[0]]
    for before, after in zip(words, words[1:]):
        if starts_new_line(before, after, current):
            finished = _emit(current, lang)
            if finished:
                result.append(finished)
            current = [after]
        else:
            current.append(after)

    finished = _emit(current, lang)
    if finished:
        result.append(finished)
    return result
|
||
|
||
|
||
def main():
|
||
ap = argparse.ArgumentParser()
|
||
ap.add_argument("--audio", required=True)
|
||
ap.add_argument("--out", required=True)
|
||
ap.add_argument("--language", default=None)
|
||
ap.add_argument("--gpu", type=int, default=None)
|
||
ap.add_argument("--model", default="large-v3")
|
||
ap.add_argument("--no-demucs", action="store_true")
|
||
ap.add_argument("--no-vad", action="store_true",
|
||
help="disable Silero VAD filter inside Whisper (transcribe full audio)")
|
||
ap.add_argument("--no-vocal-gapfill", action="store_true",
|
||
help="distribute gap-filled description lines evenly instead of snapping them "
|
||
"to vocal-active regions detected by Silero VAD")
|
||
ap.add_argument("--progress", default=None, help="path to write live progress JSON")
|
||
ap.add_argument("--user-lyrics", default=None,
|
||
help="path to a text file with one lyric line per line; the pipeline will "
|
||
"ALIGN these exact lines to the audio instead of producing its own text")
|
||
args = ap.parse_args()
|
||
|
||
global _PROGRESS_PATH
|
||
_PROGRESS_PATH = args.progress
|
||
|
||
if not os.path.isfile(args.audio):
|
||
log(f"audio not found: {args.audio}")
|
||
sys.exit(2)
|
||
|
||
write_progress(3, "Starting")
|
||
|
||
# GPU pinning must happen before torch is imported by whisperx.
|
||
if args.gpu is not None:
|
||
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu)
|
||
|
||
vocals_path = args.audio
|
||
used_demucs = False
|
||
if not args.no_demucs:
|
||
write_progress(8, "Separating vocals")
|
||
sep = isolate_vocals(args.audio, args.gpu)
|
||
if sep:
|
||
vocals_path = sep
|
||
used_demucs = True
|
||
|
||
write_progress(40, "Loading model")
|
||
from faster_whisper import WhisperModel, decode_audio
|
||
from faster_whisper.vad import get_speech_timestamps, VadOptions
|
||
from collections import defaultdict
|
||
import gc
|
||
|
||
SR = 16000
|
||
audio = decode_audio(vocals_path, sampling_rate=SR)
|
||
|
||
def is_oom(e):
|
||
s = str(e).lower()
|
||
return "out of memory" in s or "cuda failed" in s or "cublas" in s
|
||
|
||
def overlap_ratio(a, b):
|
||
o = min(a["end"], b["end"]) - max(a["start"], b["start"])
|
||
if o <= 0:
|
||
return 0.0
|
||
return o / max(1e-6, min(a["end"] - a["start"], b["end"] - b["start"]))
|
||
|
||
# Full multilingual transcription on a given device/precision. Raises on OOM
|
||
# so the caller can retry on a lighter config (cuda/fp16 → cuda/int8 → cpu).
|
||
#
|
||
# Strategy that handles bilingual duets WITHOUT skipping verses: transcribe the
|
||
# WHOLE song once per candidate language (full recall + sentence context), then
|
||
# for every time region keep whichever language's transcription is the most
|
||
# confident. English regions win in the English pass, Thai regions win in the
|
||
# Thai pass — nothing is dropped and each part is in its own script.
|
||
def transcribe_all(dev, ct):
|
||
log(f"[fw] loading {args.model} on {dev}/{ct}")
|
||
model = WhisperModel(args.model, device=dev, compute_type=ct)
|
||
try:
|
||
# ── Candidate languages: detect across several windows of the song ──
|
||
write_progress(46, "Detecting languages")
|
||
if args.language:
|
||
cands = [args.language]
|
||
else:
|
||
votes = defaultdict(float)
|
||
win = 30 * SR
|
||
positions = list(range(0, max(1, len(audio) - win + 1), max(win // 2, 1)))[:12] or [0]
|
||
for pos in positions:
|
||
sl = audio[pos:pos + win]
|
||
if len(sl) < SR:
|
||
continue
|
||
try:
|
||
lang, prob, _ = model.detect_language(sl, language_detection_segments=1)
|
||
except Exception as e:
|
||
if is_oom(e):
|
||
raise
|
||
lang, prob = None, 0.0
|
||
if lang and prob >= 0.5:
|
||
votes[lang] += prob
|
||
if not votes:
|
||
cands = ["en"]
|
||
else:
|
||
ranked = sorted(votes, key=votes.get, reverse=True)
|
||
top = votes[ranked[0]]
|
||
# Keep languages with ≥25% of the top vote mass (drops flukes).
|
||
cands = [l for l in ranked if votes[l] >= 0.25 * top][:3]
|
||
log(f"[lang] candidates={cands}")
|
||
|
||
# ── One full-song pass per candidate language ──────────────────────
|
||
# Loose VAD pass: drops obvious instrumental stretches but keeps soft
|
||
# sung vocals (threshold 0.20 vs default 0.5). Without it, Whisper
|
||
# invents lyrics over the intro/outro music. With it tuned too high
|
||
# it drops legitimate quiet singing — we erred on the loose side after
|
||
# users reported missing verses in the middle of long songs.
|
||
VAD_PARAMS = {
|
||
"threshold": 0.20,
|
||
"min_speech_duration_ms": 200,
|
||
"min_silence_duration_ms": 350,
|
||
"speech_pad_ms": 250,
|
||
}
|
||
# Common Whisper hallucinations on silence / music. If a segment IS
|
||
# one of these phrases (no extra content), it's a hallucination
|
||
# regardless of how confident the model was.
|
||
HALLUCINATIONS = {
|
||
"thank you", "thanks for watching", "thank you for watching",
|
||
"subscribe", "please subscribe", "like and subscribe",
|
||
"music", "[music]", "(music)", "♪", "♫",
|
||
"you", ".", "..", "...", "thank you.",
|
||
}
|
||
segs_all = []
|
||
for ci, L in enumerate(cands):
|
||
write_progress(50 + int(40 * ci / max(1, len(cands))), "Transcribing")
|
||
seg_iter, _ = model.transcribe(
|
||
audio, language=L, word_timestamps=True, beam_size=5,
|
||
vad_filter=(not args.no_vad), vad_parameters=VAD_PARAMS,
|
||
condition_on_previous_text=False,
|
||
no_speech_threshold=0.70,
|
||
log_prob_threshold=-1.4,
|
||
)
|
||
for s in seg_iter:
|
||
# Drop clear non-speech and low-confidence hallucinations on
|
||
# instrumental sections, but keep genuinely-sung (lower-conf) lines.
|
||
if getattr(s, "no_speech_prob", 0.0) > 0.70:
|
||
continue
|
||
if getattr(s, "avg_logprob", 0.0) < -1.4:
|
||
continue
|
||
text = (s.text or "").strip()
|
||
if not text:
|
||
continue
|
||
# Drop the well-known Whisper boilerplate hallucinations.
|
||
if text.lower().strip(".,!? ") in HALLUCINATIONS:
|
||
continue
|
||
# Drop "compression ratio" gibberish — pathological repeats.
|
||
if getattr(s, "compression_ratio", 1.0) > 2.4:
|
||
continue
|
||
segs_all.append({
|
||
"start": float(s.start), "end": float(s.end), "lang": L,
|
||
"score": float(getattr(s, "avg_logprob", -5.0)),
|
||
"text": text, "words": list(s.words or []),
|
||
})
|
||
|
||
# ── Resolve overlaps using OUTPUT SCRIPT as the language signal ─────
|
||
# avg_logprob alone is unreliable (the Thai pass can "win" English
|
||
# regions yet output Latin). The script actually produced is the
|
||
# truth: a non-Latin-language pass that emitted Latin text is a
|
||
# mis-forced English region — drop it. Native non-Latin script wins
|
||
# overlaps so Thai regions never get the romanised English version.
|
||
def nonlatin_frac(t):
    """Return the fraction of alphabetic characters in *t* outside ASCII a-z.

    Only characters for which ``str.isalpha()`` is true are counted; every
    other character (digits, punctuation, whitespace, ...) is ignored. Each
    letter is lower-cased before the range check, so case does not matter.
    Anything outside ``a``-``z`` counts as non-Latin, which includes accented
    Latin letters such as "é".

    Returns 0.0 when *t* contains no alphabetic characters at all.
    """
    letters = [c for c in t if c.isalpha()]
    if not letters:
        # Avoid dividing by zero on empty / symbol-only text.
        return 0.0
    nonlatin = sum(1 for c in letters if not ("a" <= c.lower() <= "z"))
    return nonlatin / len(letters)
|
||
kept = []
|
||
for s in segs_all:
|
||
nl = nonlatin_frac(s["text"])
|
||
s["native"] = 1 if nl >= 0.5 else 0
|
||
if s["lang"] in NONLATIN_LANGS and nl < 0.3:
|
||
continue # Thai (etc.) pass that produced Latin = mis-forced English
|
||
kept.append(s)
|
||
|
||
kept.sort(key=lambda x: (x["native"], x["score"]), reverse=True)
|
||
accepted = []
|
||
for s in kept:
|
||
if any(overlap_ratio(s, a) > 0.4 for a in accepted):
|
||
continue
|
||
accepted.append(s)
|
||
accepted.sort(key=lambda x: x["start"])
|
||
|
||
dur = defaultdict(float)
|
||
for s in accepted:
|
||
dur[s["lang"]] += s["end"] - s["start"]
|
||
dominant = max(dur, key=dur.get) if dur else (cands[0] if cands else "en")
|
||
trusted = set(dur.keys()) or set(cands)
|
||
|
||
# ── Build karaoke lines ────────────────────────────────────────────
|
||
lines = []
|
||
for s in accepted:
|
||
compact = s["text"].replace(" ", "")
|
||
if len(compact) >= 8 and len(set(compact)) <= 1: # degenerate "ㄷㄷㄷ"
|
||
continue
|
||
words = []
|
||
for w in s["words"]:
|
||
if w.start is None or w.end is None:
|
||
continue
|
||
tok = (w.word or "").strip()
|
||
if not tok:
|
||
continue
|
||
words.append({"start": round(float(w.start), 3),
|
||
"end": round(float(w.end), 3), "text": tok})
|
||
if words:
|
||
lines += split_into_lines(words, s["lang"])
|
||
else:
|
||
lines.append({"start": round(s["start"], 3), "end": round(s["end"], 3),
|
||
"text": s["text"], "lang": s["lang"], "words": []})
|
||
return lines, dominant, trusted
|
||
finally:
|
||
del model
|
||
gc.collect()
|
||
|
||
all_lines, dominant, trusted, last_err = [], "en", set(), None
|
||
for dev, ct in [("cuda", "float16"), ("cuda", "int8"), ("cpu", "int8")]:
|
||
try:
|
||
all_lines, dominant, trusted = transcribe_all(dev, ct)
|
||
break
|
||
except Exception as e:
|
||
last_err = e
|
||
if is_oom(e):
|
||
log(f"[fw] {dev}/{ct} ran out of memory; retrying lighter")
|
||
continue
|
||
raise
|
||
else:
|
||
raise last_err if last_err else RuntimeError("transcription failed")
|
||
|
||
all_lines.sort(key=lambda ln: ln["start"])
|
||
all_lines = merge_fragments(all_lines)
|
||
|
||
# If the uploader provided lyrics in the song description, ALIGN those exact
# lines to the audio (using the whisper timing) instead of using the noisier
# whisper text. The transcription pass still ran — it's what provides the
# anchoring timestamps the user lines snap to.
source = "faster-whisper"
|
||
if args.user_lyrics and os.path.isfile(args.user_lyrics):
|
||
write_progress(92, "Syncing description lyrics")
|
||
try:
|
||
user_lines = [l.strip() for l in open(args.user_lyrics, encoding="utf-8")
|
||
.read().splitlines() if l.strip()]
|
||
except Exception as e:
|
||
log(f"[user-lyrics] read failed ({e})")
|
||
user_lines = []
|
||
if user_lines:
|
||
# Hybrid alignment: whisper-anchored where whisper heard the song,
# description-filled where whisper missed it. Gap-filled lines snap
# to vocal-active moments detected by Silero VAD so they sit on
# actual singing instead of drifting across instrumental beats.
audio_duration = len(audio) / SR
|
||
vocal_regions = []
|
||
if not args.no_vocal_gapfill:
|
||
try:
|
||
vad_opts = VadOptions(threshold=0.20,
|
||
min_speech_duration_ms=400,
|
||
min_silence_duration_ms=500,
|
||
speech_pad_ms=120)
|
||
raw = get_speech_timestamps(audio, vad_opts)
|
||
vocal_regions = [(r["start"] / SR, r["end"] / SR) for r in raw]
|
||
log(f"[vad] {len(vocal_regions)} vocal regions detected")
|
||
except Exception as e:
|
||
log(f"[vad] failed ({e}); falling back to even spread in gaps")
|
||
else:
|
||
log("[vad] vocal-region gap-fill disabled by admin toggle")
|
||
corrected = correct_whisper_with_description(
|
||
all_lines, user_lines, audio_duration, vocal_regions
|
||
)
|
||
if corrected:
|
||
all_lines = corrected
|
||
source = "description-aligned"
|
||
log(f"[user-lyrics] aligned: description={len(user_lines)} "
|
||
f"output={len(all_lines)} duration={audio_duration:.1f}s")
|
||
|
||
write_progress(95, "Finishing")
|
||
|
||
payload = {
|
||
"version": 1,
|
||
"language": dominant,
|
||
"source": source,
|
||
"model": args.model,
|
||
"demucs": used_demucs,
|
||
"multilingual": True,
|
||
"lines": all_lines,
|
||
}
|
||
|
||
out_dir = os.path.dirname(args.out)
|
||
if out_dir:
|
||
os.makedirs(out_dir, exist_ok=True)
|
||
with open(args.out, "w", encoding="utf-8") as f:
|
||
json.dump(payload, f, ensure_ascii=False)
|
||
log(f"[done] wrote {len(payload['lines'])} lines ({sorted(trusted)}) -> {args.out}")
|
||
|
||
|
||
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()