# JC321's picture
# Upload app.py
# b23a732 verified
import gradio as gr
import os
import datetime
import re
import pandas as pd
# from dotenv import load_dotenv
# # 加载.env文件中的环境变量
# load_dotenv()
# from EasyFinancialAgent.chat import query_company
from EasyFinancialAgent.chat_direct import advanced_search_company_detailed, search_and_format, search_company_direct, chatbot_response, format_search_result_for_display, format_search_result
from chatbot.chat_main import respond
import globals as g
from service.mysql_service import get_companys, insert_company, get_company_by_name
from service.chat_service import get_analysis_report, get_stock_price_from_bailian, search_company, search_news, get_invest_suggest, chat_bot
from service.company import check_company_exists
from service.hf_upload import get_hf_files_with_links
from MarketandStockMCP.news_quote_mcp import get_company_news, get_quote
from EasyReportDataMCP.report_mcp import query_financial_data
from service.report_service import get_report_data, query_company_advanced
from service.report_tools import build_financial_metrics_three_year_data, calculate_yoy_comparison, extract_financial_table, extract_last_three_with_fallback, get_yearly_data
from service.three_year_table_tool import build_table_format
from service.three_year_tool import process_financial_data_with_metadata
from service.tool_processor import get_stock_price
# ✅ 导入缓存管理器
from service.report_cache_manager import ReportCacheManager
from service.financial_data_cache_manager import FinancialDataCacheManager
# Flag read by get_company_list_choices(): when it is False that function
# returns an empty choice list without looking at the company data.
get_companys_state = True
# In-memory company registry used in place of the get_companys() database call
# (those calls are commented out throughout this file). Each entry holds the
# display name, the ticker and a "cik" value; add_company() compares the "cik"
# values to detect duplicates.
my_companies = [
    {'company_name': 'Alibaba', 'stock_code': 'BABA', "cik": "0001577552"},
    {'company_name': 'NVIDIA', 'stock_code': 'NVDA', "cik": "0001045810"},
    {'company_name': 'Amazon', 'stock_code': 'AMZN', "cik": "0001018724"},
    {'company_name': 'Intel', 'stock_code': 'INTC', "cik": "0000050863"},
    {'company_name': 'Meta', 'stock_code': 'META', "cik": "0001326801"},
    {'company_name': 'Google', 'stock_code': 'GOOGL', "cik": "0001652044"},
    {'company_name': 'Apple', 'stock_code': 'AAPL', "cik": "0000320193"},
    {'company_name': 'Tesla', 'stock_code': 'TSLA', "cik": "0001318605"},
    {'company_name': 'AMD', 'stock_code': 'AMD', "cik": "0000002488"},
    {'company_name': 'Microsoft', 'stock_code': 'MSFT', "cik": "0000789019"}
]
# JavaScript helper for reading and writing the browser's localStorage.
# handleStorage(operation, key, value) supports 'set', 'get', 'clear' (a single
# key) and 'clearAll'; each branch returns a status message, and an
# unrecognized operation returns nothing. The messages are runtime strings
# written in Chinese.
js_code = """
function handleStorage(operation, key, value) {
if (operation === 'set') {
localStorage.setItem(key, value);
return `已存储: ${key} = ${value}`;
} else if (operation === 'get') {
let storedValue = localStorage.getItem(key);
if (storedValue === null) {
return `未找到键: ${key}`;
}
return `读取到: ${key} = ${storedValue}`;
} else if (operation === 'clear') {
localStorage.removeItem(key);
return `已清除: ${key}`;
} else if (operation === 'clearAll') {
localStorage.clear();
return '已清除所有数据';
}
}
"""
# Stylesheet string for the dashboard (its consumer is not visible in this
# section of the file). It styles the company radio list, the search box with
# an inline SVG icon, the tabs / chatbot header alignment and the spacing of
# the sidebar sections.
# NOTE(review): `.tabs` is declared twice; the later rule
# (border-bottom: none) overrides the earlier 1px border since both use
# !important.
# NOTE(review): `label::after:hover` places a pseudo-class after a
# pseudo-element, which browsers may reject as an invalid selector — confirm
# that the hover colour is actually applied.
custom_css = """
/* 匹配所有以 gradio-container- 开头的类 */
div[class^="gradio-container-"],
div[class*=" gradio-container-"] {
-webkit-text-size-adjust: 100% !important;
line-height: 1.5 !important;
font-family: unset !important;
-moz-tab-size: 4 !important;
tab-size: 4 !important;
}
.company-list-container {
background-color: white;
border-radius: 0.5rem;
padding: 0.75rem;
margin-bottom: 0.75rem;
border: 1px solid #e5e7eb;
box-shadow: 0 1px 2px 0 rgba(0, 0, 0, 0.05);
width: 100%;
}
/* 隐藏单选框 */
.company-list-container input[type="radio"] {
display: none;
}
/* 自定义选项样式 - 小而精致 */
.company-list-container label {
display: block;
padding: 0.5rem 0.75rem;
margin: 0.2rem 0;
border-radius: 0.25rem;
cursor: pointer;
transition: all 0.2s ease;
background-color: #f9fafb;
border: 1px solid #e5e7eb;
font-size: 0.875rem;
text-align: left;
width: 100%;
box-sizing: border-box;
position: relative;
padding-right: 2rem;
}
/* ✅ 为每个公司选项添加删除按钮 */
.company-list-container label::after {
content: '×';
position: absolute;
right: 8px;
top: 50%;
transform: translateY(-50%);
width: 18px;
height: 18px;
border-radius: 50%;
background: #d1d5db;
color: white;
font-size: 14px;
font-weight: bold;
display: flex;
align-items: center;
justify-content: center;
opacity: 0;
transition: opacity 0.2s;
cursor: pointer;
line-height: 1;
}
.company-list-container label:hover::after {
opacity: 1;
}
.company-list-container label::after:hover {
background: #9ca3af;
}
/* 悬停效果 */
.company-list-container label:hover {
background-color: #f3f4f6;
border-color: #d1d5db;
}
/* 选中效果 - 使用蓝色 */
.company-list-container input[type="radio"]:checked + span {
background: #3b82f6 !important;
color: white !important;
font-weight: 600 !important;
display: block;
width: 100%;
height: 100%;
padding: 0.75rem 1rem;
margin: -0.75rem -1rem;
border-radius: 0.375rem;
}
.company-list-container span {
display: block;
padding: 0;
border-radius: 0.375rem;
width: 100%;
}
label.selected {
background: #3b82f6 !important;
color: white !important;
}
/* 确保每行只有一个选项 */
.company-list-container .wrap {
display: block !important;
}
.company-list-container .wrap li {
display: block !important;
width: 100% !important;
}
/* ✅ 搜索框样式 - 带内置图标 */
.company-input-search {
position: relative;
}
.company-input-search input,
.company-input-search textarea {
padding-left: 36px !important;
background-image: url('data:image/svg+xml,%3Csvg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="%23999" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"%3E%3Ccircle cx="11" cy="11" r="8"%3E%3C/circle%3E%3Cpath d="m21 21-4.35-4.35"%3E%3C/path%3E%3C/svg%3E') !important;
background-repeat: no-repeat !important;
background-position: 10px center !important;
background-size: 18px 18px !important;
border: 1px solid #e5e7eb !important;
border-radius: 8px !important;
font-size: 14px !important;
transition: all 0.2s !important;
}
.company-input-search input:focus,
.company-input-search textarea:focus {
border-color: #667eea !important;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important;
outline: none !important;
}
/* ✅ 对齐Tabs和ChatInterface的横线 */
.tabs {
border-bottom: 1px solid #e5e7eb !important;
}
.chatbot {
border-top: 1px solid #e5e7eb !important;
}
/* 确俟tab内容和chatbot的padding一致 */
.tab-item {
padding: 16px !important;
}
/* ✅ 为三个板块添加间距 */
#select-company-section {
margin-bottom: 20px !important;
}
.report-news-box {
margin-top: 20px !important;
}
.report-list-box,
.news-list-box {
margin-bottom: 16px !important;
}
/* ✅ 对齐ChatInterface标题和Tabs标签栏 */
.chatbot .wrap {
margin-top: 0 !important;
}
/* ChatInterface标题样式 */
.chatbot > .wrap > .head {
height: 40px !important;
display: flex !important;
align-items: center !important;
padding: 8px 16px !important;
border-bottom: 1px solid #e5e7eb !important;
background: #f9fafb !important;
}
/* ✅ 缩小Easy Financial AI Assistant标题 - 更精致 */
.chatbot > .wrap > .head h1,
.chatbot > .wrap > .head h2,
.chatbot > .wrap > .head h3 {
font-size: 13px !important;
margin: 0 !important;
font-weight: 500 !important;
color: #6b7280 !important;
letter-spacing: 0.3px !important;
}
/* Tabs标签栏样式 */
.tab-container {
height: 48px !important;
display: flex !important;
align-items: center !important;
border-bottom: 1px solid #e5e7eb !important;
background: #f9fafb !important;
}
/* ✅ 移除Tabs底部横线 */
.tabs {
border-bottom: none !important;
}
"""
# Module-level mapping: company name -> {"NAME": <name>, "CODE": <ticker>}
companies_map = {}
# Module-level cache of the latest search results (full records, including
# the CIK), keyed by the display string offered to the user
search_result_cache = {}


def get_stock_code_by_company_name(company_name):
    """Return the stock code registered for ``company_name``.

    The code is read from the "CODE" field of the matching ``companies_map``
    entry. An empty string is returned when the company is not mapped or its
    entry has no "CODE" field.
    """
    if company_name not in companies_map:
        return ""
    record = companies_map[company_name]
    return record["CODE"] if "CODE" in record else ""
