import json
import os
import subprocess
import time
from typing import Dict, Any, Generator, Optional

try:
    import requests
except ImportError:  # only the HTTP transport needs requests; stdio works without it
    requests = None


class MCPError(Exception):
    """Raised when an MCP session cannot be set up or a request fails."""


class MCP_Client:
    """MCP client that talks to a server over a local subprocess or HTTP.

    The first entry of ``config["mcpServers"]`` selects the server. An entry
    with a ``"command"`` key is launched as a child process and spoken to with
    newline-delimited JSON-RPC over stdin/stdout; any other entry is treated
    as an HTTP endpoint described by ``"url"`` and optional ``"headers"``.

    NOTE(review): the HTTP transport sends plain JSON POSTs and parses the
    reply with ``.json()``; servers that answer with an event stream or that
    require session headers are not handled here.
    """

    _PROTOCOL_VERSION = "2025-06-18"
    _HTTP_INIT_TIMEOUT = 10  # seconds, used for the initialize handshake
    _SHUTDOWN_TIMEOUT = 5  # seconds to wait after terminate() before kill()

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and pick the transport.

        Args:
            config: MCP configuration containing a non-empty ``"mcpServers"``
                mapping; only its first entry is used.

        Raises:
            KeyError: if ``"mcpServers"`` is missing.
            IndexError: if ``"mcpServers"`` is empty.
        """
        self.config = config
        self.server_name = list(config["mcpServers"].keys())[0]
        self.server_config = config["mcpServers"][self.server_name]

        # Attributes for both transports are always defined so that close()
        # and helpers never hit an AttributeError.
        self.process: Optional[subprocess.Popen] = None
        self.url: Optional[str] = None
        self.headers: Dict[str, str] = {}

        if "command" in self.server_config:
            self.connection_type = "local"
        else:
            self.connection_type = "http"
            self.url = self.server_config.get("url")
            self.headers = self.server_config.get("headers", {})

        self.initialized = False
        self._last_request_id = 0

    def __enter__(self) -> "MCP_Client":
        """Return the client itself; the session is still initialized lazily."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client when leaving a ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Message construction
    # ------------------------------------------------------------------
    def _new_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Build a JSON-RPC request with a fresh, unique id."""
        self._last_request_id += 1
        return {
            "jsonrpc": "2.0",
            "id": self._last_request_id,
            "method": method,
            "params": params,
        }

    def _initialize_params(self, client_name: str) -> Dict[str, Any]:
        """Return the parameters of the ``initialize`` request."""
        return {
            "protocolVersion": self._PROTOCOL_VERSION,
            "capabilities": {
                "experimental": {},
                "sampling": {},
                "elicitations": {},
                "roots": {},
            },
            "clientInfo": {"name": client_name, "version": "1.0.0"},
        }

    @staticmethod
    def _initialized_notification() -> Dict[str, Any]:
        """Return the notification sent after a successful handshake."""
        return {"jsonrpc": "2.0", "method": "notifications/initialized"}

    # ------------------------------------------------------------------
    # Local (subprocess) transport
    # ------------------------------------------------------------------
    def _send_local(self, message: Dict[str, Any]) -> None:
        """Write one JSON message, newline-terminated, to the child's stdin."""
        process = self.process
        if process is None or process.stdin is None:
            raise MCPError("无法写入进程输入")
        process.stdin.write(json.dumps(message) + "\n")
        process.stdin.flush()

    def _read_local_response(self, request_id: int) -> Dict[str, Any]:
        """Read stdout lines until the response to ``request_id`` arrives.

        Blank lines, server notifications and server-to-client requests
        (anything carrying ``"method"``) and responses to other ids are
        skipped. An error response with a null id is returned as well, since
        a server that could not parse the request cannot echo its id.

        Raises:
            MCPError: if stdout is unavailable or the process closed it
                before answering.
            json.JSONDecodeError: if a non-blank line is not valid JSON.
        """
        process = self.process
        if process is None or process.stdout is None:
            raise MCPError("无法读取进程输出")

        while True:
            line = process.stdout.readline()
            if not line:
                # readline() returns "" only at EOF: the server went away.
                raise MCPError("MCP进程已退出,未返回响应")
            line = line.strip()
            if not line:
                continue
            message = json.loads(line)
            if not isinstance(message, dict) or "method" in message:
                continue
            message_id = message.get("id")
            if message_id == request_id:
                return message
            if message_id is None and "error" in message:
                return message

    def _initialize_local(self) -> bool:
        """Spawn the server process and run the initialize handshake."""
        command = self.server_config["command"]
        args = self.server_config.get("args", [])
        process_env = os.environ.copy()
        process_env.update(self.server_config.get("env", {}))

        self.process = subprocess.Popen(
            [command, *args],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr is never read by this client; an unread PIPE can fill up
            # and block the child, so discard it instead.
            stderr=subprocess.DEVNULL,
            env=process_env,
            text=True,
            encoding="utf-8",
            bufsize=1,
        )

        request = self._new_request("initialize", self._initialize_params("mcp-client"))
        self._send_local(request)
        response = self._read_local_response(request["id"])
        if "result" not in response:
            return False

        self._send_local(self._initialized_notification())
        return True

    def _terminate_process(self) -> None:
        """Stop the child process, if any, and release its pipes."""
        process = self.process
        if process is None:
            return
        try:
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=self._SHUTDOWN_TIMEOUT)
                except subprocess.TimeoutExpired:
                    # The server ignored terminate(); force it down.
                    process.kill()
                    process.wait()
        finally:
            for stream in (process.stdin, process.stdout):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    # e.g. broken pipe while flushing to a dead process
                    pass
            self.process = None

    # ------------------------------------------------------------------
    # HTTP transport
    # ------------------------------------------------------------------
    def _post(self, payload: Dict[str, Any], timeout: float):
        """POST one JSON-RPC message to the configured URL."""
        if requests is None:
            raise MCPError("HTTP方式需要安装 requests 库")
        return requests.post(
            self.url,
            json=payload,
            headers={**self.headers, "Content-Type": "application/json"},
            timeout=timeout,
        )

    def _initialize_http(self) -> bool:
        """Run the initialize handshake against the HTTP endpoint."""
        request = self._new_request(
            "initialize", self._initialize_params("mcp-http-client")
        )
        init_response = self._post(request, self._HTTP_INIT_TIMEOUT)
        if init_response.status_code != 200:
            return False
        if "result" not in init_response.json():
            return False

        # Bounded like the request above so a stalled server cannot hang us.
        self._post(self._initialized_notification(), self._HTTP_INIT_TIMEOUT)
        return True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def initialize(self) -> bool:
        """Initialize the MCP session if that has not happened yet.

        Failures are reported on stdout rather than raised. When a local
        handshake fails, the spawned process is shut down again.

        Returns:
            bool: True if the session is (or already was) initialized.
        """
        if self.initialized:
            return True

        try:
            if self.connection_type == "local":
                succeeded = self._initialize_local()
            else:
                succeeded = self._initialize_http()
        except Exception as e:
            print(f"MCP初始化错误: {e}")
            succeeded = False
        else:
            if not succeeded:
                print("MCP初始化失败")

        if succeeded:
            self.initialized = True
        else:
            self._terminate_process()
        return succeeded

    def call_tool(self, tool_name: str, arguments: Dict[str, Any], timeout: int = 90) -> Dict[str, Any]:
        """Call an MCP tool and return its result.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Arguments passed to the tool.
            timeout: Timeout in seconds for the HTTP request. It is not
                enforced for the local transport, where the read blocks until
                the server answers or exits.

        Returns:
            Dict[str, Any]: The ``"result"`` member of the response, or an
            empty dict if the response has none.

        Raises:
            Exception: if the session cannot be initialized, the HTTP status
                is not 200, the local server exits without answering, or the
                server returns a JSON-RPC error.
        """
        if not self.initialized and not self.initialize():
            raise MCPError("MCP会话初始化失败")

        request = self._new_request(
            "tools/call", {"name": tool_name, "arguments": arguments}
        )

        if self.connection_type == "local":
            self._send_local(request)
            response_data = self._read_local_response(request["id"])
        else:
            tool_response = self._post(request, timeout)
            if tool_response.status_code != 200:
                raise MCPError(f"工具调用请求失败: HTTP {tool_response.status_code}")
            response_data = tool_response.json()

        if "error" in response_data:
            error = response_data["error"]
            raise MCPError(
                f"MCP错误 {error.get('code', 'unknown')}: {error.get('message', 'Unknown error')}"
            )

        return response_data.get("result", {})

    def close(self) -> None:
        """Close the client connection.

        For the local transport the child process is stopped and the session
        is marked uninitialized, so a later call starts a fresh process. The
        HTTP transport needs no cleanup.
        """
        if self.connection_type == "local" and self.process is not None:
            self._terminate_process()
            self.initialized = False


