Easy-Financial-Report / service /three_year_tool.py
JC321's picture
Upload 2 files
6fc6c55 verified
import json
# -----------------------------
# 辅助函数:安全转换数值
# -----------------------------
def safe_int(val, default=0):
    """Convert *val* to an int, returning *default* when that is impossible.

    Args:
        val: Any value; numbers and numeric strings (including decimal or
            exponent notation such as "12.7" or "1e3") are accepted.
        default: Value returned when *val* is None or cannot be converted.

    Returns:
        The integer value (fractions are truncated toward zero), or *default*.
    """
    if val is None:
        return default
    # Integers are returned directly: a round-trip through float would
    # silently lose precision above 2**53 and overflow for huge values.
    if isinstance(val, int):
        return int(val)
    try:
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: NaN or non-numeric text.
        return default
def safe_float(val, default=0.0):
    """Convert *val* to a float, returning *default* when that is impossible.

    Args:
        val: Any value; numbers and numeric strings are accepted.
        default: Value returned when *val* is None or cannot be converted.

    Returns:
        The float value, or *default*.
    """
    if val is None:
        return default
    try:
        return float(val)
    except (ValueError, TypeError, OverflowError):
        # OverflowError is raised for integers too large to fit in a float.
        return default
# -----------------------------
# 步骤1:提取带 fallback 的数据(支持最新年+历史4年,共5年)
# -----------------------------
def _period_label(level, year):
    """Return the period key for a reporting level and year (e.g. FY2025, 2025Q3)."""
    return f"FY{year}" if level == "FY" else f"{year}{level}"


def _make_period_record(level, year, period, item):
    """Build the normalized record for one period from its raw data item."""
    return {
        "period": period,
        "fiscal_year": int(year),
        "level": level,
        "total_revenue": item.get("total_revenue"),
        "net_income": item.get("net_income"),
        "earnings_per_share": item.get("earnings_per_share"),
        "operating_expenses": item.get("operating_expenses"),
        "operating_cash_flow": item.get("operating_cash_flow"),
    }


def extract_last_three_with_fallback(data_list, years=None):
    """Select yearly records from raw period data, one reporting level at a time.

    Despite the name, up to five years are returned (the latest year plus
    four historical years). Reporting levels are tried in the priority order
    FY > Q4 > Q3 > Q2 > Q1. A period only counts when its item has a
    non-None "total_revenue".

    Selection rules:
      1. Return the first level that has data for every requested year.
      2. Otherwise return the first level that has data for at least one
         year (the result may then be incomplete and non-consecutive).
      3. Otherwise return an empty list.

    Args:
        data_list: Iterable of dicts keyed by "period" (e.g. "FY2025",
            "2025Q3"). Non-dict entries and entries without a string
            "period" are ignored. None is treated as empty input.
        years: Optional sequence of years to look for, newest first.
            Defaults to 2025 through 2021.

    Returns:
        A list of record dicts with the keys period, fiscal_year, level,
        total_revenue, net_income, earnings_per_share, operating_expenses
        and operating_cash_flow, ordered like *years*.
    """
    if years is None:
        years = (2025, 2024, 2023, 2022, 2021)
    years = list(years)
    if data_list is None:
        data_list = ()

    # Index items by period; when a period repeats, the last item wins.
    data_map = {
        item["period"]: item
        for item in data_list
        if isinstance(item, dict) and isinstance(item.get("period"), str)
    }

    # Collect the usable records of every level once, in priority order.
    records_by_level = []
    for level in ("FY", "Q4", "Q3", "Q2", "Q1"):
        found = []
        for year in years:
            period = _period_label(level, year)
            item = data_map.get(period)
            if item is not None and item.get("total_revenue") is not None:
                found.append(_make_period_record(level, year, period, item))
        records_by_level.append(found)

    # Prefer a level that covers every requested year.
    for found in records_by_level:
        if len(found) == len(years):
            return found

    # Fallback: the highest-priority level with any data at all.
    for found in records_by_level:
        if found:
            return found
    return []
