| | """ |
| | WebSocket API for Data Collection Services |
| | |
| | This module provides WebSocket endpoints for real-time data streaming |
| | from all data collection services. |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime |
| | from typing import Any, Dict, Optional |
| | from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| | import logging |
| |
|
| | from backend.services.ws_service_manager import ws_manager, ServiceType |
| | from collectors.market_data import MarketDataCollector |
| | from collectors.explorers import ExplorerDataCollector |
| | from collectors.news import NewsCollector |
| | from collectors.sentiment import SentimentCollector |
| | from collectors.whale_tracking import WhaleTrackingCollector |
| | from collectors.rpc_nodes import RPCNodeCollector |
| | from collectors.onchain import OnChainCollector |
| | from config import Config |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | router = APIRouter() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class DataCollectionStreamers: |
| | """Handles data streaming for all collection services""" |
| |
|
| | def __init__(self): |
| | self.config = Config() |
| | self.market_data_collector = MarketDataCollector(self.config) |
| | self.explorer_collector = ExplorerDataCollector(self.config) |
| | self.news_collector = NewsCollector(self.config) |
| | self.sentiment_collector = SentimentCollector(self.config) |
| | self.whale_collector = WhaleTrackingCollector(self.config) |
| | self.rpc_collector = RPCNodeCollector(self.config) |
| | self.onchain_collector = OnChainCollector(self.config) |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_market_data(self): |
| | """Stream real-time market data""" |
| | try: |
| | data = await self.market_data_collector.collect() |
| | if data: |
| | return { |
| | "prices": data.get("prices", {}), |
| | "volumes": data.get("volumes", {}), |
| | "market_caps": data.get("market_caps", {}), |
| | "price_changes": data.get("price_changes", {}), |
| | "source": data.get("source", "unknown"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming market data: {e}") |
| | return None |
| |
|
| | async def stream_order_books(self): |
| | """Stream order book data""" |
| | try: |
| | |
| | data = await self.market_data_collector.collect() |
| | if data and "order_book" in data: |
| | return { |
| | "bids": data["order_book"].get("bids", []), |
| | "asks": data["order_book"].get("asks", []), |
| | "spread": data["order_book"].get("spread"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming order books: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_explorer_data(self): |
| | """Stream blockchain explorer data""" |
| | try: |
| | data = await self.explorer_collector.collect() |
| | if data: |
| | return { |
| | "latest_block": data.get("latest_block"), |
| | "network_hashrate": data.get("network_hashrate"), |
| | "difficulty": data.get("difficulty"), |
| | "mempool_size": data.get("mempool_size"), |
| | "transactions_count": data.get("transactions_count"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming explorer data: {e}") |
| | return None |
| |
|
| | async def stream_transactions(self): |
| | """Stream recent transactions""" |
| | try: |
| | data = await self.explorer_collector.collect() |
| | if data and "recent_transactions" in data: |
| | return { |
| | "transactions": data["recent_transactions"], |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming transactions: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_news(self): |
| | """Stream news updates""" |
| | try: |
| | data = await self.news_collector.collect() |
| | if data and "articles" in data: |
| | return { |
| | "articles": data["articles"][:10], |
| | "sources": data.get("sources", []), |
| | "categories": data.get("categories", []), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming news: {e}") |
| | return None |
| |
|
| | async def stream_breaking_news(self): |
| | """Stream breaking news alerts""" |
| | try: |
| | data = await self.news_collector.collect() |
| | if data and "breaking" in data: |
| | return { |
| | "breaking_news": data["breaking"], |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming breaking news: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_sentiment(self): |
| | """Stream sentiment analysis data""" |
| | try: |
| | data = await self.sentiment_collector.collect() |
| | if data: |
| | return { |
| | "overall_sentiment": data.get("overall_sentiment"), |
| | "sentiment_score": data.get("sentiment_score"), |
| | "social_volume": data.get("social_volume"), |
| | "trending_topics": data.get("trending_topics", []), |
| | "sentiment_by_source": data.get("by_source", {}), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming sentiment: {e}") |
| | return None |
| |
|
| | async def stream_social_trends(self): |
| | """Stream social media trends""" |
| | try: |
| | data = await self.sentiment_collector.collect() |
| | if data and "social_trends" in data: |
| | return { |
| | "trends": data["social_trends"], |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming social trends: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_whale_activity(self): |
| | """Stream whale transaction data""" |
| | try: |
| | data = await self.whale_collector.collect() |
| | if data: |
| | return { |
| | "large_transactions": data.get("large_transactions", []), |
| | "whale_wallets": data.get("whale_wallets", []), |
| | "total_volume": data.get("total_volume"), |
| | "alert_threshold": data.get("alert_threshold"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming whale activity: {e}") |
| | return None |
| |
|
| | async def stream_whale_alerts(self): |
| | """Stream whale transaction alerts""" |
| | try: |
| | data = await self.whale_collector.collect() |
| | if data and "alerts" in data: |
| | return { |
| | "alerts": data["alerts"], |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming whale alerts: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_rpc_status(self): |
| | """Stream RPC node status""" |
| | try: |
| | data = await self.rpc_collector.collect() |
| | if data: |
| | return { |
| | "nodes": data.get("nodes", []), |
| | "active_nodes": data.get("active_nodes"), |
| | "total_nodes": data.get("total_nodes"), |
| | "average_latency": data.get("average_latency"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming RPC status: {e}") |
| | return None |
| |
|
| | async def stream_blockchain_events(self): |
| | """Stream blockchain events from RPC nodes""" |
| | try: |
| | data = await self.rpc_collector.collect() |
| | if data and "events" in data: |
| | return { |
| | "events": data["events"], |
| | "block_number": data.get("block_number"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming blockchain events: {e}") |
| | return None |
| |
|
| | |
| | |
| | |
| |
|
| | async def stream_onchain_metrics(self): |
| | """Stream on-chain analytics""" |
| | try: |
| | data = await self.onchain_collector.collect() |
| | if data: |
| | return { |
| | "active_addresses": data.get("active_addresses"), |
| | "transaction_count": data.get("transaction_count"), |
| | "total_fees": data.get("total_fees"), |
| | "gas_price": data.get("gas_price"), |
| | "network_utilization": data.get("network_utilization"), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming on-chain metrics: {e}") |
| | return None |
| |
|
| | async def stream_contract_events(self): |
| | """Stream smart contract events""" |
| | try: |
| | data = await self.onchain_collector.collect() |
| | if data and "contract_events" in data: |
| | return { |
| | "events": data["contract_events"], |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| | except Exception as e: |
| | logger.error(f"Error streaming contract events: {e}") |
| | return None |
| |
|
| |
|
| | |
| | data_streamers = DataCollectionStreamers() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | async def start_data_collection_streams(): |
| | """Start all data collection stream tasks""" |
| | logger.info("Starting data collection WebSocket streams") |
| |
|
| | tasks = [ |
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.MARKET_DATA, |
| | data_streamers.stream_market_data, |
| | interval=5.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.EXPLORERS, |
| | data_streamers.stream_explorer_data, |
| | interval=10.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.NEWS, |
| | data_streamers.stream_news, |
| | interval=60.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.SENTIMENT, |
| | data_streamers.stream_sentiment, |
| | interval=30.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.WHALE_TRACKING, |
| | data_streamers.stream_whale_activity, |
| | interval=15.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.RPC_NODES, |
| | data_streamers.stream_rpc_status, |
| | interval=20.0 |
| | )), |
| |
|
| | |
| | asyncio.create_task(ws_manager.start_service_stream( |
| | ServiceType.ONCHAIN, |
| | data_streamers.stream_onchain_metrics, |
| | interval=30.0 |
| | )), |
| | ] |
| |
|
| | await asyncio.gather(*tasks, return_exceptions=True) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.websocket("/ws/data") |
| | async def websocket_data_endpoint(websocket: WebSocket): |
| | """ |
| | Unified WebSocket endpoint for all data collection services |
| | |
| | Connection URL: ws://host:port/ws/data |
| | |
| | After connecting, send subscription messages: |
| | { |
| | "action": "subscribe", |
| | "service": "market_data" | "explorers" | "news" | "sentiment" | |
| | "whale_tracking" | "rpc_nodes" | "onchain" | "all" |
| | } |
| | |
| | To unsubscribe: |
| | { |
| | "action": "unsubscribe", |
| | "service": "service_name" |
| | } |
| | |
| | To get status: |
| | { |
| | "action": "get_status" |
| | } |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| |
|
| | try: |
| | while True: |
| | |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| |
|
| | except WebSocketDisconnect: |
| | logger.info(f"Client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"WebSocket error for client {connection.client_id}: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/market_data") |
| | async def websocket_market_data(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for market data |
| | |
| | Auto-subscribes to market_data service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.MARKET_DATA) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"Market data client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"Market data WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/whale_tracking") |
| | async def websocket_whale_tracking(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for whale tracking |
| | |
| | Auto-subscribes to whale_tracking service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.WHALE_TRACKING) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"Whale tracking client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"Whale tracking WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/news") |
| | async def websocket_news(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for news |
| | |
| | Auto-subscribes to news service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.NEWS) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"News client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"News WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|
| |
|
| | @router.websocket("/ws/sentiment") |
| | async def websocket_sentiment(websocket: WebSocket): |
| | """ |
| | Dedicated WebSocket endpoint for sentiment analysis |
| | |
| | Auto-subscribes to sentiment service |
| | """ |
| | connection = await ws_manager.connect(websocket) |
| | connection.subscribe(ServiceType.SENTIMENT) |
| |
|
| | try: |
| | while True: |
| | data = await websocket.receive_json() |
| | await ws_manager.handle_client_message(connection, data) |
| | except WebSocketDisconnect: |
| | logger.info(f"Sentiment client disconnected: {connection.client_id}") |
| | except Exception as e: |
| | logger.error(f"Sentiment WebSocket error: {e}") |
| | finally: |
| | await ws_manager.disconnect(connection.client_id) |
| |
|