"""EDGAR API Client Module with Performance Optimization""" import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import urllib3 try: from sec_edgar_api.EdgarClient import EdgarClient except ImportError: EdgarClient = None import json import time import threading from functools import lru_cache from datetime import datetime, timedelta import re import difflib # Disable SSL warnings for better compatibility urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class EdgarDataClient: # Class-level cache for company_tickers.json (shared across instances) _company_tickers_cache = None _company_tickers_cache_time = None _company_tickers_cache_ttl = 3600 # 1 hour TTL _cache_lock = threading.Lock() # Class-level rate limiter (SEC requires max 10 requests per second) _last_request_time = 0 _rate_limit_lock = threading.Lock() _min_request_interval = 0.11 # 110ms between requests (9 req/sec, safe margin) # 新增:公司索引(加速搜索,避免每次遍历全量数据) _by_ticker = None # ticker -> company info _by_title = None # title (lowercase) -> company info _by_title_norm = None # normalized title -> company info _all_keys = None # 用于模糊匹配的所有key列表 _index_built_time = None _index_ttl = 3600 # 1 hour # 新增:常见别名映射(提升搜索智能性) _alias_map = { "google": "alphabet inc", "alphabet": "alphabet inc", "facebook": "meta platforms, inc.", "meta": "meta platforms, inc.", "amazon": "amazon.com, inc.", "apple": "apple inc.", "microsoft": "microsoft corporation", "netflix": "netflix, inc.", "nvidia": "nvidia corporation", "tesla": "tesla, inc.", "adobe": "adobe inc.", "oracle": "oracle corporation", "ibm": "international business machines corporation", "paypal": "paypal holdings, inc.", "shopify": "shopify inc.", } def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"): """Initialize EDGAR client with connection pooling and timeout""" self.user_agent = user_agent # 新增:实例级搜索缓存(进一步减少重复搜索开销) self._search_cache = {} # Configure requests session with connection pooling self.session = requests.Session() # Configure retry strategy with enhanced retries for stability retry_strategy = Retry( total=5, # Increased from 3 to 5 for better reliability backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS"] ) adapter = HTTPAdapter( pool_connections=10, pool_maxsize=20, max_retries=retry_strategy, pool_block=False ) self.session.mount("http://", adapter) self.session.mount("https://", adapter) # Set default timeout with connection and read timeouts self.timeout = (10, 30) # (connect timeout, read timeout) # Initialize sec_edgar_api client with timeout wrapper if EdgarClient: self.edgar = EdgarClient(user_agent=user_agent) # Monkey patch to add timeout self._patch_edgar_client_timeout() else: self.edgar = None def _patch_edgar_client_timeout(self): """Monkey patch sec_edgar_api to add timeout support""" if not self.edgar: return # Wrap get_submissions and get_company_facts with timeout (thread-based, Gradio compatible) original_get_submissions = self.edgar.get_submissions original_get_company_facts = self.edgar.get_company_facts def get_submissions_with_timeout(cik): """Thread-based timeout wrapper for get_submissions (Gradio compatible)""" result = [None] exception = [None] def wrapper(): try: result[0] = original_get_submissions(cik) except Exception as e: exception[0] = e thread = threading.Thread(target=wrapper, daemon=True) thread.start() # Use read timeout value (second element of timeout tuple) timeout_seconds = self.timeout[1] if isinstance(self.timeout, tuple) else self.timeout thread.join(timeout=timeout_seconds) if thread.is_alive(): raise TimeoutError(f"SEC API request timeout ({timeout_seconds}s)") if exception[0]: raise exception[0] return result[0] def get_company_facts_with_timeout(cik): """Thread-based timeout wrapper for get_company_facts (Gradio compatible)""" result = [None] exception = [None] def wrapper(): try: result[0] = original_get_company_facts(cik) except Exception as e: exception[0] = e thread = threading.Thread(target=wrapper, daemon=True) thread.start() # Use read timeout value (second element of timeout tuple) timeout_seconds = self.timeout[1] if isinstance(self.timeout, tuple) else self.timeout thread.join(timeout=timeout_seconds) if thread.is_alive(): raise TimeoutError(f"SEC API request timeout ({timeout_seconds}s)") if exception[0]: raise exception[0] return result[0] self.edgar.get_submissions = get_submissions_with_timeout self.edgar.get_company_facts = get_company_facts_with_timeout def _rate_limit(self): """Thread-safe rate limiting to comply with SEC requirements""" with self._rate_limit_lock: current_time = time.time() time_since_last = current_time - EdgarDataClient._last_request_time if time_since_last < self._min_request_interval: sleep_time = self._min_request_interval - time_since_last time.sleep(sleep_time) EdgarDataClient._last_request_time = time.time() def _normalize_text(self, s: str) -> str: """规范化文本:用于提升匹配准确度""" if not s: return "" s = s.lower().strip() s = s.replace("&", " and ") s = re.sub(r"[.,()\-_/]", " ", s) s = re.sub(r"\s+", " ", s) # 移除常见后缀词 stopwords = {"inc", "inc.", "incorporated", "corp", "corporation", "co", "company", "plc", "ltd", "llc", "the"} tokens = [t for t in s.split() if t not in stopwords] return " ".join(tokens).strip() def _ensure_company_index(self): """确保公司索引已构建(按需构建或过期重建)""" with self._cache_lock: current_time = time.time() # 若 company_tickers 缓存不存在或已过期,先刷新 if (EdgarDataClient._company_tickers_cache is None or EdgarDataClient._company_tickers_cache_time is None or current_time - EdgarDataClient._company_tickers_cache_time >= self._company_tickers_cache_ttl): # 拉取并更新 company_tickers 缓存 self._rate_limit() url = "https://www.sec.gov/files/company_tickers.json" headers = {"User-Agent": self.user_agent} response = self.session.get(url, headers=headers, timeout=self.timeout) response.raise_for_status() companies = response.json() EdgarDataClient._company_tickers_cache = companies EdgarDataClient._company_tickers_cache_time = current_time else: companies = EdgarDataClient._company_tickers_cache # 若索引不存在或已过期,则重建索引 if (EdgarDataClient._by_ticker is None or EdgarDataClient._by_title is None or EdgarDataClient._by_title_norm is None or EdgarDataClient._all_keys is None or EdgarDataClient._index_built_time is None or current_time - EdgarDataClient._index_built_time >= EdgarDataClient._index_ttl): by_ticker = {} by_title = {} by_title_norm = {} all_keys = [] for _, company in companies.items(): title = company.get("title", "") ticker = company.get("ticker", "") cik_str = str(company.get("cik_str", "")).zfill(10) title_lower = title.lower() ticker_lower = ticker.lower() title_norm = self._normalize_text(title) # 构建索引:ticker、title、normalized title if ticker_lower: by_ticker[ticker_lower] = {"cik": cik_str, "name": title, "ticker": ticker} all_keys.append(ticker_lower) if title_lower: by_title[title_lower] = {"cik": cik_str, "name": title, "ticker": ticker} if title_norm: by_title_norm[title_norm] = {"cik": cik_str, "name": title, "ticker": ticker} all_keys.append(title_norm) EdgarDataClient._by_ticker = by_ticker EdgarDataClient._by_title = by_title EdgarDataClient._by_title_norm = by_title_norm EdgarDataClient._all_keys = all_keys EdgarDataClient._index_built_time = current_time def search_company_by_name(self, company_name): """Search company CIK by company name with caching and optimized ticker matching""" try: # 实例级缓存命中检查(按规范化后的query) norm_query = self._normalize_text(company_name) cache_hit = self._search_cache.get(norm_query) if cache_hit: return cache_hit # 确保索引已构建(首次或过期后会重建) self._ensure_company_index() # 获取索引引用(已在锁内构建完成) by_ticker = EdgarDataClient._by_ticker by_title = EdgarDataClient._by_title by_title_norm = EdgarDataClient._by_title_norm all_keys = EdgarDataClient._all_keys # ✅ OPTIMIZATION 1: Ticker 优先匹配(遵循项目规范) raw = company_name.strip().lower() raw_compact = re.sub(r"[^a-z0-9]", "", raw) is_ticker_like = len(raw_compact) <= 5 and len(raw_compact) >= 1 if is_ticker_like and raw_compact in by_ticker: result = by_ticker[raw_compact] self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 2: 别名映射(如 'google' -> 'alphabet inc') alias_target = EdgarDataClient._alias_map.get(norm_query) if alias_target: alias_norm = self._normalize_text(alias_target) # 先尝试规范化标题 if alias_norm in by_title_norm: result = by_title_norm[alias_norm] self._search_cache[norm_query] = result return result # 再尝试原始标题 alias_lower = alias_target.lower() if alias_lower in by_title: result = by_title[alias_lower] self._search_cache[norm_query] = result return result # 最后尝试 ticker(有些别名可能实际上是ticker) alias_ticker = re.sub(r"[^a-z0-9]", "", alias_lower) if alias_ticker in by_ticker: result = by_ticker[alias_ticker] self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 3: 精确匹配(原始标题) title_lower = company_name.lower().strip() if title_lower in by_title: result = by_title[title_lower] self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 4: 精确匹配(规范化标题) if norm_query in by_title_norm: result = by_title_norm[norm_query] self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 5: 精确匹配(ticker,再次尝试原始输入) if raw_compact in by_ticker: result = by_ticker[raw_compact] self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 6: 部分包含匹配 partial_matches = [] for key in by_title_norm.keys(): if norm_query in key: partial_matches.append(key) if not partial_matches: for t in by_ticker.keys(): if norm_query in t: partial_matches.append(t) if partial_matches: best_key = max( partial_matches, key=lambda k: difflib.SequenceMatcher(None, norm_query, k).ratio() ) result = by_title_norm.get(best_key) or by_ticker.get(best_key) if result: self._search_cache[norm_query] = result return result # ✅ OPTIMIZATION 7: 模糊匹配(difflib,用于拼写近似的情况) close = difflib.get_close_matches(norm_query, all_keys, n=1, cutoff=0.78) if close: best = close[0] result = by_title_norm.get(best) or by_ticker.get(best) if result: self._search_cache[norm_query] = result return result # 未找到 return None except TimeoutError as e: print(f"Timeout searching company: {e}") return None except Exception as e: print(f"Error searching company: {e}") return None @lru_cache(maxsize=128) def get_company_info(self, cik): """ Get basic company information (cached) Args: cik (str): Company CIK code Returns: dict: Dictionary containing company information """ if not self.edgar: print("sec_edgar_api library not installed") return None try: self._rate_limit() # Get company submissions (now has timeout protection) submissions = self.edgar.get_submissions(cik=cik) return { "cik": cik, "name": submissions.get("name", ""), "tickers": submissions.get("tickers", []), "sic": submissions.get("sic", ""), "sic_description": submissions.get("sicDescription", "") } except TimeoutError as e: print(f"Timeout getting company info for CIK {cik}: {e}") return None except Exception as e: print(f"Error getting company info: {e}") return None @lru_cache(maxsize=128) def get_company_filings(self, cik, form_types=None): """ Get all company filing documents (cached) Args: cik (str): Company CIK code form_types (tuple): Tuple of form types, e.g., ('10-K', '10-Q'), None for all types Returns: list: List of filing documents """ if not self.edgar: print("sec_edgar_api library not installed") return [] # Convert list to tuple for caching (lists are not hashable) if form_types and isinstance(form_types, list): form_types = tuple(form_types) try: self._rate_limit() # Get company submissions (now has timeout protection) submissions = self.edgar.get_submissions(cik=cik) # Extract filing information filings = [] recent = submissions.get("filings", {}).get("recent", {}) # Get data from each field form_types_list = recent.get("form", []) filing_dates = recent.get("filingDate", []) accession_numbers = recent.get("accessionNumber", []) primary_documents = recent.get("primaryDocument", []) # Iterate through all filings for i in range(len(form_types_list)): form_type = form_types_list[i] # ✅ 归一化表单类型: "10-K/A" -> "10-K", "20-F/A" -> "20-F" # 这样修订版年报也能被正确识别和使用 normalized_form_type = form_type.split('/')[0] # Filter by form type if specified (使用归一化后的类型) if form_types and normalized_form_type not in form_types: continue filing_date = filing_dates[i] if i < len(filing_dates) else "" accession_number = accession_numbers[i] if i < len(accession_numbers) else "" primary_document = primary_documents[i] if i < len(primary_documents) else "" filing = { "form_type": form_type, # 保留原始form_type供参考 "filing_date": filing_date, "accession_number": accession_number, "primary_document": primary_document } filings.append(filing) return filings except TimeoutError as e: print(f"Timeout getting company filings for CIK {cik}: {e}") return [] except Exception as e: print(f"Error getting company filings: {e}") return [] @lru_cache(maxsize=128) def get_company_facts(self, cik): """ Get all company financial facts data (cached) Args: cik (str): Company CIK code Returns: dict: Company financial facts data """ if not self.edgar: print("sec_edgar_api library not installed") return {} try: self._rate_limit() # Now has timeout protection via monkey patch facts = self.edgar.get_company_facts(cik=cik) return facts except TimeoutError as e: print(f"Timeout getting company facts for CIK {cik}: {e}") return {} except Exception as e: print(f"Error getting company facts: {e}") return {} def get_financial_data_for_period(self, cik, period): """ Get financial data for a specific period (supports annual and quarterly) - Cached Args: cik (str): Company CIK code period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3') Returns: dict: Financial data dictionary """ if not self.edgar: print("sec_edgar_api library not installed") return {} # 实例级缓存(避免重复计算) cache_key = f"period_{cik}_{period}" if hasattr(self, '_period_cache') and cache_key in self._period_cache: return self._period_cache[cache_key] if not hasattr(self, '_period_cache'): self._period_cache = {} try: # Get company financial facts facts = self.get_company_facts(cik) if not facts: return {} # Extract us-gaap and ifrs-full financial data (20-F may use IFRS) us_gaap = facts.get("facts", {}).get("us-gaap", {}) ifrs_full = facts.get("facts", {}).get("ifrs-full", {}) # Define financial metrics and their XBRL tags # Include multiple possible tags to improve match rate (including US-GAAP and IFRS tags) financial_metrics = { "total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"], "net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"], "earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"], "operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"], "operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"], } # Store result with new optimized structure result = { "period": period, "_metadata": {}, "metrics": {} } # Determine target form types to search if 'Q' in period: # Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports) target_forms = ("10-Q",) # Use tuple for caching target_forms_annual = ("10-K", "20-F") # for fallback year = int(period.split('Q')[0]) quarter = period.split('Q')[1] else: # Annual data, search 10-K and 20-F annual forms target_forms = ("10-K", "20-F") # Use tuple for caching target_forms_annual = target_forms year = int(period) quarter = None # Get company filings to find accession number and primary document filings = self.get_company_filings(cik, form_types=target_forms) filings_map = {} # Map: form -> {accession_number, primary_document, filing_date} # Build filing map for quick lookup for filing in filings: form_type = filing.get("form_type", "") filing_date = filing.get("filing_date", "") accession_number = filing.get("accession_number", "") primary_document = filing.get("primary_document", "") if filing_date and accession_number: # Extract year from filing_date (format: YYYY-MM-DD) file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0 # ✅ 归一化表单类型: "10-K/A" -> "10-K", "20-F/A" -> "20-F" # 使用归一化后的类型构建 key,这样 facts 中的 "10-K" 能命中 "10-K/A" normalized_form_type = form_type.split('/')[0] # ✅ FIXED: Remove year filter to keep all filings # 20-F forms are often filed in the year after the fiscal year # We'll match them later using fiscal year (fy) and filed date key = f"{normalized_form_type}_{file_year}" # 使用归一化后的类型 if key not in filings_map: filings_map[key] = { "accession_number": accession_number, "primary_document": primary_document, "form_type": form_type, # 保留原始 form_type "filing_date": filing_date } # Iterate through each financial metric for metric_key, metric_tags in financial_metrics.items(): # Support multiple possible tags for metric_tag in metric_tags: # Search both US-GAAP and IFRS tags metric_data = None data_source = None if metric_tag in us_gaap: metric_data = us_gaap[metric_tag] data_source = "us-gaap" elif metric_tag in ifrs_full: metric_data = ifrs_full[metric_tag] data_source = "ifrs-full" if metric_data: units = metric_data.get("units", {}) # Find USD unit data (supports USD and USD/shares) usd_data = None if "USD" in units: usd_data = units["USD"] elif "USD/shares" in units and metric_key == "earnings_per_share": # EPS uses USD/shares unit usd_data = units["USD/shares"] if usd_data: # Try exact match first, then loose match matched_entry = None # Search for data in the specified period for entry in usd_data: form = entry.get("form", "") fy = entry.get("fy", 0) fp = entry.get("fp", "") end_date = entry.get("end", "") if not end_date or len(end_date) < 4: continue entry_year = int(end_date[:4]) # Check if form type matches if form in target_forms: if quarter: # Quarterly data match if entry_year == year and fp == f"Q{quarter}": # If already matched, compare end date, choose the latest if matched_entry: if entry.get("end", "") > matched_entry.get("end", ""): matched_entry = entry else: matched_entry = entry else: # Annual data match - prioritize fiscal year (fy) field # Strategy 1: Exact match by fiscal year if fy == year and (fp == "FY" or fp == "" or not fp): # If already matched, compare end date, choose the latest if matched_entry: if entry.get("end", "") > matched_entry.get("end", ""): matched_entry = entry else: matched_entry = entry # Strategy 2: Match by end date year (when fy not available or doesn't match) elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp): matched_entry = entry # Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch) elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp): matched_entry = entry # Strategy 4: Match by frame field for 20-F elif not matched_entry and form == "20-F" and "frame" in entry: frame = entry.get("frame", "") if f"CY{year}" in frame or str(year) in end_date: matched_entry = entry # If quarterly data not found, try finding from annual report (fallback strategy) if not matched_entry and quarter and target_forms_annual: for entry in usd_data: form = entry.get("form", "") end_date = entry.get("end", "") fp = entry.get("fp", "") if form in target_forms_annual and end_date: # Check if end date is within this quarter range if str(year) in end_date and f"Q{quarter}" in fp: matched_entry = entry break # Apply matched data if matched_entry: # Store metric value and tag result["metrics"][metric_key] = { "value": matched_entry.get("val", 0), "tag": metric_tag } # Get form and accession info - only populate metadata once if not result["_metadata"]: form_type = matched_entry.get("form", "") accn_from_facts = matched_entry.get('accn', '').replace('-', '') filed_date = matched_entry.get('filed', '') # Multi-strategy filing lookup for 20-F and cross-year submissions filing_info = None # Strategy 1: Try matching by fiscal year filing_key = f"{form_type}_{year}" filing_info = filings_map.get(filing_key) # Strategy 2: Try matching by filed year (for 20-F filed in next year) if not filing_info and filed_date: filed_year = int(filed_date[:4]) if len(filed_date) >= 4 else 0 if filed_year > 0: filing_key = f"{form_type}_{filed_year}" filing_info = filings_map.get(filing_key) # Strategy 3: Try fiscal year + 1 (common for 20-F) if not filing_info: filing_key = f"{form_type}_{year + 1}" filing_info = filings_map.get(filing_key) # Strategy 4: Search all filings with matching form type and accession if not filing_info and accn_from_facts: for key, finfo in filings_map.items(): if finfo["form_type"] == form_type: filing_accn = finfo["accession_number"].replace('-', '') if filing_accn == accn_from_facts: filing_info = finfo break # Generate source URL source_url = "" if filing_info: accession_number = filing_info["accession_number"].replace('-', '') primary_document = filing_info["primary_document"] if primary_document: source_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}" else: source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100" else: source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100" # Populate metadata (only once per period) result["_metadata"] = { "form": matched_entry.get("form", ""), "fiscal_year": matched_entry.get("fy", 0), "fiscal_period": matched_entry.get("fp", ""), "start_date": matched_entry.get("start", ""), "end_date": matched_entry.get("end", ""), "filed_date": matched_entry.get("filed", ""), "source_url": source_url, "data_source": data_source } # If data is found, break out of tag loop if metric_key in result["metrics"]: break # 缓存结果 if result and "period" in result: self._period_cache[cache_key] = result return result except Exception as e: print(f"Error getting financial data for period {period}: {e}") return {}