Spaces:
Sleeping
Sleeping
| from enum import Enum | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Optional, List, Union, Tuple, Any, Union, TypeVar | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime | |
| from scipy import stats | |
| from sklearn.mixture import GaussianMixture | |
| from sklearn.preprocessing import StandardScaler | |
| from .risk_metrics import RiskAssessment | |
class MarketRegime(Enum):
    """Market regimes used to classify the current environment.

    Each member's value is the lowercase string identifier of the regime.
    """

    BULL_MARKET = "bull_market"
    BEAR_MARKET = "bear_market"
    HIGH_VOLATILITY = "high_volatility"
    LOW_VOLATILITY = "low_volatility"
    NEUTRAL = "neutral"
    CRISIS = "crisis"
    RECOVERY = "recovery"
    STAGFLATION = "stagflation"
    TRANSITION = "transition"

    @classmethod
    def get_risk_factor(cls, regime: 'MarketRegime') -> float:
        """Return the risk adjustment multiplier for ``regime``.

        Regimes without an explicit entry (``NEUTRAL``) and unknown values
        fall back to the neutral multiplier ``1.0``.
        """
        # Built inside the method: a dict assigned in an Enum body would
        # itself become an enum member.
        risk_factors = {
            cls.BULL_MARKET: 1.2,      # Increased risk tolerance in bull markets
            cls.BEAR_MARKET: 0.8,      # Reduced risk in bear markets
            cls.HIGH_VOLATILITY: 0.7,  # Significantly reduced risk in volatile periods
            cls.LOW_VOLATILITY: 1.1,   # Slightly increased risk in calm periods
            cls.CRISIS: 0.5,           # Minimal risk during crises
            cls.RECOVERY: 1.3,         # Higher risk tolerance during recovery
            cls.STAGFLATION: 0.6,      # Reduced risk during stagflation
            cls.TRANSITION: 1.0,       # Neutral risk during transitions
        }
        return risk_factors.get(regime, 1.0)

    @classmethod
    def from_metrics(cls,
                     trend: float,
                     volatility: float,
                     inflation: float,
                     growth: float) -> 'MarketRegime':
        """Classify the regime from trend, volatility, inflation and growth.

        Rules are evaluated in priority order and the first match wins:
        stagflation, then the volatility extremes, then trend strength.
        ``TRANSITION`` is returned when no rule matches (including NaN inputs,
        for which every comparison is false).
        """
        if inflation > 0.05 and growth < 0.01:
            return cls.STAGFLATION
        if volatility > 0.25:
            return cls.HIGH_VOLATILITY
        if volatility < 0.10:
            return cls.LOW_VOLATILITY
        if trend > 0.15:  # Strong positive trend
            return cls.BULL_MARKET
        if trend < -0.15:  # Strong negative trend
            return cls.BEAR_MARKET
        if -0.15 <= trend <= -0.05:
            return cls.CRISIS
        if 0.05 <= trend <= 0.15:
            return cls.RECOVERY
        return cls.TRANSITION
@dataclass
class MarketMetrics:
    """Container for core market metrics.

    ``last_update`` defaults to the time the instance is created.
    """

    volatility: float
    trend: float
    momentum: float
    sentiment: float
    liquidity: float
    correlation: float
    tail_risk: float
    regime_probability: Dict[str, float]
    last_update: datetime = field(default_factory=datetime.now)

    def is_valid(self) -> bool:
        """Return True when every metric lies inside its accepted range.

        Volatility, sentiment, liquidity and tail risk must be in [0, 1];
        trend, momentum and correlation in [-1, 1]; the regime probabilities
        must sum to 1 within 0.001. Values that cannot be compared (for
        example ``None``) make the metrics invalid rather than raising.
        """
        try:
            return all([
                0 <= self.volatility <= 1,
                -1 <= self.trend <= 1,
                -1 <= self.momentum <= 1,
                0 <= self.sentiment <= 1,
                0 <= self.liquidity <= 1,
                -1 <= self.correlation <= 1,
                0 <= self.tail_risk <= 1,
                abs(sum(self.regime_probability.values()) - 1.0) < 0.001
            ])
        except (TypeError, ValueError, AttributeError):
            # Non-numeric or missing values count as invalid data.
            return False
@dataclass
class MacroIndicators:
    """Container for macroeconomic indicators.

    ``vix_level`` is optional and is left out of :meth:`to_dict` when unset.
    """

    gdp_growth: float
    inflation_rate: float
    interest_rate: float
    unemployment_rate: float
    consumer_confidence: float
    industrial_production: float
    retail_sales: float
    housing_market: float
    vix_level: Optional[float] = None

    def to_dict(self) -> Dict[str, float]:
        """Return the indicators as a dict, skipping any that are ``None``."""
        return {
            name: value
            for name, value in self.__dict__.items()
            if value is not None
        }
@dataclass
class LiquidityConditions:
    """Container for market liquidity metrics."""

    bid_ask_spread: float
    market_depth: float
    trading_volume: float
    turnover_ratio: float
    amihud_ratio: float  # Price impact measure
    volume_weighted_spread: float
    market_resilience: float

    def liquidity_score(self) -> float:
        """Return a composite liquidity score clamped to [0, 1].

        The score is a weighted sum of the fields; spread and price-impact
        measures carry negative weights because larger values lower the score.
        """
        weights = {
            'bid_ask_spread': -0.3,
            'market_depth': 0.2,
            'trading_volume': 0.2,
            'turnover_ratio': 0.1,
            'amihud_ratio': -0.1,
            'volume_weighted_spread': -0.05,
            'market_resilience': 0.05
        }
        score = sum(getattr(self, attr) * weight
                    for attr, weight in weights.items())
        return max(0.0, min(1.0, score))
@dataclass
class MarketConditions:
    """Market conditions dataclass.

    The first eleven fields are required. The remaining fields are optional
    and default to ``None`` (``timestamp`` defaults to the creation time);
    they are the attributes that the methods of this class read or that
    ``from_market_data`` passes to the constructor.
    """

    current_regime: 'MarketRegime'
    volatility_level: float
    market_sentiment: float
    interest_rate_environment: str
    inflation_rate: float
    gdp_growth: float
    leading_indicators: Dict[str, float]
    correlation_regime: Dict[str, float]
    liquidity_conditions: Dict[str, float]
    tail_risk_metrics: Dict[str, float]
    regime_probabilities: Dict[str, float]
    # Validated through ``is_valid()``, so this holds a MarketMetrics object.
    market_metrics: Optional['MarketMetrics'] = None
    # Serialised by ``to_dict`` via ``isoformat()``.
    timestamp: datetime = field(default_factory=datetime.now)
    # Filled in lazily by ``_initialize_derived_metrics`` when empty.
    market_stress_indicators: Optional[Dict[str, Any]] = None
    macro_indicators: Optional['MacroIndicators'] = None
    cross_asset_correlations: Optional[Dict[str, Any]] = None
    sentiment_indicators: Optional[Dict[str, Any]] = None
def __post_init__(self):
    """Check the supplied data, then fill in any derived metrics.

    Validation runs first, so an exception raised by it prevents the derived
    metrics from being computed.
    """
    self._validate_data()
    self._initialize_derived_metrics()
def get_default(cls) -> 'MarketConditions':
    """Return market conditions populated with neutral baseline values.

    The regime is ``NEUTRAL`` and every regime is given the same probability,
    ``1 / len(MarketRegime)``.
    """
    # Uniform distribution over all regimes.
    uniform_probabilities = {
        regime.value: 1/len(MarketRegime) for regime in MarketRegime
    }
    leading_defaults = {
        'pmi': 50.0,                   # Neutral PMI
        'consumer_confidence': 100.0,  # Reference level
        'yield_curve': 0.0             # Neutral spread
    }
    correlation_defaults = {
        'stocks_bonds': -0.2,          # Typical historical correlation
        'average_correlation': 0.3
    }
    liquidity_defaults = {
        'market_depth': 1.0,
        'bid_ask_spread': 0.001,
        'trading_volume': 1.0
    }
    tail_risk_defaults = {
        'var_95': -0.02,
        'expected_shortfall': -0.025,
        'tail_dependence': 0.3
    }
    return cls(
        current_regime=MarketRegime.NEUTRAL,
        volatility_level=0.15,         # Historical average volatility
        market_sentiment=0.5,          # Neutral sentiment
        interest_rate_environment="NORMAL",
        inflation_rate=0.02,           # Typical target inflation
        gdp_growth=0.02,               # Historical average growth
        leading_indicators=leading_defaults,
        correlation_regime=correlation_defaults,
        liquidity_conditions=liquidity_defaults,
        tail_risk_metrics=tail_risk_defaults,
        regime_probabilities=uniform_probabilities
    )
def is_high_risk(self) -> bool:
    """Tell whether current conditions count as high risk.

    True when the regime is bear market, high volatility or crisis, or when
    volatility is above 0.25, or when sentiment is below 0.3.
    """
    risky_regimes = [
        MarketRegime.BEAR_MARKET,
        MarketRegime.HIGH_VOLATILITY,
        MarketRegime.CRISIS,
    ]
    in_risky_regime = self.current_regime in risky_regimes
    return (
        in_risky_regime or
        self.volatility_level > 0.25 or
        self.market_sentiment < 0.3
    )
def is_favorable_for_rebalancing(self) -> bool:
    """Tell whether conditions are favourable for a rebalance.

    Requires a non-crisis regime, volatility below 0.3 and a bid/ask spread
    below 0.005. A missing spread is treated as infinite, i.e. unfavourable.
    """
    outside_crisis = self.current_regime != MarketRegime.CRISIS
    return (
        outside_crisis and
        self.volatility_level < 0.3 and
        self.liquidity_conditions.get('bid_ask_spread', float('inf')) < 0.005
    )
def get_risk_adjustment_factor(self) -> float:
    """Compute a risk adjustment multiplier from the current conditions.

    Starts from a per-regime factor, then scales by 0.9 when volatility is
    above 0.2, by 0.9 when sentiment is below 0.4, and by 1.1 when sentiment
    is above 0.6. A regime without an entry uses the neutral factor 1.0
    instead of raising ``KeyError``.
    """
    # Regime adjustment. STAGFLATION and TRANSITION use the same values as
    # MarketRegime.get_risk_factor.
    regime_factors = {
        MarketRegime.BULL_MARKET: 1.2,
        MarketRegime.BEAR_MARKET: 0.8,
        MarketRegime.HIGH_VOLATILITY: 0.7,
        MarketRegime.LOW_VOLATILITY: 1.1,
        MarketRegime.CRISIS: 0.5,
        MarketRegime.RECOVERY: 1.1,
        MarketRegime.NEUTRAL: 1.0,
        MarketRegime.STAGFLATION: 0.6,
        MarketRegime.TRANSITION: 1.0
    }
    base_factor = 1.0
    base_factor *= regime_factors.get(self.current_regime, 1.0)

    # Volatility adjustment
    if self.volatility_level > 0.2:
        base_factor *= 0.9

    # Sentiment adjustment
    if self.market_sentiment < 0.4:
        base_factor *= 0.9
    elif self.market_sentiment > 0.6:
        base_factor *= 1.1

    return base_factor
def get_regime_transition_probabilities(self) -> Dict[str, float]:
    """Return the regime probabilities, or a uniform split when none are set."""
    stored = self.regime_probabilities
    if stored:
        return stored
    # Default: every regime equally likely.
    return {regime.value: 1/len(MarketRegime) for regime in MarketRegime}
def to_dict(self) -> Dict:
    """Convert the conditions to a plain dictionary.

    The regime is stored by its string value and the timestamp as an ISO-8601
    string. An instance without a timestamp yields ``'timestamp': None``
    instead of raising ``AttributeError``.
    """
    stamp = getattr(self, 'timestamp', None)
    return {
        'current_regime': self.current_regime.value,
        'volatility_level': self.volatility_level,
        'market_sentiment': self.market_sentiment,
        'interest_rate_environment': self.interest_rate_environment,
        'inflation_rate': self.inflation_rate,
        'gdp_growth': self.gdp_growth,
        'leading_indicators': self.leading_indicators,
        'timestamp': stamp.isoformat() if stamp is not None else None,
        'correlation_regime': self.correlation_regime,
        'liquidity_conditions': self.liquidity_conditions,
        'tail_risk_metrics': self.tail_risk_metrics,
        'regime_probabilities': self.regime_probabilities
    }
def _validate_data(self):
    """Validate core market data.

    Raises:
        ValueError: if attached market metrics report themselves invalid, if
            the volatility level is negative, or if the regime is not a
            ``MarketRegime``.
    """
    # market_metrics is optional (defaults to None); only validate it when an
    # object exposing is_valid() was actually supplied.
    metrics_check = getattr(self.market_metrics, 'is_valid', None)
    if callable(metrics_check) and not metrics_check():
        raise ValueError("Invalid market metrics")
    if self.volatility_level < 0:
        raise ValueError("Negative volatility")
    if not isinstance(self.current_regime, MarketRegime):
        raise ValueError("Invalid market regime")
def _initialize_derived_metrics(self):
    """Compute derived metrics that were not supplied.

    Each of ``regime_probabilities``, ``tail_risk_metrics`` and
    ``market_stress_indicators`` is recomputed only when empty or missing.
    """
    # NOTE(review): the helpers are called without arguments here; confirm
    # they are defined with that signature elsewhere in the class.
    if not self.regime_probabilities:
        self.regime_probabilities = self._calculate_regime_probabilities()
    if not self.tail_risk_metrics:
        self.tail_risk_metrics = self._calculate_tail_risk_metrics()
    # getattr: the attribute may not exist on the instance at all, which
    # previously raised AttributeError.
    if not getattr(self, 'market_stress_indicators', None):
        self.market_stress_indicators = self._calculate_stress_indicators()
