Spaces:
Sleeping
Sleeping
| import base64 | |
| import hashlib | |
| import hmac | |
| import html | |
| import json | |
| import os | |
| import secrets | |
| import string | |
| import struct | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| import httpx | |
| from fastapi import Depends, FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse, Response | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from pydantic import BaseModel, EmailStr, validator | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.util import get_remote_address | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
# ============= Rate Limiting Setup =============
# Rate limiting is switched off entirely when TESTING=true.
TESTING = os.getenv("TESTING", "false").lower() == "true"


def get_real_key(request: Request) -> str:
    """Return the rate-limit bucket key for ``request``.

    In test mode every request shares the constant key ``"test-bypass"``;
    otherwise the key is whatever ``get_remote_address`` reports for the client.
    """
    return "test-bypass" if TESTING else get_remote_address(request)


limiter = Limiter(key_func=get_real_key, enabled=not TESTING)

# Initialize FastAPI
app = FastAPI(title="InnSight-AI API", version="1.0.0")
# Simple ping endpoint that works even if everything else fails
# NOTE(review): no route decorator is visible on this handler in this view - confirm it is registered.
async def ping():
    """Liveness probe: always answers with the same constant payload."""
    payload = {"ping": "pong"}
    return payload
# Debug endpoint to check database state
async def debug_db():
    """Report which database backend is active plus a sample of user e-mails.

    Returns:
        dict with the backend class name (``"None"`` when no backend was
        imported), the module initialization flag, the import error captured
        at module load (if any), and the user e-mails that could be read:
        every in-memory user, or at most 10 rows for any other backend.

    Lookup failures are best-effort: they are logged and reported as an empty
    user list rather than failing the request.

    SECURITY NOTE(review): the response contains user e-mail addresses and
    internal error text - confirm this handler is not exposed publicly.
    """
    user_emails = []
    try:
        if isinstance(db, InMemoryDatabase):
            user_emails = list(db._users.keys())
        elif db is not None:
            # Any non in-memory backend is assumed to expose _execute_query.
            rows = db._execute_query("SELECT email FROM users LIMIT 10")
            user_emails = [row["email"] for row in rows]
    except Exception as exc:
        # `except Exception` (not a bare except) so task cancellation and
        # interpreter shutdown are never swallowed by a debug helper.
        print(f"[DEBUG] debug_db: could not list users: {exc}")
        user_emails = []

    return {
        "db_type": type(db).__name__ if db is not None else "None",
        "db_initialized": _db_initialized,
        "db_import_error": DB_IMPORT_ERROR,
        "user_count": len(user_emails),
        "user_emails": user_emails,
    }
