# ProfilingAI/src/core/market_states.py
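"""Market state detection and characterization.

Defines extended market regimes, containers for market, macro and liquidity
metrics, and a MarketConditions dataclass that derives regimes, stress,
sentiment and economic-cycle indicators from OHLCV market data.
"""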
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Optional, List, Union, Tuple, Any, TypeVar
import numpy as np
import pandas as pd
from datetime import datetime
from scipy import stats
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
try:
    from arch import arch_model  # GARCH(1,1), used by _estimate_conditional_volatility
except ImportError:
    arch_model = None  # optional dependency; a simple fallback estimate is used instead
from .risk_metrics import RiskAssessment
class MarketRegime(Enum):
"""Extended market regimes with detailed characteristics"""
BULL_MARKET = "bull_market"
BEAR_MARKET = "bear_market"
HIGH_VOLATILITY = "high_volatility"
LOW_VOLATILITY = "low_volatility"
NEUTRAL = "neutral"
CRISIS = "crisis"
RECOVERY = "recovery"
STAGFLATION = "stagflation"
TRANSITION = "transition"
@classmethod
def get_risk_factor(cls, regime: 'MarketRegime') -> float:
"""Get risk adjustment factor for regime with conservative defaults"""
risk_factors = {
cls.BULL_MARKET: 1.2, # Increased risk tolerance in bull markets
cls.BEAR_MARKET: 0.8, # Reduced risk in bear markets
cls.HIGH_VOLATILITY: 0.7, # Significantly reduced risk in volatile periods
cls.LOW_VOLATILITY: 1.1, # Slightly increased risk in calm periods
cls.CRISIS: 0.5, # Minimal risk during crises
cls.RECOVERY: 1.3, # Higher risk tolerance during recovery
cls.STAGFLATION: 0.6, # Reduced risk during stagflation
cls.TRANSITION: 1.0 # Neutral risk during transitions
}
return risk_factors.get(regime, 1.0)
@classmethod
def from_metrics(cls,
trend: float,
volatility: float,
inflation: float,
growth: float) -> 'MarketRegime':
"""Determine market regime from multiple metrics"""
if inflation > 0.05 and growth < 0.01:
return cls.STAGFLATION
elif volatility > 0.25:
return cls.HIGH_VOLATILITY
elif volatility < 0.10:
return cls.LOW_VOLATILITY
elif trend > 0.15: # Strong positive trend
return cls.BULL_MARKET
elif trend < -0.15: # Strong negative trend
return cls.BEAR_MARKET
elif -0.15 <= trend <= -0.05:
return cls.CRISIS
elif 0.05 <= trend <= 0.15:
return cls.RECOVERY
return cls.TRANSITION
@dataclass
class MarketMetrics:
"""Container for core market metrics"""
volatility: float
trend: float
momentum: float
sentiment: float
liquidity: float
correlation: float
tail_risk: float
regime_probability: Dict[str, float]
last_update: datetime = field(default_factory=datetime.now)
def is_valid(self) -> bool:
"""Validate metric values"""
try:
return all([
0 <= self.volatility <= 1,
-1 <= self.trend <= 1,
-1 <= self.momentum <= 1,
0 <= self.sentiment <= 1,
0 <= self.liquidity <= 1,
-1 <= self.correlation <= 1,
0 <= self.tail_risk <= 1,
abs(sum(self.regime_probability.values()) - 1.0) < 0.001
])
        except Exception:
return False
@dataclass
class MacroIndicators:
"""Container for macroeconomic indicators"""
gdp_growth: float
inflation_rate: float
interest_rate: float
unemployment_rate: float
consumer_confidence: float
industrial_production: float
retail_sales: float
housing_market: float
vix_level: Optional[float] = None
def to_dict(self) -> Dict[str, float]:
return {k: v for k, v in self.__dict__.items() if v is not None}
@dataclass
class LiquidityConditions:
"""Container for market liquidity metrics"""
bid_ask_spread: float
market_depth: float
trading_volume: float
turnover_ratio: float
amihud_ratio: float # Price impact measure
volume_weighted_spread: float
market_resilience: float
@property
def liquidity_score(self) -> float:
"""Calculate composite liquidity score"""
weights = {
'bid_ask_spread': -0.3,
'market_depth': 0.2,
'trading_volume': 0.2,
'turnover_ratio': 0.1,
'amihud_ratio': -0.1,
'volume_weighted_spread': -0.05,
'market_resilience': 0.05
}
score = sum(getattr(self, attr) * weight
for attr, weight in weights.items())
return max(0.0, min(1.0, score))
@dataclass
class MarketConditions:
"""Market conditions dataclass"""
current_regime: MarketRegime
volatility_level: float
market_sentiment: float
interest_rate_environment: str
inflation_rate: float
gdp_growth: float
leading_indicators: Dict[str, float]
correlation_regime: Dict[str, float]
liquidity_conditions: Dict[str, float]
tail_risk_metrics: Dict[str, float]
regime_probabilities: Dict[str, float]
    market_metrics: Optional[MarketMetrics] = None
    macro_indicators: Optional[MacroIndicators] = None
    market_stress_indicators: Optional[Dict[str, float]] = None
    cross_asset_correlations: Optional[Dict[str, float]] = None
    sentiment_indicators: Optional[Dict[str, Any]] = None
    timestamp: datetime = field(default_factory=datetime.now)
def __post_init__(self):
"""Validate and initialize derived metrics"""
self._validate_data()
self._initialize_derived_metrics()
@classmethod
def get_default(cls) -> 'MarketConditions':
"""Retourne des conditions de marché par défaut"""
return cls(
current_regime=MarketRegime.NEUTRAL,
volatility_level=0.15, # Volatilité moyenne historique
market_sentiment=0.5, # Sentiment neutre
interest_rate_environment="NORMAL",
inflation_rate=0.02, # Inflation cible typique
gdp_growth=0.02, # Croissance moyenne historique
leading_indicators={
'pmi': 50.0, # PMI neutre
'consumer_confidence': 100.0, # Niveau de référence
'yield_curve': 0.0 # Spread neutre
},
correlation_regime={
'stocks_bonds': -0.2, # Corrélation historique typique
'average_correlation': 0.3
},
liquidity_conditions={
'market_depth': 1.0,
'bid_ask_spread': 0.001,
'trading_volume': 1.0
},
tail_risk_metrics={
'var_95': -0.02,
'expected_shortfall': -0.025,
'tail_dependence': 0.3
},
regime_probabilities={
regime.value: 1/len(MarketRegime) for regime in MarketRegime
}
)
def is_high_risk(self) -> bool:
"""Détermine si les conditions actuelles sont à haut risque"""
return (
self.current_regime in [MarketRegime.BEAR_MARKET, MarketRegime.HIGH_VOLATILITY, MarketRegime.CRISIS] or
self.volatility_level > 0.25 or
self.market_sentiment < 0.3
)
def is_favorable_for_rebalancing(self) -> bool:
"""Détermine si les conditions sont favorables pour un rebalancement"""
return (
self.current_regime != MarketRegime.CRISIS and
self.volatility_level < 0.3 and
self.liquidity_conditions.get('bid_ask_spread', float('inf')) < 0.005
)
def get_risk_adjustment_factor(self) -> float:
"""Calcule un facteur d'ajustement du risque basé sur les conditions"""
base_factor = 1.0
# Ajustement selon le régime
regime_factors = {
MarketRegime.BULL_MARKET: 1.2,
MarketRegime.BEAR_MARKET: 0.8,
MarketRegime.HIGH_VOLATILITY: 0.7,
MarketRegime.LOW_VOLATILITY: 1.1,
MarketRegime.CRISIS: 0.5,
MarketRegime.RECOVERY: 1.1,
MarketRegime.NEUTRAL: 1.0
}
base_factor *= regime_factors[self.current_regime]
# Ajustement pour la volatilité
if self.volatility_level > 0.2:
base_factor *= 0.9
# Ajustement pour le sentiment
if self.market_sentiment < 0.4:
base_factor *= 0.9
elif self.market_sentiment > 0.6:
base_factor *= 1.1
return base_factor
def get_regime_transition_probabilities(self) -> Dict[str, float]:
"""Retourne les probabilités de transition de régime"""
if not self.regime_probabilities:
            # Default uniform probabilities if none are set
return {regime.value: 1/len(MarketRegime) for regime in MarketRegime}
return self.regime_probabilities
def to_dict(self) -> Dict:
"""Convertit les conditions en dictionnaire"""
return {
'current_regime': self.current_regime.value,
'volatility_level': self.volatility_level,
'market_sentiment': self.market_sentiment,
'interest_rate_environment': self.interest_rate_environment,
'inflation_rate': self.inflation_rate,
'gdp_growth': self.gdp_growth,
'leading_indicators': self.leading_indicators,
'timestamp': self.timestamp.isoformat(),
'correlation_regime': self.correlation_regime,
'liquidity_conditions': self.liquidity_conditions,
'tail_risk_metrics': self.tail_risk_metrics,
'regime_probabilities': self.regime_probabilities
}
def _validate_data(self):
"""Validate core market data"""
        if self.market_metrics is not None and not self.market_metrics.is_valid():
            raise ValueError("Invalid market metrics")
if self.volatility_level < 0:
raise ValueError("Negative volatility")
if not isinstance(self.current_regime, MarketRegime):
raise ValueError("Invalid market regime")
    def _initialize_derived_metrics(self):
        """Fill derived metrics with neutral defaults when they are missing"""
        # Recomputing these properly requires market data, which the instance
        # does not hold, so missing values fall back to neutral defaults.
        if not self.regime_probabilities:
            self.regime_probabilities = {
                regime.value: 1 / len(MarketRegime) for regime in MarketRegime
            }
        if not self.tail_risk_metrics:
            self.tail_risk_metrics = {
                'var_95': -0.02,
                'expected_shortfall': -0.025,
                'tail_dependence': 0.3
            }
        if not self.market_stress_indicators:
            self.market_stress_indicators = {'composite_stress': 0.5}
@classmethod
def from_market_data(cls,
market_data: pd.DataFrame,
lookback_period: int = 252,
rolling_window: int = 21) -> 'MarketConditions':
"""Create MarketConditions instance from market data with comprehensive analysis"""
try:
            # Validate and prepare the data
if not cls._validate_market_data(market_data):
raise ValueError("Invalid market data format")
            # Compute base metrics
returns = cls._calculate_returns(market_data)
volatility = cls._calculate_volatility(returns, rolling_window)
trend = cls._calculate_trend(returns, lookback_period)
            # Compute market metrics
market_metrics = MarketMetrics(
volatility=volatility,
trend=trend,
momentum=cls._calculate_momentum(returns, rolling_window),
sentiment=cls._calculate_market_sentiment(market_data),
liquidity=cls._calculate_liquidity_score(market_data),
correlation=cls._calculate_average_correlation(market_data),
tail_risk=cls._calculate_tail_risk(returns),
regime_probability=cls._calculate_regime_probabilities_from_data(returns),
last_update=datetime.now()
)
            # Compute macro indicators
macro_indicators = MacroIndicators(
gdp_growth=cls._estimate_gdp_growth(market_data),
inflation_rate=cls._estimate_inflation_rate(market_data),
interest_rate=cls._get_interest_rate(market_data),
unemployment_rate=cls._estimate_unemployment_rate(market_data),
consumer_confidence=cls._estimate_consumer_confidence(market_data),
industrial_production=cls._estimate_industrial_production(market_data),
retail_sales=cls._estimate_retail_sales(market_data),
housing_market=cls._estimate_housing_market(market_data)
)
            # Compute liquidity conditions
liquidity_conditions = LiquidityConditions(
bid_ask_spread=cls._calculate_bid_ask_spread(market_data),
market_depth=cls._calculate_market_depth(market_data),
trading_volume=cls._calculate_trading_volume(market_data),
turnover_ratio=cls._calculate_turnover_ratio(market_data),
amihud_ratio=cls._calculate_amihud_ratio(market_data),
volume_weighted_spread=cls._calculate_vw_spread(market_data),
market_resilience=cls._calculate_market_resilience(market_data)
)
            # Determine the market regime
current_regime = MarketRegime.from_metrics(
trend=trend,
volatility=volatility,
inflation=macro_indicators.inflation_rate,
growth=macro_indicators.gdp_growth
)
            return cls(
                current_regime=current_regime,
                volatility_level=volatility,
                market_sentiment=market_metrics.sentiment,
                interest_rate_environment="NORMAL",  # default label; no rate data is derived here
                inflation_rate=macro_indicators.inflation_rate,
                gdp_growth=macro_indicators.gdp_growth,
                leading_indicators=cls._calculate_leading_indicators(market_data),
                market_metrics=market_metrics,
                macro_indicators=macro_indicators,
                liquidity_conditions=vars(liquidity_conditions),  # plain dict to match the field type
                timestamp=datetime.now(),
                correlation_regime=cls._calculate_correlation_regime(market_data),
                tail_risk_metrics=cls._calculate_detailed_tail_risk(returns),
                regime_probabilities=cls._calculate_regime_probabilities_from_data(returns),
                market_stress_indicators=cls._calculate_stress_indicators_from_data(market_data),
                cross_asset_correlations=cls._calculate_cross_asset_correlations(market_data),
                sentiment_indicators=cls._calculate_detailed_sentiment(market_data)
            )
except Exception as e:
print(f"Error creating market conditions: {str(e)}")
return cls.get_default()
@staticmethod
def _validate_market_data(market_data: pd.DataFrame) -> bool:
"""Validate market data structure and content"""
required_columns = {'close', 'volume', 'high', 'low'}
if not isinstance(market_data, pd.DataFrame):
return False
if not all(col in market_data.columns for col in required_columns):
return False
if market_data.empty or market_data.isnull().any().any():
return False
if not isinstance(market_data.index, pd.DatetimeIndex):
return False
return True
@staticmethod
def _calculate_returns(data: pd.DataFrame, method: str = 'log') -> pd.Series:
"""Calculate returns with option for arithmetic or log returns"""
prices = data['close']
if method == 'log':
return np.log(prices / prices.shift(1)).dropna()
return (prices / prices.shift(1) - 1).dropna()
@staticmethod
def _calculate_volatility(returns: pd.Series, window: int = 21) -> float:
"""Calculate volatility with exponential weighting"""
try:
            # Use an exponentially weighted moving standard deviation
ewm_std = returns.ewm(span=window, adjust=False).std()
annualized_vol = ewm_std.iloc[-1] * np.sqrt(252)
return float(annualized_vol)
except Exception as e:
print(f"Error calculating volatility: {e}")
            return 0.15  # Reasonable default value
@staticmethod
def _calculate_trend(returns: pd.Series, lookback_period: int = 252) -> float:
"""Calculate market trend using multiple indicators"""
try:
            # Compute several trend indicators
cumulative_return = (1 + returns).prod() - 1
linear_reg = np.polyfit(range(len(returns)), returns.values, 1)[0]
ma_ratio = returns.rolling(window=50).mean().iloc[-1] / \
returns.rolling(window=200).mean().iloc[-1] - 1
            # Combine the indicators with fixed weights
            trend_score = (
                0.4 * cumulative_return +
                0.4 * linear_reg * lookback_period +  # Scale the slope to the lookback horizon
0.2 * ma_ratio
)
return np.clip(trend_score, -1, 1)
except Exception as e:
print(f"Error calculating trend: {e}")
return 0.0
@staticmethod
def _calculate_momentum(returns: pd.Series, window: int = 21) -> float:
"""Calculate momentum using multiple timeframes"""
try:
            # Compute momentum over several horizons
mom_1m = returns.rolling(window=21).sum()
mom_3m = returns.rolling(window=63).sum()
mom_6m = returns.rolling(window=126).sum()
            # Combine with decreasing weights
momentum = (
0.5 * mom_1m.iloc[-1] +
0.3 * mom_3m.iloc[-1] +
0.2 * mom_6m.iloc[-1]
)
return np.clip(momentum, -1, 1)
except Exception as e:
print(f"Error calculating momentum: {e}")
return 0.0
@classmethod
def _calculate_liquidity_score(cls, market_data: pd.DataFrame) -> float:
"""Calculate comprehensive liquidity score"""
try:
            # Compute the liquidity components
volume_score = cls._calculate_volume_score(market_data)
spread_score = cls._calculate_spread_score(market_data)
depth_score = cls._calculate_depth_score(market_data)
resilience_score = cls._calculate_resilience_score(market_data)
            # Combine the scores with fixed weights
liquidity_score = (
0.35 * volume_score +
0.25 * spread_score +
0.20 * depth_score +
0.20 * resilience_score
)
return np.clip(liquidity_score, 0, 1)
except Exception as e:
print(f"Error calculating liquidity score: {e}")
return 0.5
@staticmethod
def _calculate_volume_score(market_data: pd.DataFrame) -> float:
"""Calculate volume-based liquidity score"""
try:
volume = market_data['volume']
avg_volume = volume.rolling(window=20).mean()
vol_ratio = volume.iloc[-1] / avg_volume.iloc[-1]
            # Normalize the ratio
score = 1 / (1 + np.exp(-2 * (vol_ratio - 1)))
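            # The logistic maps a ratio of 1 (volume at its 20-day average) to 0.5;
            # heavier-than-average volume pushes the score toward 1.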
return float(score)
except Exception as e:
print(f"Error calculating volume score: {e}")
return 0.5
@staticmethod
def _calculate_spread_score(market_data: pd.DataFrame) -> float:
"""Calculate spread-based liquidity score"""
try:
if 'ask' in market_data.columns and 'bid' in market_data.columns:
spread = (market_data['ask'] - market_data['bid']) / \
((market_data['ask'] + market_data['bid']) / 2)
avg_spread = spread.rolling(window=20).mean().iloc[-1]
return float(1 - np.clip(avg_spread * 20, 0, 1))
else:
                # Alternative estimate based on the high/low range
hl_spread = (market_data['high'] - market_data['low']) / market_data['close']
return float(1 - np.clip(hl_spread.mean() * 10, 0, 1))
except Exception as e:
print(f"Error calculating spread score: {e}")
return 0.5
@staticmethod
def _calculate_amihud_ratio(market_data: pd.DataFrame, window: int = 20) -> float:
"""Calculate Amihud illiquidity ratio"""
try:
returns = market_data['close'].pct_change().abs()
volume = market_data['volume'] * market_data['close'] # Dollar volume
amihud = (returns / volume).rolling(window=window).mean()
            # Normalize the ratio into (0, 1]
return float(1 / (1 + amihud.iloc[-1] * 1e6))
except Exception as e:
print(f"Error calculating Amihud ratio: {e}")
return 0.5
@staticmethod
def _calculate_market_resilience(market_data: pd.DataFrame) -> float:
"""Calculate market resilience metric"""
try:
returns = market_data['close'].pct_change()
volume = market_data['volume']
            # Volume autocorrelation
            vol_autocorr = volume.autocorr()
            # Speed of mean reversion in returns
            mean_reversion = -returns.autocorr()
            # Combine the metrics
resilience = 0.5 * (1 - abs(vol_autocorr)) + 0.5 * mean_reversion
return float(np.clip(resilience, 0, 1))
except Exception as e:
print(f"Error calculating market resilience: {e}")
return 0.5
@classmethod
def _calculate_correlation_regime(cls, market_data: pd.DataFrame, window: int = 63) -> Dict[str, float]:
"""Calculate correlation regime characteristics"""
try:
            # Compute returns for all assets
            returns = market_data.pct_change().dropna()
            # Dynamic correlation matrix
            correlation_matrix = cls._calculate_dynamic_correlation(returns, window)
            # Extract correlation-regime characteristics
eigenvalues, eigenvectors = np.linalg.eigh(correlation_matrix)
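            # Effective rank = (sum of eigenvalues)^2 / sum of squared eigenvalues,
            # a participation ratio: values near 1 mean one factor dominates co-movement.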
return {
'average_correlation': float(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)].mean()),
'correlation_dispersion': float(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)].std()),
'max_correlation': float(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)].max()),
'min_correlation': float(correlation_matrix[np.triu_indices_from(correlation_matrix, k=1)].min()),
'largest_eigenvalue': float(eigenvalues[-1]),
'eigenvalue_dispersion': float(np.std(eigenvalues)),
'effective_rank': float(np.sum(eigenvalues)**2 / np.sum(eigenvalues**2))
}
except Exception as e:
print(f"Error calculating correlation regime: {e}")
return {
'average_correlation': 0.0,
'correlation_dispersion': 0.0,
'max_correlation': 0.0,
'min_correlation': 0.0
}
    @classmethod
    def _calculate_dynamic_correlation(cls, returns: pd.DataFrame, window: int = 63) -> np.ndarray:
        """Calculate dynamic correlation matrix with exponential weighting"""
        try:
            # Exponential decay parameter (RiskMetrics-style EWMA)
            lambda_param = 0.94
            weights = np.array([lambda_param**(window-i-1) for i in range(window)])
            weights = weights / np.sum(weights)
            # Weight the most recent returns
            normalized_returns = returns.iloc[-window:] * np.sqrt(weights).reshape(-1, 1)
            # Correlation matrix of the weighted returns
            corr_matrix = np.corrcoef(normalized_returns.T)
            # Ensure positive semi-definiteness
return cls._ensure_psd(corr_matrix)
except Exception as e:
print(f"Error calculating dynamic correlation: {e}")
return np.eye(len(returns.columns))
@staticmethod
def _ensure_psd(matrix: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
"""Ensure matrix is positive semi-definite"""
try:
eigenvalues, eigenvectors = np.linalg.eigh(matrix)
eigenvalues = np.maximum(eigenvalues, epsilon)
return eigenvectors @ np.diag(eigenvalues) @ eigenvectors.T
except Exception as e:
print(f"Error ensuring PSD: {e}")
return matrix
@classmethod
def _calculate_tail_risk_metrics(cls, returns: pd.Series) -> Dict[str, float]:
"""Calculate comprehensive tail risk metrics"""
try:
            # Higher moments (scipy's kurtosis is excess kurtosis)
            kurtosis = stats.kurtosis(returns)
            skewness = stats.skew(returns)
            # Tail risk metrics
            var_95 = cls._calculate_var(returns, 0.95)
            var_99 = cls._calculate_var(returns, 0.99)
            es_95 = cls._calculate_expected_shortfall(returns, 0.95)
            # Estimate the tail index (GPD fit to the distribution tails)
tail_index = cls._estimate_tail_index(returns)
return {
'VaR_95': float(var_95),
'VaR_99': float(var_99),
'ES_95': float(es_95),
'tail_index': float(tail_index),
'kurtosis': float(kurtosis),
'skewness': float(skewness),
'tail_risk_score': float(cls._calculate_tail_risk_score(
var_95, es_95, kurtosis, skewness, tail_index
))
}
except Exception as e:
print(f"Error calculating tail risk metrics: {e}")
return {
'VaR_95': -0.02,
'ES_95': -0.03,
'tail_risk_score': 0.5
}
    @classmethod
    def _calculate_var(cls, returns: pd.Series, confidence: float = 0.95) -> float:
        """Calculate Value at Risk using multiple methods"""
        try:
            # Historical VaR
            hist_var = np.percentile(returns, (1 - confidence) * 100)
            # Parametric (Gaussian) VaR
            mu = returns.mean()
            sigma = returns.std()
            param_var = stats.norm.ppf(1 - confidence, mu, sigma)
            # EVT-based VaR (Extreme Value Theory)
            evt_var = cls._calculate_evt_var(returns, confidence)
            # Weighted average of the three estimates
            weighted_var = 0.4 * hist_var + 0.3 * param_var + 0.3 * evt_var
return float(weighted_var)
except Exception as e:
print(f"Error calculating VaR: {e}")
return float(np.percentile(returns, (1 - confidence) * 100))
@staticmethod
def _calculate_expected_shortfall(returns: pd.Series, confidence: float = 0.95) -> float:
"""Calculate Expected Shortfall (Conditional VaR)"""
try:
var = np.percentile(returns, (1 - confidence) * 100)
return float(returns[returns <= var].mean())
except Exception as e:
print(f"Error calculating Expected Shortfall: {e}")
return float(returns.mean() - 2 * returns.std())
@classmethod
def _calculate_stress_indicators_from_data(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate comprehensive market stress indicators"""
try:
returns = market_data['close'].pct_change().dropna()
volume = market_data['volume']
            # Stress components
            volatility_stress = cls._calculate_volatility_stress(returns)
            liquidity_stress = cls._calculate_liquidity_stress(market_data)
            correlation_stress = cls._calculate_correlation_stress(market_data)
            credit_stress = cls._calculate_credit_stress(market_data)
            # Composite stress indicator (weights sum to 1, so sum rather than average)
            composite_stress = np.sum([
                volatility_stress * 0.35,
                liquidity_stress * 0.25,
                correlation_stress * 0.20,
                credit_stress * 0.20
            ])
return {
'composite_stress': float(composite_stress),
'volatility_stress': float(volatility_stress),
'liquidity_stress': float(liquidity_stress),
'correlation_stress': float(correlation_stress),
'credit_stress': float(credit_stress),
'stress_momentum': float(cls._calculate_stress_momentum(returns)),
'stress_regime': cls._determine_stress_regime(composite_stress)
}
except Exception as e:
print(f"Error calculating stress indicators: {e}")
return {'composite_stress': 0.5}
@staticmethod
def _calculate_volatility_stress(returns: pd.Series, window: int = 21) -> float:
"""Calculate volatility-based stress indicator"""
try:
            # Realized volatility
            current_vol = returns.rolling(window=window).std().iloc[-1]
            historical_vol = returns.std()
            # Volatility jumps
            vol_jumps = returns.rolling(window=5).std().diff().abs()
            jump_intensity = vol_jumps.mean()
            # Volatility-based stress score
vol_ratio = current_vol / historical_vol
stress_score = (0.7 * np.tanh(vol_ratio - 1) +
0.3 * np.tanh(jump_intensity / historical_vol))
return float(np.clip((stress_score + 1) / 2, 0, 1))
except Exception as e:
print(f"Error calculating volatility stress: {e}")
return 0.5
@classmethod
def _calculate_regime_probabilities_from_data(cls,
returns: pd.Series,
n_regimes: int = 3) -> Dict[str, float]:
"""Calculate regime probabilities using Gaussian Mixture Model"""
try:
            # Prepare features for the GMM
            features = cls._prepare_regime_features(returns)
            # Fit the GMM
            gmm = GaussianMixture(
                n_components=n_regimes,
                random_state=42,
                covariance_type='full',
                reg_covar=1e-4
            )
            # Standardize the features
            scaler = StandardScaler()
            scaled_features = scaler.fit_transform(features)
            # Fit and predict probabilities for the latest observation
            gmm.fit(scaled_features)
            probs = gmm.predict_proba(scaled_features[-1].reshape(1, -1))[0]
            # Map components to regimes by ascending mean volatility (feature 0),
            # since GMM component order is arbitrary
            order = np.argsort(gmm.means_[:, 0])
            regime_names = ['low_vol', 'normal', 'high_vol']
            return dict(zip(regime_names, probs[order]))
except Exception as e:
print(f"Error calculating regime probabilities: {e}")
return {'normal': 1.0, 'high_vol': 0.0, 'low_vol': 0.0}
@staticmethod
def _prepare_regime_features(returns: pd.Series) -> np.ndarray:
"""Prepare features for regime detection"""
        try:
            window = 21  # Rolling window
            # Rolling distributional features
            rolling_std = returns.rolling(window=window).std()
            rolling_mean = returns.rolling(window=window).mean()
            rolling_skew = returns.rolling(window=window).skew()
            rolling_kurt = returns.rolling(window=window).kurt()
            # Fraction of positive-return days
            high_low_ratio = (returns > 0).rolling(window=window).mean()
            # Stack the features and drop warm-up rows containing NaNs
            features = np.column_stack([
                rolling_std,
                rolling_mean,
                rolling_skew,
                rolling_kurt,
                high_low_ratio
            ])
            return features[~np.isnan(features).any(axis=1)]
except Exception as e:
print(f"Error preparing regime features: {e}")
return np.zeros((len(returns), 5))
@staticmethod
def _determine_stress_regime(stress_level: float) -> str:
"""Determine market stress regime"""
if stress_level < 0.3:
return "normal"
elif stress_level < 0.6:
return "elevated"
elif stress_level < 0.8:
return "high"
else:
return "extreme"
@classmethod
def _calculate_detailed_sentiment(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate comprehensive market sentiment indicators"""
try:
            # Sentiment components
            price_momentum = cls._calculate_price_momentum_sentiment(market_data)
            volume_sentiment = cls._calculate_volume_sentiment(market_data)
            volatility_sentiment = cls._calculate_volatility_sentiment(market_data)
            relative_strength = cls._calculate_relative_strength(market_data)
            # Advanced technical indicators
            tech_indicators = cls._calculate_technical_indicators(market_data)
            # Composite sentiment score (weights sum to 1, so sum rather than average)
            composite_sentiment = np.sum([
                price_momentum * 0.3,
                volume_sentiment * 0.2,
                volatility_sentiment * 0.2,
                relative_strength * 0.15,
                tech_indicators['sentiment_score'] * 0.15
            ])
return {
'composite_sentiment': float(composite_sentiment),
'price_momentum': float(price_momentum),
'volume_sentiment': float(volume_sentiment),
'volatility_sentiment': float(volatility_sentiment),
'relative_strength': float(relative_strength),
'technical_indicators': tech_indicators,
'sentiment_regime': cls._determine_sentiment_regime(composite_sentiment),
'sentiment_momentum': float(cls._calculate_sentiment_momentum(market_data))
}
except Exception as e:
print(f"Error calculating detailed sentiment: {e}")
return {'composite_sentiment': 0.5}
    @classmethod
    def _calculate_technical_indicators(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate comprehensive technical indicators"""
try:
close = market_data['close']
            # Moving averages
ma_20 = close.rolling(window=20).mean()
ma_50 = close.rolling(window=50).mean()
ma_200 = close.rolling(window=200).mean()
# RSI
delta = close.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
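            # RSI = 100 - 100 / (1 + RS), where RS is the ratio of average gain to
            # average loss over the 14-period window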
# MACD
exp1 = close.ewm(span=12, adjust=False).mean()
exp2 = close.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
# Bollinger Bands
std = close.rolling(window=20).std()
upper_band = ma_20 + (std * 2)
lower_band = ma_20 - (std * 2)
            # Composite technical score
tech_score = cls._combine_technical_signals(
close.iloc[-1],
ma_20.iloc[-1],
ma_50.iloc[-1],
ma_200.iloc[-1],
rsi.iloc[-1],
macd.iloc[-1],
signal.iloc[-1],
upper_band.iloc[-1],
lower_band.iloc[-1]
)
return {
'sentiment_score': float(tech_score),
'rsi': float(rsi.iloc[-1]),
'macd_histogram': float(macd.iloc[-1] - signal.iloc[-1]),
'bb_position': float((close.iloc[-1] - lower_band.iloc[-1]) /
(upper_band.iloc[-1] - lower_band.iloc[-1]))
}
except Exception as e:
print(f"Error calculating technical indicators: {e}")
return {'sentiment_score': 0.5}
@staticmethod
def _combine_technical_signals(*signals) -> float:
"""Combine multiple technical signals into a single score"""
        try:
            # Keep numeric signals and min-max normalize them to [0, 1]
            numeric_signals = [s for s in signals if isinstance(s, (int, float))]
            if not numeric_signals:
                return 0.5
            lo, hi = np.min(numeric_signals), np.max(numeric_signals)
            if hi == lo:
                return 0.5  # all signals identical; nothing to discriminate
            normalized_signals = [
                float(np.clip((s - lo) / (hi - lo), 0, 1)) for s in numeric_signals
            ]
            # Average of the normalized signals
            return float(np.mean(normalized_signals))
except Exception as e:
print(f"Error combining technical signals: {e}")
return 0.5
@staticmethod
def _calculate_sentiment_momentum(market_data: pd.DataFrame,
short_window: int = 5,
long_window: int = 20) -> float:
"""Calculate sentiment momentum using multiple timeframes"""
try:
returns = market_data['close'].pct_change()
            # Short- and long-horizon sentiment moving averages
            short_ma = returns.rolling(window=short_window).mean()
            long_ma = returns.rolling(window=long_window).mean()
            # Sentiment momentum as the relative gap between them
momentum = (short_ma.iloc[-1] - long_ma.iloc[-1]) / long_ma.iloc[-1]
return float(np.clip(momentum, -1, 1))
except Exception as e:
print(f"Error calculating sentiment momentum: {e}")
return 0.0
@classmethod
def _estimate_macro_indicators(cls, market_data: pd.DataFrame) -> MacroIndicators:
"""Estimate comprehensive macroeconomic indicators from market data"""
try:
            # Estimate the principal indicators
gdp_growth = cls._estimate_gdp_growth(market_data)
inflation_rate = cls._estimate_inflation_rate(market_data)
interest_rate = cls._estimate_interest_rate(market_data)
unemployment = cls._estimate_unemployment_rate(market_data)
            # Confidence and activity indicators
consumer_confidence = cls._estimate_consumer_confidence(market_data)
industrial_production = cls._estimate_industrial_production(market_data)
retail_sales = cls._estimate_retail_sales(market_data)
housing_market = cls._estimate_housing_market(market_data)
            # VIX if available
vix_level = cls._get_vix_level(market_data)
return MacroIndicators(
gdp_growth=gdp_growth,
inflation_rate=inflation_rate,
interest_rate=interest_rate,
unemployment_rate=unemployment,
consumer_confidence=consumer_confidence,
industrial_production=industrial_production,
retail_sales=retail_sales,
housing_market=housing_market,
vix_level=vix_level
)
        except Exception as e:
            print(f"Error estimating macro indicators: {e}")
            # Fall back to neutral default values
            return MacroIndicators(
                gdp_growth=0.02,
                inflation_rate=0.02,
                interest_rate=0.02,
                unemployment_rate=0.05,
                consumer_confidence=0.5,
                industrial_production=0.5,
                retail_sales=0.5,
                housing_market=0.5
            )
@staticmethod
def _estimate_gdp_growth(market_data: pd.DataFrame, window: int = 252) -> float:
"""Estimate GDP growth from market indicators"""
try:
            # Long-horizon returns
            returns = market_data['close'].pct_change()
            long_term_return = (1 + returns).rolling(window=window).prod() - 1
            # Volume growth
            volume_growth = (market_data['volume'].rolling(window=window).mean().pct_change())
            # Composite growth score
            growth_score = (0.6 * long_term_return.iloc[-1] +
                            0.4 * volume_growth.iloc[-1])
            # Clip to the -5%..+5% range typical of GDP growth
            return float(np.clip(growth_score * 0.05, -0.05, 0.05))
        except Exception as e:
            print(f"Error estimating GDP growth: {e}")
            return 0.02  # Default growth value
@staticmethod
def _estimate_inflation_rate(market_data: pd.DataFrame, window: int = 126) -> float:
"""Estimate inflation rate from price trends"""
try:
            # Price trend
            price_trend = market_data['close'].pct_change().rolling(window=window).mean()
            # Scale to an annualized rate
            inflation_estimate = price_trend.iloc[-1] * 252 / window
            # Clip to the 0..10% range typical of inflation
            return float(np.clip(inflation_estimate, 0, 0.1))
        except Exception as e:
            print(f"Error estimating inflation rate: {e}")
            return 0.02  # Default inflation rate
@staticmethod
def _estimate_interest_rate(market_data: pd.DataFrame) -> float:
"""Estimate effective interest rate from market data"""
try:
            # Implied rate from dividend and earnings yields
            dividend_yield = market_data.get('dividend_yield', pd.Series([0.02])).iloc[-1]
            earnings_yield = market_data.get('earnings_yield', pd.Series([0.04])).iloc[-1]
            # Rough risk-free rate estimate
            risk_free_rate = (dividend_yield + earnings_yield) / 2
            return float(np.clip(risk_free_rate, 0, 0.1))
        except Exception as e:
            print(f"Error estimating interest rate: {e}")
            return 0.02  # Default interest rate
@classmethod
def _estimate_consumer_confidence(cls, market_data: pd.DataFrame) -> float:
"""Estimate consumer confidence from market behavior"""
try:
            # Analyze market behaviour
volatility = cls._calculate_volatility(market_data['close'].pct_change())
momentum = cls._calculate_momentum(market_data['close'].pct_change())
volume_trend = cls._calculate_volume_trend(market_data)
            # Composite confidence score
confidence_score = (0.4 * (1 - volatility) +
0.4 * momentum +
0.2 * volume_trend)
return float(np.clip(confidence_score, 0, 1))
except Exception as e:
print(f"Error estimating consumer confidence: {e}")
return 0.5
@staticmethod
def _calculate_volume_trend(market_data: pd.DataFrame, window: int = 21) -> float:
"""Calculate normalized volume trend"""
try:
volume = market_data['volume']
volume_ma = volume.rolling(window=window).mean()
volume_trend = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1
return float(np.clip((volume_trend + 1) / 2, 0, 1))
except Exception as e:
print(f"Error calculating volume trend: {e}")
return 0.5
@classmethod
def _estimate_industrial_production(cls, market_data: pd.DataFrame) -> float:
"""Estimate industrial production index from market indicators"""
try:
            # Industrial-sector trends
            price_trend = cls._calculate_sector_trend(market_data, 'industrial')
            volume_trend = cls._calculate_sector_volume_trend(market_data, 'industrial')
            volatility = cls._calculate_sector_volatility(market_data, 'industrial')
            # Combine the indicators
            production_score = (
                0.4 * price_trend +
                0.4 * volume_trend +
                0.2 * (1 - volatility)  # Lower volatility suggests steadier production
            )
            # Clip to [0, 1]
return float(np.clip(production_score, 0, 1))
except Exception as e:
print(f"Error estimating industrial production: {e}")
return 0.5
@staticmethod
def _estimate_retail_sales(market_data: pd.DataFrame, window: int = 63) -> float:
"""Estimate retail sales growth from market data"""
try:
            # Retail-sales proxies
            volume = market_data['volume']
            price = market_data['close']
            # Transaction volume trend
            volume_ma = volume.rolling(window=window).mean()
            volume_growth = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1
            # Price trend
            price_ma = price.rolling(window=window).mean()
            price_growth = (price.iloc[-1] / price_ma.iloc[-1]) - 1
            # Combine into a sales indicator
            sales_indicator = (0.6 * volume_growth + 0.4 * price_growth)
return float(np.clip((sales_indicator + 1) / 2, 0, 1))
except Exception as e:
print(f"Error estimating retail sales: {e}")
return 0.5
@classmethod
def _estimate_housing_market(cls, market_data: pd.DataFrame) -> float:
"""Estimate housing market conditions"""
try:
            # Real-estate indicators
            price_momentum = cls._calculate_price_momentum(market_data, 'real_estate')
            volume_trend = cls._calculate_volume_trend(market_data)
            volatility = cls._calculate_volatility(market_data['close'].pct_change())
            # Sector-specific indicators
            mortgage_rate_impact = cls._estimate_mortgage_rate_impact(market_data)
            housing_sentiment = cls._calculate_sector_sentiment(market_data, 'real_estate')
            # Composite score (weights sum to 1, so sum rather than average)
            housing_score = np.sum([
                price_momentum * 0.3,
                volume_trend * 0.2,
                (1 - volatility) * 0.15,
                mortgage_rate_impact * 0.2,
                housing_sentiment * 0.15
            ])
return float(np.clip(housing_score, 0, 1))
except Exception as e:
print(f"Error estimating housing market: {e}")
return 0.5
@staticmethod
def _estimate_mortgage_rate_impact(market_data: pd.DataFrame) -> float:
"""Estimate impact of mortgage rates on housing market"""
try:
            # Use bond yields as a proxy for mortgage rates
            if 'yield' in market_data.columns:
                current_yield = market_data['yield'].iloc[-1]
                avg_yield = market_data['yield'].mean()
                # Normalized impact (lower rates score higher)
rate_impact = 1 - (current_yield / avg_yield)
return float(np.clip((rate_impact + 1) / 2, 0, 1))
return 0.5
except Exception as e:
print(f"Error estimating mortgage rate impact: {e}")
return 0.5
@classmethod
def _calculate_sector_trend(cls,
market_data: pd.DataFrame,
sector: str,
window: int = 63) -> float:
"""Calculate price trend for specific market sector"""
try:
sector_data = cls._get_sector_data(market_data, sector)
if sector_data is None:
return 0.5
            # Price trend for the sector
returns = sector_data['close'].pct_change()
trend = returns.rolling(window=window).mean()
momentum = cls._calculate_momentum(returns)
            # Combine trend and momentum
sector_trend = 0.7 * trend.iloc[-1] + 0.3 * momentum
return float(np.clip((sector_trend + 1) / 2, 0, 1))
except Exception as e:
print(f"Error calculating sector trend: {e}")
return 0.5
@staticmethod
def _calculate_sector_volume_trend(market_data: pd.DataFrame,
sector: str,
window: int = 21) -> float:
"""Calculate volume trend for specific market sector"""
try:
            # Volume trend
volume = market_data['volume']
volume_ma = volume.rolling(window=window).mean()
volume_trend = (volume.iloc[-1] / volume_ma.iloc[-1]) - 1
            # Normalize to [0, 1]
return float(np.clip((volume_trend + 1) / 2, 0, 1))
except Exception as e:
print(f"Error calculating sector volume trend: {e}")
return 0.5
@staticmethod
def _calculate_sector_volatility(market_data: pd.DataFrame,
sector: str,
window: int = 21) -> float:
"""Calculate volatility for specific market sector"""
try:
            returns = market_data['close'].pct_change()
            volatility = returns.rolling(window=window).std() * np.sqrt(252)
            return float(np.clip(volatility.iloc[-1], 0, 1))
except Exception as e:
print(f"Error calculating sector volatility: {e}")
return 0.5
@classmethod
def _analyze_economic_cycle(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Analyze current position in economic cycle"""
try:
            # Cycle indicators
trend_strength = cls._calculate_trend_strength(market_data)
cycle_position = cls._estimate_cycle_position(market_data)
momentum_indicators = cls._calculate_cycle_momentum(market_data)
leading_indicators = cls._calculate_leading_indicators(market_data)
return {
'cycle_position': float(cycle_position),
'trend_strength': float(trend_strength),
'momentum_score': float(momentum_indicators['composite_momentum']),
'expansion_probability': float(momentum_indicators['expansion_prob']),
'contraction_probability': float(momentum_indicators['contraction_prob']),
'turning_point_probability': float(cls._estimate_turning_point_probability(
market_data, leading_indicators
)),
'cycle_maturity': float(cls._estimate_cycle_maturity(
market_data, cycle_position
)),
'leading_indicators': leading_indicators
}
except Exception as e:
print(f"Error analyzing economic cycle: {e}")
return {'cycle_position': 0.5}
@staticmethod
def _calculate_trend_strength(market_data: pd.DataFrame, windows: List[int] = [50, 100, 200]) -> float:
"""Calculate multi-timeframe trend strength"""
try:
price = market_data['close']
trend_scores = []
for window in windows:
ma = price.rolling(window=window).mean()
                # Score based on the price's position relative to the MA
score = (price.iloc[-1] / ma.iloc[-1]) - 1
trend_scores.append(score)
            # Weighted average of the scores (more weight on shorter MAs)
weights = np.array([0.5, 0.3, 0.2])
composite_score = np.average(trend_scores, weights=weights)
return float(np.clip((composite_score + 1) / 2, 0, 1))
except Exception as e:
print(f"Error calculating trend strength: {e}")
return 0.5
@classmethod
def _estimate_cycle_position(cls, market_data: pd.DataFrame) -> float:
"""Estimate current position in economic cycle (0 = trough, 0.5 = mid-cycle, 1 = peak)"""
try:
            # Composite cycle indicators
            price_cycle = cls._calculate_price_cycle(market_data)
            volume_cycle = cls._calculate_volume_cycle(market_data)
            momentum_cycle = cls._calculate_momentum_cycle(market_data)
            # Combine the indicators (weights sum to 1, so sum rather than average)
            cycle_position = np.sum([
                price_cycle * 0.4,
                volume_cycle * 0.3,
                momentum_cycle * 0.3
            ])
return float(np.clip(cycle_position, 0, 1))
except Exception as e:
print(f"Error estimating cycle position: {e}")
return 0.5
@staticmethod
def _calculate_price_cycle(market_data: pd.DataFrame, window: int = 252) -> float:
"""Calculate price-based cycle indicator"""
try:
price = market_data['close']
            # Price relative to the long-term trend
long_ma = price.rolling(window=window).mean()
relative_price = (price - long_ma) / long_ma
            # Min-max normalize to [0, 1]
normalized_position = (relative_price - relative_price.min()) / \
(relative_price.max() - relative_price.min())
return float(normalized_position.iloc[-1])
except Exception as e:
print(f"Error calculating price cycle: {e}")
return 0.5
@staticmethod
def _calculate_cycle_momentum(market_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate cycle momentum indicators"""
try:
returns = market_data['close'].pct_change()
            # Regime probabilities via a 2-component GMM on rolling mean/vol features
            gmm = GaussianMixture(n_components=2, random_state=42)
            features = np.column_stack([
                returns.rolling(window=21).mean(),
                returns.rolling(window=21).std()
            ])
            features = features[~np.isnan(features).any(axis=1)]  # drop warm-up NaNs
            gmm.fit(features)
            # Predict probabilities for the latest observation; order components by
            # mean return (GMM order is arbitrary) so index 0 is the expansion state
            current_probs = gmm.predict_proba(features[-1].reshape(1, -1))[0]
            order = np.argsort(gmm.means_[:, 0])[::-1]
            current_probs = current_probs[order]
return {
'composite_momentum': float(np.mean([
returns.rolling(window=21).mean().iloc[-1],
returns.rolling(window=63).mean().iloc[-1],
returns.rolling(window=252).mean().iloc[-1]
])),
'expansion_prob': float(current_probs[0]),
'contraction_prob': float(current_probs[1])
}
except Exception as e:
print(f"Error calculating cycle momentum: {e}")
return {
'composite_momentum': 0.0,
'expansion_prob': 0.5,
'contraction_prob': 0.5
}
@classmethod
def _calculate_leading_indicators(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Calculate composite leading indicators"""
try:
yield_curve = cls._calculate_yield_curve_indicator(market_data)
credit_spread = cls._calculate_credit_spread_indicator(market_data)
market_breadth = cls._calculate_market_breadth(market_data)
momentum = cls._calculate_multi_timeframe_momentum(market_data)
return {
'yield_curve': float(yield_curve),
'credit_spread': float(credit_spread),
'market_breadth': float(market_breadth),
'momentum': float(momentum),
                'composite_leading': float(np.sum([
                    yield_curve * 0.3,
                    credit_spread * 0.3,
                    market_breadth * 0.2,
                    momentum * 0.2
                ]))
}
except Exception as e:
print(f"Error calculating leading indicators: {e}")
return {'composite_leading': 0.5}
@classmethod
def _estimate_turning_point_probability(cls,
market_data: pd.DataFrame,
leading_indicators: Dict[str, float]) -> float:
"""Estimate probability of market turning point"""
try:
            # Reversal signals
            technical_signals = cls._analyze_technical_turning_points(market_data)
            momentum_signals = cls._analyze_momentum_turning_points(market_data)
            breadth_signals = cls._analyze_market_breadth_signals(market_data)
            volume_signals = cls._analyze_volume_patterns(market_data)
            # Composite probability (weights sum to 1, so sum rather than average)
            turning_point_prob = np.sum([
                technical_signals['turning_prob'] * 0.3,
                momentum_signals['turning_prob'] * 0.25,
                breadth_signals['turning_prob'] * 0.25,
                volume_signals['turning_prob'] * 0.2
            ])
            # Adjust with the leading indicators
            if leading_indicators['composite_leading'] < 0.3 or leading_indicators['composite_leading'] > 0.7:
                turning_point_prob *= 1.2  # Raise the probability at extremes
return float(np.clip(turning_point_prob, 0, 1))
except Exception as e:
print(f"Error estimating turning point probability: {e}")
return 0.5
    @classmethod
    def _analyze_technical_turning_points(cls, market_data: pd.DataFrame) -> Dict[str, float]:
        """Analyze technical patterns for turning points"""
        try:
            price = market_data['close']
            # Moving averages
            ma_50 = price.rolling(window=50).mean()
            ma_200 = price.rolling(window=200).mean()
            # Detect moving-average crossovers
            death_cross = (ma_50.iloc[-1] < ma_200.iloc[-1]) and (ma_50.iloc[-2] > ma_200.iloc[-2])
            golden_cross = (ma_50.iloc[-1] > ma_200.iloc[-1]) and (ma_50.iloc[-2] < ma_200.iloc[-2])
            # Detect divergences
            price_momentum = price.pct_change().rolling(window=14).mean()
            rsi = cls._calculate_rsi(price)
            divergence_score = cls._calculate_divergence_score(price, rsi)
return {
'turning_prob': float(np.mean([
death_cross * 1.0,
golden_cross * 1.0,
divergence_score
])),
'bearish_signals': death_cross or (divergence_score > 0.7),
'bullish_signals': golden_cross or (divergence_score < 0.3)
}
except Exception as e:
print(f"Error analyzing technical turning points: {e}")
return {'turning_prob': 0.5}
@staticmethod
def _calculate_divergence_score(price: pd.Series, indicator: pd.Series) -> float:
"""Calculate divergence score between price and indicator"""
try:
            # Trends of price and indicator
            price_trend = price.pct_change().rolling(window=20).mean()
            indicator_trend = indicator.diff().rolling(window=20).mean()
            # Detect divergences
bullish_div = (price_trend.iloc[-1] < 0) and (indicator_trend.iloc[-1] > 0)
bearish_div = (price_trend.iloc[-1] > 0) and (indicator_trend.iloc[-1] < 0)
if bullish_div:
return float(abs(price_trend.iloc[-1] - indicator_trend.iloc[-1]))
elif bearish_div:
return float(1 - abs(price_trend.iloc[-1] - indicator_trend.iloc[-1]))
return 0.5
except Exception as e:
print(f"Error calculating divergence score: {e}")
return 0.5
@classmethod
def _analyze_momentum_turning_points(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Analyze momentum indicators for turning points"""
try:
            # Momentum indicators
rsi = cls._calculate_rsi(market_data['close'])
macd = cls._calculate_macd(market_data['close'])
stochastic = cls._calculate_stochastic(market_data)
            # Overbought/oversold conditions
overbought = (rsi > 70) or (stochastic > 80)
oversold = (rsi < 30) or (stochastic < 20)
            # Momentum divergences
momentum_divergence = cls._detect_momentum_divergence(market_data, macd)
return {
'turning_prob': float(np.mean([
overbought * 0.8,
oversold * 0.8,
momentum_divergence * 0.7
])),
'overbought': bool(overbought),
'oversold': bool(oversold)
}
except Exception as e:
print(f"Error analyzing momentum turning points: {e}")
return {'turning_prob': 0.5}
@staticmethod
def _calculate_stochastic(market_data: pd.DataFrame, period: int = 14) -> float:
"""Calculate stochastic oscillator"""
try:
high = market_data['high'].rolling(window=period).max()
low = market_data['low'].rolling(window=period).min()
close = market_data['close']
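            # %K = 100 * (close - lowest low) / (highest high - lowest low);
            # %D is the 3-period moving average of %K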
k = 100 * (close - low) / (high - low)
d = k.rolling(window=3).mean()
return float(d.iloc[-1])
except Exception as e:
print(f"Error calculating stochastic: {e}")
return 50.0
@classmethod
def _estimate_cycle_maturity(cls,
market_data: pd.DataFrame,
cycle_position: float) -> Dict[str, float]:
"""Estimate cycle maturity and remaining duration"""
try:
            # Cycle characteristics
cycle_duration = cls._analyze_cycle_duration(market_data)
trend_strength = cls._calculate_trend_strength(market_data)
momentum_characteristics = cls._analyze_momentum_characteristics(market_data)
volatility_regime = cls._analyze_volatility_regime(market_data)
            # Estimate relative maturity
maturity_score = cls._calculate_maturity_score(
cycle_position,
cycle_duration,
trend_strength,
momentum_characteristics,
volatility_regime
)
return {
'maturity_score': float(maturity_score),
'estimated_remaining_duration': float(cls._estimate_remaining_duration(
maturity_score, cycle_duration
)),
'cycle_characteristics': {
'duration': float(cycle_duration['current_duration']),
'average_duration': float(cycle_duration['average_duration']),
'trend_strength': float(trend_strength),
'momentum_state': str(momentum_characteristics['state']),
'volatility_regime': str(volatility_regime['current_regime'])
}
}
except Exception as e:
print(f"Error estimating cycle maturity: {e}")
return {'maturity_score': 0.5}
    @classmethod
    def _analyze_cycle_duration(cls, market_data: pd.DataFrame) -> Dict[str, float]:
        """Analyze cycle duration characteristics"""
        try:
            returns = market_data['close'].pct_change()
            # Detect turning points
            peaks = cls._detect_peaks(returns)
            troughs = cls._detect_troughs(returns)
            # Peak-to-peak cycle durations
            cycle_durations = []
            for i in range(len(peaks)-1):
                cycle_durations.append(peaks[i+1] - peaks[i])
return {
'current_duration': float(len(returns) - peaks[-1]),
'average_duration': float(np.mean(cycle_durations)),
'min_duration': float(np.min(cycle_durations)),
'max_duration': float(np.max(cycle_durations))
}
except Exception as e:
print(f"Error analyzing cycle duration: {e}")
return {
'current_duration': 0.0,
                'average_duration': 252.0  # Average cycle duration
}
@staticmethod
def _detect_peaks(series: pd.Series, window: int = 21) -> List[int]:
"""Detect peaks in time series"""
peaks = []
        for i in range(window, len(series)-window):
            if all(series.iloc[i] > series.iloc[i-j] for j in range(1, window+1)) and \
               all(series.iloc[i] > series.iloc[i+j] for j in range(1, window+1)):
                peaks.append(i)
return peaks
@staticmethod
def _detect_troughs(series: pd.Series, window: int = 21) -> List[int]:
"""Detect troughs in time series"""
troughs = []
        for i in range(window, len(series)-window):
            if all(series.iloc[i] < series.iloc[i-j] for j in range(1, window+1)) and \
               all(series.iloc[i] < series.iloc[i+j] for j in range(1, window+1)):
                troughs.append(i)
return troughs
@classmethod
def _analyze_momentum_characteristics(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze momentum characteristics across multiple timeframes"""
try:
returns = market_data['close'].pct_change()
            # Momentum over several horizons
momentum_metrics = {
'short_term': cls._calculate_momentum(returns, 21),
'medium_term': cls._calculate_momentum(returns, 63),
'long_term': cls._calculate_momentum(returns, 252)
}
            # Determine the momentum state
momentum_state = cls._determine_momentum_state(momentum_metrics)
return {
'metrics': momentum_metrics,
'state': momentum_state,
'strength': float(cls._calculate_momentum_strength(momentum_metrics)),
'consistency': float(cls._calculate_momentum_consistency(momentum_metrics))
}
except Exception as e:
print(f"Error analyzing momentum characteristics: {e}")
return {'state': 'neutral'}
@staticmethod
def _determine_momentum_state(momentum_metrics: Dict[str, float]) -> str:
"""Determine current momentum state"""
        # Weights for the different horizons
weights = {'short_term': 0.5, 'medium_term': 0.3, 'long_term': 0.2}
weighted_momentum = sum(m * weights[k] for k, m in momentum_metrics.items())
if weighted_momentum > 0.5:
return 'strong_positive'
elif weighted_momentum > 0.2:
return 'positive'
elif weighted_momentum < -0.5:
return 'strong_negative'
elif weighted_momentum < -0.2:
return 'negative'
return 'neutral'
@staticmethod
def _calculate_momentum_strength(momentum_metrics: Dict[str, float]) -> float:
"""Calculate overall momentum strength"""
try:
            # Mean absolute momentum across horizons
return float(np.mean([abs(m) for m in momentum_metrics.values()]))
except Exception as e:
print(f"Error calculating momentum strength: {e}")
return 0.0
@staticmethod
def _calculate_momentum_consistency(momentum_metrics: Dict[str, float]) -> float:
"""Calculate momentum consistency across timeframes"""
try:
            # Sign agreement across horizons
signs = [np.sign(m) for m in momentum_metrics.values()]
return float(abs(np.mean(signs)))
except Exception as e:
print(f"Error calculating momentum consistency: {e}")
return 0.0
@classmethod
def _analyze_volatility_regime(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze volatility regime characteristics using multiple methods"""
try:
returns = market_data['close'].pct_change()
            # Volatility metrics
realized_vol = cls._calculate_realized_volatility(returns)
conditional_vol = cls._estimate_conditional_volatility(returns)
regime_probs = cls._estimate_volatility_regime_probabilities(returns)
vol_term_structure = cls._analyze_volatility_term_structure(returns)
            # Determine the current regime
current_regime = cls._identify_volatility_regime(
realized_vol,
conditional_vol,
regime_probs
)
return {
'current_regime': str(current_regime),
                'realized_volatility': realized_vol,  # dict of horizon -> annualized vol
'conditional_volatility': float(conditional_vol),
'regime_probabilities': regime_probs,
'term_structure': vol_term_structure,
'regime_metrics': {
'persistence': float(cls._calculate_regime_persistence(returns)),
'transition_probability': float(cls._calculate_regime_transition_prob(returns))
}
}
except Exception as e:
print(f"Error analyzing volatility regime: {e}")
return {'current_regime': 'normal'}
@staticmethod
def _calculate_realized_volatility(returns: pd.Series,
windows: List[int] = [21, 63, 252]) -> Dict[str, float]:
"""Calculate realized volatility across multiple timeframes"""
try:
realized_vols = {}
for window in windows:
vol = returns.rolling(window=window).std() * np.sqrt(252)
realized_vols[f'{window}d'] = float(vol.iloc[-1])
return realized_vols
except Exception as e:
print(f"Error calculating realized volatility: {e}")
return {f'{w}d': 0.15 for w in windows}
@staticmethod
def _estimate_conditional_volatility(returns: pd.Series) -> float:
"""Estimate conditional volatility using GARCH model"""
try:
            # Fit a GARCH(1,1) model (requires the optional 'arch' package)
            model = arch_model(returns, vol='Garch', p=1, q=1)
            result = model.fit(disp='off')
            forecast = result.forecast(horizon=1)
            # Annualize the one-step-ahead forecast so it matches the fallback below
            return float(np.sqrt(forecast.variance.values[-1][0] * 252))
except Exception as e:
print(f"Error estimating conditional volatility: {e}")
return float(returns.std() * np.sqrt(252))
@staticmethod
def _estimate_volatility_regime_probabilities(returns: pd.Series) -> Dict[str, float]:
"""Estimate probabilities of different volatility regimes using GMM"""
try:
            # Rolling volatility observations for the GMM
            vol_data = returns.rolling(window=21).std().dropna().values.reshape(-1, 1)
            # Fit a 3-component GMM (low, normal, high volatility)
            gmm = GaussianMixture(n_components=3, random_state=42)
            gmm.fit(vol_data)
            # Probabilities for the current observation, with components ordered by
            # ascending mean volatility (GMM component order is arbitrary)
            current_probs = gmm.predict_proba(vol_data[-1].reshape(1, -1))[0]
            order = np.argsort(gmm.means_.flatten())
            current_probs = current_probs[order]
            return {
                'low_vol': float(current_probs[0]),
                'normal_vol': float(current_probs[1]),
                'high_vol': float(current_probs[2])
            }
except Exception as e:
print(f"Error estimating regime probabilities: {e}")
return {'normal_vol': 1.0}
@staticmethod
def _analyze_volatility_term_structure(returns: pd.Series) -> Dict[str, float]:
"""Analyze volatility term structure"""
try:
windows = [5, 21, 63, 252] # 1w, 1m, 3m, 1y
vols = {}
for window in windows:
vol = returns.rolling(window=window).std() * np.sqrt(252)
vols[f'{window}d'] = float(vol.iloc[-1])
            # Slope of the term structure (1-year vs 1-month volatility)
slope = (vols['252d'] - vols['21d']) / vols['21d']
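            # An upward slope (long-horizon vol above short-horizon vol) is the usual
            # shape; an inverted (negative) slope often accompanies market stress.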
return {
'term_structure': vols,
'slope': float(slope),
'convexity': float(vols['63d'] / np.mean([vols['21d'], vols['252d']]))
}
except Exception as e:
print(f"Error analyzing volatility term structure: {e}")
return {'slope': 0.0}
@staticmethod
def _calculate_regime_persistence(returns: pd.Series, lookback: int = 252) -> float:
"""Calculate volatility regime persistence"""
try:
vol = returns.rolling(window=21).std() * np.sqrt(252)
vol_median = vol.median()
            # Average regime duration
regime = (vol > vol_median).astype(int)
regime_changes = regime.diff().abs()
persistence = 1 - (regime_changes.sum() / len(regime_changes))
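            # persistence = 1 - (state flips / observations): values near 1 mean the
            # volatility regime rarely crosses its median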
return float(persistence)
except Exception as e:
print(f"Error calculating regime persistence: {e}")
return 0.5
@classmethod
def _analyze_cycle_transitions(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze market cycle transitions and rotation signals"""
try:
            # Analyze the individual components
price_signals = cls._analyze_price_transition_signals(market_data)
sector_rotation = cls._analyze_sector_rotation(market_data)
breadth_signals = cls._analyze_breadth_transitions(market_data)
volatility_signals = cls._analyze_volatility_transitions(market_data)
            # Probability of a cycle transition
transition_probability = cls._calculate_transition_probability(
price_signals,
sector_rotation,
breadth_signals,
volatility_signals
)
return {
'transition_probability': float(transition_probability),
'transition_type': cls._determine_transition_type(
price_signals,
sector_rotation,
breadth_signals
),
'price_signals': price_signals,
'sector_rotation': sector_rotation,
'breadth_signals': breadth_signals,
'volatility_signals': volatility_signals,
'confirmation_metrics': cls._calculate_confirmation_metrics(market_data)
}
except Exception as e:
print(f"Error analyzing cycle transitions: {e}")
return {'transition_probability': 0.0}
    @classmethod
    def _analyze_price_transition_signals(cls, market_data: pd.DataFrame) -> Dict[str, float]:
        """Analyze price-based transition signals"""
        try:
            price = market_data['close']
            returns = price.pct_change()
            # Moving averages
            ma_50 = price.rolling(window=50).mean()
            ma_200 = price.rolling(window=200).mean()
            # Detect crossovers and divergences
            death_cross = (ma_50 < ma_200) & (ma_50.shift(1) > ma_200.shift(1))
            golden_cross = (ma_50 > ma_200) & (ma_50.shift(1) < ma_200.shift(1))
            # Price trends
            price_trend = returns.rolling(window=63).mean()
            momentum = returns.rolling(window=21).mean()
return {
'death_cross_signal': float(death_cross.iloc[-1]),
'golden_cross_signal': float(golden_cross.iloc[-1]),
'trend_strength': float(price_trend.iloc[-1]),
'momentum_signal': float(momentum.iloc[-1]),
'price_structure': cls._analyze_price_structure(price)
}
except Exception as e:
print(f"Error analyzing price transition signals: {e}")
return {'trend_strength': 0.0}
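# Example (illustrative): if ma_50 closed below ma_200 yesterday and above
# it today, golden_cross_signal = 1.0 and death_cross_signal = 0.0; both
# are 0.0 on days without a crossover, so these signals fire only on the
# crossing bar itself.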
@classmethod
def _analyze_sector_rotation(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze sector rotation patterns"""
try:
# Key sectors to track
sectors = ['defensive', 'cyclical', 'technology', 'financial', 'energy']
sector_returns = {}
sector_momentum = {}
for sector in sectors:
sector_data = cls._get_sector_data(market_data, sector)
if sector_data is not None:
returns = sector_data['close'].pct_change()
sector_returns[sector] = float(returns.mean())
sector_momentum[sector] = float(returns.rolling(window=63).mean().iloc[-1])
# Detect the sector rotation pattern
rotation_pattern = cls._detect_sector_rotation_pattern(sector_returns, sector_momentum)
return {
'sector_returns': sector_returns,
'sector_momentum': sector_momentum,
'rotation_pattern': rotation_pattern,
'rotation_strength': float(cls._calculate_rotation_strength(sector_returns)),
'leading_sectors': cls._identify_leading_sectors(sector_momentum)
}
except Exception as e:
print(f"Error analyzing sector rotation: {e}")
return {'rotation_pattern': 'unclear'}
@staticmethod
def _detect_sector_rotation_pattern(sector_returns: Dict[str, float],
sector_momentum: Dict[str, float]) -> str:
"""Detect the current sector rotation pattern"""
try:
# Rank sectors by performance
ranked_sectors = sorted(sector_returns.items(), key=lambda x: x[1], reverse=True)
# Map the top-ranked sector to a cycle phase
if ranked_sectors[0][0] in ['technology', 'financial']:
return 'early_cycle'
elif ranked_sectors[0][0] in ['energy', 'materials']:
return 'mid_cycle'
elif ranked_sectors[0][0] in ['defensive', 'utilities']:
return 'late_cycle'
else:
return 'transition'
except Exception as e:
print(f"Error detecting sector rotation pattern: {e}")
return 'unclear'
@staticmethod
def _calculate_rotation_strength(sector_returns: Dict[str, float]) -> float:
"""Calculate the strength of sector rotation"""
try:
# Dispersion of sector performance relative to the average move
returns = list(sector_returns.values())
return float(np.std(returns) / (abs(np.mean(returns)) + 1e-6))  # guard against zero mean
except Exception as e:
print(f"Error calculating rotation strength: {e}")
return 0.0
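# Worked example (illustrative): for sector returns
# [0.02, 0.01, -0.01, 0.00, 0.03], mean = 0.01 and np.std ≈ 0.0141, so
# rotation strength ≈ 1.41; dispersion well above the average move
# indicates money actively rotating between sectors.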
@classmethod
def _analyze_breadth_transitions(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze market breadth transition signals"""
try:
# Compute the market breadth indicators
advance_decline = cls._calculate_advance_decline_ratio(market_data)
new_highs_lows = cls._calculate_new_highs_lows_ratio(market_data)
participation = cls._calculate_market_participation(market_data)
# Look for divergences between price and breadth
breadth_divergence = cls._detect_breadth_divergence(
market_data['close'],
advance_decline,
participation
)
return {
'advance_decline_ratio': float(advance_decline.iloc[-1]),
'new_highs_lows_ratio': float(new_highs_lows.iloc[-1]),
'participation_rate': float(participation.iloc[-1]),
'breadth_divergence': breadth_divergence,
'breadth_momentum': float(cls._calculate_breadth_momentum(advance_decline)),
'breadth_trend': str(cls._determine_breadth_trend(
advance_decline,
new_highs_lows,
participation
))
}
except Exception as e:
print(f"Error analyzing breadth transitions: {e}")
return {'participation_rate': 0.5}
@staticmethod
def _calculate_advance_decline_ratio(market_data: pd.DataFrame, window: int = 21) -> pd.Series:
"""Calculate advance-decline ratio with moving average"""
try:
# Proxy the advances/declines ratio from the single price series available
returns = market_data['close'].pct_change()
advances = (returns > 0).astype(int).rolling(window=window).sum()
declines = (returns < 0).astype(int).rolling(window=window).sum()
return advances / (declines + 1e-6)  # avoid division by zero
except Exception as e:
print(f"Error calculating A/D ratio: {e}")
return pd.Series(1.0, index=market_data.index)
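# Example (illustrative): with 14 up days and 7 down days in the 21-day
# window, the ratio is 14 / (7 + 1e-6) ≈ 2.0; values above 1 indicate
# advancing breadth, values below 1 deteriorating breadth.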
@classmethod
def _calculate_confirmation_metrics(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Calculate confirmation metrics for cycle transitions"""
try:
# Analyze each type of confirmation signal
price_confirmation = cls._analyze_price_confirmation(market_data)
volume_confirmation = cls._analyze_volume_confirmation(market_data)
volatility_confirmation = cls._analyze_volatility_confirmation(market_data)
breadth_confirmation = cls._analyze_breadth_confirmation(market_data)
# Average into a global confirmation score
confirmation_score = np.mean([
price_confirmation['score'],
volume_confirmation['score'],
volatility_confirmation['score'],
breadth_confirmation['score']
])
return {
'confirmation_score': float(confirmation_score),
'price_confirmation': price_confirmation,
'volume_confirmation': volume_confirmation,
'volatility_confirmation': volatility_confirmation,
'breadth_confirmation': breadth_confirmation,
'trend_quality': cls._calculate_trend_quality(market_data),
'momentum_quality': cls._calculate_momentum_quality(market_data)
}
except Exception as e:
print(f"Error calculating confirmation metrics: {e}")
return {'confirmation_score': 0.5}
@classmethod
def _analyze_price_confirmation(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze price-based confirmation signals"""
try:
price = market_data['close']
returns = price.pct_change()
# Compute the price-based confirmations
trend_confirmation = cls._calculate_trend_confirmation(price)
momentum_confirmation = cls._calculate_momentum_confirmation(returns)
pattern_confirmation = cls._analyze_chart_patterns(price)
return {
'score': float(np.mean([
trend_confirmation['score'],
momentum_confirmation['score'],
pattern_confirmation['score']
])),
'trend_confirmation': trend_confirmation,
'momentum_confirmation': momentum_confirmation,
'pattern_confirmation': pattern_confirmation
}
except Exception as e:
print(f"Error analyzing price confirmation: {e}")
return {'score': 0.5}
@classmethod
def _analyze_volume_confirmation(cls, market_data: pd.DataFrame) -> Dict[str, float]:
"""Analyze volume-based confirmation signals"""
try:
volume = market_data['volume']
price = market_data['close']
# Compute the volume trend and price/volume co-movement
volume_trend = volume.rolling(window=21).mean().pct_change()
price_volume_correlation = price.rolling(window=21).corr(volume)
# Volume confirms when it is rising and positively correlated with price
volume_confirmation = float(
(volume_trend.iloc[-1] > 0) and
(price_volume_correlation.iloc[-1] > 0.3)
)
return {
'score': float(volume_confirmation),
'volume_trend': float(volume_trend.iloc[-1]),
'price_volume_corr': float(price_volume_correlation.iloc[-1]),
'volume_quality': cls._calculate_volume_quality(volume)
}
except Exception as e:
print(f"Error analyzing volume confirmation: {e}")
return {'score': 0.5}
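# Example (illustrative): a rising 21-day average volume
# (volume_trend > 0) combined with a price/volume correlation above 0.3
# yields score = 1.0; any other combination yields 0.0, making this a
# binary confirmation gate.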
@classmethod
def _analyze_leading_rotation_indicators(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze leading indicators for market rotation and regime changes"""
try:
# Analyze each rotation component
sector_signals = cls._analyze_sector_leadership_changes(market_data)
style_rotation = cls._analyze_style_factor_rotation(market_data)
macro_signals = cls._analyze_macro_rotation_signals(market_data)
correlation_signals = cls._analyze_correlation_regime_changes(market_data)
volatility_structure = cls._analyze_volatility_term_structure_changes(market_data)
credit_signals = cls._analyze_credit_market_signals(market_data)
# Estimate the regime transition probabilities
regime_transition_probs = cls._calculate_regime_transition_probabilities(
sector_signals,
style_rotation,
macro_signals,
correlation_signals,
volatility_structure,
credit_signals
)
# Identify sectors that are rotation candidates
rotation_candidates = cls._identify_sector_rotation_candidates(
sector_signals['momentum'],
sector_signals['relative_strength']
)
# Assess the quality of the rotation signals
signal_quality = cls._assess_rotation_signal_quality(
sector_signals,
style_rotation,
macro_signals
)
return {
'regime_transition_probabilities': regime_transition_probs,
'sector_signals': sector_signals,
'style_rotation': style_rotation,
'macro_signals': macro_signals,
'correlation_signals': correlation_signals,
'volatility_structure': volatility_structure,
'credit_signals': credit_signals,
'rotation_candidates': rotation_candidates,
'signal_quality': signal_quality,
'leading_indicators': {
'early_cycle': cls._calculate_early_cycle_probability(macro_signals, credit_signals),
'mid_cycle': cls._calculate_mid_cycle_probability(sector_signals, style_rotation),
'late_cycle': cls._calculate_late_cycle_probability(volatility_structure, correlation_signals)
},
'transition_confirmation': cls._analyze_transition_confirmation_signals(market_data)
}
except Exception as e:
print(f"Error analyzing leading rotation indicators: {e}")
return {'regime_transition_probabilities': {'current_to_new': 0.0}}
@classmethod
def _analyze_sector_leadership_changes(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze changes in sector leadership and rotation patterns"""
try:
sectors = ['technology', 'financial', 'healthcare', 'consumer', 'industrial',
'energy', 'materials', 'utilities', 'real_estate', 'communication']
sector_metrics = {}
for sector in sectors:
# Compute per-sector metrics
sector_data = market_data[market_data['sector'] == sector]
returns = sector_data['close'].pct_change()
# Momentum and relative strength versus the broad market
momentum = returns.rolling(window=63).mean()
rs = sector_data['close'] / market_data['close'].mean()
relative_strength = rs.rolling(window=21).mean()
# Volatility and correlation with the broad market
volatility = returns.rolling(window=21).std()
correlation = returns.rolling(window=63).corr(market_data['close'].pct_change())
sector_metrics[sector] = {
'momentum': float(momentum.iloc[-1]),
'relative_strength': float(relative_strength.iloc[-1]),
'volatility': float(volatility.iloc[-1]),
'correlation': float(correlation.iloc[-1]),
'leadership_score': float(cls._calculate_sector_leadership_score(
momentum.iloc[-1],
relative_strength.iloc[-1],
volatility.iloc[-1],
correlation.iloc[-1]
))
}
# Detect changes in sector leadership
leadership_changes = cls._detect_leadership_changes(sector_metrics)
# Compute the sector rotation matrix
rotation_matrix = cls._calculate_sector_rotation_matrix(sector_metrics)
return {
'sector_metrics': sector_metrics,
'leadership_changes': leadership_changes,
'rotation_matrix': rotation_matrix,
'current_leaders': cls._identify_current_sector_leaders(sector_metrics),
'emerging_leaders': cls._identify_emerging_sector_leaders(sector_metrics),
'rotation_strength': float(cls._calculate_rotation_strength({s: m['momentum'] for s, m in sector_metrics.items()})),  # pass one scalar per sector
'rotation_momentum': cls._calculate_rotation_momentum(sector_metrics),
'sector_correlations': cls._calculate_sector_correlations(market_data),
'regime_characteristics': cls._analyze_sector_regime_characteristics(sector_metrics)
}
except Exception as e:
print(f"Error analyzing sector leadership changes: {e}")
return {'sector_metrics': {}}
@staticmethod
def _calculate_sector_leadership_score(momentum: float, relative_strength: float,
volatility: float, correlation: float) -> float:
"""Calculate composite sector leadership score"""
try:
# Normalize the components
norm_momentum = np.clip(momentum / 0.02, -1, 1)  # 2% used as the reference scale
norm_rs = np.clip(relative_strength - 1, -1, 1)
norm_vol = 1 - np.clip(volatility / 0.2, 0, 1)  # 20% used as the reference scale
norm_corr = 1 - np.abs(correlation)  # favour low correlation to the broad market
# Combine into the weighted composite score
weights = {
'momentum': 0.35,
'relative_strength': 0.35,
'volatility': 0.15,
'correlation': 0.15
}
leadership_score = (
weights['momentum'] * norm_momentum +
weights['relative_strength'] * norm_rs +
weights['volatility'] * norm_vol +
weights['correlation'] * norm_corr
)
return float(np.clip(leadership_score, -1, 1))
except Exception as e:
print(f"Error calculating sector leadership score: {e}")
return 0.0
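# Worked example (illustrative inputs): momentum = 0.01 -> 0.50,
# relative_strength = 1.10 -> 0.10, volatility = 0.10 -> 0.50,
# correlation = 0.60 -> 0.40; score = 0.35*0.50 + 0.35*0.10 + 0.15*0.50
# + 0.15*0.40 = 0.345, a moderately positive leadership reading.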
@classmethod
def _detect_leadership_changes(cls, sector_metrics: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
"""Detect changes in sector leadership patterns"""
try:
# Rank sectors by leadership score
ranked_sectors = sorted(
sector_metrics.items(),
key=lambda x: x[1]['leadership_score'],
reverse=True
)
# Identify current, previous and emerging leaders
current_leaders = ranked_sectors[:3]
previous_leaders = ranked_sectors[3:6]
emerging_leaders = [
sector for sector, metrics in ranked_sectors
if metrics['momentum'] > metrics['relative_strength'] * 1.5
]
# Quantify the change (note: within a single ranking snapshot the top-3
# and ranks 4-6 are disjoint, so turnover saturates at 1.0 unless
# previous_leaders is sourced from an earlier period)
leadership_turnover = len({s for s, _ in current_leaders} - {s for s, _ in previous_leaders}) / 3
emergence_strength = np.mean([
sector_metrics[sector]['momentum']
for sector in emerging_leaders
]) if emerging_leaders else 0.0
return {
'current_leaders': [sector for sector, _ in current_leaders],
'previous_leaders': [sector for sector, _ in previous_leaders],
'emerging_leaders': emerging_leaders,
'leadership_turnover': float(leadership_turnover),
'emergence_strength': float(emergence_strength),
'leadership_stability': float(1 - leadership_turnover),
'transition_characteristics': {
'type': cls._determine_transition_type(current_leaders, emerging_leaders),
'strength': float(emergence_strength / (leadership_turnover + 1e-6)),
'confirmation': float(len(emerging_leaders) / len(sector_metrics))
}
}
except Exception as e:
print(f"Error detecting leadership changes: {e}")
return {'current_leaders': [], 'emerging_leaders': []}
@classmethod
def _analyze_style_factor_rotation(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze style factor rotation and regime transitions"""
try:
# Compute each style factor
style_factors = {
'value': cls._calculate_value_factor(market_data),
'growth': cls._calculate_growth_factor(market_data),
'momentum': cls._calculate_momentum_factor(market_data),
'quality': cls._calculate_quality_factor(market_data),
'size': cls._calculate_size_factor(market_data),
'volatility': cls._calculate_volatility_factor(market_data),
'yield': cls._calculate_yield_factor(market_data)
}
# Compute the pairwise factor rotations
factor_rotations = cls._calculate_factor_rotations(style_factors)
# Analyze the factor regimes
factor_regimes = cls._analyze_factor_regimes(style_factors)
# Identify dominant and emerging factors
dominant_factors = cls._identify_dominant_factors(style_factors)
emerging_factors = cls._identify_emerging_factors(style_factors, factor_rotations)
# Compute cross-factor correlations
factor_correlations = cls._calculate_factor_correlations(style_factors)
# Assess the quality of the rotation signals
rotation_quality = cls._analyze_factor_rotation_quality(
style_factors,
factor_rotations,
factor_correlations
)
return {
'style_factors': style_factors,
'factor_rotations': factor_rotations,
'factor_regimes': factor_regimes,
'dominant_factors': dominant_factors,
'emerging_factors': emerging_factors,
'factor_correlations': factor_correlations,
'rotation_quality': rotation_quality,
'regime_characteristics': {
'current_regime': cls._determine_factor_regime(factor_regimes),
'regime_strength': float(cls._calculate_regime_strength(factor_regimes)),
'transition_probability': float(cls._calculate_regime_transition_probability(factor_regimes))
},
'style_forecasts': cls._forecast_style_factor_returns(style_factors, factor_regimes)
}
except Exception as e:
print(f"Error analyzing style factor rotation: {e}")
return {'style_factors': {}}
@classmethod
def _calculate_value_factor(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Calculate value factor returns and characteristics"""
try:
# Compute the valuation ratios
pe_ratio = market_data['price'] / market_data['earnings']
pb_ratio = market_data['price'] / market_data['book_value']
# Composite value score (inverse of relative expensiveness)
value_score = 1 / (
0.5 * (pe_ratio / pe_ratio.mean()) +
0.5 * (pb_ratio / pb_ratio.mean())
)
# Factor returns from the smoothed score
value_returns = value_score.rolling(window=21).mean().pct_change()
return {
'score': float(value_score.iloc[-1]),
'momentum': float(value_returns.mean()),
'volatility': float(value_returns.std()),
'z_score': float((value_score.iloc[-1] - value_score.mean()) / value_score.std()),
'percentile': float(stats.percentileofscore(value_score, value_score.iloc[-1]) / 100),
'trend': str(cls._determine_factor_trend(value_returns))
}
except Exception as e:
print(f"Error calculating value factor: {e}")
return {'score': 0.0}
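# Worked example (illustrative numbers): with P/E = 15 vs a mean of 20
# and P/B = 2.0 vs a mean of 2.5, value_score =
# 1 / (0.5*(15/20) + 0.5*(2.0/2.5)) = 1 / 0.775 ≈ 1.29; scores above 1
# indicate the market trades cheaper than its own average.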
@staticmethod
def _calculate_factor_rotations(style_factors: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
"""Calculate factor rotations and transitions"""
try:
rotations = {}
for factor1 in style_factors:
for factor2 in style_factors:
if factor1 < factor2:  # visit each unordered pair once (alphabetical order)
# Relative rotation between the two factors (momentum spread)
rotation = (style_factors[factor1]['momentum'] -
style_factors[factor2]['momentum'])
rotations[f'{factor1}_vs_{factor2}'] = float(rotation)
# Aggregate rotation metrics
return {
'factor_rotations': rotations,
'rotation_intensity': float(np.mean([abs(r) for r in rotations.values()])),
'max_rotation': float(max(abs(min(rotations.values())),
abs(max(rotations.values())))),
'rotation_dispersion': float(np.std(list(rotations.values()))),
'dominant_rotation': max(rotations.items(), key=lambda x: abs(x[1]))[0]
}
except Exception as e:
print(f"Error calculating factor rotations: {e}")
return {'rotation_intensity': 0.0}
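# Worked example (illustrative): with momentum('growth') = -0.02 and
# momentum('value') = 0.01, the alphabetical pairing gives
# rotations['growth_vs_value'] = -0.02 - 0.01 = -0.03; the negative sign
# means value is currently leading growth.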
@classmethod
def _analyze_factor_regimes(cls, style_factors: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
"""Analyze factor regimes using machine learning"""
try:
# Prepare the data matrix for clustering
factor_data = np.array([
[factor['score'] for factor in style_factors.values()],
[factor['momentum'] for factor in style_factors.values()],
[factor['volatility'] for factor in style_factors.values()]
]).T
# Use a GMM to detect the regimes
gmm = GaussianMixture(n_components=3, random_state=42)
regimes = gmm.fit_predict(factor_data)
regime_probs = gmm.predict_proba(factor_data)
# Characterize the detected regimes
regime_characteristics = cls._characterize_factor_regimes(
regimes,
regime_probs,
style_factors
)
return {
'current_regime': int(regimes[-1]),
'regime_probabilities': dict(enumerate(regime_probs[-1])),
'regime_characteristics': regime_characteristics,
'regime_stability': float(cls._calculate_regime_stability(regimes)),
'transition_matrix': cls._calculate_regime_transition_matrix(regimes),
'regime_duration': cls._calculate_regime_duration(regimes)
}
except Exception as e:
print(f"Error analyzing factor regimes: {e}")
return {'current_regime': 0}
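# Shape note: with the seven style factors above, factor_data has shape
# (7, 3), one row per factor with columns (score, momentum, volatility);
# the 3-component GMM therefore clusters factors, and regimes[-1] is the
# cluster label of the last factor in the dict ('yield').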
@classmethod
def _analyze_macro_rotation_signals(cls, market_data: pd.DataFrame) -> Dict[str, Any]:
"""Analyze macroeconomic signals for market rotation"""
try:
# Compute the key macro indicators
growth_indicators = cls._calculate_growth_indicators(market_data)
inflation_indicators = cls._calculate_inflation_indicators(market_data)
monetary_indicators = cls._calculate_monetary_indicators(market_data)
credit_conditions = cls._analyze_credit_conditions(market_data)
# Determine the current macro regime
macro_regime = cls._determine_macro_regime(
growth_indicators,
inflation_indicators,
monetary_indicators
)
# Estimate the transition probabilities
transition_probs = cls._calculate_macro_transition_probabilities(
macro_regime,
growth_indicators,
inflation_indicators,
monetary_indicators,
credit_conditions
)
return {
'current_regime': str(macro_regime),
'transition_probabilities': transition_probs,
'growth_indicators': growth_indicators,
'inflation_indicators': inflation_indicators,
'monetary_indicators': monetary_indicators,
'credit_conditions': credit_conditions,
'regime_implications': cls._calculate_regime_implications(macro_regime),
'asset_allocation_signals': cls._generate_allocation_signals(
macro_regime,
transition_probs
)
}
except Exception as e:
print(f"Error analyzing macro rotation signals: {e}")
return {'current_regime': 'neutral'}
@staticmethod
def _determine_macro_regime(growth: Dict[str, float],
inflation: Dict[str, float],
monetary: Dict[str, float]) -> str:
"""Determine current macroeconomic regime"""
# Map the indicator levels to a regime label
if growth['composite_index'] > 0.7 and inflation['level'] < 0.3:
return 'goldilocks'
elif growth['composite_index'] > 0.5 and inflation['level'] > 0.7:
return 'overheating'
elif growth['composite_index'] < 0.3 and inflation['level'] > 0.5:
return 'stagflation'
elif growth['composite_index'] < 0.3 and inflation['level'] < 0.3:
return 'recession'
else:
return 'neutral'
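# Example (illustrative): growth['composite_index'] = 0.8 with
# inflation['level'] = 0.2 maps to 'goldilocks'; the same growth with
# inflation at 0.8 maps to 'overheating'; anything falling through the
# thresholds returns 'neutral'.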
@classmethod
def _generate_allocation_signals(cls,
regime: str,
transition_probs: Dict[str, float]) -> Dict[str, float]:
"""Generate asset allocation signals based on macro regime"""
# Reference allocations per macro regime
regime_allocations = {
'goldilocks': {'equities': 0.6, 'bonds': 0.3, 'alternatives': 0.1},
'overheating': {'equities': 0.4, 'bonds': 0.2, 'alternatives': 0.4},
'stagflation': {'equities': 0.2, 'bonds': 0.3, 'alternatives': 0.5},
'recession': {'equities': 0.3, 'bonds': 0.6, 'alternatives': 0.1},
'neutral': {'equities': 0.4, 'bonds': 0.4, 'alternatives': 0.2}
}
# Blend the regime allocations by transition probability; normalize by the
# total probability mass, and fall back to the current regime's allocation
# when no transition probabilities are supplied
total_prob = sum(transition_probs.values())
if total_prob <= 0:
return dict(regime_allocations.get(regime, regime_allocations['neutral']))
final_allocation = {}
for asset_class in ['equities', 'bonds', 'alternatives']:
weighted_alloc = sum(
prob * regime_allocations.get(target_regime, regime_allocations['neutral'])[asset_class]
for target_regime, prob in transition_probs.items()
)
final_allocation[asset_class] = weighted_alloc / total_prob
return final_allocation
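# Worked example (illustrative): transition_probs =
# {'goldilocks': 0.7, 'recession': 0.3} gives equities
# 0.7*0.6 + 0.3*0.3 = 0.51, bonds 0.7*0.3 + 0.3*0.6 = 0.39 and
# alternatives 0.10, a fully invested blend summing to 1.0.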
@staticmethod
def _calculate_regime_implications(regime: str) -> Dict[str, List[str]]:
"""Calculate investment implications of current regime"""
implications = {
'goldilocks': {
'favorable': ['growth stocks', 'technology', 'consumer discretionary'],
'unfavorable': ['utilities', 'commodities', 'defensive sectors']
},
'overheating': {
'favorable': ['commodities', 'materials', 'energy'],
'unfavorable': ['bonds', 'growth stocks', 'utilities']
},
'stagflation': {
'favorable': ['commodities', 'defensive stocks', 'real assets'],
'unfavorable': ['growth stocks', 'consumer discretionary', 'bonds']
},
'recession': {
'favorable': ['bonds', 'defensive stocks', 'quality factors'],
'unfavorable': ['cyclicals', 'small caps', 'high yield']
},
'neutral': {
'favorable': ['balanced exposure', 'quality factors'],
'unfavorable': ['extreme positioning', 'leverage']
}
}
return implications.get(regime, implications['neutral'])
def get_default_market_conditions(self) -> 'MarketConditions':
"""Return default market conditions"""
return MarketConditions(
current_regime=MarketRegime.LOW_VOLATILITY,
volatility_level=0.15,
market_metrics=MarketMetrics(
volatility=0.15,
trend=0.0,
momentum=0.0,
sentiment=0.5,
liquidity=0.5,
correlation=0.5,
tail_risk=0.5,
regime_probability={'normal': 1.0}
),
macro_indicators=MacroIndicators(
gdp_growth=0.02,
inflation_rate=0.02,
interest_rate=0.02,
unemployment_rate=0.05,
consumer_confidence=0.5,
industrial_production=0.0,
retail_sales=0.0,
housing_market=0.5
),
liquidity_conditions=LiquidityConditions(
bid_ask_spread=0.001,
market_depth=1.0,
trading_volume=1000000,
turnover_ratio=0.2,
amihud_ratio=0.001,
volume_weighted_spread=0.001,
market_resilience=0.5
),
timestamp=datetime.now()
)