def from_market_data(cls,
                     market_data: pd.DataFrame,
                     lookback_period: int = 252,
                     rolling_window: int = 21) -> 'MarketConditions':
    """Create a MarketConditions instance from market data.

    Args:
        market_data: frame accepted by ``_validate_market_data``.
        lookback_period: passed to the trend calculation.
        rolling_window: passed to the volatility and momentum calculations.

    Returns:
        The computed conditions, or ``cls.get_default()`` when any step
        raises (the error is printed).
    """
    try:
        # Validate and prepare the data
        if not cls._validate_market_data(market_data):
            raise ValueError("Invalid market data format")

        # Base metrics
        returns = cls._calculate_returns(market_data)
        volatility = cls._calculate_volatility(returns, rolling_window)
        trend = cls._calculate_trend(returns, lookback_period)
        # Computed once and reused: the result feeds both the metrics
        # container and the conditions themselves.
        regime_probabilities = cls._calculate_regime_probabilities_from_data(returns)

        # Market metrics
        market_metrics = MarketMetrics(
            volatility=volatility,
            trend=trend,
            momentum=cls._calculate_momentum(returns, rolling_window),
            sentiment=cls._calculate_market_sentiment(market_data),
            liquidity=cls._calculate_liquidity_score(market_data),
            correlation=cls._calculate_average_correlation(market_data),
            tail_risk=cls._calculate_tail_risk(returns),
            regime_probability=dict(regime_probabilities),
            last_update=datetime.now()
        )

        # Macro indicators
        macro_indicators = MacroIndicators(
            gdp_growth=cls._estimate_gdp_growth(market_data),
            inflation_rate=cls._estimate_inflation_rate(market_data),
            interest_rate=cls._get_interest_rate(market_data),
            unemployment_rate=cls._estimate_unemployment_rate(market_data),
            consumer_confidence=cls._estimate_consumer_confidence(market_data),
            industrial_production=cls._estimate_industrial_production(market_data),
            retail_sales=cls._estimate_retail_sales(market_data),
            housing_market=cls._estimate_housing_market(market_data)
        )

        # Liquidity conditions
        liquidity_conditions = LiquidityConditions(
            bid_ask_spread=cls._calculate_bid_ask_spread(market_data),
            market_depth=cls._calculate_market_depth(market_data),
            trading_volume=cls._calculate_trading_volume(market_data),
            turnover_ratio=cls._calculate_turnover_ratio(market_data),
            amihud_ratio=cls._calculate_amihud_ratio(market_data),
            volume_weighted_spread=cls._calculate_vw_spread(market_data),
            market_resilience=cls._calculate_market_resilience(market_data)
        )

        # Market regime
        current_regime = MarketRegime.from_metrics(
            trend=trend,
            volatility=volatility,
            inflation=macro_indicators.inflation_rate,
            growth=macro_indicators.gdp_growth
        )

        return cls(
            current_regime=current_regime,
            volatility_level=volatility,
            # Required fields that were previously omitted, which made the
            # constructor call fail and silently return the defaults.
            market_sentiment=market_metrics.sentiment,
            # NOTE(review): no rate classification is derived from the data;
            # "NORMAL" mirrors get_default() - confirm the intended mapping.
            interest_rate_environment="NORMAL",
            inflation_rate=macro_indicators.inflation_rate,
            gdp_growth=macro_indicators.gdp_growth,
            leading_indicators={
                'consumer_confidence': macro_indicators.consumer_confidence,
                'industrial_production': macro_indicators.industrial_production,
                'retail_sales': macro_indicators.retail_sales,
                'housing_market': macro_indicators.housing_market
            },
            market_metrics=market_metrics,
            macro_indicators=macro_indicators,
            # Stored as a dict: is_favorable_for_rebalancing reads it with
            # .get('bid_ask_spread').
            liquidity_conditions=dict(vars(liquidity_conditions)),
            timestamp=datetime.now(),
            correlation_regime=cls._calculate_correlation_regime(market_data),
            tail_risk_metrics=cls._calculate_detailed_tail_risk(returns),
            regime_probabilities=regime_probabilities,
            market_stress_indicators=cls._calculate_stress_indicators_from_data(market_data),
            cross_asset_correlations=cls._calculate_cross_asset_correlations(market_data),
            sentiment_indicators=cls._calculate_detailed_sentiment(market_data)
        )
    except Exception as e:
        print(f"Error creating market conditions: {str(e)}")
        return cls.get_default()
def _validate_market_data(market_data: pd.DataFrame) -> bool:
    """Check that ``market_data`` is a usable OHLCV-style frame.

    The input must be a non-empty DataFrame with ``close``, ``volume``,
    ``high`` and ``low`` columns, no null values and a DatetimeIndex.
    """
    if not isinstance(market_data, pd.DataFrame):
        return False

    missing_columns = {'close', 'volume', 'high', 'low'} - set(market_data.columns)
    if missing_columns:
        return False

    if market_data.empty:
        return False
    if market_data.isnull().any().any():
        return False

    return isinstance(market_data.index, pd.DatetimeIndex)
def _calculate_returns(data: pd.DataFrame, method: str = 'log') -> pd.Series:
    """Compute period returns from the ``close`` column.

    ``method='log'`` yields log returns; any other value yields arithmetic
    returns. Rows without a value (the first one) are dropped.
    """
    closes = data['close']
    price_ratio = closes / closes.shift(1)
    if method == 'log':
        return np.log(price_ratio).dropna()
    return (price_ratio - 1).dropna()
def _calculate_volatility(returns: pd.Series, window: int = 21) -> float:
    """Return the latest exponentially weighted volatility, annualised.

    The EWM standard deviation (span ``window``) is scaled by ``sqrt(252)``.
    On any error the message is printed and 0.15 is returned.
    """
    try:
        # Exponentially weighted standard deviation of the returns
        ewm_sigma = returns.ewm(span=window, adjust=False).std()
        latest_sigma = ewm_sigma.iloc[-1]
        return float(latest_sigma * np.sqrt(252))
    except Exception as e:
        print(f"Error calculating volatility: {e}")
        return 0.15  # Reasonable default value
def _calculate_trend(returns: pd.Series, lookback_period: int = 252) -> float:
    """Compute a trend score in [-1, 1] from several indicators.

    Combines the cumulative return (weight 0.4), the slope of a linear fit
    scaled by ``lookback_period`` (0.4) and the ratio of the 50- to
    200-period mean return (0.2). With fewer than 200 observations the ratio
    is undefined and contributes 0 instead of turning the score into NaN.
    On any error the message is printed and 0.0 is returned.
    """
    try:
        cumulative_return = (1 + returns).prod() - 1
        linear_reg = np.polyfit(range(len(returns)), returns.values, 1)[0]
        short_mean = returns.rolling(window=50).mean().iloc[-1]
        long_mean = returns.rolling(window=200).mean().iloc[-1]
        ma_ratio = short_mean / long_mean - 1
        if not np.isfinite(ma_ratio):
            # Too little history, or a zero long-run mean.
            ma_ratio = 0.0

        # Combine the indicators with weights
        trend_score = (
            0.4 * cumulative_return +
            0.4 * linear_reg * lookback_period +  # Normalise the slope
            0.2 * ma_ratio
        )
        if not np.isfinite(trend_score):
            return 0.0
        return float(np.clip(trend_score, -1, 1))
    except Exception as e:
        print(f"Error calculating trend: {e}")
        return 0.0
def _calculate_momentum(returns: pd.Series, window: int = 21) -> float:
    """Compute a momentum score in [-1, 1] over three horizons.

    Sums returns over the last 21, 63 and 126 periods and blends them with
    weights 0.5 / 0.3 / 0.2. When the history is shorter than a horizon, the
    sum covers the available observations instead of producing NaN.
    On any error the message is printed and 0.0 is returned.

    NOTE(review): ``window`` is accepted but not used; the horizons are fixed.
    """
    try:
        horizon_weights = ((21, 0.5), (63, 0.3), (126, 0.2))
        momentum = sum(
            weight * returns.rolling(window=periods, min_periods=1).sum().iloc[-1]
            for periods, weight in horizon_weights
        )
        if not np.isfinite(momentum):
            return 0.0
        return float(np.clip(momentum, -1, 1))
    except Exception as e:
        print(f"Error calculating momentum: {e}")
        return 0.0
def _calculate_liquidity_score(cls, market_data: pd.DataFrame) -> float:
    """Blend four liquidity component scores into one value in [0, 1].

    Weights: volume 0.35, spread 0.25, depth 0.20, resilience 0.20.
    On any error the message is printed and 0.5 is returned.
    """
    try:
        # Liquidity components
        by_volume = cls._calculate_volume_score(market_data)
        by_spread = cls._calculate_spread_score(market_data)
        by_depth = cls._calculate_depth_score(market_data)
        by_resilience = cls._calculate_resilience_score(market_data)

        # Weighted combination
        blended = (0.35 * by_volume + 0.25 * by_spread +
                   0.20 * by_depth + 0.20 * by_resilience)
        return np.clip(blended, 0, 1)
    except Exception as e:
        print(f"Error calculating liquidity score: {e}")
        return 0.5
def _calculate_volume_score(market_data: pd.DataFrame) -> float:
    """Score the latest volume against its 20-period average.

    The ratio is passed through a logistic curve centred on 1, so a volume
    equal to its average scores 0.5. On any error the message is printed and
    0.5 is returned.
    """
    try:
        traded = market_data['volume']
        baseline = traded.rolling(window=20).mean()
        relative_volume = traded.iloc[-1] / baseline.iloc[-1]
        # Normalise the ratio
        return float(1 / (1 + np.exp(-2 * (relative_volume - 1))))
    except Exception as e:
        print(f"Error calculating volume score: {e}")
        return 0.5
def _calculate_spread_score(market_data: pd.DataFrame) -> float:
    """Score liquidity from the spread; tighter spreads score higher.

    Uses the 20-period mean relative bid/ask spread when ``bid`` and ``ask``
    columns exist, otherwise the mean high-low range relative to the close.
    On any error the message is printed and 0.5 is returned.
    """
    try:
        columns = market_data.columns
        if 'ask' in columns and 'bid' in columns:
            ask, bid = market_data['ask'], market_data['bid']
            relative_spread = (ask - bid) / ((ask + bid) / 2)
            recent_spread = relative_spread.rolling(window=20).mean().iloc[-1]
            return float(1 - np.clip(recent_spread * 20, 0, 1))

        # Fallback estimate based on the high/low range
        range_ratio = (market_data['high'] - market_data['low']) / market_data['close']
        return float(1 - np.clip(range_ratio.mean() * 10, 0, 1))
    except Exception as e:
        print(f"Error calculating spread score: {e}")
        return 0.5
def _calculate_amihud_ratio(market_data: pd.DataFrame, window: int = 20) -> float:
    """Return a normalised Amihud illiquidity measure.

    The rolling mean of |return| / dollar volume is mapped through
    ``1 / (1 + x * 1e6)``, so a larger price impact gives a smaller value.
    On any error the message is printed and 0.5 is returned.
    """
    try:
        closes = market_data['close']
        abs_returns = closes.pct_change().abs()
        dollar_volume = market_data['volume'] * closes
        illiquidity = (abs_returns / dollar_volume).rolling(window=window).mean()
        # Normalise the ratio
        latest = illiquidity.iloc[-1]
        return float(1 / (1 + latest * 1e6))
    except Exception as e:
        print(f"Error calculating Amihud ratio: {e}")
        return 0.5
def _calculate_market_resilience(market_data: pd.DataFrame) -> float:
    """Return a resilience metric in [0, 1].

    Averages ``1 - |volume autocorrelation|`` with the negated return
    autocorrelation (used as a mean-reversion proxy). On any error the
    message is printed and 0.5 is returned.
    """
    try:
        price_changes = market_data['close'].pct_change()
        # Volume autocorrelation
        volume_persistence = market_data['volume'].autocorr()
        # Speed at which returns revert to the mean
        reversion = -price_changes.autocorr()
        # Combine the metrics
        combined = 0.5 * (1 - abs(volume_persistence)) + 0.5 * reversion
        return float(np.clip(combined, 0, 1))
    except Exception as e:
        print(f"Error calculating market resilience: {e}")
        return 0.5
def _calculate_correlation_regime(cls, market_data: pd.DataFrame, window: int = 63) -> Dict[str, float]:
    """Summarise the correlation structure of the columns of ``market_data``.

    Reports statistics of the pairwise correlations (upper triangle of the
    dynamic correlation matrix) and of the matrix eigenvalues. On any error
    the message is printed and a dict of four zeroed correlation statistics
    is returned.
    """
    try:
        # Returns of every column
        asset_returns = market_data.pct_change().dropna()
        # Dynamic correlation matrix
        corr = cls._calculate_dynamic_correlation(asset_returns, window)
        spectrum, _ = np.linalg.eigh(corr)
        # Each pair appears once in the strict upper triangle.
        pairwise = corr[np.triu_indices_from(corr, k=1)]

        return {
            'average_correlation': float(pairwise.mean()),
            'correlation_dispersion': float(pairwise.std()),
            'max_correlation': float(pairwise.max()),
            'min_correlation': float(pairwise.min()),
            'largest_eigenvalue': float(spectrum[-1]),
            'eigenvalue_dispersion': float(np.std(spectrum)),
            'effective_rank': float(np.sum(spectrum)**2 / np.sum(spectrum**2))
        }
    except Exception as e:
        print(f"Error calculating correlation regime: {e}")
        return {
            'average_correlation': 0.0,
            'correlation_dispersion': 0.0,
            'max_correlation': 0.0,
            'min_correlation': 0.0
        }
def _calculate_dynamic_correlation(returns: pd.DataFrame, window: int = 63) -> np.ndarray:
    """Compute a correlation matrix with exponentially decaying weights.

    The most recent ``window`` rows are scaled by the square root of
    normalised decay weights (decay 0.94, newest row weighted most) before
    the correlation is taken; the result is passed through ``_ensure_psd``.
    A history shorter than ``window`` uses the rows available. With fewer
    than two rows, or on any error, the identity matrix is returned.
    """
    try:
        # Never ask for more rows than exist; otherwise the weights no longer
        # line up with the data.
        window = min(window, len(returns))
        if window < 2:
            return np.eye(len(returns.columns))

        # Exponential decay parameter
        lambda_param = 0.94
        weights = lambda_param ** np.arange(window - 1, -1, -1)
        weights = weights / np.sum(weights)

        # Scale the returns
        recent = returns.iloc[-window:].to_numpy()
        normalized_returns = recent * np.sqrt(weights).reshape(-1, 1)

        # Correlation matrix
        corr_matrix = np.corrcoef(normalized_returns.T)

        # Enforce positive semi-definiteness. The helper is reached through
        # the class because this function receives no ``cls``.
        return MarketConditions._ensure_psd(corr_matrix)
    except Exception as e:
        print(f"Error calculating dynamic correlation: {e}")
        return np.eye(len(returns.columns))
