| ```markdown | |
# MCP Server Implementation Guide

## Overview

RewardPilot implements a multi-agent MCP (Model Context Protocol) architecture with four independent microservices that work together to provide intelligent credit card recommendations.

## Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────┐
│                         User Interface                          │
│                        (Gradio 6.0 App)                         │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Orchestrator Agent                        │
│                       (Claude 3.5 Sonnet)                       │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ Phase 1: Planning                                        │   │
│  │ - Analyze transaction context                            │   │
│  │ - Determine required MCP servers                         │   │
│  │ - Create execution strategy                              │   │
│  └──────────────────────────────────────────────────────────┘   │
└────────────────────────────┬────────────────────────────────────┘
                             │
              ┌──────────────┼─────────────┐
              ▼              ▼             ▼
      ┌───────────────┐ ┌──────────┐ ┌────────────┐
      │ Smart Wallet  │ │   RAG    │ │  Forecast  │
      │  MCP Server   │ │   MCP    │ │ MCP Server │
      └───────┬───────┘ └────┬─────┘ └─────┬──────┘
              │              │             │
              ▼              ▼             ▼
      ┌───────────────────────────────────────────┐
      │             Gemini 2.0 Flash              │
      │          (Reasoning & Synthesis)          │
      └──────────────────────┬────────────────────┘
                             │
                             ▼
                     ┌───────────────┐
                     │ Final Response│
                     └───────────────┘
```

---

## MCP Server 1: Orchestrator

### Purpose

Coordinates all MCP servers and manages the agent workflow.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space
- **Stack:** FastAPI + Claude 3.5 Sonnet
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/recommend`

Get card recommendation for a transaction.

**Request:**

```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50,
  "category": "Groceries"
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "issuer": "American Express"
  },
  "rewards": {
    "points_earned": 510,
    "cash_value": 5.10,
    "earn_rate": "4x points"
  },
  "reasoning": "Amex Gold offers 4x points on U.S. supermarkets...",
  "confidence": 0.95,
  "alternatives": [
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "reason": "5% but monthly cap already hit"
    }
  ],
  "warnings": [
    "You're at $450/$1500 monthly cap. 3 more grocery trips available."
  ]
}
```

### Implementation

```python
# orchestrator_server.py
import asyncio
import json
import os

import httpx
from anthropic import AsyncAnthropic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="RewardPilot Orchestrator")
# Async client, so the planning call does not block the event loop
anthropic = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))


class TransactionRequest(BaseModel):
    """Request body for POST /recommend (fields match the example request above)"""
    user_id: str
    merchant: str
    mcc: str
    amount_usd: float
    category: str


@app.post("/recommend")
async def recommend_card(request: TransactionRequest):
    # Phase 1: Planning with Claude
    plan = await create_execution_plan(request)
    # Phase 2: Parallel MCP calls
    mcp_results = await execute_mcp_calls(plan, request)
    # Phase 3: Reasoning with Gemini (synthesize_reasoning is not shown in this guide)
    explanation = await synthesize_reasoning(request, mcp_results)
    # Phase 4: Format response (format_recommendation is not shown in this guide)
    return format_recommendation(mcp_results, explanation)


async def create_execution_plan(request: TransactionRequest):
    """Claude analyzes transaction and plans MCP calls"""
    prompt = f"""
    Analyze this transaction and determine which MCP servers to call:
    Transaction:
    - Merchant: {request.merchant}
    - Category: {request.category}
    - Amount: ${request.amount_usd}
    Available MCP servers:
    1. smart_wallet - Card recommendations and reward calculations
    2. rewards_rag - Semantic search of card benefits
    3. spend_forecast - Spending predictions and cap warnings
    Return only a JSON plan with:
    - strategy: optimization approach
    - mcp_calls: list of objects with a "service" field (priority order)
    - confidence_threshold: minimum confidence for recommendation
    """
    response = await anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    try:
        return json.loads(response.content[0].text)
    except json.JSONDecodeError as exc:
        # The planner replied with something other than pure JSON
        raise HTTPException(status_code=502, detail="Planner returned invalid JSON") from exc


async def execute_mcp_calls(plan: dict, request: TransactionRequest):
    """Call MCP servers in parallel"""
    # call_smart_wallet, call_rewards_rag and call_forecast are async helpers
    # that are not shown in this guide.
    handlers = {
        "smart_wallet": call_smart_wallet,
        "rewards_rag": call_rewards_rag,
        "spend_forecast": call_forecast,
    }
    # Build names and tasks from the same list so an unknown service
    # cannot shift results onto the wrong key
    services = [c["service"] for c in plan["mcp_calls"] if c["service"] in handlers]
    results = await asyncio.gather(*(handlers[s](request) for s in services))
    return dict(zip(services, results))
```
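
The planning step parses the model's reply with `json.loads`, which fails whenever the reply wraps the JSON in extra prose. The following is a minimal sketch of a more defensive parser, not the project's actual code: `parse_plan` and `ALLOWED_SERVICES` are hypothetical names, and it assumes each `mcp_calls` entry is an object with a `service` field, as the orchestrator code expects.

```python
import json
import re

# Hypothetical allow-list; the names come from the planning prompt.
ALLOWED_SERVICES = {"smart_wallet", "rewards_rag", "spend_forecast"}


def parse_plan(raw_text: str) -> dict:
    """Extract the JSON object from a planner reply and validate it."""
    # Greedy match from the first "{" to the last "}" in the reply.
    match = re.search(r"\{.*\}", raw_text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in planner reply")
    # json.JSONDecodeError is a ValueError subclass, so callers catch one type.
    plan = json.loads(match.group(0))

    calls = plan.get("mcp_calls")
    if not isinstance(calls, list) or not calls:
        raise ValueError("plan must contain a non-empty 'mcp_calls' list")
    # Keep only known services, preserving the planner's priority order.
    plan["mcp_calls"] = [
        call for call in calls
        if isinstance(call, dict) and call.get("service") in ALLOWED_SERVICES
    ]
    if not plan["mcp_calls"]:
        raise ValueError("plan names no known MCP server")
    return plan


reply = (
    'Here is the plan: {"strategy": "maximize_rewards", "mcp_calls": '
    '[{"service": "smart_wallet"}, {"service": "travel_perks"}, '
    '{"service": "spend_forecast"}], "confidence_threshold": 0.8} '
    "Let me know if it needs changes."
)
plan = parse_plan(reply)
print([call["service"] for call in plan["mcp_calls"]])
```

Dropping unknown services at parse time keeps the fan-out step simple, because it only ever sees names it has a handler for.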

---

## MCP Server 2: Smart Wallet

### Purpose

Analyzes user's credit cards and calculates optimal rewards.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space
- **Stack:** FastAPI + Python + PostgreSQL
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/analyze`

Analyze transaction against user's wallet.

**Request:**

```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "rewards_earned": 5.10,
    "earn_rate": "4x points",
    "points_earned": 510
  },
  "all_cards_comparison": [
    {
      "card_name": "Amex Gold",
      "rewards": 5.10,
      "rank": 1
    },
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "rank": 2,
      "note": "Cap already hit this month"
    }
  ]
}
```

### Implementation

```python
# smart_wallet_server.py
from fastapi import FastAPI
from sqlalchemy import create_engine
from typing import List

app = FastAPI(title="Smart Wallet MCP")


class CardAnalyzer:
    # load_user_cards, get_reward_rate and get_monthly_spending are
    # database-backed helpers that are not shown in this guide.
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.cards = self.load_user_cards()

    def analyze_transaction(self, merchant: str, mcc: str, amount: float):
        """Calculate rewards for all cards and return the best one"""
        results = []
        for card in self.cards:
            # Get reward rate for this MCC
            reward_rate = self.get_reward_rate(card, mcc)
            # Check spending caps (clamped at zero if the cap is already exceeded)
            current_spending = self.get_monthly_spending(card, mcc)
            cap_remaining = max(0.0, card.monthly_cap - current_spending)
            # Calculate rewards
            if cap_remaining >= amount:
                rewards = amount * reward_rate
            else:
                # Partial cap scenario: bonus rate up to the cap, base rate beyond it
                rewards = (cap_remaining * reward_rate) + (
                    (amount - cap_remaining) * card.base_rate
                )
            results.append({
                "card": card,
                "rewards": rewards,
                "effective_rate": rewards / amount if amount else 0.0,
                "cap_status": {
                    "current": current_spending,
                    "limit": card.monthly_cap,
                    "remaining": cap_remaining
                }
            })
        # Sort by rewards (descending)
        results.sort(key=lambda x: x["rewards"], reverse=True)
        # Return best card, or None if the wallet is empty
        return results[0] if results else None
```
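
The partial-cap branch is easier to check in isolation. The sketch below restates it as a standalone function under stated assumptions: `rewards_with_cap` is a hypothetical name, rates are expressed as fractions of a dollar (so "4x points" at one cent per point becomes `0.04`), and the cap figures are illustrative rather than real card terms.

```python
def rewards_with_cap(amount, bonus_rate, base_rate, cap, spent_so_far):
    """Return rewards in dollars when only part of a purchase earns the bonus rate."""
    # Spending room left under the cap; zero once the cap is reached or exceeded.
    cap_remaining = max(0.0, cap - spent_so_far)
    # The part of the purchase that still fits under the cap earns the bonus rate.
    bonus_portion = min(amount, cap_remaining)
    # Whatever exceeds the cap falls back to the base rate.
    base_portion = amount - bonus_portion
    return bonus_portion * bonus_rate + base_portion * base_rate


# Fully under the cap: the whole purchase earns the bonus rate.
print(rewards_with_cap(127.50, 0.04, 0.01, 1500.0, 450.0))
# Straddling the cap: $50 earns 5%, the remaining $100 earns 1%.
print(rewards_with_cap(150.0, 0.05, 0.01, 500.0, 450.0))
# Cap already exhausted: everything earns the base rate.
print(rewards_with_cap(100.0, 0.05, 0.01, 500.0, 600.0))
```

The first call reproduces the $5.10 figure from the sample response, up to floating-point rounding.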

---

## MCP Server 3: Rewards RAG

### Purpose

Semantic search across credit card benefit documents.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space
- **Stack:** FastAPI + LlamaIndex + ChromaDB
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/query`

Search card benefits with natural language.

**Request:**

```json
{
  "query": "Does Amex Gold work at Costco for groceries?",
  "card_name": "American Express Gold",
  "top_k": 3
}
```

**Response:**

```json
{
  "answer": "No, American Express cards are not accepted at Costco warehouse locations due to Costco's exclusive Visa agreement. However, Amex Gold works at Costco.com for online orders.",
  "sources": [
    {
      "card_name": "American Express Gold",
      "content": "Merchant acceptance: Not accepted at Costco warehouses...",
      "relevance_score": 0.92
    }
  ]
}
```

### Implementation

See `docs/llamaindex_setup.md` for detailed RAG implementation.
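
To show how `top_k` and `relevance_score` relate, here is a toy retrieval sketch. It is not the LlamaIndex and ChromaDB pipeline: word counts stand in for real embeddings, and `embed`, `cosine`, `query`, and the sample documents are all hypothetical.

```python
import math
import re
from collections import Counter


def embed(text):
    """Toy stand-in for an embedding model: lowercase word counts."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def query(question, documents, top_k=3):
    """Score every document against the question and keep the top_k best."""
    q_vec = embed(question)
    scored = [
        {**doc, "relevance_score": cosine(q_vec, embed(doc["content"]))}
        for doc in documents
    ]
    scored.sort(key=lambda item: item["relevance_score"], reverse=True)
    return scored[:top_k]


documents = [
    {"card_name": "Citi Custom Cash",
     "content": "5% cash back in the top spending category each month"},
    {"card_name": "American Express Gold",
     "content": "Earn 4x points at U.S. supermarkets"},
    {"card_name": "American Express Gold",
     "content": "Merchant acceptance: not accepted at Costco warehouses"},
]
results = query("Is Amex Gold accepted at Costco warehouses?", documents, top_k=2)
for item in results:
    print(item["card_name"], "-", item["content"])
```

A real embedding model would also match paraphrases that share no words with the query, which this word-count version cannot do.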

---

## MCP Server 4: Spend Forecast

### Purpose

ML-based spending predictions and cap warnings.

### Deployment

- **URL:** https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space
- **Stack:** FastAPI + Scikit-learn + Redis
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/predict`

Predict spending for next period.

**Request:**

```json
{
  "user_id": "u_alice",
  "card_id": "c_amex_gold",
  "category": "Groceries",
  "horizon_days": 30
}
```

**Response:**

```json
{
  "predicted_spending": 520.50,
  "confidence_interval": [480.00, 560.00],
  "warnings": [
    {
      "type": "cap_warning",
      "message": "Likely to exceed $500 monthly cap",
      "probability": 0.78,
      "suggested_action": "Switch to Citi Custom Cash after $500"
    }
  ]
}
```

### Implementation

```python
# forecast_server.py
from fastapi import FastAPI
from sklearn.ensemble import RandomForestRegressor
import numpy as np

app = FastAPI(title="Spend Forecast MCP")


class SpendingForecaster:
    def __init__(self):
        # The model must be fitted on historical spending before predict() is called
        self.model = RandomForestRegressor(n_estimators=100)

    def predict(self, user_id: str, category: str, horizon_days: int):
        """Predict spending for next N days"""
        # Load historical data (load_user_history and extract_features are not shown here)
        history = self.load_user_history(user_id, category)
        # Feature engineering: must yield a 2D array with one row for this user
        features = self.extract_features(history)
        # Predict
        prediction = self.model.predict(features)
        # Approximate the interval from the spread of the individual trees' predictions
        predictions = [tree.predict(features) for tree in self.model.estimators_]
        lower = np.percentile(predictions, 5)
        upper = np.percentile(predictions, 95)
        return {
            "predicted_spending": float(prediction[0]),
            "confidence_interval": [float(lower), float(upper)]
        }
```
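
The sample response attaches a `probability` to its cap warning, but the guide does not say how that number is derived. One possible approach, sketched here under explicit assumptions, treats the forecast as normally distributed and reads the interval as the central 90% band (matching the 5th and 95th percentiles in the forecaster). `cap_exceed_probability` is a hypothetical helper, not part of the service.

```python
from statistics import NormalDist


def cap_exceed_probability(predicted, lower, upper, cap, coverage=0.90):
    """Estimate P(spending > cap), assuming a normal forecast distribution."""
    # z-score of the upper edge of a central `coverage` interval (about 1.645 for 90%).
    z = NormalDist().inv_cdf(0.5 + coverage / 2)
    # The interval spans 2*z standard deviations, so recover sigma from its width.
    sigma = (upper - lower) / (2 * z)
    if sigma <= 0:
        # Degenerate interval: the forecast is a single point.
        return 1.0 if predicted > cap else 0.0
    return 1.0 - NormalDist(mu=predicted, sigma=sigma).cdf(cap)


# Values from the sample response: prediction 520.50, interval [480, 560], cap 500.
probability = cap_exceed_probability(520.50, 480.0, 560.0, 500.0)
print(round(probability, 2))
```

The normality assumption is a simplification. Per-tree random forest predictions can be skewed, so an empirical fraction of trees above the cap may be a better estimate in practice.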

---

## Communication Flow

### Sequence Diagram

```
User -> Gradio: Enter transaction
Gradio -> Orchestrator: POST /recommend
Orchestrator -> Claude: Create execution plan
Claude -> Orchestrator: {plan: call all 3 MCPs}
Orchestrator -> Smart Wallet: POST /analyze
Orchestrator -> RAG: POST /query
Orchestrator -> Forecast: POST /predict
Smart Wallet -> Orchestrator: {best_card: Amex Gold, rewards: 5.10}
RAG -> Orchestrator: {benefits: "4x on groceries..."}
Forecast -> Orchestrator: {warning: "Near cap"}
Orchestrator -> Gemini: Synthesize results
Gemini -> Orchestrator: {explanation: "Use Amex Gold because..."}
Orchestrator -> Gradio: Final recommendation
Gradio -> User: Display result
```
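
The three middle calls in the sequence run concurrently. The sketch below imitates that fan-out with stub coroutines in place of real HTTP calls. `fake_mcp_call` and `fan_out` are hypothetical names, the delays are arbitrary, and the payloads are copied from the diagram.

```python
import asyncio


async def fake_mcp_call(delay_s, payload):
    """Stand-in for an HTTP call to an MCP server."""
    await asyncio.sleep(delay_s)
    return payload


async def fan_out():
    """Start all three calls at once and collect the results by service name."""
    calls = {
        "smart_wallet": fake_mcp_call(0.03, {"best_card": "Amex Gold", "rewards": 5.10}),
        "rewards_rag": fake_mcp_call(0.02, {"benefits": "4x on groceries..."}),
        "spend_forecast": fake_mcp_call(0.01, {"warning": "Near cap"}),
    }
    # gather() returns results in the order the awaitables were passed,
    # regardless of which call finishes first.
    results = await asyncio.gather(*calls.values())
    return dict(zip(calls.keys(), results))


results = asyncio.run(fan_out())
print(results)
```

Because the calls overlap, the total wait is close to the slowest single call rather than the sum of all three.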

---

## Deployment Instructions

### 1. Deploy Each MCP Server to Hugging Face

```bash
# Create the Space on HF first, so there is a repository to clone
huggingface-cli repo create rewardpilot-orchestrator --type space --space_sdk gradio
# Clone the new Space
git clone https://huggingface.co/spaces/YOUR_USERNAME/rewardpilot-orchestrator
# Add files (assumes the server code sits next to the cloned directory)
cp orchestrator_server.py rewardpilot-orchestrator/app.py
cp requirements.txt rewardpilot-orchestrator/
# Push
cd rewardpilot-orchestrator
git add .
git commit -m "Deploy orchestrator"
git push
```

### 2. Set Environment Variables

In each Space's settings, add:

```bash
ANTHROPIC_API_KEY=sk-ant-xxxxx
GEMINI_API_KEY=AIzaSyxxxxx
OPENAI_API_KEY=sk-xxxxx
```

### 3. Configure Endpoints

In main `app.py`:

```python
MCP_ENDPOINTS = {
    "orchestrator": "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space",
    "smart_wallet": "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space",
    "rewards_rag": "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space",
    "forecast": "https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space"
}
```

---

## Error Handling

### Graceful Degradation

```python
import logging

import httpx

logger = logging.getLogger(__name__)

# Endpoint path for each service, as documented in the server sections of this guide
MCP_PATHS = {
    "smart_wallet": "/analyze",
    "rewards_rag": "/query",
    "forecast": "/predict",
}


async def call_mcp_with_fallback(service_name: str, request_data: dict):
    """Call MCP server with timeout and fallback"""
    # MCP_ENDPOINTS holds base URLs only, so append the service's path
    url = MCP_ENDPOINTS[service_name] + MCP_PATHS[service_name]
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(url, json=request_data)
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        logger.error(f"{service_name} timeout")
        return get_fallback_response(service_name)
    except httpx.HTTPError as e:
        logger.error(f"{service_name} error: {e}")
        return get_fallback_response(service_name)
```
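
`get_fallback_response` is referenced in the error handler but not defined in this guide. One possible shape, offered as a sketch only, returns an empty payload with the same top-level keys as each service's documented response. The `degraded` and `service` fields and the `FALLBACKS` table are assumptions.

```python
import copy

# Empty payloads that mirror the top-level keys of each documented response.
FALLBACKS = {
    "smart_wallet": {"recommended_card": None, "all_cards_comparison": []},
    "rewards_rag": {"answer": None, "sources": []},
    "forecast": {"predicted_spending": None, "confidence_interval": None, "warnings": []},
}


def get_fallback_response(service_name: str) -> dict:
    """Return a neutral payload so downstream code can continue without this service."""
    # Deep copy so callers cannot mutate the shared templates.
    response = copy.deepcopy(FALLBACKS.get(service_name, {}))
    # Hypothetical marker fields telling the orchestrator the data is missing.
    response["degraded"] = True
    response["service"] = service_name
    return response


print(get_fallback_response("rewards_rag"))
```

Keeping the documented keys present means the synthesis step can read them unconditionally and use the `degraded` flag to lower confidence or add a warning.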

---

## Monitoring

### Health Checks

```python
@app.get("/health")
async def health_check():
    """Check status of all MCP servers"""
    statuses = {}
    for service, url in MCP_ENDPOINTS.items():
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{url}/health")
                statuses[service] = {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "latency_ms": response.elapsed.total_seconds() * 1000
                }
        except Exception as e:
            statuses[service] = {"status": "down", "error": str(e)}
    return statuses
```

---

## Performance Optimization

### Caching Strategy

```python
import json

import redis.asyncio as redis

# Redis cache for frequent queries
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)


# No lru_cache here: it never expires entries, which would defeat the 1-hour TTL
async def get_card_benefits(card_name: str):
    """Cache card benefits for 1 hour"""
    cache_key = f"benefits:{card_name}"
    # Check cache
    cached = await redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    # Fetch from RAG (call_rewards_rag is the async helper used by the orchestrator)
    result = await call_rewards_rag({"query": f"Get all benefits for {card_name}"})
    # Cache for 1 hour
    await redis_client.setex(cache_key, 3600, json.dumps(result))
    return result
```
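
The same cache-aside pattern can be tried without a Redis server. The sketch below is an in-process illustration only: `TTLCache` is a hypothetical class, and the injectable clock exists so that expiry can be exercised without waiting an hour.

```python
import time


class TTLCache:
    """Minimal in-memory cache-aside helper with per-entry expiry."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._store = {}  # key -> (expires_at, value)

    def get_or_fetch(self, key, fetch):
        now = self._clock()
        entry = self._store.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # cache hit, still fresh
        value = fetch()  # cache miss or expired entry: go to the source
        self._store[key] = (now + self._ttl, value)
        return value


calls = []


def fetch_benefits():
    """Stand-in for the RAG query; records each time it is actually called."""
    calls.append("rag")
    return {"card_name": "American Express Gold", "benefits": ["4x points"]}


fake_now = [0.0]
cache = TTLCache(ttl_seconds=3600, clock=lambda: fake_now[0])
cache.get_or_fetch("benefits:American Express Gold", fetch_benefits)  # miss
cache.get_or_fetch("benefits:American Express Gold", fetch_benefits)  # hit
fake_now[0] = 3601.0
cache.get_or_fetch("benefits:American Express Gold", fetch_benefits)  # expired
print(len(calls))
```

Unlike Redis, this cache is private to one process, so each Space replica would warm its own copy.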

---

## Testing

### Integration Tests

```python
import pytest
import httpx

# Base URL of the deployed orchestrator (see the Deployment section)
ORCHESTRATOR_URL = "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space"


@pytest.mark.asyncio
async def test_orchestrator_end_to_end():
    """Test full recommendation flow"""
    # Generous timeout: the request fans out to several services and two LLMs
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            f"{ORCHESTRATOR_URL}/recommend",
            json={
                "user_id": "test_user",
                "merchant": "Whole Foods",
                "mcc": "5411",
                "amount_usd": 100.00,
                "category": "Groceries"
            }
        )
    assert response.status_code == 200
    data = response.json()
    assert "recommended_card" in data
    assert "rewards" in data
    assert "reasoning" in data
```

---

## Next Steps

1. **Scale MCP servers** - Add load balancing
2. **Add authentication** - JWT tokens for API access
3. **Implement webhooks** - Real-time transaction notifications
4. **Add more MCP servers** - Travel optimization, business expenses, etc.

---

**Related Documentation:**

- [Modal Deployment Guide](./modal_deployment.md)
- [LlamaIndex RAG Setup](./llamaindex_setup.md)
- [Agent Reasoning Flow](./agent_reasoning.md)
| ``` | |
| --- | |