File size: 9,322 Bytes
2106f78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import asyncio
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from src.core import MarketAnalyzer, MarketAnalyzerImpl
from src.core import (
    PortfolioOptimizerImpl,
    MarketAnalyzerImpl,
    OptimizationConstraints
)
from src.analysis.ml_analyzer import MLEnhancedAnalyzer
from src.core import PortfolioOptimizerImpl, OptimizationConstraints

@dataclass
class RealTimeAlert:
    timestamp: datetime
    alert_type: str
    severity: str  # 'low', 'medium', 'high', 'critical'
    message: str
    affected_assets: List[str]
    metrics: Dict[str, float]
    action_required: bool
    suggested_actions: List[str]

class EnhancedRealTimeMonitor:
    """Système de surveillance en temps réel avancé avec ML et analyse comportementale"""
    
    def __init__(self, 
                 portfolio_optimizer,
                 market_analyzer,
                 risk_manager,
                 initial_capital: float,
                 alert_thresholds: Dict[str, float]):
        
        if not isinstance(portfolio_optimizer, PortfolioOptimizerImpl):
            raise TypeError("portfolio_optimizer must be instance of PortfolioOptimizerImpl")
        ml_analyzer = MLEnhancedAnalyzer()
        if market_analyzer is None:
            ml_analyzer = MLEnhancedAnalyzer()
            self.market_analyzer = MarketAnalyzerImpl(ml_analyzer=ml_analyzer)
        else:
            self.market_analyzer = market_analyzer
        self.risk_manager = risk_manager
        self.initial_capital = initial_capital
        self.alert_thresholds = alert_thresholds
        
        # Modèles ML pour la détection d'anomalies
        self.anomaly_detector = IsolationForest(contamination=0.1)
        self.scaler = StandardScaler()
        
        # État du système
        self.current_portfolio = None
        self.market_state = None
        self.active_alerts = []
        self.historical_data = pd.DataFrame()
        
    async def start_monitoring(self, portfolio: Dict[str, float]):
        """Démarre la surveillance en temps réel"""
        self.current_portfolio = portfolio
        
        try:
            await asyncio.gather(
                self._market_data_loop()
                self._portfolio_analysis_loop(),
                self._risk_monitoring_loop(),
                self._anomaly_detection_loop()
            )
        except Exception as e:
            self._handle_monitoring_error(e)

    async def _market_data_stream(self):
        """Stream des données de marché en temps réel"""
        while True:
            try:
                # Récupération des données de marché
                market_data = await self._fetch_market_data()
                market_analysis = await self.market_analyzer.analyze_market_conditions(market_data)
                
                # Mise à jour de l'état du marché
                self.market_state = self.market_analyzer.analyze_market_conditions(market_data)
                
                # Détection de changements significatifs
                if self._detect_significant_changes(market_data):
                    await self._handle_market_changes()
                
                # Mise à jour des données historiques
                self._update_historical_data(market_data)
                
                await asyncio.sleep(1)  # Intervalle de mise à jour
                
            except Exception as e:
                self._handle_streaming_error(e)

    async def _portfolio_analysis_loop(self):
        """Analyse continue du portefeuille"""
        while True:
            try:
                # Analyse de la performance
                performance_metrics = self._calculate_performance_metrics()
                
                # Vérification des contraintes
                constraints_violated = self._check_portfolio_constraints()
                
                if constraints_violated:
                    await self._generate_rebalancing_signals()
                
                # Analyse des opportunités
                opportunities = self._identify_opportunities()
                if opportunities:
                    await self._handle_opportunities(opportunities)
                
                await asyncio.sleep(5)  # Intervalle d'analyse
                
            except Exception as e:
                self._handle_analysis_error(e)

    async def _risk_monitoring_loop(self):
        """Surveillance continue des risques"""
        while True:
            try:
                # Calcul des métriques de risque
                market_conditions = await self.market_analyzer.analyze_market_conditions(data)
                risk_metrics = self.risk_manager.calculate_risk_metrics(
                    self.current_portfolio,
                    self.market_state
                )
                
                # Vérification des seuils de risque
                risk_alerts = self._check_risk_thresholds(risk_metrics)
                
                # Génération des alertes si nécessaire
                for alert in risk_alerts:
                    await self._handle_risk_alert(alert)
                
                # Mise à jour du profil de risque
                self._update_risk_profile(risk_metrics)
                
                await asyncio.sleep(3)  # Intervalle de surveillance
                
            except Exception as e:
                self._handle_risk_error(e)

    async def _anomaly_detection_loop(self):
        """Détection d'anomalies en temps réel"""
        while True:
            try:
                # Préparation des données pour la détection
                features = self._prepare_anomaly_features()
                
                # Détection des anomalies
                anomalies = self._detect_anomalies(features)
                
                if anomalies:
                    await self._handle_anomalies(anomalies)
                
                # Mise à jour du modèle si nécessaire
                self._update_anomaly_model()
                
                await asyncio.sleep(10)  # Intervalle de détection
                
            except Exception as e:
                self._handle_anomaly_error(e)

    def _detect_anomalies(self, features: np.ndarray) -> List[Dict]:
        """Détection des anomalies dans les données"""
        try:
            # Normalisation des données
            scaled_features = self.scaler.transform(features)
            
            # Prédiction des anomalies
            predictions = self.anomaly_detector.predict(scaled_features)
            
            anomalies = []
            for idx, is_anomaly in enumerate(predictions):
                if is_anomaly == -1:  # Anomalie détectée
                    anomaly_info = {
                        'timestamp': datetime.now(),
                        'feature_idx': idx,
                        'severity': self._calculate_anomaly_severity(features[idx]),
                        'affected_metrics': self._identify_affected_metrics(idx)
                    }
                    anomalies.append(anomaly_info)
                    
            return anomalies
            
        except Exception as e:
            self._handle_detection_error(e)
            return []

    async def _handle_anomalies(self, anomalies: List[Dict]):
        """Gestion des anomalies détectées"""
        for anomaly in anomalies:
            alert = RealTimeAlert(
                timestamp=anomaly['timestamp'],
                alert_type='anomaly',
                severity=anomaly['severity'],
                message=self._generate_anomaly_message(anomaly),
                affected_assets=self._identify_affected_assets(anomaly),
                metrics=self._get_relevant_metrics(anomaly),
                action_required=True,
                suggested_actions=self._generate_anomaly_actions(anomaly)
            )
            
            # Ajouter l'alerte à la liste des alertes actives
            self.active_alerts.append(alert)
            
            # Déclencher les actions automatiques si nécessaire
            if alert.severity in ['high', 'critical']:
                await self._execute_automatic_actions(alert)

    def _calculate_anomaly_severity(self, feature_values: np.ndarray) -> str:
        """Calcul de la sévérité d'une anomalie"""
        # Calculer la déviation par rapport à la normale
        z_score = np.abs((feature_values - np.mean(feature_values)) / np.std(feature_values))
        
        if z_score.max() > 3:
            return 'critical'
        elif z_score.max() > 2:
            return 'high'
        elif z_score.max() > 1.5:
            return 'medium'
        else:
            return 'low'

    async def _execute_automatic_actions(self, alert: RealTimeAlert):
        """Exécution des actions automatiques en réponse aux alertes"""
        if alert.severity == 'critical':
            # Rééquilibrage d'urgence du portefeuille
            await self._emergency_rebalancing()
            
        # Enregistrement des actions prises
        self._log_action_taken(alert)