def _ensure_psd(matrix: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
    """Return ``matrix`` rebuilt with every eigenvalue raised to at least ``epsilon``.

    If the decomposition fails the message is printed and the input is
    returned unchanged.
    """
    try:
        spectrum, basis = np.linalg.eigh(matrix)
        floored = np.maximum(spectrum, epsilon)
        return basis @ np.diag(floored) @ basis.T
    except Exception as e:
        print(f"Error ensuring PSD: {e}")
        return matrix
def _calculate_tail_risk_metrics(cls, returns: pd.Series) -> Dict[str, float]:
    """Compute tail risk metrics for a return series.

    Returns VaR at 95% and 99%, expected shortfall at 95%, the tail index,
    kurtosis, skewness and a composite tail risk score. On any error the
    message is printed and a three-entry default dict is returned.
    """
    try:
        # Shape of the distribution
        kurt = stats.kurtosis(returns)
        skew = stats.skew(returns)

        # Tail risk measures
        var_95 = cls._calculate_var(returns, 0.95)
        var_99 = cls._calculate_var(returns, 0.99)
        es_95 = cls._calculate_expected_shortfall(returns, 0.95)
        tail_index = cls._estimate_tail_index(returns)

        metrics = {
            'VaR_95': float(var_95),
            'VaR_99': float(var_99),
            'ES_95': float(es_95),
            'tail_index': float(tail_index),
            'kurtosis': float(kurt),
            'skewness': float(skew),
        }
        metrics['tail_risk_score'] = float(cls._calculate_tail_risk_score(
            var_95, es_95, kurt, skew, tail_index
        ))
        return metrics
    except Exception as e:
        print(f"Error calculating tail risk metrics: {e}")
        return {
            'VaR_95': -0.02,
            'ES_95': -0.03,
            'tail_risk_score': 0.5
        }
def _calculate_var(returns: pd.Series, confidence: float = 0.95) -> float:
    """Compute Value at Risk as a blend of three estimates.

    Weights: historical percentile 0.4, normal-distribution quantile 0.3,
    extreme-value estimate 0.3. If any estimate fails the message is printed
    and the historical percentile alone is returned.
    """
    try:
        # Historical estimate
        hist_var = np.percentile(returns, (1 - confidence) * 100)

        # Parametric (normal) estimate
        mu = returns.mean()
        sigma = returns.std()
        param_var = stats.norm.ppf(1 - confidence, mu, sigma)

        # Extreme Value Theory estimate. The helper is reached through the
        # class because this function receives no ``cls``.
        evt_var = MarketConditions._calculate_evt_var(returns, confidence)

        # Weighted average of the methods
        weighted_var = 0.4 * hist_var + 0.3 * param_var + 0.3 * evt_var
        return float(weighted_var)
    except Exception as e:
        print(f"Error calculating VaR: {e}")
        return float(np.percentile(returns, (1 - confidence) * 100))
def _calculate_expected_shortfall(returns: pd.Series, confidence: float = 0.95) -> float:
    """Compute Expected Shortfall (conditional VaR).

    This is the mean of the returns at or below the historical
    ``1 - confidence`` percentile. On error the message is printed and
    ``mean - 2 * std`` is returned.
    """
    try:
        cutoff = np.percentile(returns, (1 - confidence) * 100)
        tail_losses = returns[returns <= cutoff]
        return float(tail_losses.mean())
    except Exception as e:
        print(f"Error calculating Expected Shortfall: {e}")
        return float(returns.mean() - 2 * returns.std())
def _calculate_stress_indicators_from_data(cls, market_data: pd.DataFrame) -> Dict[str, float]:
    """Compute market stress indicators and their composite.

    The composite is the weighted average of the four components (volatility
    0.35, liquidity 0.25, correlation 0.20, credit 0.20), so it stays on the
    same scale as the components. On any error the message is printed and
    ``{'composite_stress': 0.5}`` is returned.
    """
    try:
        returns = market_data['close'].pct_change().dropna()

        # Stress components
        volatility_stress = cls._calculate_volatility_stress(returns)
        liquidity_stress = cls._calculate_liquidity_stress(market_data)
        correlation_stress = cls._calculate_correlation_stress(market_data)
        credit_stress = cls._calculate_credit_stress(market_data)

        # Composite indicator. The weights already sum to 1, so the weighted
        # terms are summed; averaging them would shrink the result by 4x.
        composite_stress = (
            0.35 * volatility_stress +
            0.25 * liquidity_stress +
            0.20 * correlation_stress +
            0.20 * credit_stress
        )

        return {
            'composite_stress': float(composite_stress),
            'volatility_stress': float(volatility_stress),
            'liquidity_stress': float(liquidity_stress),
            'correlation_stress': float(correlation_stress),
            'credit_stress': float(credit_stress),
            'stress_momentum': float(cls._calculate_stress_momentum(returns)),
            'stress_regime': cls._determine_stress_regime(composite_stress)
        }
    except Exception as e:
        print(f"Error calculating stress indicators: {e}")
        return {'composite_stress': 0.5}
def _calculate_volatility_stress(returns: pd.Series, window: int = 21) -> float:
    """Compute a volatility-based stress indicator in [0, 1].

    Compares the latest rolling volatility with the full-sample volatility
    (weight 0.7) and adds the average absolute change of the 5-period
    volatility relative to the full-sample volatility (weight 0.3). On any
    error the message is printed and 0.5 is returned.
    """
    try:
        # Realised volatility, recent versus full sample
        recent_sigma = returns.rolling(window=window).std().iloc[-1]
        baseline_sigma = returns.std()

        # Volatility jumps
        jump_intensity = returns.rolling(window=5).std().diff().abs().mean()

        # Stress score in (-1, 1), then rescaled to [0, 1]
        level_term = np.tanh(recent_sigma / baseline_sigma - 1)
        jump_term = np.tanh(jump_intensity / baseline_sigma)
        raw_score = 0.7 * level_term + 0.3 * jump_term
        return float(np.clip((raw_score + 1) / 2, 0, 1))
    except Exception as e:
        print(f"Error calculating volatility stress: {e}")
        return 0.5
def _calculate_regime_probabilities_from_data(cls,
                                              returns: pd.Series,
                                              n_regimes: int = 3) -> Dict[str, float]:
    """Estimate regime probabilities with a Gaussian Mixture Model.

    Rolling-window features are standardised, a mixture with ``n_regimes``
    components is fitted, and the probabilities of the most recent
    observation are returned. Components are ordered by their mean on the
    first feature column so that names are assigned from lowest to highest;
    with three regimes the names are ``low_vol`` / ``normal`` / ``high_vol``.

    Returns ``{'normal': 1.0, 'high_vol': 0.0, 'low_vol': 0.0}`` when there
    are too few usable observations or when fitting fails.
    """
    fallback = {'normal': 1.0, 'high_vol': 0.0, 'low_vol': 0.0}
    try:
        # Build the GMM features
        features = np.asarray(cls._prepare_regime_features(returns), dtype=float)
        # Rolling statistics are NaN until the window fills; the mixture
        # model rejects NaN input, so keep only complete rows.
        features = features[np.isfinite(features).all(axis=1)]
        if len(features) < max(n_regimes, 2):
            return fallback

        # Standardise the features
        scaled_features = StandardScaler().fit_transform(features)

        gmm = GaussianMixture(
            n_components=n_regimes,
            random_state=42,
            covariance_type='full',
            reg_covar=1e-4
        )
        gmm.fit(scaled_features)
        probs = gmm.predict_proba(scaled_features[-1].reshape(1, -1))[0]

        # Mixture components come back in arbitrary order; rank them by the
        # mean of the first feature before attaching names.
        # NOTE(review): assumes column 0 of the features is the rolling
        # standard deviation - confirm against _prepare_regime_features.
        ranked_components = np.argsort(gmm.means_[:, 0])
        if n_regimes == 3:
            regime_names = ['low_vol', 'normal', 'high_vol']
        else:
            regime_names = [f'regime_{rank}' for rank in range(n_regimes)]
        return {
            name: float(probs[component])
            for name, component in zip(regime_names, ranked_components)
        }
    except Exception as e:
        print(f"Error calculating regime probabilities: {e}")
        return fallback
def _prepare_regime_features(returns: pd.Series) -> np.ndarray:
    """Build the feature matrix used for regime detection.

    Columns, in order: 21-period rolling standard deviation, mean, skewness,
    kurtosis, and share of positive returns. Rows before the window fills
    contain NaN. On any error the message is printed and a zero matrix of
    shape ``(len(returns), 5)`` is returned.
    """
    try:
        rolling = returns.rolling(window=21)  # Sliding window
        positive_share = (returns > 0).rolling(window=21).mean()
        columns = [
            rolling.std(),
            rolling.mean(),
            rolling.skew(),
            rolling.kurt(),
            positive_share,
        ]
        return np.column_stack(columns)
    except Exception as e:
        print(f"Error preparing regime features: {e}")
        return np.zeros((len(returns), 5))
def _determine_stress_regime(stress_level: float) -> str:
    """Map a stress level to its regime label.

    Below 0.3 is "normal", below 0.6 "elevated", below 0.8 "high";
    anything else is "extreme".
    """
    bands = ((0.3, "normal"), (0.6, "elevated"), (0.8, "high"))
    for upper_bound, label in bands:
        if stress_level < upper_bound:
            return label
    return "extreme"
def _calculate_detailed_sentiment(cls, market_data: pd.DataFrame) -> Dict[str, float]:
    """Compute market sentiment indicators and their composite.

    The composite is the weighted average of price momentum (0.3), volume
    sentiment (0.2), volatility sentiment (0.2), relative strength (0.15)
    and the technical sentiment score (0.15), so it stays on the same scale
    as its components. On any error the message is printed and
    ``{'composite_sentiment': 0.5}`` is returned.
    """
    try:
        # Sentiment components
        price_momentum = cls._calculate_price_momentum_sentiment(market_data)
        volume_sentiment = cls._calculate_volume_sentiment(market_data)
        volatility_sentiment = cls._calculate_volatility_sentiment(market_data)
        relative_strength = cls._calculate_relative_strength(market_data)

        # Technical indicators
        tech_indicators = cls._calculate_technical_indicators(market_data)

        # Overall score. The weights already sum to 1, so the weighted terms
        # are summed; averaging them would shrink the result by 5x.
        composite_sentiment = (
            0.3 * price_momentum +
            0.2 * volume_sentiment +
            0.2 * volatility_sentiment +
            0.15 * relative_strength +
            0.15 * tech_indicators['sentiment_score']
        )

        return {
            'composite_sentiment': float(composite_sentiment),
            'price_momentum': float(price_momentum),
            'volume_sentiment': float(volume_sentiment),
            'volatility_sentiment': float(volatility_sentiment),
            'relative_strength': float(relative_strength),
            'technical_indicators': tech_indicators,
            'sentiment_regime': cls._determine_sentiment_regime(composite_sentiment),
            'sentiment_momentum': float(cls._calculate_sentiment_momentum(market_data))
        }
    except Exception as e:
        print(f"Error calculating detailed sentiment: {e}")
        return {'composite_sentiment': 0.5}
def _calculate_technical_indicators(market_data: pd.DataFrame) -> Dict[str, float]:
    """Compute technical indicators from the ``close`` column.

    Returns the combined sentiment score, the 14-period RSI, the MACD
    histogram (12/26/9) and the position of the last close inside the
    20-period Bollinger Bands. On any error the message is printed and
    ``{'sentiment_score': 0.5}`` is returned.
    """
    try:
        close = market_data['close']

        # Moving averages
        ma_20 = close.rolling(window=20).mean()
        ma_50 = close.rolling(window=50).mean()
        ma_200 = close.rolling(window=200).mean()

        # RSI
        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))

        # MACD
        exp1 = close.ewm(span=12, adjust=False).mean()
        exp2 = close.ewm(span=26, adjust=False).mean()
        macd = exp1 - exp2
        signal = macd.ewm(span=9, adjust=False).mean()

        # Bollinger Bands
        std = close.rolling(window=20).std()
        upper_band = ma_20 + (std * 2)
        lower_band = ma_20 - (std * 2)

        # Technical score. The helper is reached through the class because
        # this function receives no ``cls``.
        tech_score = MarketConditions._combine_technical_signals(
            close.iloc[-1],
            ma_20.iloc[-1],
            ma_50.iloc[-1],
            ma_200.iloc[-1],
            rsi.iloc[-1],
            macd.iloc[-1],
            signal.iloc[-1],
            upper_band.iloc[-1],
            lower_band.iloc[-1]
        )

        return {
            'sentiment_score': float(tech_score),
            'rsi': float(rsi.iloc[-1]),
            'macd_histogram': float(macd.iloc[-1] - signal.iloc[-1]),
            'bb_position': float((close.iloc[-1] - lower_band.iloc[-1]) /
                                 (upper_band.iloc[-1] - lower_band.iloc[-1]))
        }
    except Exception as e:
        print(f"Error calculating technical indicators: {e}")
        return {'sentiment_score': 0.5}
def _combine_technical_signals(*signals) -> float:
    """Combine several technical signals into a single score in [0, 1].

    Each finite numeric signal is min-max normalised against the other
    signals and the normalised values are averaged. Non-numeric, NaN and
    infinite signals are ignored. Returns the neutral score 0.5 when no
    usable signal remains, when all usable signals are equal, or on error.
    """
    try:
        usable = [
            float(value) for value in signals
            if isinstance(value, (int, float, np.integer, np.floating))
            and np.isfinite(value)
        ]
        if not usable:
            return 0.5

        # The bounds are the same for every signal, so compute them once.
        lowest, highest = min(usable), max(usable)
        spread = highest - lowest
        if spread == 0:
            # All signals identical: no relative information to normalise.
            return 0.5

        return float(np.mean([(value - lowest) / spread for value in usable]))
    except Exception as e:
        print(f"Error combining technical signals: {e}")
        return 0.5
def _calculate_sentiment_momentum(market_data: pd.DataFrame,
                                  short_window: int = 5,
                                  long_window: int = 20) -> float:
    """Compute sentiment momentum from short- and long-window mean returns.

    The result is the relative gap between the two rolling means of the
    close-to-close returns, clipped to [-1, 1]. On any error the message is
    printed and 0.0 is returned.
    """
    try:
        daily_changes = market_data['close'].pct_change()
        # Latest rolling means over each window
        fast_mean = daily_changes.rolling(window=short_window).mean().iloc[-1]
        slow_mean = daily_changes.rolling(window=long_window).mean().iloc[-1]
        # Relative difference between the two
        relative_gap = (fast_mean - slow_mean) / slow_mean
        return float(np.clip(relative_gap, -1, 1))
    except Exception as e:
        print(f"Error calculating sentiment momentum: {e}")
        return 0.0
def _estimate_macro_indicators(cls, market_data: pd.DataFrame) -> MacroIndicators:
    """Build a ``MacroIndicators`` record from market-derived estimates.

    Each field is produced by the matching estimator on ``cls``. If any
    estimator raises, the error is printed and
    ``MacroIndicators.get_default()`` is returned instead.
    """
    try:
        # The dict literal is evaluated top to bottom, so the estimators
        # run in this order: headline indicators, then confidence and
        # activity indicators, then the VIX level.
        estimates = {
            'gdp_growth': cls._estimate_gdp_growth(market_data),
            'inflation_rate': cls._estimate_inflation_rate(market_data),
            'interest_rate': cls._estimate_interest_rate(market_data),
            'unemployment_rate': cls._estimate_unemployment_rate(market_data),
            'consumer_confidence': cls._estimate_consumer_confidence(market_data),
            'industrial_production': cls._estimate_industrial_production(market_data),
            'retail_sales': cls._estimate_retail_sales(market_data),
            'housing_market': cls._estimate_housing_market(market_data),
            'vix_level': cls._get_vix_level(market_data),
        }
        return MacroIndicators(**estimates)
    except Exception as e:
        print(f"Error estimating macro indicators: {e}")
        return MacroIndicators.get_default()
