# Spaces: Running on Zero
| import argparse | |
| import functools | |
| import importlib.util | |
| import os | |
| from pathlib import Path | |
| import re | |
| import time | |
| import orjson | |
try:
    import spaces
except ImportError:
    # `spaces` is not importable in this environment; substitute an object
    # whose `GPU` decorator leaves the wrapped function untouched.

    class _SpacesFallback:
        """No-op stand-in for the `spaces` module when it is not installed."""

        @staticmethod
        def GPU(*args, **kwargs):
            """Return the decorated function unchanged.

            Supports both the bare form (``@spaces.GPU``) and the called form
            (``@spaces.GPU(duration=...)``). Every option is ignored.
            """
            # Bare usage: the function being decorated is the sole argument.
            if len(args) == 1 and not kwargs and callable(args[0]):
                return args[0]

            def _decorator(func):
                return func

            return _decorator

    spaces = _SpacesFallback()
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from transformers import AutoModel, AutoProcessor | |
# Disable the broken cuDNN SDPA backend. The toggle is looked up defensively
# because not every PyTorch release exposes it; when it is absent there is no
# cuDNN SDPA backend to switch off.
_enable_cudnn_sdp = getattr(torch.backends.cuda, "enable_cudnn_sdp", None)
if _enable_cudnn_sdp is not None:
    _enable_cudnn_sdp(False)

# Keep these enabled as fallbacks
torch.backends.cuda.enable_flash_sdp(True)
torch.backends.cuda.enable_mem_efficient_sdp(True)
torch.backends.cuda.enable_math_sdp(True)

# Defaults for the CLI / UI.
MODEL_PATH = "OpenMOSS-Team/MOSS-TTS"
DEFAULT_ATTN_IMPLEMENTATION = "auto"
DEFAULT_MAX_NEW_TOKENS = 4096

# Environment variable that turns backend preloading at startup on or off.
PRELOAD_ENV_VAR = "MOSS_TTS_PRELOAD_AT_STARTUP"

CONTINUATION_NOTICE = (
    "Continuation mode is active. Make sure the reference audio transcript is prepended to the input text."
)

# Choices offered by the "Mode with Reference Audio" selector.
MODE_CLONE = "Clone"
MODE_CONTINUE = "Continuation"
MODE_CONTINUE_CLONE = "Continuation + Clone"

# Per-character factors used to estimate the expected number of audio tokens.
ZH_TOKENS_PER_CHAR = 3.098411951313033
EN_TOKENS_PER_CHAR = 0.8673376262755219

# Bundled example assets live next to this file.
_ASSETS_DIR = Path(__file__).resolve().parent / "assets"
REFERENCE_AUDIO_DIR = _ASSETS_DIR / "audio"
EXAMPLE_TEXTS_JSONL_PATH = _ASSETS_DIR / "text" / "moss_tts_example_texts.jsonl"
| def _parse_example_id(example_id: str) -> tuple[str, int] | None: | |
| matched = re.fullmatch(r"(zh|en)/(\d+)", (example_id or "").strip()) | |
| if matched is None: | |
| return None | |
| return matched.group(1), int(matched.group(2)) | |
def _resolve_reference_audio_path(language: str, index: int) -> Path | None:
    """Locate the bundled reference clip for ``language`` / ``index``.

    Looks for ``reference_<language>_<index>`` under ``REFERENCE_AUDIO_DIR``,
    trying ``.wav`` before ``.mp3``.

    Returns:
        The first candidate path that exists, or ``None`` when neither does.
    """
    base_name = f"reference_{language}_{index}"
    candidates = (REFERENCE_AUDIO_DIR / f"{base_name}{suffix}" for suffix in (".wav", ".mp3"))
    return next((path for path in candidates if path.exists()), None)
