Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from scipy import stats | |
| from typing import Optional, Dict | |
| from arch import arch_model | |
| from sklearn.mixture import GaussianMixture | |
| from dataclasses import dataclass | |
| from typing import Dict | |
@dataclass
class RiskAssessment:
    """Plain record grouping the outputs of a risk assessment.

    Declared as a dataclass so that instances can be built with keyword
    arguments and compared/printed field by field.
    """

    # Scalar risk figures; units and sign conventions are set by the producer
    # of the record and are not enforced here.
    volatility: float
    var_95: float
    cvar_95: float
    tail_risk: float
    correlation_risk: float
    liquidity_risk: float
    regime_risk: float
    systemic_risk: float
    # Mapping from a label to a float result (presumably one entry per stress
    # scenario -- confirm against the code that fills it).
    stress_test_results: Dict[str, float]
@dataclass
class MarketMetrics:
    """Container for market metrics."""

    volatility: float
    skewness: float
    kurtosis: float
    tail_risk: float
    liquidity_score: float
    correlation_structure: Dict[str, float]
    regime_probabilities: Dict[str, float]

    @classmethod
    def get_default(cls) -> 'MarketMetrics':
        """Return a new instance filled with neutral placeholder values.

        The dictionaries are created on every call, so instances returned by
        separate calls never share mutable state.
        """
        return cls(
            volatility=0.15,
            skewness=0.0,
            kurtosis=3.0,
            tail_risk=0.02,
            liquidity_score=0.5,
            correlation_structure={
                'avg_correlation': 0.0,
                'max_correlation': 1.0,
                'min_correlation': -1.0,
            },
            regime_probabilities={},
        )