| def _estimate_gdp_growth(market_data: pd.DataFrame, window: int = 252) -> float: | |
| """Estimate GDP growth from market indicators""" | |
| try: | |
| # Calculer les rendements de long terme | |
| returns = market_data['close'].pct_change() | |
| long_term_return = (1 + returns).rolling(window=window).prod() - 1 | |
| # Calculer la croissance du volume | |
| volume_growth = (market_data['volume'].rolling(window=window).mean().pct_change()) | |
| # Calculer un score composite | |
| growth_score = (0.6 * long_term_return.iloc[-1] + | |
| 0.4 * volume_growth.iloc[-1]) | |
| # Normaliser entre -0.05 et 0.05 (fourchette typique de croissance GDP) | |
| return float(np.clip(growth_score * 0.05, -0.05, 0.05)) | |
| except Exception as e: | |
| print(f"Error estimating GDP growth: {e}") | |
| return 0.02 # Valeur de croissance par défaut | |
| def _estimate_inflation_rate(market_data: pd.DataFrame, window: int = 126) -> float: | |
| """Estimate inflation rate from price trends""" | |
| try: | |
| # Calculer la tendance des prix | |
| price_trend = market_data['close'].pct_change().rolling(window=window).mean() | |
| # Ajuster pour obtenir un taux annualisé | |
| inflation_estimate = price_trend.iloc[-1] * 252 / window | |
| # Normaliser entre 0 et 0.1 (fourchette typique d'inflation) | |
| return float(np.clip(inflation_estimate, 0, 0.1)) | |
| except Exception as e: | |
| print(f"Error estimating inflation rate: {e}") | |
| return 0.02 # Taux d'inflation par défaut | |
| def _estimate_interest_rate(market_data: pd.DataFrame) -> float: | |
| """Estimate effective interest rate from market data""" | |
| try: | |
| # Calculer le taux implicite à partir des prix | |
| dividend_yield = market_data.get('dividend_yield', pd.Series([0.02])).iloc[-1] | |
| earnings_yield = market_data.get('earnings_yield', pd.Series([0.04])).iloc[-1] | |
| # Estimer le taux sans risque | |
| risk_free_rate = (dividend_yield + earnings_yield) / 2 | |
| return float(np.clip(risk_free_rate, 0, 0.1)) | |
| except Exception as e: | |
| print(f"Error estimating interest rate: {e}") | |
| return 0.02 # Taux d'intérêt par défaut | |
| def _estimate_consumer_confidence(cls, market_data: pd.DataFrame) -> float: | |
| """Estimate consumer confidence from market behavior""" | |
| try: | |
| # Analyser le comportement du marché | |
| volatility = cls._calculate_volatility(market_data['close'].pct_change()) | |
| momentum = cls._calculate_momentum(market_data['close'].pct_change()) | |
| volume_trend = cls._calculate_volume_trend(market_data) | |
| # Calculer un score composite | |
| confidence_score = (0.4 * (1 - volatility) + | |
| 0.4 * momentum + | |
| 0.2 * volume_trend) | |
| return float(np.clip(confidence_score, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating consumer confidence: {e}") | |
| return 0.5 | |
| def _calculate_volume_trend(market_data: pd.DataFrame, window: int = 21) -> float: | |
| """Calculate normalized volume trend""" | |
| try: | |
| volume = market_data['volume'] | |
| volume_ma = volume.rolling(window=window).mean() | |
| volume_trend = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1 | |
| return float(np.clip((volume_trend + 1) / 2, 0, 1)) | |
| except Exception as e: | |
| print(f"Error calculating volume trend: {e}") | |
| return 0.5 | |
| def _estimate_industrial_production(cls, market_data: pd.DataFrame) -> float: | |
| """Estimate industrial production index from market indicators""" | |
| try: | |
| # Analyser les tendances des secteurs industriels | |
| price_trend = cls._calculate_sector_trend(market_data, 'industrial') | |
| volume_trend = cls._calculate_sector_volume_trend(market_data, 'industrial') | |
| volatility = cls._calculate_sector_volatility(market_data, 'industrial') | |
| # Combiner les indicateurs | |
| production_score = ( | |
| 0.4 * price_trend + | |
| 0.4 * volume_trend + | |
| 0.2 * (1 - volatility) # Volatilité plus faible = meilleure production | |
| ) | |
| # Normaliser entre 0 et 1 | |
| return float(np.clip(production_score, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating industrial production: {e}") | |
| return 0.5 | |
| def _estimate_retail_sales(market_data: pd.DataFrame, window: int = 63) -> float: | |
| """Estimate retail sales growth from market data""" | |
| try: | |
| # Calculer les indicateurs de vente au détail | |
| volume = market_data['volume'] | |
| price = market_data['close'] | |
| # Calculer la tendance du volume des transactions | |
| volume_ma = volume.rolling(window=window).mean() | |
| volume_growth = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1 | |
| # Calculer la tendance des prix | |
| price_ma = price.rolling(window=window).mean() | |
| price_growth = (price.iloc[-1] / price_ma.iloc[-1]) - 1 | |
| # Combiner en indicateur de ventes | |
| sales_indicator = (0.6 * volume_growth + 0.4 * price_growth) | |
| return float(np.clip((sales_indicator + 1) / 2, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating retail sales: {e}") | |
| return 0.5 | |
| def _estimate_housing_market(cls, market_data: pd.DataFrame) -> float: | |
| """Estimate housing market conditions""" | |
| try: | |
| # Analyser les indicateurs immobiliers | |
| price_momentum = cls._calculate_price_momentum(market_data, 'real_estate') | |
| volume_trend = cls._calculate_volume_trend(market_data) | |
| volatility = cls._calculate_volatility(market_data['close'].pct_change()) | |
| # Indicateurs spécifiques au secteur immobilier | |
| mortgage_rate_impact = cls._estimate_mortgage_rate_impact(market_data) | |
| housing_sentiment = cls._calculate_sector_sentiment(market_data, 'real_estate') | |
| # Combiner en score global | |
| housing_score = np.mean([ | |
| price_momentum * 0.3, | |
| volume_trend * 0.2, | |
| (1 - volatility) * 0.15, | |
| mortgage_rate_impact * 0.2, | |
| housing_sentiment * 0.15 | |
| ]) | |
| return float(np.clip(housing_score, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating housing market: {e}") | |
| return 0.5 | |
| def _estimate_mortgage_rate_impact(market_data: pd.DataFrame) -> float: | |
| """Estimate impact of mortgage rates on housing market""" | |
| try: | |
| # Utiliser le rendement des obligations comme proxy du taux hypothécaire | |
| if 'yield' in market_data.columns: | |
| current_yield = market_data['yield'].iloc[-1] | |
| avg_yield = market_data['yield'].mean() | |
| # Impact normalisé (plus le taux est bas, meilleur est le score) | |
| rate_impact = 1 - (current_yield / avg_yield) | |
| return float(np.clip((rate_impact + 1) / 2, 0, 1)) | |
| return 0.5 | |
| except Exception as e: | |
| print(f"Error estimating mortgage rate impact: {e}") | |
| return 0.5 | |
| def _calculate_sector_trend(cls, | |
| market_data: pd.DataFrame, | |
| sector: str, | |
| window: int = 63) -> float: | |
| """Calculate price trend for specific market sector""" | |
| try: | |
| sector_data = cls._get_sector_data(market_data, sector) | |
| if sector_data is None: | |
| return 0.5 | |
| # Calculer la tendance des prix | |
| returns = sector_data['close'].pct_change() | |
| trend = returns.rolling(window=window).mean() | |
| momentum = cls._calculate_momentum(returns) | |
| # Combiner tendance et momentum | |
| sector_trend = 0.7 * trend.iloc[-1] + 0.3 * momentum | |
| return float(np.clip((sector_trend + 1) / 2, 0, 1)) | |
| except Exception as e: | |
| print(f"Error calculating sector trend: {e}") | |
| return 0.5 | |
| def _calculate_sector_volume_trend(market_data: pd.DataFrame, | |
| sector: str, | |
| window: int = 21) -> float: | |
| """Calculate volume trend for specific market sector""" | |
| try: | |
| # Calculer la tendance du volume | |
| volume = market_data['volume'] | |
| volume_ma = volume.rolling(window=window).mean() | |
| volume_trend = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1 | |
| # Normaliser entre 0 et 1 | |
| return float(np.clip((volume_trend + 1) / 2, 0, 1)) | |
| except Exception as e: | |
| print(f"Error calculating sector volume trend: {e}") | |
| return 0.5 | |
| def _calculate_sector_volatility(market_data: pd.DataFrame, | |
| sector: str, | |
| window: int = 21) -> float: | |
| """Calculate volatility for specific market sector""" | |
| try: | |
| returns = market_data['close'].pct_change() | |
| volatility = returns.rolling(window=window).std() * np.sqrt(252) | |
| return float(np.clip(volatility, 0, 1)) | |
| except Exception as e: | |
| print(f"Error calculating sector volatility: {e}") | |
| return 0.5 | |
| def _analyze_economic_cycle(cls, market_data: pd.DataFrame) -> Dict[str, float]: | |
| """Analyze current position in economic cycle""" | |
| try: | |
| # Calculer les indicateurs de cycle | |
| trend_strength = cls._calculate_trend_strength(market_data) | |
| cycle_position = cls._estimate_cycle_position(market_data) | |
| momentum_indicators = cls._calculate_cycle_momentum(market_data) | |
| leading_indicators = cls._calculate_leading_indicators(market_data) | |
| return { | |
| 'cycle_position': float(cycle_position), | |
| 'trend_strength': float(trend_strength), | |
| 'momentum_score': float(momentum_indicators['composite_momentum']), | |
| 'expansion_probability': float(momentum_indicators['expansion_prob']), | |
| 'contraction_probability': float(momentum_indicators['contraction_prob']), | |
| 'turning_point_probability': float(cls._estimate_turning_point_probability( | |
| market_data, leading_indicators | |
| )), | |
| 'cycle_maturity': float(cls._estimate_cycle_maturity( | |
| market_data, cycle_position | |
| )), | |
| 'leading_indicators': leading_indicators | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing economic cycle: {e}") | |
| return {'cycle_position': 0.5} | |
| def _calculate_trend_strength(market_data: pd.DataFrame, windows: List[int] = [50, 100, 200]) -> float: | |
| """Calculate multi-timeframe trend strength""" | |
| try: | |
| price = market_data['close'] | |
| trend_scores = [] | |
| for window in windows: | |
| ma = price.rolling(window=window).mean() | |
| # Score basé sur la position du prix par rapport à la MA | |
| score = (price.iloc[-1] / ma.iloc[-1]) - 1 | |
| trend_scores.append(score) | |
| # Moyenne pondérée des scores (plus de poids aux MAs courtes) | |
| weights = np.array([0.5, 0.3, 0.2]) | |
| composite_score = np.average(trend_scores, weights=weights) | |
| return float(np.clip((composite_score + 1) / 2, 0, 1)) | |
| except Exception as e: | |
| print(f"Error calculating trend strength: {e}") | |
| return 0.5 | |
| def _estimate_cycle_position(cls, market_data: pd.DataFrame) -> float: | |
| """Estimate current position in economic cycle (0 = trough, 0.5 = mid-cycle, 1 = peak)""" | |
| try: | |
| # Calculer les indicateurs composites | |
| price_cycle = cls._calculate_price_cycle(market_data) | |
| volume_cycle = cls._calculate_volume_cycle(market_data) | |
| momentum_cycle = cls._calculate_momentum_cycle(market_data) | |
| # Combiner les indicateurs | |
| cycle_position = np.mean([ | |
| price_cycle * 0.4, | |
| volume_cycle * 0.3, | |
| momentum_cycle * 0.3 | |
| ]) | |
| return float(np.clip(cycle_position, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating cycle position: {e}") | |
| return 0.5 | |
| def _calculate_price_cycle(market_data: pd.DataFrame, window: int = 252) -> float: | |
| """Calculate price-based cycle indicator""" | |
| try: | |
| price = market_data['close'] | |
| # Calculer le prix relatif à la tendance long terme | |
| long_ma = price.rolling(window=window).mean() | |
| relative_price = (price - long_ma) / long_ma | |
| # Normaliser entre 0 et 1 | |
| normalized_position = (relative_price - relative_price.min()) / \ | |
| (relative_price.max() - relative_price.min()) | |
| return float(normalized_position.iloc[-1]) | |
| except Exception as e: | |
| print(f"Error calculating price cycle: {e}") | |
| return 0.5 | |
| def _calculate_cycle_momentum(market_data: pd.DataFrame) -> Dict[str, float]: | |
| """Calculate cycle momentum indicators""" | |
| try: | |
| returns = market_data['close'].pct_change() | |
| # Calculer les probabilités de régime | |
| gmm = GaussianMixture(n_components=2, random_state=42) | |
| features = np.column_stack([ | |
| returns.rolling(window=21).mean(), | |
| returns.rolling(window=21).std() | |
| ]) | |
| gmm.fit(features) | |
| # Prédire les probabilités | |
| current_probs = gmm.predict_proba(features[-1].reshape(1, -1))[0] | |
| return { | |
| 'composite_momentum': float(np.mean([ | |
| returns.rolling(window=21).mean().iloc[-1], | |
| returns.rolling(window=63).mean().iloc[-1], | |
| returns.rolling(window=252).mean().iloc[-1] | |
| ])), | |
| 'expansion_prob': float(current_probs[0]), | |
| 'contraction_prob': float(current_probs[1]) | |
| } | |
| except Exception as e: | |
| print(f"Error calculating cycle momentum: {e}") | |
| return { | |
| 'composite_momentum': 0.0, | |
| 'expansion_prob': 0.5, | |
| 'contraction_prob': 0.5 | |
| } | |
| def _calculate_leading_indicators(cls, market_data: pd.DataFrame) -> Dict[str, float]: | |
| """Calculate composite leading indicators""" | |
| try: | |
| yield_curve = cls._calculate_yield_curve_indicator(market_data) | |
| credit_spread = cls._calculate_credit_spread_indicator(market_data) | |
| market_breadth = cls._calculate_market_breadth(market_data) | |
| momentum = cls._calculate_multi_timeframe_momentum(market_data) | |
| return { | |
| 'yield_curve': float(yield_curve), | |
| 'credit_spread': float(credit_spread), | |
| 'market_breadth': float(market_breadth), | |
| 'momentum': float(momentum), | |
| 'composite_leading': float(np.mean([ | |
| yield_curve * 0.3, | |
| credit_spread * 0.3, | |
| market_breadth * 0.2, | |
| momentum * 0.2 | |
| ])) | |
| } | |
| except Exception as e: | |
| print(f"Error calculating leading indicators: {e}") | |
| return {'composite_leading': 0.5} | |
| def _estimate_turning_point_probability(cls, | |
| market_data: pd.DataFrame, | |
| leading_indicators: Dict[str, float]) -> float: | |
| """Estimate probability of market turning point""" | |
| try: | |
| # Analyser les signaux de retournement | |
| technical_signals = cls._analyze_technical_turning_points(market_data) | |
| momentum_signals = cls._analyze_momentum_turning_points(market_data) | |
| breadth_signals = cls._analyze_market_breadth_signals(market_data) | |
| volume_signals = cls._analyze_volume_patterns(market_data) | |
| # Calculer la probabilité composite | |
| turning_point_prob = np.mean([ | |
| technical_signals['turning_prob'] * 0.3, | |
| momentum_signals['turning_prob'] * 0.25, | |
| breadth_signals['turning_prob'] * 0.25, | |
| volume_signals['turning_prob'] * 0.2 | |
| ]) | |
| # Ajuster avec les indicateurs avancés | |
| if leading_indicators['composite_leading'] < 0.3 or leading_indicators['composite_leading'] > 0.7: | |
| turning_point_prob *= 1.2 # Augmenter la probabilité aux extrêmes | |
| return float(np.clip(turning_point_prob, 0, 1)) | |
| except Exception as e: | |
| print(f"Error estimating turning point probability: {e}") | |
| return 0.5 | |
def _analyze_technical_turning_points(market_data: pd.DataFrame) -> Dict[str, float]:
    """Look for moving-average crossovers and price/RSI divergence.

    Reads the ``close`` column, compares the 50- and 200-row moving
    averages on the last two rows to detect a crossover, and scores the
    divergence between price and RSI.

    Returns:
        A dict with ``turning_prob`` (mean of the death-cross flag, the
        golden-cross flag and the divergence score), ``bearish_signals``
        and ``bullish_signals``. On any error returns
        ``{'turning_prob': 0.5}``.

    NOTE(review): ``cls`` is used below but is not a parameter of this
    function, so unless a name ``cls`` exists in an enclosing scope the
    calls raise NameError and the fallback is always returned. Confirm
    how this method is declared in the class.
    """
    try:
        price = market_data['close']
        # 50- and 200-row simple moving averages
        ma_50 = price.rolling(window=50).mean()
        ma_200 = price.rolling(window=200).mean()
        # A cross happened on the last row when the ordering of the two
        # averages flipped between the previous row and the latest one.
        death_cross = (ma_50.iloc[-1] < ma_200.iloc[-1]) and (ma_50.iloc[-2] > ma_200.iloc[-2])
        golden_cross = (ma_50.iloc[-1] > ma_200.iloc[-1]) and (ma_50.iloc[-2] < ma_200.iloc[-2])
        # Divergence detection (price_momentum is computed but not used below)
        price_momentum = price.pct_change().rolling(window=14).mean()
        rsi = cls._calculate_rsi(price)
        divergence_score = cls._calculate_divergence_score(price, rsi)
        return {
            'turning_prob': float(np.mean([
                death_cross * 1.0,
                golden_cross * 1.0,
                divergence_score
            ])),
            # Divergence scores above 0.7 count as bearish, below 0.3 as bullish
            'bearish_signals': death_cross or (divergence_score > 0.7),
            'bullish_signals': golden_cross or (divergence_score < 0.3)
        }
    except Exception as e:
        print(f"Error analyzing technical turning points: {e}")
        return {'turning_prob': 0.5}
| def _calculate_divergence_score(price: pd.Series, indicator: pd.Series) -> float: | |
| """Calculate divergence score between price and indicator""" | |
| try: | |
| # Calculer les tendances | |
| price_trend = price.pct_change().rolling(window=20).mean() | |
| indicator_trend = indicator.diff().rolling(window=20).mean() | |
| # Détecter les divergences | |
| bullish_div = (price_trend.iloc[-1] < 0) and (indicator_trend.iloc[-1] > 0) | |
| bearish_div = (price_trend.iloc[-1] > 0) and (indicator_trend.iloc[-1] < 0) | |
| if bullish_div: | |
| return float(abs(price_trend.iloc[-1] - indicator_trend.iloc[-1])) | |
| elif bearish_div: | |
| return float(1 - abs(price_trend.iloc[-1] - indicator_trend.iloc[-1])) | |
| return 0.5 | |
| except Exception as e: | |
| print(f"Error calculating divergence score: {e}") | |
| return 0.5 | |
| def _analyze_momentum_turning_points(cls, market_data: pd.DataFrame) -> Dict[str, float]: | |
| """Analyze momentum indicators for turning points""" | |
| try: | |
| # Calculer plusieurs indicateurs de momentum | |
| rsi = cls._calculate_rsi(market_data['close']) | |
| macd = cls._calculate_macd(market_data['close']) | |
| stochastic = cls._calculate_stochastic(market_data) | |
| # Détecter les conditions de surachat/survente | |
| overbought = (rsi > 70) or (stochastic > 80) | |
| oversold = (rsi < 30) or (stochastic < 20) | |
| # Détecter les divergences de momentum | |
| momentum_divergence = cls._detect_momentum_divergence(market_data, macd) | |
| return { | |
| 'turning_prob': float(np.mean([ | |
| overbought * 0.8, | |
| oversold * 0.8, | |
| momentum_divergence * 0.7 | |
| ])), | |
| 'overbought': bool(overbought), | |
| 'oversold': bool(oversold) | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing momentum turning points: {e}") | |
| return {'turning_prob': 0.5} | |
| def _calculate_stochastic(market_data: pd.DataFrame, period: int = 14) -> float: | |
| """Calculate stochastic oscillator""" | |
| try: | |
| high = market_data['high'].rolling(window=period).max() | |
| low = market_data['low'].rolling(window=period).min() | |
| close = market_data['close'] | |
| k = 100 * (close - low) / (high - low) | |
| d = k.rolling(window=3).mean() | |
| return float(d.iloc[-1]) | |
| except Exception as e: | |
| print(f"Error calculating stochastic: {e}") | |
| return 50.0 | |
def _estimate_cycle_maturity(cls,
                             market_data: pd.DataFrame,
                             cycle_position: float) -> Dict[str, float]:
    """Estimate how mature the current cycle is and how long it may last.

    Gathers cycle duration, trend strength, momentum characteristics and
    the volatility regime from helpers on ``cls``, feeds them together
    with ``cycle_position`` to ``cls._calculate_maturity_score``, and
    reports the score, the estimated remaining duration and the
    underlying characteristics. On error returns
    ``{'maturity_score': 0.5}``.
    """
    try:
        duration_stats = cls._analyze_cycle_duration(market_data)
        strength = cls._calculate_trend_strength(market_data)
        momentum_profile = cls._analyze_momentum_characteristics(market_data)
        vol_profile = cls._analyze_volatility_regime(market_data)

        maturity_score = cls._calculate_maturity_score(
            cycle_position,
            duration_stats,
            strength,
            momentum_profile,
            vol_profile
        )

        # Convert in the same order the result is laid out.
        score_value = float(maturity_score)
        remaining = float(cls._estimate_remaining_duration(
            maturity_score, duration_stats
        ))
        characteristics = {
            'duration': float(duration_stats['current_duration']),
            'average_duration': float(duration_stats['average_duration']),
            'trend_strength': float(strength),
            'momentum_state': str(momentum_profile['state']),
            'volatility_regime': str(vol_profile['current_regime'])
        }

        return {
            'maturity_score': score_value,
            'estimated_remaining_duration': remaining,
            'cycle_characteristics': characteristics
        }
    except Exception as e:
        print(f"Error estimating cycle maturity: {e}")
        return {'maturity_score': 0.5}
def _analyze_cycle_duration(market_data: pd.DataFrame) -> Dict[str, float]:
    """Measure the length of past cycles from peaks in the return series.

    Returns:
        A dict with ``current_duration`` (rows since the last detected
        peak) and the ``average_duration``, ``min_duration`` and
        ``max_duration`` of the gaps between consecutive peaks. On any
        error returns ``current_duration`` 0.0 and ``average_duration``
        252.0 only.

    NOTE(review): ``cls`` is used below but is not a parameter of this
    function, so unless a name ``cls`` exists in an enclosing scope the
    calls raise NameError and the fallback is always returned. Confirm
    how this method is declared in the class.
    """
    try:
        returns = market_data['close'].pct_change()
        # Locate turning points (troughs are detected but not used below)
        peaks = cls._detect_peaks(returns)
        troughs = cls._detect_troughs(returns)
        # Cycle length = distance between consecutive peaks
        cycle_durations = []
        for i in range(len(peaks)-1):
            cycle_durations.append(peaks[i+1] - peaks[i])
        return {
            # peaks[-1] raises IndexError when no peak was found, which
            # lands in the except branch below
            'current_duration': float(len(returns) - peaks[-1]),
            'average_duration': float(np.mean(cycle_durations)),
            'min_duration': float(np.min(cycle_durations)),
            'max_duration': float(np.max(cycle_durations))
        }
    except Exception as e:
        print(f"Error analyzing cycle duration: {e}")
        return {
            'current_duration': 0.0,
            'average_duration': 252.0  # Default average cycle length
        }
| def _detect_peaks(series: pd.Series, window: int = 21) -> List[int]: | |
| """Detect peaks in time series""" | |
| peaks = [] | |
| for i in range(window, len(series)-window): | |
| if all(series[i] > series[i-j] for j in range(1, window+1)) and \ | |
| all(series[i] > series[i+j] for j in range(1, window+1)): | |
| peaks.append(i) | |
| return peaks | |
| def _detect_troughs(series: pd.Series, window: int = 21) -> List[int]: | |
| """Detect troughs in time series""" | |
| troughs = [] | |
| for i in range(window, len(series)-window): | |
| if all(series[i] < series[i-j] for j in range(1, window+1)) and \ | |
| all(series[i] < series[i+j] for j in range(1, window+1)): | |
| troughs.append(i) | |
| return troughs | |
| def _analyze_momentum_characteristics(cls, market_data: pd.DataFrame) -> Dict[str, Any]: | |
| """Analyze momentum characteristics across multiple timeframes""" | |
| try: | |
| returns = market_data['close'].pct_change() | |
| # Calculer le momentum sur différentes périodes | |
| momentum_metrics = { | |
| 'short_term': cls._calculate_momentum(returns, 21), | |
| 'medium_term': cls._calculate_momentum(returns, 63), | |
| 'long_term': cls._calculate_momentum(returns, 252) | |
| } | |
| # Déterminer l'état du momentum | |
| momentum_state = cls._determine_momentum_state(momentum_metrics) | |
| return { | |
| 'metrics': momentum_metrics, | |
| 'state': momentum_state, | |
| 'strength': float(cls._calculate_momentum_strength(momentum_metrics)), | |
| 'consistency': float(cls._calculate_momentum_consistency(momentum_metrics)) | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing momentum characteristics: {e}") | |
| return {'state': 'neutral'} | |
| def _determine_momentum_state(momentum_metrics: Dict[str, float]) -> str: | |
| """Determine current momentum state""" | |
| # Pondération des différentes périodes | |
| weights = {'short_term': 0.5, 'medium_term': 0.3, 'long_term': 0.2} | |
| weighted_momentum = sum(m * weights[k] for k, m in momentum_metrics.items()) | |
| if weighted_momentum > 0.5: | |
| return 'strong_positive' | |
| elif weighted_momentum > 0.2: | |
| return 'positive' | |
| elif weighted_momentum < -0.5: | |
| return 'strong_negative' | |
| elif weighted_momentum < -0.2: | |
| return 'negative' | |
| return 'neutral' | |
| def _calculate_momentum_strength(momentum_metrics: Dict[str, float]) -> float: | |
| """Calculate overall momentum strength""" | |
| try: | |
| # Utiliser la moyenne absolue des métriques de momentum | |
| return float(np.mean([abs(m) for m in momentum_metrics.values()])) | |
| except Exception as e: | |
| print(f"Error calculating momentum strength: {e}") | |
| return 0.0 | |
| def _calculate_momentum_consistency(momentum_metrics: Dict[str, float]) -> float: | |
| """Calculate momentum consistency across timeframes""" | |
| try: | |
| # Vérifier la cohérence des signes | |
| signs = [np.sign(m) for m in momentum_metrics.values()] | |
| return float(abs(np.mean(signs))) | |
| except Exception as e: | |
| print(f"Error calculating momentum consistency: {e}") | |
| return 0.0 | |
| def _analyze_volatility_regime(cls, market_data: pd.DataFrame) -> Dict[str, Any]: | |
| """Analyze volatility regime characteristics using multiple methods""" | |
| try: | |
| returns = market_data['close'].pct_change() | |
| # Calculer les métriques de volatilité | |
| realized_vol = cls._calculate_realized_volatility(returns) | |
| conditional_vol = cls._estimate_conditional_volatility(returns) | |
| regime_probs = cls._estimate_volatility_regime_probabilities(returns) | |
| vol_term_structure = cls._analyze_volatility_term_structure(returns) | |
| # Déterminer le régime actuel | |
| current_regime = cls._identify_volatility_regime( | |
| realized_vol, | |
| conditional_vol, | |
| regime_probs | |
| ) | |
| return { | |
| 'current_regime': str(current_regime), | |
| 'realized_volatility': float(realized_vol), | |
| 'conditional_volatility': float(conditional_vol), | |
| 'regime_probabilities': regime_probs, | |
| 'term_structure': vol_term_structure, | |
| 'regime_metrics': { | |
| 'persistence': float(cls._calculate_regime_persistence(returns)), | |
| 'transition_probability': float(cls._calculate_regime_transition_prob(returns)) | |
| } | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing volatility regime: {e}") | |
| return {'current_regime': 'normal'} | |
| def _calculate_realized_volatility(returns: pd.Series, | |
| windows: List[int] = [21, 63, 252]) -> Dict[str, float]: | |
| """Calculate realized volatility across multiple timeframes""" | |
| try: | |
| realized_vols = {} | |
| for window in windows: | |
| vol = returns.rolling(window=window).std() * np.sqrt(252) | |
| realized_vols[f'{window}d'] = float(vol.iloc[-1]) | |
| return realized_vols | |
| except Exception as e: | |
| print(f"Error calculating realized volatility: {e}") | |
| return {f'{w}d': 0.15 for w in windows} | |
| def _estimate_conditional_volatility(returns: pd.Series) -> float: | |
| """Estimate conditional volatility using GARCH model""" | |
| try: | |
| # Utiliser un modèle GARCH(1,1) | |
| model = arch_model(returns, vol='Garch', p=1, q=1) | |
| result = model.fit(disp='off') | |
| forecast = result.forecast(horizon=1) | |
| return float(np.sqrt(forecast.variance.values[-1][0])) | |
| except Exception as e: | |
| print(f"Error estimating conditional volatility: {e}") | |
| return float(returns.std() * np.sqrt(252)) | |
| def _estimate_volatility_regime_probabilities(returns: pd.Series) -> Dict[str, float]: | |
| """Estimate probabilities of different volatility regimes using GMM""" | |
| try: | |
| # Préparer les données pour GMM | |
| vol_data = returns.rolling(window=21).std().dropna().values.reshape(-1, 1) | |
| # Fit GMM avec 3 composantes (low, normal, high vol) | |
| gmm = GaussianMixture(n_components=3, random_state=42) | |
| gmm.fit(vol_data) | |
| # Calculer les probabilités du régime actuel | |
| current_probs = gmm.predict_proba(vol_data[-1].reshape(1, -1))[0] | |
| return { | |
| 'low_vol': float(current_probs[0]), | |
| 'normal_vol': float(current_probs[1]), | |
| 'high_vol': float(current_probs[2]) | |
| } | |
| except Exception as e: | |
| print(f"Error estimating regime probabilities: {e}") | |
| return {'normal_vol': 1.0} | |
| def _analyze_volatility_term_structure(returns: pd.Series) -> Dict[str, float]: | |
| """Analyze volatility term structure""" | |
| try: | |
| windows = [5, 21, 63, 252] # 1w, 1m, 3m, 1y | |
| vols = {} | |
| for window in windows: | |
| vol = returns.rolling(window=window).std() * np.sqrt(252) | |
| vols[f'{window}d'] = float(vol.iloc[-1]) | |
| # Calculer la pente de la structure | |
| slope = (vols['252d'] - vols['21d']) / vols['21d'] | |
| return { | |
| 'term_structure': vols, | |
| 'slope': float(slope), | |
| 'convexity': float(vols['63d'] / np.mean([vols['21d'], vols['252d']])) | |
| } | |
| except Exception as e: | |
| print(f"Error analyzing volatility term structure: {e}") | |
| return {'slope': 0.0} | |
| def _calculate_regime_persistence(returns: pd.Series, lookback: int = 252) -> float: | |
| """Calculate volatility regime persistence""" | |
| try: | |
| vol = returns.rolling(window=21).std() * np.sqrt(252) | |
| vol_median = vol.median() | |
| # Calculer la durée moyenne des régimes | |
| regime = (vol > vol_median).astype(int) | |
| regime_changes = regime.diff().abs() | |
| persistence = 1 - (regime_changes.sum() / len(regime_changes)) | |
| return float(persistence) | |
| except Exception as e: | |
| print(f"Error calculating regime persistence: {e}") | |
| return 0.5 | |
def _analyze_cycle_transitions(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Assess whether the market cycle is changing phase.

    Gathers price, sector-rotation, breadth and volatility signals from
    helpers on ``cls``, derives a transition probability and transition
    type from them, and returns these together with the raw signals and
    the confirmation metrics. On error returns
    ``{'transition_probability': 0.0}``.
    """
    try:
        price_signals = cls._analyze_price_transition_signals(market_data)
        sector_rotation = cls._analyze_sector_rotation(market_data)
        breadth_signals = cls._analyze_breadth_transitions(market_data)
        volatility_signals = cls._analyze_volatility_transitions(market_data)

        transition_probability = cls._calculate_transition_probability(
            price_signals,
            sector_rotation,
            breadth_signals,
            volatility_signals
        )

        # Derived fields are computed in the order they appear in the result.
        probability_value = float(transition_probability)
        transition_type = cls._determine_transition_type(
            price_signals,
            sector_rotation,
            breadth_signals
        )
        confirmation_metrics = cls._calculate_confirmation_metrics(market_data)

        return {
            'transition_probability': probability_value,
            'transition_type': transition_type,
            'price_signals': price_signals,
            'sector_rotation': sector_rotation,
            'breadth_signals': breadth_signals,
            'volatility_signals': volatility_signals,
            'confirmation_metrics': confirmation_metrics
        }
    except Exception as e:
        print(f"Error analyzing cycle transitions: {e}")
        return {'transition_probability': 0.0}
def _analyze_price_transition_signals(market_data: pd.DataFrame) -> Dict[str, float]:
    """Derive price-based signals of a cycle transition.

    Returns:
        A dict with ``death_cross_signal`` and ``golden_cross_signal``
        (1.0 when the 50/200-row moving averages crossed on the latest
        row, else 0.0), ``trend_strength`` (latest 63-row mean return),
        ``momentum_signal`` (latest 21-row mean return) and
        ``price_structure``. On any error returns
        ``{'trend_strength': 0.0}``.

    NOTE(review): ``cls`` is used below but is not a parameter of this
    function, so unless a name ``cls`` exists in an enclosing scope the
    ``price_structure`` lookup raises NameError and the fallback is
    always returned. Confirm how this method is declared in the class.
    """
    try:
        price = market_data['close']
        returns = price.pct_change()
        # 50- and 200-row simple moving averages
        ma_50 = price.rolling(window=50).mean()
        ma_200 = price.rolling(window=200).mean()
        # Row-wise crossover flags: the ordering of the two averages on a
        # row differs from the ordering on the previous row.
        death_cross = (ma_50 < ma_200) & (ma_50.shift(1) > ma_200.shift(1))
        golden_cross = (ma_50 > ma_200) & (ma_50.shift(1) < ma_200.shift(1))
        # Medium-term trend and short-term momentum of returns
        price_trend = returns.rolling(window=63).mean()
        momentum = returns.rolling(window=21).mean()
        return {
            'death_cross_signal': float(death_cross.iloc[-1]),
            'golden_cross_signal': float(golden_cross.iloc[-1]),
            'trend_strength': float(price_trend.iloc[-1]),
            'momentum_signal': float(momentum.iloc[-1]),
            'price_structure': cls._analyze_price_structure(price)
        }
    except Exception as e:
        print(f"Error analyzing price transition signals: {e}")
        return {'trend_strength': 0.0}
def _analyze_sector_rotation(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Summarise per-sector returns and the rotation pattern they imply.

    For each of five fixed sector names, ``_get_sector_data`` is asked for a
    frame; sectors for which it returns ``None`` are left out. The mean
    percentage change of ``close`` and the latest 63-row rolling mean are
    recorded per sector and passed on to the rotation helpers.

    Returns:
        A dict with ``sector_returns``, ``sector_momentum``,
        ``rotation_pattern``, ``rotation_strength`` and ``leading_sectors``.
        If anything raises, the error is printed and
        ``{'rotation_pattern': 'unclear'}`` is returned.
    """
    try:
        # Sectors that are inspected.
        tracked_sectors = ('defensive', 'cyclical', 'technology', 'financial', 'energy')
        mean_return_by_sector = {}
        momentum_by_sector = {}

        for name in tracked_sectors:
            frame = cls._get_sector_data(market_data, name)
            if frame is None:
                continue
            pct_returns = frame['close'].pct_change()
            mean_return_by_sector[name] = float(pct_returns.mean())
            momentum_by_sector[name] = float(
                pct_returns.rolling(window=63).mean().iloc[-1]
            )

        # Classify the rotation from the collected per-sector figures.
        pattern = cls._detect_sector_rotation_pattern(
            mean_return_by_sector, momentum_by_sector
        )
        strength = float(cls._calculate_rotation_strength(mean_return_by_sector))
        leaders = cls._identify_leading_sectors(momentum_by_sector)

        return {
            'sector_returns': mean_return_by_sector,
            'sector_momentum': momentum_by_sector,
            'rotation_pattern': pattern,
            'rotation_strength': strength,
            'leading_sectors': leaders
        }
    except Exception as exc:
        print(f"Error analyzing sector rotation: {exc}")
        return {'rotation_pattern': 'unclear'}
def _detect_sector_rotation_pattern(sector_returns: Dict[str, float],
                                    sector_momentum: Dict[str, float]) -> str:
    """Name the cycle phase implied by the best-performing sector.

    Args:
        sector_returns: Return per sector name; the highest value decides.
        sector_momentum: Accepted but not used by this function.

    Returns:
        ``'early_cycle'``, ``'mid_cycle'`` or ``'late_cycle'`` when the top
        sector belongs to the matching group, ``'transition'`` for any other
        top sector, and ``'unclear'`` (after printing the error) when the
        lookup raises, e.g. for an empty ``sector_returns``.
    """
    # Phase label -> sectors whose leadership signals that phase.
    phase_leaders = (
        ('early_cycle', ('technology', 'financial')),
        ('mid_cycle', ('energy', 'materials')),
        ('late_cycle', ('defensive', 'utilities')),
    )
    try:
        # Rank sectors from best to worst return and keep the winner's name.
        by_performance = sorted(
            sector_returns.items(), key=lambda item: item[1], reverse=True
        )
        top_sector = by_performance[0][0]

        for phase, leaders in phase_leaders:
            if top_sector in leaders:
                return phase
        return 'transition'
    except Exception as exc:
        print(f"Error detecting sector rotation pattern: {exc}")
        return 'unclear'
def _calculate_rotation_strength(sector_returns: Dict[str, float]) -> float:
    """Measure rotation strength as the dispersion of sector returns.

    The value is the population standard deviation of the returns divided by
    the absolute value of their mean.

    Args:
        sector_returns: Return per sector name.

    Returns:
        The ratio as a float. ``0.0`` is returned when the ratio is undefined
        (no sectors, a mean of exactly zero, or a non-finite result) and,
        after printing the error, when the values cannot be converted to
        floats.
    """
    try:
        values = np.asarray(list(sector_returns.values()), dtype=float)
        # NumPy only warns on an empty mean or a division by zero and yields
        # NaN/inf, so those cases must be caught explicitly to reach the
        # documented 0.0 fallback.
        if values.size == 0:
            return 0.0
        mean_magnitude = abs(values.mean())
        if mean_magnitude == 0 or not np.isfinite(mean_magnitude):
            return 0.0
        strength = values.std() / mean_magnitude
        return float(strength) if np.isfinite(strength) else 0.0
    except Exception as e:
        print(f"Error calculating rotation strength: {e}")
        return 0.0
def _analyze_breadth_transitions(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Collect market-breadth indicators and their latest readings.

    Args:
        market_data: Frame forwarded to the breadth helpers; its ``close``
            column is also passed to ``_detect_breadth_divergence``.

    Returns:
        A dict with ``advance_decline_ratio``, ``new_highs_lows_ratio``,
        ``participation_rate``, ``breadth_divergence``, ``breadth_momentum``
        and ``breadth_trend``. If anything raises, the error is printed and
        ``{'participation_rate': 0.5}`` is returned.
    """
    def _latest(values) -> float:
        # ``_calculate_advance_decline_ratio`` returns a pd.Series. On a
        # Series with an integer index ``values[-1]`` is a label lookup and
        # raises KeyError, so positional access must go through ``.iloc``.
        # Plain sequences/arrays keep using ordinary indexing.
        positional = getattr(values, 'iloc', None)
        if positional is not None:
            return float(positional[-1])
        return float(values[-1])

    try:
        # Compute the breadth indicators.
        advance_decline = cls._calculate_advance_decline_ratio(market_data)
        new_highs_lows = cls._calculate_new_highs_lows_ratio(market_data)
        participation = cls._calculate_market_participation(market_data)

        # Look for divergences between price and breadth.
        breadth_divergence = cls._detect_breadth_divergence(
            market_data['close'],
            advance_decline,
            participation
        )

        return {
            'advance_decline_ratio': _latest(advance_decline),
            'new_highs_lows_ratio': _latest(new_highs_lows),
            'participation_rate': _latest(participation),
            'breadth_divergence': breadth_divergence,
            'breadth_momentum': float(cls._calculate_breadth_momentum(advance_decline)),
            'breadth_trend': str(cls._determine_breadth_trend(
                advance_decline,
                new_highs_lows,
                participation
            ))
        }
    except Exception as e:
        print(f"Error analyzing breadth transitions: {e}")
        return {'participation_rate': 0.5}
def _calculate_advance_decline_ratio(market_data: pd.DataFrame, window: int = 21) -> pd.Series:
    """Approximate an advance/decline ratio from the ``close`` column.

    Over a rolling ``window`` of rows, counts the positive percentage changes
    ("advances") and the negative ones ("declines") and divides the former by
    the latter.

    Args:
        market_data: Frame that must provide a ``close`` column.
        window: Number of rows in the rolling count.

    Returns:
        The rolling ratio as a Series. If anything raises, the error is
        printed and a Series of ``1.0`` on ``market_data.index`` is returned.
    """
    try:
        # Stand-in for real advance/decline data: up rows versus down rows.
        pct_returns = market_data['close'].pct_change()
        up_count = pct_returns.gt(0).astype(int).rolling(window=window).sum()
        down_count = pct_returns.lt(0).astype(int).rolling(window=window).sum()
        # The small constant keeps the denominator away from zero.
        return up_count.div(down_count.add(1e-6))
    except Exception as exc:
        print(f"Error calculating A/D ratio: {exc}")
        return pd.Series(1.0, index=market_data.index)
def _calculate_confirmation_metrics(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Aggregate the four confirmation analyses into one score.

    Runs the price, volume, volatility and breadth confirmation helpers and
    averages their ``score`` entries with ``np.mean``.

    Returns:
        A dict with ``confirmation_score``, the four ``*_confirmation``
        results, ``trend_quality`` and ``momentum_quality``. If anything
        raises, the error is printed and ``{'confirmation_score': 0.5}`` is
        returned.
    """
    try:
        # Run every kind of confirmation analysis.
        by_price = cls._analyze_price_confirmation(market_data)
        by_volume = cls._analyze_volume_confirmation(market_data)
        by_volatility = cls._analyze_volatility_confirmation(market_data)
        by_breadth = cls._analyze_breadth_confirmation(market_data)

        # Overall score: unweighted mean of the four component scores.
        component_scores = [
            by_price['score'],
            by_volume['score'],
            by_volatility['score'],
            by_breadth['score'],
        ]
        overall = np.mean(component_scores)

        metrics: Dict[str, Any] = {'confirmation_score': float(overall)}
        metrics['price_confirmation'] = by_price
        metrics['volume_confirmation'] = by_volume
        metrics['volatility_confirmation'] = by_volatility
        metrics['breadth_confirmation'] = by_breadth
        metrics['trend_quality'] = cls._calculate_trend_quality(market_data)
        metrics['momentum_quality'] = cls._calculate_momentum_quality(market_data)
        return metrics
    except Exception as exc:
        print(f"Error calculating confirmation metrics: {exc}")
        return {'confirmation_score': 0.5}
def _analyze_price_confirmation(market_data: pd.DataFrame) -> Dict[str, float]:
    """Score how strongly price action confirms a transition.

    Reads the ``close`` column, asks three helpers for trend, momentum and
    chart-pattern confirmation and averages their ``score`` entries.

    Returns:
        A dict with ``score``, ``trend_confirmation``,
        ``momentum_confirmation`` and ``pattern_confirmation``. If anything
        raises, the error is printed and ``{'score': 0.5}`` is returned.
    """
    try:
        close = market_data['close']
        pct_returns = close.pct_change()

        # NOTE(review): `cls` is not a parameter of this function. Unless a
        # module-level `cls` exists, the next line raises NameError, which
        # the handler below swallows, so the fallback is returned - confirm
        # whether this was meant to be a classmethod.
        trend = cls._calculate_trend_confirmation(close)
        momentum = cls._calculate_momentum_confirmation(pct_returns)
        pattern = cls._analyze_chart_patterns(close)

        # Overall score: unweighted mean of the three component scores.
        component_scores = [trend['score'], momentum['score'], pattern['score']]
        return {
            'score': float(np.mean(component_scores)),
            'trend_confirmation': trend,
            'momentum_confirmation': momentum,
            'pattern_confirmation': pattern
        }
    except Exception as exc:
        print(f"Error analyzing price confirmation: {exc}")
        return {'score': 0.5}
def _analyze_volume_confirmation(market_data: pd.DataFrame) -> Dict[str, float]:
    """Score how strongly trading volume confirms a transition.

    Confirmation is 1.0 when, on the last row, the 21-row average volume is
    rising and the 21-row price/volume correlation exceeds 0.3; otherwise
    it is 0.0.

    Args:
        market_data: Frame that must provide ``volume`` and ``close``.

    Returns:
        A dict with ``score``, ``volume_trend``, ``price_volume_corr`` and
        ``volume_quality``. If anything raises, the error is printed and
        ``{'score': 0.5}`` is returned.
    """
    try:
        volume = market_data['volume']
        close = market_data['close']

        # Change of the smoothed volume and rolling price/volume correlation.
        smoothed_volume_change = volume.rolling(window=21).mean().pct_change()
        close_series = pd.Series(close)
        volume_series = pd.Series(volume)
        rolling_corr = close_series.rolling(window=21).corr(volume_series)

        latest_volume_change = smoothed_volume_change.iloc[-1]
        latest_corr = rolling_corr.iloc[-1]

        # Both conditions must hold for volume to confirm the move.
        if latest_volume_change > 0 and latest_corr > 0.3:
            confirmed = 1.0
        else:
            confirmed = 0.0

        result = {
            'score': float(confirmed),
            'volume_trend': float(latest_volume_change),
            'price_volume_corr': float(latest_corr),
        }
        # NOTE(review): `cls` is not a parameter of this function. Unless a
        # module-level `cls` exists, this line raises NameError, which the
        # handler below swallows, so the fallback is returned - confirm
        # whether this was meant to be a classmethod.
        result['volume_quality'] = cls._calculate_volume_quality(volume)
        return result
    except Exception as exc:
        print(f"Error analyzing volume confirmation: {exc}")
        return {'score': 0.5}
def _analyze_leading_rotation_indicators(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Gather leading indicators of market rotation and regime change.

    Runs the sector, style, macro, correlation, volatility-structure and
    credit analyses, then derives regime-transition probabilities, rotation
    candidates, a signal-quality assessment and per-phase cycle
    probabilities from their results.

    Args:
        market_data: Frame forwarded unchanged to every analysis helper.

    Returns:
        A dict with ``regime_transition_probabilities``, the six raw signal
        dicts, ``rotation_candidates``, ``signal_quality``,
        ``leading_indicators`` (``early_cycle``/``mid_cycle``/``late_cycle``)
        and ``transition_confirmation``. If anything raises, the error is
        printed and
        ``{'regime_transition_probabilities': {'current_to_new': 0.0}}`` is
        returned.
    """
    def _per_sector(signals: Dict[str, Any], metric: str) -> Dict[str, Any]:
        # ``_analyze_sector_leadership_changes`` stores momentum and relative
        # strength per sector under 'sector_metrics', not as top-level keys.
        # A top-level entry is still honoured when one is present.
        if metric in signals:
            return signals[metric]
        return {
            sector: values[metric]
            for sector, values in signals.get('sector_metrics', {}).items()
            if metric in values
        }

    try:
        # Analyse the individual rotation components.
        sector_signals = cls._analyze_sector_leadership_changes(market_data)
        style_rotation = cls._analyze_style_factor_rotation(market_data)
        macro_signals = cls._analyze_macro_rotation_signals(market_data)
        correlation_signals = cls._analyze_correlation_regime_changes(market_data)
        volatility_structure = cls._analyze_volatility_term_structure_changes(market_data)
        credit_signals = cls._analyze_credit_market_signals(market_data)

        # Regime-transition probabilities from all six signal groups.
        regime_transition_probs = cls._calculate_regime_transition_probabilities(
            sector_signals,
            style_rotation,
            macro_signals,
            correlation_signals,
            volatility_structure,
            credit_signals
        )

        # Sectors that are rotating, judged by momentum and relative strength.
        rotation_candidates = cls._identify_sector_rotation_candidates(
            _per_sector(sector_signals, 'momentum'),
            _per_sector(sector_signals, 'relative_strength')
        )

        # Quality of the rotation signals.
        signal_quality = cls._assess_rotation_signal_quality(
            sector_signals,
            style_rotation,
            macro_signals
        )

        return {
            'regime_transition_probabilities': regime_transition_probs,
            'sector_signals': sector_signals,
            'style_rotation': style_rotation,
            'macro_signals': macro_signals,
            'correlation_signals': correlation_signals,
            'volatility_structure': volatility_structure,
            'credit_signals': credit_signals,
            'rotation_candidates': rotation_candidates,
            'signal_quality': signal_quality,
            'leading_indicators': {
                'early_cycle': cls._calculate_early_cycle_probability(macro_signals, credit_signals),
                'mid_cycle': cls._calculate_mid_cycle_probability(sector_signals, style_rotation),
                'late_cycle': cls._calculate_late_cycle_probability(volatility_structure, correlation_signals)
            },
            'transition_confirmation': cls._analyze_transition_confirmation_signals(market_data)
        }
    except Exception as e:
        print(f"Error analyzing leading rotation indicators: {e}")
        return {'regime_transition_probabilities': {'current_to_new': 0.0}}
def _analyze_sector_leadership_changes(market_data: pd.DataFrame) -> Dict[str, Any]:
    """Build per-sector metrics and summarise leadership and rotation.

    For each of ten fixed sector names, the rows of ``market_data`` whose
    ``sector`` column equals that name are selected, and the latest momentum,
    relative strength, volatility and correlation readings are recorded
    together with a leadership score.

    Args:
        market_data: Frame that must provide ``sector`` and ``close`` columns.

    Returns:
        A dict with ``sector_metrics``, ``leadership_changes``,
        ``rotation_matrix``, ``current_leaders``, ``emerging_leaders``,
        ``rotation_strength``, ``rotation_momentum``, ``sector_correlations``
        and ``regime_characteristics``. If anything raises, the error is
        printed and ``{'sector_metrics': {}}`` is returned.
    """
    try:
        sector_names = (
            'technology', 'financial', 'healthcare', 'consumer', 'industrial',
            'energy', 'materials', 'utilities', 'real_estate', 'communication',
        )
        sector_metrics = {}

        for name in sector_names:
            # Rows belonging to this sector and their percentage returns.
            sector_rows = market_data[market_data['sector'] == name]
            sector_close = sector_rows['close']
            pct_returns = sector_close.pct_change()

            # Momentum (63-row mean return) and smoothed relative strength
            # (sector close over the mean close of the whole frame).
            momentum = pct_returns.rolling(window=63).mean()
            strength_ratio = sector_close / market_data['close'].mean()
            relative_strength = strength_ratio.rolling(window=21).mean()

            # Volatility and correlation with the whole frame's returns.
            volatility = pct_returns.rolling(window=21).std()
            correlation = pct_returns.rolling(window=63).corr(
                market_data['close'].pct_change()
            )

            latest_momentum = momentum.iloc[-1]
            latest_strength = relative_strength.iloc[-1]
            latest_volatility = volatility.iloc[-1]
            latest_correlation = correlation.iloc[-1]

            entry = {
                'momentum': float(latest_momentum),
                'relative_strength': float(latest_strength),
                'volatility': float(latest_volatility),
                'correlation': float(latest_correlation),
            }
            # NOTE(review): `cls` is not a parameter of this function. Unless
            # a module-level `cls` exists, this raises NameError, which the
            # handler below swallows, so the fallback is returned - confirm
            # whether this was meant to be a classmethod.
            entry['leadership_score'] = float(cls._calculate_sector_leadership_score(
                latest_momentum,
                latest_strength,
                latest_volatility,
                latest_correlation
            ))
            sector_metrics[name] = entry

        # Leadership changes and the sector rotation matrix.
        leadership_changes = cls._detect_leadership_changes(sector_metrics)
        rotation_matrix = cls._calculate_sector_rotation_matrix(sector_metrics)

        summary = {
            'sector_metrics': sector_metrics,
            'leadership_changes': leadership_changes,
            'rotation_matrix': rotation_matrix,
        }
        summary['current_leaders'] = cls._identify_current_sector_leaders(sector_metrics)
        summary['emerging_leaders'] = cls._identify_emerging_sector_leaders(sector_metrics)
        # NOTE(review): `_calculate_rotation_strength` is annotated as taking
        # Dict[str, float], but here it receives a dict of per-sector metric
        # dicts - verify which values were intended.
        summary['rotation_strength'] = float(cls._calculate_rotation_strength(sector_metrics))
        summary['rotation_momentum'] = cls._calculate_rotation_momentum(sector_metrics)
        summary['sector_correlations'] = cls._calculate_sector_correlations(market_data)
        summary['regime_characteristics'] = cls._analyze_sector_regime_characteristics(sector_metrics)
        return summary
    except Exception as exc:
        print(f"Error analyzing sector leadership changes: {exc}")
        return {'sector_metrics': {}}
def _calculate_sector_leadership_score(momentum: float, relative_strength: float,
                                       volatility: float, correlation: float) -> float:
    """Blend four sector readings into one leadership score in [-1, 1].

    Each input is first normalised: momentum against a 2% reference,
    relative strength as its distance from 1, volatility against a 20%
    reference (lower is better) and correlation by its magnitude (lower is
    better). The normalised values are combined with weights 0.35, 0.35,
    0.15 and 0.15 and the result is clipped to [-1, 1].

    Returns:
        The clipped weighted score, or ``0.0`` (after printing the error) if
        the computation raises.
    """
    try:
        # (weight, normalised component) pairs, in summation order.
        weighted_components = (
            (0.35, np.clip(momentum / 0.02, -1, 1)),         # 2% reference
            (0.35, np.clip(relative_strength - 1, -1, 1)),
            (0.15, 1 - np.clip(volatility / 0.2, 0, 1)),     # 20% reference
            (0.15, 1 - np.abs(correlation)),                 # prefer low correlation
        )
        composite = sum(weight * value for weight, value in weighted_components)
        return float(np.clip(composite, -1, 1))
    except Exception as exc:
        print(f"Error calculating sector leadership score: {exc}")
        return 0.0
def _detect_leadership_changes(sector_metrics: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
    """Summarise which sectors lead and which are emerging.

    Sectors are ranked by ``leadership_score``; the top three are reported as
    current leaders and ranks four to six as previous leaders. A sector is
    "emerging" when its ``momentum`` exceeds 1.5 times its
    ``relative_strength``.

    Args:
        sector_metrics: Per-sector dicts providing ``leadership_score``,
            ``momentum`` and ``relative_strength``.

    Returns:
        A dict with ``current_leaders``, ``previous_leaders``,
        ``emerging_leaders``, ``leadership_turnover``, ``emergence_strength``,
        ``leadership_stability`` and ``transition_characteristics``. For empty
        input, or (after printing the error) when anything raises,
        ``{'current_leaders': [], 'emerging_leaders': []}`` is returned.
    """
    # Nothing to rank; this also avoids dividing by len(sector_metrics) == 0.
    if not sector_metrics:
        return {'current_leaders': [], 'emerging_leaders': []}

    try:
        # Rank sectors by leadership score, best first.
        ranked_sectors = sorted(
            sector_metrics.items(),
            key=lambda item: item[1]['leadership_score'],
            reverse=True
        )

        current_leaders = ranked_sectors[:3]
        previous_leaders = ranked_sectors[3:6]
        current_names = [sector for sector, _ in current_leaders]
        previous_names = [sector for sector, _ in previous_leaders]
        emerging_leaders = [
            sector for sector, metrics in ranked_sectors
            if metrics['momentum'] > metrics['relative_strength'] * 1.5
        ]

        # Compare sector names: the ranked entries are (name, dict) tuples,
        # which are unhashable and cannot be placed in a set.
        leadership_turnover = len(set(current_names) - set(previous_names)) / 3
        emergence_strength = np.mean([
            sector_metrics[sector]['momentum']
            for sector in emerging_leaders
        ]) if emerging_leaders else 0.0

        return {
            'current_leaders': current_names,
            'previous_leaders': previous_names,
            'emerging_leaders': emerging_leaders,
            'leadership_turnover': float(leadership_turnover),
            'emergence_strength': float(emergence_strength),
            'leadership_stability': float(1 - leadership_turnover),
            'transition_characteristics': {
                # NOTE(review): `cls` is not a parameter of this function.
                # Unless a module-level `cls` exists, this raises NameError
                # and the handler below returns the fallback - confirm
                # whether this was meant to be a classmethod.
                'type': cls._determine_transition_type(current_leaders, emerging_leaders),
                'strength': float(emergence_strength / (leadership_turnover + 1e-6)),
                'confirmation': float(len(emerging_leaders) / len(sector_metrics))
            }
        }
    except Exception as e:
        print(f"Error detecting leadership changes: {e}")
        return {'current_leaders': [], 'emerging_leaders': []}
def _analyze_style_factor_rotation(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Analyse style factors and the rotation between them.

    Computes seven style factors (value, growth, momentum, quality, size,
    volatility, yield) from ``market_data`` and passes them to the rotation,
    regime, dominance, correlation, quality and forecast helpers.

    Returns:
        A dict with ``style_factors``, ``factor_rotations``,
        ``factor_regimes``, ``dominant_factors``, ``emerging_factors``,
        ``factor_correlations``, ``rotation_quality``,
        ``regime_characteristics`` and ``style_forecasts``. If anything
        raises, the error is printed and ``{'style_factors': {}}`` is
        returned.
    """
    try:
        # Compute each style factor in turn.
        style_factors = {}
        style_factors['value'] = cls._calculate_value_factor(market_data)
        style_factors['growth'] = cls._calculate_growth_factor(market_data)
        style_factors['momentum'] = cls._calculate_momentum_factor(market_data)
        style_factors['quality'] = cls._calculate_quality_factor(market_data)
        style_factors['size'] = cls._calculate_size_factor(market_data)
        style_factors['volatility'] = cls._calculate_volatility_factor(market_data)
        style_factors['yield'] = cls._calculate_yield_factor(market_data)

        # Rotations between factors and the regimes they form.
        rotations = cls._calculate_factor_rotations(style_factors)
        regimes = cls._analyze_factor_regimes(style_factors)

        # Dominant and emerging factors.
        dominant = cls._identify_dominant_factors(style_factors)
        emerging = cls._identify_emerging_factors(style_factors, rotations)

        # Correlations between factors and quality of the rotation signals.
        correlations = cls._calculate_factor_correlations(style_factors)
        quality = cls._analyze_factor_rotation_quality(
            style_factors, rotations, correlations
        )

        analysis: Dict[str, Any] = {
            'style_factors': style_factors,
            'factor_rotations': rotations,
            'factor_regimes': regimes,
            'dominant_factors': dominant,
            'emerging_factors': emerging,
            'factor_correlations': correlations,
            'rotation_quality': quality,
        }
        analysis['regime_characteristics'] = {
            'current_regime': cls._determine_factor_regime(regimes),
            'regime_strength': float(cls._calculate_regime_strength(regimes)),
            'transition_probability': float(
                cls._calculate_regime_transition_probability(regimes)
            )
        }
        analysis['style_forecasts'] = cls._forecast_style_factor_returns(
            style_factors, regimes
        )
        return analysis
    except Exception as exc:
        print(f"Error analyzing style factor rotation: {exc}")
        return {'style_factors': {}}
def _calculate_value_factor(market_data: pd.DataFrame) -> Dict[str, float]:
    """Compute the value factor and summary statistics for it.

    The value score is the reciprocal of an equally weighted blend of the
    price/earnings and price/book ratios, each divided by its own mean.

    Args:
        market_data: Frame that must provide ``price``, ``earnings`` and
            ``book_value`` columns.

    Returns:
        A dict with ``score``, ``momentum``, ``volatility``, ``z_score``,
        ``percentile`` and ``trend``. If anything raises, the error is
        printed and ``{'score': 0.0}`` is returned.
    """
    try:
        # Valuation ratios.
        price_to_earnings = market_data['price'] / market_data['earnings']
        price_to_book = market_data['price'] / market_data['book_value']

        # Composite score: higher when the stock is cheaper than average.
        relative_pe = price_to_earnings / price_to_earnings.mean()
        relative_pb = price_to_book / price_to_book.mean()
        value_score = 1 / (0.5 * relative_pe + 0.5 * relative_pb)

        # Factor returns: change of the 21-row smoothed score.
        value_returns = value_score.rolling(window=21).mean().pct_change()

        latest_score = value_score.iloc[-1]
        summary = {
            'score': float(latest_score),
            'momentum': float(value_returns.mean()),
            'volatility': float(value_returns.std()),
            'z_score': float((value_score.iloc[-1] - value_score.mean()) / value_score.std()),
            'percentile': float(stats.percentileofscore(value_score, value_score.iloc[-1]) / 100),
        }
        # NOTE(review): `cls` is not a parameter of this function. Unless a
        # module-level `cls` exists, this line raises NameError, which the
        # handler below swallows, so the fallback is returned - confirm
        # whether this was meant to be a classmethod.
        summary['trend'] = str(cls._determine_factor_trend(value_returns))
        return summary
    except Exception as exc:
        print(f"Error calculating value factor: {exc}")
        return {'score': 0.0}
def _calculate_factor_rotations(style_factors: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
    """Compute pairwise momentum differences between style factors.

    For every pair of factor names ``a < b`` the rotation ``a_vs_b`` is
    ``momentum(a) - momentum(b)``. Factors without a ``momentum`` entry
    (for example a factor that fell back to ``{'score': 0.0}``) are skipped
    so that one failed factor does not discard every other rotation.

    Args:
        style_factors: Per-factor metric dicts; ``momentum`` is the only
            entry read here.

    Returns:
        A dict with ``factor_rotations`` (the pairwise values),
        ``rotation_intensity`` (mean absolute rotation), ``max_rotation``
        (largest absolute rotation), ``rotation_dispersion`` (standard
        deviation) and ``dominant_rotation`` (name of the largest absolute
        rotation). When fewer than two factors carry a momentum value, or
        (after printing the error) when anything raises,
        ``{'rotation_intensity': 0.0}`` is returned.
    """
    try:
        # Keep only the factors that actually report a momentum value.
        momentum_by_factor = {
            name: metrics['momentum']
            for name, metrics in style_factors.items()
            if 'momentum' in metrics
        }

        rotations = {}
        for factor1, momentum1 in momentum_by_factor.items():
            for factor2, momentum2 in momentum_by_factor.items():
                # Name ordering makes each unordered pair appear once.
                if factor1 < factor2:
                    rotations[f'{factor1}_vs_{factor2}'] = float(momentum1 - momentum2)

        # No comparable pair: the aggregate statistics are undefined.
        if not rotations:
            return {'rotation_intensity': 0.0}

        magnitudes = [abs(rotation) for rotation in rotations.values()]
        return {
            'factor_rotations': rotations,
            'rotation_intensity': float(np.mean(magnitudes)),
            'max_rotation': float(max(magnitudes)),
            'rotation_dispersion': float(np.std(list(rotations.values()))),
            'dominant_rotation': max(rotations.items(), key=lambda item: abs(item[1]))[0]
        }
    except Exception as e:
        print(f"Error calculating factor rotations: {e}")
        return {'rotation_intensity': 0.0}
def _analyze_factor_regimes(cls, style_factors: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
    """Cluster the style factors into regimes with a Gaussian mixture.

    Each factor contributes one sample made of its ``score``, ``momentum``
    and ``volatility``; a three-component ``GaussianMixture`` (seeded with
    ``random_state=42``) labels the samples. ``current_regime`` is the label
    of the last factor in ``style_factors``.

    Returns:
        A dict with ``current_regime``, ``regime_probabilities``,
        ``regime_characteristics``, ``regime_stability``,
        ``transition_matrix`` and ``regime_duration``. If anything raises,
        the error is printed and ``{'current_regime': 0}`` is returned.
    """
    try:
        # Feature matrix for clustering: one row per factor after the
        # transpose, one column per metric.
        factors = style_factors.values()
        metric_rows = [
            [factor[metric] for factor in factors]
            for metric in ('score', 'momentum', 'volatility')
        ]
        factor_data = np.array(metric_rows).T

        # Fit the mixture model and read labels and membership probabilities.
        mixture = GaussianMixture(n_components=3, random_state=42)
        labels = mixture.fit_predict(factor_data)
        membership = mixture.predict_proba(factor_data)

        # Describe the detected regimes.
        characteristics = cls._characterize_factor_regimes(
            labels, membership, style_factors
        )

        analysis: Dict[str, Any] = {
            'current_regime': int(labels[-1]),
            'regime_probabilities': dict(enumerate(membership[-1])),
            'regime_characteristics': characteristics,
        }
        analysis['regime_stability'] = float(cls._calculate_regime_stability(labels))
        analysis['transition_matrix'] = cls._calculate_regime_transition_matrix(labels)
        analysis['regime_duration'] = cls._calculate_regime_duration(labels)
        return analysis
    except Exception as exc:
        print(f"Error analyzing factor regimes: {exc}")
        return {'current_regime': 0}
def _analyze_macro_rotation_signals(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
    """Analyse macroeconomic signals relevant to market rotation.

    Computes growth, inflation, monetary and credit indicators, determines
    the macro regime from the first three, and derives transition
    probabilities, regime implications and allocation signals.

    Returns:
        A dict with ``current_regime`` (as a string),
        ``transition_probabilities``, the four indicator results,
        ``regime_implications`` and ``asset_allocation_signals``. If
        anything raises, the error is printed and
        ``{'current_regime': 'neutral'}`` is returned.
    """
    try:
        # Key macro indicators.
        growth = cls._calculate_growth_indicators(market_data)
        inflation = cls._calculate_inflation_indicators(market_data)
        monetary = cls._calculate_monetary_indicators(market_data)
        credit = cls._analyze_credit_conditions(market_data)

        # Current macro regime and the probabilities of leaving it.
        macro_regime = cls._determine_macro_regime(growth, inflation, monetary)
        transition_probs = cls._calculate_macro_transition_probabilities(
            macro_regime, growth, inflation, monetary, credit
        )

        signals: Dict[str, Any] = {
            'current_regime': str(macro_regime),
            'transition_probabilities': transition_probs,
            'growth_indicators': growth,
            'inflation_indicators': inflation,
            'monetary_indicators': monetary,
            'credit_conditions': credit,
        }
        signals['regime_implications'] = cls._calculate_regime_implications(macro_regime)
        signals['asset_allocation_signals'] = cls._generate_allocation_signals(
            macro_regime, transition_probs
        )
        return signals
    except Exception as exc:
        print(f"Error analyzing macro rotation signals: {exc}")
        return {'current_regime': 'neutral'}
def _determine_macro_regime(growth: Dict[str, float],
                            inflation: Dict[str, float],
                            monetary: Dict[str, float]) -> str:
    """Classify the macro regime from growth and inflation readings.

    Args:
        growth: Must provide ``composite_index``.
        inflation: Provides ``level``; only read when the growth reading
            makes it relevant.
        monetary: Accepted but not used by this function.

    Returns:
        ``'goldilocks'`` (growth > 0.7, inflation < 0.3), ``'overheating'``
        (growth > 0.5, inflation > 0.7), ``'stagflation'`` (growth < 0.3,
        inflation > 0.5), ``'recession'`` (growth < 0.3, inflation < 0.3) or
        ``'neutral'`` for every other combination.
    """
    growth_index = growth['composite_index']

    # Strong growth: the inflation level separates the two outcomes.
    if growth_index > 0.7 and inflation['level'] < 0.3:
        return 'goldilocks'
    if growth_index > 0.5 and inflation['level'] > 0.7:
        return 'overheating'

    # Weak growth: again split by the inflation level.
    if growth_index < 0.3:
        inflation_level = inflation['level']
        if inflation_level > 0.5:
            return 'stagflation'
        if inflation_level < 0.3:
            return 'recession'

    return 'neutral'
def _generate_allocation_signals(cls,
                                 regime: str,
                                 transition_probs: Dict[str, float]) -> Dict[str, float]:
    """Blend per-regime target allocations by transition probability.

    Each asset class weight is the sum, over the regimes named in
    ``transition_probs``, of ``probability * target weight``.

    Args:
        regime: Current macro regime. Used only when ``transition_probs``
            carries no probability mass; unknown names fall back to
            ``'neutral'``.
        transition_probs: Probability per target regime. Keys must be one of
            ``goldilocks``, ``overheating``, ``stagflation``, ``recession``
            or ``neutral``.

    Returns:
        A dict with ``equities``, ``bonds`` and ``alternatives`` weights.

    Raises:
        KeyError: If ``transition_probs`` names an unknown regime.
    """
    # Target allocation for each regime.
    regime_allocations = {
        'goldilocks': {'equities': 0.6, 'bonds': 0.3, 'alternatives': 0.1},
        'overheating': {'equities': 0.4, 'bonds': 0.2, 'alternatives': 0.4},
        'stagflation': {'equities': 0.2, 'bonds': 0.3, 'alternatives': 0.5},
        'recession': {'equities': 0.3, 'bonds': 0.6, 'alternatives': 0.1},
        'neutral': {'equities': 0.4, 'bonds': 0.4, 'alternatives': 0.2}
    }

    # With no probability mass the weighted sum would be an all-zero
    # allocation, so use the current regime's own target instead.
    if not transition_probs or not any(transition_probs.values()):
        return dict(regime_allocations.get(regime, regime_allocations['neutral']))

    # Weight every target allocation by its transition probability.
    return {
        asset_class: sum(
            prob * regime_allocations[target_regime][asset_class]
            for target_regime, prob in transition_probs.items()
        )
        for asset_class in ('equities', 'bonds', 'alternatives')
    }
def _calculate_regime_implications(regime: str) -> Dict[str, List[str]]:
    """Look up the favourable and unfavourable exposures for a macro regime.

    Args:
        regime: One of ``goldilocks``, ``overheating``, ``stagflation``,
            ``recession`` or ``neutral``.

    Returns:
        A dict with ``favorable`` and ``unfavorable`` lists. Any other regime
        name yields the ``neutral`` entry.
    """
    neutral_view = {
        'favorable': ['balanced exposure', 'quality factors'],
        'unfavorable': ['extreme positioning', 'leverage']
    }
    views_by_regime = {
        'goldilocks': {
            'favorable': ['growth stocks', 'technology', 'consumer discretionary'],
            'unfavorable': ['utilities', 'commodities', 'defensive sectors']
        },
        'overheating': {
            'favorable': ['commodities', 'materials', 'energy'],
            'unfavorable': ['bonds', 'growth stocks', 'utilities']
        },
        'stagflation': {
            'favorable': ['commodities', 'defensive stocks', 'real assets'],
            'unfavorable': ['growth stocks', 'consumer discretionary', 'bonds']
        },
        'recession': {
            'favorable': ['bonds', 'defensive stocks', 'quality factors'],
            'unfavorable': ['cyclicals', 'small caps', 'high yield']
        },
        'neutral': neutral_view
    }
    # Unknown regimes share the neutral view.
    return views_by_regime.get(regime, neutral_view)
def get_default_market_conditions(self) -> 'MarketConditions':
    """Build a ``MarketConditions`` object filled with fixed default values.

    The regime is ``MarketRegime.LOW_VOLATILITY``, the volatility level is
    0.15 and the timestamp is taken from ``datetime.now()`` at call time.
    No instance state is read.
    """
    default_regime = MarketRegime.LOW_VOLATILITY

    # Default market metrics.
    default_metrics = MarketMetrics(
        volatility=0.15,
        trend=0.0,
        momentum=0.0,
        sentiment=0.5,
        liquidity=0.5,
        correlation=0.5,
        tail_risk=0.5,
        regime_probability={'normal': 1.0}
    )

    # Default macroeconomic indicators.
    default_macro = MacroIndicators(
        gdp_growth=0.02,
        inflation_rate=0.02,
        interest_rate=0.02,
        unemployment_rate=0.05,
        consumer_confidence=0.5,
        industrial_production=0.0,
        retail_sales=0.0,
        housing_market=0.5
    )

    # Default liquidity conditions.
    default_liquidity = LiquidityConditions(
        bid_ask_spread=0.001,
        market_depth=1.0,
        trading_volume=1000000,
        turnover_ratio=0.2,
        amihud_ratio=0.001,
        volume_weighted_spread=0.001,
        market_resilience=0.5
    )

    return MarketConditions(
        current_regime=default_regime,
        volatility_level=0.15,
        market_metrics=default_metrics,
        macro_indicators=default_macro,
        liquidity_conditions=default_liquidity,
        timestamp=datetime.now()
    )