"""
Autonomous AI Agent with MCP Tool Calling using Ollama Python Client

Uses the ollama Python package for LLM inference.
Based on: https://github.com/ollama/ollama-python

Example usage (from the guide):

    from ollama import chat

    response = chat(
        model='granite4:1b',
        messages=[
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': user_input}
        ],
        options={'temperature': 0.0, 'top_p': 1.0}
    )
    output = response.message.content
"""

import os
import re
import json
import uuid
import logging
import asyncio
from typing import List, Dict, Any, AsyncGenerator, Optional

from mcp.tools.definitions import MCP_TOOLS
from mcp.registry import MCPRegistry

logger = logging.getLogger(__name__)

# Default model - IBM Granite 4 1B
DEFAULT_MODEL = "granite4:1b"

# Tool-call formats recognised in model output, tried in this order:
# a ```json fenced object, a plain ``` fenced object, then a bare inline
# {"tool": ..., "parameters": {...}} object.
_TOOL_CALL_PATTERNS = (
    re.compile(r'```json\s*(\{[^`]+\})\s*```', re.DOTALL),
    re.compile(r'```\s*(\{[^`]+\})\s*```', re.DOTALL),
    re.compile(r'(\{"tool":\s*"[^"]+",\s*"parameters":\s*\{[^}]*\}\})', re.DOTALL),
)

# Completion marker as a whole word, any case, so that words which merely
# contain "done" (e.g. "abandoned") do not end the run.
_DONE_RE = re.compile(r'\bDONE\b', re.IGNORECASE)


