# Really-amin's picture
# Upload 325 files
# b66240d verified
# raw
# history blame
# 9.58 kB
"""HuggingFace Cryptocurrency Data Engine - Main Application"""
from __future__ import annotations
import time
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from core.config import settings, get_supported_symbols, get_supported_intervals
from core.aggregator import get_aggregator
from core.cache import cache, cache_key, get_or_set
from core.models import (
OHLCVResponse, PricesResponse, SentimentResponse,
MarketOverviewResponse, HealthResponse, ErrorResponse, ErrorDetail, CacheInfo
)
# Logging setup: INFO level, records formatted as
# "<time> - <logger name> - <level> - <message>"
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Rate limiter; the limiting key is produced by get_remote_address
limiter = Limiter(key_func=get_remote_address)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    On startup, logs the configured version and environment and obtains the
    aggregator from ``get_aggregator()``. On shutdown, closes that aggregator.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    logger.info("Starting HuggingFace Crypto Data Engine...")
    logger.info(f"Version: {settings.VERSION}")
    logger.info(f"Environment: {settings.ENV}")

    # Initialize aggregator
    aggregator = get_aggregator()
    try:
        yield
    finally:
        # Run cleanup even when an exception or cancellation is thrown into
        # the generator at the yield point; otherwise close() would be skipped.
        logger.info("Shutting down...")
        await aggregator.close()
# Create FastAPI app
app = FastAPI(
    title="HuggingFace Cryptocurrency Data Engine",
    description="Comprehensive cryptocurrency data aggregator with multi-provider support",
    version=settings.VERSION,
    lifespan=lifespan
)

