Spaces:
Sleeping
Sleeping
| # app.py — Gradio ASR avec fallback (LM si disponible) | |
| import os, time, json | |
| import torch | |
| import numpy as np | |
| import librosa | |
| from scipy.special import log_softmax | |
| import gradio as gr | |
| from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor | |
| # Try to import pyctcdecode (may fail if kenlm not installed) | |
| try: | |
| from pyctcdecode import build_ctcdecoder | |
| PYCTC_OK = True | |
| except Exception as e: | |
| build_ctcdecoder = None | |
| PYCTC_OK = False | |
| # ---------- CONFIG ---------- | |
| MODEL_ID = "IbnAoudi/fulfulfudecameroun" # already pushed on HF | |
| KENLM_ARPA_PATH = "kenlm_5gram.arpa" # place this file in the Space root | |
| SAMPLE_RATE = 16000 | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| # ---------------------------- | |
| print("Loading model from:", MODEL_ID) | |
| processor = Wav2Vec2Processor.from_pretrained(MODEL_ID) | |
| model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID) | |
| model.to(DEVICE) | |
| model.eval() | |
| print("Model loaded. Device:", DEVICE) | |
| # --- Build safe alphabet for decoder --- | |
| tokenizer = processor.tokenizer | |
| vocab_size = getattr(tokenizer, "vocab_size", None) or len(tokenizer.get_vocab()) | |
| # convert ids -> tokens in id order (robust) | |
| tokens = tokenizer.convert_ids_to_tokens(list(range(vocab_size))) | |
| # create alphabet_for_decoder where index i corresponds to token id i | |
| alphabet_for_decoder = [] | |
| pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else None | |
| unk_id = tokenizer.unk_token_id if hasattr(tokenizer, "unk_token_id") else None | |
| for i, tok in enumerate(tokens): | |
| # Standardize special tokens: pad/unk/cls/special to empty string (pyctcdecode expects blank token as "") | |
| if pad_id is not None and i == pad_id: | |
| alphabet_for_decoder.append("") # blank (CTC) | |
| elif tok in {tokenizer.pad_token, tokenizer.eos_token, tokenizer.bos_token, tokenizer.cls_token, | |
| tokenizer.sep_token, tokenizer.unk_token, "<s>", "</s>", "<pad>", "<unk>"}: | |
| alphabet_for_decoder.append("") # remove special tokens for decoder | |
| else: | |
| # Some tokenizers return tokens like "▁a" or "Ġa" for wordpieces — keep them as-is or strip special markers if needed | |
| # For CTC char-based decoders it's good to keep the visible character | |
| alphabet_for_decoder.append(tok) | |
| print("Alphabet length:", len(alphabet_for_decoder)) | |
| decoder = None | |
| if PYCTC_OK and os.path.exists(KENLM_ARPA_PATH): | |
| try: | |
| t0 = time.time() | |
| decoder = build_ctcdecoder(alphabet_for_decoder, KENLM_ARPA_PATH) | |
| print("Decoder built (KenLM) in {:.1f}s".format(time.time() - t0)) | |
| except Exception as e: | |
| print("Warning: failed to build decoder (pyctcdecode/kenlm). LM disabled. Error:", e) | |
| decoder = None | |
| else: | |
| if not PYCTC_OK: | |
| print("pyctcdecode not available -> LM disabled (decoder=None).") | |
| else: | |
| print(f"KenLM ARPA file not found at {KENLM_ARPA_PATH} -> LM disabled.") | |
| # --- normalisation (same style as compute_metrics_fast) --- | |
| import re, unicodedata | |
| def normalize_text(s: str): | |
| if s is None: return "" | |
| s = unicodedata.normalize("NFKC", str(s)).lower().strip() | |
| s = re.sub(r"[’`ʼ‹›´]", "'", s) | |
| s = s.replace("|", " ") | |
| s = re.sub(r"[^0-9a-zɓɗŋƴɲəàáâãäçèéêëìíîïòóôõöùúûüÿ'\t \-]", " ", s) | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
| # --- decoder helper with safe confidence estimation --- | |
| def decode_with_lm_np(logits_np: np.array, beam_width=50, alpha=0.8, beta=1.0): | |
| lp = log_softmax(logits_np, axis=-1) | |
| # try decoder | |
| try: | |
| text = decoder.decode(lp, beam_width=int(beam_width), alpha=float(alpha), beta=float(beta)) | |
| # pyctcdecode does not always return a stable 'score' value; compute a fallback confidence | |
| # heuristic: average max timestep prob (from softmax) | |
| max_probs = np.max(np.exp(lp), axis=-1) | |
| conf = float(np.mean(max_probs)) | |
| conf = max(0.0, min(1.0, conf)) | |
| except Exception as e: | |
| # fallback: greedy decode | |
| pred_ids = np.argmax(logits_np, axis=-1) | |
| prev = None | |
| out = [] | |
| for p in pred_ids: | |
| if p != prev: | |
| out.append(p) | |
| prev = p | |
| out = [p for p in out if p != tokenizer.pad_token_id] | |
| text = processor.batch_decode([out])[0] | |
| max_probs = np.max(np.exp(lp), axis=-1) | |
| conf = float(np.mean(max_probs)) | |
| conf = max(0.0, min(1.0, conf)) | |
| return text, conf | |
| # --- main transcribe function --- | |
| def transcribe(audio, beam_width=50, alpha=0.8, beta=1.0, use_lm=True): | |
| if audio is None: | |
| return {"transcription": "", "transcription_norm": "", "confidence": 0.0} | |
| # gradio: numpy tuple (sr, array) or path string | |
| if isinstance(audio, tuple) and len(audio) == 2: | |
| sr, wav_np = audio # some gradio versions invert order | |
| # handle both patterns: | |
| if isinstance(sr, np.ndarray): | |
| # sometimes returns (np_array, sr) | |
| wav_np, sr = sr, wav_np | |
| elif isinstance(audio, str) and os.path.exists(audio): | |
| wav_np, sr = librosa.load(audio, sr=None) | |
| else: | |
| # assume np array + SAMPLE_RATE | |
| wav_np = np.array(audio) | |
| sr = SAMPLE_RATE | |
| # resample if needed | |
| if sr != SAMPLE_RATE: | |
| wav_np = librosa.resample(wav_np.astype(float), orig_sr=sr, target_sr=SAMPLE_RATE) | |
| sr = SAMPLE_RATE | |
| if wav_np.ndim > 1: | |
| wav_np = np.mean(wav_np, axis=1) | |
| wav_np = wav_np.astype(np.float32) | |
| inputs = processor(wav_np, sampling_rate=SAMPLE_RATE, return_tensors="pt", padding=True) | |
| input_values = inputs.input_values.to(DEVICE) | |
| with torch.no_grad(): | |
| logits = model(input_values).logits.cpu().numpy() # (B, T, V) | |
| logits_np = logits[0] | |
| if use_lm and decoder is not None: | |
| raw_pred, conf = decode_with_lm_np(logits_np, beam_width=int(beam_width), alpha=float(alpha), beta=float(beta)) | |
| else: | |
| # greedy | |
| pred_ids = np.argmax(logits_np, axis=-1) | |
| prev = None | |
| out = [] | |
| for p in pred_ids: | |
| if p != prev: | |
| out.append(p) | |
| prev = p | |
| out = [p for p in out if p != tokenizer.pad_token_id] | |
| raw_pred = processor.batch_decode([out])[0] | |
| lp = log_softmax(logits_np, axis=-1) | |
| max_probs = np.max(np.exp(lp), axis=-1) | |
| conf = float(np.mean(max_probs)) | |
| conf = max(0.0, min(1.0, conf)) | |
| pred_norm = normalize_text(raw_pred) | |
| return {"transcription": raw_pred, "transcription_norm": pred_norm, "confidence": round(float(conf), 4)} | |
| # --- Gradio UI --- | |
| title = "ASR XLS-R 300m — Live transcription (LM optional)" | |
| description = "Record or upload audio. If KenLM is available, decoding uses pyctcdecode+KenLM. Otherwise greedy fallback." | |
| with gr.Blocks() as demo: | |
| gr.Markdown(f"## {title}\n\nDevice: **{DEVICE}**\n\n{description}") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| audio_in = gr.Audio(source="microphone", type="numpy", label="Record or upload audio (wav)") | |
| use_lm_checkbox = gr.Checkbox(value=True, label="Use LM (pyctcdecode + KenLM)") | |
| beam_slider = gr.Slider(minimum=1, maximum=200, step=1, value=50, label="Beam width (LM)") | |
| alpha_slider = gr.Slider(minimum=0.0, maximum=4.0, step=0.05, value=0.8, label="LM weight (alpha)") | |
| beta_slider = gr.Slider(minimum=0.0, maximum=4.0, step=0.05, value=1.0, label="Word insertion (beta)") | |
| btn = gr.Button("Transcribe") | |
| with gr.Column(scale=3): | |
| out_txt = gr.Textbox(label="Transcription (raw)", lines=4) | |
| out_norm = gr.Textbox(label="Transcription (normalized)", lines=2) | |
| out_conf = gr.Textbox(label="Confidence") | |
| def _run(a, use_lm, beam, a_w, b_w): | |
| res = transcribe(a, beam_width=beam, alpha=a_w, beta=b_w, use_lm=use_lm) | |
| return res["transcription"], res["transcription_norm"], str(res["confidence"]) | |
| btn.click(_run, inputs=[audio_in, use_lm_checkbox, beam_slider, alpha_slider, beta_slider], | |
| outputs=[out_txt, out_norm, out_conf]) | |
| if __name__ == "__main__": | |
| # On Spaces, no need for share=True. On Colab you may want share=True. | |
| demo.launch() |