# Build the list of company names offered by a choice component
def get_company_list_choices():
    """Return a ``gr.update`` carrying the current company names as choices.

    The names come from ``my_companies``. Both shapes used in this file are
    supported: a list of dicts with a 'company_name' key (the in-memory
    registry) and a pandas DataFrame with a 'company_name' column (the
    database result). An empty choice list is returned when
    ``get_companys_state`` is False or when the data cannot be read.
    """
    print(f"Getting init add company list choices...{get_companys_state}")
    if not get_companys_state:
        return gr.update(choices=[])

    choices = []
    try:
        # companies_data = get_companys()
        companies_data = my_companies
        print(f"Getting init add company list choices...companies_data: {companies_data}")
        if isinstance(companies_data, list):
            # my_companies is a plain list of dicts; the original code only
            # handled the DataFrame case and therefore always returned [].
            choices = [str(item.get('company_name', 'Unknown')) for item in companies_data]
        elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
            choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
    except Exception as exc:
        # Best effort: fall back to an empty list, but leave a trace.
        print(f"Error building company list choices: {exc}")
        choices = []
    return gr.update(choices=choices)
# Sidebar service functions
# Handler for a click on a company
def handle_company_click(company_name):
    """Register ``company_name`` in the in-memory company list if it is new.

    When ``check_company_exists`` reports the company as already present, a
    Gradio warning is raised and ``None`` is returned. Otherwise the company is
    appended to ``my_companies`` (with the ticker already known to
    ``companies_map``, or "Unknown"), ``companies_map`` is updated in place, a
    Gradio info toast is shown and ``True`` is returned so the caller knows the
    list needs refreshing.
    """
    print(f"Handling click for company: {company_name}")

    # Already registered: warn and report that nothing was added.
    if check_company_exists(my_companies, company_name):
        print(f"Company {company_name} already exists in database")
        gr.Warning(f"Company '{company_name}' already exists")
        return None

    # Reuse the ticker recorded in the map when there is one.
    ticker = companies_map.get(company_name, {}).get("CODE", "Unknown")
    print(f"Inserting company {company_name} with code {ticker}")
    # success = insert_company(company_name, stock_code)
    my_companies.append({"company_name": company_name, "stock_code": ticker})
    print(f"Successfully inserted company: {company_name}")
    # Update the single map entry instead of rebuilding the whole map.
    companies_map[company_name] = {"NAME": company_name, "CODE": ticker}
    gr.Info(f"Successfully added company: {company_name}")
    return True
def get_company_list_html(selected_company=""):
    """Render the company list as one ``<button>`` element per company.

    Args:
        selected_company: Name of the currently selected company; its button
            additionally receives the ``selected-company`` CSS class.

    Returns:
        str: Newline-joined button markup, an empty string when there are no
        companies (or the data has an unsupported type), or an error ``<div>``
        when the lookup reported a failure or rendering raised.
    """
    import html  # local import: this block must stay self-contained

    try:
        # companies_data = get_companys()
        companies_data = my_companies
        # A string result signals a failed database query.
        if isinstance(companies_data, str):
            if "查询执行失败" in companies_data:
                return "<div class='text-red-500'>获取公司列表失败</div>"
            return ""

        # Accept both the DataFrame returned by the database layer and the
        # in-memory list of dicts (the original only handled the former, so
        # the list-backed registry always rendered as an empty string).
        if isinstance(companies_data, pd.DataFrame):
            names = [row.get('company_name', 'Unknown') for _, row in companies_data.iterrows()]
        elif isinstance(companies_data, list):
            names = [item.get('company_name', 'Unknown') for item in companies_data]
        else:
            return ""

        html_items = []
        for company_name in names:
            css_class = "company-item"
            if company_name == selected_company:
                css_class += " selected-company"
            # Company names can come from a user-driven search, so escape them
            # before placing them in attribute and text positions.
            safe_name = html.escape(str(company_name), quote=True)
            html_items.append(f'<button class="{css_class}" data-company="{safe_name}" style="width:100%; text-align:left; border:none; background:none;">{safe_name}</button>')
        return "\n".join(html_items)
    except Exception as e:
        return f"<div class='text-red-500'>生成公司列表失败: {html.escape(str(e))}</div>"
def initialize_company_list(selected_company=""):
    """Return the initial company-list markup.

    Thin wrapper that delegates to ``get_company_list_html`` with the same
    ``selected_company`` argument.
    """
    initial_markup = get_company_list_html(selected_company)
    return initial_markup
def refresh_company_list(selected_company=""):
    """Yield a loading spinner, then the freshly rendered company list.

    This is a generator: the first value is a static spinner snippet, the
    second is the markup produced by ``get_company_list_html``. The list is
    only rendered once the consumer asks for the second value.
    """
    # Step 1: placeholder shown while the list is being built.
    spinner_markup = '''
<div style="display: flex; justify-content: center; align-items: center; height: 100px;">
<div class="loading-spinner" style="width: 24px; height: 24px; border: 3px solid #f3f3f3; border-top: 3px solid #3498db; border-radius: 50%; animation: spin 1s linear infinite;"></div>
<style>
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
</style>
</div>
'''
    yield spinner_markup

    # Step 2: the actual content.
    rendered_list = get_company_list_html(selected_company)
    yield rendered_list
# Handler for the company selection event
def select_company(company_name):
    """Record the selected company and return an update for the Radio list.

    Stores ``company_name`` (or "" when it is falsy) in ``g.SELECT_COMPANY``
    and returns a ``gr.update`` whose choices are the current company names
    and whose value is ``company_name``.

    ``my_companies`` may be a list of dicts or a pandas DataFrame; any other
    shape, or a failure while reading it, yields an empty choice list.
    """
    g.SELECT_COMPANY = company_name if company_name else ""

    choices = []
    try:
        # companies_data = get_companys()
        companies_data = my_companies
        if isinstance(companies_data, list):
            # my_companies is a list of dicts: [{company_name: '', stock_code: ''}, ...]
            choices = [str(item.get('company_name', 'Unknown')) for item in companies_data]
        elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
            choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
    except Exception as exc:
        # Previously a bare ``except:`` hid every failure (and also trapped
        # KeyboardInterrupt / SystemExit); keep the fallback but log the cause.
        print(f"Error building company choices: {exc}")
        choices = []
    return gr.update(choices=choices, value=company_name)
def initialize_companies_map():
    """Rebuild the global ``companies_map`` (name -> {"NAME", "CODE"}).

    The map is first seeded with a fixed list of well-known companies and then
    extended with the entries of ``my_companies``, which may be a list of
    dicts or a pandas DataFrame with 'company_name' / 'stock_code' fields.
    An entry is skipped when its ticker is already present in the map;
    placeholder tickers ("" and "Unknown") never count as duplicates, so
    companies without a known ticker are not dropped.

    Errors are logged and swallowed; the map then keeps whatever was added
    before the failure.
    """
    global companies_map
    companies_map = {}  # start from a fresh mapping
    print("Initializing companies map...")
    try:
        predefined_companies = [
            {"NAME": "Alibaba", "CODE": "BABA"},
            {"NAME": "NVIDIA", "CODE": "NVDA"},
            {"NAME": "Amazon", "CODE": "AMZN"},
            {"NAME": "Intel", "CODE": "INTC"},
            {"NAME": "Meta", "CODE": "META"},
            {"NAME": "Google", "CODE": "GOOGL"},
            {"NAME": "Apple", "CODE": "AAPL"},
            {"NAME": "Tesla", "CODE": "TSLA"},
            {"NAME": "AMD", "CODE": "AMD"},
            {"NAME": "Microsoft", "CODE": "MSFT"},
        ]
        for company in predefined_companies:
            companies_map[company["NAME"]] = {"NAME": company["NAME"], "CODE": company["CODE"]}

        # companies_data = get_companys()
        companies_data = my_companies
        print(f"Companies data from DB: {companies_data}")

        # Normalise both supported shapes to a sequence of records that
        # offer ``.get``. The original only handled the DataFrame shape, so
        # the list-backed registry was never merged.
        if isinstance(companies_data, pd.DataFrame):
            records = [row for _, row in companies_data.iterrows()]
        elif isinstance(companies_data, list):
            records = companies_data
        else:
            records = []

        if records:
            print(f"Adding {len(records)} companies from database")
            # Track known tickers in a set instead of rescanning the map
            # for every record.
            known_codes = {entry["CODE"] for entry in companies_map.values()}
            for record in records:
                company_name = record.get('company_name', 'Unknown')
                stock_code = record.get('stock_code', '')
                # Make sure both values are strings.
                company_name = str(company_name) if company_name is not None else 'Unknown'
                stock_code = str(stock_code) if stock_code is not None else ''
                has_real_code = stock_code not in ("", "Unknown")
                if has_real_code and stock_code in known_codes:
                    continue  # same ticker already mapped
                companies_map[company_name] = {"NAME": company_name, "CODE": stock_code}
                known_codes.add(stock_code)
        else:
            print("No companies found in database")
        print(f"Final companies map: {companies_map}")
    except Exception as e:
        print(f"Error initializing companies map: {str(e)}")
