jackonthemike's picture
feat: Sync backend updates including AI Revenue Analyst
cef0de3
import base64
import hashlib
import hmac
import html
import json
import os
import secrets
import string
import struct
import time
from datetime import datetime, timedelta
from typing import Optional
import httpx
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel, EmailStr, validator
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from starlette.middleware.base import BaseHTTPMiddleware
# ============= Rate Limiting Setup =============
# Disable rate limiting in test mode
TESTING = os.environ.get("TESTING", "false").lower() == "true"
def get_real_key(request: Request) -> str:
"""Get rate limit key - returns test bypass or real IP"""
if TESTING:
return "test-bypass" # All tests share same key but limits are high
return get_remote_address(request)
limiter = Limiter(key_func=get_real_key, enabled=not TESTING)
# Initialize FastAPI
app = FastAPI(title="InnSight-AI API", version="1.0.0")
# Simple ping endpoint that works even if everything else fails
@app.get("/api/ping")
async def ping():
"""Minimal test endpoint"""
return {"ping": "pong"}
# Debug endpoint to check database state
@app.get("/api/debug/db")
async def debug_db():
"""Debug endpoint to check database state"""
global db, _db_initialized, DB_IMPORT_ERROR
user_count = 0
user_emails = []
try:
if isinstance(db, InMemoryDatabase):
user_emails = list(db._users.keys())
user_count = len(user_emails)
elif db is not None:
# For PostgresDatabase, try to get users
try:
rows = db._execute_query("SELECT email FROM users LIMIT 10")
user_emails = [r["email"] for r in rows]
user_count = len(rows)
except:
pass
except:
pass
return {
"db_type": type(db).__name__ if db else "None",
"db_initialized": _db_initialized,
"db_import_error": DB_IMPORT_ERROR,
"user_count": user_count,
"user_emails": user_emails
}
# Endpoint to seed database (protected by secret)
@app.post("/api/debug/seed")
async def seed_db(secret: str = ""):
"""Seed the database with initial users (requires secret)"""
global db
# Simple secret check to prevent abuse
expected_secret = os.environ.get("SEED_SECRET", "innsight-seed-2026")
if secret != expected_secret:
raise HTTPException(status_code=403, detail="Invalid secret")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
results = []
# Initialize tables first (creates them if they don't exist)
try:
await db.init_tables()
results.append({"action": "init_tables", "status": "success"})
except Exception as e:
results.append({"action": "init_tables", "status": f"error: {e}"})
admin_users = [
{
"email": ADMIN_EMAIL,
"password_hash": hash_password("InnSight2026!"),
"full_name": "Jack Amichai",
"is_admin": True,
"totp_secret": ADMIN_TOTP_SECRET
},
{
"email": "[email protected]",
"password_hash": hash_password("InnSight2026Adi!"),
"full_name": "Adi Ohayon",
"is_admin": True,
"totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ"
}
]
regular_users = [
{
"email": "[email protected]",
"password_hash": hash_password("TestUser2026!"),
"full_name": "Test Client",
"is_admin": False,
"totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
}
]
for user in admin_users + regular_users:
try:
existing = await db.get_user_by_email(user["email"])
if not existing:
await db.create_user(
email=user["email"],
password_hash=user["password_hash"],
full_name=user["full_name"],
is_admin=user["is_admin"],
totp_secret=user["totp_secret"]
)
results.append({"email": user["email"], "status": "created"})
else:
results.append({"email": user["email"], "status": "exists"})
except Exception as e:
results.append({"email": user["email"], "status": f"error: {e}"})
return {"results": results}
# Add backend to path to import scrapers
import sys
from pathlib import Path
backend_path = Path(__file__).parent.parent / "backend"
sys.path.append(str(backend_path))
try:
from app.scrapers.israeli_hotel_scraper import IsraeliHotelScraper, HotelPriceResult
from app.scrapers.direct_hotel_scraper import DirectHotelScraper
SCRAPERS_AVAILABLE = True
except ImportError as e:
print(f"Warning: Could not import scrapers: {e}")
SCRAPERS_AVAILABLE = False
# Add rate limiter to app state
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ============= Search Endpoint =============
class SearchRequest(BaseModel):
destination: str
check_in: str
num_nights: int = 1
adults: int = 2
children: int = 0
children_ages: Optional[list[int]] = None
source: str = "booking" # "booking" or "direct"
hotel_url: Optional[str] = None # Required if source="direct"
@app.post("/api/search")
async def search_hotels(request: SearchRequest):
"""
Real-time hotel search using IsraeliHotelScraper.
"""
# ... (existing content) ...
if not SCRAPERS_AVAILABLE:
raise HTTPException(status_code=501, detail="Scrapers not available on this environment")
try:
if request.source == "direct" and request.hotel_url:
# Direct scraping for specific hotel
results = await IsraeliHotelScraper.search_hotel(
hotel_name=request.destination,
check_in=request.check_in,
num_nights=request.num_nights,
adults=request.adults,
source="direct",
hotel_url=request.hotel_url
)
else:
# Standard destination search on Booking.com
results = await IsraeliHotelScraper.search_destination(
destination=request.destination,
check_in=request.check_in,
num_nights=request.num_nights,
adults=request.adults,
children=request.children,
children_ages=request.children_ages
)
# Convert results to dicts
return {
"results": [
{
"hotel_name": r.hotel_name,
"price": r.price,
"currency": r.currency,
"room_type": r.room_type,
"meal_plan": r.meal_plan,
"chain": r.chain
}
for r in results
]
}
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
class HotelValidationRequest(BaseModel):
booking_url: Optional[str] = None
website_url: Optional[str] = None
@app.post("/api/hotels/validate")
def validate_hotel(request: HotelValidationRequest):
"""
Validate Booking.com and/or Direct Website URLs.
Uses DrissionPage (blocking), so run as sync endpoint.
"""
if not SCRAPERS_AVAILABLE:
raise HTTPException(status_code=501, detail="Scrapers not available")
# Initialize scraper with headless=True
scraper = DirectHotelScraper(headless=True)
results = {"booking": None, "website": None}
try:
# Validate Website first (faster)
if request.website_url:
results["website"] = scraper.validate_website_url(request.website_url)
# Validate Booking (slower)
if request.booking_url:
results["booking"] = scraper.validate_booking_url(request.booking_url)
except Exception as e:
print(f"Validation error: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
scraper.quit()
return results
# ============= Environment Configuration =============
# SECURITY: All secrets MUST be set via environment variables in production
# No fallback values for sensitive data
def get_required_env(key: str, default: Optional[str] = None) -> str:
"""Get environment variable, raise error if required and not set"""
value = os.environ.get(key, default)
if value is None:
raise RuntimeError(f"Required environment variable {key} is not set")
return value
# Environment mode
ENV_MODE = os.environ.get("ENV_MODE", "development")
IS_PRODUCTION = ENV_MODE == "production"
# JWT Secret - REQUIRED in production, has dev fallback
if IS_PRODUCTION:
JWT_SECRET = get_required_env("JWT_SECRET")
else:
JWT_SECRET = os.environ.get("JWT_SECRET", "dev-only-secret-change-in-production")
# CORS Configuration - restricted in production
ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "").split(",") if IS_PRODUCTION else [
"http://localhost:3000",
"http://localhost:5173",
"http://127.0.0.1:3000",
"http://127.0.0.1:5173",
"https://innsight-ai.vercel.app",
"https://*.vercel.app",
]
# Filter out empty strings
ALLOWED_ORIGINS = [origin.strip() for origin in ALLOWED_ORIGINS if origin.strip()]
# CORS middleware with restricted origins
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS if ALLOWED_ORIGINS else ["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Authorization", "Content-Type", "Accept", "Origin", "X-Requested-With"],
)
security = HTTPBearer()
# ============= Security Headers Middleware =============
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses"""
async def dispatch(self, request: Request, call_next) -> Response:
response = await call_next(request)
# Prevent MIME type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# XSS Protection (legacy browsers)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Referrer policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Content Security Policy for API
response.headers["Content-Security-Policy"] = "default-src 'none'; frame-ancestors 'none'"
# Only add HSTS in production (requires HTTPS)
if IS_PRODUCTION:
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return response
app.add_middleware(SecurityHeadersMiddleware)
# ============= Environment Variables =============
POSTGRES_URL = os.environ.get("POSTGRES_URL", "")
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")
# Microsoft OAuth Configuration
MS_CLIENT_ID = os.environ.get("MS_CLIENT_ID", "")
MS_CLIENT_SECRET = os.environ.get("MS_CLIENT_SECRET", "")
MS_TENANT_ID = os.environ.get("MS_TENANT_ID", "common") # Use 'common' for multi-tenant or specific tenant ID
MS_REDIRECT_URI = os.environ.get("MS_REDIRECT_URI", "")
# Google OAuth Configuration
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
GOOGLE_REDIRECT_URI = os.environ.get("GOOGLE_REDIRECT_URI", "")
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL", "[email protected]") # Admin email for access control
# TOTP Secret for admin 2FA
# SECURITY: In production, these should be stored in the database per user
# Using env vars with secure defaults for now
ADMIN_TOTP_SECRET = os.environ.get("ADMIN_TOTP_SECRET")
if not ADMIN_TOTP_SECRET and IS_PRODUCTION:
raise RuntimeError("ADMIN_TOTP_SECRET must be set in production")
elif not ADMIN_TOTP_SECRET:
ADMIN_TOTP_SECRET = "JBSWY3DPEHPK3PXP" # Dev fallback only
# ============= TOTP Functions =============
def get_totp_token(secret: str) -> str:
"""Generate current TOTP token"""
# Decode base32 secret
key = base64.b32decode(secret.upper() + '=' * (8 - len(secret) % 8) if len(secret) % 8 else secret.upper())
# Get current time step (30 second intervals)
counter = int(time.time() // 30)
# Pack counter as big-endian 8-byte integer
counter_bytes = struct.pack('>Q', counter)
# Generate HMAC-SHA1
hmac_hash = hmac.new(key, counter_bytes, hashlib.sha1).digest()
# Dynamic truncation
offset = hmac_hash[-1] & 0x0F
code = struct.unpack('>I', hmac_hash[offset:offset+4])[0] & 0x7FFFFFFF
# Return 6-digit code
return str(code % 1000000).zfill(6)
def verify_totp(secret: str, token: str) -> bool:
"""Verify TOTP token with 1 step tolerance"""
for offset in [-1, 0, 1]: # Allow 30 seconds before/after
key = base64.b32decode(secret.upper() + '=' * (8 - len(secret) % 8) if len(secret) % 8 else secret.upper())
counter = int(time.time() // 30) + offset
counter_bytes = struct.pack('>Q', counter)
hmac_hash = hmac.new(key, counter_bytes, hashlib.sha1).digest()
offset_val = hmac_hash[-1] & 0x0F
code = struct.unpack('>I', hmac_hash[offset_val:offset_val+4])[0] & 0x7FFFFFFF
expected = str(code % 1000000).zfill(6)
if token == expected:
return True
return False
# ============= Pydantic Models =============
class UserCreate(BaseModel):
email: EmailStr
password: str
full_name: str
@validator('full_name')
def sanitize_full_name(cls, v):
return html.escape(v)
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: str
full_name: str
created_at: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: UserResponse
class ChatMessage(BaseModel):
message: str
class ChatResponse(BaseModel):
response: str
class HotelCreate(BaseModel):
name: str
booking_url: Optional[str] = None
website_url: Optional[str] = None
class ComparisonRequest(BaseModel):
hotel_ids: list[int]
check_in: str
check_out: str
occupancy: str = "couple"
class AdminLoginRequest(BaseModel):
email: EmailStr
password: str
totp_code: str
class AdminLoginStep1Request(BaseModel):
email: EmailStr
password: str
class CreateUserRequest(BaseModel):
email: EmailStr
full_name: str
class UserWithCredentials(BaseModel):
id: int
email: str
full_name: str
username: str
password: str
totp_secret: str
created_at: str
is_admin: bool = False
class UserListResponse(BaseModel):
id: int
email: str
full_name: str
created_at: str
is_admin: bool
last_login: Optional[str] = None
# ============= Password Hashing =============
def hash_password(password: str) -> str:
"""Hash password using PBKDF2 with SHA256"""
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
return base64.b64encode(salt + key).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""Verify password against hash"""
try:
decoded = base64.b64decode(hashed.encode('utf-8'))
salt = decoded[:32]
stored_key = decoded[32:]
new_key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
return hmac.compare_digest(stored_key, new_key)
except Exception:
return False
def generate_random_password(length: int = 12) -> str:
"""Generate a secure random password"""
alphabet = string.ascii_letters + string.digits + "!@#$%"
return ''.join(secrets.choice(alphabet) for _ in range(length))
def generate_totp_secret() -> str:
"""Generate a random Base32 TOTP secret"""
# Generate 20 random bytes (160 bits as recommended for TOTP)
random_bytes = secrets.token_bytes(20)
# Encode as base32 and remove padding
return base64.b32encode(random_bytes).decode('utf-8').rstrip('=')
def generate_username_from_email(email: str) -> str:
"""Generate username from email"""
username_base = email.split('@')[0].lower()
# Remove special characters
username = ''.join(c for c in username_base if c.isalnum() or c in '_-.')
return username
# ============= JWT Tokens =============
def create_jwt_token(payload: dict, expires_delta: timedelta = timedelta(days=7)) -> str:
"""Create a simple JWT token"""
header = {"alg": "HS256", "typ": "JWT"}
payload["exp"] = (datetime.utcnow() + expires_delta).timestamp()
payload["iat"] = datetime.utcnow().timestamp()
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip("=")
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip("=")
message = f"{header_b64}.{payload_b64}"
signature = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip("=")
return f"{message}.{signature_b64}"
def verify_jwt_token(token: str) -> Optional[dict]:
"""Verify and decode JWT token"""
try:
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, signature_b64 = parts
# Verify signature
message = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest()
expected_sig_b64 = base64.urlsafe_b64encode(expected_sig).decode().rstrip("=")
if not hmac.compare_digest(signature_b64, expected_sig_b64):
return None
# Decode payload
padding = 4 - len(payload_b64) % 4
if padding != 4:
payload_b64 += "=" * padding
payload = json.loads(base64.urlsafe_b64decode(payload_b64))
# Check expiration
if payload.get("exp", 0) < datetime.utcnow().timestamp():
return None
return payload
except Exception:
return None
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Get current authenticated user"""
token = credentials.credentials
payload = verify_jwt_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid or expired token")
return payload
# Optional security that doesn't require auth
security_optional = HTTPBearer(auto_error=False)
async def get_current_user_optional(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional)) -> Optional[dict]:
"""Get current user if authenticated, otherwise return None (for demo/public endpoints)"""
if not credentials:
return None
token = credentials.credentials
payload = verify_jwt_token(token)
return payload
# ============= Database Setup =============
try:
# Try relative import first (for Vercel serverless)
try:
from .database import db, InMemoryDatabase, PostgresDatabase
except ImportError:
# Fall back to absolute import (for local development)
from database import db, InMemoryDatabase, PostgresDatabase
DB_IMPORT_ERROR = None
print(f"[DEBUG] Database imported successfully. db type: {type(db)}, db is None: {db is None}")
except Exception as e:
import traceback
DB_IMPORT_ERROR = f"{e}\n{traceback.format_exc()}"
print(f"[DEBUG] Database import failed: {DB_IMPORT_ERROR}")
# Create a fallback InMemoryDatabase if import fails
class InMemoryDatabase:
"""Fallback InMemoryDatabase when import fails"""
pass
class PostgresDatabase:
"""Fallback PostgresDatabase when import fails"""
pass
db = None
# Track if database has been initialized (for Vercel serverless cold starts)
_db_initialized = False
def seed_database_sync():
"""Synchronously seed the database with initial data"""
global _db_initialized, db
print(f"[DEBUG] seed_database_sync called. _db_initialized: {_db_initialized}, db: {db}, isinstance: {isinstance(db, InMemoryDatabase) if db else False}")
if _db_initialized:
print("[DEBUG] Already initialized, skipping")
return
if db is None:
print(f"Database import failed: {DB_IMPORT_ERROR}")
_db_initialized = True
return
# Seed admin users if using in-memory database
if isinstance(db, InMemoryDatabase):
print("[DEBUG] Seeding admin users (InMemoryDatabase)...")
db.seed_admin_users([
{
"email": ADMIN_EMAIL,
"password_hash": hash_password("InnSight2026!"),
"full_name": "Jack Amichai",
"totp_secret": ADMIN_TOTP_SECRET
},
{
"email": "[email protected]",
"password_hash": hash_password("InnSight2026Adi!"),
"full_name": "Adi Ohayon",
"totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ"
}
])
print("[DEBUG] Seeding regular users...")
db.seed_regular_users([
{
"email": "[email protected]",
"password_hash": hash_password("TestUser2026!"),
"full_name": "Test Client",
"totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
}
])
# Seed demo hotels
db.seed_demo_hotels([
{"name": "צובה", "booking_url": "https://booking.com/hotel/tzuba", "website_url": "https://tzuba.co.il"},
{"name": "רמת רחל", "booking_url": "https://booking.com/hotel/ramat-rachel", "website_url": "https://ramatrachel.co.il"},
{"name": "יד השמונה", "booking_url": "https://booking.com/hotel/yad-hashmona", "website_url": "https://yadhashmona.co.il"},
{"name": "יערים", "booking_url": "https://booking.com/hotel/yearim", "website_url": "https://yearim.co.il"},
{"name": "נווה אילן", "booking_url": "https://booking.com/hotel/neve-ilan", "website_url": "https://neveilan.co.il"},
])
print(f"Database seeded with {len(db._users)} users")
# Seed users in PostgresDatabase if they don't exist
elif isinstance(db, PostgresDatabase):
print("[DEBUG] Checking/seeding users in PostgresDatabase...")
import asyncio
async def seed_postgres_users():
admin_users = [
{
"email": ADMIN_EMAIL,
"password_hash": hash_password("InnSight2026!"),
"full_name": "Jack Amichai",
"is_admin": True,
"totp_secret": ADMIN_TOTP_SECRET
},
{
"email": "[email protected]",
"password_hash": hash_password("InnSight2026Adi!"),
"full_name": "Adi Ohayon",
"is_admin": True,
"totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ"
}
]
regular_users = [
{
"email": "[email protected]",
"password_hash": hash_password("TestUser2026!"),
"full_name": "Test Client",
"is_admin": False,
"totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
}
]
for user in admin_users + regular_users:
try:
# Check if user exists
existing = await db.get_user_by_email(user["email"])
if not existing:
await db.create_user(
email=user["email"],
password_hash=user["password_hash"],
full_name=user["full_name"],
is_admin=user["is_admin"],
totp_secret=user["totp_secret"]
)
print(f" Created user: {user['email']}")
else:
print(f" User exists: {user['email']}")
except Exception as e:
print(f" Error creating user {user['email']}: {e}")
# Run async function synchronously
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# We're in an async context, create a task
asyncio.create_task(seed_postgres_users())
else:
loop.run_until_complete(seed_postgres_users())
except RuntimeError:
# No event loop, create one
asyncio.run(seed_postgres_users())
_db_initialized = True
print("Database initialized successfully")
async def init_database():
"""Initialize database and seed data"""
global _db_initialized, db
if _db_initialized:
return # Already initialized
if db is None:
print(f"Database import failed: {DB_IMPORT_ERROR}")
_db_initialized = True
return
# Initialize tables
await db.init_tables()
# For InMemoryDatabase, seed is handled in seed_database_sync
if isinstance(db, InMemoryDatabase):
seed_database_sync()
# For PostgresDatabase, seed directly here with await
elif isinstance(db, PostgresDatabase):
print("[DEBUG] Seeding users in PostgresDatabase...")
admin_users = [
{
"email": ADMIN_EMAIL,
"password_hash": hash_password("InnSight2026!"),
"full_name": "Jack Amichai",
"is_admin": True,
"totp_secret": ADMIN_TOTP_SECRET
},
{
"email": "[email protected]",
"password_hash": hash_password("InnSight2026Adi!"),
"full_name": "Adi Ohayon",
"is_admin": True,
"totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ"
}
]
regular_users = [
{
"email": "[email protected]",
"password_hash": hash_password("TestUser2026!"),
"full_name": "Test Client",
"is_admin": False,
"totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"
}
]
for user in admin_users + regular_users:
try:
# Check if user exists
existing = await db.get_user_by_email(user["email"])
if not existing:
await db.create_user(
email=user["email"],
password_hash=user["password_hash"],
full_name=user["full_name"],
is_admin=user["is_admin"],
totp_secret=user["totp_secret"]
)
print(f" Created user: {user['email']}")
else:
print(f" User exists: {user['email']}")
except Exception as e:
print(f" Error creating user {user['email']}: {e}")
_db_initialized = True
print("Database initialized successfully")
def sync_init_database():
"""Synchronous wrapper for database initialization (for module-level init)"""
global _db_initialized
if _db_initialized:
return
# Only run sync seeding for InMemoryDatabase
# PostgreSQL seeding is handled in the async init_database
if isinstance(db, InMemoryDatabase):
seed_database_sync()
# Initialize database at module load time (for Vercel serverless)
try:
sync_init_database()
except Exception as e:
print(f"Error during module-level db init: {e}")
# Middleware to ensure database is initialized on first request
# This is crucial for Vercel serverless where startup events may not run
@app.middleware("http")
async def ensure_db_initialized(request: Request, call_next):
global _db_initialized
if not _db_initialized:
await init_database()
response = await call_next(request)
return response
# Initialize database on startup (for local development)
@app.on_event("startup")
async def startup_event():
await init_database()
# Legacy compatibility - keep demo_users as a view into the database
# This will be removed once all code is migrated to use db directly
class DemoUsersProxy:
"""Proxy class to maintain backward compatibility with demo_users dict"""
def get(self, email: str, default=None):
# Simply use the module-level db variable
if isinstance(db, InMemoryDatabase):
return db._users.get(email, default)
return default
def __contains__(self, email: str) -> bool:
if isinstance(db, InMemoryDatabase):
return email in db._users
return False
def __setitem__(self, email: str, value: dict):
if isinstance(db, InMemoryDatabase):
db._users[email] = value
def items(self):
if isinstance(db, InMemoryDatabase):
return db._users.items()
return []
def values(self):
if isinstance(db, InMemoryDatabase):
return db._users.values()
return []
def __len__(self):
if isinstance(db, InMemoryDatabase):
return len(db._users)
return 0
demo_users = DemoUsersProxy()
# User login activity tracking
user_activity = {}
# ============= Async User Lookup Helper =============
async def get_user_by_email_async(email: str) -> Optional[dict]:
"""
Get user by email from any database type (InMemory or Postgres).
This is the primary way to look up users in async contexts.
"""
if db is None:
return None
if isinstance(db, InMemoryDatabase):
return db._users.get(email)
elif isinstance(db, PostgresDatabase):
return await db.get_user_by_email(email)
return None
async def list_all_users_async() -> list[dict]:
"""
List all users from any database type.
"""
if db is None:
return []
if isinstance(db, InMemoryDatabase):
return list(db._users.values())
elif isinstance(db, PostgresDatabase):
return await db.list_users()
return []
async def get_user_by_id_async(user_id: int) -> Optional[dict]:
"""
Get user by ID from any database type.
"""
if db is None:
return None
if isinstance(db, InMemoryDatabase):
for user in db._users.values():
if user["id"] == user_id:
return user
return None
elif isinstance(db, PostgresDatabase):
return await db.get_user_by_id(user_id)
return None
# ============= TOTP Setup Route =============
@app.get("/api/auth/totp-setup")
async def totp_setup():
"""Generate QR code for TOTP setup - ONE TIME USE ONLY"""
# Generate QR code URL for Google Authenticator / Microsoft Authenticator
account_name = "InnSight-AI Admin"
issuer = "InnSight-AI"
secret = ADMIN_TOTP_SECRET
# Create otpauth URL
otpauth_url = f"otpauth://totp/{issuer}:{account_name}?secret={secret}&issuer={issuer}&algorithm=SHA1&digits=6&period=30"
# Generate QR code using a simple HTML page with QR code library
html = f'''
<!DOCTYPE html>
<html dir="rtl" lang="he">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>InnSight-AI - הגדרת אימות דו-שלבי</title>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/build/qrcode.min.js"></script>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #1a1a2e 0%, #4a1942 50%, #1a1a2e 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 20px;
}}
.container {{
background: rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
border-radius: 20px;
padding: 40px;
max-width: 500px;
width: 100%;
text-align: center;
border: 1px solid rgba(255, 255, 255, 0.2);
}}
h1 {{
color: #fff;
margin-bottom: 10px;
font-size: 28px;
}}
.subtitle {{
color: #a78bfa;
margin-bottom: 30px;
font-size: 16px;
}}
.qr-container {{
background: white;
padding: 20px;
border-radius: 15px;
display: inline-block;
margin: 20px 0;
}}
.instructions {{
color: #e5e5e5;
text-align: right;
margin: 20px 0;
line-height: 1.8;
}}
.instructions li {{
margin: 10px 0;
}}
.secret-box {{
background: rgba(167, 139, 250, 0.2);
border: 1px solid #a78bfa;
border-radius: 10px;
padding: 15px;
margin: 20px 0;
}}
.secret-label {{
color: #a78bfa;
font-size: 12px;
margin-bottom: 5px;
}}
.secret-code {{
color: #fff;
font-family: monospace;
font-size: 18px;
letter-spacing: 2px;
}}
.warning {{
background: rgba(239, 68, 68, 0.2);
border: 1px solid #ef4444;
border-radius: 10px;
padding: 15px;
color: #fca5a5;
margin-top: 20px;
font-size: 14px;
}}
.success {{
background: rgba(34, 197, 94, 0.2);
border: 1px solid #22c55e;
border-radius: 10px;
padding: 15px;
color: #86efac;
margin-top: 20px;
}}
</style>
</head>
<body>
<div class="container">
<h1>🔐 InnSight-AI</h1>
<p class="subtitle">הגדרת אימות דו-שלבי למנהל</p>
<div class="qr-container">
<canvas id="qrcode"></canvas>
</div>
<ol class="instructions">
<li>פתח את אפליקציית <strong>Microsoft Authenticator</strong></li>
<li>לחץ על <strong>"+"</strong> להוספת חשבון</li>
<li>בחר <strong>"חשבון אחר (Google, Facebook וכו')"</strong></li>
<li>סרוק את קוד ה-QR שלמעלה</li>
</ol>
<div class="secret-box">
<div class="secret-label">או הזן את הקוד הסודי ידנית:</div>
<div class="secret-code">{secret}</div>
</div>
<div class="success">
✅ לאחר הסריקה, האפליקציה תציג קוד בן 6 ספרות שמתחלף כל 30 שניות.<br>
השתמש בקוד זה בעת ההתחברות למערכת.
</div>
<div class="warning">
⚠️ <strong>אזהרה:</strong> שמור על הקוד הסודי במקום בטוח!<br>
דף זה מיועד לשימוש חד-פעמי בלבד.
</div>
</div>
<script>
QRCode.toCanvas(document.getElementById('qrcode'), '{otpauth_url}', {{
width: 200,
margin: 2,
color: {{
dark: '#1a1a2e',
light: '#ffffff'
}}
}});
</script>
</body>
</html>
'''
return HTMLResponse(content=html)
# ============= Auth Routes =============
@app.post("/api/auth/register", response_model=TokenResponse)
@limiter.limit("5/minute") # Rate limit: 5 registrations per minute per IP
async def register(request: Request, user: UserCreate):
"""Register a new user"""
# Check if user exists
if user.email in demo_users:
raise HTTPException(status_code=400, detail="Email already registered")
# Hash password
password_hash = hash_password(user.password)
# Create user
user_id = len(demo_users) + 1
created_at = datetime.utcnow().isoformat()
demo_users[user.email] = {
"id": user_id,
"email": user.email,
"password_hash": password_hash,
"full_name": user.full_name,
"created_at": created_at
}
# Create token
token = create_jwt_token({"user_id": user_id, "email": user.email})
return TokenResponse(
access_token=token,
user=UserResponse(
id=user_id,
email=user.email,
full_name=user.full_name,
created_at=created_at
)
)
@app.post("/api/auth/login", response_model=TokenResponse)
@limiter.limit("10/minute") # Rate limit: 10 login attempts per minute per IP
async def login(request: Request, credentials: UserLogin):
"""Login user - all users require TOTP 2FA"""
# Use async database lookup for both InMemory and Postgres
user = await get_user_by_email_async(credentials.email)
if not user or not verify_password(credentials.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid email or password")
# All users require 2FA now
raise HTTPException(
status_code=403,
detail="2-step authentication required. Please provide TOTP code."
)
@app.post("/api/auth/login-2fa", response_model=TokenResponse)
@limiter.limit("10/minute") # Rate limit: 10 2FA attempts per minute per IP
async def login_with_2fa(request: Request, credentials: AdminLoginRequest):
"""Login any user with 2-step TOTP authentication"""
# Use async database lookup for both InMemory and Postgres
user = await get_user_by_email_async(credentials.email)
if not user:
raise HTTPException(status_code=401, detail="Invalid email or password")
# Verify password
if not verify_password(credentials.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid email or password")
# Get user's TOTP secret
user_totp_secret = user.get("totp_secret", ADMIN_TOTP_SECRET)
# Verify TOTP code
if not verify_totp(user_totp_secret, credentials.totp_code):
raise HTTPException(status_code=401, detail="Invalid authentication code")
# Update last login (async for PostgresDatabase)
if isinstance(db, PostgresDatabase):
await db.update_user_last_login(credentials.email)
elif isinstance(db, InMemoryDatabase):
user["last_login"] = datetime.utcnow().isoformat()
# Track activity
if credentials.email not in user_activity:
user_activity[credentials.email] = []
user_activity[credentials.email].append({
"action": "login",
"timestamp": datetime.utcnow().isoformat()
})
token = create_jwt_token({
"user_id": user["id"],
"email": user["email"],
"is_admin": user.get("is_admin", False)
})
return TokenResponse(
access_token=token,
user=UserResponse(
id=user["id"],
email=user["email"],
full_name=user["full_name"],
created_at=user["created_at"]
)
)
@app.post("/api/auth/admin-login", response_model=TokenResponse)
@limiter.limit("5/minute") # Rate limit: 5 admin login attempts per minute per IP
async def admin_login(request: Request, credentials: AdminLoginRequest):
"""Admin login with 2-step TOTP authentication"""
# Use async database lookup for both InMemory and Postgres
user = await get_user_by_email_async(credentials.email)
if not user:
raise HTTPException(status_code=401, detail="Invalid email or password")
# Must be admin
if not user.get("is_admin"):
raise HTTPException(status_code=403, detail="Not an admin account")
# Verify password
if not verify_password(credentials.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid email or password")
# Get admin's TOTP secret from their user record (or fallback to global admin secret)
admin_totp_secret = user.get("totp_secret", ADMIN_TOTP_SECRET)
# Verify TOTP code
if not verify_totp(admin_totp_secret, credentials.totp_code):
raise HTTPException(status_code=401, detail="Invalid authentication code")
token = create_jwt_token({"user_id": user["id"], "email": user["email"], "is_admin": True})
return TokenResponse(
access_token=token,
user=UserResponse(
id=user["id"],
email=user["email"],
full_name=user["full_name"],
created_at=user["created_at"]
)
)
@app.get("/api/auth/me", response_model=UserResponse)
async def get_me(current_user: dict = Depends(get_current_user)):
"""Get current user info"""
# Use async database lookup for both InMemory and Postgres
user = await get_user_by_email_async(current_user.get("email"))
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse(
id=user["id"],
email=user["email"],
full_name=user["full_name"],
created_at=user["created_at"]
)
# ============= Admin User Management Routes =============
async def verify_admin(current_user: dict = Depends(get_current_user)):
"""Verify current user is admin"""
if not current_user.get("is_admin"):
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
@app.post("/api/admin/users", response_model=UserWithCredentials)
async def create_user(user_data: CreateUserRequest, admin: dict = Depends(verify_admin)):
"""Create a new user with generated credentials and TOTP secret"""
# Check if email already exists using async lookup
existing_user = await get_user_by_email_async(user_data.email)
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
# Generate credentials
username = generate_username_from_email(user_data.email)
password = generate_random_password()
totp_secret = generate_totp_secret()
# Hash password
password_hash = hash_password(password)
created_at = datetime.utcnow().isoformat()
# Create user in appropriate database
if isinstance(db, PostgresDatabase):
new_user = await db.create_user(
email=user_data.email,
password_hash=password_hash,
full_name=user_data.full_name,
is_admin=False,
totp_secret=totp_secret
)
user_id = new_user["id"]
created_at = new_user.get("created_at", created_at)
elif isinstance(db, InMemoryDatabase):
user_id = len(db._users) + 1
db._users[user_data.email] = {
"id": user_id,
"email": user_data.email,
"password_hash": password_hash,
"full_name": user_data.full_name,
"created_at": created_at,
"is_admin": False,
"totp_secret": totp_secret,
"last_login": None,
"username": username,
}
else:
raise HTTPException(status_code=500, detail="Database not configured")
return UserWithCredentials(
id=user_id,
email=user_data.email,
full_name=user_data.full_name,
username=username,
password=password,
totp_secret=totp_secret,
created_at=created_at,
is_admin=False
)
@app.get("/api/admin/users", response_model=list[UserListResponse])
async def list_users(admin: dict = Depends(verify_admin)):
"""List all users (admin only)"""
# Use async database lookup for both InMemory and Postgres
all_users = await list_all_users_async()
users = []
for user in all_users:
users.append(UserListResponse(
id=user["id"],
email=user["email"],
full_name=user["full_name"],
created_at=user["created_at"],
is_admin=user.get("is_admin", False),
last_login=user.get("last_login")
))
return users
@app.delete("/api/admin/users/{user_id}")
async def delete_user(user_id: int, admin: dict = Depends(verify_admin)):
"""Delete a user (admin only)"""
# Use async database lookup for both InMemory and Postgres
user = await get_user_by_id_async(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Don't allow deleting admin
if user.get("is_admin"):
raise HTTPException(status_code=400, detail="Cannot delete admin user")
# Delete from appropriate database
if isinstance(db, PostgresDatabase):
await db.delete_user(user_id)
elif isinstance(db, InMemoryDatabase):
for email, u in list(db._users.items()):
if u["id"] == user_id:
del db._users[email]
break
return {"message": "User deleted successfully"}
@app.get("/api/admin/users/{user_id}/totp-setup")
async def get_user_totp_setup(user_id: int, admin: dict = Depends(verify_admin)):
"""Get TOTP setup info for a specific user (admin only)"""
# Use async database lookup for both InMemory and Postgres
target_user = await get_user_by_id_async(user_id)
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
totp_secret = target_user.get("totp_secret", "")
account_name = target_user["email"]
issuer = "InnSight-AI"
otpauth_url = f"otpauth://totp/{issuer}:{account_name}?secret={totp_secret}&issuer={issuer}&algorithm=SHA1&digits=6&period=30"
# Generate HTML QR page
html = f'''
<!DOCTYPE html>
<html dir="rtl" lang="he">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>InnSight-AI - הגדרת 2FA עבור {target_user["full_name"]}</title>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/build/qrcode.min.js"></script>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 20px;
}}
.container {{
background: white;
border-radius: 20px;
padding: 40px;
max-width: 500px;
width: 100%;
text-align: center;
box-shadow: 0 10px 40px rgba(0,0,0,0.1);
border: 1px solid #e2e8f0;
}}
h1 {{ color: #1e293b; margin-bottom: 10px; font-size: 28px; }}
.subtitle {{ color: #7c3aed; margin-bottom: 30px; font-size: 16px; }}
.user-name {{ color: #334155; font-size: 18px; margin-bottom: 20px; }}
.qr-container {{
background: #f8fafc;
padding: 20px;
border-radius: 15px;
display: inline-block;
margin: 20px 0;
border: 1px solid #e2e8f0;
}}
.instructions {{
color: #475569;
text-align: right;
margin: 20px 0;
line-height: 1.8;
}}
.instructions li {{ margin: 10px 0; }}
.secret-box {{
background: #f3e8ff;
border: 1px solid #c4b5fd;
border-radius: 10px;
padding: 15px;
margin: 20px 0;
}}
.secret-label {{ color: #7c3aed; font-size: 12px; margin-bottom: 5px; }}
.secret-code {{ color: #1e293b; font-family: monospace; font-size: 18px; letter-spacing: 2px; }}
.warning {{
background: #fef2f2;
border: 1px solid #fecaca;
border-radius: 10px;
padding: 15px;
color: #dc2626;
margin-top: 20px;
font-size: 14px;
}}
</style>
</head>
<body>
<div class="container">
<h1>🔐 InnSight-AI</h1>
<p class="subtitle">הגדרת אימות דו-שלבי</p>
<p class="user-name">עבור: <strong>{target_user["full_name"]}</strong></p>
<div class="qr-container">
<canvas id="qrcode"></canvas>
</div>
<ol class="instructions">
<li>פתח את אפליקציית <strong>Microsoft Authenticator</strong></li>
<li>לחץ על <strong>"+"</strong> להוספת חשבון</li>
<li>בחר <strong>"חשבון אחר"</strong></li>
<li>סרוק את קוד ה-QR שלמעלה</li>
</ol>
<div class="secret-box">
<div class="secret-label">קוד סודי (להזנה ידנית):</div>
<div class="secret-code">{totp_secret}</div>
</div>
<div class="warning">
⚠️ שתף עמוד זה עם המשתמש באופן מאובטח.<br>
הקוד הסודי יש לשמור במקום בטוח!
</div>
</div>
<script>
QRCode.toCanvas(document.getElementById('qrcode'), '{otpauth_url}', {{
width: 200,
margin: 2,
color: {{ dark: '#1e293b', light: '#ffffff' }}
}});
</script>
</body>
</html>
'''
return HTMLResponse(content=html)
@app.get("/api/admin/analytics")
async def get_admin_analytics(admin: dict = Depends(verify_admin)):
"""Get user analytics for admin dashboard"""
total_users = len(demo_users)
active_users = sum(1 for u in demo_users.values() if u.get("last_login"))
admin_count = sum(1 for u in demo_users.values() if u.get("is_admin"))
# Get recent activity
recent_logins = []
for email, activities in user_activity.items():
for activity in activities[-5:]: # Last 5 activities per user
recent_logins.append({
"email": email,
"full_name": demo_users.get(email, {}).get("full_name", "Unknown"),
**activity
})
# Sort by timestamp descending
recent_logins.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
return {
"total_users": total_users,
"active_users": active_users,
"admin_count": admin_count,
"regular_users": total_users - admin_count,
"recent_activity": recent_logins[:20] # Last 20 activities
}
# ============= Microsoft OAuth Routes =============
@app.get("/api/auth/microsoft/login")
async def microsoft_login():
"""Initiate Microsoft OAuth flow"""
if not MS_CLIENT_ID:
raise HTTPException(
status_code=500,
detail="Microsoft OAuth not configured. Please contact administrator."
)
# Build the authorization URL
redirect_uri = MS_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/microsoft/callback"
scope = "openid email profile"
state = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip("=")
auth_url = (
f"https://login.microsoftonline.com/{MS_TENANT_ID}/oauth2/v2.0/authorize?"
f"client_id={MS_CLIENT_ID}&"
f"response_type=code&"
f"redirect_uri={redirect_uri}&"
f"scope={scope}&"
f"state={state}&"
f"response_mode=query"
)
return {"auth_url": auth_url}
@app.get("/api/auth/microsoft/callback")
async def microsoft_callback(
code: Optional[str] = None,
error: Optional[str] = None,
error_description: Optional[str] = None
):
"""Handle Microsoft OAuth callback"""
if error:
raise HTTPException(
status_code=400,
detail=f"Microsoft OAuth error: {error_description or error}"
)
if not code:
raise HTTPException(status_code=400, detail="No authorization code received")
if not MS_CLIENT_ID or not MS_CLIENT_SECRET:
raise HTTPException(status_code=500, detail="Microsoft OAuth not configured")
redirect_uri = MS_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/microsoft/callback"
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_response = await client.post(
f"https://login.microsoftonline.com/{MS_TENANT_ID}/oauth2/v2.0/token",
data={
"client_id": MS_CLIENT_ID,
"client_secret": MS_CLIENT_SECRET,
"code": code,
"redirect_uri": redirect_uri,
"grant_type": "authorization_code",
"scope": "openid email profile"
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if token_response.status_code != 200:
raise HTTPException(
status_code=400,
detail="Failed to exchange code for token"
)
tokens = token_response.json()
access_token = tokens.get("access_token")
# Get user info from Microsoft Graph
user_response = await client.get(
"https://graph.microsoft.com/v1.0/me",
headers={"Authorization": f"Bearer {access_token}"}
)
if user_response.status_code != 200:
raise HTTPException(status_code=400, detail="Failed to get user info")
ms_user = user_response.json()
email = ms_user.get("mail") or ms_user.get("userPrincipalName", "")
full_name = ms_user.get("displayName", "")
# Check if user is authorized (admin email or registered user)
# For now, allow admin email and any users that admin has added
if email.lower() not in [ADMIN_EMAIL.lower()] and email not in demo_users:
raise HTTPException(
status_code=403,
detail="Access denied. Please contact administrator for access."
)
# Create or update user
if email not in demo_users:
user_id = len(demo_users) + 1
created_at = datetime.utcnow().isoformat()
demo_users[email] = {
"id": user_id,
"email": email,
"password_hash": "", # No password for OAuth users
"full_name": full_name or email.split("@")[0],
"created_at": created_at,
"auth_provider": "microsoft"
}
user = demo_users[email]
# Create JWT token
jwt_token = create_jwt_token({"user_id": user["id"], "email": user["email"]})
# Build user data for frontend
frontend_url = "https://innsight-ai.vercel.app"
base64.urlsafe_b64encode(
json.dumps({
'id': user['id'],
'email': user['email'],
'full_name': user['full_name'],
'created_at': user['created_at']
}).encode()
).decode()
# Return HTML that redirects to frontend
html_response = f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Authenticating...</title>
<script>
localStorage.setItem('token', '{jwt_token}');
localStorage.setItem('user', JSON.stringify({json.dumps({'id': user['id'], 'email': user['email'], 'full_name': user['full_name'], 'created_at': user['created_at']})}));
window.location.href = '{frontend_url}/dashboard';
</script>
</head>
<body>
<p>Authenticating... If you are not redirected, <a href="{frontend_url}/dashboard">click here</a>.</p>
</body>
</html>
'''
return HTMLResponse(content=html_response)
# ============= Google OAuth Routes =============
@app.get("/api/auth/google/login")
async def google_login():
"""Initiate Google OAuth flow"""
if not GOOGLE_CLIENT_ID:
raise HTTPException(
status_code=500,
detail="Google OAuth not configured. Please contact administrator."
)
# Build the authorization URL
redirect_uri = GOOGLE_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/google/callback"
scope = "openid email profile"
state = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip("=")
auth_url = (
f"https://accounts.google.com/o/oauth2/v2/auth?"
f"client_id={GOOGLE_CLIENT_ID}&"
f"redirect_uri={redirect_uri}&"
f"response_type=code&"
f"scope={scope}&"
f"state={state}&"
f"access_type=offline&"
f"prompt=consent"
)
return {"auth_url": auth_url}
@app.get("/api/auth/google/callback")
async def google_callback(
code: Optional[str] = None,
error: Optional[str] = None,
error_description: Optional[str] = None
):
"""Handle Google OAuth callback"""
if error:
raise HTTPException(
status_code=400,
detail=f"Google OAuth error: {error_description or error}"
)
if not code:
raise HTTPException(status_code=400, detail="No authorization code received")
if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
raise HTTPException(status_code=500, detail="Google OAuth not configured")
redirect_uri = GOOGLE_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/google/callback"
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_response = await client.post(
"https://oauth2.googleapis.com/token",
data={
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"code": code,
"redirect_uri": redirect_uri,
"grant_type": "authorization_code"
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if token_response.status_code != 200:
raise HTTPException(
status_code=400,
detail="Failed to exchange code for token"
)
tokens = token_response.json()
access_token = tokens.get("access_token")
# Get user info from Google
user_response = await client.get(
"https://www.googleapis.com/oauth2/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"}
)
if user_response.status_code != 200:
raise HTTPException(status_code=400, detail="Failed to get user info")
google_user = user_response.json()
email = google_user.get("email", "")
full_name = google_user.get("name", "")
# Check if user is authorized (admin email or registered user)
if email.lower() not in [ADMIN_EMAIL.lower()] and email not in demo_users:
raise HTTPException(
status_code=403,
detail="Access denied. Please contact administrator for access."
)
# Create or update user
if email not in demo_users:
user_id = len(demo_users) + 1
created_at = datetime.utcnow().isoformat()
totp_secret = generate_totp_secret()
demo_users[email] = {
"id": user_id,
"email": email,
"password_hash": "", # No password for OAuth users
"full_name": full_name or email.split("@")[0],
"created_at": created_at,
"auth_provider": "google",
"totp_secret": totp_secret,
"is_admin": email.lower() == ADMIN_EMAIL.lower()
}
user = demo_users[email]
# Create JWT token
jwt_token = create_jwt_token({
"user_id": user["id"],
"email": user["email"],
"is_admin": user.get("is_admin", False)
})
# Build user data for frontend
frontend_url = "https://innsight-ai.vercel.app"
user_data = {
'id': user['id'],
'email': user['email'],
'full_name': user['full_name'],
'created_at': user['created_at'],
'is_admin': user.get('is_admin', False)
}
# Return HTML that redirects to frontend
html_response = f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Authenticating...</title>
<script>
localStorage.setItem('token', '{jwt_token}');
localStorage.setItem('user', JSON.stringify({json.dumps(user_data)}));
localStorage.setItem('auth-storage', JSON.stringify({{
"state": {{
"token": "{jwt_token}",
"user": {json.dumps(user_data)},
"isAuthenticated": true,
"isAdmin": {'true' if user.get('is_admin', False) else 'false'}
}},
"version": 0
}}));
window.location.href = '{frontend_url}/dashboard';
</script>
</head>
<body>
<p>Authenticating... If you are not redirected, <a href="{frontend_url}/dashboard">click here</a>.</p>
</body>
</html>
'''
return HTMLResponse(content=html_response)
# ============= Chatbot Route (NVIDIA Nemotron) =============
@app.post("/api/chat", response_model=ChatResponse)
@limiter.limit("30/minute") # Rate limit: 30 chat messages per minute per IP
async def chat(request: Request, message: ChatMessage, current_user: Optional[dict] = Depends(get_current_user_optional)):
"""Chat with NVIDIA Nemotron AI assistant"""
# If user is not authenticated, return demo response
if not current_user:
# Return a helpful demo response
demo_responses = {
"מה": "אני InnSight-AI, העוזר החכם שלך לניהול הכנסות מלונאיות. אני יכול לעזור לך להבין מגמות מחירים, להשוות מתחרים, ולתת המלצות לאופטימיזציה של מחירים.",
"איך": "אתה יכול להשתמש בפלטפורמה כדי: 1) להוסיף מלונות מתחרים למעקב, 2) לצפות בהשוואות מחירים בזמן אמת, 3) לייצא דוחות לאקסל, 4) לקבל תובנות על מגמות השוק.",
"מחיר": "לניתוח מחירים, עבור לדף 'השוואת מחירים' ובחר את המלונות והתאריכים להשוואה. המערכת תציג גרפים ותובנות על פערי המחירים.",
"RevPAR": "כדי להעלות RevPAR, מומלץ: 1) לבחון את התמחור הדינמי ביחס לתפוסה, 2) להציע שדרוגים במעמד הצ'ק-אין, 3) לשווק חבילות ערך מוסף בימים חלשים.",
"default": "שלום! אני העוזר החכם של InnSight-AI. אני יכול לעזור לך עם שאלות על ניהול הכנסות, השוואת מחירים, ואופטימיזציה של תעריפים. במה אוכל לסייע?"
}
response_text = demo_responses["default"]
for key, value in demo_responses.items():
if key in message.message: # Simple string matching
response_text = value
break
return ChatResponse(response=response_text)
# If authenticated, use NVIDIA Nemotron API (if key exists)
if not NVIDIA_API_KEY:
# Fallback to demo if key missing even for auth users
return ChatResponse(response="NVIDIA API Key not configured. Using demo mode.")
# Call NVIDIA Nemotron API
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://integrate.api.nvidia.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "nvidia/llama-3.1-nemotron-70b-instruct",
"messages": [
{
"role": "system",
"content": """אתה עוזר חכם בשם InnSight-AI המתמחה בניהול הכנסות מלונאיות (Revenue Management) בישראל.
תפקידך לסייע למנהלי מלונות להבין מגמות מחירים, לנתח מתחרים, ולתת המלצות לאופטימיזציה של תעריפים.
ענה תמיד בעברית, בצורה מקצועית אך ידידותית.
התמקד בנושאים: תמחור דינמי, ניתוח מתחרים, עונתיות, תפוסה, ו-RevPAR."""
},
{
"role": "user",
"content": message.message
}
],
"temperature": 0.7,
"max_tokens": 1024
},
timeout=30.0
)
if response.status_code == 200:
data = response.json()
return ChatResponse(response=data["choices"][0]["message"]["content"])
else:
raise HTTPException(status_code=500, detail="AI service error")
except httpx.TimeoutException as exc:
raise HTTPException(status_code=504, detail="AI service timeout") from exc
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
# ============= Dashboard Routes =============
@app.get("/api/dashboard/stats")
async def get_dashboard_stats(current_user: dict = Depends(get_current_user)):
"""Get dashboard statistics"""
return {
"total_hotels": 5,
"total_comparisons": 24,
"avg_price_difference": 12.5,
"last_scrape": datetime.utcnow().isoformat(),
"hotels_tracked": [
{"name": "צובה", "current_price": 850, "change": 5.2},
{"name": "רמת רחל", "current_price": 920, "change": -2.1},
{"name": "יד השמונה", "current_price": 780, "change": 0},
{"name": "יערים", "current_price": 890, "change": 3.5},
{"name": "נווה אילן", "current_price": 950, "change": -1.8}
]
}
@app.get("/api/dashboard/price-trends")
async def get_price_trends(current_user: dict = Depends(get_current_user)):
"""Get price trends for charts"""
return {
"labels": ["ינואר", "פברואר", "מרץ", "אפריל", "מאי", "יוני"],
"datasets": [
{"hotel": "צובה", "data": [820, 850, 880, 850, 870, 850]},
{"hotel": "רמת רחל", "data": [900, 920, 950, 930, 940, 920]},
{"hotel": "יד השמונה", "data": [750, 780, 800, 770, 790, 780]},
{"hotel": "יערים", "data": [870, 890, 920, 900, 910, 890]},
{"hotel": "נווה אילן", "data": [940, 950, 980, 960, 970, 950]}
]
}
@app.get("/api/dashboard/occupancy")
async def get_occupancy_data(current_user: dict = Depends(get_current_user)):
"""Get occupancy comparison data"""
return {
"couple": {"avg": 850, "min": 750, "max": 950},
"couple_1_child": {"avg": 950, "min": 850, "max": 1050},
"couple_2_children": {"avg": 1050, "min": 950, "max": 1150}
}
# ============= Hotels Routes =============
# Demo hotels data - uses database proxy
class DemoHotelsProxy:
"""Proxy class to maintain backward compatibility with demo_hotels list"""
def __iter__(self):
from database import db as _db
if isinstance(_db, InMemoryDatabase):
return iter(_db._hotels)
return iter([])
def __len__(self):
from database import db as _db
if isinstance(_db, InMemoryDatabase):
return len(_db._hotels)
return 0
def append(self, hotel: dict):
from database import db as _db
if isinstance(_db, InMemoryDatabase):
_db._hotels.append(hotel)
demo_hotels = DemoHotelsProxy()
@app.get("/api/hotels")
async def get_hotels(current_user: dict = Depends(get_current_user)):
"""Get all hotels (filtered by ownership)"""
user_id = current_user["user_id"]
# Check if admin to allow seeing all hotels
user = await get_user_by_id_async(user_id)
if user and user.get("is_admin"):
hotels = await db.get_hotels(owner_id=None)
else:
hotels = await db.get_hotels(owner_id=user_id)
return hotels
@app.post("/api/hotels")
async def create_hotel(hotel: HotelCreate, current_user: dict = Depends(get_current_user)):
"""Create a new hotel"""
new_hotel = await db.create_hotel(
name=hotel.name,
owner_id=current_user["user_id"],
booking_url=hotel.booking_url,
website_url=hotel.website_url
)
return new_hotel
@app.delete("/api/hotels/{hotel_id}")
async def delete_hotel(hotel_id: int, current_user: dict = Depends(get_current_user)):
"""Delete a hotel"""
user_id = current_user["user_id"]
# Check if admin
user = await get_user_by_id_async(user_id)
owner_id = user_id
if user and user.get("is_admin"):
owner_id = None
success = await db.delete_hotel(hotel_id, owner_id=owner_id)
if not success:
raise HTTPException(status_code=404, detail="Hotel not found or access denied")
return {"message": "Hotel deleted"}
# ============= Comparison Routes =============
@app.post("/api/comparisons/compare")
async def compare_hotels(request: ComparisonRequest, current_user: dict = Depends(get_current_user)):
"""Compare hotel prices"""
# Demo comparison data
return {
"check_in": request.check_in,
"check_out": request.check_out,
"occupancy": request.occupancy,
"results": [
{
"hotel_id": 1,
"hotel_name": "צובה",
"prices": {
"booking": {"BB": 850, "HB": 1050, "RO": 750},
"website": {"BB": 820, "HB": 1020, "RO": 720}
}
},
{
"hotel_id": 2,
"hotel_name": "רמת רחל",
"prices": {
"booking": {"BB": 920, "HB": 1120, "RO": 820},
"website": {"BB": 900, "HB": 1100, "RO": 800}
}
},
{
"hotel_id": 3,
"hotel_name": "יד השמונה",
"prices": {
"booking": {"BB": 780, "HB": 980, "RO": 680},
"website": {"BB": 760, "HB": 960, "RO": 660}
}
}
]
}
# ============= Export Routes =============
class ExportRequest(BaseModel):
"""Export request with validation"""
data: list[dict]
format: str = "xlsx" # xlsx, csv
filename: Optional[str] = None
class ExportResponse(BaseModel):
"""Export response"""
download_url: str
filename: str
format: str
MAX_EXPORT_ITEMS = 10000 # Maximum number of items to export
ALLOWED_EXPORT_FORMATS = {"xlsx", "csv"}
@app.post("/api/export", response_model=ExportResponse)
async def export_data(
request: ExportRequest,
current_user: dict = Depends(get_current_user)
):
"""
Export data to Excel or CSV format.
Requires authentication.
Validates format and limits dataset size to prevent abuse.
"""
# Validate format
if request.format not in ALLOWED_EXPORT_FORMATS:
raise HTTPException(
status_code=422,
detail=f"Invalid format. Must be one of: {', '.join(ALLOWED_EXPORT_FORMATS)}"
)
# Validate data is not empty
if not request.data:
raise HTTPException(
status_code=422,
detail="Data cannot be empty"
)
# Validate data size
if len(request.data) > MAX_EXPORT_ITEMS:
raise HTTPException(
status_code=422,
detail=f"Data too large. Maximum {MAX_EXPORT_ITEMS} items allowed"
)
# Generate filename
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
base_filename = request.filename or f"export_{timestamp}"
# Sanitize filename - remove path separators and special chars
safe_filename = "".join(c for c in base_filename if c.isalnum() or c in "_-")[:50]
filename = f"{safe_filename}.{request.format}"
# In a real implementation, this would generate the file and return a download URL
download_url = f"/api/export/download/{filename}"
return ExportResponse(
download_url=download_url,
filename=filename,
format=request.format
)
@app.get("/api/export/download/{filename}")
async def download_export(
filename: str,
current_user: dict = Depends(get_current_user)
):
"""
Download an exported file.
Requires authentication to prevent unauthorized access to exports.
"""
# Security: Prevent path traversal
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
# In a real implementation, this would return the actual file
raise HTTPException(status_code=404, detail="File not found")
# ============= Health Check =============
@app.get("/api/health")
async def health_check():
"""Health check endpoint"""
global _db_initialized
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"db_initialized": _db_initialized,
"db_type": type(db).__name__ if db else "None"
}
@app.get("/api/debug")
async def debug_info():
"""Debug endpoint to check system status"""
global _db_initialized, DB_IMPORT_ERROR
try:
from database import PG8000_AVAILABLE
except Exception:
PG8000_AVAILABLE = False
return {
"status": "ok",
"db_initialized": _db_initialized,
"db_type": type(db).__name__ if db else "None",
"db_import_error": DB_IMPORT_ERROR,
"pg8000_available": PG8000_AVAILABLE,
"is_production": IS_PRODUCTION,
"env_mode": ENV_MODE
}
# End of file - Vercel automatically finds 'app' variable