Spaces:
Sleeping
Sleeping
| """ | |
| Autonomous AI Agent with MCP Tool Calling using HuggingFace Inference Providers | |
| This agent uses HuggingFace's Inference Providers API with native tool calling | |
| support to autonomously decide which MCP tools to call. | |
| Benefits: | |
| - Uses HuggingFace unified API (single HF token for all providers) | |
| - Native tool calling support (OpenAI-compatible API) | |
| - Multiple providers: Nebius, Together, Sambanova, etc. | |
| - Models like Qwen2.5-72B-Instruct with strong tool calling | |
| - Free tier available with HuggingFace account | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| import logging | |
| import asyncio | |
| from typing import List, Dict, Any, AsyncGenerator | |
| from mcp.tools.definitions import MCP_TOOLS, list_all_tools | |
| from mcp.registry import MCPRegistry | |
logger = logging.getLogger(__name__)

# Models on the free HuggingFace serverless Inference API; using them does
# not consume paid provider credits.
FREE_MODELS = [
    "mistralai/Mistral-7B-Instruct-v0.3",  # fast, good quality
    "microsoft/Phi-3-mini-4k-instruct",    # small and fast
    "HuggingFaceH4/zephyr-7b-beta",        # tuned for chat
    "meta-llama/Llama-3.2-3B-Instruct",    # Meta's small model
    "Qwen/Qwen2.5-3B-Instruct",            # small Qwen
]

# Qwen3 models, served only through paid providers (credits required).
QWEN3_MODELS = ["Qwen/Qwen3-32B", "Qwen/Qwen3-8B", "Qwen/Qwen3-4B"]

# Inference provider name -> models it serves and the model used by default.
# All providers except cerebras share the very same QWEN3_MODELS list object.
HF_PROVIDERS = {
    provider_name: {"models": served_models, "default": default_model}
    for provider_name, served_models, default_model in (
        ("nscale", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("nebius", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("together", QWEN3_MODELS, "Qwen/Qwen3-32B"),
        ("sambanova", QWEN3_MODELS, "Qwen/Qwen3-8B"),
        ("fireworks-ai", QWEN3_MODELS, "Qwen/Qwen3-8B"),
        ("cerebras", ["Qwen/Qwen3-32B"], "Qwen/Qwen3-32B"),
    )
}

# "hf-inference" is a sentinel provider name meaning "free serverless API"
# (the agent then sends no explicit provider).
DEFAULT_PROVIDER = "hf-inference"
DEFAULT_MODEL = "mistralai/Mistral-7B-Instruct-v0.3"
| class AutonomousMCPAgentHF: | |
| """ | |
| AI Agent that autonomously uses MCP servers as tools using HuggingFace Inference Providers. | |
| Uses native tool calling (OpenAI-compatible) for reliable tool execution. | |
| HuggingFace routes requests to inference providers like Nebius, Together, etc. | |
| """ | |
def __init__(
    self,
    mcp_registry: MCPRegistry,
    hf_token: str = None,
    provider: str = None,
    model: str = None
):
    """
    Initialize the autonomous agent with HuggingFace Inference Providers.

    Args:
        mcp_registry: MCP registry with all servers.
        hf_token: HuggingFace token (get one at huggingface.co/settings/tokens).
            Falls back to the ``HF_TOKEN`` env var, then ``HF_API_TOKEN``.
        provider: Inference provider (nscale, nebius, together, sambanova, ...).
            Falls back to the ``HF_PROVIDER`` env var; otherwise ``"nscale"``
            for Qwen3 models and ``DEFAULT_PROVIDER`` (free serverless) for
            everything else.
        model: Model id. Falls back to the ``HF_MODEL`` env var, then
            ``DEFAULT_MODEL``.

    Raises:
        ValueError: If no HuggingFace token is passed or found in the environment.
        ImportError: If ``huggingface_hub`` is missing or too old to provide
            ``InferenceClient``.
    """
    self.mcp_registry = mcp_registry
    self.hf_token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
    self.model = model or os.getenv("HF_MODEL") or DEFAULT_MODEL

    # Provider precedence: explicit argument > HF_PROVIDER env var > auto-detect.
    env_provider = os.getenv("HF_PROVIDER")
    if provider:
        self.provider = provider
    elif env_provider:
        self.provider = env_provider
    elif self.model in QWEN3_MODELS or self.model.startswith("Qwen/Qwen3"):
        # Qwen3 models need a paid provider (see QWEN3_MODELS); default to nscale.
        self.provider = "nscale"
    else:
        self.provider = DEFAULT_PROVIDER

    if not self.hf_token:
        raise ValueError(
            "HF_TOKEN is required!\n"
            "Get a token at: https://huggingface.co/settings/tokens\n"
            "Then set: export HF_TOKEN=hf_your_token_here"
        )

    # Initialize HuggingFace InferenceClient
    try:
        from huggingface_hub import InferenceClient

        # The free serverless API (hf-inference) is selected by omitting `provider`.
        if self.provider == "hf-inference":
            self.client = InferenceClient(token=self.hf_token)
        else:
            self.client = InferenceClient(
                provider=self.provider,
                token=self.hf_token
            )
        logger.info("HuggingFace InferenceClient initialized")
        logger.info(f"   Provider: {self.provider}")
        logger.info(f"   Model: {self.model}")
    except ImportError as err:
        # Chain the original error so the real missing module stays visible.
        raise ImportError(
            "huggingface_hub package not installed or outdated!\n"
            "Install/upgrade with: pip install --upgrade huggingface_hub"
        ) from err

    # Create tool definitions in OpenAI/HF format
    self.tools = self._create_tool_definitions()

    logger.info(f"Autonomous MCP Agent initialized with HuggingFace ({self.provider})")
    logger.info(f"Available tools: {len(self.tools)}")
def _create_tool_definitions(self) -> List[Dict[str, Any]]:
    """
    Translate every ``MCP_TOOLS`` entry into an OpenAI/HuggingFace function-tool spec.

    Returns:
        One ``{"type": "function", "function": {...}}`` dict per MCP tool, in
        ``MCP_TOOLS`` order, with the tool's ``input_schema`` exposed as
        ``parameters``.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": spec["name"],
                "description": spec["description"],
                "parameters": spec["input_schema"],
            },
        }
        for spec in MCP_TOOLS
    ]
