Spaces:
Sleeping
Sleeping
| import asyncio | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Optional | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.preprocessing import StandardScaler | |
| from src.core import MarketAnalyzer, MarketAnalyzerImpl | |
| from src.core import ( | |
| PortfolioOptimizerImpl, | |
| MarketAnalyzerImpl, | |
| OptimizationConstraints | |
| ) | |
| from src.analysis.ml_analyzer import MLEnhancedAnalyzer | |
| from src.core import PortfolioOptimizerImpl, OptimizationConstraints | |
| class RealTimeAlert: | |
| timestamp: datetime | |
| alert_type: str | |
| severity: str # 'low', 'medium', 'high', 'critical' | |
| message: str | |
| affected_assets: List[str] | |
| metrics: Dict[str, float] | |
| action_required: bool | |
| suggested_actions: List[str] | |
| class EnhancedRealTimeMonitor: | |
| """Système de surveillance en temps réel avancé avec ML et analyse comportementale""" | |
| def __init__(self, | |
| portfolio_optimizer, | |
| market_analyzer, | |
| risk_manager, | |
| initial_capital: float, | |
| alert_thresholds: Dict[str, float]): | |
| if not isinstance(portfolio_optimizer, PortfolioOptimizerImpl): | |
| raise TypeError("portfolio_optimizer must be instance of PortfolioOptimizerImpl") | |
| ml_analyzer = MLEnhancedAnalyzer() | |
| if market_analyzer is None: | |
| ml_analyzer = MLEnhancedAnalyzer() | |
| self.market_analyzer = MarketAnalyzerImpl(ml_analyzer=ml_analyzer) | |
| else: | |
| self.market_analyzer = market_analyzer | |
| self.risk_manager = risk_manager | |
| self.initial_capital = initial_capital | |
| self.alert_thresholds = alert_thresholds | |
| # Modèles ML pour la détection d'anomalies | |
| self.anomaly_detector = IsolationForest(contamination=0.1) | |
| self.scaler = StandardScaler() | |
| # État du système | |
| self.current_portfolio = None | |
| self.market_state = None | |
| self.active_alerts = [] | |
| self.historical_data = pd.DataFrame() | |
| async def start_monitoring(self, portfolio: Dict[str, float]): | |
| """Démarre la surveillance en temps réel""" | |
| self.current_portfolio = portfolio | |
| try: | |
| await asyncio.gather( | |
| self._market_data_loop() | |
| self._portfolio_analysis_loop(), | |
| self._risk_monitoring_loop(), | |
| self._anomaly_detection_loop() | |
| ) | |
| except Exception as e: | |
| self._handle_monitoring_error(e) | |
| async def _market_data_stream(self): | |
| """Stream des données de marché en temps réel""" | |
| while True: | |
| try: | |
| # Récupération des données de marché | |
| market_data = await self._fetch_market_data() | |
| market_analysis = await self.market_analyzer.analyze_market_conditions(market_data) | |
| # Mise à jour de l'état du marché | |
| self.market_state = self.market_analyzer.analyze_market_conditions(market_data) | |
| # Détection de changements significatifs | |
| if self._detect_significant_changes(market_data): | |
| await self._handle_market_changes() | |
| # Mise à jour des données historiques | |
| self._update_historical_data(market_data) | |
| await asyncio.sleep(1) # Intervalle de mise à jour | |
| except Exception as e: | |
| self._handle_streaming_error(e) | |
| async def _portfolio_analysis_loop(self): | |
| """Analyse continue du portefeuille""" | |
| while True: | |
| try: | |
| # Analyse de la performance | |
| performance_metrics = self._calculate_performance_metrics() | |
| # Vérification des contraintes | |
| constraints_violated = self._check_portfolio_constraints() | |
| if constraints_violated: | |
| await self._generate_rebalancing_signals() | |
| # Analyse des opportunités | |
| opportunities = self._identify_opportunities() | |
| if opportunities: | |
| await self._handle_opportunities(opportunities) | |
| await asyncio.sleep(5) # Intervalle d'analyse | |
| except Exception as e: | |
| self._handle_analysis_error(e) | |
| async def _risk_monitoring_loop(self): | |
| """Surveillance continue des risques""" | |
| while True: | |
| try: | |
| # Calcul des métriques de risque | |
| market_conditions = await self.market_analyzer.analyze_market_conditions(data) | |
| risk_metrics = self.risk_manager.calculate_risk_metrics( | |
| self.current_portfolio, | |
| self.market_state | |
| ) | |
| # Vérification des seuils de risque | |
| risk_alerts = self._check_risk_thresholds(risk_metrics) | |
| # Génération des alertes si nécessaire | |
| for alert in risk_alerts: | |
| await self._handle_risk_alert(alert) | |
| # Mise à jour du profil de risque | |
| self._update_risk_profile(risk_metrics) | |
| await asyncio.sleep(3) # Intervalle de surveillance | |
| except Exception as e: | |
| self._handle_risk_error(e) | |
| async def _anomaly_detection_loop(self): | |
| """Détection d'anomalies en temps réel""" | |
| while True: | |
| try: | |
| # Préparation des données pour la détection | |
| features = self._prepare_anomaly_features() | |
| # Détection des anomalies | |
| anomalies = self._detect_anomalies(features) | |
| if anomalies: | |
| await self._handle_anomalies(anomalies) | |
| # Mise à jour du modèle si nécessaire | |
| self._update_anomaly_model() | |
| await asyncio.sleep(10) # Intervalle de détection | |
| except Exception as e: | |
| self._handle_anomaly_error(e) | |
| def _detect_anomalies(self, features: np.ndarray) -> List[Dict]: | |
| """Détection des anomalies dans les données""" | |
| try: | |
| # Normalisation des données | |
| scaled_features = self.scaler.transform(features) | |
| # Prédiction des anomalies | |
| predictions = self.anomaly_detector.predict(scaled_features) | |
| anomalies = [] | |
| for idx, is_anomaly in enumerate(predictions): | |
| if is_anomaly == -1: # Anomalie détectée | |
| anomaly_info = { | |
| 'timestamp': datetime.now(), | |
| 'feature_idx': idx, | |
| 'severity': self._calculate_anomaly_severity(features[idx]), | |
| 'affected_metrics': self._identify_affected_metrics(idx) | |
| } | |
| anomalies.append(anomaly_info) | |
| return anomalies | |
| except Exception as e: | |
| self._handle_detection_error(e) | |
| return [] | |
| async def _handle_anomalies(self, anomalies: List[Dict]): | |
| """Gestion des anomalies détectées""" | |
| for anomaly in anomalies: | |
| alert = RealTimeAlert( | |
| timestamp=anomaly['timestamp'], | |
| alert_type='anomaly', | |
| severity=anomaly['severity'], | |
| message=self._generate_anomaly_message(anomaly), | |
| affected_assets=self._identify_affected_assets(anomaly), | |
| metrics=self._get_relevant_metrics(anomaly), | |
| action_required=True, | |
| suggested_actions=self._generate_anomaly_actions(anomaly) | |
| ) | |
| # Ajouter l'alerte à la liste des alertes actives | |
| self.active_alerts.append(alert) | |
| # Déclencher les actions automatiques si nécessaire | |
| if alert.severity in ['high', 'critical']: | |
| await self._execute_automatic_actions(alert) | |
| def _calculate_anomaly_severity(self, feature_values: np.ndarray) -> str: | |
| """Calcul de la sévérité d'une anomalie""" | |
| # Calculer la déviation par rapport à la normale | |
| z_score = np.abs((feature_values - np.mean(feature_values)) / np.std(feature_values)) | |
| if z_score.max() > 3: | |
| return 'critical' | |
| elif z_score.max() > 2: | |
| return 'high' | |
| elif z_score.max() > 1.5: | |
| return 'medium' | |
| else: | |
| return 'low' | |
| async def _execute_automatic_actions(self, alert: RealTimeAlert): | |
| """Exécution des actions automatiques en réponse aux alertes""" | |
| if alert.severity == 'critical': | |
| # Rééquilibrage d'urgence du portefeuille | |
| await self._emergency_rebalancing() | |
| # Enregistrement des actions prises | |
| self._log_action_taken(alert) |