# Sidebar company selector functions
def update_company_choices(user_input: str):
    """Search for companies and stream updates for the result list.

    Generator yielding ``(result_list_update, status_update)`` pairs:

    1. immediately, a visible list holding the single "Searching..." entry;
    2. after the search, either the real choices with the status hidden, or a
       "No results found" entry together with an error banner.

    As a side effect the global ``search_result_cache`` is replaced with a
    mapping from each display string to its full search record, which
    ``add_company`` reads later.
    """
    global search_result_cache

    # First yield: show the list right away with a loading hint.
    yield gr.update(choices=["Searching..."], visible=True), gr.update(visible=False, value="")

    # The slow part: run the search and derive both representations.
    search_result = advanced_search_company_detailed(user_input)
    full_records = format_search_result(search_result)
    choices = format_search_result_for_display(search_result)
    search_result_cache = dict(zip(choices, full_records))

    if len(choices) > 0:
        # Final yield with the real results.
        yield gr.update(choices=choices, visible=True), gr.update(visible=False, value="")
        return

    # Nothing matched: show a toast-like error banner.
    error_message = "未找到匹配的公司"
    error_html = f'''
<div class="ant-message ant-message-error" style="
position: fixed;
top: 20px;
left: 50%;
transform: translateX(-50%);
z-index: 10000;
padding: 10px 16px;
border-radius: 4px;
background: #fff;
box-shadow: 0 4px 12px rgba(0,0,0,0.15);
display: flex;
align-items: center;
pointer-events: all;
animation: messageFadeIn 0.3s ease-in-out;
">
<div style="
width: 16px;
height: 16px;
background: #ff4d4f;
border-radius: 50%;
position: relative;
margin-right: 8px;
"></div>
<span>{error_message}</span>
</div>
<script>
setTimeout(function() {{
var msg = document.querySelector('.ant-message-error');
if (msg) {{
msg.style.animation = 'messageFadeOut 0.3s ease-in-out';
setTimeout(function() {{ msg.remove(); }}, 3000);
}}
}}, 3000);
</script>
'''
    yield gr.update(choices=["No results found"], visible=True), gr.update(visible=True, value=error_html)
def add_company(selected, current_list):
    """Add the company picked in the search results to the company list.

    Args:
        selected: Display string chosen in the result list, normally
            "Company Name (TICKER)".
        current_list: Current value of the company list component; returned
            unchanged when nothing is added.

    Returns:
        tuple: Updates for (result list, company list, status message). The
        result list and status message are always hidden afterwards. The
        company list either stays as it is, gets the already registered
        company selected, or receives the extended choices with the new
        company selected.
    """
    unchanged = (
        gr.update(visible=False),
        current_list,
        gr.update(visible=False, value=""),
    )
    # Ignore empty selections and the placeholder rows that
    # update_company_choices puts in the list; "Searching..." used to slip
    # through and was registered as a company.
    if not selected or selected in ("No results found", "Searching..."):
        return unchanged

    # Full record (including the CIK) cached by update_company_choices.
    selected_data = search_result_cache.get(selected, {})

    # Split "Company Name (TICKER)" into its two parts.
    selected_clean = selected.strip()
    match = re.match(r"^(.+?)\s*\(([^)]+)\)$", selected_clean)
    if match:
        company_name = match.group(1)
        stock_code = match.group(2)
    elif companies_map.get(selected_clean):
        company_name = selected_clean
        stock_code = companies_map[selected_clean]["CODE"]
    else:
        company_name = selected_clean
        stock_code = "Unknown"

    cik = selected_data.get('cik', '') if isinstance(selected_data, dict) else ''

    # Duplicate detection: by CIK when one is available, otherwise by exact
    # company name so that a record without CIK cannot be added twice.
    existing_company = None
    for company in my_companies:
        same_cik = bool(cik) and company.get('cik') == cik
        same_name = not cik and company.get('company_name') == company_name
        if same_cik or same_name:
            existing_company = company
            break

    if existing_company:
        # Already registered: just select the existing entry.
        existing_name = existing_company.get('company_name', company_name)
        gr.Info(f"公司 '{company_name}' 已存在(名称: {existing_name}),已自动选中")
        return gr.update(visible=False), gr.update(value=existing_name), gr.update(visible=False, value="")

    # New company: store it together with its CIK for later duplicate checks.
    my_companies.append({
        "company_name": company_name,
        "stock_code": stock_code,
        "cik": cik,
    })
    # Keep companies_map in sync so other panels can resolve the ticker.
    companies_map[company_name] = {"NAME": company_name, "CODE": stock_code}

    try:
        updated_list = [str(item.get('company_name', 'Unknown')) for item in my_companies]
    except Exception as exc:
        print(f"Error rebuilding company list: {exc}")
        updated_list = []
    # Fallback choices kept from the original implementation.
    if not updated_list:
        updated_list = ['Alibaba', '腾讯控股', 'Tencent', '阿里巴巴-W', 'Apple']

    # Hide the result list and select the company that was just added.
    return gr.update(visible=False), gr.update(choices=updated_list, value=company_name), gr.update(visible=False, value="")
# Sidebar report section functions
# Module-level registry of the button component created for each company
company_buttons = {}


def create_company_buttons():
    """Create one button per mapped company, laid out two per row.

    ``companies_map`` is initialised first if it is still empty. The
    module-level ``company_buttons`` dict is cleared, refilled with
    ``name -> gr.Button`` and returned. When there are no companies a
    placeholder Markdown is rendered instead and the dict stays empty.
    """
    if not companies_map:
        initialize_companies_map()

    names = list(companies_map.keys())
    print(f"Companies in map: {names}")

    # Drop the buttons from any previous call.
    company_buttons.clear()

    if not names:
        with gr.Column():
            gr.Markdown("暂无公司数据")
        return company_buttons

    with gr.Column(elem_classes=["home-company-list"]):
        # Walk the names in chunks of two; the last chunk may hold one name.
        for start in range(0, len(names), 2):
            pair = names[start:start + 2]
            row_classes = ["home-company-item-box"]
            if len(pair) == 1:
                row_classes.append("single-item")
            with gr.Row(elem_classes=row_classes):
                for name in pair:
                    company_buttons[name] = gr.Button(name, elem_classes=["home-company-item", "gradio-button"])
    return company_buttons
def _render_report_item(report, stock_code):
    """Render one clickable report row; every dynamic value is escaped."""
    import html
    import json

    source_url = report.get('source_url', '#')
    period = report.get('period', 'N/A')
    source_form = report.get('source_form', 'N/A')
    label = html.escape(f"{period}-{stock_code}-{source_form}")
    # Encode the URL as a JS string literal, then escape that literal for use
    # inside the double-quoted onclick attribute.
    js_url = html.escape(json.dumps(str(source_url)), quote=True)
    return f'''
<div class="report-item bg-white hover:bg-blue-50 cursor-pointer" onclick="window.open({js_url}, '_blank')">
<div class="report-item-content">
<span class="text-gray-800">{label}</span>
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" class="text-blue-500" viewBox="0 0 20 20" fill="currentColor">
<path fill-rule="evenodd" d="M12.586 4.586a2 2 0 112.828 2.828l-3 3a2 2 0 01-2.828 0 1 1 0 10-1.414 1.414 4 4 0 005.656 0l3-3a4 4 0 00-5.656-5.656l-1.5 1.5a1 1 0 101.414 1.414l-1.5-1.5zm-5 5a2 2 0 012.828 0 1 1 0 101.414-1.414 4 4 0 00-5.656 0l-3 3a4 4 0 105.656 5.656l1.5-1.5a1 1 0 10-1.414-1.414l-1.5 1.5a2 2 0 11-2.828-2.828l3-3z" clip-rule="evenodd" />
</svg>
</div>
</div>
'''


def update_report_section(selected_company, report_data, stock_code):
    """Build the "Financial Reports" panel for the selected company.

    Args:
        selected_company: Company name; "", None or "Unknown" produce an
            empty panel.
        report_data: Ignored; kept for interface compatibility. The data is
            always fetched again with ``query_financial_data``.
        stock_code: Ignored; kept for interface compatibility. The ticker is
            resolved from ``selected_company``.

    Returns:
        A ``gr.update`` for the report HTML component. The first five reports
        are shown directly and the rest sit in a collapsible area with a
        toggle button. On failure the panel carries an error message.
    """
    import html

    print(f"Updating report (报告部分): {selected_company}")
    if selected_company == "" or selected_company is None or selected_company == "Unknown":
        # No company selected: nothing to show.
        return gr.update(value="", visible=True)

    try:
        stock_code = get_stock_code_by_company_name(selected_company)
        report_data = query_financial_data(stock_code, "5-Year")
        # Expect a non-empty list of dicts.
        if not isinstance(report_data, list) or len(report_data) == 0:
            return gr.update(value="", visible=True)
        if not isinstance(report_data[0], dict):
            return gr.update(value="<div>数据格式不正常</div>", visible=True)

        total_reports = len(report_data)
        show_limit = 5
        parts = ['<div class="report-list-box bg-white">']
        # Section title.
        parts.append('''<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 12px;
border-radius: 8px;
text-align: center;
margin-bottom: 12px;">
<h3 style="color: white; margin: 0; font-size: 16px; font-weight: 600;">Financial Reports</h3>
</div>''')
        # Styles for the toggle button and the collapsible area.
        parts.append('''<style>
.report-toggle-btn {
background: #f3f4f6;
border: 1px solid #e5e7eb;
border-radius: 4px;
padding: 6px 12px;
cursor: pointer;
font-size: 0.875rem;
color: #374151;
text-align: center;
margin: 8px 0;
width: 100%;
transition: all 0.2s;
}
.report-toggle-btn:hover {
background: #e5e7eb;
}
.report-extra {
display: none;
}
.report-extra.show {
display: block;
}
</style>''')
        # Always-visible rows. Both groups share one renderer so the
        # collapsed rows get the same icon as the visible ones.
        parts.extend(_render_report_item(report, stock_code) for report in report_data[:show_limit])
        if total_reports > show_limit:
            parts.append('<div class="report-extra" id="reportExtra">')
            parts.extend(_render_report_item(report, stock_code) for report in report_data[show_limit:])
            parts.append('</div>')
            # Expand / collapse button.
            parts.append(f'''<div class="report-toggle-btn" onclick="
var extra = document.getElementById('reportExtra');
if (extra.classList.contains('show')) {{
extra.classList.remove('show');
this.innerHTML = '↓ Show All ({total_reports} reports)';
}} else {{
extra.classList.add('show');
this.innerHTML = '↑ Show Less';
}}
">↓ Show All ({total_reports} reports)</div>''')
        parts.append('</div>')
        return gr.update(value="".join(parts), visible=True)
    except Exception as e:
        print(f"Error in update_report_section: {str(e)}")
        return gr.update(value=f"<div>报告载入失败: {html.escape(str(e))}</div>", visible=True)