def build_example_rows() -> list[tuple[str, str, str]]:
    """Load the bundled examples as ``(role, audio_path, text)`` rows.

    Reads ``EXAMPLE_TEXTS_JSONL_PATH`` one JSON document per line. A line is
    skipped when it is blank, is not a JSON object, carries an ``id`` that
    ``_parse_example_id`` rejects, or has no matching reference audio file.

    Returns:
        Rows in file order. ``role`` falls back to the example id when the
        record has no (or an empty) ``role`` field, so a record missing that
        one key no longer aborts loading with ``KeyError``.
    """
    rows: list[tuple[str, str, str]] = []
    with open(EXAMPLE_TEXTS_JSONL_PATH, "rb") as f:
        for raw_line in f:
            if not raw_line.strip():
                continue
            sample = orjson.loads(raw_line)
            # Only JSON objects can carry the id/role/text fields read below.
            if not isinstance(sample, dict):
                continue
            example_id = sample.get("id", "")
            parsed = _parse_example_id(example_id)
            if parsed is None:
                continue
            language, index = parsed
            audio_path = _resolve_reference_audio_path(language, index)
            if audio_path is None:
                continue
            text = str(sample.get("text", "")).strip()
            role = sample.get("role") or example_id
            rows.append((role, str(audio_path), text))
    return rows


# Loaded once at import time; the examples table and its click handler both
# index into this list.
EXAMPLE_ROWS = build_example_rows()
@functools.lru_cache(maxsize=1)
def load_backend(model_path: str, device_str: str, attn_implementation: str):
    """Load the processor and model and return them ready for inference.

    The result is memoized for the most recent argument combination, so the
    startup preload and later inference requests share one loaded model
    instead of reloading the weights on every call. All three arguments are
    strings and therefore hashable, as ``lru_cache`` requires.

    Args:
        model_path: Identifier or path handed to ``from_pretrained``.
        device_str: Requested device; ``"cpu"`` is used instead whenever CUDA
            is unavailable.
        attn_implementation: Requested attention backend, resolved through
            ``resolve_attn_implementation``.

    Returns:
        ``(model, processor, device, sample_rate)``. ``sample_rate`` is read
        from ``processor.model_config.sampling_rate`` and defaults to 24000.
    """
    device = torch.device(device_str if torch.cuda.is_available() else "cpu")
    # bfloat16 on GPU, full precision on CPU.
    dtype = torch.bfloat16 if device.type == "cuda" else torch.float32
    resolved_attn_implementation = resolve_attn_implementation(
        requested=attn_implementation,
        device=device,
        dtype=dtype,
    )

    processor = AutoProcessor.from_pretrained(
        model_path,
        trust_remote_code=True,
    )
    # Move the processor's audio tokenizer (when it has one) to the model's device.
    if hasattr(processor, "audio_tokenizer"):
        processor.audio_tokenizer = processor.audio_tokenizer.to(device)
        processor.audio_tokenizer.eval()

    model_kwargs = {
        "trust_remote_code": True,
        "torch_dtype": dtype,
    }
    # A resolved value of None means "do not pass attn_implementation at all".
    if resolved_attn_implementation:
        model_kwargs["attn_implementation"] = resolved_attn_implementation
    model = AutoModel.from_pretrained(model_path, **model_kwargs).to(device)
    model.eval()

    sample_rate = int(getattr(processor.model_config, "sampling_rate", 24000))
    return model, processor, device, sample_rate
| def resolve_attn_implementation(requested: str, device: torch.device, dtype: torch.dtype) -> str | None: | |
| requested_norm = (requested or "").strip().lower() | |
| if requested_norm in {"none"}: | |
| return None | |
| if requested_norm not in {"", "auto"}: | |
| return requested | |
| # Prefer FlashAttention 2 when package + device conditions are met. | |
| if ( | |
| device.type == "cuda" | |
| and importlib.util.find_spec("flash_attn") is not None | |
| and dtype in {torch.float16, torch.bfloat16} | |
| ): | |
| major, _ = torch.cuda.get_device_capability(device) | |
| if major >= 8: | |
| return "flash_attention_2" | |
| # CUDA fallback: use PyTorch SDPA kernels. | |
| if device.type == "cuda": | |
| return "sdpa" | |
| # CPU fallback. | |
| return "eager" | |
def detect_text_language(text: str) -> str:
    """Classify ``text`` as ``"zh"`` or ``"en"`` by counting characters.

    Characters in U+4E00..U+9FFF are compared against ASCII letters. A tie
    with at least one such character is ``"zh"``; text containing neither kind
    of character is ``"en"``.
    """
    han_count = sum(1 for _ in re.finditer(r"[\u4e00-\u9fff]", text))
    latin_count = sum(1 for _ in re.finditer(r"[A-Za-z]", text))
    # han_count > 0 rules out the "nothing matched" case, which is English.
    if han_count > 0 and han_count >= latin_count:
        return "zh"
    return "en"