class AutonomousMCPAgentOllama:
    """
    AI Agent using Ollama Python client (FREE local LLM)

    Uses ollama.chat() directly as per the official documentation.
    Temperature=0.0 and top_p=1.0 recommended for Granite family models.
    """

    def __init__(
        self,
        mcp_registry: MCPRegistry,
        model: Optional[str] = None
    ):
        """
        Store the registry, resolve the model name and cache the tool list.

        Args:
            mcp_registry: Registry whose ``search``, ``store``, ``email`` and
                ``calendar`` members are used to execute tools.
            model: Ollama model name. When falsy, the ``OLLAMA_MODEL``
                environment variable is used, then ``DEFAULT_MODEL``.
        """
        self.mcp_registry = mcp_registry
        self.model = model or os.getenv("OLLAMA_MODEL", DEFAULT_MODEL)
        self.tools_description = self._build_tools_description()

        logger.info("Ollama Agent initialized with model: %s", self.model)

    def _build_tools_description(self) -> str:
        """Build tool descriptions for the system prompt.

        Returns:
            One bullet per entry of ``MCP_TOOLS`` followed by its parameters,
            each marked "(required)" or "(optional)" according to the tool's
            ``input_schema``.
        """
        parts: List[str] = []
        for tool in MCP_TOOLS:
            parts.append(f"\n- **{tool['name']}**: {tool['description']}")
            schema = tool.get('input_schema', {})
            props = schema.get('properties', {})
            required = schema.get('required', [])
            if props:
                parts.append("\n Parameters:")
                for param, details in props.items():
                    req = "(required)" if param in required else "(optional)"
                    parts.append(
                        f"\n - {param} {req}: {details.get('description', '')}"
                    )
        return "".join(parts)

    def _build_system_prompt(self) -> str:
        """Return the system prompt: tool list, JSON call format and rules."""
        return f"""You are an AI sales agent with access to tools.

AVAILABLE TOOLS:
{self.tools_description}

TO USE A TOOL, respond with JSON:
```json
{{"tool": "tool_name", "parameters": {{"param1": "value1"}}}}
```

RULES:
1. Use search_web to find information
2. Use save_prospect, save_contact to store data
3. Use send_email to draft emails
4. Say "DONE" when finished with a summary

Be concise."""

    async def run(self, task: str, max_iterations: int = 15) -> AsyncGenerator[Dict[str, Any], None]:
        """Run the agent on a task, yielding progress events.

        Each iteration sends the conversation to the model, then either
        executes the tool calls found in the reply, finishes when the reply
        contains the DONE marker, or nudges the model to continue.

        Args:
            task: User instruction placed in the first user message.
            max_iterations: Maximum number of model round-trips.

        Yields:
            Event dicts whose ``type`` is one of ``agent_start``,
            ``iteration_start``, ``tool_call``, ``tool_result``,
            ``tool_error``, ``thought``, ``agent_complete``, ``agent_error``
            or ``agent_max_iterations``.
        """
        yield {
            "type": "agent_start",
            "message": f"Starting with Ollama ({self.model})",
            "model": self.model
        }

        system_prompt = self._build_system_prompt()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        for iteration in range(1, max_iterations + 1):
            yield {
                "type": "iteration_start",
                "iteration": iteration,
                "message": f"Iteration {iteration}: Thinking..."
            }

            try:
                # Call Ollama using the Python client
                response = await self._call_ollama(messages)
                assistant_content = response.get("content", "")

                if not assistant_content:
                    # NOTE: nothing is appended to the conversation here, so
                    # the next iteration re-sends the same messages.
                    continue

                # Parse tool calls first: a reply that carries a tool call
                # must have it executed even if the text (for example an
                # email body) also contains the word "done".
                tool_calls = self._parse_tool_calls(assistant_content)

                # Check for completion
                if not tool_calls and _DONE_RE.search(assistant_content):
                    final_text = _DONE_RE.sub("", assistant_content).strip()
                    yield {
                        "type": "thought",
                        "thought": final_text,
                        "message": "Task complete"
                    }
                    yield {
                        "type": "agent_complete",
                        "message": "Task complete!",
                        "final_answer": final_text,
                        "iterations": iteration
                    }
                    return

                if tool_calls:
                    messages.append({"role": "assistant", "content": assistant_content})
                    tool_results = []

                    for tool_call in tool_calls:
                        tool_name = tool_call.get("tool", "")
                        # The model may emit "parameters": null.
                        tool_params = tool_call.get("parameters") or {}

                        yield {
                            "type": "tool_call",
                            "tool": tool_name,
                            "input": tool_params,
                            "message": f"Calling: {tool_name}"
                        }

                        try:
                            result = await self._execute_tool(tool_name, tool_params)
                            yield {
                                "type": "tool_result",
                                "tool": tool_name,
                                "result": result,
                                "message": f"{tool_name} completed"
                            }
                            tool_results.append({"tool": tool_name, "result": result})
                        except Exception as e:
                            # A failing tool is reported back to the model
                            # instead of aborting the whole run.
                            yield {
                                "type": "tool_error",
                                "tool": tool_name,
                                "error": str(e)
                            }
                            tool_results.append({"tool": tool_name, "error": str(e)})

                    # Add results to conversation (capped at 2000 characters)
                    results_text = "Tool results:\n" + json.dumps(tool_results, indent=2, default=str)[:2000]
                    messages.append({"role": "user", "content": results_text})
                else:
                    # No tool calls
                    yield {
                        "type": "thought",
                        "thought": assistant_content,
                        "message": f"AI: {assistant_content[:100]}..."
                    }
                    messages.append({"role": "assistant", "content": assistant_content})
                    messages.append({"role": "user", "content": "Continue. Use tools to complete the task. Say DONE when finished."})

            except Exception as e:
                logger.exception("Error: %s", e)
                yield {
                    "type": "agent_error",
                    "error": str(e),
                    "message": f"Error: {e}"
                }
                return

        yield {
            "type": "agent_max_iterations",
            "message": f"Reached max iterations ({max_iterations})",
            "iterations": max_iterations
        }

    async def _call_ollama(self, messages: List[Dict]) -> Dict:
        """
        Call Ollama using the official Python client.

        Uses ollama.chat() directly as per the guide:
        https://github.com/ollama/ollama-python

        Temperature=0.0 and top_p=1.0 recommended for Granite models.

        Args:
            messages: Chat history as ``{"role": ..., "content": ...}`` dicts.

        Returns:
            ``{"content": <assistant text>}``; the text is ``""`` when the
            response carries no content.

        Raises:
            ImportError: If the ``ollama`` package is not installed.
            Exception: If the chat call fails; the original error is chained
                as ``__cause__``.
        """
        try:
            from ollama import chat, ResponseError
        except ImportError as e:
            raise ImportError("ollama package not installed. Run: pip install ollama") from e

        try:
            # Use ollama.chat() directly as shown in the guide
            # Run in executor to not block the async event loop
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: chat(
                    model=self.model,
                    messages=messages,
                    options={
                        "temperature": 0.0,  # Deterministic output for tool calling
                        "top_p": 1.0  # Full probability mass (Granite recommended)
                    }
                )
            )

            # Extract response content: response.message.content
            content = ""
            if hasattr(response, 'message') and hasattr(response.message, 'content'):
                content = response.message.content
            elif isinstance(response, dict):
                content = response.get("message", {}).get("content", "")

            return {"content": content or ""}

        except ResponseError as e:
            # Handle Ollama-specific errors (model not available, etc.)
            logger.error("Ollama ResponseError: %s", e)
            raise Exception(f"Ollama error: {e}. Make sure Ollama is running and the model '{self.model}' is pulled.") from e
        except Exception as e:
            logger.error("Ollama call failed: %s", e)
            raise Exception(f"Ollama error: {e}") from e

    def _parse_tool_calls(self, text: str) -> List[Dict]:
        """Parse tool calls from response.

        Each pattern in ``_TOOL_CALL_PATTERNS`` is applied in turn. A match
        is skipped when it overlaps a span that already produced a tool
        call, so a fenced call is not also picked up by the bare-JSON
        pattern and executed twice.

        Args:
            text: Raw assistant message.

        Returns:
            Parsed JSON objects that contain a ``"tool"`` key, ordered by
            their position in ``text``. Candidates that are not valid JSON
            are ignored.
        """
        accepted = []  # (start, end, parsed_call)

        for pattern in _TOOL_CALL_PATTERNS:
            for match in pattern.finditer(text):
                start, end = match.span(1)
                if any(start < a_end and a_start < end for a_start, a_end, _ in accepted):
                    continue
                try:
                    data = json.loads(match.group(1).strip())
                except (ValueError, RecursionError):
                    # Not valid JSON (json.JSONDecodeError is a ValueError)
                    # or pathologically nested: not a tool call.
                    continue
                if isinstance(data, dict) and "tool" in data:
                    accepted.append((start, end, data))

        accepted.sort(key=lambda item: item[0])
        return [data for _, _, data in accepted]

    @staticmethod
    def _result_limit(tool_input: Dict[str, Any], default: int = 5) -> int:
        """Return ``max_results`` from model-supplied input as a positive int.

        Falls back to ``default`` when the value is missing, not convertible
        to an integer, or less than 1.
        """
        try:
            limit = int(tool_input.get("max_results", default))
        except (TypeError, ValueError, OverflowError):
            return default
        return limit if limit > 0 else default

    async def _execute_tool(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
        """Execute an MCP tool.

        Args:
            tool_name: Name of the tool requested by the model.
            tool_input: Parameters parsed from the model output; ``None`` is
                treated as an empty dict.

        Returns:
            A dict describing the outcome; its keys depend on the tool.

        Raises:
            ValueError: If ``tool_name`` is unknown or ``tool_input`` is not
                a JSON object.
        """
        if tool_input is None:
            tool_input = {}
        if not isinstance(tool_input, dict):
            raise ValueError(
                f"Tool parameters must be a JSON object, got {type(tool_input).__name__}"
            )

        store = self.mcp_registry.store

        if tool_name in ("search_web", "search_news"):
            query = tool_input.get("query", "")
            if tool_name == "search_news":
                query = f"{query} news"
            max_results = self._result_limit(tool_input)
            results = await self.mcp_registry.search.query(query, max_results=max_results)
            top = results[:max_results]
            return {"results": top, "count": len(top)}

        if tool_name == "save_prospect":
            prospect_data = {
                # "or" also covers an explicit null or empty id from the model
                "id": tool_input.get("prospect_id") or str(uuid.uuid4()),
                "company": {
                    "id": tool_input.get("company_id"),
                    "name": tool_input.get("company_name"),
                    "domain": tool_input.get("company_domain")
                },
                "fit_score": tool_input.get("fit_score", 0),
                "status": tool_input.get("status", "new"),
                "metadata": tool_input.get("metadata", {})
            }
            result = await store.save_prospect(prospect_data)
            return {"status": result, "prospect_id": prospect_data["id"]}

        if tool_name == "save_company":
            company_data = {
                "id": tool_input.get("company_id") or str(uuid.uuid4()),
                "name": tool_input.get("name", ""),
                "domain": tool_input.get("domain", ""),
                "industry": tool_input.get("industry"),
                "description": tool_input.get("description"),
                "employee_count": tool_input.get("employee_count")
            }
            result = await store.save_company(company_data)
            return {"status": result, "company_id": company_data["id"]}

        if tool_name == "save_contact":
            contact_data = {
                "id": tool_input.get("contact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "email": tool_input.get("email", ""),
                "first_name": tool_input.get("first_name"),
                "last_name": tool_input.get("last_name"),
                "title": tool_input.get("title"),
                "seniority": tool_input.get("seniority")
            }
            result = await store.save_contact(contact_data)
            return {"status": result, "contact_id": contact_data["id"]}

        if tool_name == "save_fact":
            fact_data = {
                "id": tool_input.get("fact_id") or str(uuid.uuid4()),
                "company_id": tool_input.get("company_id", ""),
                "fact_type": tool_input.get("fact_type", ""),
                "content": tool_input.get("content", ""),
                "source_url": tool_input.get("source_url"),
                "confidence_score": tool_input.get("confidence_score", 0.8)
            }
            result = await store.save_fact(fact_data)
            return {"status": result, "fact_id": fact_data["id"]}

        if tool_name == "send_email":
            to = tool_input.get("to", "")
            subject = tool_input.get("subject", "")
            body = tool_input.get("body", "")
            prospect_id = tool_input.get("prospect_id", "")
            thread_id = await self.mcp_registry.email.send(to, subject, body, prospect_id)
            return {"status": "drafted", "thread_id": thread_id, "to": to}

        if tool_name == "list_prospects":
            prospects = await store.list_prospects()
            return {"prospects": prospects, "count": len(prospects)}

        if tool_name == "get_prospect":
            prospect_id = tool_input.get("prospect_id", "")
            prospect = await store.get_prospect(prospect_id)
            return prospect or {"error": "Not found"}

        if tool_name == "suggest_meeting_slots":
            slots = await self.mcp_registry.calendar.suggest_slots()
            first_slots = slots[:3]
            return {"slots": first_slots, "count": len(first_slots)}

        raise ValueError(f"Unknown tool: {tool_name}")