# Endpoint to seed database (protected by secret)
async def seed_db(secret: str = ""):
    """Create the tables and the built-in accounts if they do not exist yet.

    Args:
        secret: shared secret that must equal the ``SEED_SECRET`` environment
            variable. Outside production a well-known development value is
            accepted when the variable is unset; in production an unset or
            empty ``SEED_SECRET`` disables the endpoint.

    Returns:
        ``{"results": [...]}`` with one entry for table initialization and one
        per seed account (``created`` / ``exists`` / ``error: ...``).

    Raises:
        HTTPException: 403 when the secret does not match, 500 when no
            database backend is available.

    SECURITY NOTE(review): the seed passwords and TOTP secrets below are
    hard-coded in source; they should be rotated or moved to configuration.
    """
    expected_secret = os.environ.get("SEED_SECRET")
    if not expected_secret:
        # An empty SEED_SECRET must not make the empty default `secret` valid,
        # and the public dev fallback must never unlock a production database.
        if IS_PRODUCTION:
            raise HTTPException(status_code=403, detail="Invalid secret")
        expected_secret = "innsight-seed-2026"  # Dev fallback only

    # Constant-time comparison; bytes so non-ASCII input cannot raise TypeError.
    if not hmac.compare_digest(secret.encode("utf-8"), expected_secret.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid secret")

    if db is None:
        raise HTTPException(status_code=500, detail="Database not initialized")

    results = []

    # Initialize tables first (creates them if they don't exist)
    try:
        await db.init_tables()
        results.append({"action": "init_tables", "status": "success"})
    except Exception as e:
        results.append({"action": "init_tables", "status": f"error: {e}"})

    admin_users = [
        {
            "email": ADMIN_EMAIL,
            "password_hash": hash_password("InnSight2026!"),
            "full_name": "Jack Amichai",
            "is_admin": True,
            "totp_secret": ADMIN_TOTP_SECRET,
        },
        {
            "email": "[email protected]",
            "password_hash": hash_password("InnSight2026Adi!"),
            "full_name": "Adi Ohayon",
            "is_admin": True,
            "totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ",
        },
    ]
    regular_users = [
        {
            "email": "[email protected]",
            "password_hash": hash_password("TestUser2026!"),
            "full_name": "Test Client",
            "is_admin": False,
            "totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ",
        },
    ]

    for user in admin_users + regular_users:
        try:
            existing = await db.get_user_by_email(user["email"])
            if existing:
                results.append({"email": user["email"], "status": "exists"})
                continue
            await db.create_user(
                email=user["email"],
                password_hash=user["password_hash"],
                full_name=user["full_name"],
                is_admin=user["is_admin"],
                totp_secret=user["totp_secret"],
            )
            results.append({"email": user["email"], "status": "created"})
        except Exception as e:
            # Best effort: one failing account must not abort the others.
            results.append({"email": user["email"], "status": f"error: {e}"})

    return {"results": results}
# Make the sibling "backend" directory importable so the scrapers can be loaded
import sys
from pathlib import Path

backend_path = Path(__file__).parent.parent / "backend"
sys.path.append(str(backend_path))

# The scrapers are optional: record whether they could be imported.
try:
    from app.scrapers.israeli_hotel_scraper import IsraeliHotelScraper, HotelPriceResult
    from app.scrapers.direct_hotel_scraper import DirectHotelScraper
except ImportError as e:
    print(f"Warning: Could not import scrapers: {e}")
    SCRAPERS_AVAILABLE = False
else:
    SCRAPERS_AVAILABLE = True
# Expose the limiter on the application state and answer rate-limit
# violations with slowapi's stock handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ============= Search Endpoint =============
# Request body for a hotel price search.
class SearchRequest(BaseModel):
    # What to search for; sent as the destination, or as the hotel name for direct lookups
    destination: str
    # Check-in date, passed to the scraper as the string received
    check_in: str
    num_nights: int = 1
    adults: int = 2
    children: int = 0
    children_ages: Optional[list[int]] = None
    # "booking" or "direct"
    source: str = "booking"
    # Required if source="direct"
    hotel_url: Optional[str] = None
# NOTE(review): no route decorator is visible on this handler in this view - confirm it is registered.
async def search_hotels(request: SearchRequest):
    """Run a real-time hotel price search through ``IsraeliHotelScraper``.

    ``source == "direct"`` scrapes the single hotel at ``hotel_url``; any other
    source performs a destination search. The child-related fields are only
    forwarded on the destination path.

    Returns:
        ``{"results": [...]}`` with hotel name, price, currency, room type,
        meal plan and chain for every scraper result.

    Raises:
        HTTPException: 501 when the scrapers could not be imported, 400 when a
            direct search is requested without ``hotel_url``, 500 (carrying
            the error text) when the scraper fails.
    """
    if not SCRAPERS_AVAILABLE:
        raise HTTPException(status_code=501, detail="Scrapers not available on this environment")

    is_direct = request.source == "direct"
    # A direct search without a URL used to fall through silently to a
    # destination search; reject it instead. Checked outside the try block
    # so the 400 is not rewritten into a 500 below.
    if is_direct and not request.hotel_url:
        raise HTTPException(status_code=400, detail="hotel_url is required when source is 'direct'")

    try:
        if is_direct:
            # Direct scraping for specific hotel
            results = await IsraeliHotelScraper.search_hotel(
                hotel_name=request.destination,
                check_in=request.check_in,
                num_nights=request.num_nights,
                adults=request.adults,
                source="direct",
                hotel_url=request.hotel_url,
            )
        else:
            # Standard destination search on Booking.com
            results = await IsraeliHotelScraper.search_destination(
                destination=request.destination,
                check_in=request.check_in,
                num_nights=request.num_nights,
                adults=request.adults,
                children=request.children,
                children_ages=request.children_ages,
            )

        # Convert results to dicts
        return {
            "results": [
                {
                    "hotel_name": r.hotel_name,
                    "price": r.price,
                    "currency": r.currency,
                    "room_type": r.room_type,
                    "meal_plan": r.meal_plan,
                    "chain": r.chain,
                }
                for r in results
            ]
        }
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
# Request body for URL validation; either URL may be omitted.
class HotelValidationRequest(BaseModel):
    # Booking.com listing URL to validate
    booking_url: Optional[str] = None
    # Hotel's own website URL to validate
    website_url: Optional[str] = None
# NOTE(review): no route decorator is visible on this handler in this view - confirm it is registered.
def validate_hotel(request: HotelValidationRequest):
    """Validate Booking.com and/or Direct Website URLs.

    Uses DrissionPage (blocking), so run as sync endpoint.

    Returns:
        ``{"booking": ..., "website": ...}`` where each value is the scraper's
        validation result, or ``None`` for a URL that was not supplied.

    Raises:
        HTTPException: 501 when the scrapers could not be imported, 500
            (carrying the error text) when validation fails.
    """
    if not SCRAPERS_AVAILABLE:
        raise HTTPException(status_code=501, detail="Scrapers not available")

    results = {"booking": None, "website": None}

    # Nothing to validate: skip launching a headless browser altogether.
    if not request.website_url and not request.booking_url:
        return results

    # Initialize scraper with headless=True
    scraper = DirectHotelScraper(headless=True)
    try:
        # Validate Website first (faster)
        if request.website_url:
            results["website"] = scraper.validate_website_url(request.website_url)
        # Validate Booking (slower)
        if request.booking_url:
            results["booking"] = scraper.validate_booking_url(request.booking_url)
    except Exception as e:
        print(f"Validation error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Always release the browser, also on failure.
        scraper.quit()

    return results
# ============= Environment Configuration =============
# SECURITY: All secrets MUST be set via environment variables in production
def get_required_env(key: str, default: Optional[str] = None) -> str:
    """Return the value of environment variable ``key``.

    An unset, empty or whitespace-only variable counts as missing, so an
    accidentally blank secret is never accepted as a real value.

    Args:
        key: name of the environment variable.
        default: value to use when the variable is missing; ``None`` makes the
            variable mandatory.

    Returns:
        The variable's value, or ``default`` when it is missing.

    Raises:
        RuntimeError: the variable is missing and no default was given.
    """
    value = os.environ.get(key)
    if value is None or not value.strip():
        value = default
    if value is None:
        raise RuntimeError(f"Required environment variable {key} is not set")
    return value
# Environment mode ("development" unless ENV_MODE says otherwise)
ENV_MODE = os.getenv("ENV_MODE", "development")
IS_PRODUCTION = ENV_MODE == "production"

# JWT Secret - REQUIRED in production, has dev fallback
JWT_SECRET = (
    get_required_env("JWT_SECRET")
    if IS_PRODUCTION
    else os.getenv("JWT_SECRET", "dev-only-secret-change-in-production")
)
# CORS Configuration - restricted in production
import re

if IS_PRODUCTION:
    ALLOWED_ORIGINS = os.environ.get("ALLOWED_ORIGINS", "").split(",")
else:
    ALLOWED_ORIGINS = [
        "http://localhost:3000",
        "http://localhost:5173",
        "http://127.0.0.1:3000",
        "http://127.0.0.1:5173",
        "https://innsight-ai.vercel.app",
        "https://*.vercel.app",
    ]
# Filter out empty strings
ALLOWED_ORIGINS = [origin.strip() for origin in ALLOWED_ORIGINS if origin.strip()]


def _wildcard_origin_regex(origins):
    """Return one regex covering every ``*``-pattern in ``origins``, or None.

    ``*`` stands for a single host label, e.g. ``https://*.vercel.app``
    matches ``https://preview-1.vercel.app``. A bare ``"*"`` entry is not a
    pattern and is ignored here.
    """
    patterns = [
        re.escape(origin).replace(r"\*", "[A-Za-z0-9-]+")
        for origin in origins
        if "*" in origin and origin != "*"
    ]
    if not patterns:
        return None
    return "|".join(f"(?:{pattern})" for pattern in patterns)


# allow_origins entries are compared literally, so wildcard patterns have to
# go through allow_origin_regex to have any effect.
_cors_exact_origins = [origin for origin in ALLOWED_ORIGINS if "*" not in origin]
_cors_origin_regex = _wildcard_origin_regex(ALLOWED_ORIGINS)
# "Open" mode: nothing configured (or an explicit "*"). Any origin may call the
# API, but credentials are then disabled - a wildcard origin combined with
# credentials would let every site make authenticated cross-origin requests.
_cors_open = "*" in ALLOWED_ORIGINS or not (_cors_exact_origins or _cors_origin_regex)
if _cors_open and IS_PRODUCTION:
    print("Warning: ALLOWED_ORIGINS is not restricted; CORS credentials are disabled")

# CORS middleware with restricted origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _cors_open else _cors_exact_origins,
    allow_origin_regex=None if _cors_open else _cors_origin_regex,
    allow_credentials=not _cors_open,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Authorization", "Content-Type", "Accept", "Origin", "X-Requested-With"],
)

security = HTTPBearer()
# ============= Security Headers Middleware =============
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Stamp a fixed set of hardening headers onto every response."""

    async def dispatch(self, request: Request, call_next) -> Response:
        """Forward the request, then set the security headers on the response."""
        response = await call_next(request)

        hardening = {
            # Prevent MIME type sniffing
            "X-Content-Type-Options": "nosniff",
            # Prevent clickjacking
            "X-Frame-Options": "DENY",
            # XSS Protection (legacy browsers)
            "X-XSS-Protection": "1; mode=block",
            # Referrer policy
            "Referrer-Policy": "strict-origin-when-cross-origin",
            # Content Security Policy for API
            "Content-Security-Policy": "default-src 'none'; frame-ancestors 'none'",
        }
        # Only add HSTS in production (requires HTTPS)
        if IS_PRODUCTION:
            hardening["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"

        for header_name, header_value in hardening.items():
            response.headers[header_name] = header_value
        return response


app.add_middleware(SecurityHeadersMiddleware)
# ============= Environment Variables =============
POSTGRES_URL = os.getenv("POSTGRES_URL", "")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")

# Microsoft OAuth Configuration
MS_CLIENT_ID = os.getenv("MS_CLIENT_ID", "")
MS_CLIENT_SECRET = os.getenv("MS_CLIENT_SECRET", "")
# Use 'common' for multi-tenant or specific tenant ID
MS_TENANT_ID = os.getenv("MS_TENANT_ID", "common")
MS_REDIRECT_URI = os.getenv("MS_REDIRECT_URI", "")

# Google OAuth Configuration
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")
GOOGLE_REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "")

# Admin email for access control
ADMIN_EMAIL = os.getenv("ADMIN_EMAIL", "[email protected]")

# TOTP Secret for admin 2FA
# SECURITY: In production, these should be stored in the database per user
# An unset or empty value is fatal in production and falls back to a fixed
# development secret everywhere else.
ADMIN_TOTP_SECRET = os.getenv("ADMIN_TOTP_SECRET")
if not ADMIN_TOTP_SECRET:
    if IS_PRODUCTION:
        raise RuntimeError("ADMIN_TOTP_SECRET must be set in production")
    ADMIN_TOTP_SECRET = "JBSWY3DPEHPK3PXP"  # Dev fallback only
# ============= TOTP Functions =============
def _decode_totp_secret(secret: str) -> bytes:
    """Decode a Base32 secret, re-adding ``=`` padding that was stripped."""
    normalized = secret.upper()
    normalized += "=" * (-len(normalized) % 8)
    return base64.b32decode(normalized)


def _hotp_code(key: bytes, counter: int) -> str:
    """Return the 6-digit HMAC-SHA1 one-time code for ``counter``."""
    # Counter is packed as a big-endian 8-byte integer.
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects 4 bytes.
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 1000000).zfill(6)


def get_totp_token(secret: str, for_time: Optional[float] = None) -> str:
    """Generate the TOTP token for a 30-second time step.

    Args:
        secret: Base32 secret, upper or lower case, with or without padding.
        for_time: Unix timestamp to generate the token for; defaults to now.

    Returns:
        The 6-digit code as a zero-padded string.
    """
    now = time.time() if for_time is None else for_time
    return _hotp_code(_decode_totp_secret(secret), int(now // 30))


def verify_totp(secret: str, token: str, for_time: Optional[float] = None) -> bool:
    """Verify a TOTP token with 1 step tolerance (30 seconds before/after).

    Args:
        secret: Base32 secret, upper or lower case, with or without padding.
        token: code supplied by the user.
        for_time: Unix timestamp to verify against; defaults to now.

    Returns:
        True when ``token`` equals the code of the previous, current or next
        time step; False otherwise, including for anything that is not
        exactly six ASCII digits.
    """
    # Decode once, outside the loop; an invalid secret still raises as before.
    key = _decode_totp_secret(secret)
    if not (len(token) == 6 and token.isascii() and token.isdigit()):
        return False

    now = time.time() if for_time is None else for_time
    current_step = int(now // 30)
    matched = False
    for drift in (-1, 0, 1):
        counter = current_step + drift
        if counter < 0:
            continue
        # Constant-time comparison, and no early exit, so timing does not
        # reveal how close a guess was.
        if hmac.compare_digest(token, _hotp_code(key, counter)):
            matched = True
    return matched
# ============= Pydantic Models =============
# Self-registration payload.
class UserCreate(BaseModel):
    email: EmailStr
    password: str
    full_name: str

    # Registered as a field validator; without the decorator this method is
    # never invoked and names are stored unescaped.
    @validator("full_name")
    def sanitize_full_name(cls, v):
        """HTML-escape the submitted name before it is stored on the model."""
        return html.escape(v)
# Credentials submitted for a password login.
class UserLogin(BaseModel):
    email: EmailStr
    password: str
# Public view of a user account (no credentials).
class UserResponse(BaseModel):
    id: int
    email: str
    full_name: str
    # Creation time, carried as a string
    created_at: str
# Login result: the issued token together with the user it belongs to.
class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: UserResponse
# Incoming chat payload.
class ChatMessage(BaseModel):
    message: str
# Outgoing chat payload.
class ChatResponse(BaseModel):
    response: str
# Payload for registering a hotel; both URLs are optional.
class HotelCreate(BaseModel):
    name: str
    booking_url: Optional[str] = None
    website_url: Optional[str] = None
# Payload for a comparison across several hotels.
class ComparisonRequest(BaseModel):
    hotel_ids: list[int]
    # Stay dates, carried as strings
    check_in: str
    check_out: str
    occupancy: str = "couple"
# Admin login payload: password credentials plus a TOTP code.
class AdminLoginRequest(BaseModel):
    email: EmailStr
    password: str
    totp_code: str
# Admin login payload carrying only the password credentials (no TOTP code).
class AdminLoginStep1Request(BaseModel):
    email: EmailStr
    password: str
# Payload for creating a user from an e-mail address and a display name.
class CreateUserRequest(BaseModel):
    email: EmailStr
    full_name: str
# User record that also carries login credentials and the TOTP secret.
class UserWithCredentials(BaseModel):
    id: int
    email: str
    full_name: str
    username: str
    password: str
    totp_secret: str
    # Creation time, carried as a string
    created_at: str
    is_admin: bool = False
# One row of a user listing.
class UserListResponse(BaseModel):
    id: int
    email: str
    full_name: str
    # Creation time, carried as a string
    created_at: str
    is_admin: bool
    # Absent when no login has been recorded
    last_login: Optional[str] = None
# ============= Password Hashing =============
def hash_password(password: str) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    Returns:
        Base64 text of a fresh 32-byte random salt followed by the derived
        key (100000 iterations).
    """
    fresh_salt = os.urandom(32)
    derived = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), fresh_salt, 100000)
    blob = fresh_salt + derived
    return base64.b64encode(blob).decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
    """Check ``password`` against a value produced by ``hash_password``.

    The first 32 decoded bytes are the salt and the rest is the stored key.
    Any failure while decoding or deriving counts as a mismatch.
    """
    try:
        raw = base64.b64decode(hashed.encode("utf-8"))
        salt, expected_key = raw[:32], raw[32:]
        candidate_key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100000)
    except Exception:
        return False
    try:
        # Constant-time comparison of the stored and recomputed keys.
        return hmac.compare_digest(expected_key, candidate_key)
    except Exception:
        return False