def _render_news_item(news):
    """Render one clickable news row; every dynamic value is escaped."""
    import html
    import json
    from datetime import datetime

    published_at = news.get('published', '')
    try:
        dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
        formatted_date = dt.strftime("%Y.%m.%d")
    except (AttributeError, TypeError, ValueError):
        # An unparsable date should not take the whole panel down; show
        # the raw value instead.
        formatted_date = str(published_at)
    # Encode the URL as a JS string literal, then escape that literal for use
    # inside the double-quoted onclick attribute.
    js_url = html.escape(json.dumps(str(news.get('url', '#'))), quote=True)
    headline = html.escape(str(news.get('headline', '')))
    return f'''
<div class="news-item bg-white hover:bg-blue-50 cursor-pointer" onclick="window.open({js_url}, '_blank')">
<div class="news-item-content">
<span class="text-xs text-gray-500">[{html.escape(formatted_date)}]</span>
<span class="text-gray-800">{headline}</span>
</div>
</div>
'''


def update_news_section(selected_company):
    """Build the "News" panel for the selected company.

    Args:
        selected_company: Company name; "" or None produce an empty panel.

    Returns:
        A ``gr.update`` for the news HTML component. The first five articles
        are shown directly and the rest sit in a collapsible area with a
        toggle button. When there are no articles or the lookup fails, the
        panel is empty (failures are logged).
    """
    html_content = ""
    if selected_company == "" or selected_company is None:
        # No company selected: nothing to show.
        return gr.update(value=html_content, visible=True)

    try:
        stock_code = get_stock_code_by_company_name(selected_company)
        news_data = get_company_news(stock_code, None, None)
        articles = news_data['articles']
        if articles:
            total_news = len(articles)
            show_limit = 5
            parts = ["<div class='news-list-box bg-white'>"]
            # Section title.
            parts.append('''<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 12px;
border-radius: 8px;
text-align: center;
margin-bottom: 12px;">
<h3 style="color: white; margin: 0; font-size: 16px; font-weight: 600;">News</h3>
</div>''')
            # Styles for the toggle button and the collapsible area.
            parts.append('''<style>
.news-toggle-btn {
background: #f3f4f6;
border: 1px solid #e5e7eb;
border-radius: 4px;
padding: 6px 12px;
cursor: pointer;
font-size: 0.875rem;
color: #374151;
text-align: center;
margin: 8px 0;
width: 100%;
transition: all 0.2s;
}
.news-toggle-btn:hover {
background: #e5e7eb;
}
.news-extra {
display: none;
}
.news-extra.show {
display: block;
}
</style>''')
            # Always-visible rows.
            parts.extend(_render_news_item(news) for news in articles[:show_limit])
            if total_news > show_limit:
                parts.append('<div class="news-extra" id="newsExtra">')
                parts.extend(_render_news_item(news) for news in articles[show_limit:])
                parts.append('</div>')
                # Expand / collapse button.
                parts.append(f'''<div class="news-toggle-btn" onclick="
var extra = document.getElementById('newsExtra');
if (extra.classList.contains('show')) {{
extra.classList.remove('show');
this.innerHTML = '↓ Show All ({total_news} news)';
}} else {{
extra.classList.add('show');
this.innerHTML = '↑ Show Less';
}}
">↓ Show All ({total_news} news)</div>''')
            parts.append('</div>')
            html_content += "".join(parts)
    except Exception as e:
        print(f"Error updating news section: {str(e)}")
    return gr.update(value=html_content, visible=True)
# Component creation functions
def create_header():
    """Render the page header: logo and title on the left, a text slot on the right.

    The right-hand slot used to show the current date; it is now fed an empty
    string so nothing is displayed there.
    """
    # current_time = datetime.datetime.now().strftime("%B %d, %Y - Market Data Updated Today")
    header_time_text = ""
    # Cylinder-shaped SVG icon (database symbol) followed by the title.
    logo_markup = '''
<div class="top-logo-box">
<svg xmlns="http://www.w3.org/2000/svg" width="32" height="32" viewBox="0 0 48 48">
<g fill="none" stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="4">
<path d="M44 11v27c0 3.314-8.954 6-20 6S4 41.314 4 38V11"></path>
<path d="M44 29c0 3.314-8.954 6-20 6S4 32.314 4 29m40-9c0 3.314-8.954 6-20 6S4 23.314 4 20"></path>
<ellipse cx="24" cy="10" rx="20" ry="6"></ellipse>
</g>
</svg>
<span class="logo-title">Easy Financial Report Dashboard</span>
</div>
'''
    with gr.Row(elem_classes=["header"]):
        # Left: icon and title.
        with gr.Column(scale=8):
            gr.HTML(logo_markup, elem_classes=["text-2xl"])
        # Right: time information (currently empty).
        with gr.Column(scale=2):
            gr.Markdown(header_time_text, elem_classes=["text-sm-top-time"])
def create_company_list(get_companys_state):
    """Create the Radio component listing all known companies.

    Args:
        get_companys_state: Accepted for interface compatibility; the body
            does not read it.

    Returns:
        gr.Radio: Unlabelled, container-less radio list whose choices are the
        company names from ``my_companies`` (empty when the data cannot be
        read).
    """
    names = []
    try:
        # companies_data = get_companys()
        source = my_companies
        print(f"创建公司列表组件 - Companies data: {source}")
        if isinstance(source, list) and len(source) > 0:
            # my_companies is a list of dicts: [{company_name: '', stock_code: ''}, ...]
            names = [str(entry.get('company_name', 'Unknown')) for entry in source]
        elif isinstance(source, pd.DataFrame) and not source.empty:
            names = [str(record.get('company_name', 'Unknown')) for _, record in source.iterrows()]
    except Exception as e:
        print(f"Error creating company list: {str(e)}")
        names = []

    # Radio list without label or outer container border.
    return gr.Radio(
        choices=names,
        label="",
        interactive=True,
        elem_classes=["company-list-container"],
        container=False,
        visible=True,
    )
def create_company_selector():
    """Create the three components of the company search widget.

    Returns:
        tuple: ``(company_input, status_message, company_modal)`` — the search
        textbox, a hidden HTML area for status messages and a hidden Radio
        that later shows the search results.
    """
    # Search box; the CSS class adds the built-in icon, and the placeholder
    # starts with a space to leave room for it.
    search_box = gr.Textbox(
        placeholder=" Name, ticker, or CIK",
        show_label=False,
        container=False,
        elem_classes=["company-input-search"],
    )
    # Status message area, hidden until there is something to report.
    feedback_area = gr.HTML("", visible=False, elem_classes=["status-message"])
    # Pop-up style result list, hidden until a search has run.
    result_picker = gr.Radio(
        choices=[],
        show_label=False,
        visible=False,
        elem_classes=["company-modal"],
    )
    return search_box, feedback_area, result_picker
def create_report_section():
    """Create the HTML component for the report list.

    The component is pre-filled with the reports of the first company in
    ``my_companies``. ``update_report_section`` returns a ``gr.update``
    payload; when that payload is a dict its 'value' entry becomes the initial
    content. Any failure leaves the component empty and is logged.

    Returns:
        gr.HTML: The report display component.
    """
    initial_content = ""
    try:
        if my_companies:
            default_company = my_companies[0]['company_name']
            preload = update_report_section(default_company, None, None)
            if isinstance(preload, dict):
                initial_content = preload.get('value', '')
    except Exception as exc:
        # Previously a bare ``except:`` hid every failure (and also trapped
        # KeyboardInterrupt / SystemExit); keep the fallback but log the cause.
        print(f"Error preloading report section: {exc}")
        initial_content = ""
    return gr.HTML(initial_content)
def create_news_section():
    """Create the (initially empty) HTML component that displays news."""
    return gr.HTML("")
def format_financial_metrics(data: dict, prev_data: dict = None) -> list:  # pyright: ignore[reportArgumentType]
    """Convert raw financial figures into the ``financial_metrics`` card format.

    Args:
        data: Figures of the current fiscal year. Recognised keys are
            total_revenue, net_income, earnings_per_share, operating_expenses
            and operating_cash_flow; missing or None values are skipped.
        prev_data: Figures of the previous fiscal year, used to compute the
            year-over-year change. When it is absent, or has no usable value
            for a metric, the change is "--".

    Returns:
        list[dict]: One dict per available metric with the keys "label",
        "value" (formatted amount), "change" (signed percentage or "--") and
        "color" ("green", "red" or "gray").
    """

    def format_currency(value: float) -> str:
        """Format an amount as $X.XXB / $X.XXM / $X.XXK, sign in front."""
        # Scale by magnitude so that losses are abbreviated like gains
        # (-1.5e9 -> "-$1.50B" instead of "$-1500000000.00").
        sign = "-" if value < 0 else ""
        magnitude = abs(value)
        for threshold, suffix in ((1e9, "B"), (1e6, "M"), (1e3, "K")):
            if magnitude >= threshold:
                return f"{sign}${magnitude / threshold:.2f}{suffix}"
        return f"{sign}${magnitude:.2f}"

    def format_per_share(value: float) -> str:
        """Format a per-share amount without unit scaling."""
        sign = "-" if value < 0 else ""
        return f"{sign}${abs(value):.2f}"

    def calculate_change(current: float, previous: float) -> tuple:
        """Return the percentage change and its display colour."""
        # No baseline (missing or zero) means no meaningful percentage.
        if previous is None or previous == 0:
            return "--", "gray"
        change_pct = (current - previous) / abs(previous) * 100
        sign = "+" if change_pct >= 0 else ""
        color = "green" if change_pct >= 0 else "red"
        return f"{sign}{change_pct:.1f}%", color

    # (data key, display label, value formatter); EPS is never abbreviated.
    metrics_config = [
        ("total_revenue", "Total Revenue", format_currency),
        ("net_income", "Net Income", format_currency),
        ("earnings_per_share", "Earnings Per Share", format_per_share),
        ("operating_expenses", "Operating Expenses", format_currency),
        ("operating_cash_flow", "Cash Flow", format_currency),
    ]

    result = []
    for key, label, formatter in metrics_config:
        current_val = data.get(key)
        if current_val is None:
            continue
        if prev_data and key in prev_data:
            change_str, color = calculate_change(current_val, prev_data[key])
        else:
            change_str, color = "--", "gray"
        result.append({
            "label": label,
            "value": formatter(current_val),
            "change": change_str,
            "color": color,
        })
    return result