def supports_duration_control(mode_with_reference: str) -> bool:
    """Return ``True`` unless the mode is one of the two continuation modes."""
    continuation_modes = frozenset((MODE_CONTINUE, MODE_CONTINUE_CLONE))
    is_continuation = mode_with_reference in continuation_modes
    return not is_continuation
def estimate_duration_tokens(text: str) -> tuple[str, int, int, int]:
    """Estimate an expected-token budget for ``text``.

    Returns:
        ``(language, default_tokens, min_tokens, max_tokens)`` where the
        default is the character count (at least 1) times the per-language
        factor, and the range spans 0.5x to 1.5x of the default. Every value
        is at least 1 and ``max_tokens`` is never below ``min_tokens``.
    """
    content = text or ""
    language = detect_text_language(content)

    per_char = EN_TOKENS_PER_CHAR
    if language == "zh":
        per_char = ZH_TOKENS_PER_CHAR

    # An empty string still counts as one character.
    char_count = len(content) or 1
    default_tokens = max(1, int(char_count * per_char))
    lower = max(1, int(default_tokens * 0.5))
    upper = max(lower, int(default_tokens * 1.5))
    return language, default_tokens, lower, upper
def update_duration_controls(
    enabled: bool,
    text: str,
    current_tokens: float | int | None,
    mode_with_reference: str,
):
    """Compute UI updates for the duration slider, its hint, and its checkbox.

    Args:
        enabled: Whether the duration-control checkbox is ticked.
        text: Current input text, used to estimate the token range.
        current_tokens: Current slider value, if any.
        mode_with_reference: Selected reference mode.

    Returns:
        ``(slider_update, hint_text, checkbox_update)``.
    """
    # Continuation modes: hide the slider and force the checkbox off.
    if not supports_duration_control(mode_with_reference):
        hidden_slider = gr.update(visible=False)
        locked_checkbox = gr.update(value=False, interactive=False)
        return hidden_slider, "Duration control is disabled for Continuation modes.", locked_checkbox

    checkbox_update = gr.update(interactive=True)
    if not enabled:
        return gr.update(visible=False), "Duration control is disabled.", checkbox_update

    language, default_tokens, min_tokens, max_tokens = estimate_duration_tokens(text)

    # The slider starts at the placeholder value 1; treat that (or no value)
    # as "unset" so the first estimate uses the computed default rather than
    # being clamped up to the minimum.
    is_unset = current_tokens is None or int(current_tokens) == 1
    requested = default_tokens if is_unset else int(current_tokens)
    capped = min(max_tokens, requested)
    slider_value = max(min_tokens, capped)

    language_label = {"zh": "Chinese"}.get(language, "English")
    hint = (
        f"Duration control enabled | detected language: {language_label} | "
        f"default={default_tokens}, range=[{min_tokens}, {max_tokens}]"
    )
    slider_update = gr.update(
        visible=True,
        minimum=min_tokens,
        maximum=max_tokens,
        value=slider_value,
        step=1,
    )
    return slider_update, hint, checkbox_update