async def run(
    self,
    task: str,
    max_iterations: int = 15
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Run the agent autonomously on a task using native tool calling.

    Each iteration sends the conversation to the model (``_call_inference_api``,
    run in a worker thread). Any tool calls it returns are executed through
    ``_execute_mcp_tool``. The assistant turn and the tool results are then
    appended to the conversation. A reply without tool calls after the first
    iteration is treated as the final answer.

    Args:
        task: The task to complete.
        max_iterations: Maximum number of model round-trips (each may issue
            several tool calls) before giving up.

    Yields:
        Event dicts keyed by ``type``. Possible types are ``agent_start``,
        ``iteration_start``, ``tool_call``, ``tool_result``, ``tool_error``,
        ``thought``, ``agent_complete``, ``agent_error`` and
        ``agent_max_iterations``. ``agent_max_iterations`` is emitted only when
        the budget runs out without completion or error.
    """
    import re

    yield {
        "type": "agent_start",
        "message": "Autonomous AI Agent (HuggingFace) starting task",
        "task": task,
        "model": self.model,
        "provider": self.provider
    }

    # System prompt for the agent
    system_prompt = """You are an autonomous AI agent for B2B sales automation.
You have access to MCP tools including:
- search_web: Search the web for company information
- find_verified_contacts: Find REAL decision-makers (searches LinkedIn, company websites, directories)
- save_prospect: Save a prospect company to the database
- send_email: Draft outreach emails
CRITICAL RULE: Only save prospects that have verified contacts. No contacts = don't save.
REQUIRED WORKFLOW:
1. search_web to find potential prospect companies
2. find_verified_contacts FIRST to check if contacts exist
3. IF contacts found (count > 0): save_prospect, then send_email
4. IF no contacts found (count = 0): SKIP this company, try the next one
TOOL CALL FORMAT - output valid JSON:
Step 1 - Find contacts FIRST:
{"company_name": "Acme Corp", "company_domain": "acme.com", "target_titles": ["CEO", "Founder", "VP Sales", "CTO"], "max_contacts": 3}
Step 2 - ONLY if contacts found, save prospect:
{"prospect_id": "prospect_1", "company_id": "company_1", "company_name": "Acme Corp", "company_domain": "acme.com", "fit_score": 85}
The find_verified_contacts tool searches:
- Company website (team/about pages)
- LinkedIn profiles
- Crunchbase, ZoomInfo, directories
- Press releases and news
- Social media profiles
IMPORTANT:
- A prospect without contacts is USELESS - don't save it
- NEVER invent contact names or emails
- Keep searching until you find prospects WITH verified contacts
After completing, summarize:
- Prospects saved (with contacts)
- Companies skipped (no contacts)"""

    # Initialize conversation
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": task}
    ]
    # get_running_loop() is the correct call inside a coroutine
    # (get_event_loop() is deprecated for this use).
    loop = asyncio.get_running_loop()

    iteration = 0
    while iteration < max_iterations:
        iteration += 1
        yield {
            "type": "iteration_start",
            "iteration": iteration,
            "message": f"Iteration {iteration}: AI reasoning..."
        }

        try:
            logger.info(f"Calling HuggingFace API (iteration {iteration})...")
            logger.info(f"   Provider: {self.provider}, Model: {self.model}")

            # The HTTP call is synchronous; keep it off the event-loop thread.
            response = await loop.run_in_executor(
                None,
                self._call_inference_api,
                messages
            )

            # A missing response or an empty choices list are both unusable.
            choices = getattr(response, "choices", None)
            if not choices:
                yield {
                    "type": "agent_error",
                    "error": "Empty response from API",
                    "message": "API returned empty response"
                }
                break

            assistant_message = choices[0].message
            tool_calls = getattr(assistant_message, "tool_calls", None)

            if tool_calls:
                tool_results = []
                for tool_call in tool_calls:
                    tool_name = tool_call.function.name
                    raw_arguments = tool_call.function.arguments
                    # Arguments are normally a JSON string, but may arrive
                    # already decoded, null, or malformed.
                    try:
                        tool_input = (
                            raw_arguments if isinstance(raw_arguments, dict)
                            else json.loads(raw_arguments)
                        )
                    except (json.JSONDecodeError, TypeError):
                        tool_input = {}
                    if not isinstance(tool_input, dict):
                        tool_input = {}

                    yield {
                        "type": "tool_call",
                        "tool": tool_name,
                        "input": tool_input,
                        "message": f"Action: {tool_name}"
                    }

                    # Execute the MCP tool; failures are reported back to the model.
                    try:
                        result = await self._execute_mcp_tool(tool_name, tool_input)
                        yield {
                            "type": "tool_result",
                            "tool": tool_name,
                            "result": result,
                            "message": f"Tool {tool_name} completed"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps(result, default=str)
                        })
                    except Exception as e:
                        error_msg = str(e)
                        logger.error(f"Tool execution failed: {tool_name} - {error_msg}")
                        yield {
                            "type": "tool_error",
                            "tool": tool_name,
                            "error": error_msg,
                            "message": f"Tool {tool_name} failed: {error_msg}"
                        }
                        tool_results.append({
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "content": json.dumps({"error": error_msg})
                        })

                # Record the assistant turn (arguments must be JSON strings in
                # the OpenAI message format) followed by every tool result.
                messages.append({
                    "role": "assistant",
                    "content": assistant_message.content or "",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                "arguments": (
                                    tc.function.arguments
                                    if isinstance(tc.function.arguments, str)
                                    else json.dumps(tc.function.arguments)
                                )
                            }
                        }
                        for tc in tool_calls
                    ]
                })
                messages.extend(tool_results)
                continue

            # No tool calls: the AI is done or is just providing a response.
            final_content = assistant_message.content or ""
            raw_content = getattr(assistant_message, "raw_content", None) or final_content

            logger.info(f"Iteration {iteration}: No tool calls")
            logger.info(f"   Raw content length: {len(raw_content)}")
            logger.info(f"   Stripped content length: {len(final_content)}")
            if raw_content and not final_content:
                logger.info(f"   Raw content preview: {raw_content[:200]}...")

            if final_content:
                preview = final_content if len(final_content) <= 100 else f"{final_content[:100]}..."
                yield {
                    "type": "thought",
                    "thought": final_content,
                    "message": f"AI Response: {preview}"
                }
            elif raw_content:
                # Only reasoning was produced; report its size, not its text.
                yield {
                    "type": "thought",
                    "thought": f"[Processing: {len(raw_content)} chars of reasoning]",
                    "message": "AI is reasoning..."
                }

            # A tool-free reply counts as the final answer only after the first round.
            if iteration > 1:
                if not final_content and raw_content:
                    # Salvage the tail of the <think> block as the answer.
                    think_match = re.search(r'<think>(.*?)</think>', raw_content, flags=re.DOTALL)
                    if think_match:
                        think_text = think_match.group(1).strip()
                        sentences = [s.strip() for s in think_text.split('.') if len(s.strip()) > 20]
                        if sentences:
                            final_content = '. '.join(sentences[-5:]) + '.'
                            logger.info(f"Extracted final answer from thinking: {final_content[:100]}...")

                yield {
                    "type": "agent_complete",
                    "message": "Task complete!",
                    "final_answer": final_content,
                    "iterations": iteration
                }
                break

            # Keep the reply in the history and ask again.
            messages.append({
                "role": "assistant",
                "content": final_content or raw_content[:500]
            })

        except Exception as e:
            error_msg = str(e)
            lowered = error_msg.lower()
            logger.error(f"HuggingFace API error: {error_msg}", exc_info=True)

            if "401" in error_msg or "unauthorized" in lowered:
                yield {
                    "type": "agent_error",
                    "error": "Invalid HF_TOKEN",
                    "message": "Authentication failed. Please check your HF_TOKEN."
                }
            elif "429" in error_msg or any(
                marker in lowered
                for marker in ("rate limit", "rate-limit", "rate_limit", "ratelimit", "too many requests")
            ):
                # Match explicit rate-limit markers only; bare "rate"/"limit"
                # also hit "generate" and "token limit".
                yield {
                    "type": "agent_error",
                    "error": "Rate limit reached",
                    "message": "Rate limit reached. Try again later or upgrade to HF PRO."
                }
            else:
                yield {
                    "type": "agent_error",
                    "error": error_msg,
                    "message": f"API error: {error_msg}"
                }
            break
    else:
        # Only reached when the budget ran out without any `break`
        # (i.e. no completion and no error on the final iteration).
        yield {
            "type": "agent_max_iterations",
            "message": f"Reached maximum iterations ({max_iterations})",
            "iterations": iteration
        }
def _call_inference_api(self, messages: List[Dict], retry_count: int = 0) -> Any:
    """
    Send a chat-completion request to the HuggingFace router, with fallbacks.

    The configured ``self.model`` / ``self.provider`` pair is tried first.
    After that, a fixed list of fallback (model, provider) pairs is tried in
    order, skipping any pair identical to the primary one. Every request
    carries the agent's tool definitions with ``tool_choice="auto"``.

    Args:
        messages: Conversation so far, in OpenAI chat-message format.
        retry_count: Unused; kept for backward compatibility with callers.

    Returns:
        The response object built by ``_create_chat_response`` from the first
        attempt that returns HTTP 200 with a usable body.

    Raises:
        Exception: When every attempt fails. The message contains the error
            from the most recent failed attempt.
    """
    import requests

    api_url = "https://router.huggingface.co/v1/chat/completions"

    # The free serverless API ("hf-inference") is selected by sending no provider.
    primary_provider = (
        self.provider if self.provider and self.provider != "hf-inference" else None
    )
    fallback_models = [
        ("Qwen/Qwen2.5-72B-Instruct", None),  # No provider = serverless
        ("meta-llama/Llama-3.1-70B-Instruct", None),
        ("mistralai/Mixtral-8x7B-Instruct-v0.1", None),
        ("Qwen/Qwen3-32B", "nebius"),
        ("Qwen/Qwen3-8B", "together"),
    ]
    primary = (self.model, primary_provider)
    # Don't spend a second request on the exact pair that just failed.
    attempts = [primary] + [pair for pair in fallback_models if pair != primary]

    last_error = None
    for attempt_number, (model, provider) in enumerate(attempts):
        via = f" via {provider}" if provider else ""
        role = "primary" if attempt_number == 0 else "fallback"
        logger.info(f"Trying {role} model: {model}{via}")

        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json"
        }
        if provider:
            # NOTE(review): provider routing via this header is unchanged from
            # the original. The router docs describe selecting a provider with
            # a "model:provider" suffix instead — confirm this header is honoured.
            headers["X-HF-Provider"] = provider

        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 2048,
            "temperature": 0.7,
            "stream": False,
            "tools": self.tools,
            "tool_choice": "auto"  # Let the model decide when to use tools
        }

        try:
            response = requests.post(api_url, headers=headers, json=payload, timeout=120)
            if response.status_code == 200:
                chat_response = self._create_chat_response(response.json())
                logger.info(f"Success with {model}{via}")
                return chat_response
        except Exception as e:
            # Network errors, timeouts and malformed bodies: move on to the next model.
            last_error = str(e)
            logger.warning(f"Model {model}{via} failed: {last_error[:100]}")
            continue

        # Non-200: remember why this attempt failed so the final error is accurate.
        status = response.status_code
        if status == 402:
            last_error = f"Payment required for {model}{via} - exceeded monthly credits"
        elif status == 404:
            last_error = f"Model {model} not found{via}"
        elif status == 503:
            last_error = f"Model {model} is loading (HTTP 503)"
        else:
            last_error = f"HTTP {status} from {model}{via}"
            logger.warning(f"Model {model} returned {status}: {response.text[:200]}")
        logger.warning(f"{last_error}; trying next model...")

    logger.error(f"All models failed. Last error: {last_error}")
    raise Exception(f"All inference attempts failed: {last_error}")
| def _strip_thinking_tags(self, text: str) -> str: | |
| """Remove Qwen3's <think>...</think> tags and return the actual response""" | |
| import re | |
| if not text: | |
| return "" | |
| # Remove <think>...</think> blocks (Qwen3 chain-of-thought) | |
| cleaned = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL) | |
| result = cleaned.strip() | |
| # If stripped content is empty but original had thinking, extract a summary | |
| if not result and '<think>' in text: | |
| # Try to extract the last meaningful sentence from thinking as a fallback | |
| think_match = re.search(r'<think>(.*?)</think>', text, flags=re.DOTALL) | |
| if think_match: | |
| think_content = think_match.group(1).strip() | |
| # Get last few sentences as summary (model's conclusion) | |
| sentences = [s.strip() for s in think_content.split('.') if s.strip()] | |
| if sentences: | |
| # Return last 2-3 meaningful sentences as the response | |
| result = '. '.join(sentences[-3:]) + '.' | |
| logger.info(f"Extracted thinking summary: {result[:100]}...") | |
| return result | |
| def _create_chat_response(self, result: dict) -> Any: | |
| """Create a response object from chat completion result""" | |
| strip_thinking = self._strip_thinking_tags | |
| class MockChoice: | |
| def __init__(self, message_data): | |
| self.message = MockMessage(message_data) | |
| class MockMessage: | |
| def __init__(self, data): | |
| # Handle None content properly (API might return {"content": null}) | |
| raw_content = data.get("content") or "" | |
| # Strip Qwen3 thinking tags to get actual response | |
| self.content = strip_thinking(raw_content) | |
| # Store raw content for debugging/fallback | |
| self.raw_content = raw_content | |
| self.tool_calls = self._parse_tool_calls_from_response(data, raw_content) | |
| def _parse_tool_calls_from_response(self, data, raw_content): | |
| """Parse tool calls from API response or from content""" | |
| # Check if API returned tool_calls directly | |
| if "tool_calls" in data and data["tool_calls"]: | |
| return [MockToolCall(tc) for tc in data["tool_calls"]] | |
| # Otherwise try to parse from content (use raw content to find tool calls) | |
| return self._parse_tool_calls_from_text(raw_content) | |
| def _infer_tool_from_params(self, params): | |
| """Infer tool name from parameter keys""" | |
| if not isinstance(params, dict): | |
| return None | |
| keys = set(params.keys()) | |
| # Check for discover_prospects_with_contacts (HIGHEST PRIORITY - all-in-one tool) | |
| if "client_company" in keys and "client_industry" in keys: | |
| return "discover_prospects_with_contacts" | |
| if "client_company" in keys and "target_prospects" in keys: | |
| return "discover_prospects_with_contacts" | |
| # Check for find_verified_contacts patterns (single company) | |
| if "company_name" in keys and "company_domain" in keys and "target_titles" in keys: | |
| return "find_verified_contacts" | |
| if "company_name" in keys and "company_domain" in keys and "max_contacts" in keys: | |
| return "find_verified_contacts" | |
| # Check for save_prospect patterns | |
| if "prospect_id" in keys or ("company_name" in keys and "fit_score" in keys): | |
| return "save_prospect" | |
| # Check for save_company patterns | |
| if "company_id" in keys and ("name" in keys or "domain" in keys) and "prospect_id" not in keys: | |
| return "save_company" | |
| # Check for save_contact patterns (only for contacts returned by find_verified_contacts) | |
| if "contact_id" in keys or ("email" in keys and ("first_name" in keys or "last_name" in keys)): | |
| return "save_contact" | |
| # Check for send_email patterns | |
| if "to" in keys and "subject" in keys and "body" in keys: | |
| return "send_email" | |
| # Check for search patterns | |
| if "query" in keys and len(keys) <= 2: | |
| return "search_web" | |
| # Check for save_fact patterns | |
| if "fact_type" in keys or ("content" in keys and "company_id" in keys): | |
| return "save_fact" | |
| return None | |
| def _parse_tool_calls_from_text(self, text): | |
| """Try to parse tool calls from text response - handles Qwen3 text-based tool descriptions""" | |
| import re | |
| tool_calls = [] | |
| def extract_json_objects(text): | |
| """Extract all JSON objects from text, handling nested braces""" | |
| objects = [] | |
| i = 0 | |
| while i < len(text): | |
| if text[i] == '{': | |
| start = i | |
| depth = 1 | |
| i += 1 | |
| while i < len(text) and depth > 0: | |
| if text[i] == '{': | |
| depth += 1 | |
| elif text[i] == '}': | |
| depth -= 1 | |
| i += 1 | |
| if depth == 0: | |
| try: | |
| obj = json.loads(text[start:i]) | |
| objects.append(obj) | |
| except: | |
| pass | |
| else: | |
| i += 1 | |
| return objects | |
| # IMPORTANT: Search BOTH raw text AND stripped text for JSON objects | |
| # Qwen3 may put tool calls inside <think> tags | |
| all_json_objects = extract_json_objects(text) # Search raw first | |
| # Also search stripped version in case JSON is outside think tags | |
| text_clean = strip_thinking(text) | |
| if text_clean != text: | |
| all_json_objects.extend(extract_json_objects(text_clean)) | |
| logger.info(f"Found {len(all_json_objects)} JSON objects in response") | |
| # Process each JSON object and infer tool | |
| seen_signatures = set() # Avoid duplicates | |
| for obj in all_json_objects: | |
| tool_name = self._infer_tool_from_params(obj) | |
| if tool_name: | |
| # Create a signature to avoid duplicates | |
| sig = f"{tool_name}:{json.dumps(obj, sort_keys=True)}" | |
| if sig not in seen_signatures: | |
| seen_signatures.add(sig) | |
| tool_calls.append(MockToolCallFromText({"tool": tool_name, "parameters": obj})) | |
| logger.info(f"Parsed tool call: {tool_name} with params: {list(obj.keys())}") | |
| # Also check code fence blocks (sometimes JSON is formatted there) | |
| code_blocks = re.findall(r'```(?:json)?\s*(.+?)\s*```', text_clean, re.DOTALL) | |
| for block in code_blocks: | |
| block_objects = extract_json_objects(block) | |
| for obj in block_objects: | |
| tool_name = self._infer_tool_from_params(obj) | |
| if tool_name: | |
| sig = f"{tool_name}:{json.dumps(obj, sort_keys=True)}" | |
| if sig not in seen_signatures: | |
| seen_signatures.add(sig) | |
| tool_calls.append(MockToolCallFromText({"tool": tool_name, "parameters": obj})) | |
| logger.info(f"Parsed tool from code block: {tool_name}") | |
| if tool_calls: | |
| logger.info(f"Total tool calls parsed from text: {len(tool_calls)}") | |
| return tool_calls if tool_calls else None | |
| class MockToolCall: | |
| def __init__(self, data): | |
| self.function = MockFunction(data.get("function", {})) | |
| self.id = data.get("id", f"call_{id(self)}") | |
| class MockToolCallFromText: | |
| def __init__(self, data): | |
| self.function = MockFunctionFromText(data) | |
| self.id = f"call_{id(self)}" | |
| class MockFunction: | |
| def __init__(self, data): | |
| self.name = data.get("name", "") | |
| self.arguments = data.get("arguments", "{}") | |
| class MockFunctionFromText: | |
| def __init__(self, data): | |
| self.name = data.get("tool", data.get("name", "")) | |
| self.arguments = json.dumps(data.get("parameters", data.get("arguments", {}))) | |
| class MockResponse: | |
| def __init__(self, result): | |
| choices_data = result.get("choices", []) | |
| if choices_data: | |
| self.choices = [MockChoice(c.get("message", {})) for c in choices_data] | |
| else: | |
| self.choices = [] | |
| return MockResponse(result) | |
| async def _execute_mcp_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any: | |
| """ | |
| Execute an MCP tool by routing to the appropriate MCP server. | |
| This is where we actually call the MCP servers! | |
| """ | |
| # ============ SEARCH MCP SERVER ============ | |
| if tool_name == "search_web": | |
| query = tool_input["query"] | |
| max_results = tool_input.get("max_results", 5) | |
| results = await self.mcp_registry.search.query(query, max_results=max_results) | |
| return { | |
| "results": results[:max_results], | |
| "count": len(results[:max_results]) | |
| } | |
| elif tool_name == "search_news": | |
| query = tool_input["query"] | |
| max_results = tool_input.get("max_results", 5) | |
| results = await self.mcp_registry.search.query(f"{query} news", max_results=max_results) | |
| return { | |
| "results": results[:max_results], | |
| "count": len(results[:max_results]) | |
| } | |
| # ============ OPTIMIZED PROSPECT DISCOVERY WITH CONTACTS ============ | |
| elif tool_name == "discover_prospects_with_contacts": | |
| from services.enhanced_contact_finder import EnhancedContactFinder | |
| from urllib.parse import urlparse | |
| client_company = tool_input["client_company"] | |
| client_industry = tool_input["client_industry"] | |
| target_prospects = tool_input.get("target_prospects", 3) | |
| target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"]) | |
| logger.info(f"Discovering {target_prospects} prospects with contacts for {client_company}") | |
| print(f"\n[PROSPECT DISCOVERY] ========================================") | |
| print(f"[PROSPECT DISCOVERY] Finding {target_prospects} prospects WITH verified contacts") | |
| print(f"[PROSPECT DISCOVERY] Client: {client_company}") | |
| print(f"[PROSPECT DISCOVERY] ========================================") | |
| contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry) | |
| saved_prospects = [] | |
| all_contacts = [] | |
| skipped_companies = [] | |
| companies_checked = 0 | |
| max_companies_to_check = target_prospects * 8 # Check more companies to find enough with contacts | |
| # Build smart search queries based on what the client company does | |
| # The goal is to find CUSTOMERS for the client, not articles ABOUT the client | |
| client_lower = client_company.lower() | |
| industry_lower = client_industry.lower() | |
| # Determine prospect type based on client business | |
| # E-commerce platforms (Shopify, BigCommerce, etc.) -> retailers, DTC brands | |
| # CRM software -> B2B companies, sales teams | |
| # Marketing tools -> businesses needing marketing | |
| # etc. | |
| search_queries = [] | |
| # Check for e-commerce/retail platform clients | |
| if any(kw in client_lower or kw in industry_lower for kw in ['ecommerce', 'e-commerce', 'shopify', 'online store', 'retail platform', 'shopping cart']): | |
| search_queries = [ | |
| "DTC brands fashion apparel company", | |
| "online boutique store founder CEO", | |
| "independent retail brand ecommerce", | |
| "emerging consumer brands direct to consumer", | |
| "small business online store owner", | |
| "handmade crafts seller business", | |
| "subscription box company founder", | |
| ] | |
| # Check for CRM/Sales software clients | |
| elif any(kw in client_lower or kw in industry_lower for kw in ['crm', 'salesforce', 'sales software', 'customer relationship']): | |
| search_queries = [ | |
| "B2B SaaS company sales team", | |
| "growing startup sales operations", | |
| "enterprise software company VP Sales", | |
| "technology company Head of Sales", | |
| ] | |
| # Check for marketing/advertising clients | |
| elif any(kw in client_lower or kw in industry_lower for kw in ['marketing', 'advertising', 'ads', 'seo', 'content']): | |
| search_queries = [ | |
| "growing startup marketing director", | |
| "ecommerce brand marketing team", | |
| "B2B company CMO marketing", | |
| "technology startup growth marketing", | |
| ] | |
| # Default: find growing companies that might need the client's services | |
| else: | |
| search_queries = [ | |
| f"growing companies {industry_lower} customers list", | |
| f"startups using {industry_lower} solutions", | |
| f"businesses {industry_lower} case study customer", | |
| f"companies similar to {client_company} customers", | |
| "fast growing startups Series A B2B", | |
| "emerging technology companies founder CEO", | |
| "mid-market companies digital transformation", | |
| ] | |
| # Add generic business-finding queries | |
| search_queries.extend([ | |
| "Inc 5000 fastest growing companies", | |
| "emerging brands startup founders", | |
| "venture backed startups series A", | |
| ]) | |
| seen_domains = set() | |
| # Skip domains that are NOT actual company websites | |
| skip_domains = [ | |
| # Social media | |
| 'linkedin.com', 'facebook.com', 'twitter.com', 'instagram.com', 'tiktok.com', | |
| # Reference/directory sites | |
| 'wikipedia.org', 'crunchbase.com', 'zoominfo.com', 'apollo.io', 'yelp.com', | |
| 'glassdoor.com', 'g2.com', 'capterra.com', 'trustpilot.com', 'bbb.org', | |
| # News/media sites | |
| 'forbes.com', 'businessinsider.com', 'techcrunch.com', 'bloomberg.com', | |
| 'cnbc.com', 'reuters.com', 'wsj.com', 'nytimes.com', 'theverge.com', | |
| 'wired.com', 'mashable.com', 'venturebeat.com', 'inc.com', 'entrepreneur.com', | |
| # Blog/article/review sites | |
| 'medium.com', 'hubspot.com', 'blog.', 'wordpress.com', 'blogspot.com', | |
| 'quora.com', 'reddit.com', 'youtube.com', 'vimeo.com', | |
| # Generic/aggregator sites | |
| 'amazon.com', 'ebay.com', 'alibaba.com', 'aliexpress.com', | |
| 'google.com', 'bing.com', 'yahoo.com', 'duckduckgo.com', | |
| # The client company itself (don't prospect yourself!) | |
| client_company.lower().replace(' ', '') + '.com', | |
| ] | |
| # Also skip titles that look like articles, not company names | |
| skip_title_patterns = [ | |
| 'what is', 'how to', 'guide', 'review', 'best ', 'top ', 'vs ', | |
| ' vs ', 'comparison', 'tutorial', 'tips', 'ways to', 'complete', | |
| 'everything you need', 'beginner', 'introduction', 'explained', | |
| '2024', '2025', '2023', '[', ']', 'list of', 'examples' | |
| ] | |
| for query in search_queries: | |
| if len(saved_prospects) >= target_prospects: | |
| break | |
| try: | |
| print(f"\n[PROSPECT DISCOVERY] Searching: {query}") | |
| results = await self.mcp_registry.search.query(query, max_results=10) | |
| for result in results: | |
| if len(saved_prospects) >= target_prospects: | |
| break | |
| if companies_checked >= max_companies_to_check: | |
| break | |
| url = result.get('url', '') | |
| title = result.get('title', '') | |
| # Extract domain from URL | |
| try: | |
| parsed = urlparse(url) | |
| domain = parsed.netloc.replace('www.', '') | |
| if not domain or domain in seen_domains: | |
| continue | |
| seen_domains.add(domain) | |
| except: | |
| continue | |
| # Skip non-company domains | |
| if any(skip in domain.lower() for skip in skip_domains): | |
| print(f"[PROSPECT DISCOVERY] ⏭️ Skipping non-company domain: {domain}") | |
| continue | |
| # Skip titles that look like articles, not companies | |
| title_lower = title.lower() | |
| if any(pattern in title_lower for pattern in skip_title_patterns): | |
| print(f"[PROSPECT DISCOVERY] ⏭️ Skipping article title: {title[:50]}...") | |
| continue | |
| # Extract company name from title - be smarter about it | |
| # Try to get actual company name, not article title | |
| company_name = title.split(' - ')[0].split(' | ')[0].split(':')[0].strip() | |
| # If company name is too long (probably article title), use domain | |
| if len(company_name) > 40 or ' ' in company_name and len(company_name.split()) > 5: | |
| # Extract company name from domain instead | |
| company_name = domain.split('.')[0].replace('-', ' ').title() | |
| if not company_name or len(company_name) < 2: | |
| continue | |
| companies_checked += 1 | |
| print(f"\n[PROSPECT DISCOVERY] Checking ({companies_checked}/{max_companies_to_check}): {company_name} ({domain})") | |
| # Find contacts for this company | |
| try: | |
| contacts = await contact_finder.find_real_contacts( | |
| company_name=company_name, | |
| domain=domain, | |
| target_titles=target_titles, | |
| max_contacts=3 | |
| ) | |
| if contacts and len(contacts) > 0: | |
| # Save prospect | |
| prospect_id = f"prospect_{len(saved_prospects) + 1}" | |
| company_id = domain.replace(".", "_") | |
| prospect_data = { | |
| "id": prospect_id, | |
| "company": { | |
| "id": company_id, | |
| "name": company_name, | |
| "domain": domain | |
| }, | |
| "fit_score": 75, | |
| "status": "new", | |
| "metadata": {"source": "automated_discovery"} | |
| } | |
| await self.mcp_registry.store.save_prospect(prospect_data) | |
| # Save contacts | |
| contact_list = [] | |
| for contact in contacts: | |
| contact_data = { | |
| "id": contact.id, | |
| "name": contact.name, | |
| "email": contact.email, | |
| "title": contact.title, | |
| "company": company_name, | |
| "domain": domain, | |
| "verified": True, | |
| "source": "web_search_and_scraping" | |
| } | |
| contact_list.append(contact_data) | |
| all_contacts.append(contact_data) | |
| await self.mcp_registry.store.save_contact({ | |
| "id": contact.id, | |
| "company_id": company_id, | |
| "email": contact.email, | |
| "first_name": contact.name.split()[0] if contact.name else "", | |
| "last_name": contact.name.split()[-1] if len(contact.name.split()) > 1 else "", | |
| "title": contact.title | |
| }) | |
| saved_prospects.append({ | |
| "prospect_id": prospect_id, | |
| "company_name": company_name, | |
| "domain": domain, | |
| "contacts": contact_list, | |
| "contact_count": len(contact_list) | |
| }) | |
| print(f"[PROSPECT DISCOVERY] ✅ SAVED: {company_name} with {len(contacts)} contacts") | |
| else: | |
| skipped_companies.append({"name": company_name, "domain": domain, "reason": "no_contacts"}) | |
| print(f"[PROSPECT DISCOVERY] ⏭️ SKIPPED: {company_name} (no verified contacts)") | |
| except Exception as e: | |
| logger.debug(f"Error checking {company_name}: {str(e)}") | |
| skipped_companies.append({"name": company_name, "domain": domain, "reason": str(e)}) | |
| continue | |
| except Exception as e: | |
| logger.debug(f"Search error: {str(e)}") | |
| continue | |
| print(f"\n[PROSPECT DISCOVERY] ========================================") | |
| print(f"[PROSPECT DISCOVERY] DISCOVERY COMPLETE") | |
| print(f"[PROSPECT DISCOVERY] ========================================") | |
| print(f"[PROSPECT DISCOVERY] Prospects saved: {len(saved_prospects)}/{target_prospects}") | |
| print(f"[PROSPECT DISCOVERY] Total contacts: {len(all_contacts)}") | |
| print(f"[PROSPECT DISCOVERY] Companies checked: {companies_checked}") | |
| print(f"[PROSPECT DISCOVERY] Companies skipped: {len(skipped_companies)}") | |
| print(f"[PROSPECT DISCOVERY] ========================================\n") | |
| return { | |
| "status": "success" if len(saved_prospects) > 0 else "no_prospects_found", | |
| "prospects": saved_prospects, | |
| "prospects_count": len(saved_prospects), | |
| "contacts_count": len(all_contacts), | |
| "companies_checked": companies_checked, | |
| "companies_skipped": len(skipped_companies), | |
| "target_met": len(saved_prospects) >= target_prospects, | |
| "message": f"Found {len(saved_prospects)} prospects with {len(all_contacts)} verified contacts. Checked {companies_checked} companies, skipped {len(skipped_companies)} (no contacts)." | |
| } | |
| # ============ VERIFIED CONTACT FINDER (Single Company) ============ | |
| elif tool_name == "find_verified_contacts": | |
| from services.enhanced_contact_finder import EnhancedContactFinder | |
| company_name = tool_input["company_name"] | |
| company_domain = tool_input["company_domain"] | |
| target_titles = tool_input.get("target_titles", ["CEO", "Founder", "VP Sales", "CTO", "Head of Sales"]) | |
| max_contacts = tool_input.get("max_contacts", 3) | |
| logger.info(f"Finding verified contacts for {company_name} ({company_domain})") | |
| contact_finder = EnhancedContactFinder(mcp_registry=self.mcp_registry) | |
| try: | |
| contacts = await contact_finder.find_real_contacts( | |
| company_name=company_name, | |
| domain=company_domain, | |
| target_titles=target_titles, | |
| max_contacts=max_contacts | |
| ) | |
| contact_list = [] | |
| for contact in contacts: | |
| contact_data = { | |
| "id": contact.id, | |
| "name": contact.name, | |
| "email": contact.email, | |
| "title": contact.title, | |
| "company": company_name, | |
| "domain": company_domain, | |
| "verified": True, | |
| "source": "web_search_and_scraping" | |
| } | |
| contact_list.append(contact_data) | |
| await self.mcp_registry.store.save_contact({ | |
| "id": contact.id, | |
| "company_id": company_domain.replace(".", "_"), | |
| "email": contact.email, | |
| "first_name": contact.name.split()[0] if contact.name else "", | |
| "last_name": contact.name.split()[-1] if contact.name and len(contact.name.split()) > 1 else "", | |
| "title": contact.title | |
| }) | |
| if contact_list: | |
| return { | |
| "status": "success", | |
| "contacts": contact_list, | |
| "count": len(contact_list), | |
| "message": f"Found {len(contact_list)} verified contacts at {company_name}", | |
| "should_save_prospect": True | |
| } | |
| else: | |
| return { | |
| "status": "no_contacts_found", | |
| "contacts": [], | |
| "count": 0, | |
| "message": f"No verified contacts found for {company_name}. Skip this prospect.", | |
| "should_save_prospect": False | |
| } | |
| except Exception as e: | |
| logger.error(f"Error finding contacts for {company_name}: {str(e)}") | |
| return { | |
| "status": "error", | |
| "contacts": [], | |
| "count": 0, | |
| "message": f"Error searching for contacts: {str(e)}", | |
| "should_save_prospect": False | |
| } | |
| # ============ STORE MCP SERVER ============ | |
| elif tool_name == "save_prospect": | |
| prospect_data = { | |
| "id": tool_input.get("prospect_id", str(uuid.uuid4())), | |
| "company": { | |
| "id": tool_input.get("company_id"), | |
| "name": tool_input.get("company_name"), | |
| "domain": tool_input.get("company_domain") | |
| }, | |
| "fit_score": tool_input.get("fit_score", 0), | |
| "status": tool_input.get("status", "new"), | |
| "metadata": tool_input.get("metadata", {}) | |
| } | |
| result = await self.mcp_registry.store.save_prospect(prospect_data) | |
| return {"status": result, "prospect_id": prospect_data["id"]} | |
| elif tool_name == "get_prospect": | |
| prospect_id = tool_input["prospect_id"] | |
| prospect = await self.mcp_registry.store.get_prospect(prospect_id) | |
| return prospect or {"error": "Prospect not found"} | |
| elif tool_name == "list_prospects": | |
| prospects = await self.mcp_registry.store.list_prospects() | |
| status_filter = tool_input.get("status") | |
| if status_filter: | |
| prospects = [p for p in prospects if p.get("status") == status_filter] | |
| return { | |
| "prospects": prospects, | |
| "count": len(prospects) | |
| } | |
| elif tool_name == "save_company": | |
| company_data = { | |
| "id": tool_input.get("company_id", str(uuid.uuid4())), | |
| "name": tool_input["name"], | |
| "domain": tool_input["domain"], | |
| "industry": tool_input.get("industry"), | |
| "description": tool_input.get("description"), | |
| "employee_count": tool_input.get("employee_count") | |
| } | |
| result = await self.mcp_registry.store.save_company(company_data) | |
| return {"status": result, "company_id": company_data["id"]} | |
| elif tool_name == "get_company": | |
| company_id = tool_input["company_id"] | |
| company = await self.mcp_registry.store.get_company(company_id) | |
| return company or {"error": "Company not found"} | |
| elif tool_name == "save_fact": | |
| fact_data = { | |
| "id": tool_input.get("fact_id", str(uuid.uuid4())), | |
| "company_id": tool_input["company_id"], | |
| "fact_type": tool_input["fact_type"], | |
| "content": tool_input["content"], | |
| "source_url": tool_input.get("source_url"), | |
| "confidence_score": tool_input.get("confidence_score", 0.8) | |
| } | |
| result = await self.mcp_registry.store.save_fact(fact_data) | |
| return {"status": result, "fact_id": fact_data["id"]} | |
| elif tool_name == "save_contact": | |
| contact_data = { | |
| "id": tool_input.get("contact_id", str(uuid.uuid4())), | |
| "company_id": tool_input["company_id"], | |
| "email": tool_input["email"], | |
| "first_name": tool_input.get("first_name"), | |
| "last_name": tool_input.get("last_name"), | |
| "title": tool_input.get("title"), | |
| "seniority": tool_input.get("seniority") | |
| } | |
| result = await self.mcp_registry.store.save_contact(contact_data) | |
| return {"status": result, "contact_id": contact_data["id"]} | |
| elif tool_name == "list_contacts_by_domain": | |
| domain = tool_input["domain"] | |
| contacts = await self.mcp_registry.store.list_contacts_by_domain(domain) | |
| return { | |
| "contacts": contacts, | |
| "count": len(contacts) | |
| } | |
| elif tool_name == "check_suppression": | |
| supp_type = tool_input["suppression_type"] | |
| value = tool_input["value"] | |
| is_suppressed = await self.mcp_registry.store.check_suppression(supp_type, value) | |
| return { | |
| "suppressed": is_suppressed, | |
| "value": value, | |
| "type": supp_type | |
| } | |
| # ============ EMAIL MCP SERVER ============ | |
| elif tool_name == "send_email": | |
| to = tool_input["to"] | |
| subject = tool_input["subject"] | |
| body = tool_input["body"] | |
| prospect_id = tool_input["prospect_id"] | |
| thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id) | |
| return { | |
| "status": "sent", | |
| "thread_id": thread_id, | |
| "to": to | |
| } | |
| elif tool_name == "get_email_thread": | |
| prospect_id = tool_input["prospect_id"] | |
| thread = await self.mcp_registry.email.get_thread(prospect_id) | |
| return thread or {"error": "No email thread found"} | |
| # ============ CALENDAR MCP SERVER ============ | |
| elif tool_name == "suggest_meeting_slots": | |
| num_slots = tool_input.get("num_slots", 3) | |
| slots = await self.mcp_registry.calendar.suggest_slots() | |
| return { | |
| "slots": slots[:num_slots], | |
| "count": len(slots[:num_slots]) | |
| } | |
| elif tool_name == "generate_calendar_invite": | |
| start_time = tool_input["start_time"] | |
| end_time = tool_input["end_time"] | |
| title = tool_input["title"] | |
| slot = { | |
| "start_iso": start_time, | |
| "end_iso": end_time, | |
| "title": title | |
| } | |
| ics = await self.mcp_registry.calendar.generate_ics(slot) | |
| return { | |
| "ics_content": ics, | |
| "meeting": slot | |
| } | |
| else: | |
| raise ValueError(f"Unknown MCP tool: {tool_name}") | |