# -----------------------------
# 步骤2:格式化数字(B / M)
# -----------------------------
def _trim_decimals(number, places):
    """Format *number* with *places* decimals, dropping trailing zeros and a bare dot."""
    return f"{number:.{places}f}".rstrip("0").rstrip(".")


def format_number(value):
    """Render a dollar amount in a compact form.

    Magnitudes of at least one billion use a "B" suffix with up to two
    decimals, magnitudes of at least one million use an "M" suffix with up
    to one decimal, magnitudes of at least one thousand use thousands
    separators, and anything smaller is printed as-is. Negative amounts are
    scaled by magnitude and written with a leading minus sign ("-$2.5B").

    Args:
        value: Numeric amount (int or float).

    Returns:
        The formatted string, e.g. "$1.5B", "$3.2M", "$12,345" or "$500".
    """
    # Work on the magnitude so that losses are abbreviated like gains.
    sign = "-" if value < 0 else ""
    magnitude = abs(value)
    if magnitude >= 1_000_000_000:
        # Trailing zeros must be stripped before the suffix is appended.
        return f"{sign}${_trim_decimals(magnitude / 1_000_000_000, 2)}B"
    if magnitude >= 1_000_000:
        return f"{sign}${_trim_decimals(magnitude / 1_000_000, 1)}M"
    if magnitude >= 1_000:
        return f"{sign}${magnitude:,.0f}"
    return f"{sign}${magnitude}"
def format_eps(value):
    """Render an earnings-per-share figure as dollars with two decimals.

    Negative values carry the minus sign before the currency symbol
    ("-$1.50"). A value that rounds to zero is shown without a sign.

    Args:
        value: Numeric EPS value.

    Returns:
        The formatted string, e.g. "$1.23".
    """
    amount = f"{abs(value):.2f}"
    # Only keep the sign when something non-zero survives the rounding.
    sign = "-" if value < 0 and float(amount) != 0 else ""
    return f"{sign}${amount}"
def calculate_change(current, previous):
    """Return the signed percentage change from *previous* to *current*.

    The change is measured relative to the absolute value of *previous* and
    formatted with one decimal, e.g. "+10.0%" or "-4.2%". When *previous*
    is zero no ratio is computed: the result is "+0.0%" for a non-negative
    *current* and "-0.0%" otherwise.
    """
    if previous == 0:
        # No baseline to divide by; only the direction of `current` is shown.
        if current >= 0:
            return "+0.0%"
        return "-0.0%"

    delta_pct = (current - previous) / abs(previous) * 100
    if delta_pct >= 0:
        prefix = "+"
    else:
        prefix = "-"
    magnitude = format(abs(delta_pct), ".1f")
    return prefix + magnitude + "%"
# -----------------------------
# 步骤3:构建最终 metrics
# -----------------------------
def build_financial_metrics(three_year_data):
    """Build the headline metric cards for the most recent fiscal year.

    The records are ordered by "fiscal_year" (newest first); the newest one
    is compared against the one right after it. Each card holds a label,
    the formatted current value, the percentage change versus the previous
    record and a color: "green" when the current value is greater than or
    equal to the previous one, "red" otherwise (this rule is the same for
    every metric, operating expenses included).

    Args:
        three_year_data: List of record dicts containing fiscal_year,
            total_revenue, net_income, earnings_per_share,
            operating_expenses and operating_cash_flow.

    Returns:
        A list of five metric dicts in the order Total Revenue, Net Income,
        Earnings Per Share, Operating Expenses, Operating Cash Flow.

    Raises:
        ValueError: If fewer than two records are supplied.
    """
    if len(three_year_data) < 2:
        raise ValueError("至少需要两年数据来计算同比变化")

    ordered = sorted(three_year_data, key=lambda rec: rec["fiscal_year"], reverse=True)
    newest, prior = ordered[0], ordered[1]

    # (label, record key, numeric coercion, display formatter)
    layout = (
        ("Total Revenue", "total_revenue", safe_int, format_number),
        ("Net Income", "net_income", safe_int, format_number),
        ("Earnings Per Share", "earnings_per_share", safe_float, format_eps),
        ("Operating Expenses", "operating_expenses", safe_int, format_number),
        ("Operating Cash Flow", "operating_cash_flow", safe_int, format_number),
    )

    metrics = []
    for label, field, coerce, render in layout:
        now = coerce(newest[field])
        before = coerce(prior[field])
        if now >= before:
            color = "green"
        else:
            color = "red"
        metrics.append({
            "label": label,
            "value": render(now),
            "change": calculate_change(now, before),
            "color": color,
        })
    return metrics