def generate_random_password(length: int = 12) -> str:
    """Build a random password of ``length`` characters.

    Characters are drawn with ``secrets.choice`` from ASCII letters, digits
    and the symbols ``!@#$%``.
    """
    pool = string.ascii_letters + string.digits + "!@#$%"
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(pool))
    return "".join(picked)
def generate_totp_secret() -> str:
    """Create a new TOTP secret: 20 random bytes (160 bits) as unpadded Base32."""
    entropy = secrets.token_bytes(20)
    encoded = base64.b32encode(entropy).decode("utf-8")
    # Drop the trailing '=' padding.
    return encoded.rstrip("=")
def generate_username_from_email(email: str) -> str:
    """Derive a username from the part of ``email`` before the first ``@``.

    The local part is lower-cased and every character that is neither
    alphanumeric nor one of ``_``, ``-``, ``.`` is dropped.
    """
    local_part = email.partition("@")[0].lower()
    kept = [ch for ch in local_part if ch.isalnum() or ch in "_-."]
    return "".join(kept)
# ============= JWT Tokens =============
def _b64url(raw: bytes) -> str:
    """URL-safe Base64 without ``=`` padding, as used by JWT segments."""
    return base64.urlsafe_b64encode(raw).decode().rstrip("=")


def _jwt_signature(message: str) -> str:
    """HMAC-SHA256 signature of ``message`` under ``JWT_SECRET``, as a JWT segment."""
    return _b64url(hmac.new(JWT_SECRET.encode(), message.encode(), hashlib.sha256).digest())


def create_jwt_token(payload: dict, expires_delta: timedelta = timedelta(days=7)) -> str:
    """Create a simple HS256-signed JWT.

    Args:
        payload: claims to embed. The dict is copied, not modified.
        expires_delta: token lifetime, counted from now.

    Returns:
        ``header.payload.signature``; the payload carries ``exp`` and ``iat``
        as Unix timestamps.
    """
    # time.time() is a true Unix timestamp. datetime.utcnow().timestamp()
    # treats the naive UTC value as local time and is off by the UTC offset
    # on hosts that do not run in UTC.
    issued_at = time.time()
    claims = dict(payload)
    claims["exp"] = issued_at + expires_delta.total_seconds()
    claims["iat"] = issued_at

    header = {"alg": "HS256", "typ": "JWT"}
    message = f"{_b64url(json.dumps(header).encode())}.{_b64url(json.dumps(claims).encode())}"
    return f"{message}.{_jwt_signature(message)}"


