# ProfilingAI / src/monitoring/enhanced_realtime_monitor.py
# Committed by: Sandrine Guétin
# Commit 2106f78 - "Version propre de DeepVest" (clean version of DeepVest)
# File size: 9.32 kB
import asyncio
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from src.core import MarketAnalyzer, MarketAnalyzerImpl
from src.core import (
PortfolioOptimizerImpl,
MarketAnalyzerImpl,
OptimizationConstraints
)
from src.analysis.ml_analyzer import MLEnhancedAnalyzer
from src.core import PortfolioOptimizerImpl, OptimizationConstraints
@dataclass
class RealTimeAlert:
    """Record describing one alert raised by the real-time monitor.

    A plain data container: fields are accepted positionally in the order
    declared below, or by keyword.
    """

    # Moment the alert refers to.
    timestamp: datetime
    # Category label of the alert (this file emits 'anomaly').
    alert_type: str
    # One of 'low', 'medium', 'high', 'critical'.
    severity: str
    # Human-readable description of the alert.
    message: str
    # Identifiers of the assets concerned by the alert.
    affected_assets: List[str]
    # Named numeric values attached to the alert.
    metrics: Dict[str, float]
    # Whether a follow-up action is expected.
    action_required: bool
    # Proposed follow-up actions, as text.
    suggested_actions: List[str]
class EnhancedRealTimeMonitor:
"""Système de surveillance en temps réel avancé avec ML et analyse comportementale"""
def __init__(self,
             portfolio_optimizer,
             market_analyzer,
             risk_manager,
             initial_capital: float,
             alert_thresholds: Dict[str, float]):
    """Wire up collaborators, anomaly-detection models and monitoring state.

    Args:
        portfolio_optimizer: Must be a ``PortfolioOptimizerImpl`` instance.
        market_analyzer: Analyzer used by the monitoring loops. When ``None``,
            a ``MarketAnalyzerImpl`` backed by a fresh ``MLEnhancedAnalyzer``
            is created instead.
        risk_manager: Object whose ``calculate_risk_metrics`` is called by
            the risk monitoring loop.
        initial_capital: Starting capital; stored as given.
        alert_thresholds: Threshold values keyed by name; stored as given.

    Raises:
        TypeError: If ``portfolio_optimizer`` is not a
            ``PortfolioOptimizerImpl``.
    """
    if not isinstance(portfolio_optimizer, PortfolioOptimizerImpl):
        raise TypeError("portfolio_optimizer must be instance of PortfolioOptimizerImpl")
    # Keep the validated optimizer on the instance; it was checked but
    # never stored before, so no method could reach it.
    self.portfolio_optimizer = portfolio_optimizer

    if market_analyzer is None:
        # Build the default analyzer only when it is actually needed
        # (the ML analyzer used to be instantiated unconditionally as well).
        market_analyzer = MarketAnalyzerImpl(ml_analyzer=MLEnhancedAnalyzer())
    self.market_analyzer = market_analyzer

    self.risk_manager = risk_manager
    self.initial_capital = initial_capital
    self.alert_thresholds = alert_thresholds

    # ML models for anomaly detection. Neither model is fitted here.
    self.anomaly_detector = IsolationForest(contamination=0.1)
    self.scaler = StandardScaler()

    # System state, filled in once monitoring starts.
    self.current_portfolio = None
    self.market_state = None
    self.active_alerts = []
    self.historical_data = pd.DataFrame()
async def start_monitoring(self, portfolio: Dict[str, float]):
    """Start real-time monitoring of ``portfolio``.

    Stores the portfolio on the instance, then runs the market-data,
    portfolio-analysis, risk-monitoring and anomaly-detection coroutines
    concurrently until they finish or one of them raises.

    Args:
        portfolio: Mapping stored as ``self.current_portfolio``.

    Any ``Exception`` raised while creating or awaiting the coroutines is
    passed to ``self._handle_monitoring_error`` instead of propagating.
    """
    self.current_portfolio = portfolio
    try:
        await asyncio.gather(
            # The market-data coroutine defined in this class is
            # `_market_data_stream`; each argument needs its own comma.
            self._market_data_stream(),
            self._portfolio_analysis_loop(),
            self._risk_monitoring_loop(),
            self._anomaly_detection_loop(),
        )
    except Exception as e:
        self._handle_monitoring_error(e)
async def _market_data_stream(self):
    """Continuously pull market data and refresh the monitor's market state.

    Each iteration fetches market data, stores the analyzer's result in
    ``self.market_state``, reacts to significant changes, and appends the
    data to the history. Errors are handed to
    ``self._handle_streaming_error`` and the loop carries on.

    The loop never returns on its own.
    """
    while True:
        try:
            # Fetch the latest market data.
            market_data = await self._fetch_market_data()

            # Analyse once and store the awaited result. The analysis used
            # to run twice, and the stored value was an un-awaited coroutine.
            self.market_state = await self.market_analyzer.analyze_market_conditions(market_data)

            # React to significant changes.
            if self._detect_significant_changes(market_data):
                await self._handle_market_changes()

            # Record the data in the history.
            self._update_historical_data(market_data)
        except Exception as e:
            self._handle_streaming_error(e)

        # Update interval. Kept outside the try so a persistent failure
        # still yields to the event loop instead of spinning.
        await asyncio.sleep(1)
