import json import requests BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" MODEL = "qwen3-max" TOKEN = "sk-ef26097310ec45c184e8d84b31ea9356" def call_llm(prompt: str): """模拟调用大模型的函数(你可以替换为真实请求)""" import requests headers = { "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json" } data = { "model": MODEL, "messages": [{"role": "user", "content": prompt}], "stream": True # 启用流式响应 } response = requests.post(f"{BASE_URL}/chat/completions", headers=headers, json=data, stream=True) for line in response.iter_lines(): if line: yield line.decode('utf-8') def call_llm_non_streaming(prompt: str): """调用大模型的函数(非流式)""" import requests headers = { "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json" } data = { "model": MODEL, "messages": [{"role": "user", "content": prompt}], "stream": False # 禁用流式响应 } response = requests.post(f"{BASE_URL}/chat/completions", headers=headers, json=data) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: return "Error: Failed to get response from LLM" def get_invest_suggest( user_input: str # code: str, # fm: str, # news: str, # stock: str ): if not user_input.strip(): return ['你没有输入任何内容'] # 对应股票代码${code}, # 结合财报数据${fm}、 # 最新新闻${news}、 # 当前股票信息${stock}, prompt = f""" 你是一个专业的投资建议助手,你会对公司${user_input}, 给出以下投资建议。 投资建议分析: - 是否建议投资 - 买入卖出价格 - 止损止盈点 - 持有时间(短期1月内、长期1月以上) 要求用语专业、言简意赅,给出具体的建议。: """ # 获取非流式响应 response = call_llm_non_streaming(prompt) # print(f"投资建议:{response}{user_input}") return response def get_analysis_report(user_input: str): if not user_input.strip(): return ['你没有输入任何内容'] prompt = f""" # 请给出针对以下关键词的分析报告,条理清晰,要纯英文版的: # {user_input} # """ # 获取非流式响应 response = call_llm_non_streaming(prompt) # print(f"投资建议:{response}{user_input}") return response def search_company(user_input: str): if not user_input.strip(): return ['你没有输入任何内容'] # 获取非流式响应 response = chat_with_bailian_non_streaming(user_input, "6f24ece2689c4716b53b78aa7512f19f") print(f"查询结果:{response}{user_input}") try: parsed_data = json.loads(response) # 检查parsed_data是否为列表 if isinstance(parsed_data, list): # 转换为Radio组件需要的choices格式 choices = [(f"{item['NAME']} ({item['CODE']})", f"{item['NAME']} ({item['CODE']})") for item in parsed_data] else: # 如果不是列表,可能是错误信息或其他格式 choices = ["未知错误"] return choices except json.JSONDecodeError: return ["未知错误"] def search_news(user_input: str): if not user_input.strip(): return [] # 获取非流式响应 response = chat_with_bailian_non_streaming(user_input, "f7be17d71e704607b6558cff62c94954") print(f"查询结果:{response}{user_input}") try: parsed_data = json.loads(response) # 检查parsed_data是否为列表 if isinstance(parsed_data, list): # 转换为Radio组件需要的choices格式 print(parsed_data) return parsed_data # choices = [(f"{item['NAME']} ({item['CODE']})", f"{item['NAME']} ({item['CODE']})") for item in parsed_data] else: # 如果不是列表,可能是错误信息或其他格式 choices = [] return choices except json.JSONDecodeError: return [] def get_stock_price_from_bailian(user_input: str): if not user_input.strip(): return {} # 获取非流式响应 response = chat_with_bailian_non_streaming(user_input, "d2b919f9b32e4fa28a75234cbb78a787") print(f"查询结果:{response}{user_input}") try: parsed_data = json.loads(response) return parsed_data if isinstance(parsed_data, dict) else {} except json.JSONDecodeError: return {} # def search_company(user_input: str): # if not user_input.strip(): # return [] # 返回空列表,而不是 yield # result_json = [ # {"NAME": "公司名", "CODE": "股票代码"} # ] # # prompt = f""" # # # 按照关键词搜索公司和其股票代码 # # 严格要求: # # - 1. 按照我给的关键词“{user_input}”进行搜索,查询跟这个关键词“{user_input}”相关的公司的名称和股票代码; # # - 2. 查询结果以“{json.dumps(result_json)}”的格式输出给我,不要掺杂其他内容,就纯“{json.dumps(result_json)}”格式的内容。 # # - 3. 数组内可以是很多条,只要是涉及到这个关键词“{user_input}”的公司都行。 # # - 4. 有公司名,没有股票代码 # # - 4. 如果搜不到相关的公司就返回空数组,不准直接返回“{json.dumps(result_json)}”。 # # 请严格遵守 # # """ # format = { "NAME": "公司名字", "CODE": "股票代码" } # prompt = f""" # # 角色 # 你是一位专业的股票信息提取助手,专门从用户的输入信息中提取出可能想选择的公司的股票名称和股票代码。对于美股,使用英文名称;对于港股和A股,使用中文名称。如果无法从用户输入中提取到有效的公司信息,则返回“无效输入,请输入公司”。 # ## 技能 # ### 技能 1: 提取公司股票名称 # - **任务**:从用户的输入信息中准确提取一个或多个疑似公司名称及股票代码。 # - 对于美股,使用英文名称+股票代码。 # - 对于港股和A股,使用中文名称+股票代码。 # - 如果有多个疑似的公司名称,则以 `{format}` 的JSON格式返回。 # ### 技能 2: 处理无效输入 # - **任务**:如果用户提供的输入信息中没有明确的公司名称,则返回“无效输入,请输入公司”。 # ## 限制 # - 仅处理与公司股票名称相关的输入信息。 # - 返回格式必须严格遵循 `{ format }` 或单个公司名称的形式。 # - 不提供任何其他类型的信息或建议,只专注于提取公司名称。 # - 如果无法从输入中提取到有效的公司信息,必须返回“无效输入,请输入公司”。 # - 未上市的公司不要返回给我 # - 没有股票代码的公司不要返回给我 # ## 示例 # - 输入: "我想买阿里巴巴的股票" # - 输出: # [ # { format }, # { format } # ] # - 输入: "最近股市行情怎么样?" # - 输出: Invalid input, please enter a company # 通过这些技能和限制,确保你能高效、准确地完成任务。 # 下面,来执行你的任务吧,用户用于搜索的关键词是:{user_input} # """ # print(f"搜索关键词:{prompt}") # bot_response = "" # for chunk in call_llm(prompt): # 假设 call_llm 是同步生成器 # if not chunk or chunk == "[DONE]": # continue # if chunk.startswith("data: "): # chunk = chunk[6:] # try: # response_data = json.loads(chunk) # delta = response_data.get("choices", [{}])[0].get("delta", {}) # content = delta.get("content", "") # if content: # bot_response += content # except json.JSONDecodeError: # continue # # 尝试解析最终的 bot_response 为 JSON # print(f"查询结果:{bot_response}") # try: # companies = json.loads(bot_response) # # 转换为 ["Alibaba (BABA)", "阿里巴巴-W (09988)"] 格式 # choices = [f"{item['NAME']} ({item['CODE']})" for item in companies] # # if not choices: # # return ["No results found"] # return choices # except (json.JSONDecodeError, KeyError, TypeError): # # 解析失败,返回空或默认 # # return ["No results found"] # return [] def chat_with_bailian_non_streaming(message, app_id): # APP_ID = "6f24ece2689c4716b53b78aa7512f19f" API_KEY = "sk-9bac83f8dccf4978a6bb1f8330a5404f" # 使用自定义应用的API端点 API_URL = f"https://dashscope.aliyuncs.com/api/v1/apps/{app_id}/completion" """ 与百炼自定义应用对话的函数(非流式) """ # 构建请求头 headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" # 不启用SSE以获得完整响应 } # 构建历史对话记录 messages = [] # 添加当前问题 messages.append({"role": "user", "content": message}) # 构建请求体 - 根据自定义应用的API格式 data = { "input": { "messages": messages }, "parameters": { "max_tokens": 2048, "temperature": 0.8, "top_p": 0.9 } } try: # 发送POST请求到百炼自定义应用API(非流式) response = requests.post(API_URL, headers=headers, data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: result = response.json() if "output" in result and "text" in result["output"]: return result["output"]["text"] else: return f"API响应格式不正确: {str(result)}" else: return f"API调用失败,状态码:{response.status_code},响应:{response.text}" except requests.RequestException as e: return f"网络请求错误:{str(e)}" except Exception as e: return f"发生未知错误:{str(e)}" def chat_bot(user_input: str, history: list): if not user_input.strip(): yield "", history return # 添加用户消息到历史 history.append({"role": "user", "content": user_input}) # 调用AI模型进行总结 bot_response = "" history.append({"role": "assistant", "content": bot_response}) for chunk in call_llm(user_input): if not chunk or chunk == "[DONE]": continue if chunk.startswith("data: "): chunk = chunk[6:] try: response_data = json.loads(chunk) delta = response_data.get("choices", [{}])[0].get("delta", {}) content = delta.get("content", "") if content: bot_response += content # 更新累积历史记录中的最后一条消息,实现流式显示 history[-1]["content"] = bot_response yield "", history except json.JSONDecodeError: continue # 忽略无效 JSON # # 最终完整历史 # history.append({"role": "assistant", "content": bot_response}) # yield "", history