def verify_jwt_token(token: str) -> Optional[dict]:
    """Verify and decode a token produced by ``create_jwt_token``.

    Returns:
        The decoded payload, or ``None`` when the token is malformed, its
        signature does not match, or it is expired. A token without ``exp``
        counts as expired.
    """
    try:
        parts = token.split(".")
        if len(parts) != 3:
            return None
        header_b64, payload_b64, signature_b64 = parts

        # Verify signature (constant-time) before decoding the payload.
        if not hmac.compare_digest(signature_b64, _jwt_signature(f"{header_b64}.{payload_b64}")):
            return None

        # Restore the Base64 padding stripped at creation time.
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))

        # Check expiration against the same clock used when issuing.
        if payload.get("exp", 0) < time.time():
            return None
        return payload
    except Exception:
        # Any decoding problem means the token is not acceptable.
        return None
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer token to its JWT payload.

    Raises:
        HTTPException: 401 when the token fails verification.
    """
    claims = verify_jwt_token(credentials.credentials)
    if claims:
        return claims
    raise HTTPException(status_code=401, detail="Invalid or expired token")
# Bearer scheme that does not reject requests lacking an Authorization header
security_optional = HTTPBearer(auto_error=False)


async def get_current_user_optional(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional)) -> Optional[dict]:
    """Return the JWT payload when a bearer token is supplied (for demo/public endpoints).

    Yields ``None`` both when no credentials were sent and when the supplied
    token fails verification.
    """
    return verify_jwt_token(credentials.credentials) if credentials else None
# ============= Database Setup =============
try:
    try:
        # Relative import first (for Vercel serverless)
        from .database import db, InMemoryDatabase, PostgresDatabase
    except ImportError:
        # Absolute import as fallback (for local development)
        from database import db, InMemoryDatabase, PostgresDatabase
except Exception as e:
    import traceback
    DB_IMPORT_ERROR = f"{e}\n{traceback.format_exc()}"
    print(f"[DEBUG] Database import failed: {DB_IMPORT_ERROR}")

    # Placeholder types so later isinstance() checks keep working without a backend
    class InMemoryDatabase:
        """Stand-in used when the real InMemoryDatabase could not be imported."""

    class PostgresDatabase:
        """Stand-in used when the real PostgresDatabase could not be imported."""

    db = None
else:
    DB_IMPORT_ERROR = None
    print(f"[DEBUG] Database imported successfully. db type: {type(db)}, db is None: {db is None}")

# Track if database has been initialized (for Vercel serverless cold starts)
_db_initialized = False
# Strong references to in-flight seeding tasks: the event loop only keeps weak
# references, so an unreferenced task can be garbage-collected before it ends.
_seed_tasks = set()


def _default_seed_users() -> list:
    """Return the built-in accounts (admins first), each with an ``is_admin`` flag.

    SECURITY NOTE(review): passwords and TOTP secrets are hard-coded here;
    they should be rotated or moved to configuration.
    """
    return [
        {
            "email": ADMIN_EMAIL,
            "password_hash": hash_password("InnSight2026!"),
            "full_name": "Jack Amichai",
            "is_admin": True,
            "totp_secret": ADMIN_TOTP_SECRET,
        },
        {
            "email": "[email protected]",
            "password_hash": hash_password("InnSight2026Adi!"),
            "full_name": "Adi Ohayon",
            "is_admin": True,
            "totp_secret": "HXDMVJECJJWSRB3HWIZR4IFUGFTMXBOZ",
        },
        {
            "email": "[email protected]",
            "password_hash": hash_password("TestUser2026!"),
            "full_name": "Test Client",
            "is_admin": False,
            "totp_secret": "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ",
        },
    ]


def _seed_records(users: list, is_admin: bool) -> list:
    """Select the users with the given admin flag, without the ``is_admin`` key."""
    return [
        {field: value for field, value in user.items() if field != "is_admin"}
        for user in users
        if user["is_admin"] is is_admin
    ]


async def _seed_postgres_users() -> None:
    """Create every built-in account that is missing; failures are logged per user."""
    for user in _default_seed_users():
        try:
            # Check if user exists
            existing = await db.get_user_by_email(user["email"])
            if not existing:
                await db.create_user(
                    email=user["email"],
                    password_hash=user["password_hash"],
                    full_name=user["full_name"],
                    is_admin=user["is_admin"],
                    totp_secret=user["totp_secret"],
                )
                print(f" Created user: {user['email']}")
            else:
                print(f" User exists: {user['email']}")
        except Exception as e:
            # Best effort: one failing account must not abort the others.
            print(f" Error creating user {user['email']}: {e}")


def seed_database_sync():
    """Synchronously seed the database with initial data.

    Does nothing when seeding already happened. With no backend available it
    only marks initialization as done. The in-memory backend receives admins,
    regular users and demo hotels. For the Postgres backend the async seeding
    is run to completion when no event loop is running, otherwise it is
    scheduled on the running loop.
    """
    global _db_initialized
    print(f"[DEBUG] seed_database_sync called. _db_initialized: {_db_initialized}, db: {db}, isinstance: {isinstance(db, InMemoryDatabase) if db else False}")
    if _db_initialized:
        print("[DEBUG] Already initialized, skipping")
        return
    if db is None:
        print(f"Database import failed: {DB_IMPORT_ERROR}")
        _db_initialized = True
        return

    # Seed admin users if using in-memory database
    if isinstance(db, InMemoryDatabase):
        users = _default_seed_users()
        print("[DEBUG] Seeding admin users (InMemoryDatabase)...")
        db.seed_admin_users(_seed_records(users, True))
        print("[DEBUG] Seeding regular users...")
        db.seed_regular_users(_seed_records(users, False))
        # Seed demo hotels
        db.seed_demo_hotels([
            {"name": "צובה", "booking_url": "https://booking.com/hotel/tzuba", "website_url": "https://tzuba.co.il"},
            {"name": "רמת רחל", "booking_url": "https://booking.com/hotel/ramat-rachel", "website_url": "https://ramatrachel.co.il"},
            {"name": "יד השמונה", "booking_url": "https://booking.com/hotel/yad-hashmona", "website_url": "https://yadhashmona.co.il"},
            {"name": "יערים", "booking_url": "https://booking.com/hotel/yearim", "website_url": "https://yearim.co.il"},
            {"name": "נווה אילן", "booking_url": "https://booking.com/hotel/neve-ilan", "website_url": "https://neveilan.co.il"},
        ])
        print(f"Database seeded with {len(db._users)} users")
    # Seed users in PostgresDatabase if they don't exist
    elif isinstance(db, PostgresDatabase):
        print("[DEBUG] Checking/seeding users in PostgresDatabase...")
        import asyncio
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: run the seeding to completion.
            asyncio.run(_seed_postgres_users())
        else:
            # Inside an async context: schedule the seeding and keep the task
            # referenced until it finishes.
            task = loop.create_task(_seed_postgres_users())
            _seed_tasks.add(task)
            task.add_done_callback(_seed_tasks.discard)

    _db_initialized = True
    print("Database initialized successfully")


async def init_database():
    """Initialize database tables and seed data (no-op once initialized).

    With no backend available it only marks initialization as done. Otherwise
    the tables are created first; the in-memory backend is then seeded through
    ``seed_database_sync`` and the Postgres backend is seeded with awaited calls.
    """
    global _db_initialized
    if _db_initialized:
        return  # Already initialized
    if db is None:
        print(f"Database import failed: {DB_IMPORT_ERROR}")
        _db_initialized = True
        return

    # Initialize tables
    await db.init_tables()

    if isinstance(db, InMemoryDatabase):
        seed_database_sync()
    elif isinstance(db, PostgresDatabase):
        print("[DEBUG] Seeding users in PostgresDatabase...")
        await _seed_postgres_users()

    _db_initialized = True
    print("Database initialized successfully")
def sync_init_database():
    """Module-load initialization hook (synchronous).

    Only the in-memory backend is seeded here; PostgreSQL seeding is left to
    the async ``init_database``.
    """
    if _db_initialized or not isinstance(db, InMemoryDatabase):
        return
    seed_database_sync()


# Initialize database at module load time (for Vercel serverless)
try:
    sync_init_database()
except Exception as e:
    print(f"Error during module-level db init: {e}")
# Middleware to ensure database is initialized on first request
# This is crucial for Vercel serverless where startup events may not run
@app.middleware("http")
async def ensure_db_initialized(request: Request, call_next):
    """Initialize the database lazily, then pass the request on.

    Registered as HTTP middleware so it runs before every request; the
    initialization itself happens only while ``_db_initialized`` is false.
    """
    if not _db_initialized:
        await init_database()
    return await call_next(request)
# Initialize database on startup (for local development)
@app.on_event("startup")
async def startup_event():
    """Run the async database initialization when the application starts."""
    await init_database()
# Legacy compatibility - keep demo_users as a view into the database
# This will be removed once all code is migrated to use db directly
class DemoUsersProxy:
    """Dict-like facade over the in-memory user store.

    Every operation goes to ``db._users`` while the in-memory backend is
    active; with any other backend reads look empty and writes are ignored.
    """

    @staticmethod
    def _store():
        """Return the in-memory user mapping, or None for any other backend."""
        return db._users if isinstance(db, InMemoryDatabase) else None

    def get(self, email: str, default=None):
        store = self._store()
        return default if store is None else store.get(email, default)

    def __contains__(self, email: str) -> bool:
        store = self._store()
        return store is not None and email in store

    def __setitem__(self, email: str, value: dict):
        store = self._store()
        if store is not None:
            store[email] = value

    def items(self):
        store = self._store()
        return [] if store is None else store.items()

    def values(self):
        store = self._store()
        return [] if store is None else store.values()

    def __len__(self):
        store = self._store()
        return 0 if store is None else len(store)


demo_users = DemoUsersProxy()

# User login activity tracking
user_activity = {}
# ============= Async User Lookup Helper =============
async def get_user_by_email_async(email: str) -> Optional[dict]:
    """Look up a user by e-mail on whichever backend is active.

    Reads the in-memory mapping directly, awaits the Postgres backend's
    lookup, and yields ``None`` when no known backend is available.
    """
    if isinstance(db, InMemoryDatabase):
        return db._users.get(email)
    if isinstance(db, PostgresDatabase):
        return await db.get_user_by_email(email)
    return None
async def list_all_users_async() -> list[dict]:
    """Return every user known to the active backend.

    Gives an empty list when no known backend is available.
    """
    if isinstance(db, InMemoryDatabase):
        return list(db._users.values())
    if isinstance(db, PostgresDatabase):
        return await db.list_users()
    return []
async def get_user_by_id_async(user_id: int) -> Optional[dict]:
    """Look up a user by numeric id on whichever backend is active.

    The in-memory backend is scanned for the first record whose ``"id"``
    equals ``user_id``; the Postgres backend is queried. Yields ``None`` when
    nothing matches or no known backend is available.
    """
    if isinstance(db, InMemoryDatabase):
        return next(
            (record for record in db._users.values() if record["id"] == user_id),
            None,
        )
    if isinstance(db, PostgresDatabase):
        return await db.get_user_by_id(user_id)
    return None
| # ============= TOTP Setup Route ============= | |
async def totp_setup():
    """Render the TOTP enrollment page for the global admin secret.

    Builds an ``otpauth://`` provisioning URI from ``ADMIN_TOTP_SECRET`` and
    returns an HTML page that draws it as a QR code in the browser (through
    the ``QRCode`` JavaScript library) and prints the secret for manual entry.

    Returns:
        HTMLResponse: the enrollment page.

    NOTE(review): this handler declares no authentication dependency, and
    nothing in it enforces the "one time use" the page advertises. Anyone who
    can reach its route can read the admin TOTP secret - confirm the route is
    protected or disabled after the initial setup.
    """
    # Local import: only this handler needs URL quoting.
    from urllib.parse import quote, urlencode

    account_name = "InnSight-AI Admin"
    issuer = "InnSight-AI"
    secret = ADMIN_TOTP_SECRET

    # The label contains a space and the secret comes from configuration, so
    # both are percent-encoded instead of being pasted into the URI verbatim.
    label = quote(f"{issuer}:{account_name}", safe=":")
    query = urlencode(
        {
            "secret": secret,
            "issuer": issuer,
            "algorithm": "SHA1",
            "digits": 6,
            "period": 30,
        },
        quote_via=quote,
    )
    otpauth_url = f"otpauth://totp/{label}?{query}"

    # Context-appropriate encodings: a JSON string literal for the <script>
    # block, HTML entity escaping for the visible secret.
    otpauth_js = json.dumps(otpauth_url)
    secret_html = html.escape(str(secret))

    # NOTE(review): the package spec in the <script src> below reads
    # "[email protected]", which looks like an e-mail-obfuscation artefact of
    # "qrcode@<version>". It is reproduced unchanged; restore the real version.
    page = f'''
<!DOCTYPE html>
<html dir="rtl" lang="he">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>InnSight-AI - הגדרת אימות דו-שלבי</title>
    <script src="https://cdn.jsdelivr.net/npm/[email protected]/build/qrcode.min.js"></script>
    <style>
        * {{ margin: 0; padding: 0; box-sizing: border-box; }}
        body {{
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background: linear-gradient(135deg, #1a1a2e 0%, #4a1942 50%, #1a1a2e 100%);
            min-height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
            padding: 20px;
        }}
        .container {{
            background: rgba(255, 255, 255, 0.1);
            backdrop-filter: blur(10px);
            border-radius: 20px;
            padding: 40px;
            max-width: 500px;
            width: 100%;
            text-align: center;
            border: 1px solid rgba(255, 255, 255, 0.2);
        }}
        h1 {{
            color: #fff;
            margin-bottom: 10px;
            font-size: 28px;
        }}
        .subtitle {{
            color: #a78bfa;
            margin-bottom: 30px;
            font-size: 16px;
        }}
        .qr-container {{
            background: white;
            padding: 20px;
            border-radius: 15px;
            display: inline-block;
            margin: 20px 0;
        }}
        .instructions {{
            color: #e5e5e5;
            text-align: right;
            margin: 20px 0;
            line-height: 1.8;
        }}
        .instructions li {{
            margin: 10px 0;
        }}
        .secret-box {{
            background: rgba(167, 139, 250, 0.2);
            border: 1px solid #a78bfa;
            border-radius: 10px;
            padding: 15px;
            margin: 20px 0;
        }}
        .secret-label {{
            color: #a78bfa;
            font-size: 12px;
            margin-bottom: 5px;
        }}
        .secret-code {{
            color: #fff;
            font-family: monospace;
            font-size: 18px;
            letter-spacing: 2px;
        }}
        .warning {{
            background: rgba(239, 68, 68, 0.2);
            border: 1px solid #ef4444;
            border-radius: 10px;
            padding: 15px;
            color: #fca5a5;
            margin-top: 20px;
            font-size: 14px;
        }}
        .success {{
            background: rgba(34, 197, 94, 0.2);
            border: 1px solid #22c55e;
            border-radius: 10px;
            padding: 15px;
            color: #86efac;
            margin-top: 20px;
        }}
    </style>
</head>
<body>
    <div class="container">
        <h1>🔐 InnSight-AI</h1>
        <p class="subtitle">הגדרת אימות דו-שלבי למנהל</p>
        <div class="qr-container">
            <canvas id="qrcode"></canvas>
        </div>
        <ol class="instructions">
            <li>פתח את אפליקציית <strong>Microsoft Authenticator</strong></li>
            <li>לחץ על <strong>"+"</strong> להוספת חשבון</li>
            <li>בחר <strong>"חשבון אחר (Google, Facebook וכו')"</strong></li>
            <li>סרוק את קוד ה-QR שלמעלה</li>
        </ol>
        <div class="secret-box">
            <div class="secret-label">או הזן את הקוד הסודי ידנית:</div>
            <div class="secret-code">{secret_html}</div>
        </div>
        <div class="success">
            ✅ לאחר הסריקה, האפליקציה תציג קוד בן 6 ספרות שמתחלף כל 30 שניות.<br>
            השתמש בקוד זה בעת ההתחברות למערכת.
        </div>
        <div class="warning">
            ⚠️ <strong>אזהרה:</strong> שמור על הקוד הסודי במקום בטוח!<br>
            דף זה מיועד לשימוש חד-פעמי בלבד.
        </div>
    </div>
    <script>
        QRCode.toCanvas(document.getElementById('qrcode'), {otpauth_js}, {{
            width: 200,
            margin: 2,
            color: {{
                dark: '#1a1a2e',
                light: '#ffffff'
            }}
        }});
    </script>
</body>
</html>
'''
    return HTMLResponse(content=page)
| # ============= Auth Routes ============= | |
| # Rate limit: 5 registrations per minute per IP | |
async def register(request: Request, user: UserCreate):
    """Register a new user in ``demo_users`` and return an access token.

    Args:
        request: Incoming request; not read by the body itself.
        user: Registration payload (email, password, full name).

    Returns:
        TokenResponse: a fresh JWT plus the public profile of the new user.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    if user.email in demo_users:
        raise HTTPException(status_code=400, detail="Email already registered")

    password_hash = hash_password(user.password)

    # Allocate one past the highest id in use. Counting records
    # (len(...) + 1) hands out an id that is still taken as soon as any
    # earlier record has been removed.
    user_id = max((record["id"] for record in demo_users.values()), default=0) + 1
    created_at = datetime.utcnow().isoformat()

    demo_users[user.email] = {
        "id": user_id,
        "email": user.email,
        "password_hash": password_hash,
        "full_name": user.full_name,
        "created_at": created_at,
    }

    token = create_jwt_token({"user_id": user_id, "email": user.email})
    return TokenResponse(
        access_token=token,
        user=UserResponse(
            id=user_id,
            email=user.email,
            full_name=user.full_name,
            created_at=created_at,
        ),
    )