def create_sidebar():
    """Build the sidebar: company picker, company search and report/news panels.

    Initialises ``companies_map``, creates the components, and wires three
    events: submitting the search box runs ``update_company_choices``,
    changing the search result list runs ``add_company``, and changing the
    company list runs the local ``select_company_handler``.

    Returns:
        tuple: ``(company_list, report_section_group, report_display,
        news_display)``.
    """
    # NOTE(review): the nesting of the layout contexts below is inferred from
    # the statement order — confirm it against the rendered layout.
    # Initialise companies_map
    initialize_companies_map()
    with gr.Column(elem_classes=["sidebar"]):
        # Company selection card
        with gr.Group(elem_classes=["card"], elem_id="select-company-section"):
            # Centered section title on a gradient background
            gr.HTML("""
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 12px;
border-radius: 8px;
text-align: center;
margin-bottom: 16px;">
<h3 style="color: white; margin: 0; font-size: 16px; font-weight: 600;">Select Company</h3>
</div>
""")
            with gr.Column():
                company_list = create_company_list(get_companys_state)
            # (A "load from storage" button used to be created here when
            # get_companys_state was False; it is disabled.)
            # Company search widget
            company_input, status_message, company_modal = create_company_selector()
            # Event binding - only the submit event is needed for the search box
            company_input.submit(
                fn=update_company_choices,
                inputs=[company_input],
                outputs=[company_modal, status_message]
            )
            # Picking a search result adds (or selects) that company
            company_modal.change(
                fn=add_company,
                inputs=[company_modal, company_list],
                outputs=[company_modal, company_list, status_message]
            )
        # (A per-company button grid built with create_company_buttons() and
        # handled by handle_company_click() used to be wired here; it is
        # disabled.)
        # Container for the report and news panels
        with gr.Group(elem_classes=["report-news-box"]) as report_section_group:
            # gr.Markdown("### Financial Reports", elem_classes=["card-title", "left-card-title"])
            report_display = create_report_section()
            news_display = create_news_section()

        # Company selection handling
        def select_company_handler(company_name):
            """Refresh the report and news panels for the chosen company.

            Returns updates for (report group, report panel, news panel); the
            group is made visible only when a company is selected.
            """
            # Remember the selection globally
            g.SELECT_COMPANY = company_name if company_name else ""
            # Rebuild both panels
            updated_report_display = update_report_section(company_name, None, None)
            updated_news_display = update_news_section(company_name)
            # Show or hide the whole report/news group depending on the selection
            if company_name:
                # A company is selected: show the group
                return gr.update(visible=True), updated_report_display, updated_news_display
            else:
                # Nothing selected: hide the group
                return gr.update(visible=False), updated_report_display, updated_news_display

        company_list.change(
            fn=select_company_handler,
            inputs=[company_list],
            outputs=[report_section_group, report_display, news_display]
        )
    # Hand the components back to the caller
    return company_list, report_section_group, report_display, news_display
def build_income_table(table_data):
    """Render the "Latest 3 Years Financial Metrics" card as an HTML string.

    Args:
        table_data: One of two shapes:

            * a dict with ``"list_data"`` (2-D list whose first row is the
              header and whose first column is the metric name) and an
              optional ``"yoy_rates"`` 2-D list in the same layout holding
              the year-over-year change for each cell;
            * a bare 2-D list (legacy shape, no growth information).

            ``None`` or an empty table renders the card without any rows
            instead of raising.

    Returns:
        str: HTML markup for the card.
    """
    if isinstance(table_data, dict):
        income_statement = table_data.get("list_data") or []
        yoy_rates = table_data.get("yoy_rates") or []
    else:
        income_statement = [] if table_data is None else table_data
        yoy_rates = []

    # Map metric name -> {year header -> growth value}.
    yoy_map = {}
    if len(yoy_rates) > 1 and len(yoy_rates[0]) > 1:
        yoy_headers = yoy_rates[0][1:]  # skip the leading "Category" column
        for yoy_row in yoy_rates[1:]:   # skip the header row
            if yoy_row:
                yoy_map[yoy_row[0]] = dict(zip(yoy_headers, yoy_row[1:]))

    def describe_growth(growth):
        """Return ``(text, is_up)`` for a displayable growth value, else None."""
        if growth is None or isinstance(growth, bool):
            return None
        if isinstance(growth, (int, float)):
            # Zero and NaN carry no direction, so no indicator is shown.
            if growth == 0 or growth != growth:
                return None
            return f"{growth:+}", growth > 0
        text = str(growth)
        if not text or text in ("N/A", "--"):
            return None
        return text, text.startswith("+")

    cell_style = "padding: 8px; border: 1px solid #ddd; text-align: center; font-size: 14px;"
    header_row = income_statement[0] if len(income_statement) > 0 else []

    row_parts = []
    for i, row in enumerate(income_statement):
        if i == 0:
            row_style = "background-color: #f5f5f5; font-weight: 500;"
        else:
            row_style = "background-color: #f9f9f9;"

        # Growth indicators only apply to data rows, never to the header row.
        rates = yoy_map.get(row[0], {}) if i > 0 and len(row) > 0 else {}

        cell_parts = []
        for j, cell in enumerate(row):
            growth_info = None
            if 0 < j < len(header_row):
                growth_info = describe_growth(rates.get(header_row[j]))

            if growth_info is None:
                # Category column, header cells and cells without growth data.
                cell_parts.append(f"<td style='{cell_style}'>{cell}</td>")
                continue

            growth_text, is_up = growth_info
            arrow = "▲" if is_up else "▼"
            color = "green" if is_up else "red"
            cell_parts.append(
                f"""<td style='{cell_style} position: relative;'>
    <div>{cell}</div>
    <div style='position: absolute; bottom: -5px; right: 5px; font-size: 10px; color: {color};'>{arrow}{growth_text}</div>
</td>"""
            )

        row_parts.append(f"<tr style='{row_style}'>{''.join(cell_parts)}</tr>")

    table_rows = "".join(row_parts)
    html = f"""
    <div style="min-width: 400px;max-width: 600px;height: 300px !important;border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); font-family: 'Segoe UI', sans-serif;">
        <div style="display: flex; align-items: center; gap: 8px; margin-bottom: 16px;">
            <svg width="16" height="16" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
                <path d="M12 2L15.09 8.26L19 9.07L16 14L16 19L12 19L8 14L8 9.07L4.91 8.26L8 2L12 2Z" fill="#0066cc"/>
            </svg>
            <div style="font-size: 18px; font-weight: 600;">Latest 3 Years Financial Metrics</div>
        </div>
        <table style="width: 100%; border-collapse: collapse; font-size: 14px;">
            {table_rows}
        </table>
    </div>
    """
    return html