class EnhancedRiskMetrics:
    """Risk and performance metrics computed from a series of periodic returns.

    ``risk_free_rate`` is treated as an annual rate; ``self.rf`` holds the
    per-period equivalent (``risk_free_rate / trading_days``) that is
    subtracted from per-period returns.
    """

    def __init__(self, returns: Optional[pd.Series] = None,
                 prices: Optional[pd.DataFrame] = None,
                 risk_free_rate: float = 0.02,
                 trading_days: int = 252):
        """Store the inputs and derive the per-period risk-free rate.

        Args:
            returns: Periodic returns used by every metric that does not take
                an explicit ``returns`` argument.
            prices: Stored as-is; not read by any method of this class.
            risk_free_rate: Annual risk-free rate.
            trading_days: Number of return periods per year, used for
                annualisation.
        """
        self.returns = returns
        self.prices = prices
        self.risk_free_rate = risk_free_rate
        self.trading_days = trading_days
        # Per-period risk-free rate; the methods below subtract it from
        # per-period returns.
        self.rf = risk_free_rate / trading_days
        self._cache = {}

    # ------------------------------------------------------------------
    # Caching helpers
    # ------------------------------------------------------------------
    def _cached_calculation(self, key: str, calculation_func):
        """Return the cached value for ``key``, computing it on first use."""
        if key not in self._cache:
            self._cache[key] = calculation_func()
        return self._cache[key]

    def reset_cache(self):
        """Drop every cached calculation."""
        self._cache = {}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _drawdown_series(returns: pd.Series) -> pd.Series:
        """Return the drawdown (<= 0) of compounded ``returns`` at each step."""
        wealth = (1 + returns).cumprod()
        return wealth / wealth.expanding().max() - 1

    def _raw_sharpe(self) -> float:
        """Return the annualised Sharpe ratio without any moment adjustment."""
        excess_returns = self.returns - self.rf
        return np.sqrt(self.trading_days) * (excess_returns.mean() / excess_returns.std())

    # ------------------------------------------------------------------
    # Volatility
    # ------------------------------------------------------------------
    def calculate_volatility(self) -> float:
        """Return annualised volatility blending EWMA (70%) and GARCH (30%).

        The EWMA part is the latest value of a span-63 exponentially weighted
        standard deviation. If the ``arch`` package cannot be imported the
        EWMA estimate alone is returned.
        """
        ewm_vol = float(self.returns.ewm(span=63).std().iloc[-1]) * np.sqrt(self.trading_days)
        try:
            garch_vol = self._fit_garch()
        except ImportError:
            return ewm_vol
        return 0.7 * ewm_vol + 0.3 * garch_vol

    def calculate_annualized_volatility(self) -> float:
        """Alias of :meth:`calculate_volatility`."""
        return self.calculate_volatility()

    def _fit_garch(self) -> float:
        """Fit a GARCH(1,1) model and return the annualised one-step forecast."""
        from arch import arch_model
        model = arch_model(self.returns, vol='Garch', p=1, q=1)
        results = model.fit(disp='off')
        forecast = results.forecast(horizon=1)
        # Last row, single horizon column -> scalar variance forecast.
        next_variance = float(forecast.variance.values[-1, 0])
        return float(np.sqrt(next_variance) * np.sqrt(self.trading_days))

    def calculate_garch_volatility(self) -> pd.Series:
        """Return the annualised GARCH(1,1) conditional volatility series.

        Falls back to a 63-period rolling standard deviation when ``arch`` is
        not installed.
        """
        try:
            import arch
            model = arch.arch_model(self.returns, vol='Garch', p=1, q=1)
            res = model.fit(disp='off')
            # conditional_volatility is already a standard deviation, so it is
            # only scaled, not square-rooted again.
            return res.conditional_volatility * np.sqrt(self.trading_days)
        except ImportError:
            return self.returns.rolling(window=63).std() * np.sqrt(self.trading_days)

    # ------------------------------------------------------------------
    # Sharpe / Sortino family
    # ------------------------------------------------------------------
    def calculate_sharpe_ratio(self) -> float:
        """Return the Sharpe ratio scaled by :meth:`_skewness_adjustment`.

        The unadjusted ratio is also stored on ``self.sr``.
        """
        self.sr = self._raw_sharpe()
        return self.sr * self._skewness_adjustment(self.sr)

    def _skewness_adjustment(self, sr: Optional[float] = None) -> float:
        """Return the skewness/kurtosis multiplier applied to the Sharpe ratio.

        Args:
            sr: Unadjusted Sharpe ratio; computed from ``self.returns`` when
                omitted, so the method can be called on its own.
        """
        if sr is None:
            sr = self._raw_sharpe()
        skew = stats.skew(self.returns)
        # scipy returns *excess* kurtosis by default, so no further "- 3".
        excess_kurt = stats.kurtosis(self.returns)
        # NOTE(review): the commonly cited adjusted-Sharpe expansion uses
        # +skew/6 and -excess_kurt/24; the signs are kept as found -- confirm.
        return 1 - (skew / 6) * sr + (excess_kurt / 24) * sr ** 2

    def calculate_sortino_ratio(self) -> float:
        """Return the annualised Sortino ratio.

        The downside threshold is ``max(0, self.rf)``; the downside deviation
        is the root mean square shortfall below that threshold over all
        observations. Returns ``np.inf`` when there is no shortfall.
        """
        threshold = max(0, self.rf)
        shortfall = (self.returns - threshold).clip(upper=0)
        downside_std = np.sqrt((shortfall ** 2).sum() / len(self.returns)) * np.sqrt(self.trading_days)
        if downside_std == 0:
            return np.inf
        return (self.returns.mean() - self.rf) * self.trading_days / downside_std

    def calculate_modified_sortino_ratio(self) -> float:
        """Return the Sortino ratio scaled by ``1 + 0.2 * |skewness|``."""
        return self.calculate_sortino_ratio() * (1 + abs(self.returns.skew()) * 0.2)

    def calculate_conditional_sharpe_ratio(self) -> float:
        """Return excess return per unit of 95% CVaR magnitude.

        Returns ``np.inf`` when the CVaR is exactly zero.
        """
        tail_loss = abs(self.calculate_cvar(confidence=0.95))
        if tail_loss == 0:
            return np.inf
        return (self.returns.mean() - self.rf) * np.sqrt(self.trading_days) / tail_loss

    def calculate_kurtosis_adjusted_sharpe(self) -> float:
        """Return the Sharpe ratio reduced by 10% per unit of excess kurtosis."""
        sharpe = self.calculate_sharpe_ratio()
        # pandas' kurtosis() is already excess kurtosis.
        excess_kurt = self.returns.kurtosis()
        return sharpe * (1 - excess_kurt * 0.1)

    def calculate_skewness_adjusted_sharpe(self) -> float:
        """Return :meth:`calculate_sharpe_ratio` scaled by ``1 - skew / 6``.

        Note that the input ratio already carries the adjustment from
        :meth:`_skewness_adjustment`.
        """
        sharpe = self.calculate_sharpe_ratio()
        skew = stats.skew(self.returns)
        return sharpe * (1 - (skew / 6))

    def calculate_regime_sharpe(self, returns: pd.Series) -> float:
        """Return the annualised mean/std ratio of ``returns``.

        Returns ``np.inf`` when the standard deviation is exactly zero. The
        risk-free rate is not subtracted.
        """
        deviation = returns.std()
        if deviation == 0:
            return np.inf
        return np.sqrt(self.trading_days) * returns.mean() / deviation

    def calculate_information_ratio(self, benchmark_returns: pd.Series) -> float:
        """Return the annualised active return divided by the tracking error."""
        active_returns = self.returns - benchmark_returns
        tracking_error = active_returns.std() * np.sqrt(self.trading_days)
        return active_returns.mean() * self.trading_days / tracking_error

    def calculate_treynor_ratio(self) -> float:
        """Return per-period excess return per unit of beta (``inf`` if beta ~ 0)."""
        beta = self.calculate_beta()
        if abs(beta) < 1e-6:
            return np.inf
        return (self.returns.mean() - self.rf) / beta

    def calculate_calmar_ratio(self) -> float:
        """Return CAGR divided by the magnitude of the maximum drawdown."""
        max_dd = self.calculate_max_drawdown()
        if abs(max_dd) < 1e-6:
            return np.inf
        return -self.calculate_cagr(self.returns) / max_dd

    def calculate_omega_ratio(self, threshold: float = 0) -> float:
        """Return total gains above ``threshold`` over total shortfall below it.

        Returns ``np.inf`` when there is no shortfall.
        """
        excess = self.returns - threshold
        gains = excess.clip(lower=0).sum()
        losses = -excess.clip(upper=0).sum()
        return gains / losses if losses != 0 else np.inf

    def calculate_burke_ratio(self) -> float:
        """Return per-period excess return over the root of summed squared drawdowns.

        Returns 0.0 for missing/empty data, when there is no drawdown, or on
        any calculation error.
        """
        try:
            if self.returns is None or len(self.returns) == 0:
                return 0.0
            drawdowns = self._calculate_drawdowns()
            squared_drawdowns = np.sum(drawdowns ** 2)
            if squared_drawdowns == 0:
                return 0.0
            return (self.returns.mean() - self.rf) / np.sqrt(squared_drawdowns)
        except Exception as e:
            print(f"Error calculating Burke ratio: {e}")
            return 0.0

    # ------------------------------------------------------------------
    # Drawdowns
    # ------------------------------------------------------------------
    def calculate_max_drawdown(self, returns: Optional[pd.Series] = None) -> float:
        """Return the maximum drawdown (a value <= 0).

        Args:
            returns: Series to analyse; defaults to ``self.returns``.

        Returns:
            The most negative drawdown, or 0.0 for missing/empty input or on
            any calculation error.
        """
        if returns is None:
            returns = self.returns
        try:
            if returns is None or len(returns) == 0:
                return 0.0
            return self._drawdown_series(returns).min()
        except Exception as e:
            print(f"Error calculating max drawdown: {str(e)}")
            return 0.0

    def calculate_regime_drawdown(self, returns: pd.Series) -> float:
        """Alias of :meth:`calculate_max_drawdown` for a regime sub-sample."""
        return self.calculate_max_drawdown(returns)

    def calculate_extreme_drawdown_risk(self, confidence: float = 0.95) -> float:
        """Return the lower-tail percentile of 63-period rolling return sums.

        The incomplete warm-up windows are discarded before the percentile is
        taken. Returns ``np.nan`` when fewer than 63 observations exist.
        """
        rolling_returns = self.returns.rolling(window=63).sum().dropna()
        if len(rolling_returns) == 0:
            return np.nan
        return np.percentile(rolling_returns, (1 - confidence) * 100)

    def calculate_average_drawdown(self) -> float:
        """Return the mean of the drawdown series of ``self.returns``."""
        return self._drawdown_series(self.returns).mean()

    def calculate_drawdown_duration(self) -> int:
        """Return the longest run of consecutive periods spent below a prior peak.

        A drawdown still open at the end of the series is counted.
        """
        longest = 0
        current = 0
        for dd in self._drawdown_series(self.returns):
            if dd < 0:
                current += 1
                longest = max(longest, current)
            else:
                current = 0
        return longest

    def _calculate_drawdowns(self) -> np.ndarray:
        """Return the drawdown series of ``self.returns`` as a NumPy array."""
        return self._drawdown_series(self.returns).values

    def calculate_ulcer_index(self, returns: pd.Series) -> float:
        """Return the root mean square drawdown (0.0 for missing/empty input)."""
        if returns is None or len(returns) == 0:
            return 0.0
        drawdowns = self._drawdown_series(returns)
        return np.sqrt(np.mean(drawdowns ** 2))

    def calculate_pain_index(self, returns: pd.Series) -> float:
        """Return the mean drawdown depth expressed as a non-negative number."""
        return (-self._drawdown_series(returns)).mean()

    def calculate_pain_ratio(self, returns: Optional[pd.Series] = None) -> float:
        """Return per-period excess return divided by the pain index.

        Args:
            returns: Series to analyse; defaults to ``self.returns``.

        Returns:
            The ratio, or 0.0 when the pain index is zero or on any error.
        """
        try:
            if returns is None:
                returns = self.returns
            pain_index = self.calculate_pain_index(returns)
            if pain_index == 0:
                return 0.0
            # Per-period mean is compared with the per-period risk-free rate.
            return (returns.mean() - self.rf) / pain_index
        except Exception as e:
            print(f"Error calculating pain ratio: {e}")
            return 0.0

    # ------------------------------------------------------------------
    # Tail risk
    # ------------------------------------------------------------------
    def calculate_var(self, returns: Optional[pd.Series] = None, confidence: float = 0.95) -> float:
        """Return historical Value at Risk as a return quantile.

        Args:
            returns: Series to analyse; defaults to ``self.returns``.
            confidence: Confidence level; the ``1 - confidence`` quantile is
                returned.

        Returns:
            The quantile, or 0.0 for missing/empty input or on any error.
        """
        if returns is None:
            returns = self.returns
        try:
            if returns is None or len(returns) == 0:
                return 0.0
            return np.percentile(returns, (1 - confidence) * 100)
        except Exception as e:
            print(f"Error calculating VaR: {e}")
            return 0.0

    def calculate_cvar(self, returns: Optional[pd.Series] = None, confidence: float = 0.95) -> float:
        """Return the mean of the returns at or below the historical VaR.

        Args:
            returns: Series to analyse; defaults to ``self.returns``.
            confidence: Confidence level forwarded to :meth:`calculate_var`.

        Returns:
            The tail mean, or 0.0 for missing/empty input.
        """
        if returns is None:
            returns = self.returns
        if returns is None or len(returns) == 0:
            return 0.0
        var = self.calculate_var(returns, confidence)
        return returns[returns <= var].mean()

    def calculate_historical_var(self, returns: pd.Series, confidence: float = 0.95) -> float:
        """Return the ``1 - confidence`` empirical quantile of ``returns``."""
        if returns is None or len(returns) == 0:
            return 0.0
        return np.percentile(returns, (1 - confidence) * 100)

    def calculate_gaussian_var(self, returns: pd.Series, confidence: float = 0.95) -> float:
        """Return the ``1 - confidence`` quantile of a normal fitted to ``returns``.

        Uses the same sign convention as :meth:`calculate_historical_var`
        (a loss is a negative number). Returns 0.0 on any error.
        """
        try:
            z_score = stats.norm.ppf(1 - confidence)  # negative for confidence > 0.5
            return returns.mean() + z_score * returns.std()
        except Exception as e:
            print(f"Error calculating Gaussian VaR: {e}")
            return 0.0

    def calculate_tail_ratio(self, returns: pd.Series) -> float:
        """Return ``|95th percentile| / |5th percentile|`` of ``returns``.

        Returns 1.0 for missing/empty input and ``np.inf`` when the 5th
        percentile is exactly zero.
        """
        if returns is None or len(returns) == 0:
            return 1.0
        lower = abs(np.percentile(returns, 5))
        if lower == 0:
            return np.inf
        return abs(np.percentile(returns, 95)) / lower

    def calculate_downside_deviation(self, returns: pd.Series) -> float:
        """Return the annualised standard deviation of the negative returns."""
        if returns is None or len(returns) == 0:
            return 0.0
        negative_returns = returns[returns < 0]
        return negative_returns.std() * np.sqrt(self.trading_days) if len(negative_returns) > 0 else 0.0

    def calculate_upside_volatility(self, returns: Optional[pd.Series] = None) -> float:
        """Return the annualised standard deviation of the positive returns.

        ``returns`` defaults to ``self.returns``; 0.0 is returned when there
        are no positive returns or on any error.
        """
        try:
            if returns is None:
                returns = self.returns
            positive_returns = returns[returns > 0]
            return positive_returns.std() * np.sqrt(self.trading_days) if len(positive_returns) > 0 else 0.0
        except Exception as e:
            print(f"Error calculating upside volatility: {e}")
            return 0.0

    def calculate_volatility_skew(self, returns: pd.Series) -> float:
        """Return std of positive returns divided by std of negative returns.

        Returns 0.0 when ``returns`` is None, and 1.0 when the negative-side
        deviation is zero or undefined or when an error occurs.
        """
        try:
            if returns is None:
                return 0.0
            positive_std = returns[returns > 0].std()
            negative_std = returns[returns < 0].std()
            # "not > 0" also catches NaN (fewer than two negative returns).
            if not negative_std > 0:
                return 1.0
            return positive_std / negative_std
        except Exception as e:
            print(f"Error calculating volatility skew: {e}")
            return 1.0

    # ------------------------------------------------------------------
    # Market-relative metrics
    # ------------------------------------------------------------------
    def calculate_beta(self, market_returns: Optional[pd.Series] = None) -> float:
        """Return the beta of the returns against ``market_returns``.

        When ``market_returns`` is omitted the benchmark is ``self.returns``
        itself (or the row-wise mean if ``self.returns`` is a DataFrame, in
        which case the first column is used as the asset). Only rows for which
        both series and their one-period lags are available are used.
        Covariance and variance both use ``ddof=1``.

        Returns:
            The beta, or ``np.nan`` when it cannot be computed.
        """
        is_frame = isinstance(self.returns, pd.DataFrame)
        asset = self.returns.iloc[:, 0] if is_frame else self.returns
        if market_returns is None:
            # Default benchmark when none is supplied.
            market_returns = self.returns.mean(axis=1) if is_frame else self.returns
        aligned = pd.concat(
            [asset, market_returns, asset.shift(1), market_returns.shift(1)],
            axis=1,
        ).dropna()
        if len(aligned) < 2:
            return np.nan
        asset_col = aligned.iloc[:, 0]
        market_col = aligned.iloc[:, 1]
        market_var = market_col.var()
        if market_var == 0:
            return np.nan
        return float(asset_col.cov(market_col) / market_var)

    def calculate_alpha(self, returns: Optional[pd.Series] = None) -> float:
        """Return a per-period CAPM-style alpha.

        Args:
            returns: Series whose mean is compared with the CAPM expectation;
                defaults to ``self.returns``. It is also forwarded to
                :meth:`calculate_beta` as the benchmark.

        NOTE(review): ``returns`` acts as the asset in the alpha formula but
        as the market in the beta call -- confirm the intended roles.
        """
        if returns is None:
            returns = self.returns
        beta = self.calculate_beta(returns)
        market_return = self.returns.mean()
        return returns.mean() - (self.rf + beta * (market_return - self.rf))

    def calculate_r_squared(self, market_returns: Optional[pd.Series] = None) -> float:
        """Return the squared correlation between the returns and a benchmark.

        The default benchmark follows :meth:`calculate_beta`: ``self.returns``
        itself, or the row-wise mean when ``self.returns`` is a DataFrame.
        """
        if isinstance(self.returns, pd.DataFrame):
            asset = self.returns.iloc[:, 0]
            default_market = self.returns.mean(axis=1)
        else:
            asset = self.returns
            default_market = self.returns
        market = default_market if market_returns is None else market_returns
        return asset.corr(market) ** 2

    def calculate_tracking_error(self, benchmark_returns: Optional[pd.Series] = None) -> float:
        """Return the annualised standard deviation of active returns.

        Without ``benchmark_returns`` the returns themselves are used, i.e.
        the result equals the annualised volatility. Returns 0.0 when
        ``self.returns`` is None or on any error.
        """
        if self.returns is None:
            return 0.0
        try:
            active = self.returns if benchmark_returns is None else self.returns - benchmark_returns
            return active.std() * np.sqrt(self.trading_days)
        except Exception as e:
            print(f"Error calculating tracking error: {e}")
            return 0.0

    def calculate_cagr(self, returns: pd.Series) -> float:
        """Return the compound annual growth rate (0.0 for missing/empty input)."""
        if returns is None or len(returns) == 0:
            return 0.0
        total_return = (1 + returns).prod()
        n_years = len(returns) / self.trading_days
        return (total_return ** (1 / n_years)) - 1

    # ------------------------------------------------------------------
    # Regime analysis
    # ------------------------------------------------------------------
    def calculate_regime_based_risk(self) -> Dict[str, Dict[str, float]]:
        """Return per-regime risk figures from a 3-component Gaussian mixture.

        Each non-empty regime maps to its annualised volatility, the 5th
        percentile of its returns and its annualised mean return.
        """
        data = self.returns.values.reshape(-1, 1)
        gmm = GaussianMixture(n_components=3, random_state=42)
        gmm.fit(data)
        regimes = gmm.predict(data)
        regime_risks = {}
        for i in range(gmm.n_components):
            regime_returns = self.returns[regimes == i]
            if len(regime_returns) == 0:
                # A component may receive no observation; skip it rather than
                # fail on an empty percentile.
                continue
            regime_risks[f'regime_{i}'] = {
                'volatility': regime_returns.std() * np.sqrt(self.trading_days),
                'var_95': np.percentile(regime_returns, 5),
                'mean_return': regime_returns.mean() * self.trading_days,
            }
        return regime_risks

    def _calculate_regime_metrics(self, returns: pd.Series) -> Dict[str, Dict[str, float]]:
        """Return a dictionary of metrics for each Gaussian-mixture regime.

        Returns an empty dictionary for missing/empty input or on any error.
        """
        try:
            if returns is None or len(returns) == 0:
                return {}
            # Make sure we work on a Series.
            if isinstance(returns, np.ndarray):
                returns = pd.Series(returns)
            data = returns.values.reshape(-1, 1)
            gmm = GaussianMixture(n_components=3, random_state=42)
            gmm.fit(data)
            regime_data = pd.DataFrame({
                'returns': returns,
                'regime': gmm.predict(data),
            })
            regime_metrics = {}
            for i in range(gmm.n_components):
                regime_returns = regime_data[regime_data['regime'] == i]['returns']
                if len(regime_returns) > 0:
                    regime_metrics[f'regime_{i}'] = {
                        'frequency': len(regime_returns) / len(returns),
                        'avg_return': regime_returns.mean() * self.trading_days,  # annualised
                        'volatility': regime_returns.std() * np.sqrt(self.trading_days),
                        'sharpe': self.calculate_regime_sharpe(regime_returns),
                        'max_drawdown': self.calculate_max_drawdown(regime_returns),
                        'var_95': np.percentile(regime_returns, 5),
                        'skewness': stats.skew(regime_returns),
                        'kurtosis': stats.kurtosis(regime_returns),
                    }
            return regime_metrics
        except Exception as e:
            print(f"Error calculating regime metrics: {str(e)}")
            return {}

    # ------------------------------------------------------------------
    # Rolling metrics
    # ------------------------------------------------------------------
    def calculate_rolling_metrics(self, window: int = 252):
        """Return a dictionary of rolling versions of the key metrics.

        Keys: ``rolling_sharpe``, ``rolling_sortino``, ``rolling_volatility``,
        ``rolling_beta`` and ``rolling_var``.
        """
        annualiser = np.sqrt(self.trading_days)
        return {
            'rolling_sharpe': self.calculate_rolling_sharpe(window),
            # Window-local Sortino without risk-free adjustment; infinite when
            # the window holds no negative return.
            'rolling_sortino': self.returns.rolling(window).apply(
                lambda x: annualiser * x.mean() / x[x < 0].std() if len(x[x < 0]) > 0 else np.inf
            ),
            'rolling_volatility': self.calculate_rolling_volatility(window),
            'rolling_beta': self.calculate_rolling_beta(window),
            'rolling_var': self.calculate_rolling_var(window),
        }

    def calculate_rolling_sharpe(self, window: int) -> pd.Series:
        """Return the annualised rolling Sharpe ratio."""
        rolling_mean = self.returns.rolling(window=window).mean()
        rolling_std = self.returns.rolling(window=window).std()
        return np.sqrt(self.trading_days) * (rolling_mean - self.rf) / rolling_std

    def calculate_rolling_sortino(self, window: int) -> pd.Series:
        """Return the annualised rolling Sortino ratio.

        The downside deviation of each window is the standard deviation of
        the negative returns inside that window; the result keeps the index
        of ``self.returns``.
        """
        rolling_mean = self.returns.rolling(window=window).mean()
        # Mask instead of filter so that every window still spans `window`
        # periods and the index stays aligned with the rolling mean.
        negatives_only = self.returns.where(self.returns < 0)
        rolling_downside = negatives_only.rolling(window=window, min_periods=2).std()
        return np.sqrt(self.trading_days) * (rolling_mean - self.rf) / rolling_downside

    def calculate_rolling_volatility(self, window: int) -> pd.Series:
        """Return the annualised rolling standard deviation."""
        return self.returns.rolling(window=window).std() * np.sqrt(self.trading_days)

    def calculate_rolling_beta(self, window: int, market_returns: Optional[pd.Series] = None) -> pd.Series:
        """Return the rolling beta against ``market_returns``.

        Without a benchmark the returns are compared with themselves, which
        yields 1.0 wherever the window variance is non-zero.
        """
        market = self.returns if market_returns is None else market_returns
        rolling_cov = self.returns.rolling(window=window).cov(market)
        rolling_var = market.rolling(window=window).var()
        return rolling_cov / rolling_var

    def calculate_rolling_var(self, window: int) -> pd.Series:
        """Return the rolling 5% quantile of the returns."""
        return self.returns.rolling(window=window).quantile(0.05)

    # ------------------------------------------------------------------
    # Distribution shape
    # ------------------------------------------------------------------
    def calculate_skewness(self) -> float:
        """Return the sample skewness of the returns."""
        return stats.skew(self.returns)

    def calculate_kurtosis(self) -> float:
        """Return the excess kurtosis of the returns (scipy default)."""
        return stats.kurtosis(self.returns)

    def calculate_jarque_bera(self, returns: Optional[pd.Series] = None) -> float:
        """Return the Jarque-Bera test statistic (0.0 on any error).

        ``returns`` defaults to ``self.returns``.
        """
        try:
            if returns is None:
                returns = self.returns
            return stats.jarque_bera(returns)[0]
        except Exception as e:
            print(f"Error calculating Jarque-Bera: {e}")
            return 0.0