| # Rate limit: 10 login attempts per minute per IP | |
async def login(request: Request, credentials: UserLogin):
    """Password-only login step; never issues a token.

    Valid credentials are answered with 403 (a TOTP code is still required),
    invalid ones with 401.
    """
    account = await get_user_by_email_async(credentials.email)

    if account and verify_password(credentials.password, account["password_hash"]):
        # Password is correct, but every account must finish the TOTP step.
        raise HTTPException(
            status_code=403,
            detail="2-step authentication required. Please provide TOTP code."
        )

    raise HTTPException(status_code=401, detail="Invalid email or password")
| # Rate limit: 10 2FA attempts per minute per IP | |
async def login_with_2fa(request: Request, credentials: AdminLoginRequest):
    """Log a user in with email, password and a TOTP code.

    Args:
        request: Incoming request; not read by the body itself.
        credentials: Email, password and the current TOTP code.

    Returns:
        TokenResponse: a JWT carrying ``user_id``, ``email`` and ``is_admin``
        plus the user's public profile.

    Raises:
        HTTPException: 401 for an unknown email, a wrong password or a wrong
            TOTP code.
    """
    user = await get_user_by_email_async(credentials.email)
    if not user or not verify_password(credentials.password, user["password_hash"]):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    # `or` instead of a .get() default: a record whose "totp_secret" key is
    # present but None/empty would otherwise pass None to verify_totp.
    # NOTE(review): accounts without their own secret are checked against the
    # global admin secret - confirm this fallback is intended for non-admins.
    user_totp_secret = user.get("totp_secret") or ADMIN_TOTP_SECRET
    if not verify_totp(user_totp_secret, credentials.totp_code):
        raise HTTPException(status_code=401, detail="Invalid authentication code")

    now_iso = datetime.utcnow().isoformat()

    # Record the login on whichever backend is active.
    if isinstance(db, PostgresDatabase):
        await db.update_user_last_login(credentials.email)
    elif isinstance(db, InMemoryDatabase):
        user["last_login"] = now_iso

    user_activity.setdefault(credentials.email, []).append({
        "action": "login",
        "timestamp": now_iso,
    })

    token = create_jwt_token({
        "user_id": user["id"],
        "email": user["email"],
        "is_admin": user.get("is_admin", False),
    })
    return TokenResponse(
        access_token=token,
        user=UserResponse(
            id=user["id"],
            email=user["email"],
            full_name=user["full_name"],
            created_at=user["created_at"],
        ),
    )
| # Rate limit: 5 admin login attempts per minute per IP | |
async def admin_login(request: Request, credentials: AdminLoginRequest):
    """Log an administrator in with email, password and a TOTP code.

    Args:
        request: Incoming request; not read by the body itself.
        credentials: Email, password and the current TOTP code.

    Returns:
        TokenResponse: a JWT with ``is_admin`` set to ``True`` plus the
        admin's public profile.

    Raises:
        HTTPException: 401 for an unknown email, a wrong password or a wrong
            TOTP code; 403 when the password is correct but the account is
            not an admin account.
    """
    user = await get_user_by_email_async(credentials.email)

    # The password is verified BEFORE the admin flag is inspected. Checking
    # the flag first answered 403 to callers who had not proven anything,
    # letting them distinguish admin accounts from ordinary ones.
    if not user or not verify_password(credentials.password, user["password_hash"]):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Not an admin account")

    # Per-user secret when one is stored; `or` also covers a key that is
    # present but None/empty, which a .get() default would pass through.
    admin_totp_secret = user.get("totp_secret") or ADMIN_TOTP_SECRET
    if not verify_totp(admin_totp_secret, credentials.totp_code):
        raise HTTPException(status_code=401, detail="Invalid authentication code")

    token = create_jwt_token({"user_id": user["id"], "email": user["email"], "is_admin": True})
    return TokenResponse(
        access_token=token,
        user=UserResponse(
            id=user["id"],
            email=user["email"],
            full_name=user["full_name"],
            created_at=user["created_at"],
        ),
    )
async def get_me(current_user: dict = Depends(get_current_user)):
    """Return the stored profile of the authenticated caller (404 if gone)."""
    email = current_user.get("email")
    record = await get_user_by_email_async(email)

    if record:
        public_fields = ("id", "email", "full_name", "created_at")
        return UserResponse(**{field: record[field] for field in public_fields})

    raise HTTPException(status_code=404, detail="User not found")
| # ============= Admin User Management Routes ============= | |
async def verify_admin(current_user: dict = Depends(get_current_user)):
    """Dependency: pass the caller's token payload through only for admins."""
    if current_user.get("is_admin"):
        return current_user
    raise HTTPException(status_code=403, detail="Admin access required")
async def create_user(user_data: CreateUserRequest, admin: dict = Depends(verify_admin)):
    """Create a non-admin user with generated credentials (admin only).

    A username, a random password and a TOTP secret are generated; only the
    password hash is stored. The clear-text password and the TOTP secret are
    returned once so the admin can pass them on.

    Args:
        user_data: Email and full name of the user to create.
        admin: Token payload of the calling admin (authorization only).

    Returns:
        UserWithCredentials: the new user including the generated username,
        password and TOTP secret.

    Raises:
        HTTPException: 400 when the email already exists; 500 when ``db`` is
            neither a ``PostgresDatabase`` nor an ``InMemoryDatabase``.
    """
    existing_user = await get_user_by_email_async(user_data.email)
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    username = generate_username_from_email(user_data.email)
    password = generate_random_password()
    totp_secret = generate_totp_secret()
    password_hash = hash_password(password)
    created_at = datetime.utcnow().isoformat()

    if isinstance(db, PostgresDatabase):
        new_user = await db.create_user(
            email=user_data.email,
            password_hash=password_hash,
            full_name=user_data.full_name,
            is_admin=False,
            totp_secret=totp_secret,
        )
        user_id = new_user["id"]
        created_at = new_user.get("created_at", created_at)
    elif isinstance(db, InMemoryDatabase):
        # One past the highest id in use: len(...) + 1 would hand out an id
        # that still belongs to another record once any user was deleted.
        user_id = max((record["id"] for record in db._users.values()), default=0) + 1
        db._users[user_data.email] = {
            "id": user_id,
            "email": user_data.email,
            "password_hash": password_hash,
            "full_name": user_data.full_name,
            "created_at": created_at,
            "is_admin": False,
            "totp_secret": totp_secret,
            "last_login": None,
            "username": username,
        }
    else:
        raise HTTPException(status_code=500, detail="Database not configured")

    return UserWithCredentials(
        id=user_id,
        email=user_data.email,
        full_name=user_data.full_name,
        username=username,
        password=password,
        totp_secret=totp_secret,
        created_at=created_at,
        is_admin=False,
    )
async def list_users(admin: dict = Depends(verify_admin)):
    """Return every stored user as a ``UserListResponse`` (admin only)."""
    records = await list_all_users_async()
    return [
        UserListResponse(
            id=record["id"],
            email=record["email"],
            full_name=record["full_name"],
            created_at=record["created_at"],
            is_admin=record.get("is_admin", False),
            last_login=record.get("last_login"),
        )
        for record in records
    ]
