# ProfilingAI: src/core/market_analyzer.py
# Author: Sandrine Guétin
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import statsmodels.api as sm
from arch import arch_model
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler

from .market_states import MarketConditions, MarketRegime
from .ml_analyzer import MLEnhancedAnalyzer  # assumed location of the concrete ML analyzer used below
from .risk_metrics import EnhancedRiskMetrics, MarketMetrics
from .technical_indicators import TechnicalIndicators
from .types import MarketData, MLAnalyzerProtocol


def setup_logger(name: str) -> logging.Logger:
    """Set up logger with proper formatting"""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger


logger = setup_logger(__name__)


class MarketAnalyzer(ABC):
    """Abstract base class defining the market analyzer interface"""

    @abstractmethod
    async def analyze_market_conditions(self,
                                        market_data: MarketData,
                                        additional_data: Optional[Dict[str, Any]] = None) -> MarketConditions:
        """Analyze current market conditions"""
        pass

    @abstractmethod
    async def detect_anomalies(self,
                               market_data: MarketData,
                               threshold: float = 0.95) -> List[Dict[str, Any]]:
        """Detect market anomalies"""
        pass

    @abstractmethod
    async def calculate_all_metrics(self, market_data: MarketData) -> MarketMetrics:
        """Calculate comprehensive market metrics"""
        pass

    @abstractmethod
    async def analyze_regime_transitions(self,
                                         market_data: MarketData,
                                         window: int = 252) -> Dict[str, float]:
        """Analyze market regime transitions"""
        pass

    @abstractmethod
    async def calculate_technical_indicators(self,
                                             market_data: MarketData) -> TechnicalIndicators:
        """Calculate technical indicators"""
        pass


@dataclass
class RiskAssessment:
    """Risk assessment metrics and analysis"""
    volatility: float                      # Annualized historical volatility
    var_95: float                          # Value at Risk at 95%
    cvar_95: float                         # Conditional Value at Risk at 95%
    tail_risk: float                       # Tail-risk measure
    correlation_risk: float                # Correlation-risk measure
    liquidity_risk: float                  # Liquidity-risk measure
    regime_risk: float                     # Risk tied to the current market regime
    systemic_risk: float                   # Systemic-risk measure
    stress_test_results: Dict[str, float]  # Stress-test results

    def __post_init__(self):
        """Post-initialization validation"""
        if not 0 <= self.volatility <= 1:
            raise ValueError("Volatility must be between 0 and 1")
        if not -1 <= self.var_95 <= 0:
            raise ValueError("VaR must be negative and >= -1")
        if not -1 <= self.cvar_95 <= 0:
            raise ValueError("CVaR must be negative and >= -1")
    def get_risk_score(self) -> float:
        """Calculate overall risk score between 0 and 1"""
        weights = {
            'volatility': 0.3,
            'var': 0.2,
            'cvar': 0.15,
            'tail': 0.1,
            'correlation': 0.05,
            'liquidity': 0.1,
            'regime': 0.05,
            'systemic': 0.05
        }
        score = (
            weights['volatility'] * self.volatility +
            weights['var'] * abs(self.var_95) +
            weights['cvar'] * abs(self.cvar_95) +
            weights['tail'] * self.tail_risk +
            weights['correlation'] * self.correlation_risk +
            weights['liquidity'] * self.liquidity_risk +
            weights['regime'] * self.regime_risk +
            weights['systemic'] * self.systemic_risk
        )
        return min(max(score, 0), 1)
    def get_risk_breakdown(self) -> Dict[str, float]:
        """Get detailed breakdown of risk components"""
        return {
            'volatility_risk': self.volatility,
            'market_risk': abs(self.var_95),
            'tail_risk': self.tail_risk,
            'correlation_risk': self.correlation_risk,
            'liquidity_risk': self.liquidity_risk,
            'regime_risk': self.regime_risk,
            'systemic_risk': self.systemic_risk
        }

    def get_stress_test_results(self) -> Dict[str, float]:
        """Get results of stress test scenarios"""
        return self.stress_test_results
    def get_risk_alerts(self) -> List[Dict[str, Any]]:
        """Generate risk alerts based on thresholds"""
        alerts = []
        # Check risk thresholds
        if self.volatility > 0.3:
            alerts.append({
                'type': 'volatility',
                'level': 'high',
                'message': 'Volatility exceeds normal levels'
            })
        if abs(self.var_95) > 0.2:
            alerts.append({
                'type': 'var',
                'level': 'critical',
                'message': 'Value at Risk above critical threshold'
            })
        if self.liquidity_risk > 0.7:
            alerts.append({
                'type': 'liquidity',
                'level': 'warning',
                'message': 'High liquidity risk detected'
            })
        return alerts
    def to_dict(self) -> Dict[str, float]:
        """Convert risk assessment to dictionary format"""
        return {
            'volatility': self.volatility,
            'var_95': self.var_95,
            'cvar_95': self.cvar_95,
            'tail_risk': self.tail_risk,
            'correlation_risk': self.correlation_risk,
            'liquidity_risk': self.liquidity_risk,
            'regime_risk': self.regime_risk,
            'systemic_risk': self.systemic_risk,
            'risk_score': self.get_risk_score()
        }
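

# Illustrative usage of RiskAssessment (all numbers below are made up for
# demonstration, not derived from real data):
#
#   assessment = RiskAssessment(
#       volatility=0.18, var_95=-0.03, cvar_95=-0.05, tail_risk=0.2,
#       correlation_risk=0.3, liquidity_risk=0.25, regime_risk=0.1,
#       systemic_risk=0.15, stress_test_results={'crisis_replay': -0.28},
#   )
#   assessment.get_risk_score()   # weighted sum clipped to [0, 1]; 0.14 here
#   assessment.get_risk_alerts()  # [] here, since no threshold is breached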


@dataclass
class MarketAnalysisConfig:
    """Configuration for market analysis"""
    volatility_window: int = 63           # ~3 months
    correlation_window: int = 252         # 1 year
    regime_detection_lookback: int = 504  # 2 years
    min_data_points: int = 20
    confidence_level: float = 0.95
    use_robust_estimators: bool = True
    ml_parameters: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        if self.ml_parameters is None:
            self.ml_parameters = {
                'n_estimators': 100,
                'random_state': 42,
                'contamination': 'auto'
            }
    def validate(self) -> bool:
        """Validate configuration parameters"""
        try:
            assert self.volatility_window > 0
            assert self.correlation_window > 0
            assert self.regime_detection_lookback > 0
            assert 0 < self.confidence_level < 1
            return True
        except AssertionError:
            return False
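

# Illustrative usage: overriding defaults for a shorter history.
#
#   config = MarketAnalysisConfig(volatility_window=21, confidence_level=0.99)
#   config.validate()                      # -> True
#   config.ml_parameters['contamination']  # -> 'auto', filled by __post_init__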


@dataclass
class MarketAnalysisResult:
    """Container for market analysis results"""
    conditions: MarketConditions
    metrics: MarketMetrics
    technical_indicators: TechnicalIndicators
    anomalies: List[Dict[str, Any]]
    timestamp: datetime
    analysis_duration: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert results to dictionary format"""
        return {
            'conditions': {
                'regime': self.conditions.current_regime.value,
                'volatility': self.conditions.volatility_level,
                'sentiment': self.conditions.market_sentiment,
                'timestamp': self.timestamp.isoformat()
            },
            'metrics': {
                'volatility': self.metrics.volatility,
                'tail_risk': self.metrics.tail_risk,
                'liquidity': self.metrics.liquidity_score
            },
            'technical': {
                'rsi': self.technical_indicators.rsi,
                'macd': self.technical_indicators.macd,
                'moving_averages': self.technical_indicators.moving_averages
            },
            'anomalies': self.anomalies
        }


def ensure_psd(matrix: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
    """Ensure matrix is positive semi-definite"""
    try:
        eigenvals, eigenvecs = np.linalg.eigh(matrix)
        eigenvals = np.maximum(eigenvals, epsilon)
        return eigenvecs @ np.diag(eigenvals) @ eigenvecs.T
    except Exception as e:
        logger.error(f"Error ensuring PSD: {e}")
        return matrix
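

# Illustrative usage: a pairwise-estimated "correlation" matrix can come out
# slightly indefinite; clipping eigenvalues at epsilon repairs it. The matrix
# below is a synthetic example with one negative eigenvalue.
#
#   bad = np.array([[1.0, 0.9, 0.5],
#                   [0.9, 1.0, 0.9],
#                   [0.5, 0.9, 1.0]])
#   np.linalg.eigvalsh(ensure_psd(bad)).min() >= 0  # -> True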


def calculate_dynamic_correlation(returns: pd.DataFrame,
                                  window: int = 63,
                                  lambda_param: float = 0.94) -> np.ndarray:
    """Calculate dynamic correlation with exponential weighting"""
    try:
        weights = np.array([lambda_param**(window - i - 1) for i in range(window)])
        weights = weights / np.sum(weights)
        returns_window = returns.iloc[-window:]
        weighted_returns = returns_window * np.sqrt(weights).reshape(-1, 1)
        corr_matrix = np.corrcoef(weighted_returns.T)
        return ensure_psd(corr_matrix)
    except Exception as e:
        logger.error(f"Error calculating dynamic correlation: {e}")
        return np.eye(returns.shape[1])
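

# Illustrative usage on synthetic returns; lambda_param=0.94 is the classic
# RiskMetrics daily decay factor, so recent observations dominate the estimate.
#
#   rng = np.random.default_rng(0)
#   rets = pd.DataFrame(rng.normal(0, 0.01, size=(252, 4)),
#                       columns=['AAA', 'BBB', 'CCC', 'DDD'])
#   corr = calculate_dynamic_correlation(rets, window=63)
#   corr.shape  # -> (4, 4), PSD by construction via ensure_psd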


def fit_gpd(excesses: np.ndarray) -> Tuple[float, float]:
    """Fit a Generalized Pareto Distribution to threshold excesses"""
    try:
        # scipy's genpareto.fit returns (shape, loc, scale); loc is pinned to 0
        # because the excesses are already measured above the threshold.
        shape, _, scale = stats.genpareto.fit(excesses, floc=0)
        return float(shape), float(scale)
    except Exception:
        # Fallback: mildly heavy tail with the empirical spread as scale
        return 0.1, float(np.std(excesses))


def calculate_gpd_var(params: Tuple[float, float],
                      threshold: float,
                      n_samples: float,
                      confidence: float) -> float:
    """Calculate Value at Risk from GPD parameters (peaks-over-threshold).

    `n_samples` is interpreted as the ratio of total observations to threshold
    exceedances (n / N_u) in the standard POT quantile formula.
    """
    shape, scale = params
    exceeding_prob = 1.0 - confidence  # tail probability beyond the VaR level
    if shape != 0:
        return threshold + (scale / shape) * ((n_samples * exceeding_prob)**(-shape) - 1)
    return threshold - scale * np.log(n_samples * exceeding_prob)
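

# Illustrative peaks-over-threshold workflow, continuing the synthetic `rets`
# example above (in practice the threshold choice matters at least as much as
# the fit itself):
#
#   losses = -rets['AAA']                           # losses as positive numbers
#   u = float(losses.quantile(0.95))                # POT threshold
#   excesses = (losses[losses > u] - u).to_numpy()  # amounts above threshold
#   params = fit_gpd(excesses)
#   var_99 = calculate_gpd_var(params, threshold=u,
#                              n_samples=len(losses) / max(len(excesses), 1),
#                              confidence=0.99)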


class MarketAnalyzerImpl(MarketAnalyzer):
    def __init__(self,
                 ml_analyzer: Optional[MLAnalyzerProtocol] = None,
                 config: Optional[MarketAnalysisConfig] = None):
        self.ml_analyzer = ml_analyzer or MLEnhancedAnalyzer()
        self.config = config or MarketAnalysisConfig()
        self.logger = setup_logger(self.__class__.__name__)
        # Risk metrics handler
        self.risk_metrics = EnhancedRiskMetrics()
        # Technical indicators
        self.tech_indicators = TechnicalIndicators()
        # Models for complex analysis
        self.regime_model = GaussianMixture(
            n_components=len(MarketRegime),
            random_state=42,
            covariance_type='full'
        )
        self.scaler = StandardScaler()
        # Cache
        self._cache = {}
        self._last_update = datetime.now()
    async def analyze_market_conditions(self,
                                        market_data: MarketData,
                                        additional_data: Optional[Dict[str, Any]] = None) -> MarketConditions:
        """Analyze current market conditions"""
        try:
            if not self._validate_market_data(market_data):
                return MarketConditions.get_default()
            processed_data = self._preprocess_market_data(market_data)
            # ML analysis
            ml_output = await self.ml_analyzer.analyze_market_conditions(processed_data)
            regime = MarketRegime[ml_output['regime'].upper()]
            volatility = self.risk_metrics.calculate_volatility(processed_data)
            sentiment = ml_output['sentiment'].get('overall', 0.5)
            # Risk metrics
            risk_metrics = self.risk_metrics.calculate_all_metrics(processed_data)
            correlation = self._analyze_correlation_structure(processed_data)
            liquidity = self._analyze_liquidity_conditions(processed_data)
            # Technical analysis
            technicals = self.tech_indicators.calculate_all(processed_data)
            return MarketConditions(
                current_regime=regime,
                volatility_level=volatility,
                market_sentiment=sentiment,
                technical_indicators=technicals,
                risk_metrics=risk_metrics,
                correlation_structure=correlation,
                liquidity_conditions=liquidity,
                timestamp=datetime.now()
            )
        except Exception as e:
            self.logger.error(f"Error in market analysis: {str(e)}")
            return MarketConditions.get_default()
    def _preprocess_market_data(self, data: MarketData) -> MarketData:
        """Preprocess market data"""
        processed = data.copy()
        # Handle missing values
        processed = processed.ffill().bfill()
        # Calculate returns and volatility
        processed['returns'] = processed['close'].pct_change()
        processed['realized_vol'] = self.risk_metrics.calculate_rolling_volatility(processed['returns'])
        # Normalize volume
        if 'volume' in processed.columns:
            processed['normalized_volume'] = processed['volume'] / processed['volume'].rolling(window=21).mean()
        return processed
    def _analyze_correlation_structure(self, data: MarketData) -> Dict[str, float]:
        """Analyze correlation structure"""
        try:
            returns = data.select_dtypes(include=[np.number]).pct_change().dropna()
            correlation = returns.corr()
            # The correlation matrix is symmetric, so eigvalsh gives real eigenvalues
            eigenvalues = np.linalg.eigvalsh(correlation)
            return {
                'avg_correlation': float(correlation.mean().mean()),
                'max_correlation': float(correlation.max().max()),
                'effective_rank': float(np.sum(eigenvalues) ** 2 / np.sum(eigenvalues ** 2))
            }
        except Exception as e:
            self.logger.error(f"Error in correlation analysis: {str(e)}")
            return self._get_default_correlation_metrics()
    def _analyze_liquidity_conditions(self, data: MarketData) -> Dict[str, float]:
        """Analyze market liquidity"""
        try:
            if 'volume' not in data.columns:
                return {'liquidity_score': 0.5}
            volume_score = self._calculate_volume_score(data)
            spread_score = self._calculate_spread_score(data)
            resilience = self._calculate_market_resilience(data)
            return {
                'volume_score': volume_score,
                'spread_score': spread_score,
                'market_resilience': resilience,
                'liquidity_score': float(np.mean([volume_score, spread_score, resilience]))
            }
        except Exception as e:
            self.logger.error(f"Error in liquidity analysis: {str(e)}")
            return {'liquidity_score': 0.5}
    def _validate_market_data(self, data: MarketData) -> bool:
        """Validate market data structure"""
        if data is None or data.empty:
            return False
        required_cols = {'open', 'high', 'low', 'close', 'volume'}
        if not required_cols.issubset(data.columns):
            return False
        if data[list(required_cols)].isnull().any().any():
            return False
        return True
    def _calculate_volume_score(self, data: MarketData) -> float:
        """Calculate volume-based liquidity score"""
        volume = data['volume']
        avg_volume = volume.rolling(window=20).mean()
        volume_trend = avg_volume.pct_change(20)
        trend_score = 0.5 + 0.5 * np.tanh(volume_trend.iloc[-1])
        stability_score = 1 - volume.std() / volume.mean()
        return float(np.mean([trend_score, stability_score]))
    def _calculate_spread_score(self, data: MarketData) -> float:
        """Calculate spread-based liquidity score"""
        if not all(col in data.columns for col in ['bid', 'ask']):
            return 0.5
        spread = (data['ask'] - data['bid']) / data['bid']
        avg_spread = spread.rolling(window=20).mean()
        return float(1 - np.tanh(avg_spread.iloc[-1] * 10))
    def _calculate_market_resilience(self, data: MarketData) -> float:
        """Calculate market resilience score: low absolute autocorrelation of
        returns suggests shocks are absorbed quickly"""
        returns = data['returns'].dropna()
        acf = sm.tsa.acf(returns, nlags=5)
        return float(1 - np.mean(np.abs(acf[1:])))
    async def detect_anomalies(self,
                               market_data: MarketData,
                               threshold: float = 0.95) -> List[Dict[str, Any]]:
        """Detect market anomalies using ML"""
        try:
            features = self._prepare_anomaly_features(market_data)
            scaled_features = self.scaler.fit_transform(features)
            # ML-based anomaly detection
            anomalies = await self.ml_analyzer.detect_anomalies(
                scaled_features,
                threshold=threshold
            )
            # Enrich detection results
            for anomaly in anomalies:
                anomaly['severity'] = self.risk_metrics.calculate_severity_score(
                    anomaly['metrics']
                )
                anomaly['confidence'] = self.ml_analyzer.calculate_detection_confidence(
                    anomaly['metrics']
                )
            return anomalies
        except Exception as e:
            self.logger.error(f"Error in anomaly detection: {e}")
            return []
    def _prepare_anomaly_features(self, data: MarketData) -> np.ndarray:
        """Prepare features for anomaly detection"""
        returns = data['returns']
        volatility = self.risk_metrics.calculate_rolling_volatility(returns)
        skewness = returns.rolling(window=20).skew()
        kurtosis = returns.rolling(window=20).apply(lambda x: x.kurtosis())
        features = np.column_stack([
            returns,
            volatility,
            skewness,
            kurtosis
        ])
        # Drop warm-up rows where the rolling statistics are still NaN,
        # since StandardScaler cannot handle missing values
        return features[~np.isnan(features).any(axis=1)]
    def _update_cache(self, result: MarketAnalysisResult):
        """Update analysis cache"""
        current_time = datetime.now()
        cache_key = current_time.strftime('%Y-%m-%d_%H')
        self._cache[cache_key] = {
            'conditions': result.conditions,
            'metrics': result.metrics,
            'technical_indicators': result.technical_indicators,
            'timestamp': current_time
        }
        self._clean_old_cache(max_age_hours=24)

    def _clean_old_cache(self, max_age_hours: int):
        """Clean expired cache entries"""
        current_time = datetime.now()
        keys_to_remove = [
            key for key, value in self._cache.items()
            if (current_time - value['timestamp']).total_seconds() / 3600 > max_age_hours
        ]
        for key in keys_to_remove:
            del self._cache[key]
    async def calculate_all_metrics(self, market_data: MarketData) -> MarketMetrics:
        """Calculate comprehensive market metrics"""
        try:
            returns = market_data['returns'].dropna()
            # Core volatility and risk metrics
            volatility = self.risk_metrics.calculate_conditional_volatility(returns)
            skewness = float(returns.skew())
            kurtosis = float(returns.kurt())
            tail_risk = self.risk_metrics.calculate_tail_risk(returns)
            # Liquidity analysis
            liquidity_score = self._analyze_liquidity_conditions(market_data).get('liquidity_score', 0.5)
            # Correlation analysis
            correlation_structure = self._analyze_correlation_structure(market_data)
            # Regime probabilities: fit the mixture on the full history,
            # then score the most recent observation
            regime_features = self._extract_regime_features(market_data)
            self.regime_model.fit(regime_features)
            regime_probs = self.regime_model.predict_proba(
                regime_features[-1].reshape(1, -1)
            )[0]
            regime_probabilities = {
                regime.value: float(prob)
                for regime, prob in zip(MarketRegime, regime_probs)
            }
            return MarketMetrics(
                volatility=volatility,
                skewness=skewness,
                kurtosis=kurtosis,
                tail_risk=tail_risk,
                liquidity_score=liquidity_score,
                correlation_structure=correlation_structure,
                regime_probabilities=regime_probabilities
            )
        except Exception as e:
            self.logger.error(f"Error calculating market metrics: {e}")
            return MarketMetrics.get_default()
    def _extract_regime_features(self, market_data: MarketData) -> np.ndarray:
        """Extract features for regime detection"""
        returns = market_data['returns'].dropna()
        volatility = self.risk_metrics.calculate_rolling_volatility(returns)
        skewness = returns.rolling(window=20).skew()
        kurtosis = returns.rolling(window=20).apply(lambda x: x.kurtosis())
        momentum = returns.rolling(window=20).mean()
        features = np.column_stack([
            returns,
            volatility,
            skewness,
            kurtosis,
            momentum
        ])
        # Drop warm-up rows containing NaN before scaling
        features = features[~np.isnan(features).any(axis=1)]
        return self.scaler.fit_transform(features)
    async def analyze_regime_transitions(self, market_data: MarketData, window: int = 252) -> Dict[str, float]:
        """Analyze market regime transitions"""
        try:
            features = self._extract_regime_features(market_data)
            self.regime_model.fit(features)
            regime_probabilities = self.regime_model.predict_proba(features)
            # Compare current regime probabilities with those one window ago
            current_regime = regime_probabilities[-1]
            prev_regime = regime_probabilities[-window] if len(regime_probabilities) > window else None
            transitions = {}
            for i, regime in enumerate(MarketRegime):
                if prev_regime is not None:
                    transition_prob = current_regime[i] - prev_regime[i]
                else:
                    transition_prob = 0.0
                transitions[regime.value] = float(transition_prob)
            return transitions
        except Exception as e:
            self.logger.error(f"Error analyzing regime transitions: {e}")
            return {regime.value: 1.0 / len(MarketRegime) for regime in MarketRegime}
    async def calculate_technical_indicators(self, market_data: MarketData) -> TechnicalIndicators:
        """Calculate technical indicators"""
        try:
            price = market_data['close']
            volume = market_data.get('volume', pd.Series(1, index=market_data.index))
            # Calculate individual indicators
            rsi = self.tech_indicators.calculate_rsi(price)
            macd = self.tech_indicators.calculate_macd(price)
            bbands = self.tech_indicators.calculate_bollinger_bands(price)
            momentum = self.tech_indicators.calculate_momentum_indicators(price)
            volume_profile = self._analyze_volume_profile(price, volume)
            moving_averages = self.tech_indicators.calculate_moving_averages(price)
            return TechnicalIndicators(
                rsi=rsi,
                macd=macd,
                bollinger=bbands,
                moving_averages=moving_averages,
                volume_profile=volume_profile,
                momentum_indicators=momentum
            )
        except Exception as e:
            self.logger.error(f"Error calculating technical indicators: {e}")
            return TechnicalIndicators.get_default()
    def _analyze_volume_profile(self, price: pd.Series, volume: pd.Series) -> Dict[str, float]:
        """Advanced volume profile analysis"""
        try:
            vwap = (price * volume).sum() / volume.sum()
            volume_ma = volume.rolling(window=20).mean()
            volume_std = volume.rolling(window=20).std()
            obv = (np.sign(price.diff()) * volume).cumsum()
            volume_percentiles = np.percentile(volume, [25, 50, 75])
            return {
                'vwap': float(vwap),
                'volume_ma': float(volume_ma.iloc[-1]),
                'volume_std': float(volume_std.iloc[-1]),
                'volume_skew': float(volume.skew()),
                'obv_trend': float(obv.diff(20).iloc[-1]),
                'volume_25th': float(volume_percentiles[0]),
                'volume_median': float(volume_percentiles[1]),
                'volume_75th': float(volume_percentiles[2])
            }
        except Exception as e:
            self.logger.error(f"Error analyzing volume profile: {e}")
            return {
                'vwap': float(price.mean()),
                'volume_ma': float(volume.mean())
            }
    def _determine_rate_environment(self, data: MarketData) -> str:
        """Determine interest rate environment"""
        try:
            rate_changes = data['close'].pct_change(20)  # 20-day rate changes
            recent_trend = rate_changes.iloc[-20:].mean()
            if recent_trend > 0.001:  # 10 bps threshold
                return 'RISING'
            elif recent_trend < -0.001:
                return 'FALLING'
            else:
                return 'STABLE'
        except Exception as e:
            self.logger.error(f"Error determining rate environment: {e}")
            return 'STABLE'
    def _estimate_inflation_rate(self, data: MarketData) -> float:
        """Estimate inflation rate from market data"""
        try:
            # This would normally use actual inflation data;
            # here we use a simple proxy based on price trends
            price_changes = data['close'].pct_change(252)  # Annual changes
            return float(price_changes.mean())
        except Exception as e:
            self.logger.error(f"Error estimating inflation: {e}")
            return 0.02  # Default to 2%

    def _estimate_gdp_growth(self, data: MarketData) -> float:
        """Estimate GDP growth from market indicators"""
        try:
            # This would normally use actual GDP data;
            # here we use a simple proxy based on market returns
            returns = data['returns'].rolling(window=252).mean() * 252
            return float(returns.iloc[-1])
        except Exception as e:
            self.logger.error(f"Error estimating GDP growth: {e}")
            return 0.015  # Default to 1.5%
    def _calculate_leading_indicators(self, data: MarketData) -> Dict[str, float]:
        """Calculate leading economic indicators"""
        try:
            returns = data['returns']
            volume = data['volume']
            # Take the latest value of each rolling series (annualized where noted)
            indicators = {
                'trend_strength': float((returns.rolling(window=252).mean() * 252).iloc[-1]),
                'volatility_regime': float((returns.rolling(window=63).std() * np.sqrt(252)).iloc[-1]),
                'volume_trend': float((volume / volume.shift(20) - 1).mean()),
                'momentum': float((returns.rolling(window=20).mean() * 252).iloc[-1])
            }
            return indicators
        except Exception as e:
            self.logger.error(f"Error calculating leading indicators: {e}")
            return {
                'trend_strength': 0.0,
                'volatility_regime': 0.15,
                'volume_trend': 0.0,
                'momentum': 0.0
            }
    def _get_default_correlation_metrics(self) -> Dict[str, float]:
        """Get default correlation metrics"""
        return {
            'avg_correlation': 0.0,
            'max_correlation': 1.0,
            'min_correlation': -1.0,
            'correlation_dispersion': 0.5,
            'largest_eigenvalue': 1.0,
            'eigenvalue_dispersion': 0.0,
            'effective_rank': 1.0
        }
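

if __name__ == '__main__':
    # Minimal smoke test of the pure-numpy helpers on synthetic data.
    # Run as a module (e.g. `python -m src.core.market_analyzer`) so the
    # relative imports above resolve; the exact package path is assumed.
    rng = np.random.default_rng(42)
    demo_returns = pd.DataFrame(rng.normal(0, 0.01, size=(252, 3)),
                                columns=['a', 'b', 'c'])
    corr = calculate_dynamic_correlation(demo_returns)
    print('dynamic correlation:\n', np.round(corr, 3))
    print('min eigenvalue after ensure_psd:',
          float(np.linalg.eigvalsh(ensure_psd(corr)).min()))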