""" 财务数据缓存管理器 用于管理 2024 FY Financial Metrics 和 Latest 3 Years Financial Metrics 的数据加载和缓存 """ from datetime import datetime from typing import Callable, Dict, Any import threading import time class DataTask: """数据加载任务""" def __init__(self, company: str, data_type: str, loader_func: Callable): self.company = company self.data_type = data_type self.loader_func = loader_func self.status = "pending" # pending, running, completed, error self.result = None self.error = None self.start_time = None self.end_time = None self.thread = None def run(self): """执行数据加载""" self.status = "running" self.start_time = datetime.now() try: self.result = self.loader_func() self.status = "completed" except Exception as e: self.status = "error" self.error = str(e) import traceback self.error_trace = traceback.format_exc() finally: self.end_time = datetime.now() def get_age_seconds(self): """获取任务年龄(秒)""" if self.end_time: return (datetime.now() - self.end_time).total_seconds() elif self.start_time: return (datetime.now() - self.start_time).total_seconds() return 0 class FinancialDataCacheManager: """财务数据缓存管理器""" def __init__(self, cache_ttl_seconds=1800, max_cache_size=100): self.cache_ttl = cache_ttl_seconds self.max_cache_size = max_cache_size self.tasks: Dict[tuple, DataTask] = {} self.lock = threading.Lock() self.stats = { "cache_hits": 0, "cache_misses": 0, "background_completions": 0, "total_requests": 0 } def get_or_load_data(self, company: str, data_type: str, loader_func: Callable) -> Any: """获取或加载财务数据 (同步版本,返回最终结果)""" with self.lock: self.stats["total_requests"] += 1 key = (company, data_type) # 清理过期缓存 self._cleanup_expired_cache() task = self.tasks.get(key) # 场景1: 缓存已存在且已完成 if task: if task.status == "completed": if task.get_age_seconds() < self.cache_ttl: self.stats["cache_hits"] += 1 print(f"✅ [Data Cache HIT] {company} - {data_type} (age: {task.get_age_seconds():.1f}s)") return task.result else: print(f"⏰ [Data Cache EXPIRED] {company} - {data_type}") del self.tasks[key] task = None # 场景2: 后台任务正在运行 elif task.status == "running": self.stats["cache_hits"] += 1 print(f"🔄 [Data Cache WAIT] {company} - {data_type} (running for {task.get_age_seconds():.1f}s)") # 等待后台任务完成 max_wait = 30 waited = 0 while task.status == "running" and waited < max_wait: time.sleep(0.5) waited += 0.5 if task.status == "completed": self.stats["background_completions"] += 1 print(f"✅ [Data Background COMPLETED] {company} - {data_type}") return task.result elif task.status == "error": print(f"❌ [Data Background ERROR] {company} - {data_type}: {task.error}") raise Exception(f"Data loading failed: {task.error}") # 场景3: 之前失败了,重试 elif task.status == "error": print(f"🔄 [Data Retry after ERROR] {company} - {data_type}") del self.tasks[key] task = None # 场景4: 缓存不存在,启动新任务 if not task: self.stats["cache_misses"] += 1 print(f"🆕 [Data Cache MISS] {company} - {data_type} - Starting background loading") task = DataTask(company, data_type, loader_func) self.tasks[key] = task task.thread = threading.Thread(target=task.run, daemon=True) task.thread.start() # 等待任务完成 max_wait = 30 waited = 0 while task.status == "running" and waited < max_wait: time.sleep(0.5) waited += 0.5 if task.status == "completed": print(f"✅ [Data NEW COMPLETED] {company} - {data_type}") return task.result elif task.status == "error": print(f"❌ [Data NEW ERROR] {company} - {data_type}: {task.error}") raise Exception(f"Data loading failed: {task.error}") else: print(f"⏱️ [Data TIMEOUT] {company} - {data_type}") raise Exception(f"Data loading timeout after {max_wait}s") def _cleanup_expired_cache(self): """清理过期缓存""" keys_to_remove = [] for key, task in self.tasks.items(): if task.status == "completed" and task.get_age_seconds() > self.cache_ttl: keys_to_remove.append(key) for key in keys_to_remove: company, data_type = key print(f"🗑️ [Data Cache CLEANUP] {company} - {data_type}") del self.tasks[key] # 限制缓存大小 if len(self.tasks) > self.max_cache_size: completed_tasks = [(k, v) for k, v in self.tasks.items() if v.status == "completed"] completed_tasks.sort(key=lambda x: x[1].end_time or datetime.min) to_remove = len(self.tasks) - self.max_cache_size for i in range(to_remove): key, task = completed_tasks[i] company, data_type = key print(f"🗑️ [Data Cache SIZE LIMIT] {company} - {data_type}") del self.tasks[key] def get_stats(self): """获取缓存统计""" with self.lock: total = self.stats["total_requests"] hits = self.stats["cache_hits"] misses = self.stats["cache_misses"] hit_rate = (hits / total * 100) if total > 0 else 0 return { **self.stats, "hit_rate": f"{hit_rate:.1f}%", "active_tasks": len([t for t in self.tasks.values() if t.status == "running"]), "cached_data": len([t for t in self.tasks.values() if t.status == "completed"]) }