#!/usr/bin/env python3
"""
Enhanced profiling script for ACE-Step inference with deep LLM analysis

This script helps diagnose why LLM generation is slow by tracking:
1. Total tokens generated vs expected throughput (200 tokens/sec baseline)
2. Per-iteration timing to detect compilation overhead or slow operations
3. Constrained decoding overhead
4. CFG overhead (2x forward passes)
5. Model forward time vs sampling/processing time

Usage:
    python profile_inference.py               # Standard profiling with warmup
    python profile_inference.py --no-warmup   # Profile first run (includes compilation)
    python profile_inference.py --llm-debug   # Deep LLM performance debugging
    python profile_inference.py --detailed    # Add cProfile function-level analysis

Inference mode options:
    python profile_inference.py --thinking                  # Enable CoT for code generation
    python profile_inference.py --use-constrained-decoding  # Use FSM constrained decoding
    python profile_inference.py --use-cot-metas             # Enable LM to generate metadata via CoT
"""

import time
import argparse
import sys
import os
import re
from contextlib import contextmanager
from collections import defaultdict
import json
from typing import Tuple, Dict, Any, List, Optional
from functools import wraps

# Add project root to path so the `acestep` package resolves when this script
# is run directly from the repository root.
project_root = os.path.abspath(os.path.dirname(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import torch
from acestep.inference import generate_music, GenerationParams, GenerationConfig
from acestep.handler import AceStepHandler
from acestep.llm_inference import LLMHandler


# Baseline the measured LLM throughput is compared against (tokens/sec).
EXPECTED_TOKENS_PER_SEC = 200.0
# Rough characters-per-token ratio used for every text-length token estimate.
CHARS_PER_TOKEN_ESTIMATE = 4
# The report assumes the LM emits 5 audio codes per second of audio.
AUDIO_CODES_PER_SECOND = 5
# Matches one audio-code token in the LLM output text.
AUDIO_CODE_PATTERN = re.compile(r'<\|audio_code_\d+\|>')
# Delimiters of the chain-of-thought section in the LLM output text.
COT_OPEN_TAG = '<think>'
COT_CLOSE_TAG = '</think>'


class PreciseTimer:
    """High-precision timer with CUDA synchronization for accurate GPU timing.

    Samples are stored per section name in ``timings``; each use of
    :meth:`time` appends one elapsed-seconds sample for that name.
    """

    def __init__(self, device="cuda"):
        self.device = device
        self.timings = defaultdict(list)
        self.enabled = True

    def sync(self):
        """Synchronize CUDA operations for accurate timing.

        No-op when the timer is disabled, the device is not a CUDA device,
        or CUDA is unavailable.
        """
        # str() lets callers pass either a device string or a torch.device.
        on_cuda = str(self.device).startswith("cuda")
        if self.enabled and on_cuda and torch.cuda.is_available():
            torch.cuda.synchronize()

    @contextmanager
    def time(self, name: str):
        """Time a code section with CUDA synchronization.

        The sample is recorded even if the timed body raises.
        """
        if not self.enabled:
            yield
            return

        self.sync()
        start = time.perf_counter()
        try:
            yield
        finally:
            # Sync again so queued GPU work is included in the measurement.
            self.sync()
            elapsed = time.perf_counter() - start
            self.timings[name].append(elapsed)

    def get_total(self, name: str) -> float:
        """Get total accumulated time for a section (0 if never timed)."""
        return sum(self.timings.get(name, []))

    def get_mean(self, name: str) -> float:
        """Get mean time per call for a section (0.0 if never timed)."""
        times = self.timings.get(name, [])
        return sum(times) / len(times) if times else 0.0

    def get_count(self, name: str) -> int:
        """Get number of calls for a section."""
        return len(self.timings.get(name, []))

    def get_all(self, name: str) -> List[float]:
        """Get all timing samples for a section.

        Returns a copy so callers cannot mutate the recorded samples.
        """
        return list(self.timings.get(name, []))


class LLMDebugger:
    """Track detailed LLM performance metrics to diagnose slow generation."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset all metrics."""
        self.total_tokens = 0
        self.generation_start = None
        self.generation_end = None
        self.output_text = ""
        self.prompt_length = 0

    def start(self, prompt_length: int = 0):
        """Mark generation start."""
        self.generation_start = time.perf_counter()
        self.prompt_length = prompt_length

    def end(self, output_text: str = ""):
        """Mark generation end and store output."""
        self.generation_end = time.perf_counter()
        self.output_text = output_text

    def set_token_count(self, count: int):
        """Set total token count."""
        self.total_tokens = count

    def _has_measurement(self) -> bool:
        """Return True once both start() and end() have been recorded."""
        # Compare against None explicitly: a timestamp is a float and must not
        # be judged by truthiness.
        return self.generation_start is not None and self.generation_end is not None

    def get_throughput(self) -> float:
        """Calculate actual tokens per second.

        Returns 0.0 when generation was not measured, no tokens were counted,
        or the elapsed time is not positive.
        """
        if self._has_measurement() and self.total_tokens > 0:
            total_time = self.generation_end - self.generation_start
            if total_time > 0:
                return self.total_tokens / total_time
        return 0.0

    def print_analysis(self):
        """Print detailed LLM performance analysis.

        Prints nothing if generation start/end were never recorded.
        """
        if not self._has_measurement():
            return

        print("\n" + "=" * 100)
        print("🔍 LLM PERFORMANCE DEEP DIVE")
        print("=" * 100)

        total_time = self.generation_end - self.generation_start
        throughput = self.get_throughput()

        # Basic metrics table
        print(f"\n{'Metric':<40} {'Value':<20} {'Notes'}")
        print("-" * 100)
        print(f"{'Total Tokens Generated:':<40} {self.total_tokens:<20} (new tokens only)")
        print(f"{'Prompt Length (estimate):':<40} {self.prompt_length:<20} (input tokens)")
        print(f"{'Total Generation Time:':<40} {total_time:<20.3f} seconds")
        print(f"{'Measured Throughput:':<40} {throughput:<20.1f} tokens/sec")
        print(f"{'Expected Throughput:':<40} {EXPECTED_TOKENS_PER_SEC:<20.0f} tokens/sec (baseline)")

        # Calculate performance gap
        if throughput > 0:
            slowdown = EXPECTED_TOKENS_PER_SEC / throughput
            efficiency = (throughput / EXPECTED_TOKENS_PER_SEC) * 100
            print(f"{'Performance vs Baseline:':<40} {efficiency:<20.1f}% of expected")
            print(f"{'Slowdown Factor:':<40} {slowdown:<20.2f}x slower")

        if self.output_text:
            self._print_output_analysis(total_time)

        self._print_diagnostic_guidance(throughput)

    def _print_output_analysis(self, total_time: float):
        """Print audio-code and chain-of-thought statistics of the output text."""
        text = self.output_text
        print(f"\n{'Output Analysis:':<40}")
        print(f"{' Output length:':<40} {len(text):<20} characters")

        # Count audio codes
        codes = AUDIO_CODE_PATTERN.findall(text)
        if codes:
            duration = f"~{len(codes) / AUDIO_CODES_PER_SECOND:.1f}s"
            print(f"{' Audio codes generated:':<40} {len(codes):<20} codes")
            print(f"{' Expected audio duration:':<40} {duration:<20} (5 codes per second)")
            if total_time > 0:
                per_code = f"{total_time / len(codes) * 1000:.1f}ms"
                print(f"{' Time per audio code:':<40} {per_code:<20}")

        # Check for CoT section. The closing tag is searched for after the
        # opening one so the slice always spans a real <think>...</think> pair.
        cot_start = text.find(COT_OPEN_TAG)
        cot_end = text.find(COT_CLOSE_TAG, cot_start) if cot_start != -1 else -1
        if cot_end != -1:
            cot_section = text[cot_start:cot_end + len(COT_CLOSE_TAG)]
            cot_token_est = f"~{len(cot_section) // CHARS_PER_TOKEN_ESTIMATE}"
            print(f"{' CoT section tokens (estimate):':<40} {cot_token_est:<20}")

    @staticmethod
    def _print_diagnostic_guidance(throughput: float):
        """Print likely causes for the measured throughput band."""
        print("\n" + "=" * 100)
        print("🔧 DIAGNOSTIC GUIDANCE")
        print("=" * 100)

        if throughput <= 0:
            # No tokens were counted (or nothing was timed): a throughput-based
            # diagnosis would be misleading here.
            print("\n⚠️ No generated tokens were measured; throughput diagnosis skipped")
        elif throughput < 50:
            print("\n⚠️ CRITICAL: Throughput is extremely low (<50 tokens/sec)")
            print("\nThis is ~4x slower than expected. Likely causes:")
            print(" 1. ❗ Constrained decoding FSM overhead")
            print(" → Each token triggers FSM state machine validation")
            print(" → Try: set use_constrained_decoding=False in config")
            print(" 2. ❗ CFG with double forward passes")
            print(" → cfg_scale > 1.0 means running model twice per token")
            print(" → Check: params.lm_cfg_scale value")
            print(" 3. ❗ Running in eager mode without compilation")
            print(" → PyTorch should compile kernels after warmup")
            print(" → Check: torch._dynamo.config settings")
        elif throughput < 100:
            print("\n⚠️ WARNING: Throughput is low (50-100 tokens/sec)")
            print("\nLikely causes:")
            print(" 1. Constrained decoding overhead (~30-50% slowdown expected)")
            print(" 2. CFG enabled (2x compute per token if cfg_scale > 1.0)")
            print(" 3. Small model or inefficient GPU utilization")
        elif throughput < 150:
            print("\n⚠️ Throughput is below baseline but acceptable (100-150 tokens/sec)")
            print("\nMinor overhead from:")
            print(" - Constrained decoding: ~20-30% overhead")
            print(" - Profiling instrumentation: ~5-10% overhead")
        else:
            print(f"\n✓ Throughput is good ({throughput:.1f} tokens/sec)")
            print(" Performance is within acceptable range")


# Global instances; created in main() and looked up at call time by the
# wrappers and report functions below.
timer = None
llm_debugger = None


def wrap_method_with_timing(obj, method_name: str, timing_key: str):
    """Wrap a method with timing instrumentation.

    Replaces ``obj.<method_name>`` with a wrapper that records each call under
    ``timing_key`` in the global ``timer``.

    Returns:
        The original method, so the caller can restore it later.
    """
    original_method = getattr(obj, method_name)

    @wraps(original_method)
    def timed_wrapper(*args, **kwargs):
        with timer.time(timing_key):
            return original_method(*args, **kwargs)

    setattr(obj, method_name, timed_wrapper)
    return original_method


def _as_text(value) -> str:
    """Return ``value`` as a string, mapping None to the empty string."""
    return "" if value is None else str(value)


def _extract_llm_output_text(result) -> str:
    """Flatten the result of generate_with_stop_condition into one string.

    Only tuples with at least two items are inspected; anything else yields
    an empty string. A list in position 1 is treated as batch output.
    """
    if not (isinstance(result, tuple) and len(result) >= 2):
        return ""

    first, second = result[0], result[1]
    if isinstance(second, list):
        # Batch mode; coerce items so a non-string entry cannot break the join.
        return "".join(_as_text(item) for item in second)

    # Single mode: string values of the leading dict come first.
    cot_output = ""
    if isinstance(first, dict):
        cot_output = "".join(v for v in first.values() if isinstance(v, str))
    return cot_output + str(second)


def wrap_llm_with_debug_tracking(llm_handler):
    """Wrap LLM generation with detailed performance tracking.

    Replaces ``llm_handler.generate_with_stop_condition`` with a wrapper that
    times the call under 'llm_inference' and feeds token estimates into the
    global ``llm_debugger``.

    Returns:
        The original method, so the caller can restore it later.
    """
    original_method = llm_handler.generate_with_stop_condition

    @wraps(original_method)
    def debug_wrapper(*args, **kwargs):
        # Estimate prompt length; caption and lyrics are taken from keywords
        # or from the first two positional arguments, and may be None.
        caption = kwargs.get('caption', args[0] if len(args) > 0 else "")
        lyrics = kwargs.get('lyrics', args[1] if len(args) > 1 else "")
        prompt_estimate = len(_as_text(caption)) + len(_as_text(lyrics))
        prompt_tokens_estimate = prompt_estimate // CHARS_PER_TOKEN_ESTIMATE

        # Start tracking
        llm_debugger.reset()
        llm_debugger.start(prompt_length=prompt_tokens_estimate)

        # Call original with timing
        with timer.time('llm_inference'):
            result = original_method(*args, **kwargs)

        # Extract and analyze output
        output_text = _extract_llm_output_text(result)

        # Count tokens: each audio code counts as one token; the remaining
        # text is estimated from its character length.
        codes = AUDIO_CODE_PATTERN.findall(output_text)
        remaining_text = AUDIO_CODE_PATTERN.sub('', output_text)
        cot_tokens_estimate = len(remaining_text) // CHARS_PER_TOKEN_ESTIMATE
        llm_debugger.set_token_count(len(codes) + cot_tokens_estimate)
        llm_debugger.end(output_text)

        return result

    llm_handler.generate_with_stop_condition = debug_wrapper
    return original_method


def instrument_handlers(dit_handler, llm_handler, enable_llm_debug=False):
    """Add timing instrumentation to handler methods.

    Returns:
        Dict mapping a short key to each replaced original method, in the
        form expected by restore_handlers().
    """
    originals = {}

    # Instrument LLM (only when it has been initialized)
    if llm_handler and llm_handler.llm_initialized:
        if enable_llm_debug:
            originals['llm_generate'] = wrap_llm_with_debug_tracking(llm_handler)
        else:
            originals['llm_generate'] = wrap_method_with_timing(
                llm_handler, 'generate_with_stop_condition', 'llm_inference'
            )

    # Instrument DiT handler
    originals['dit_prepare'] = wrap_method_with_timing(
        dit_handler, 'prepare_batch_data', 'prepare_batch_data'
    )
    originals['dit_generate'] = wrap_method_with_timing(
        dit_handler, 'service_generate', 'dit_inference'
    )
    originals['dit_decode'] = wrap_method_with_timing(
        dit_handler, 'tiled_decode', 'vae_decode'
    )

    return originals


def restore_handlers(dit_handler, llm_handler, originals):
    """Restore original handler methods after profiling."""
    if llm_handler and 'llm_generate' in originals:
        llm_handler.generate_with_stop_condition = originals['llm_generate']

    dit_handler.prepare_batch_data = originals['dit_prepare']
    dit_handler.service_generate = originals['dit_generate']
    dit_handler.tiled_decode = originals['dit_decode']


def _print_section_rows(sections: Dict[str, str], total_time: float):
    """Print one table row per timed section that was called at least once."""
    for key, desc in sections.items():
        count = timer.get_count(key)
        if count == 0:
            continue
        section_time = timer.get_total(key)
        mean = timer.get_mean(key)
        pct = 100 * section_time / total_time if total_time > 0 else 0.0
        print(f" {'├─ ' + desc:<48} {section_time:<12.3f} {pct:>6.1f}% {count:<8} (avg: {mean:.3f}s)")


def print_profiling_results(total_time: float, show_llm_debug: bool = False):
    """Print comprehensive profiling results with performance insights.

    Args:
        total_time: Wall-clock seconds of the whole profiled generation.
        show_llm_debug: Also print the global ``llm_debugger`` analysis.
    """

    def pct(part: float) -> float:
        # Guard the percentage maths against a zero-length run.
        return 100 * part / total_time if total_time > 0 else 0.0

    print("\n" + "=" * 100)
    print("🎯 PROFILING RESULTS")
    print("=" * 100)

    # Define timing categories
    model_sections = {
        'llm_inference': 'LLM Inference (5Hz Language Model)',
        'dit_inference': 'DiT Inference (Diffusion Transformer)',
        'vae_decode': 'VAE Decode (Audio Decoder)',
    }
    non_model_sections = {
        'prepare_batch_data': 'Prepare Batch Data (embedding, formatting)',
    }

    # Calculate totals; whatever is not attributed to a section is "other".
    model_time = sum(timer.get_total(k) for k in model_sections)
    non_model_time = sum(timer.get_total(k) for k in non_model_sections)
    other_time = total_time - model_time - non_model_time

    # Print summary table
    print(f"\n{'CATEGORY':<50} {'TIME (s)':<12} {'%':<8} {'CALLS':<8}")
    print("-" * 100)

    # Model time breakdown
    print(f"\n{'🤖 MODEL TIME (Total)':<50} {model_time:<12.3f} {pct(model_time):>6.1f}% {'':<8}")
    _print_section_rows(model_sections, total_time)

    # Non-model time breakdown
    print(f"\n{'⚙️ NON-MODEL TIME (Total)':<50} {non_model_time:<12.3f} {pct(non_model_time):>6.1f}% {'':<8}")
    _print_section_rows(non_model_sections, total_time)

    # Other time
    if other_time > 0.01:
        print(f"\n{'📦 OTHER TIME (I/O, overhead, audio save)':<50} {other_time:<12.3f} {pct(other_time):>6.1f}% {'':<8}")

    print(f"\n{'📊 TOTAL TIME':<50} {total_time:<12.3f} {'100.0%':>6} {'':<8}")

    # Show LLM detailed analysis if enabled
    if show_llm_debug:
        llm_debugger.print_analysis()

    # Performance insights
    print("\n" + "=" * 100)
    print("💡 PERFORMANCE INSIGHTS")
    print("=" * 100)

    llm_t = timer.get_total('llm_inference')
    dit_t = timer.get_total('dit_inference')
    vae_t = timer.get_total('vae_decode')
    prep_t = timer.get_total('prepare_batch_data')

    # Model time insights
    if model_time > 0:
        print(f"\n✓ Model operations: {model_time:.3f}s ({pct(model_time):.1f}% of total)")
        if llm_t > 0:
            print(f" - LLM: {llm_t:.3f}s ({100*llm_t/model_time:.1f}% of model time)")
        if dit_t > 0:
            print(f" - DiT: {dit_t:.3f}s ({100*dit_t/model_time:.1f}% of model time)")
        if vae_t > 0:
            print(f" - VAE: {vae_t:.3f}s ({100*vae_t/model_time:.1f}% of model time)")

    # LLM bottleneck analysis
    if llm_t > dit_t and llm_t > 5.0:
        print(f"\n⚠️ LLM IS THE BOTTLENECK: {llm_t:.3f}s ({pct(llm_t):.1f}% of total)")
        print("\n Possible causes:")
        print(" 1. Generating too many tokens → use --llm-debug to verify")
        print(" 2. Constrained decoding overhead → FSM validation per token")
        print(" 3. CFG overhead → cfg_scale > 1.0 = 2x forward passes")
        print(" 4. First-token latency → warmup should help")
        print(" 5. KV cache inefficiency → should be ~5-10ms/token")

    # Non-model insights
    if total_time > 0 and non_model_time / total_time > 0.1:
        print(f"\n⚠️ Non-model operations: {non_model_time:.3f}s ({pct(non_model_time):.1f}%)")
        if prep_t > 0.1:
            print(f" - Batch preparation: {prep_t:.3f}s")

    # I/O overhead
    if total_time > 0 and other_time / total_time > 0.2:
        print(f"\n⚠️ Overhead/I/O: {other_time:.3f}s ({pct(other_time):.1f}%)")

    # Recommendations
    print("\n" + "=" * 100)
    print("🚀 OPTIMIZATION RECOMMENDATIONS")
    print("=" * 100)

    if llm_t > dit_t * 2:
        print("\n🎯 Priority: Optimize LLM")
        print(" 1. Run: python profile_inference.py --llm-debug")
        print(" → Shows exact token count and throughput")
        print(" 2. Check constrained decoding overhead")
        print(" 3. Check CFG scaling (lm_cfg_scale parameter)")
        print(" 4. Profile nanovllm engine step() timing")
        print(" 5. Compare vllm vs transformers backends")


def _report_cprofile(prof, output_file: str = "profile_cprofile_detailed.txt"):
    """Write the top 100 cumulative entries to a file and print the top 20."""
    import io
    import pstats

    with open(output_file, 'w', encoding='utf-8') as f:
        file_stats = pstats.Stats(prof, stream=f)
        file_stats.sort_stats('cumulative')
        file_stats.print_stats(100)

    # Print top functions
    print("\n" + "=" * 100)
    print("📊 TOP 20 FUNCTIONS BY CUMULATIVE TIME (cProfile)")
    print("=" * 100)

    buffer = io.StringIO()
    console_stats = pstats.Stats(prof, stream=buffer)
    console_stats.sort_stats('cumulative')
    console_stats.print_stats(20)
    print(buffer.getvalue())
    print(f"\nFull report: {output_file}")


def run_profiled_generation(dit_handler, llm_handler, params, config,
                            enable_cprofile=False, enable_llm_debug=False):
    """Execute generation with full profiling instrumentation.

    The handlers are instrumented for the duration of the call and always
    restored afterwards, even if generation raises.

    Returns:
        Tuple of (result of generate_music, total wall-clock seconds).
    """
    # Instrument handlers
    originals = instrument_handlers(dit_handler, llm_handler, enable_llm_debug)

    try:
        print("\n[Profiling] Starting generation...")
        timer.sync()
        total_start = time.perf_counter()

        # Optional cProfile
        prof = None
        if enable_cprofile:
            import cProfile
            prof = cProfile.Profile()
            prof.enable()

        try:
            # Run generation
            result = generate_music(dit_handler, llm_handler, params, config, save_dir="./")

            # Stop timing
            timer.sync()
            total_time = time.perf_counter() - total_start
        finally:
            # Never leave the profiler hooked in if generation raised.
            if prof is not None:
                prof.disable()

        # Save cProfile if enabled
        if prof is not None:
            _report_cprofile(prof)

        # Print results
        print_profiling_results(total_time, show_llm_debug=enable_llm_debug)

        return result, total_time
    finally:
        restore_handlers(dit_handler, llm_handler, originals)


def load_example_config(
    example_file: str,
) -> Tuple[Optional[GenerationParams], Optional[GenerationConfig]]:
    """Load configuration from example JSON file.

    Returns:
        ``(params, config)`` on success, or ``(None, None)`` after printing
        the error if the file cannot be read, parsed, or converted.
    """
    try:
        with open(example_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        params = GenerationParams(
            caption=data.get('caption', ''),
            lyrics=data.get('lyrics', ''),
            bpm=data.get('bpm'),
            keyscale=data.get('keyscale', ''),
            timesignature=data.get('timesignature', ''),
            vocal_language=data.get('language', 'unknown'),
            duration=data.get('duration'),
            thinking=data.get('think', False),
            inference_steps=data.get('inference_steps', 8),
            seed=data.get('seed', 42),
        )
        config = GenerationConfig(batch_size=data.get('batch_size', 1), seeds=[42])
        return params, config
    except Exception as e:
        # Deliberately best-effort: main() reports the failure and exits.
        print(f" ❌ Failed to load: {e}")
        return None, None


def _on_off(flag: bool) -> str:
    """Render a boolean as 'Enabled' / 'Disabled' for the banner."""
    return 'Enabled' if flag else 'Disabled'


def main():
    """Parse CLI arguments, initialize the models, then warm up and profile."""
    global timer, llm_debugger

    parser = argparse.ArgumentParser(
        description="Profile ACE-Step inference with LLM debugging"
    )
    parser.add_argument("--checkpoint-dir", type=str, default="./checkpoints")
    parser.add_argument("--config-path", type=str, default="acestep-v15-turbo-rl")
    parser.add_argument("--device", type=str, default="cuda")
    parser.add_argument("--lm-model", type=str, default="acestep-5Hz-lm-0.6B-v3")
    parser.add_argument("--lm-backend", type=str, default="vllm")
    parser.add_argument("--no-warmup", action="store_true")
    parser.add_argument("--detailed", action="store_true")
    parser.add_argument("--llm-debug", action="store_true",
                        help="Enable deep LLM debugging (token count, throughput)")
    parser.add_argument("--example", type=str, default="example_05.json")

    # Inference mode parameters
    parser.add_argument("--thinking", action="store_true",
                        help="Enable CoT reasoning for LM to generate audio codes")
    parser.add_argument("--use-constrained-decoding", action="store_true",
                        help="Use FSM-based constrained decoding for meta generation")
    parser.add_argument("--use-cot-metas", action="store_true",
                        help="Enable LLM to generate music metadata via CoT reasoning")

    args = parser.parse_args()

    # Initialize
    timer = PreciseTimer(device=args.device)
    llm_debugger = LLMDebugger()

    print("=" * 100)
    print("🎵 ACE-Step Inference Profiler (LLM Performance Analysis)")
    print("=" * 100)
    print("\nConfiguration:")
    print(f" Device: {args.device}")
    print(f" LLM Backend: {args.lm_backend}")
    print(f" LLM Debug: {_on_off(args.llm_debug)}")
    print(f" Warmup: {_on_off(not args.no_warmup)}")
    print("\nInference Mode:")
    print(f" Thinking (CoT): {_on_off(args.thinking)}")
    print(f" Constrained Decoding: {_on_off(args.use_constrained_decoding)}")
    print(f" Use CoT for Metas: {_on_off(args.use_cot_metas)}")

    # Initialize models
    print("\nInitializing models...")
    dit_handler = AceStepHandler()
    llm_handler = LLMHandler()

    print(" 🎹 Initializing DiT...")
    status_dit, success_dit = dit_handler.initialize_service(
        project_root=project_root,
        config_path=args.config_path,
        device=args.device,
        use_flash_attention=True,
    )
    if not success_dit:
        print(f" ❌ Failed: {status_dit}")
        sys.exit(1)
    print(" ✓ DiT ready")

    print(" 🧠 Initializing LLM...")
    # The LLM is only loaded when one of the CLI flags that needs it is set.
    if args.thinking or args.use_cot_metas:
        status_llm, success_llm = llm_handler.initialize(
            checkpoint_dir=args.checkpoint_dir,
            lm_model_path=args.lm_model,
            backend=args.lm_backend,
            device=args.device,
        )
        if success_llm:
            print(f" ✓ LLM ready ({args.lm_backend})")
        else:
            # Not fatal: profiling continues without a working LLM.
            print(f" ⚠ Failed: {status_llm}")
    else:
        print(" ✓ LLM not initialized (thinking or use_cot_metas is disabled)")

    # Load example
    example_file = os.path.join(project_root, "examples", "text2music", args.example)
    if not os.path.exists(example_file):
        print(f"\n❌ Not found: {example_file}")
        sys.exit(1)

    print(f"\n📄 Loading: {args.example}")
    params, config = load_example_config(example_file)
    if not params or not config:
        print("❌ Failed to load config")
        sys.exit(1)

    print(f" Caption: {params.caption[:60]}...")
    print(f" Batch: {config.batch_size}, Steps: {params.inference_steps}, LLM: {params.thinking}")

    # Warmup
    if not args.no_warmup:
        print("\n" + "=" * 100)
        print("🔥 WARMUP RUN")
        print("=" * 100)

        warmup_params = GenerationParams(
            caption=params.caption,
            lyrics=params.lyrics,
            bpm=params.bpm,
            keyscale=params.keyscale,
            timesignature=params.timesignature,
            vocal_language=params.vocal_language,
            duration=params.duration,
            thinking=args.thinking,
            use_cot_metas=args.use_cot_metas,
            inference_steps=params.inference_steps,
            seed=params.seed,
        )
        warmup_config = GenerationConfig(batch_size=1, seeds=[42])
        warmup_config.use_constrained_decoding = args.use_constrained_decoding

        warmup_start = time.perf_counter()
        warmup_result = generate_music(dit_handler, llm_handler, warmup_params,
                                       warmup_config, save_dir="./")
        warmup_time = time.perf_counter() - warmup_start

        print(f"\n✓ Warmup: {warmup_time:.2f}s")
        if not warmup_result.success:
            print(f"⚠️ Warning: {warmup_result.error}")

        # Reset so the profiled run starts from fresh collectors.
        timer = PreciseTimer(device=args.device)
        llm_debugger = LLMDebugger()

    # Profiling run
    print("\n" + "=" * 100)
    print("⏱️ PROFILING RUN")
    print("=" * 100)

    # Apply inference mode settings
    config.use_constrained_decoding = args.use_constrained_decoding

    # Override thinking and use_cot_metas parameters if specified via CLI
    if args.thinking:
        params.thinking = True
    if args.use_cot_metas:
        params.use_cot_metas = True

    result, total_time = run_profiled_generation(
        dit_handler, llm_handler, params, config,
        enable_cprofile=args.detailed,
        enable_llm_debug=args.llm_debug,
    )

    if not result.success:
        print(f"\n❌ Failed: {result.error}")
        sys.exit(1)

    print(f"\n✅ Success! Generated {len(result.audios)} audio file(s)")

    # Final tips
    if args.detailed:
        print("\n💡 Check profile_cprofile_detailed.txt for function-level analysis")
    elif not args.llm_debug:
        print("\n💡 Run with --llm-debug to see LLM token count and throughput analysis")


if __name__ == "__main__":
    main()