# -----------------------------
# 主流程:输入 raw_data,输出 financial_metrics
# -----------------------------
# def process_financial_data(raw_data):
# # 如果是字符串,先解析 JSON
# if isinstance(raw_data, str):
# raw_data = json.loads(raw_data)
# # 确保是列表
# if not isinstance(raw_data, list):
# raise TypeError("raw_data 必须是列表或 JSON 字符串表示的列表")
# # 提取三年数据
# three_years = extract_last_three_with_fallback(raw_data)
# if not three_years:
# raise ValueError("无法提取有效的三年财务数据")
# # 构建指标
# return build_financial_metrics(three_years)
def process_financial_data_with_metadata(raw_data):
    """Turn raw period data into metric cards, a period label and table rows.

    Args:
        raw_data: A list of period dicts, or a JSON string (str, bytes or
            bytearray) encoding such a list.

    Returns:
        A dict with three keys:
          * "financial_metrics": metric cards for the latest year compared
            with the year before it.
          * "year_data": label of the latest period, "FY 2025" for annual
            data or "2025 Q3" style for quarterly data.
          * "three_year_data": up to three records following the latest
            year (the latest year itself is excluded), newest first.
        When the input is empty, malformed, not a list of dicts, or yields
        fewer than two usable years, the metrics and table are empty lists
        and "year_data" is "N/A".
    """
    empty_result = {"financial_metrics": [], "year_data": "N/A", "three_year_data": []}

    if not raw_data:
        return empty_result

    # 1. Decode JSON text first, so that string input actually reaches the
    #    list validation below instead of being rejected as "not a list".
    if isinstance(raw_data, (str, bytes, bytearray)):
        try:
            raw_data = json.loads(raw_data)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes.
            return empty_result

    if not isinstance(raw_data, list) or not raw_data:
        return empty_result
    if not isinstance(raw_data[0], dict):
        return empty_result

    # 2. Extract up to five years (latest year + history, with fallback).
    five_years = extract_last_three_with_fallback(raw_data)
    if not five_years:
        print("无法提取有效的财务数据")
        return empty_result

    five_years_sorted = sorted(five_years, key=lambda x: x["fiscal_year"], reverse=True)

    # 3. A year-over-year comparison needs at least two years.
    if len(five_years_sorted) < 2:
        print("数据不足,至少需要2年数据")
        return empty_result

    latest_year = five_years_sorted[0]
    previous_year = five_years_sorted[1]

    # Table rows: the (up to) three years after the latest one, e.g.
    # [2025, 2024, 2023, 2022, 2021] -> [2024, 2023, 2022]. Slicing already
    # copes with shorter lists, so no length check is required.
    three_years_for_table = five_years_sorted[1:4]

    # 4. Label of the latest period.
    year = latest_year["fiscal_year"]
    level = latest_year["level"]
    if level == "FY":
        year_data = f"FY {year}"
    else:  # Q1, Q2, Q3, Q4
        year_data = f"{year} {level}"

    # 5. Metric cards for the latest year versus the previous one.
    financial_metrics = build_financial_metrics([latest_year, previous_year])

    return {
        "financial_metrics": financial_metrics,
        "year_data": year_data,
        "three_year_data": three_years_for_table,
    }
# -----------------------------
# 示例使用(替换为你的真实数据)
# -----------------------------
# if __name__ == "__main__":
# # 👇 在这里粘贴你的原始数据(可以是字符串或变量)
# # 示例:从文件读取或直接赋值
# with open("financial_data.json", "r", encoding="utf-8") as f:
# raw_input = f.read() # 或者直接赋值为你的数据变量
# # 处理
# try:
# financial_metrics = process_financial_data(raw_input)
# print(json.dumps(financial_metrics, indent=2))
# except Exception as e:
# print("处理失败:", e)