"""Risk and performance metrics computed from periodic return series.

The module exposes two plain data containers (``RiskAssessment`` and
``MarketMetrics``) and ``EnhancedRiskMetrics``, a collection of risk and
performance statistics evaluated on a pandas return series.
"""
import logging
from dataclasses import dataclass
from typing import Dict, Optional

import numpy as np
import pandas as pd
from scipy import stats

# ``arch`` and ``scikit-learn`` are only needed by the GARCH and regime
# methods. Guarding the imports keeps the rest of the module usable without
# them and makes the rolling-volatility fallback in
# ``calculate_garch_volatility`` reachable.
try:
    from arch import arch_model
except ImportError:  # optional dependency
    arch_model = None

try:
    from sklearn.mixture import GaussianMixture
except ImportError:  # optional dependency
    GaussianMixture = None

logger = logging.getLogger(__name__)

# Denominators smaller than this are treated as zero.
_EPS = 1e-12


@dataclass
class RiskAssessment:
    """Container for the results of a risk assessment."""

    volatility: float
    var_95: float
    cvar_95: float
    tail_risk: float
    correlation_risk: float
    liquidity_risk: float
    regime_risk: float
    systemic_risk: float
    stress_test_results: Dict[str, float]


@dataclass
class MarketMetrics:
    """Container for market metrics."""

    volatility: float
    skewness: float
    kurtosis: float
    tail_risk: float
    liquidity_score: float
    correlation_structure: Dict[str, float]
    regime_probabilities: Dict[str, float]

    @classmethod
    def get_default(cls) -> 'MarketMetrics':
        """Return a ``MarketMetrics`` instance filled with default values."""
        return cls(
            volatility=0.15,
            skewness=0.0,
            kurtosis=3.0,
            tail_risk=0.02,
            liquidity_score=0.5,
            correlation_structure={
                'avg_correlation': 0.0,
                'max_correlation': 1.0,
                'min_correlation': -1.0,
            },
            regime_probabilities={},
        )