# Add rate limiter: slowapi looks the limiter up on app.state, and the handler
# below is registered for RateLimitExceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS middleware.
# Origins, methods and headers are wildcarded, so credentialed requests are
# NOT enabled: FastAPI documents that allow_credentials=True must not be
# combined with ["*"] values. To support cookies/auth headers cross-origin,
# list the permitted origins explicitly and then re-enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Convert any otherwise-unhandled exception into a JSON 500 response.

    The exception is logged with its traceback, then wrapped in an
    ``ErrorResponse`` carrying the code ``INTERNAL_ERROR`` and a millisecond
    timestamp.
    """
    logger.error(f"Unhandled exception: {exc}", exc_info=True)

    # NOTE(review): the raw exception text is returned to the client here —
    # confirm that exposing it is acceptable.
    error_detail = ErrorDetail(code="INTERNAL_ERROR", message=str(exc))
    now_ms = int(time.time() * 1000)
    envelope = ErrorResponse(error=error_detail, timestamp=now_ms)

    return JSONResponse(status_code=500, content=envelope.dict())
@app.get("/")
async def root():
    """Return service identity, version, status and a map of endpoint paths."""
    endpoint_paths = {
        "health": "/api/health",
        "ohlcv": "/api/ohlcv",
        "prices": "/api/prices",
        "sentiment": "/api/sentiment",
        "market": "/api/market/overview",
        "docs": "/docs"
    }
    service_info = {
        "service": "HuggingFace Cryptocurrency Data Engine",
        "version": settings.VERSION,
        "status": "online",
        "endpoints": endpoint_paths,
    }
    return service_info
@app.get("/api/health", response_model=HealthResponse)
@limiter.limit(f"{settings.RATE_LIMIT_HEALTH or 999999}/minute")
async def health_check(request: Request):
    """Report overall service health, provider statuses and cache statistics.

    The overall status is "unhealthy" when no provider is online, "degraded"
    when fewer than half of the providers are online, and "healthy" otherwise.
    """
    aggregator = get_aggregator()
    providers = await aggregator.get_all_provider_health()

    # Count the providers whose status is exactly "online"
    online_providers = [p for p in providers if p.status == "online"]
    online_count = len(online_providers)

    overall_status = "healthy"
    if online_count == 0:
        overall_status = "unhealthy"
    elif online_count < len(providers) / 2:
        overall_status = "degraded"

    # Cache statistics are unpacked straight into the CacheInfo model
    stats = cache.get_stats()

    return HealthResponse(
        status=overall_status,
        uptime=aggregator.get_uptime(),
        version=settings.VERSION,
        providers=providers,
        cache=CacheInfo(**stats),
    )
@app.get("/api/ohlcv", response_model=OHLCVResponse)
@limiter.limit(f"{settings.RATE_LIMIT_OHLCV}/minute")
async def get_ohlcv(
    request: Request,
    symbol: str = Query(..., description="Symbol (e.g., BTC, BTCUSDT, BTC/USDT)"),
    interval: str = Query("1h", description="Interval (1m, 5m, 15m, 1h, 4h, 1d, 1w)"),
    limit: int = Query(100, ge=1, le=1000, description="Number of candles (1-1000)")
):
    """Get OHLCV candlestick data, served from cache when available.

    Args:
        request: Incoming request (required by the rate limiter decorator).
        symbol: Trading symbol; surrounding whitespace, "/" and "-" are
            removed and the result is upper-cased before use.
        interval: Candle interval; must be one of get_supported_intervals().
        limit: Number of candles, constrained to 1-1000 by the query validator.

    Returns:
        OHLCVResponse with the candles, normalized symbol, interval, candle
        count, data source and a millisecond timestamp.

    Raises:
        HTTPException: 400 for an unsupported interval or a symbol that is
            empty after normalization; 503 when fetching the data fails.
    """
    # Validate interval (look the supported list up once)
    supported_intervals = get_supported_intervals()
    if interval not in supported_intervals:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid interval. Supported: {', '.join(supported_intervals)}"
        )

    # Normalize symbol
    normalized_symbol = symbol.strip().upper().replace("/", "").replace("-", "")
    if not normalized_symbol:
        # Inputs such as "/", "-" or whitespace collapse to an empty symbol;
        # reject them up front instead of querying providers and caching them.
        raise HTTPException(
            status_code=400,
            detail="Invalid symbol. Provide a non-empty symbol (e.g., BTC, BTCUSDT, BTC/USDT)"
        )

    # Generate cache key
    key = cache_key("ohlcv", symbol=normalized_symbol, interval=interval, limit=limit)

    async def fetch():
        aggregator = get_aggregator()
        data, source = await aggregator.fetch_ohlcv(normalized_symbol, interval, limit)
        return {"data": data, "source": source}

    try:
        # Get from cache or fetch
        result = await get_or_set(key, settings.CACHE_TTL_OHLCV, fetch)
        return OHLCVResponse(
            data=result["data"],
            symbol=normalized_symbol,
            interval=interval,
            count=len(result["data"]),
            source=result["source"],
            timestamp=int(time.time() * 1000)
        )
    except Exception as e:
        logger.error(f"OHLCV fetch failed: {e}")
        # Chain the original error so the root cause stays attached
        raise HTTPException(
            status_code=503,
            detail=ErrorDetail(
                code="PROVIDER_ERROR",
                message=f"All data providers failed: {str(e)}"
            ).dict()
        ) from e
@app.get("/api/prices", response_model=PricesResponse)
@limiter.limit(f"{settings.RATE_LIMIT_PRICES}/minute")
async def get_prices(
    request: Request,
    symbols: str = Query(None, description="Comma-separated symbols (e.g., BTC,ETH,SOL)"),
    convert: str = Query("USDT", description="Convert to currency (USD, USDT)")
):
    """Get prices for the requested symbols, served from cache when available.

    Args:
        request: Incoming request (required by the rate limiter decorator).
        symbols: Comma-separated symbols. Entries are stripped and upper-cased;
            blank entries are ignored. When omitted, or when no non-blank
            entry remains, get_supported_symbols() is used instead.
        convert: Target currency. NOTE(review): accepted but not used by this
            handler — it is neither passed to the aggregator nor part of the
            cache key.

    Returns:
        PricesResponse with the price data, its source and a millisecond
        timestamp.

    Raises:
        HTTPException: 503 when fetching the prices fails.
    """
    # Parse symbols, dropping blank entries produced by inputs such as
    # "BTC,,ETH" or a trailing comma so empty symbols never reach providers.
    requested = symbols.split(",") if symbols else []
    symbol_list = [s.strip().upper() for s in requested if s.strip()]
    if not symbol_list:
        # Use default symbols
        symbol_list = get_supported_symbols()

    # Generate cache key (sorted so symbol order does not fragment the cache)
    key = cache_key("prices", symbols=",".join(sorted(symbol_list)))

    async def fetch():
        aggregator = get_aggregator()
        data, source = await aggregator.fetch_prices(symbol_list)
        return {"data": data, "source": source}

    try:
        # Get from cache or fetch
        result = await get_or_set(key, settings.CACHE_TTL_PRICES, fetch)
        return PricesResponse(
            data=result["data"],
            timestamp=int(time.time() * 1000),
            source=result["source"]
        )
    except Exception as e:
        logger.error(f"Price fetch failed: {e}")
        # Chain the original error so the root cause stays attached
        raise HTTPException(
            status_code=503,
            detail=ErrorDetail(
                code="PROVIDER_ERROR",
                message=f"All price providers failed: {str(e)}"
            ).dict()
        ) from e
@app.get("/api/sentiment", response_model=SentimentResponse)
@limiter.limit(f"{settings.RATE_LIMIT_SENTIMENT}/minute")
async def get_sentiment(request: Request):
    """Return market sentiment data, served from cache when available.

    Responds with 503 when settings.ENABLE_SENTIMENT is falsy or when the
    sentiment fetch fails.
    """
    sentiment_enabled = settings.ENABLE_SENTIMENT
    if not sentiment_enabled:
        raise HTTPException(status_code=503, detail="Sentiment analysis is disabled")

    sentiment_key = cache_key("sentiment")

    async def load_sentiment():
        # Invoked by get_or_set to produce the value to cache
        return await get_aggregator().fetch_sentiment()

    try:
        payload = await get_or_set(
            sentiment_key, settings.CACHE_TTL_SENTIMENT, load_sentiment
        )
        now_ms = int(time.time() * 1000)
        return SentimentResponse(data=payload, timestamp=now_ms)
    except Exception as exc:
        logger.error(f"Sentiment fetch failed: {exc}")
        failure = ErrorDetail(
            code="PROVIDER_ERROR",
            message=f"Failed to fetch sentiment: {str(exc)}",
        )
        raise HTTPException(status_code=503, detail=failure.dict())
# NOTE(review): this route reuses settings.RATE_LIMIT_SENTIMENT for its limit —
# confirm that is intended rather than a dedicated market-overview setting.
@app.get("/api/market/overview", response_model=MarketOverviewResponse)
@limiter.limit(f"{settings.RATE_LIMIT_SENTIMENT}/minute")
async def get_market_overview(request: Request):
    """Return the market overview, served from cache when available.

    Responds with 503 when the overview fetch fails.
    """
    overview_key = cache_key("market_overview")

    async def load_overview():
        # Invoked by get_or_set to produce the value to cache
        return await get_aggregator().fetch_market_overview()

    try:
        overview = await get_or_set(
            overview_key, settings.CACHE_TTL_MARKET, load_overview
        )
        now_ms = int(time.time() * 1000)
        return MarketOverviewResponse(data=overview, timestamp=now_ms)
    except Exception as exc:
        logger.error(f"Market overview fetch failed: {exc}")
        failure = ErrorDetail(
            code="PROVIDER_ERROR",
            message=f"Failed to fetch market overview: {str(exc)}",
        )
        raise HTTPException(status_code=503, detail=failure.dict())
# NOTE(review): no rate limit or auth check is applied on this route here —
# confirm that access is restricted elsewhere.
@app.post("/api/cache/clear")
async def clear_cache():
    """Empty the shared cache and return a confirmation payload."""
    cache.clear()
    confirmation = {"success": True, "message": "Cache cleared"}
    return confirmation
@app.get("/api/cache/stats")
async def cache_stats():
    """Return whatever cache.get_stats() reports, unchanged."""
    stats = cache.get_stats()
    return stats
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn using host/port from
    # settings; auto-reload is on only when ENV is "development".
    import uvicorn

    dev_reload = settings.ENV == "development"
    uvicorn.run(
        "main:app",
        host=settings.HOST,
        port=settings.PORT,
        reload=dev_reload,
        log_level="info",
    )