async def delete_user(user_id: int, admin: dict = Depends(verify_admin)):
    """Remove a non-admin user by id (admin only).

    Responds 404 for an unknown id and 400 when the target is an admin.
    """
    target = await get_user_by_id_async(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if target.get("is_admin"):
        # Admin accounts are never deleted through this endpoint.
        raise HTTPException(status_code=400, detail="Cannot delete admin user")

    if isinstance(db, PostgresDatabase):
        await db.delete_user(user_id)
    elif isinstance(db, InMemoryDatabase):
        # The in-memory store is keyed by email, so find the key first.
        matching_key = next(
            (key for key, record in db._users.items() if record["id"] == user_id),
            None,
        )
        if matching_key is not None:
            del db._users[matching_key]

    return {"message": "User deleted successfully"}
async def get_user_totp_setup(user_id: int, admin: dict = Depends(verify_admin)):
    """Render the TOTP enrollment page for one user (admin only).

    Builds an ``otpauth://`` provisioning URI from the user's stored TOTP
    secret and returns an HTML page that draws it as a QR code in the browser
    and prints the secret for manual entry.

    Args:
        user_id: Id of the user whose enrollment page is requested.
        admin: Token payload of the calling admin (authorization only).

    Returns:
        HTMLResponse: the enrollment page.

    Raises:
        HTTPException: 404 when no user has this id.
    """
    # Local import: only this handler needs URL quoting.
    from urllib.parse import quote, urlencode

    target_user = await get_user_by_id_async(user_id)
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    totp_secret = target_user.get("totp_secret") or ""
    account_name = target_user["email"]
    issuer = "InnSight-AI"

    # Email and secret are percent-encoded so characters such as "&", "#" or
    # "+" cannot alter the structure of the provisioning URI.
    label = quote(f"{issuer}:{account_name}", safe=":@")
    query = urlencode(
        {
            "secret": totp_secret,
            "issuer": issuer,
            "algorithm": "SHA1",
            "digits": 6,
            "period": 30,
        },
        quote_via=quote,
    )
    otpauth_url = f"otpauth://totp/{label}?{query}"

    # full_name and email are user-supplied and this page is opened by an
    # admin: escape for HTML, and pass the URI to JavaScript as a JSON string
    # literal instead of splicing it between quotes.
    full_name_html = html.escape(str(target_user["full_name"]))
    secret_html = html.escape(str(totp_secret))
    otpauth_js = json.dumps(otpauth_url)

    # NOTE(review): the package spec in the <script src> below reads
    # "[email protected]", which looks like an e-mail-obfuscation artefact of
    # "qrcode@<version>". It is reproduced unchanged; restore the real version.
    page = f'''
<!DOCTYPE html>
<html dir="rtl" lang="he">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>InnSight-AI - הגדרת 2FA עבור {full_name_html}</title>
    <script src="https://cdn.jsdelivr.net/npm/[email protected]/build/qrcode.min.js"></script>
    <style>
        * {{ margin: 0; padding: 0; box-sizing: border-box; }}
        body {{
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%);
            min-height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
            padding: 20px;
        }}
        .container {{
            background: white;
            border-radius: 20px;
            padding: 40px;
            max-width: 500px;
            width: 100%;
            text-align: center;
            box-shadow: 0 10px 40px rgba(0,0,0,0.1);
            border: 1px solid #e2e8f0;
        }}
        h1 {{ color: #1e293b; margin-bottom: 10px; font-size: 28px; }}
        .subtitle {{ color: #7c3aed; margin-bottom: 30px; font-size: 16px; }}
        .user-name {{ color: #334155; font-size: 18px; margin-bottom: 20px; }}
        .qr-container {{
            background: #f8fafc;
            padding: 20px;
            border-radius: 15px;
            display: inline-block;
            margin: 20px 0;
            border: 1px solid #e2e8f0;
        }}
        .instructions {{
            color: #475569;
            text-align: right;
            margin: 20px 0;
            line-height: 1.8;
        }}
        .instructions li {{ margin: 10px 0; }}
        .secret-box {{
            background: #f3e8ff;
            border: 1px solid #c4b5fd;
            border-radius: 10px;
            padding: 15px;
            margin: 20px 0;
        }}
        .secret-label {{ color: #7c3aed; font-size: 12px; margin-bottom: 5px; }}
        .secret-code {{ color: #1e293b; font-family: monospace; font-size: 18px; letter-spacing: 2px; }}
        .warning {{
            background: #fef2f2;
            border: 1px solid #fecaca;
            border-radius: 10px;
            padding: 15px;
            color: #dc2626;
            margin-top: 20px;
            font-size: 14px;
        }}
    </style>
</head>
<body>
    <div class="container">
        <h1>🔐 InnSight-AI</h1>
        <p class="subtitle">הגדרת אימות דו-שלבי</p>
        <p class="user-name">עבור: <strong>{full_name_html}</strong></p>
        <div class="qr-container">
            <canvas id="qrcode"></canvas>
        </div>
        <ol class="instructions">
            <li>פתח את אפליקציית <strong>Microsoft Authenticator</strong></li>
            <li>לחץ על <strong>"+"</strong> להוספת חשבון</li>
            <li>בחר <strong>"חשבון אחר"</strong></li>
            <li>סרוק את קוד ה-QR שלמעלה</li>
        </ol>
        <div class="secret-box">
            <div class="secret-label">קוד סודי (להזנה ידנית):</div>
            <div class="secret-code">{secret_html}</div>
        </div>
        <div class="warning">
            ⚠️ שתף עמוד זה עם המשתמש באופן מאובטח.<br>
            הקוד הסודי יש לשמור במקום בטוח!
        </div>
    </div>
    <script>
        QRCode.toCanvas(document.getElementById('qrcode'), {otpauth_js}, {{
            width: 200,
            margin: 2,
            color: {{ dark: '#1e293b', light: '#ffffff' }}
        }});
    </script>
</body>
</html>
'''
    return HTMLResponse(content=page)
async def get_admin_analytics(admin: dict = Depends(verify_admin)):
    """Summarise user counts and recent activity for the admin dashboard.

    Counts come from ``demo_users``; the activity feed takes the last five
    entries per user from ``user_activity``, newest first, capped at twenty.
    """
    records = list(demo_users.values())
    total_users = len(demo_users)
    admin_count = len([record for record in records if record.get("is_admin")])
    active_users = len([record for record in records if record.get("last_login")])

    events = [
        {
            "email": email,
            "full_name": demo_users.get(email, {}).get("full_name", "Unknown"),
            **entry,
        }
        for email, history in user_activity.items()
        for entry in history[-5:]  # only the five most recent per user
    ]
    newest_first = sorted(events, key=lambda item: item.get("timestamp", ""), reverse=True)

    return {
        "total_users": total_users,
        "active_users": active_users,
        "admin_count": admin_count,
        "regular_users": total_users - admin_count,
        "recent_activity": newest_first[:20],
    }
| # ============= Microsoft OAuth Routes ============= | |
async def microsoft_login():
    """Build the Microsoft OAuth authorization URL.

    Returns:
        dict: ``{"auth_url": <url>}`` for the frontend to redirect to.

    Raises:
        HTTPException: 500 when ``MS_CLIENT_ID`` is not configured.

    NOTE(review): ``state`` is generated here but not stored anywhere in this
    function, so the callback cannot compare it - confirm whether CSRF
    protection for the flow is handled elsewhere.
    """
    # Local import: only the OAuth URL builders need query encoding.
    from urllib.parse import quote, urlencode

    if not MS_CLIENT_ID:
        raise HTTPException(
            status_code=500,
            detail="Microsoft OAuth not configured. Please contact administrator."
        )

    redirect_uri = MS_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/microsoft/callback"
    state = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip("=")

    # urlencode percent-encodes every value; the redirect URI ("://", "/")
    # and the space-separated scope were previously concatenated raw.
    query = urlencode(
        {
            "client_id": MS_CLIENT_ID,
            "response_type": "code",
            "redirect_uri": redirect_uri,
            "scope": "openid email profile",
            "state": state,
            "response_mode": "query",
        },
        quote_via=quote,
    )
    auth_url = f"https://login.microsoftonline.com/{MS_TENANT_ID}/oauth2/v2.0/authorize?{query}"
    return {"auth_url": auth_url}
async def microsoft_callback(
    code: Optional[str] = None,
    error: Optional[str] = None,
    error_description: Optional[str] = None
):
    """Finish the Microsoft OAuth flow and hand a JWT to the frontend.

    Exchanges the authorization code for an access token, reads the profile
    from Microsoft Graph, admits only the admin email or an email already in
    ``demo_users``, and returns a small HTML page that stores the JWT and
    user data in ``localStorage`` before redirecting to the dashboard.

    Args:
        code: Authorization code returned by Microsoft.
        error: OAuth error code, if the provider reported one.
        error_description: Human-readable description of ``error``.

    Returns:
        HTMLResponse: the bootstrap-and-redirect page.

    Raises:
        HTTPException: 400 on a provider error, missing code, failed token
            exchange or failed profile lookup; 403 for an email that is not
            authorised; 500 when the OAuth client is not configured.
    """
    def _js_literal(value) -> str:
        # json.dumps leaves "<", ">" and "&" untouched, so a display name
        # containing "</script>" would end the inline script block early.
        return (
            json.dumps(value)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

    if error:
        raise HTTPException(
            status_code=400,
            detail=f"Microsoft OAuth error: {error_description or error}"
        )
    if not code:
        raise HTTPException(status_code=400, detail="No authorization code received")
    if not MS_CLIENT_ID or not MS_CLIENT_SECRET:
        raise HTTPException(status_code=500, detail="Microsoft OAuth not configured")

    redirect_uri = MS_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/microsoft/callback"

    async with httpx.AsyncClient() as client:
        # Exchange the authorization code for tokens.
        token_response = await client.post(
            f"https://login.microsoftonline.com/{MS_TENANT_ID}/oauth2/v2.0/token",
            data={
                "client_id": MS_CLIENT_ID,
                "client_secret": MS_CLIENT_SECRET,
                "code": code,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
                "scope": "openid email profile"
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        if token_response.status_code != 200:
            raise HTTPException(
                status_code=400,
                detail="Failed to exchange code for token"
            )
        access_token = token_response.json().get("access_token")
        if not access_token:
            # A 200 without a token would otherwise send "Bearer None".
            raise HTTPException(
                status_code=400,
                detail="Failed to exchange code for token"
            )

        # Read the signed-in user's profile from Microsoft Graph.
        user_response = await client.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {access_token}"}
        )
        if user_response.status_code != 200:
            raise HTTPException(status_code=400, detail="Failed to get user info")
        ms_user = user_response.json()

    email = ms_user.get("mail") or ms_user.get("userPrincipalName", "")
    full_name = ms_user.get("displayName", "")

    # Only the admin email or an already registered email may sign in.
    if email.lower() != ADMIN_EMAIL.lower() and email not in demo_users:
        raise HTTPException(
            status_code=403,
            detail="Access denied. Please contact administrator for access."
        )

    if email not in demo_users:
        # One past the highest id in use; len(...) + 1 can collide with an
        # existing id after a deletion.
        user_id = max((record["id"] for record in demo_users.values()), default=0) + 1
        demo_users[email] = {
            "id": user_id,
            "email": email,
            "password_hash": "",  # No password for OAuth users
            "full_name": full_name or email.split("@")[0],
            "created_at": datetime.utcnow().isoformat(),
            "auth_provider": "microsoft"
        }
    user = demo_users[email]

    jwt_token = create_jwt_token({"user_id": user["id"], "email": user["email"]})

    frontend_url = "https://innsight-ai.vercel.app"
    user_data = {
        'id': user['id'],
        'email': user['email'],
        'full_name': user['full_name'],
        'created_at': user['created_at']
    }
    token_js = _js_literal(jwt_token)
    user_js = _js_literal(user_data)

    html_response = f'''
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Authenticating...</title>
    <script>
        localStorage.setItem('token', {token_js});
        localStorage.setItem('user', JSON.stringify({user_js}));
        window.location.href = '{frontend_url}/dashboard';
    </script>
</head>
<body>
    <p>Authenticating... If you are not redirected, <a href="{frontend_url}/dashboard">click here</a>.</p>
</body>
</html>
'''
    return HTMLResponse(content=html_response)
| # ============= Google OAuth Routes ============= | |
async def google_login():
    """Build the Google OAuth authorization URL.

    Returns:
        dict: ``{"auth_url": <url>}`` for the frontend to redirect to.

    Raises:
        HTTPException: 500 when ``GOOGLE_CLIENT_ID`` is not configured.

    NOTE(review): ``state`` is generated here but not stored anywhere in this
    function, so the callback cannot compare it - confirm whether CSRF
    protection for the flow is handled elsewhere.
    """
    # Local import: only the OAuth URL builders need query encoding.
    from urllib.parse import quote, urlencode

    if not GOOGLE_CLIENT_ID:
        raise HTTPException(
            status_code=500,
            detail="Google OAuth not configured. Please contact administrator."
        )

    redirect_uri = GOOGLE_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/google/callback"
    state = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip("=")

    # urlencode percent-encodes every value; the redirect URI ("://", "/")
    # and the space-separated scope were previously concatenated raw.
    query = urlencode(
        {
            "client_id": GOOGLE_CLIENT_ID,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": "openid email profile",
            "state": state,
            "access_type": "offline",
            "prompt": "consent",
        },
        quote_via=quote,
    )
    auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{query}"
    return {"auth_url": auth_url}
async def google_callback(
    code: Optional[str] = None,
    error: Optional[str] = None,
    error_description: Optional[str] = None
):
    """Finish the Google OAuth flow and hand a JWT to the frontend.

    Exchanges the authorization code for an access token, reads the profile
    from Google's userinfo endpoint, admits only the admin email or an email
    already in ``demo_users``, and returns a small HTML page that stores the
    JWT, the user data and an ``auth-storage`` record in ``localStorage``
    before redirecting to the dashboard.

    Args:
        code: Authorization code returned by Google.
        error: OAuth error code, if the provider reported one.
        error_description: Human-readable description of ``error``.

    Returns:
        HTMLResponse: the bootstrap-and-redirect page.

    Raises:
        HTTPException: 400 on a provider error, missing code, failed token
            exchange or failed profile lookup; 403 for an email that is not
            authorised; 500 when the OAuth client is not configured.
    """
    def _js_literal(value) -> str:
        # json.dumps leaves "<", ">" and "&" untouched, so a display name
        # containing "</script>" would end the inline script block early.
        return (
            json.dumps(value)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

    if error:
        raise HTTPException(
            status_code=400,
            detail=f"Google OAuth error: {error_description or error}"
        )
    if not code:
        raise HTTPException(status_code=400, detail="No authorization code received")
    if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
        raise HTTPException(status_code=500, detail="Google OAuth not configured")

    redirect_uri = GOOGLE_REDIRECT_URI or "https://innsight-ai.vercel.app/api/auth/google/callback"

    async with httpx.AsyncClient() as client:
        # Exchange the authorization code for tokens.
        token_response = await client.post(
            "https://oauth2.googleapis.com/token",
            data={
                "client_id": GOOGLE_CLIENT_ID,
                "client_secret": GOOGLE_CLIENT_SECRET,
                "code": code,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code"
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )
        if token_response.status_code != 200:
            raise HTTPException(
                status_code=400,
                detail="Failed to exchange code for token"
            )
        access_token = token_response.json().get("access_token")
        if not access_token:
            # A 200 without a token would otherwise send "Bearer None".
            raise HTTPException(
                status_code=400,
                detail="Failed to exchange code for token"
            )

        # Read the signed-in user's profile from Google.
        user_response = await client.get(
            "https://www.googleapis.com/oauth2/v2/userinfo",
            headers={"Authorization": f"Bearer {access_token}"}
        )
        if user_response.status_code != 200:
            raise HTTPException(status_code=400, detail="Failed to get user info")
        google_user = user_response.json()

    email = google_user.get("email", "")
    full_name = google_user.get("name", "")

    # Only the admin email or an already registered email may sign in.
    if email.lower() != ADMIN_EMAIL.lower() and email not in demo_users:
        raise HTTPException(
            status_code=403,
            detail="Access denied. Please contact administrator for access."
        )

    if email not in demo_users:
        # One past the highest id in use; len(...) + 1 can collide with an
        # existing id after a deletion.
        user_id = max((record["id"] for record in demo_users.values()), default=0) + 1
        demo_users[email] = {
            "id": user_id,
            "email": email,
            "password_hash": "",  # No password for OAuth users
            "full_name": full_name or email.split("@")[0],
            "created_at": datetime.utcnow().isoformat(),
            "auth_provider": "google",
            "totp_secret": generate_totp_secret(),
            "is_admin": email.lower() == ADMIN_EMAIL.lower()
        }
    user = demo_users[email]
    is_admin = bool(user.get("is_admin", False))

    jwt_token = create_jwt_token({
        "user_id": user["id"],
        "email": user["email"],
        "is_admin": user.get("is_admin", False)
    })

    frontend_url = "https://innsight-ai.vercel.app"
    user_data = {
        'id': user['id'],
        'email': user['email'],
        'full_name': user['full_name'],
        'created_at': user['created_at'],
        'is_admin': user.get('is_admin', False)
    }
    # Same object the page previously assembled as a JavaScript literal,
    # now serialised in one piece so every value is escaped consistently.
    auth_storage = {
        "state": {
            "token": jwt_token,
            "user": user_data,
            "isAuthenticated": True,
            "isAdmin": is_admin
        },
        "version": 0
    }
    token_js = _js_literal(jwt_token)
    user_js = _js_literal(user_data)
    auth_storage_js = _js_literal(auth_storage)

    html_response = f'''
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Authenticating...</title>
    <script>
        localStorage.setItem('token', {token_js});
        localStorage.setItem('user', JSON.stringify({user_js}));
        localStorage.setItem('auth-storage', JSON.stringify({auth_storage_js}));
        window.location.href = '{frontend_url}/dashboard';
    </script>
</head>
<body>
    <p>Authenticating... If you are not redirected, <a href="{frontend_url}/dashboard">click here</a>.</p>
</body>
</html>
'''
    return HTMLResponse(content=html_response)
| # ============= Chatbot Route (NVIDIA Nemotron) ============= | |
| # Rate limit: 30 chat messages per minute per IP | |
async def chat(request: Request, message: ChatMessage, current_user: Optional[dict] = Depends(get_current_user_optional)):
    """Answer a chat message, via NVIDIA Nemotron for authenticated users.

    Anonymous callers get a canned Hebrew reply chosen by simple substring
    matching. Authenticated callers are forwarded to the NVIDIA chat
    completions API when ``NVIDIA_API_KEY`` is set.

    Args:
        request: Incoming request; not read by the body itself.
        message: The user's chat message.
        current_user: Token payload, or ``None`` for anonymous callers.

    Returns:
        ChatResponse: the reply text.

    Raises:
        HTTPException: 504 when the AI service times out; 500 when it answers
            with a non-200 status or any other error occurs.
    """
    if not current_user:
        default_reply = "שלום! אני העוזר החכם של InnSight-AI. אני יכול לעזור לך עם שאלות על ניהול הכנסות, השוואת מחירים, ואופטימיזציה של תעריפים. במה אוכל לסייע?"
        demo_responses = {
            "מה": "אני InnSight-AI, העוזר החכם שלך לניהול הכנסות מלונאיות. אני יכול לעזור לך להבין מגמות מחירים, להשוות מתחרים, ולתת המלצות לאופטימיזציה של מחירים.",
            "איך": "אתה יכול להשתמש בפלטפורמה כדי: 1) להוסיף מלונות מתחרים למעקב, 2) לצפות בהשוואות מחירים בזמן אמת, 3) לייצא דוחות לאקסל, 4) לקבל תובנות על מגמות השוק.",
            "מחיר": "לניתוח מחירים, עבור לדף 'השוואת מחירים' ובחר את המלונות והתאריכים להשוואה. המערכת תציג גרפים ותובנות על פערי המחירים.",
            "RevPAR": "כדי להעלות RevPAR, מומלץ: 1) לבחון את התמחור הדינמי ביחס לתפוסה, 2) להציע שדרוגים במעמד הצ'ק-אין, 3) לשווק חבילות ערך מוסף בימים חלשים.",
        }
        # First keyword (in insertion order) contained in the message wins.
        response_text = next(
            (reply for keyword, reply in demo_responses.items() if keyword in message.message),
            default_reply,
        )
        return ChatResponse(response=response_text)

    if not NVIDIA_API_KEY:
        # No key configured: answer with a notice instead of calling out.
        return ChatResponse(response="NVIDIA API Key not configured. Using demo mode.")

    system_prompt = (
        "אתה עוזר חכם בשם InnSight-AI המתמחה בניהול הכנסות מלונאיות (Revenue Management) בישראל.\n"
        "תפקידך לסייע למנהלי מלונות להבין מגמות מחירים, לנתח מתחרים, ולתת המלצות לאופטימיזציה של תעריפים.\n"
        "ענה תמיד בעברית, בצורה מקצועית אך ידידותית.\n"
        "התמקד בנושאים: תמחור דינמי, ניתוח מתחרים, עונתיות, תפוסה, ו-RevPAR."
    )

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://integrate.api.nvidia.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {NVIDIA_API_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "nvidia/llama-3.1-nemotron-70b-instruct",
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": message.message}
                    ],
                    "temperature": 0.7,
                    "max_tokens": 1024
                },
                timeout=30.0
            )
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail="AI service error")
        data = response.json()
        return ChatResponse(response=data["choices"][0]["message"]["content"])
    except HTTPException:
        # Let our own HTTP errors through untouched; the generic handler
        # below used to re-wrap them, turning the detail into
        # "500: AI service error".
        raise
    except httpx.TimeoutException as exc:
        raise HTTPException(status_code=504, detail="AI service timeout") from exc
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ============= Dashboard Routes ============= | |
async def get_dashboard_stats(current_user: dict = Depends(get_current_user)):
    """Return the (hard-coded) headline numbers shown on the dashboard."""
    # (name, current price, change) for each tracked hotel.
    tracked = (
        ("צובה", 850, 5.2),
        ("רמת רחל", 920, -2.1),
        ("יד השמונה", 780, 0),
        ("יערים", 890, 3.5),
        ("נווה אילן", 950, -1.8),
    )
    stats = {
        "total_hotels": 5,
        "total_comparisons": 24,
        "avg_price_difference": 12.5,
        "last_scrape": datetime.utcnow().isoformat(),
    }
    stats["hotels_tracked"] = [
        {"name": name, "current_price": price, "change": change}
        for name, price, change in tracked
    ]
    return stats
async def get_price_trends(current_user: dict = Depends(get_current_user)):
    """Return the (hard-coded) six-month price series used by the charts."""
    months = ["ינואר", "פברואר", "מרץ", "אפריל", "מאי", "יוני"]
    # One price per month, keyed by hotel name.
    series = {
        "צובה": [820, 850, 880, 850, 870, 850],
        "רמת רחל": [900, 920, 950, 930, 940, 920],
        "יד השמונה": [750, 780, 800, 770, 790, 780],
        "יערים": [870, 890, 920, 900, 910, 890],
        "נווה אילן": [940, 950, 980, 960, 970, 950],
    }
    datasets = [{"hotel": hotel, "data": prices} for hotel, prices in series.items()]
    return {"labels": months, "datasets": datasets}
async def get_occupancy_data(current_user: dict = Depends(get_current_user)):
    """Return the (hard-coded) price summary per occupancy type."""
    # (occupancy key, average, minimum, maximum)
    summary_rows = (
        ("couple", 850, 750, 950),
        ("couple_1_child", 950, 850, 1050),
        ("couple_2_children", 1050, 950, 1150),
    )
    return {
        occupancy: {"avg": average, "min": lowest, "max": highest}
        for occupancy, average, lowest, highest in summary_rows
    }
| # ============= Hotels Routes ============= | |
| # Demo hotels data - uses database proxy | |
class DemoHotelsProxy:
    """List-like view over the in-memory database's hotels.

    Kept so code written against the old ``demo_hotels`` list keeps working.
    When the active database is not an ``InMemoryDatabase`` the proxy looks
    empty and ``append`` does nothing.
    """

    @staticmethod
    def _memory_backend():
        """Return the active database if it is in-memory, otherwise ``None``."""
        # Imported on every call so the proxy always sees the current ``db``.
        from database import db as _db
        return _db if isinstance(_db, InMemoryDatabase) else None

    def __iter__(self):
        backend = self._memory_backend()
        source = [] if backend is None else backend._hotels
        return iter(source)

    def __len__(self):
        backend = self._memory_backend()
        return 0 if backend is None else len(backend._hotels)

    def append(self, hotel: dict):
        backend = self._memory_backend()
        if backend is not None:
            backend._hotels.append(hotel)


demo_hotels = DemoHotelsProxy()
async def get_hotels(current_user: dict = Depends(get_current_user)):
    """List hotels: all of them for admins, only the caller's own otherwise."""
    caller_id = current_user["user_id"]
    caller = await get_user_by_id_async(caller_id)

    # owner_id=None asks the database for every hotel.
    owner_filter = None if (caller and caller.get("is_admin")) else caller_id
    return await db.get_hotels(owner_id=owner_filter)
async def create_hotel(hotel: HotelCreate, current_user: dict = Depends(get_current_user)):
    """Create a hotel owned by the caller and return the stored record."""
    hotel_fields = {
        "name": hotel.name,
        "owner_id": current_user["user_id"],
        "booking_url": hotel.booking_url,
        "website_url": hotel.website_url,
    }
    return await db.create_hotel(**hotel_fields)
async def delete_hotel(hotel_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a hotel; admins may delete any, others only their own (404 otherwise)."""
    caller_id = current_user["user_id"]
    caller = await get_user_by_id_async(caller_id)

    # owner_id=None lifts the ownership restriction for admins.
    owner_filter = None if (caller and caller.get("is_admin")) else caller_id

    deleted = await db.delete_hotel(hotel_id, owner_id=owner_filter)
    if deleted:
        return {"message": "Hotel deleted"}
    raise HTTPException(status_code=404, detail="Hotel not found or access denied")
| # ============= Comparison Routes ============= | |
async def compare_hotels(request: ComparisonRequest, current_user: dict = Depends(get_current_user)):
    """Return a price comparison for the requested stay.

    The response echoes ``check_in``, ``check_out`` and ``occupancy`` from the
    request and attaches a fixed set of demo results; the prices do not depend
    on the request.
    """
    price_keys = ("BB", "HB", "RO")

    # (hotel_id, hotel_name, booking prices, website prices), with prices
    # listed in ``price_keys`` order.
    demo_rows = (
        (1, "צובה", (850, 1050, 750), (820, 1020, 720)),
        (2, "רמת רחל", (920, 1120, 820), (900, 1100, 800)),
        (3, "יד השמונה", (780, 980, 680), (760, 960, 660)),
    )

    results = [
        {
            "hotel_id": hotel_id,
            "hotel_name": hotel_name,
            "prices": {
                "booking": dict(zip(price_keys, booking_prices)),
                "website": dict(zip(price_keys, website_prices)),
            },
        }
        for hotel_id, hotel_name, booking_prices, website_prices in demo_rows
    ]

    return {
        "check_in": request.check_in,
        "check_out": request.check_out,
        "occupancy": request.occupancy,
        "results": results,
    }
| # ============= Export Routes ============= | |
class ExportRequest(BaseModel):
    """Request body for the export endpoint."""

    # Rows to export, one dict per row.
    data: list[dict]
    # Target file format; the export handler accepts "xlsx" or "csv".
    format: str = "xlsx"
    # Optional base name for the generated file (the handler sanitises it).
    filename: Optional[str] = None
class ExportResponse(BaseModel):
    """Response body returned by the export endpoint."""

    # URL path the client should use to fetch the exported file.
    download_url: str
    # Final file name, including the format extension.
    filename: str
    # Format that was used for the export.
    format: str
MAX_EXPORT_ITEMS = 10000  # Maximum number of items to export
ALLOWED_EXPORT_FORMATS = {"xlsx", "csv"}


def _safe_export_basename(requested: Optional[str], fallback: str) -> str:
    """Sanitise a requested base name, using ``fallback`` if nothing usable is left.

    Only alphanumeric characters, ``_`` and ``-`` are kept and the result is
    capped at 50 characters, which removes path separators and dots.
    """
    candidate = requested or fallback
    cleaned = "".join(ch for ch in candidate if ch.isalnum() or ch in "_-")[:50]
    # A name made only of rejected characters (e.g. "../") would otherwise
    # produce a file called just ".xlsx".
    return cleaned or fallback


async def export_data(
    request: ExportRequest,
    current_user: dict = Depends(get_current_user)
):
    """
    Export data to Excel or CSV format.
    Requires authentication.
    Validates format and limits dataset size to prevent abuse.

    Returns:
        ExportResponse with the sanitised filename and its download URL.

    Raises:
        HTTPException: 422 when the format is not allowed, the data is empty,
            or the data holds more than ``MAX_EXPORT_ITEMS`` items.
    """
    from datetime import timezone

    # Validate format
    if request.format not in ALLOWED_EXPORT_FORMATS:
        # Sorted so the message is stable; set iteration order for strings
        # varies between processes.
        allowed = ", ".join(sorted(ALLOWED_EXPORT_FORMATS))
        raise HTTPException(
            status_code=422,
            detail=f"Invalid format. Must be one of: {allowed}"
        )

    # Validate data is not empty
    if not request.data:
        raise HTTPException(
            status_code=422,
            detail="Data cannot be empty"
        )

    # Validate data size
    if len(request.data) > MAX_EXPORT_ITEMS:
        raise HTTPException(
            status_code=422,
            detail=f"Data too large. Maximum {MAX_EXPORT_ITEMS} items allowed"
        )

    # Generate filename. An aware UTC "now" replaces the deprecated
    # datetime.utcnow(); the formatted text is identical.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    default_basename = f"export_{timestamp}"
    safe_filename = _safe_export_basename(request.filename, default_basename)
    filename = f"{safe_filename}.{request.format}"

    # In a real implementation, this would generate the file and return a download URL
    download_url = f"/api/export/download/{filename}"

    return ExportResponse(
        download_url=download_url,
        filename=filename,
        format=request.format
    )
async def download_export(
    filename: str,
    current_user: dict = Depends(get_current_user)
):
    """
    Serve a previously exported file.
    Authentication is required so exports cannot be fetched anonymously.

    Raises:
        HTTPException: 400 when the filename contains "..", "/" or "\\";
            otherwise 404, because no file is served yet.
    """
    # Security: reject anything that could escape the export directory.
    forbidden_fragments = ("..", "/", "\\")
    if any(fragment in filename for fragment in forbidden_fragments):
        raise HTTPException(status_code=400, detail="Invalid filename")

    # In a real implementation, this would return the actual file
    raise HTTPException(status_code=404, detail="File not found")
| # ============= Health Check ============= | |
async def health_check():
    """Report service health, the current UTC time and the database backend."""
    # Falsy ``db`` values are reported as the string "None".
    backend_name = type(db).__name__ if db else "None"
    now_iso = datetime.utcnow().isoformat()
    return {
        "status": "healthy",
        "timestamp": now_iso,
        "db_initialized": _db_initialized,
        "db_type": backend_name,
    }
async def debug_info():
    """Report database, driver and environment status for debugging."""
    # If the database module (or its flag) cannot be imported for any reason,
    # report the pg8000 driver as unavailable.
    try:
        from database import PG8000_AVAILABLE as pg8000_available
    except Exception:
        pg8000_available = False

    # Falsy ``db`` values are reported as the string "None".
    backend_name = type(db).__name__ if db else "None"

    report = {"status": "ok"}
    report["db_initialized"] = _db_initialized
    report["db_type"] = backend_name
    report["db_import_error"] = DB_IMPORT_ERROR
    report["pg8000_available"] = pg8000_available
    report["is_production"] = IS_PRODUCTION
    report["env_mode"] = ENV_MODE
    return report
| # End of file - Vercel automatically finds 'app' variable | |