def create_metrics_dashboard():
    """Create the metrics dashboard: stock card, key metrics and 3-year table.

    Three ``gr.HTML`` components are created inside a ``gr.Row`` and
    pre-filled with data for the first entry of ``my_companies``. When a
    lookup fails, the affected card falls back to "N/A" placeholders.

    Side effects:
        Stores the three components in the module-level tuple
        ``metrics_dashboard_components`` (stock card, financial metrics,
        income table) so other code can reference them as event outputs.

    Returns:
        None.
    """
    # The tuple is assigned at the end of this function.
    global metrics_dashboard_components

    default_company = my_companies[0]['company_name'] if my_companies else "N/A"

    # Placeholder content used until real data is available.
    placeholder_metrics = [
        {"label": "Total Revenue", "value": "N/A", "change": "N/A", "color": "grey"},
        {"label": "Net Income", "value": "N/A", "change": "N/A", "color": "grey"},
        {"label": "Earnings Per Share", "value": "N/A", "change": "N/A", "color": "grey"},
        {"label": "Operating Expenses", "value": "N/A", "change": "N/A", "color": "grey"},
        {"label": "Cash Flow", "value": "N/A", "change": "N/A", "color": "grey"},
    ]
    placeholder_table = {
        "list_data": [
            ["Category", "N/A/FY", "N/A/FY", "N/A/FY"],
            ["Total", "N/A", "N/A", "N/A"],
            ["Net Income", "N/A", "N/A", "N/A"],
            ["Earnings Per Share", "N/A", "N/A", "N/A"],
            ["Operating Expenses", "N/A", "N/A", "N/A"],
            ["Cash Flow", "N/A", "N/A", "N/A"],
        ],
        "yoy_rates": [],
    }
    placeholder_year = 'N/A'

    def to_float(value, fallback=0.0):
        """Parse 1.5, "1.5" or "1.5%" into a float; return ``fallback`` otherwise."""
        try:
            return float(str(value).strip().rstrip('%'))
        except (TypeError, ValueError):
            return fallback

    def render_change(change, color):
        """Return the coloured change indicator (arrow + text) as HTML."""
        text = "" if change is None else str(change)
        if not text or text in ("N/A", "--"):
            # No direction is known, so no arrow is shown.
            label = text or "N/A"
            return f'<span style="color:{color};">{label}</span>'
        arrow = "▲" if text.startswith("+") else "▼"
        return f'<span style="color:{color};">{arrow}{text}</span>'

    def load_default_quote():
        """Fetch the quote for the default company; return {} on any failure."""
        try:
            stock_code = get_stock_code_by_company_name(default_company)
            quote = get_quote(stock_code.strip())
            quote['company'] = default_company
            return quote
        except Exception as e:
            print(f"Error loading default stock quote: {e}")
            return {}

    def load_default_financials():
        """Fetch and process the default company's financial data once.

        Returns the processed result, or None when any step fails.
        """
        try:
            stock_code = get_stock_code_by_company_name(default_company)
            raw = query_financial_data(stock_code, "5-Year")
            return process_financial_data_with_metadata(raw)
        except Exception as e:
            print(f"Error loading default financial data: {e}")
            return None

    # Left card
    def build_stock_card():
        company_info = load_default_quote()
        try:
            if not company_info or not isinstance(company_info, dict):
                company_name = "N/A"
                symbol = "N/A"
                price = "N/A"
                change_html = '<span style="color:#888;">N/A</span>'
                open_val = high_val = low_val = prev_close_val = "N/A"
            else:
                company_name = company_info.get("company", "N/A")
                symbol = company_info.get("symbol", "N/A")
                price = company_info.get("current_price", "N/A")
                change = to_float(company_info.get("change", "0"))
                # percent_change may arrive as a number or as text such as
                # "1.23%"; the ":+.2f" format below requires a number.
                change_percent = to_float(company_info.get("percent_change", "0%"))
                change_color = "green" if change >= 0 else "red"
                sign = "+" if change >= 0 else ""
                change_html = f'<span style="color:{change_color};">{sign}{change:.2f} ({change_percent:+.2f}%)</span>'
                open_val = company_info.get("open", "N/A")
                high_val = company_info.get("high", "N/A")
                low_val = company_info.get("low", "N/A")
                prev_close_val = company_info.get("previous_close", "N/A")
            html = f"""
            <div style="width: 250px; height: 300px !important; border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); font-family: 'Segoe UI', sans-serif;">
                <div style="font-size: 16px; color: #555; font-weight: 500;">{company_name}</div>
                <div style="font-size: 12px; color: #888;">NYSE:{symbol}</div>
                <div style="display: flex; align-items: center; gap: 10px; margin: 8px 0;">
                    <div style="font-size: 32px; font-weight: bold;">{price}</div>
                    <div style="font-size: 14px;">{change_html}</div>
                </div>
                <div style="margin-top: 12px; display: grid; grid-template-columns: auto 1fr; gap: 8px;">
                    <div style="font-size: 14px; color: #555;">Open</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{open_val}</div>
                    <div style="font-size: 14px; color: #555;">High</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{high_val}</div>
                    <div style="font-size: 14px; color: #555;">Low</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{low_val}</div>
                    <div style="font-size: 14px; color: #555;">Prev Close</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{prev_close_val}</div>
                </div>
            </div>
            """
            return html
        except Exception as e:
            print(f"Error building stock card: {e}")
            return '<div style="width:250px; padding:16px; color:red;">Error loading stock data</div>'

    # Middle card
    def build_financial_metrics(result):
        try:
            metrics = result["financial_metrics"]
            year_label = result["year_data"]
        except (KeyError, TypeError):
            metrics = placeholder_metrics
            year_label = placeholder_year
        metrics_html = ""
        for item in metrics:
            change_html = render_change(item["change"], item["color"])
            metrics_html += f"""
            <div style="display: flex; justify-content: space-between; padding: 8px 0; font-family: 'Segoe UI', sans-serif;">
                <div style="font-size: 14px; color: #555;">{item['label']}</div>
                <div style="font-size: 16px; font-weight: 500; color: #333;">{item['value']} {change_html}</div>
            </div>
            """
        html = f"""
        <div style="min-width: 300px;max-width: 450px;height: 300px !important;border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); font-family: 'Segoe UI', sans-serif;">
            <div style="display: flex; align-items: center; gap: 8px; margin-bottom: 16px;justify-content: space-between;">
                <div style="font-size: 18px; font-weight: 600;display: flex;align-items: center;">
                    <svg width="16" height="16" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
                        <path d="M12 2L15.09 8.26L19 9.07L16 14L16 19L12 19L8 14L8 9.07L4.91 8.26L8 2L12 2Z" fill="#0066cc"/>
                    </svg>
                    <span style="margin-left: 10px;">{year_label} Financial Metrics</span>
                </div>
                <div style="font-size: 16px; color: #8f8f8f;">
                    YTD data
                </div>
            </div>
            {metrics_html}
        </div>
        """
        return html

    # Right card input
    def build_default_table(result):
        try:
            return build_table_format(result["three_year_data"])
        except Exception as e:
            print(f"Error building default income table: {e}")
            return placeholder_table

    with gr.Row(elem_classes=["metrics-dashboard"]):
        # One request feeds both the middle card and the table.
        financials = load_default_financials()
        default_table_data = build_default_table(financials)
        with gr.Row():
            # Column width ratio 1:1:2
            with gr.Column(scale=1, min_width=250, elem_classes=["metric-card-col-left"]):
                stock_card_component = gr.HTML(build_stock_card(), elem_classes=["metric-card-left"])
            with gr.Column(scale=1, min_width=300, elem_classes=["metric-card-col-middle"]):
                financial_metrics_component = gr.HTML(build_financial_metrics(financials), elem_classes=["metric-card-middle"])
            with gr.Column(scale=2, min_width=450, elem_classes=["metric-card-col-right"]):
                income_table_component = gr.HTML(build_income_table(default_table_data), elem_classes=["metric-card-right"])

    # Keep references to the components for use elsewhere in the module.
    metrics_dashboard_components = (stock_card_component, financial_metrics_component, income_table_component)
# 更新指标仪表板的函数
def update_metrics_dashboard(company_name):
    """Build the three dashboard HTML fragments for the selected company.

    Args:
        company_name: Name of the company; its stock code is resolved with
            ``get_stock_code_by_company_name``.

    Returns:
        tuple[str, str, str]: ``(stock_card_html, financial_metrics_html,
        income_table_html)``. When no stock code can be resolved, three
        error cards are returned instead.

    Raises:
        Whatever ``query_financial_data`` raises; that call is deliberately
        left unguarded so the caller can decide how to report the failure.
    """
    # Resolve the stock code first; nothing below can work without it.
    stock_code = ""
    try:
        stock_code = get_stock_code_by_company_name(company_name)
    except Exception as e:
        print(f"获取股票价格数据失败: {e}")

    if not stock_code or str(stock_code).strip() == "":
        print(f"⚠️ Warning: No stock code found for company '{company_name}'")
        print(f"Current companies_map keys: {list(companies_map.keys())}")
        # Friendly error cards instead of querying with an empty code.
        error_html = f'''<div style="width:250px; padding:16px; color:#e74c3c; background:#fee; border-radius:8px;">
<strong>⚠️ Stock code not found</strong><br/>
<small>Company: {company_name}</small>
</div>'''
        error_metrics = f'''<div style="width:450px; padding:20px; color:#e74c3c; background:#fee; border-radius:8px;">
<strong>N/A Financial Metrics</strong><br/>
<small>Unable to load data for {company_name}</small>
</div>'''
        error_table = f'''<div style="padding:16px; color:#e74c3c; background:#fee; border-radius:8px;">
<strong>Latest 3 Years Financial Metrics</strong><br/>
<small>Please check if the company has been added correctly</small>
</div>'''
        return error_html, error_metrics, error_table

    # Quote lookup is best-effort: a failure only degrades the stock card.
    company_info = {}
    try:
        company_info = get_quote(stock_code.strip())
        company_info['company'] = company_name
        print(f"股票价格数据 {company_info}")
    except Exception as e:
        print(f"获取股票价格数据失败: {e}")
        company_info = {}

    financial_metrics_pre = query_financial_data(stock_code, "5-Year")
    financial_metrics = []
    year_data = None
    three_year_data = None
    try:
        result = process_financial_data_with_metadata(financial_metrics_pre)
        financial_metrics = result["financial_metrics"]
        year_data = result["year_data"]
        three_year_data = result["three_year_data"]
        print(f"格式化后的财务数据: {financial_metrics}")
    except Exception as e:
        print(f"Error process_financial_data: {e}")

    # Avoid rendering "None Financial Metrics" when processing failed.
    yearly_data = year_data if year_data is not None else "N/A"

    # Fall back to an all-"N/A" table when no table can be built.
    table_data = None
    try:
        table_data = build_table_format(three_year_data)
    except Exception as e:
        print(f"Error build_table_format: {e}")
    table_rows = table_data.get("list_data") if isinstance(table_data, dict) else table_data
    if table_rows is None or len(table_rows) == 0:
        table_data = {
            "list_data": [
                ["Category", "N/A/FY", "N/A/FY", "N/A/FY"],
                ["Total Revenue", "N/A", "N/A", "N/A"],
                ["Net Income", "N/A", "N/A", "N/A"],
                ["Earnings Per Share", "N/A", "N/A", "N/A"],
                ["Operating Expenses", "N/A", "N/A", "N/A"],
                ["Cash Flow", "N/A", "N/A", "N/A"],
            ],
            "yoy_rates": [],
        }

    def to_float(value, fallback=0.0):
        """Parse 1.5, "1.5" or "1.5%" into a float; return ``fallback`` otherwise."""
        try:
            return float(str(value).strip().rstrip('%'))
        except (TypeError, ValueError):
            return fallback

    def render_change(change, color):
        """Return the coloured change indicator (arrow + text) as HTML."""
        text = "" if change is None else str(change)
        if not text or text in ("N/A", "--"):
            # No direction is known, so no arrow is shown.
            label = text or "N/A"
            return f'<span style="color:{color};">{label}</span>'
        arrow = "▲" if text.startswith("+") else "▼"
        return f'<span style="color:{color};">{arrow}{text}</span>'

    # Left card
    def build_stock_card(company_info):
        try:
            if not company_info or not isinstance(company_info, dict):
                company_name = "N/A"
                symbol = "N/A"
                price = "N/A"
                change_html = '<span style="color:#888;">N/A</span>'
                open_val = high_val = low_val = prev_close_val = "N/A"
            else:
                company_name = company_info.get("company", "N/A")
                symbol = company_info.get("symbol", "N/A")
                price = company_info.get("current_price", "N/A")
                change = to_float(company_info.get("change", "0"))
                # percent_change may arrive as a number or as text such as
                # "1.23%"; the ":+.2f" format below requires a number.
                change_percent = to_float(company_info.get("percent_change", "0%"))
                change_color = "green" if change >= 0 else "red"
                sign = "+" if change >= 0 else ""
                change_html = f'<span style="color:{change_color};">{sign}{change:.2f} ({change_percent:+.2f}%)</span>'
                open_val = company_info.get("open", "N/A")
                high_val = company_info.get("high", "N/A")
                low_val = company_info.get("low", "N/A")
                prev_close_val = company_info.get("previous_close", "N/A")
            html = f"""
            <div style="width: 250px; height: 300px !important; border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); font-family: 'Segoe UI', sans-serif;">
                <div style="font-size: 16px; color: #555; font-weight: 500;">{company_name}</div>
                <div style="font-size: 12px; color: #888;">NYSE:{symbol}</div>
                <div style="display: flex; align-items: center; gap: 10px; margin: 8px 0;">
                    <div style="font-size: 32px; font-weight: bold;">{price}</div>
                    <div style="font-size: 14px;">{change_html}</div>
                </div>
                <div style="margin-top: 12px; display: grid; grid-template-columns: auto 1fr; gap: 8px;">
                    <div style="font-size: 14px; color: #555;">Open</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{open_val}</div>
                    <div style="font-size: 14px; color: #555;">High</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{high_val}</div>
                    <div style="font-size: 14px; color: #555;">Low</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{low_val}</div>
                    <div style="font-size: 14px; color: #555;">Prev Close</div><div style="font-size: 14px; font-weight: 500; text-align: center;">{prev_close_val}</div>
                </div>
            </div>
            """
            return html
        except Exception as e:
            print(f"Error building stock card: {e}")
            return '<div style="width:250px; padding:16px; color:red;">Error loading stock data</div>'

    # Middle card
    def build_financial_metrics(yearly_data):
        metrics_html = ""
        for item in financial_metrics:
            change_html = render_change(item["change"], item["color"])
            metrics_html += f"""
            <div style="display: flex; justify-content: space-between; padding: 8px 0; font-family: 'Segoe UI', sans-serif;">
                <div style="font-size: 14px; color: #555;">{item['label']}</div>
                <div style="font-size: 16px; font-weight: 500; color: #333;">{item['value']} {change_html}</div>
            </div>
            """
        html = f"""
        <div style="width: 450px;height: 300px !important;border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); font-family: 'Segoe UI', sans-serif;">
            <div style="display: flex; align-items: center; gap: 8px; margin-bottom: 16px;justify-content: space-between;">
                <div style="font-size: 18px; font-weight: 600;display: flex;align-items: center;">
                    <svg width="16" height="16" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
                        <path d="M12 2L15.09 8.26L19 9.07L16 14L16 19L12 19L8 14L8 9.07L4.91 8.26L8 2L12 2Z" fill="#0066cc"/>
                    </svg>
                    <span style="margin-left: 10px;">{yearly_data} Financial Metrics</span>
                </div>
                <div style="font-size: 16px; color: #8f8f8f;">
                    YTD data
                </div>
            </div>
            {metrics_html}
        </div>
        """
        return html

    # Content for the three HTML components, in dashboard order.
    return build_stock_card(company_info), build_financial_metrics(yearly_data), build_income_table(table_data)
