rewardpilot-web-ui / docs /mcp_architecture.md
sammy786's picture
Create docs/mcp_architecture.md
1c67453 verified
```markdown
# MCP Server Implementation Guide
## Overview
RewardPilot implements a multi-agent MCP (Model Context Protocol) architecture with 4 independent microservices that work together to provide intelligent credit card recommendations.
## Architecture Diagram
```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ User Interface β”‚
β”‚ (Gradio 6.0 App) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Orchestrator Agent β”‚
β”‚ (Claude 3.5 Sonnet) β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚ β”‚ Phase 1: Planning β”‚ β”‚
β”‚ β”‚ - Analyze transaction context β”‚ β”‚
β”‚ β”‚ - Determine required MCP servers β”‚ β”‚
β”‚ β”‚ - Create execution strategy β”‚ β”‚
β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β–Ό β–Ό β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Smart Wallet β”‚ β”‚ RAG β”‚ β”‚ Forecast β”‚
β”‚ MCP Server β”‚ β”‚ MCP β”‚ β”‚ MCP Server β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
β”‚ β”‚ β”‚
β–Ό β–Ό β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Gemini 2.0 Flash β”‚
β”‚ (Reasoning & Synthesis) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Final Responseβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```
---
## MCP Server 1: Orchestrator
### Purpose
Coordinates all MCP servers and manages the agent workflow.
### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space
- **Stack:** FastAPI + Claude 3.5 Sonnet
- **Hosting:** Hugging Face Spaces
### API Endpoints
#### POST `/recommend`
Get card recommendation for a transaction.
**Request:**
```json
{
"user_id": "u_alice",
"merchant": "Whole Foods",
"mcc": "5411",
"amount_usd": 127.50,
"category": "Groceries"
}
```
**Response:**
```json
{
"recommended_card": {
"card_id": "c_amex_gold",
"card_name": "American Express Gold",
"issuer": "American Express"
},
"rewards": {
"points_earned": 510,
"cash_value": 5.10,
"earn_rate": "4x points"
},
"reasoning": "Amex Gold offers 4x points on U.S. supermarkets...",
"confidence": 0.95,
"alternatives": [
{
"card_name": "Citi Custom Cash",
"rewards": 3.82,
"reason": "5% but monthly cap already hit"
}
],
"warnings": [
"You're at $450/$1500 monthly cap. 3 more grocery trips available."
]
}
```
### Implementation
```python
# orchestrator_server.py
from fastapi import FastAPI, HTTPException
from anthropic import Anthropic
import httpx
import asyncio
app = FastAPI(title="RewardPilot Orchestrator")
anthropic = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
@app.post("/recommend")
async def recommend_card(request: TransactionRequest):
# Phase 1: Planning with Claude
plan = await create_execution_plan(request)
# Phase 2: Parallel MCP calls
mcp_results = await execute_mcp_calls(plan)
# Phase 3: Reasoning with Gemini
explanation = await synthesize_reasoning(request, mcp_results)
# Phase 4: Format response
return format_recommendation(mcp_results, explanation)
async def create_execution_plan(request: TransactionRequest):
"""Claude analyzes transaction and plans MCP calls"""
prompt = f"""
Analyze this transaction and determine which MCP servers to call:
Transaction:
- Merchant: {request.merchant}
- Category: {request.category}
- Amount: ${request.amount_usd}
Available MCP servers:
1. smart_wallet - Card recommendations and reward calculations
2. rewards_rag - Semantic search of card benefits
3. spend_forecast - Spending predictions and cap warnings
Return a JSON plan with:
- strategy: optimization approach
- mcp_calls: list of servers to call (priority order)
- confidence_threshold: minimum confidence for recommendation
"""
response = anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
async def execute_mcp_calls(plan: dict):
"""Call MCP servers in parallel"""
tasks = []
for mcp_call in plan["mcp_calls"]:
if mcp_call["service"] == "smart_wallet":
tasks.append(call_smart_wallet(request))
elif mcp_call["service"] == "rewards_rag":
tasks.append(call_rewards_rag(request))
elif mcp_call["service"] == "spend_forecast":
tasks.append(call_forecast(request))
results = await asyncio.gather(*tasks)
return dict(zip([c["service"] for c in plan["mcp_calls"]], results))
```
---
## MCP Server 2: Smart Wallet
### Purpose
Analyzes user's credit cards and calculates optimal rewards.
### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space
- **Stack:** FastAPI + Python + PostgreSQL
- **Hosting:** Hugging Face Spaces
### API Endpoints
#### POST `/analyze`
Analyze transaction against user's wallet.
**Request:**
```json
{
"user_id": "u_alice",
"merchant": "Whole Foods",
"mcc": "5411",
"amount_usd": 127.50
}
```
**Response:**
```json
{
"recommended_card": {
"card_id": "c_amex_gold",
"card_name": "American Express Gold",
"rewards_earned": 5.10,
"earn_rate": "4x points",
"points_earned": 510
},
"all_cards_comparison": [
{
"card_name": "Amex Gold",
"rewards": 5.10,
"rank": 1
},
{
"card_name": "Citi Custom Cash",
"rewards": 3.82,
"rank": 2,
"note": "Cap already hit this month"
}
]
}
```
### Implementation
```python
# smart_wallet_server.py
from fastapi import FastAPI
from sqlalchemy import create_engine
from typing import List
app = FastAPI(title="Smart Wallet MCP")
class CardAnalyzer:
def __init__(self, user_id: str):
self.user_id = user_id
self.cards = self.load_user_cards()
def analyze_transaction(self, merchant: str, mcc: str, amount: float):
"""Calculate rewards for all cards"""
results = []
for card in self.cards:
# Get reward rate for this MCC
reward_rate = self.get_reward_rate(card, mcc)
# Check spending caps
current_spending = self.get_monthly_spending(card, mcc)
cap_remaining = card.monthly_cap - current_spending
# Calculate rewards
if cap_remaining >= amount:
rewards = amount * reward_rate
else:
# Partial cap scenario
rewards = (cap_remaining * reward_rate) +
((amount - cap_remaining) * card.base_rate)
results.append({
"card": card,
"rewards": rewards,
"effective_rate": rewards / amount,
"cap_status": {
"current": current_spending,
"limit": card.monthly_cap,
"remaining": cap_remaining
}
})
# Sort by rewards (descending)
results.sort(key=lambda x: x["rewards"], reverse=True)
return results[0] # Return best card
```
---
## MCP Server 3: Rewards RAG
### Purpose
Semantic search across credit card benefit documents.
### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space
- **Stack:** FastAPI + LlamaIndex + ChromaDB
- **Hosting:** Hugging Face Spaces
### API Endpoints
#### POST `/query`
Search card benefits with natural language.
**Request:**
```json
{
"query": "Does Amex Gold work at Costco for groceries?",
"card_name": "American Express Gold",
"top_k": 3
}
```
**Response:**
```json
{
"answer": "No, American Express cards are not accepted at Costco warehouse locations due to Costco's exclusive Visa agreement. However, Amex Gold works at Costco.com for online orders.",
"sources": [
{
"card_name": "American Express Gold",
"content": "Merchant acceptance: Not accepted at Costco warehouses...",
"relevance_score": 0.92
}
]
}
```
### Implementation
See `docs/llamaindex_setup.md` for detailed RAG implementation.
---
## MCP Server 4: Spend Forecast
### Purpose
ML-based spending predictions and cap warnings.
### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space
- **Stack:** FastAPI + Scikit-learn + Redis
- **Hosting:** Hugging Face Spaces
### API Endpoints
#### POST `/predict`
Predict spending for next period.
**Request:**
```json
{
"user_id": "u_alice",
"card_id": "c_amex_gold",
"category": "Groceries",
"horizon_days": 30
}
```
**Response:**
```json
{
"predicted_spending": 520.50,
"confidence_interval": [480.00, 560.00],
"warnings": [
{
"type": "cap_warning",
"message": "Likely to exceed $500 monthly cap",
"probability": 0.78,
"suggested_action": "Switch to Citi Custom Cash after $500"
}
]
}
```
### Implementation
```python
# forecast_server.py
from fastapi import FastAPI
from sklearn.ensemble import RandomForestRegressor
import numpy as np
app = FastAPI(title="Spend Forecast MCP")
class SpendingForecaster:
def __init__(self):
self.model = RandomForestRegressor(n_estimators=100)
def predict(self, user_id: str, category: str, horizon_days: int):
"""Predict spending for next N days"""
# Load historical data
history = self.load_user_history(user_id, category)
# Feature engineering
features = self.extract_features(history)
# Predict
prediction = self.model.predict(features)
# Calculate confidence interval
predictions = [tree.predict(features) for tree in self.model.estimators_]
lower = np.percentile(predictions, 5)
upper = np.percentile(predictions, 95)
return {
"predicted_spending": float(prediction[0]),
"confidence_interval": [float(lower), float(upper)]
}
```
---
## Communication Flow
### Sequence Diagram
```
User -> Gradio: Enter transaction
Gradio -> Orchestrator: POST /recommend
Orchestrator -> Claude: Create execution plan
Claude -> Orchestrator: {plan: call all 3 MCPs}
Orchestrator -> Smart Wallet: POST /analyze
Orchestrator -> RAG: POST /query
Orchestrator -> Forecast: POST /predict
Smart Wallet -> Orchestrator: {best_card: Amex Gold, rewards: 5.10}
RAG -> Orchestrator: {benefits: "4x on groceries..."}
Forecast -> Orchestrator: {warning: "Near cap"}
Orchestrator -> Gemini: Synthesize results
Gemini -> Orchestrator: {explanation: "Use Amex Gold because..."}
Orchestrator -> Gradio: Final recommendation
Gradio -> User: Display result
```
---
## Deployment Instructions
### 1. Deploy Each MCP Server to Hugging Face
```bash
# Clone template
git clone https://huggingface.co/spaces/YOUR_USERNAME/rewardpilot-orchestrator
# Add files
cp orchestrator_server.py app.py
cp requirements.txt .
# Create Space on HF
huggingface-cli repo create rewardpilot-orchestrator --type space --space_sdk gradio
# Push
git add .
git commit -m "Deploy orchestrator"
git push
```
### 2. Set Environment Variables
In each Space's settings, add:
```bash
ANTHROPIC_API_KEY=sk-ant-xxxxx
GEMINI_API_KEY=AIzaSyxxxxx
OPENAI_API_KEY=sk-xxxxx
```
### 3. Configure Endpoints
In main `app.py`:
```python
MCP_ENDPOINTS = {
"orchestrator": "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space",
"smart_wallet": "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space",
"rewards_rag": "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space",
"forecast": "https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space"
}
```
---
## Error Handling
### Graceful Degradation
```python
async def call_mcp_with_fallback(service_name: str, request_data: dict):
"""Call MCP server with timeout and fallback"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
MCP_ENDPOINTS[service_name],
json=request_data
)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
logger.error(f"{service_name} timeout")
return get_fallback_response(service_name)
except httpx.HTTPError as e:
logger.error(f"{service_name} error: {e}")
return get_fallback_response(service_name)
```
---
## Monitoring
### Health Checks
```python
@app.get("/health")
async def health_check():
"""Check status of all MCP servers"""
statuses = {}
for service, url in MCP_ENDPOINTS.items():
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{url}/health")
statuses[service] = {
"status": "healthy" if response.status_code == 200 else "unhealthy",
"latency_ms": response.elapsed.total_seconds() * 1000
}
except Exception as e:
statuses[service] = {"status": "down", "error": str(e)}
return statuses
```
---
## Performance Optimization
### Caching Strategy
```python
from functools import lru_cache
import redis
# Redis cache for frequent queries
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
@lru_cache(maxsize=1000)
def get_card_benefits(card_name: str):
"""Cache card benefits for 1 hour"""
cache_key = f"benefits:{card_name}"
# Check cache
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Fetch from RAG
result = call_rewards_rag({"query": f"Get all benefits for {card_name}"})
# Cache for 1 hour
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
```
---
## Testing
### Integration Tests
```python
import pytest
import httpx
@pytest.mark.asyncio
async def test_orchestrator_end_to_end():
"""Test full recommendation flow"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{MCP_ENDPOINTS['orchestrator']}/recommend",
json={
"user_id": "test_user",
"merchant": "Whole Foods",
"amount_usd": 100.00
}
)
assert response.status_code == 200
data = response.json()
assert "recommended_card" in data
assert "rewards" in data
assert "reasoning" in data
```
---
## Next Steps
1. **Scale MCP servers** - Add load balancing
2. **Add authentication** - JWT tokens for API access
3. **Implement webhooks** - Real-time transaction notifications
4. **Add more MCP servers** - Travel optimization, business expenses, etc.
---
**Related Documentation:**
- [Modal Deployment Guide](./modal_deployment.md)
- [LlamaIndex RAG Setup](./llamaindex_setup.md)
- [Agent Reasoning Flow](./agent_reasoning.md)
```
---