| def build_conversation( | |
| text: str, | |
| reference_audio: str | None, | |
| mode_with_reference: str, | |
| expected_tokens: int | None, | |
| processor, | |
| ): | |
| text = (text or "").strip() | |
| if not text: | |
| raise ValueError("Please enter text to synthesize.") | |
| user_kwargs = {"text": text} | |
| if expected_tokens is not None: | |
| user_kwargs["tokens"] = int(expected_tokens) | |
| if not reference_audio: | |
| conversations = [[processor.build_user_message(**user_kwargs)]] | |
| return conversations, "generation", "Direct Generation" | |
| if mode_with_reference == MODE_CLONE: | |
| clone_kwargs = dict(user_kwargs) | |
| clone_kwargs["reference"] = [reference_audio] | |
| conversations = [[processor.build_user_message(**clone_kwargs)]] | |
| return conversations, "generation", MODE_CLONE | |
| if mode_with_reference == MODE_CONTINUE: | |
| conversations = [ | |
| [ | |
| processor.build_user_message(**user_kwargs), | |
| processor.build_assistant_message(audio_codes_list=[reference_audio]), | |
| ] | |
| ] | |
| return conversations, "continuation", MODE_CONTINUE | |
| continue_clone_kwargs = dict(user_kwargs) | |
| continue_clone_kwargs["reference"] = [reference_audio] | |
| conversations = [ | |
| [ | |
| processor.build_user_message(**continue_clone_kwargs), | |
| processor.build_assistant_message(audio_codes_list=[reference_audio]), | |
| ] | |
| ] | |
| return conversations, "continuation", MODE_CONTINUE_CLONE | |
| def render_mode_hint(reference_audio: str | None, mode_with_reference: str): | |
| if not reference_audio: | |
| return "Current mode: **Direct Generation** (no reference audio uploaded)" | |
| if mode_with_reference == MODE_CLONE: | |
| return "Current mode: **Clone** (speaker timbre will be cloned from the reference audio)" | |
| return f"Current mode: **{mode_with_reference}** \n> {CONTINUATION_NOTICE}" | |
def apply_example_selection(
    mode_with_reference: str,
    duration_control_enabled: bool,
    duration_tokens: int,
    evt: gr.SelectData,
):
    """Fill the inputs from the example row that was clicked.

    Returns:
        A 6-tuple: reference audio path, example text, mode hint, duration
        slider update, duration hint, duration checkbox update. When the event
        carries no usable row index, six no-op ``gr.update()`` values are
        returned instead.
    """

    def _leave_untouched():
        # One no-op update per output component.
        return tuple(gr.update() for _ in range(6))

    if evt is None or evt.index is None:
        return _leave_untouched()

    # The index may be a (row, column) pair or a bare row number.
    selected = evt.index
    row_idx = int(selected[0] if isinstance(selected, (tuple, list)) else selected)
    if not 0 <= row_idx < len(EXAMPLE_ROWS):
        return _leave_untouched()

    _, audio_path, example_text = EXAMPLE_ROWS[row_idx]
    slider_update, hint_text, checkbox_update = update_duration_controls(
        duration_control_enabled,
        example_text,
        duration_tokens,
        mode_with_reference,
    )
    mode_hint = render_mode_hint(audio_path, mode_with_reference)
    return audio_path, example_text, mode_hint, slider_update, hint_text, checkbox_update