class EnhancedRiskMetrics:
    """Risk and performance metrics for a series of periodic returns.

    Attributes:
        returns: Periodic returns the metrics are computed on.
        prices: Optional price data (stored, not used by the metrics here).
        risk_free_rate: Annual risk-free rate.
        trading_days: Number of return periods per year used to annualise.
        rf: Per-period risk-free rate (``risk_free_rate / trading_days``).
    """

    def __init__(self, returns: pd.Series = None, prices: pd.DataFrame = None,
                 risk_free_rate: float = 0.02, trading_days: int = 252):
        """Store the inputs and derive the per-period risk-free rate.

        Args:
            returns: Periodic returns.
            prices: Optional price data.
            risk_free_rate: Annual risk-free rate.
            trading_days: Periods per year used for annualisation.
        """
        self.returns = returns
        self.prices = prices
        self.risk_free_rate = risk_free_rate
        # Both attributes are read throughout the class; they are plain
        # attributes so existing callers may still override them.
        self.trading_days = trading_days
        self.rf = risk_free_rate / trading_days
        self._cache = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _cached_calculation(self, key: str, calculation_func):
        """Return the cached value for ``key``, computing it on first use."""
        if key not in self._cache:
            self._cache[key] = calculation_func()
        return self._cache[key]

    def reset_cache(self):
        """Clear all cached calculation results."""
        self._cache = {}

    def _resolve_returns(self, returns):
        """Return ``returns`` as a Series, defaulting to ``self.returns``."""
        if returns is None:
            return self.returns
        if isinstance(returns, np.ndarray):
            return pd.Series(returns)
        return returns

    def _primary_returns(self) -> pd.Series:
        """Return ``self.returns``, or its first column if it is a DataFrame."""
        if isinstance(self.returns, pd.DataFrame):
            return self.returns.iloc[:, 0]
        return self.returns

    def _default_market(self) -> pd.Series:
        """Return the market proxy used when no benchmark is supplied."""
        if isinstance(self.returns, pd.DataFrame):
            return self.returns.mean(axis=1)
        return self.returns

    @staticmethod
    def _drawdown_series(returns: pd.Series) -> pd.Series:
        """Return the drawdown (<= 0) of the compounded return path."""
        cumulative = (1 + returns).cumprod()
        return cumulative / cumulative.expanding().max() - 1

    @staticmethod
    def _beta_between(asset: pd.Series, market: pd.Series) -> float:
        """Return cov(asset, market) / var(market) on the shared index."""
        aligned = pd.concat([asset, market], axis=1).dropna()
        if len(aligned) < 2:
            return float('nan')
        asset_col = aligned.iloc[:, 0]
        market_col = aligned.iloc[:, 1]
        # Series.var and Series.cov both use ddof=1, so the estimators match.
        market_var = market_col.var()
        if not np.isfinite(market_var) or market_var < _EPS:
            return float('nan')
        return asset_col.cov(market_col) / market_var

    def _raw_sharpe(self) -> float:
        """Return the annualised Sharpe ratio without any adjustment."""
        excess_returns = self.returns - self.rf
        return np.sqrt(self.trading_days) * (
            excess_returns.mean() / excess_returns.std()
        )

    def _fit_regimes(self, returns: pd.Series, n_components: int = 3) -> np.ndarray:
        """Fit a Gaussian mixture on ``returns`` and return regime labels.

        Raises:
            ImportError: If scikit-learn is not installed.
        """
        if GaussianMixture is None:
            raise ImportError("scikit-learn is required for regime detection")
        data = returns.values.reshape(-1, 1)
        gmm = GaussianMixture(n_components=n_components, random_state=42)
        gmm.fit(data)
        return gmm.predict(data)

    # ------------------------------------------------------------------
    # Volatility
    # ------------------------------------------------------------------
    def calculate_volatility(self) -> float:
        """Return annualised volatility blending EWMA and GARCH estimates.

        The latest EWMA (span 63) estimate is weighted 0.7 and the GARCH(1,1)
        one-step forecast 0.3. If ``arch`` is not installed only the EWMA
        estimate is returned.
        """
        ewm_series = self.returns.ewm(span=63).std()
        # Use the most recent estimate so the result is a scalar.
        ewm_vol = float(ewm_series.iloc[-1]) * np.sqrt(self.trading_days)
        try:
            garch_vol = self._fit_garch()
        except ImportError:
            logger.warning("arch not installed; using EWMA volatility only")
            return ewm_vol
        return 0.7 * ewm_vol + 0.3 * garch_vol

    def calculate_annualized_volatility(self) -> float:
        """Alias of :meth:`calculate_volatility`."""
        return self.calculate_volatility()

    def _fit_garch(self) -> float:
        """Return the annualised one-step GARCH(1,1) volatility forecast.

        Raises:
            ImportError: If the ``arch`` package is not installed.
        """
        if arch_model is None:
            raise ImportError("the 'arch' package is required for GARCH estimation")
        model = arch_model(self.returns, vol='Garch', p=1, q=1)
        results = model.fit(disp='off')
        forecast = results.forecast(horizon=1)
        next_variance = float(forecast.variance.values[-1, 0])
        return np.sqrt(next_variance) * np.sqrt(self.trading_days)

    def calculate_garch_volatility(self) -> pd.Series:
        """Return the annualised GARCH(1,1) conditional volatility series.

        Falls back to a 63-period rolling standard deviation when ``arch``
        is not installed.
        """
        annualise = np.sqrt(self.trading_days)
        if arch_model is None:
            return self.returns.rolling(window=63).std() * annualise
        res = arch_model(self.returns, vol='Garch', p=1, q=1).fit(disp='off')
        # conditional_volatility is already a standard deviation.
        return res.conditional_volatility * annualise

    # ------------------------------------------------------------------
    # Sharpe / Sortino family
    # ------------------------------------------------------------------
    def calculate_sharpe_ratio(self) -> float:
        """Return the Sharpe ratio scaled by the skew/kurtosis adjustment.

        The unadjusted annualised ratio is also stored on ``self.sr``.
        """
        self.sr = self._raw_sharpe()
        return self.sr * self._skewness_adjustment(self.sr)

    def _skewness_adjustment(self, sharpe: Optional[float] = None) -> float:
        """Return the multiplicative skew/kurtosis adjustment factor.

        Args:
            sharpe: Unadjusted Sharpe ratio. Defaults to ``self.sr`` when it
                has been computed, otherwise it is computed here.
        """
        if sharpe is None:
            sharpe = getattr(self, 'sr', None)
            if sharpe is None:
                sharpe = self._raw_sharpe()
        skew = stats.skew(self.returns)
        # fisher=False yields Pearson kurtosis so that (kurt - 3) is the
        # excess kurtosis; the default already subtracts 3.
        kurt = stats.kurtosis(self.returns, fisher=False)
        return 1 - (skew / 6) * sharpe + (kurt - 3) / 24 * sharpe ** 2

    def calculate_sortino_ratio(self) -> float:
        """Return the annualised Sortino ratio.

        The downside threshold is ``max(0, rf)``; downside deviation is the
        root mean square of the shortfall below that threshold. Returns 0.0
        for empty data and ``inf`` when there is no downside.
        """
        if len(self.returns) == 0:
            return 0.0
        threshold = max(0, self.rf)
        shortfall = (self.returns - threshold).clip(upper=0)
        downside_std = (
            np.sqrt((shortfall ** 2).sum() / len(self.returns))
            * np.sqrt(self.trading_days)
        )
        if downside_std < _EPS:
            return np.inf
        return (self.returns.mean() - self.rf) * self.trading_days / downside_std

    def calculate_modified_sortino_ratio(self) -> float:
        """Return the Sortino ratio scaled by ``1 + 0.2 * |skew|``."""
        return self.calculate_sortino_ratio() * (1 + abs(self.returns.skew()) * 0.2)

    def calculate_conditional_sharpe_ratio(self) -> float:
        """Return annualised excess return per unit of 95% CVaR magnitude."""
        cvar_95 = self.calculate_cvar(self.returns, confidence=0.95)
        # CVaR is a signed (negative) return; divide by its magnitude.
        tail_loss = abs(cvar_95)
        if tail_loss < _EPS:
            return np.inf
        return (self.returns.mean() - self.rf) * np.sqrt(self.trading_days) / tail_loss

    def calculate_kurtosis_adjusted_sharpe(self) -> float:
        """Return the Sharpe ratio penalised by 10% per unit of excess kurtosis."""
        sharpe = self.calculate_sharpe_ratio()
        # Series.kurtosis() already returns excess (Fisher) kurtosis.
        excess_kurt = self.returns.kurtosis()
        return sharpe * (1 - excess_kurt * 0.1)

    def calculate_skewness_adjusted_sharpe(self) -> float:
        """Return the Sharpe ratio multiplied by ``1 - skew / 6``."""
        sharpe = self.calculate_sharpe_ratio()
        skew = stats.skew(self.returns)
        return sharpe * (1 - (skew / 6))

    def calculate_regime_sharpe(self, returns: pd.Series) -> float:
        """Return the annualised Sharpe ratio of ``returns``.

        The per-period risk-free rate is subtracted. Returns 0.0 for empty
        input or when the standard deviation is zero or undefined.
        """
        if returns is None or len(returns) == 0:
            return 0.0
        excess_returns = returns - (self.risk_free_rate / self.trading_days)
        spread = excess_returns.std()
        if not np.isfinite(spread) or spread < _EPS:
            return 0.0
        return np.sqrt(self.trading_days) * excess_returns.mean() / spread

    # ------------------------------------------------------------------
    # Drawdowns
    # ------------------------------------------------------------------
    def calculate_max_drawdown(self, returns: Optional[pd.Series] = None) -> float:
        """Return the maximum drawdown (a value <= 0).

        Args:
            returns: Series to evaluate; defaults to ``self.returns``.

        Returns:
            The most negative drawdown, or 0.0 for empty input.
        """
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) == 0:
            return 0.0
        return self._drawdown_series(returns).min()

    def calculate_regime_drawdown(self, returns: pd.Series) -> float:
        """Return the maximum drawdown of ``returns``."""
        return self.calculate_max_drawdown(returns)

    def calculate_extreme_drawdown_risk(self, confidence: float = 0.95,
                                        window: int = 63) -> float:
        """Return the lower-tail quantile of rolling ``window``-period returns.

        Returns NaN when there are fewer than ``window`` observations.
        """
        # Drop the warm-up NaNs; np.percentile would otherwise return NaN.
        rolling_returns = self.returns.rolling(window=window).sum().dropna()
        if len(rolling_returns) == 0:
            return float('nan')
        return np.percentile(rolling_returns, (1 - confidence) * 100)

    def calculate_average_drawdown(self) -> float:
        """Return the mean of the drawdown series."""
        return self._drawdown_series(self.returns).mean()

    def calculate_drawdown_duration(self) -> int:
        """Return the longest run of consecutive periods spent in drawdown."""
        longest = 0
        current = 0
        for dd in self._drawdown_series(self.returns):
            if dd < 0:
                current += 1
            else:
                longest = max(longest, current)
                current = 0
        # Count a drawdown that is still open at the end of the series.
        return max(longest, current)

    def _calculate_drawdowns(self) -> np.ndarray:
        """Return the drawdown series of ``self.returns`` as an array."""
        return self._drawdown_series(self.returns).values

    def calculate_ulcer_index(self, returns: pd.Series) -> float:
        """Return the Ulcer index (root mean square drawdown)."""
        if returns is None or len(returns) == 0:
            return 0.0
        drawdowns = self._drawdown_series(returns)
        return np.sqrt(np.mean(drawdowns ** 2))

    def calculate_pain_index(self, returns: pd.Series) -> float:
        """Return the Pain index (mean drawdown depth, as a value >= 0)."""
        if returns is None or len(returns) == 0:
            return 0.0
        return (-self._drawdown_series(returns)).mean()

    def calculate_pain_ratio(self, returns: Optional[pd.Series] = None) -> float:
        """Return annualised excess return divided by the Pain index.

        Returns 0.0 for empty input or a zero Pain index.
        """
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) == 0:
            return 0.0
        pain_index = self.calculate_pain_index(returns)
        if pain_index == 0:
            return 0.0
        # Annualise the mean so it is comparable with the annual rate.
        annual_return = returns.mean() * self.trading_days
        return (annual_return - self.risk_free_rate) / pain_index

    def calculate_burke_ratio(self) -> float:
        """Return per-period excess return over root sum of squared drawdowns."""
        if self.returns is None or len(self.returns) == 0:
            return 0.0
        squared_drawdowns = np.sum(self._calculate_drawdowns() ** 2)
        if squared_drawdowns == 0:
            return 0.0
        excess = self.returns.mean() - self.risk_free_rate / self.trading_days
        return excess / np.sqrt(squared_drawdowns)

    def calculate_calmar_ratio(self) -> float:
        """Return CAGR divided by the magnitude of the maximum drawdown."""
        max_dd = self.calculate_max_drawdown()
        if abs(max_dd) < 1e-6:
            return np.inf
        return -self.calculate_cagr(self.returns) / max_dd

    # ------------------------------------------------------------------
    # Value at Risk
    # ------------------------------------------------------------------
    def calculate_var(self, returns: Optional[pd.Series] = None,
                      confidence: float = 0.95) -> float:
        """Return historical VaR as the ``1 - confidence`` return quantile.

        Args:
            returns: Series to evaluate; defaults to ``self.returns``.
            confidence: Confidence level.

        Returns:
            The signed quantile, or 0.0 for empty input.
        """
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) == 0:
            return 0.0
        return np.percentile(returns, (1 - confidence) * 100)

    def calculate_historical_var(self, returns: pd.Series,
                                 confidence: float = 0.95) -> float:
        """Return historical VaR of ``returns`` (0.0 for missing input)."""
        if returns is None or len(returns) == 0:
            return 0.0
        return np.percentile(returns, (1 - confidence) * 100)

    def calculate_gaussian_var(self, returns: pd.Series,
                               confidence: float = 0.95) -> float:
        """Return the normal-approximation VaR as a signed return quantile.

        Uses ``mu - z * sigma`` so the result follows the same signed-return
        convention as :meth:`calculate_var`. Returns 0.0 for missing input.
        """
        if returns is None or len(returns) == 0:
            return 0.0
        z_score = stats.norm.ppf(confidence)
        return returns.mean() - z_score * returns.std()

    def calculate_cvar(self, returns: Optional[pd.Series] = None,
                       confidence: float = 0.95) -> float:
        """Return CVaR: the mean of returns at or below the VaR quantile.

        Args:
            returns: Series to evaluate; defaults to ``self.returns``.
            confidence: Confidence level.

        Returns:
            The signed tail mean, or 0.0 for empty input.
        """
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) == 0:
            return 0.0
        var = self.calculate_var(returns, confidence)
        return returns[returns <= var].mean()

    def calculate_tail_ratio(self, returns: pd.Series) -> float:
        """Return ``|95th percentile| / |5th percentile|`` of ``returns``.

        Returns 1.0 for missing input and ``inf`` for a zero denominator.
        """
        if returns is None or len(returns) == 0:
            return 1.0
        upper = abs(np.percentile(returns, 95))
        lower = abs(np.percentile(returns, 5))
        return upper / lower if lower > 0 else np.inf

    # ------------------------------------------------------------------
    # Benchmark-relative metrics
    # ------------------------------------------------------------------
    def calculate_beta(self, market_returns: pd.Series = None) -> float:
        """Return the beta of the returns against ``market_returns``.

        Args:
            market_returns: Benchmark returns. Defaults to the row mean of
                ``self.returns`` for a DataFrame, else ``self.returns``.

        Returns:
            ``cov / var`` on the common index, or NaN if it is undefined.
        """
        if market_returns is None:
            market_returns = self._default_market()
        return self._beta_between(self._primary_returns(), market_returns)

    def calculate_alpha(self, returns: Optional[pd.Series] = None) -> float:
        """Return the per-period Jensen's alpha against ``self.returns``.

        Args:
            returns: Asset returns; ``self.returns`` acts as the market.
                Defaults to ``self.returns`` itself.
        """
        market = self._primary_returns()
        asset = market if returns is None else self._resolve_returns(returns)
        beta = self._beta_between(asset, market)
        return asset.mean() - (self.rf + beta * (market.mean() - self.rf))

    def calculate_treynor_ratio(self) -> float:
        """Return per-period excess return divided by beta."""
        beta = self.calculate_beta()
        if abs(beta) < 1e-6:
            return np.inf
        return (self.returns.mean() - self.rf) / beta

    def calculate_information_ratio(self, benchmark_returns: pd.Series) -> float:
        """Return annualised active return divided by tracking error."""
        active_returns = self.returns - benchmark_returns
        tracking_error = active_returns.std() * np.sqrt(self.trading_days)
        return active_returns.mean() * self.trading_days / tracking_error

    def calculate_tracking_error(self,
                                 benchmark_returns: Optional[pd.Series] = None) -> float:
        """Return the annualised standard deviation of active returns.

        Without a benchmark this is the annualised volatility of
        ``self.returns``. Returns 0.0 when ``self.returns`` is ``None``.
        """
        if self.returns is None:
            return 0.0
        active = self.returns
        if benchmark_returns is not None:
            active = self.returns - benchmark_returns
        return active.std() * np.sqrt(self.trading_days)

    def calculate_r_squared(self, market_returns: Optional[pd.Series] = None) -> float:
        """Return the squared correlation with the market proxy."""
        if market_returns is None:
            market_returns = self._default_market()
        aligned = pd.concat([self._primary_returns(), market_returns], axis=1).dropna()
        correlation = aligned.iloc[:, 0].corr(aligned.iloc[:, 1])
        return correlation ** 2

    # ------------------------------------------------------------------
    # Regimes
    # ------------------------------------------------------------------
    def calculate_regime_based_risk(self) -> Dict[str, Dict[str, float]]:
        """Return volatility, VaR and mean return for each of 3 GMM regimes.

        Regimes that receive no observations are omitted.
        """
        regimes = self._fit_regimes(self.returns, n_components=3)
        regime_risks = {}
        for i in range(3):
            regime_returns = self.returns[regimes == i]
            if len(regime_returns) == 0:
                continue
            regime_risks[f'regime_{i}'] = {
                'volatility': regime_returns.std() * np.sqrt(self.trading_days),
                'var_95': np.percentile(regime_returns, 5),
                'mean_return': regime_returns.mean() * self.trading_days,
            }
        return regime_risks

    def _calculate_regime_metrics(self, returns: pd.Series) -> Dict[str, Dict[str, float]]:
        """Return per-regime statistics for ``returns`` using a 3-state GMM.

        Best effort: any failure is logged and an empty dict is returned.
        """
        if returns is None or len(returns) == 0:
            return {}
        try:
            returns = self._resolve_returns(returns)
            n_components = 3
            regimes = self._fit_regimes(returns, n_components=n_components)
            annualise = np.sqrt(self.trading_days)
            regime_metrics = {}
            for i in range(n_components):
                regime_returns = returns[regimes == i]
                if len(regime_returns) == 0:
                    continue
                regime_metrics[f'regime_{i}'] = {
                    'frequency': len(regime_returns) / len(returns),
                    'avg_return': regime_returns.mean() * self.trading_days,
                    'volatility': regime_returns.std() * annualise,
                    'sharpe': self.calculate_regime_sharpe(regime_returns),
                    'max_drawdown': self.calculate_max_drawdown(regime_returns),
                    'var_95': np.percentile(regime_returns, 5),
                    'skewness': stats.skew(regime_returns),
                    'kurtosis': stats.kurtosis(regime_returns),
                }
            return regime_metrics
        except Exception as e:  # best-effort boundary around model fitting
            logger.warning("Error calculating regime metrics: %s", e)
            return {}

    # ------------------------------------------------------------------
    # Growth and distribution shape
    # ------------------------------------------------------------------
    def calculate_cagr(self, returns: pd.Series) -> float:
        """Return the compound annual growth rate (0.0 for empty input)."""
        if returns is None or len(returns) == 0:
            return 0.0
        total_return = (1 + returns).prod()
        n_years = len(returns) / self.trading_days
        return (total_return ** (1 / n_years)) - 1

    def calculate_omega_ratio(self, threshold: float = 0) -> float:
        """Return the Omega ratio: gains above over losses below ``threshold``.

        Both sides are measured as deviations from the threshold. Returns
        ``inf`` when there are no losses.
        """
        deviation = self.returns - threshold
        gains = deviation.clip(lower=0).sum()
        losses = (-deviation).clip(lower=0).sum()
        return gains / losses if losses != 0 else np.inf

    def calculate_downside_deviation(self, returns: pd.Series) -> float:
        """Return the annualised standard deviation of negative returns."""
        if returns is None or len(returns) == 0:
            return 0.0
        negative_returns = returns[returns < 0]
        if len(negative_returns) == 0:
            return 0.0
        return negative_returns.std() * np.sqrt(self.trading_days)

    def calculate_upside_volatility(self, returns: Optional[pd.Series] = None) -> float:
        """Return the annualised standard deviation of positive returns."""
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) == 0:
            return 0.0
        positive_returns = returns[returns > 0]
        if len(positive_returns) == 0:
            return 0.0
        return positive_returns.std() * np.sqrt(self.trading_days)

    def calculate_volatility_skew(self, returns: pd.Series) -> float:
        """Return the ratio of upside to downside standard deviation.

        Returns 0.0 for ``None`` input and 1.0 when the downside deviation
        is zero or undefined.
        """
        if returns is None:
            return 0.0
        negative_std = returns[returns < 0].std()
        if not np.isfinite(negative_std) or negative_std < _EPS:
            return 1.0
        return returns[returns > 0].std() / negative_std

    def calculate_skewness(self) -> float:
        """Return the skewness of ``self.returns``."""
        return stats.skew(self.returns)

    def calculate_kurtosis(self) -> float:
        """Return the excess kurtosis of ``self.returns``."""
        return stats.kurtosis(self.returns)

    def calculate_jarque_bera(self, returns: Optional[pd.Series] = None) -> float:
        """Return the Jarque-Bera statistic (0.0 when it cannot be computed)."""
        returns = self._resolve_returns(returns)
        if returns is None or len(returns) < 2:
            return 0.0
        return stats.jarque_bera(returns)[0]

    # ------------------------------------------------------------------
    # Rolling metrics
    # ------------------------------------------------------------------
    def calculate_rolling_metrics(self, window: int = 252):
        """Return a dict of rolling Sharpe, Sortino, volatility, beta and VaR."""
        return {
            'rolling_sharpe': self.calculate_rolling_sharpe(window),
            'rolling_sortino': self.calculate_rolling_sortino(window),
            'rolling_volatility': self.calculate_rolling_volatility(window),
            'rolling_beta': self.calculate_rolling_beta(window),
            'rolling_var': self.calculate_rolling_var(window),
        }

    def calculate_rolling_sharpe(self, window: int) -> pd.Series:
        """Return the rolling annualised Sharpe ratio."""
        rolling_mean = self.returns.rolling(window=window).mean()
        rolling_std = self.returns.rolling(window=window).std()
        return np.sqrt(self.trading_days) * (rolling_mean - self.rf) / rolling_std

    def calculate_rolling_sortino(self, window: int) -> pd.Series:
        """Return the rolling annualised Sortino ratio.

        The downside deviation is the standard deviation of the negative
        returns inside each window.
        """
        rolling_mean = self.returns.rolling(window=window).mean()
        # Mask non-negative returns instead of dropping them so each window
        # still covers exactly ``window`` consecutive periods.
        negatives_only = self.returns.where(self.returns < 0)
        rolling_downside = negatives_only.rolling(window=window, min_periods=2).std()
        return np.sqrt(self.trading_days) * (rolling_mean - self.rf) / rolling_downside

    def calculate_rolling_volatility(self, window: int) -> pd.Series:
        """Return the rolling annualised volatility."""
        return self.returns.rolling(window=window).std() * np.sqrt(self.trading_days)

    def calculate_rolling_beta(self, window: int,
                               market_returns: Optional[pd.Series] = None) -> pd.Series:
        """Return the rolling beta against ``market_returns``.

        Args:
            window: Rolling window length.
            market_returns: Benchmark returns; defaults to the same proxy
                used by :meth:`calculate_beta`.
        """
        asset = self._primary_returns()
        market = self._default_market() if market_returns is None else market_returns
        rolling_cov = asset.rolling(window=window).cov(market)
        rolling_var = market.rolling(window=window).var()
        return rolling_cov / rolling_var

    def calculate_rolling_var(self, window: int) -> pd.Series:
        """Return the rolling 5% return quantile."""
        return self.returns.rolling(window=window).quantile(0.05)