async def _portfolio_analysis_loop(self):
    """Continuously analyse the portfolio.

    Each iteration computes performance metrics, emits rebalancing signals
    when portfolio constraints are violated, and handles any identified
    opportunities. Errors are handed to ``self._handle_analysis_error`` and
    the loop carries on.

    The loop never returns on its own.
    """
    while True:
        try:
            # Performance analysis. The return value is not used in this
            # loop; the call is kept in case it has side effects - TODO confirm.
            self._calculate_performance_metrics()

            # Constraint check.
            if self._check_portfolio_constraints():
                await self._generate_rebalancing_signals()

            # Opportunity analysis.
            opportunities = self._identify_opportunities()
            if opportunities:
                await self._handle_opportunities(opportunities)
        except Exception as e:
            self._handle_analysis_error(e)

        # Analysis interval. Kept outside the try so a persistent failure
        # still yields to the event loop instead of spinning.
        await asyncio.sleep(5)
async def _risk_monitoring_loop(self):
    """Continuously monitor portfolio risk.

    Each iteration asks the risk manager for metrics on the current
    portfolio and market state, checks them against the risk thresholds,
    handles every resulting alert, and updates the risk profile. Errors are
    handed to ``self._handle_risk_error`` and the loop carries on.

    The loop never returns on its own.
    """
    while True:
        try:
            # Risk metrics are computed from the state already held on the
            # instance. A former extra analysis call here referenced an
            # undefined name (`data`) and made every iteration fail.
            risk_metrics = self.risk_manager.calculate_risk_metrics(
                self.current_portfolio,
                self.market_state
            )

            # Threshold check, then one handler call per alert.
            risk_alerts = self._check_risk_thresholds(risk_metrics)
            for alert in risk_alerts:
                await self._handle_risk_alert(alert)

            # Refresh the risk profile.
            self._update_risk_profile(risk_metrics)
        except Exception as e:
            self._handle_risk_error(e)

        # Monitoring interval. Kept outside the try so a persistent failure
        # still yields to the event loop instead of spinning.
        await asyncio.sleep(3)
async def _anomaly_detection_loop(self):
    """Continuously run anomaly detection.

    Each iteration prepares the feature matrix, runs detection, handles any
    anomalies found, and gives the model a chance to update. Errors are
    handed to ``self._handle_anomaly_error`` and the loop carries on.

    The loop never returns on its own.
    """
    while True:
        try:
            # Build the detector input.
            features = self._prepare_anomaly_features()

            # Detect and, if anything was found, handle it.
            anomalies = self._detect_anomalies(features)
            if anomalies:
                await self._handle_anomalies(anomalies)

            # Update the model if needed.
            self._update_anomaly_model()
        except Exception as e:
            self._handle_anomaly_error(e)

        # Detection interval. Kept outside the try so a persistent failure
        # still yields to the event loop instead of spinning.
        await asyncio.sleep(10)
def _detect_anomalies(self, features: np.ndarray) -> List[Dict]:
"""Détection des anomalies dans les données"""
try:
# Normalisation des données
scaled_features = self.scaler.transform(features)
# Prédiction des anomalies
predictions = self.anomaly_detector.predict(scaled_features)
anomalies = []
for idx, is_anomaly in enumerate(predictions):
if is_anomaly == -1: # Anomalie détectée
anomaly_info = {
'timestamp': datetime.now(),
'feature_idx': idx,
'severity': self._calculate_anomaly_severity(features[idx]),
'affected_metrics': self._identify_affected_metrics(idx)
}
anomalies.append(anomaly_info)
return anomalies
except Exception as e:
self._handle_detection_error(e)
return []
async def _handle_anomalies(self, anomalies: List[Dict]):
    """Turn detected anomalies into alerts and escalate the severe ones.

    For every anomaly a ``RealTimeAlert`` of type ``'anomaly'`` is built and
    appended to ``self.active_alerts``. Alerts whose severity is ``'high'``
    or ``'critical'`` additionally trigger
    ``self._execute_automatic_actions``.

    Args:
        anomalies: Dicts providing at least ``'timestamp'`` and
            ``'severity'``.
    """
    escalated_levels = ('high', 'critical')
    for item in anomalies:
        fields = {
            'timestamp': item['timestamp'],
            'alert_type': 'anomaly',
            'severity': item['severity'],
            'message': self._generate_anomaly_message(item),
            'affected_assets': self._identify_affected_assets(item),
            'metrics': self._get_relevant_metrics(item),
            'action_required': True,
            'suggested_actions': self._generate_anomaly_actions(item),
        }
        alert = RealTimeAlert(**fields)

        # Register the alert as active.
        self.active_alerts.append(alert)

        # Severe alerts trigger automatic actions.
        if alert.severity in escalated_levels:
            await self._execute_automatic_actions(alert)
def _calculate_anomaly_severity(self, feature_values: np.ndarray) -> str:
"""Calcul de la sévérité d'une anomalie"""
# Calculer la déviation par rapport à la normale
z_score = np.abs((feature_values - np.mean(feature_values)) / np.std(feature_values))
if z_score.max() > 3:
return 'critical'
elif z_score.max() > 2:
return 'high'
elif z_score.max() > 1.5:
return 'medium'
else:
return 'low'
async def _execute_automatic_actions(self, alert: RealTimeAlert):
    """Run the automatic response to ``alert`` and log it.

    A ``'critical'`` alert triggers an emergency portfolio rebalancing.
    The alert is then passed to ``self._log_action_taken``.

    Args:
        alert: The alert to respond to; only ``alert.severity`` is read here.
    """
    needs_rebalancing = alert.severity == 'critical'
    if needs_rebalancing:
        # Emergency rebalancing of the portfolio.
        await self._emergency_rebalancing()

    # Record the actions taken.
    # NOTE(review): the source's indentation was lost; logging is assumed to
    # run for every alert, not only critical ones - confirm.
    self._log_action_taken(alert)