# On ZeroGPU Spaces a GPU is only attached while a `spaces.GPU`-decorated
# function runs, so the inference entry point must be decorated. When the
# `spaces` package is missing, the module-level fallback makes this a no-op.
# `duration` is the per-call GPU time budget in seconds; 180 is a generous
# starting point because a call may also have to load the model.
@spaces.GPU(duration=180)
def run_inference(
    text: str,
    reference_audio: str | None,
    mode_with_reference: str,
    duration_control_enabled: bool,
    duration_tokens: int,
    temperature: float,
    top_p: float,
    top_k: int,
    repetition_penalty: float,
    model_path: str,
    device: str,
    attn_implementation: str,
    max_new_tokens: int,
):
    """Synthesize speech for ``text`` and return the audio plus a status line.

    Args:
        text: Text to synthesize.
        reference_audio: Optional reference audio file path.
        mode_with_reference: Mode applied when reference audio is given.
        duration_control_enabled: Whether the duration checkbox is ticked.
        duration_tokens: Expected-token target from the slider; only used when
            duration control is enabled and the mode supports it.
        temperature, top_p, top_k, repetition_penalty: Audio sampling options
            forwarded to ``model.generate``.
        model_path, device, attn_implementation: Backend selection, forwarded
            to ``load_backend``.
        max_new_tokens: Generation length limit.

    Returns:
        ``((sample_rate, audio), status)`` where ``audio`` is a 1-D float32
        NumPy array.

    Raises:
        ValueError: If ``text`` is empty (raised by ``build_conversation``).
        RuntimeError: If decoding yields no usable message.
    """
    started_at = time.monotonic()
    model, processor, torch_device, sample_rate = load_backend(
        model_path=model_path,
        device_str=device,
        attn_implementation=attn_implementation,
    )

    # The token target is only honoured in modes that support duration control.
    duration_enabled = bool(duration_control_enabled and supports_duration_control(mode_with_reference))
    expected_tokens = int(duration_tokens) if duration_enabled else None
    conversations, mode, mode_name = build_conversation(
        text=text,
        reference_audio=reference_audio,
        mode_with_reference=mode_with_reference,
        expected_tokens=expected_tokens,
        processor=processor,
    )

    batch = processor(conversations, mode=mode)
    input_ids = batch["input_ids"].to(torch_device)
    attention_mask = batch["attention_mask"].to(torch_device)

    with torch.no_grad():
        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=int(max_new_tokens),
            audio_temperature=float(temperature),
            audio_top_p=float(top_p),
            audio_top_k=int(top_k),
            audio_repetition_penalty=float(repetition_penalty),
        )

    messages = processor.decode(outputs)
    if not messages or messages[0] is None:
        raise RuntimeError("The model did not return a decodable audio result.")

    # Normalise the decoded audio to a flat float32 NumPy array.
    audio = messages[0].audio_codes_list[0]
    if isinstance(audio, torch.Tensor):
        audio_np = audio.detach().float().cpu().numpy()
    else:
        audio_np = np.asarray(audio, dtype=np.float32)
    if audio_np.ndim > 1:
        audio_np = audio_np.reshape(-1)
    audio_np = audio_np.astype(np.float32, copy=False)

    elapsed = time.monotonic() - started_at
    status = (
        f"Done | mode: {mode_name} | elapsed: {elapsed:.2f}s | "
        f"max_new_tokens={int(max_new_tokens)}, "
        f"expected_tokens={expected_tokens if expected_tokens is not None else 'off'}, "
        f"audio_temperature={float(temperature):.2f}, audio_top_p={float(top_p):.2f}, "
        f"audio_top_k={int(top_k)}, audio_repetition_penalty={float(repetition_penalty):.2f}"
    )
    return (sample_rate, audio_np), status