def create_tab_content(tab_name, company_name):
    """Render the placeholder layout for one dashboard tab.

    Components are created in the currently active Gradio context; nothing
    is returned.

    Args:
        tab_name: "summary", "detailed" or "comparative". Any other value
            renders nothing.
        company_name: Only printed to stdout for the "summary" tab.
    """
    if tab_name == "summary":
        print(f"company_name: {company_name}")
        # content = get_invest_suggest(company_name)
        gr.Markdown("# 11111", elem_classes=["invest-suggest-md-box"])
        return

    if tab_name == "detailed":
        # (label, amount, bar colour class, bar scale) for each cash-flow row:
        # operating, investing, financing.
        cash_flow_rows = (
            ("Operating Cash Flow", "$982M", "bg-green-500", 85),
            ("Investing Cash Flow", "-$415M", "bg-blue-500", 42),
            ("Financing Cash Flow", "-$212M", "bg-red-500", 25),
        )
        with gr.Column(elem_classes=["tab-content"]):
            gr.Markdown("Financial Statements", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"])
            with gr.Row(elem_classes=["gap-6"]):
                # Income statement (3/5 width); the table will be shown here
                with gr.Column(elem_classes=["w-3/5", "bg-gray-50", "rounded-xl", "p-4"]):
                    gr.Markdown("Income Statement", elem_classes=["font-medium", "mb-3"])
                # Balance sheet and cash-flow statement (2/5 width)
                with gr.Column(elem_classes=["w-2/5", "flex", "flex-col", "gap-6"]):
                    # Balance sheet; the chart will be shown here
                    with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
                        gr.Markdown("Balance Sheet Summary", elem_classes=["font-medium", "mb-3"])
                    # Cash-flow statement
                    with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
                        with gr.Row(elem_classes=["justify-between", "items-start"]):
                            gr.Markdown("Cash Flow Statement", elem_classes=["font-medium"])
                            gr.Markdown("View Detailed", elem_classes=["text-xs", "text-blue-600", "font-medium"])
                        with gr.Column(elem_classes=["mt-4", "space-y-3"]):
                            for flow_label, flow_amount, bar_class, bar_scale in cash_flow_rows:
                                with gr.Column():
                                    with gr.Row(elem_classes=["justify-between"]):
                                        gr.Markdown(flow_label)
                                        gr.Markdown(flow_amount, elem_classes=["font-medium"])
                                    with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]):
                                        with gr.Column(elem_classes=[bar_class, "h-1.5", "rounded-full"], scale=bar_scale):
                                            gr.Markdown("")
        return

    if tab_name == "comparative":
        with gr.Column(elem_classes=["tab-content"]):
            gr.Markdown("Industry Benchmarking", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"])
            # Revenue growth comparison; the chart will be shown here
            with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4", "mb-6"]):
                gr.Markdown("Revenue Growth - Peer Comparison", elem_classes=["font-medium", "mb-3"])
            # Grid with profitability ratios and the report preview
            with gr.Row(elem_classes=["grid-cols-2", "gap-6"]):
                # Profitability ratios; the table will be shown here
                with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
                    gr.Markdown("Profitability Ratios", elem_classes=["font-medium", "mb-3"])
                # Report preview; the preview will be shown here
                with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
                    gr.Markdown("Report Preview", elem_classes=["font-medium", "mb-3"])
def create_chat_panel():
    """Create the chat panel component.

    Currently a placeholder: the body below is entirely commented out, so
    calling this function creates no components and returns None. The old
    implementation is kept as comments for reference.
    """
    # with gr.Column(elem_classes=["chat-panel"]):
    # Chat header
    # with gr.Row(elem_classes=["p-4", "border-b", "border-gray-200", "items-center", "gap-2"]):
    # gr.Markdown("🤖", elem_classes=["text-xl", "text-blue-600"])
    # gr.Markdown("Financial Assistant", elem_classes=["font-medium"])
    # Chat area
    # Embed with a single line of code!
    # chat_component = create_financial_chatbot()
    # chat_component.render()
    # create_financial_chatbot()
    # gr.LoginButton()
    # chatbot = gr.Chatbot(
    # value=[
    # {"role": "assistant", "content": "I'm your financial assistant, how can I help you today?"},
    # # {"role": "assistant", "content": "Hello! I can help you analyze financial data. Ask questions like \"Show revenue trends\" or \"Compare profitability ratios\""},
    # # {"role": "user", "content": "Show revenue trends for last 4 quarters"},
    # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
    # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
    # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
    # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}
    # ],
    # type="messages",
    # # elem_classes=["min-h-0", "overflow-y-auto", "space-y-4", "chat-content-box"],
    # show_label=False,
    # autoscroll=True,
    # show_copy_button=True,
    # height=400,
    # container=False,
    # )
    # # Input area
    # with gr.Row(elem_classes=["border-t", "border-gray-200", "gap-2"]):
    # msg = gr.Textbox(
    # placeholder="Ask a financial question...",
    # elem_classes=["flex-1", "border", "border-gray-300", "rounded-lg", "px-4", "py-2", "focus:border-blue-500"],
    # show_label=False,
    # lines=1,
    # submit_btn=True,
    # container=False,
    # )
    # msg.submit(
    # chat_bot,
    # [msg, chatbot],
    # [msg, chatbot],
    # queue=True,
    # )
# def load_css_files(css_dir, filenames):
# css_content = ""
# for filename in filenames:
# path = os.path.join(css_dir, filename)
# if os.path.exists(path):
# with open(path, "r", encoding="utf-8") as f:
# css_content += f.read() + "\n"
# else:
# print(f"⚠️ CSS file not found: {path}")
# return css_content
def main():
    """Build the Financial Analysis Dashboard and return it without launching.

    The layout consists of a header, a sidebar (company list, reports, news),
    the metrics dashboard, two report tabs ("Investment Suggestion" and
    "Analysis Report") and a chat interface. The function also wires the
    page-load handler and the change handlers that refresh the tabs and the
    metrics dashboard when the selected company changes.

    Returns:
        The ``gr.Blocks`` app created here.
    """
    # Locate the css/ directory next to this file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    css_dir = os.path.join(current_dir, "css")
    # Initialise the cache managers used by the handlers below
    report_cache = ReportCacheManager(cache_ttl_seconds=3600, max_cache_size=50)
    data_cache = FinancialDataCacheManager(cache_ttl_seconds=1800, max_cache_size=100)
    # def load_css_files(css_dir, filenames):
    # """Read several CSS files and merge them into one string"""
    # css_content = ""
    # for filename in filenames:
    # path = os.path.join(css_dir, filename)
    # if os.path.exists(path):
    # with open(path, "r", encoding="utf-8") as f:
    # css_content += f.read() + "\n"
    # else:
    # print(f"Warning: CSS file not found: {path}")
    # return css_content
    # Stylesheets handed to gr.Blocks
    css_paths = [
        os.path.join(css_dir, "main.css"),
        os.path.join(css_dir, "components.css"),
        os.path.join(css_dir, "layout.css")
    ]
    # css_dir = "path/to/your/css/folder" # replace with your actual path
    # Locate the css folder automatically (same level as app.py)
    # BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    # CSS_DIR = os.path.join(BASE_DIR, "css")
    # css_files = ["main.css", "components.css", "layout.css"]
    # combined_css = load_css_files(CSS_DIR, css_files)
    # print(combined_css)
    # Company selected by default: the first entry of my_companies
    default_company = my_companies[0]['company_name'] if my_companies else ""
    with gr.Blocks(
        title="Financial Analysis Dashboard",
        css_paths=css_paths,
        css=custom_css,
        # css=combined_css
    ) as demo:
        # State component tracking the currently selected company
        selected_company_state = gr.State(default_company)
        with gr.Column(elem_classes=["container", "container-h"]):
            # Header
            create_header()
            # Main layout
            with gr.Row(elem_classes=["main-content-box"]):
                # Left sidebar
                with gr.Column(scale=1, min_width=350):
                    # Keep references to the sidebar components for event wiring
                    company_list_component, report_section_component, report_display_component, news_display_component = create_sidebar()
                # Main content area
                with gr.Column(scale=9):
                    # Metrics dashboard is shown at the top
                    create_metrics_dashboard()
                    with gr.Row(elem_classes=["main-content-box"]):
                        with gr.Column(scale=8):
                            # Tab content
                            with gr.Tabs():
                                with gr.TabItem("Investment Suggestion", elem_classes=["tab-item"]):
                                    # Component for showing the company name (disabled)
                                    # company_display = gr.Markdown("# Please select a company")
                                    # Placeholder that receives the tab content
                                    tab_content = gr.Markdown(elem_classes=["invest-suggest-md-box"])
                                    # Update the heading when the selected company changes (disabled)
                                    # selected_company_state.change(
                                    # fn=lambda company: f"# Investment Suggestions for {company}" if company else "# Please select a company",
                                    # inputs=[selected_company_state],
                                    # outputs=[company_display]
                                    # )
                                    # Reload the tab content when the selected company changes
                                    def update_tab_content(company):
                                        if company:
                                            # Produce the investment suggestion through the report cache
                                            def generate_suggestion():
                                                stock_code = get_stock_code_by_company_name(company)
                                                return query_company_advanced(stock_code, "suggestion")
                                            # Forward every value produced by the cache manager to the UI
                                            for result in report_cache.get_or_create_report(company, "suggestion", generate_suggestion):
                                                yield result
                                        else:
                                            yield "<div style=\"padding: 20px; text-align: center; color: #666;\">Please select a company</div>"
                                    selected_company_state.change(
                                        fn=update_tab_content,
                                        inputs=[selected_company_state],
                                        outputs=[tab_content],
                                    )
                                with gr.TabItem("Analysis Report", elem_classes=["tab-item"]):
                                    # Component for showing the company name (disabled)
                                    # analysis_company_display = gr.Markdown("# Please select a company")
                                    # Placeholder that receives the tab content
                                    analysis_tab_content = gr.Markdown(elem_classes=["analysis-report-md-box"])
                                    # Update the heading when the selected company changes (disabled)
                                    # selected_company_state.change(
                                    # fn=lambda company: f"# Analysis Report for {company}" if company else "# Please select a company",
                                    # inputs=[selected_company_state],
                                    # outputs=[analysis_company_display]
                                    # )
                                    # Reload the tab content when the selected company changes
                                    def update_analysis_tab_content(company):
                                        if company:
                                            # Produce the analysis report through the report cache
                                            def generate_report():
                                                stock_code = get_stock_code_by_company_name(company)
                                                return query_company_advanced(stock_code, "report")
                                            # Forward every value produced by the cache manager to the UI
                                            for result in report_cache.get_or_create_report(company, "report", generate_report):
                                                yield result
                                        else:
                                            yield "<div style=\"padding: 20px; text-align: center; color: #666;\">Please select a company</div>"
                                    selected_company_state.change(
                                        fn=update_analysis_tab_content,
                                        inputs=[selected_company_state],
                                        outputs=[analysis_tab_content]
                                    )
                                # with gr.TabItem("Comparison", elem_classes=["tab-item"]):
                                # create_tab_content("comparison")
                        with gr.Column(scale=2, min_width=400):
                            # Chat panel
                            # Uses chatbot.chat_main.respond (the MCP_Financial_Report agent);
                            # Investment Suggestion and Analysis Report keep using EasyFinancialAgent
                            gr.ChatInterface(
                                respond,
                                title="Easy Financial AI Assistant",
                                additional_inputs=[
                                    gr.State(value=""), # CRITICAL: Store session URL across turns (hidden from UI)
                                    gr.State(value={}) # CRITICAL: Store agent context across turns (hidden from UI)
                                ],
                                additional_inputs_accordion=gr.Accordion(label="Settings", open=False, visible=False), # Hide the accordion completely
                            )
        # On page load, select the default company and load its data
        def load_default_company():
            # Build the choices for the company list
            try:
                companies_data = my_companies
                if isinstance(companies_data, list) and len(companies_data) > 0:
                    choices = [str(item.get('company_name', 'Unknown')) for item in companies_data]
                elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
                    choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
                else:
                    choices = []
            except:
                choices = []
            if default_company:
                # Actually ask the cache manager for data instead of only returning loading HTML
                # Producer for the Investment Suggestion tab
                def generate_suggestion():
                    stock_code = get_stock_code_by_company_name(default_company)
                    return query_company_advanced(stock_code, "suggestion")
                # Producer for the Analysis Report tab
                def generate_report():
                    stock_code = get_stock_code_by_company_name(default_company)
                    return query_company_advanced(stock_code, "report")
                # The cache manager hands back iterators of results
                suggestion_gen = report_cache.get_or_create_report(default_company, "suggestion", generate_suggestion)
                report_gen = report_cache.get_or_create_report(default_company, "report", generate_report)
                # Only the first item of each iterator is used here (per the original
                # note it may be a loading placeholder or a cached result); the
                # "Loading..." markup is the fallback for an empty iterator
                suggestion_result = next(suggestion_gen, "<div style='padding: 20px; text-align: center; color: #666;'>Loading...</div>")
                report_result = next(report_gen, "<div style='padding: 20px; text-align: center; color: #666;'>Loading...</div>")
                # Financial metrics are intentionally not loaded here; the original author
                # expects the selected_company_state.change handler to refresh them
                # (demo.load sets the state -> .change fires), avoiding a duplicate load
                return (
                    default_company,
                    gr.update(choices=choices, value=default_company),
                    suggestion_result,
                    report_result
                )
            return (
                "",
                gr.update(choices=choices),
                "<div style='padding: 20px; text-align: center; color: #666;'>Please select a company</div>",
                "<div style='padding: 20px; text-align: center; color: #666;'>Please select a company</div>"
            )
        demo.load(
            fn=load_default_company,
            inputs=[],
            outputs=[
                selected_company_state,
                company_list_component,
                tab_content,
                analysis_tab_content
                # The metrics components are not outputs here; they are updated by the selected_company_state.change handler
            ],
            concurrency_limit=None,
        )
        # Mirror the company list selection into the state component
        # NOTE: make sure create_sidebar does not bind the same event a second time
        company_list_component.change(
            fn=lambda x: x, # pass the selected company name through unchanged
            inputs=[company_list_component],
            outputs=[selected_company_state],
            concurrency_limit=None
        )
        # Refresh the metrics dashboard when the selected company changes
        def update_metrics_dashboard_wrapper(company_name):
            if company_name:
                # Loader handed to the data cache manager
                def load_financial_data():
                    return update_metrics_dashboard(company_name)
                # Fetch the three HTML fragments through the data cache manager
                try:
                    result = data_cache.get_or_load_data(company_name, "metrics", load_financial_data)
                    stock_card_html, financial_metrics_html, income_table_html = result
                    yield stock_card_html, financial_metrics_html, income_table_html
                except Exception as e:
                    error_html = f'''
<div style="padding: 20px; text-align: center; color: #666;">
<p>Error loading financial data: {str(e)}</p>
<p>Please try again later.</p>
</div>
'''
                    yield error_html, error_html, error_html
            else:
                # No company selected: show a prompt in all three cards
                empty_html = "<div style=\"padding: 20px; text-align: center; color: #666;\">Please select a company</div>"
                yield empty_html, empty_html, empty_html
        selected_company_state.change(
            fn=update_metrics_dashboard_wrapper,
            inputs=[selected_company_state],
            outputs=list(metrics_dashboard_components),
            concurrency_limit=None
        )
    return demo
# Script entry point: build the app and start the Gradio server.
if __name__ == "__main__":
    demo = main()
    # NOTE(review): share=True presumably also requests a public share link - confirm this is intended.
    demo.launch(share=True)