def main() -> None:
    """Usage example: start the Alpha Vantage MCP server and initialize it."""
    # Alpha Vantage MCP configuration (local command-line transport).
    # NOTE(review): an API key is hard-coded below; it is kept only as a
    # fallback for backward compatibility and should be removed from source
    # control in favour of the ALPHA_VANTAGE_API_KEY environment variable.
    # NOTE(review): "alpha-ventage-mcp" may be a typo for "alpha-vantage-mcp";
    # confirm the package name before relying on this example.
    mcp_config = {
        "mcpServers": {
            "alpha-vantage": {
                "command": "npx",
                "args": ["alpha-ventage-mcp"],
                "env": {
                    "ALPHA_VANTAGE_API_KEY": os.environ.get(
                        "ALPHA_VANTAGE_API_KEY", "97Q9TT7I6J9ZOLDS"
                    )
                },
            }
        }
    }

    client = MCP_Client(mcp_config)

    try:
        print("正在初始化MCP会话...")
        if client.initialize():
            print("MCP会话初始化成功")
        else:
            print("MCP会话初始化失败")
            # SystemExit is not caught by `except Exception`, so the finally
            # clause below still closes the client before the process exits.
            raise SystemExit(1)

        # Example only: replace with real tool names and arguments as needed.
        print("MCP客户端已准备就绪,可以调用工具")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        client.close()
        print("MCP客户端连接已关闭")


if __name__ == "__main__":
    main()