def build_demo(args: argparse.Namespace):
    """Assemble the Gradio Blocks UI and wire up its event handlers.

    Args:
        args: Parsed CLI options; ``model_path``, ``device`` and
            ``attn_implementation`` are passed to ``run_inference`` as state.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    page_css = """
    :root {
      --bg: #f6f7f8;
      --panel: #ffffff;
      --ink: #111418;
      --muted: #4d5562;
      --line: #e5e7eb;
      --accent: #0f766e;
    }
    .gradio-container {
      background: linear-gradient(180deg, #f7f8fa 0%, #f3f5f7 100%);
      color: var(--ink);
    }
    .app-card {
      border: 1px solid var(--line);
      border-radius: 16px;
      background: var(--panel);
      padding: 14px;
    }
    .app-title {
      font-size: 22px;
      font-weight: 700;
      margin-bottom: 6px;
      letter-spacing: 0.2px;
    }
    .app-subtitle {
      color: var(--muted);
      font-size: 14px;
      margin-bottom: 8px;
    }
    #output_audio {
      padding-bottom: 12px;
      margin-bottom: 8px;
      overflow: hidden !important;
    }
    #output_audio > .wrap {
      overflow: hidden !important;
    }
    #output_audio audio {
      margin-bottom: 6px;
    }
    #run-btn {
      background: var(--accent);
      border: none;
    }
    """

    with gr.Blocks(title="MOSS-TTS Demo", css=page_css) as demo:
        # Header card.
        gr.Markdown(
            """
            <div class="app-card">
            <div class="app-title">MOSS-TTS</div>
            <div class="app-subtitle">Minimal UI: Direct Generation, Clone, Continuation, Continuation + Clone</div>
            </div>
            """
        )

        with gr.Row(equal_height=False):
            # Left column: inputs and sampling controls.
            with gr.Column(scale=3):
                text_input = gr.Textbox(
                    label="Text",
                    lines=9,
                    placeholder="Enter text to synthesize. In continuation modes, prepend the reference audio transcript.",
                )
                ref_audio_input = gr.Audio(
                    label="Reference Audio (Optional)",
                    type="filepath",
                )
                mode_radio = gr.Radio(
                    choices=[MODE_CLONE, MODE_CONTINUE, MODE_CONTINUE_CLONE],
                    value=MODE_CLONE,
                    label="Mode with Reference Audio",
                    info="If no reference audio is uploaded, Direct Generation will be used automatically.",
                )
                mode_hint_md = gr.Markdown(render_mode_hint(None, MODE_CLONE))

                duration_checkbox = gr.Checkbox(
                    value=False,
                    label="Enable Duration Control (Expected Audio Tokens)",
                )
                # Starts hidden at the placeholder value 1; the real range is
                # filled in by update_duration_controls.
                duration_slider = gr.Slider(
                    minimum=1,
                    maximum=1,
                    step=1,
                    value=1,
                    label="expected_tokens",
                    visible=False,
                )
                duration_hint_md = gr.Markdown("Duration control is disabled.")

                with gr.Accordion("Sampling Parameters (Audio)", open=True):
                    temperature_slider = gr.Slider(
                        minimum=0.1,
                        maximum=3.0,
                        step=0.05,
                        value=1.7,
                        label="temperature",
                    )
                    top_p_slider = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        step=0.01,
                        value=0.8,
                        label="top_p",
                    )
                    top_k_slider = gr.Slider(
                        minimum=1,
                        maximum=200,
                        step=1,
                        value=25,
                        label="top_k",
                    )
                    repetition_penalty_slider = gr.Slider(
                        minimum=0.8,
                        maximum=2.0,
                        step=0.05,
                        value=1.0,
                        label="repetition_penalty",
                    )
                    max_new_tokens_slider = gr.Slider(
                        minimum=256,
                        maximum=8192,
                        step=128,
                        value=DEFAULT_MAX_NEW_TOKENS,
                        label="max_new_tokens",
                    )

                generate_btn = gr.Button("Generate Speech", variant="primary", elem_id="run-btn")

            # Right column: outputs and the clickable examples table.
            with gr.Column(scale=2):
                audio_output = gr.Audio(label="Output Audio", type="numpy", elem_id="output_audio")
                status_box = gr.Textbox(label="Status", lines=4, interactive=False)

                # The table shows role and text only; the audio path stays in EXAMPLE_ROWS.
                example_cells = [[role, sample_text] for role, _path, sample_text in EXAMPLE_ROWS]
                examples_table = gr.Dataframe(
                    headers=["Reference Speech", "Example Text"],
                    value=example_cells,
                    datatype=["str", "str"],
                    row_count=(len(EXAMPLE_ROWS), "fixed"),
                    col_count=(2, "fixed"),
                    interactive=False,
                    wrap=True,
                    label="Examples (click a row to fill inputs)",
                )

        # Keep the mode hint in sync with the reference audio and mode choice.
        hint_wiring = dict(
            fn=render_mode_hint,
            inputs=[ref_audio_input, mode_radio],
            outputs=[mode_hint_md],
        )
        ref_audio_input.change(**hint_wiring)
        mode_radio.change(**hint_wiring)

        # Recompute the duration controls when the checkbox, text, or mode changes.
        duration_wiring = dict(
            fn=update_duration_controls,
            inputs=[duration_checkbox, text_input, duration_slider, mode_radio],
            outputs=[duration_slider, duration_hint_md, duration_checkbox],
        )
        duration_checkbox.change(**duration_wiring)
        text_input.change(**duration_wiring)
        mode_radio.change(**duration_wiring)

        examples_table.select(
            fn=apply_example_selection,
            inputs=[mode_radio, duration_checkbox, duration_slider],
            outputs=[
                ref_audio_input,
                text_input,
                mode_hint_md,
                duration_slider,
                duration_hint_md,
                duration_checkbox,
            ],
        )

        # Input order must match run_inference's positional parameters.
        generate_btn.click(
            fn=run_inference,
            inputs=[
                text_input,
                ref_audio_input,
                mode_radio,
                duration_checkbox,
                duration_slider,
                temperature_slider,
                top_p_slider,
                top_k_slider,
                repetition_penalty_slider,
                gr.State(args.model_path),
                gr.State(args.device),
                gr.State(args.attn_implementation),
                max_new_tokens_slider,
            ],
            outputs=[audio_output, status_box],
        )

    return demo
def resolve_runtime_attn(args: argparse.Namespace) -> argparse.Namespace:
    """Replace ``args.attn_implementation`` with the concrete resolved backend.

    The device and dtype are derived as CUDA + bfloat16 when CUDA is
    available, otherwise CPU + float32. A resolved value of ``None`` is stored
    as the string ``"none"``. ``args`` is mutated in place and also returned.
    """
    target_device = args.device if torch.cuda.is_available() else "cpu"
    runtime_device = torch.device(target_device)
    runtime_dtype = torch.float32
    if runtime_device.type == "cuda":
        runtime_dtype = torch.bfloat16

    resolved = resolve_attn_implementation(
        requested=args.attn_implementation,
        device=runtime_device,
        dtype=runtime_dtype,
    )
    args.attn_implementation = resolved or "none"
    return args
def parse_bool_env(name: str, default: bool) -> bool:
    """Read environment variable ``name`` as a boolean flag.

    Returns ``default`` when the variable is unset. Otherwise the value is
    true only if, ignoring case and surrounding whitespace, it is one of
    ``1``, ``true``, ``yes``, ``y``, ``on``.
    """
    raw = os.getenv(name)
    if raw is not None:
        truthy = ("1", "true", "yes", "y", "on")
        return raw.strip().lower() in truthy
    return default
| def parse_port(value: str | None, default: int) -> int: | |
| if not value: | |
| return default | |
| try: | |
| return int(value) | |
| except ValueError: | |
| return default | |
def main():
    """Parse options, optionally preload the backend, and launch the demo.

    Host and port can come from CLI flags or from the environment
    (``GRADIO_SERVER_NAME``, ``GRADIO_SERVER_PORT``, ``PORT``); when set, the
    environment values take precedence over the flags. An empty or
    non-numeric port variable is ignored rather than aborting startup.
    """
    parser = argparse.ArgumentParser(description="MossTTS Gradio Demo")
    parser.add_argument("--model_path", type=str, default=MODEL_PATH)
    parser.add_argument("--device", type=str, default="cuda:0")
    parser.add_argument("--attn_implementation", type=str, default=DEFAULT_ATTN_IMPLEMENTATION)
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument(
        "--port",
        type=int,
        # Go through parse_port so a malformed env value falls back to 7860
        # instead of raising ValueError while the parser is being built.
        default=parse_port(os.getenv("GRADIO_SERVER_PORT", os.getenv("PORT")), 7860),
    )
    parser.add_argument("--share", action="store_true")
    args = parser.parse_args()

    # Environment overrides for host and port.
    args.host = os.getenv("GRADIO_SERVER_NAME", args.host)
    args.port = parse_port(os.getenv("GRADIO_SERVER_PORT", os.getenv("PORT")), args.port)

    args = resolve_runtime_attn(args)
    print(f"[INFO] Using attn_implementation={args.attn_implementation}", flush=True)

    # Preload defaults to on, except when SPACE_ID is set in the environment.
    preload_enabled = parse_bool_env(PRELOAD_ENV_VAR, default=not bool(os.getenv("SPACE_ID")))
    if preload_enabled:
        preload_started_at = time.monotonic()
        print(
            f"[Startup] Preloading backend: model={args.model_path}, device={args.device}, attn={args.attn_implementation}",
            flush=True,
        )
        # Loading before the server starts surfaces backend errors at startup.
        load_backend(
            model_path=args.model_path,
            device_str=args.device,
            attn_implementation=args.attn_implementation,
        )
        print(
            f"[Startup] Backend preload finished in {time.monotonic() - preload_started_at:.2f}s",
            flush=True,
        )
    else:
        print(
            f"[Startup] Skipping preload (set {PRELOAD_ENV_VAR}=1 to enable).",
            flush=True,
        )

    demo = build_demo(args)
    # One request at a time, with at most 16 waiting in the queue.
    demo.queue(max_size=16, default_concurrency_limit=1).launch(
        server_name=args.host,
        server_port=args.port,
        share=args.share,
        ssr_mode=False,
    )


if __name__ == "__main__":
    main()