import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple

import numpy as np
import pandas as pd
import statsmodels.api as sm
from arch import arch_model
from sklearn.ensemble import IsolationForest
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler

from .market_states import MarketRegime, MarketConditions
from .types import MarketData, MLAnalyzerProtocol
from .technical_indicators import TechnicalIndicators
from .risk_metrics import EnhancedRiskMetrics, MarketMetrics

def setup_logger(name: str) -> logging.Logger:
    """Set up a logger with consistent formatting."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

class MarketAnalyzer(ABC):
    """Abstract base class defining the market analyzer interface."""

    @abstractmethod
    async def analyze_market_conditions(self,
                                        market_data: MarketData,
                                        additional_data: Optional[Dict[str, Any]] = None) -> MarketConditions:
        """Analyze current market conditions."""

    @abstractmethod
    async def detect_anomalies(self,
                               market_data: MarketData,
                               threshold: float = 0.95) -> List[Dict[str, Any]]:
        """Detect market anomalies."""

    @abstractmethod
    async def calculate_all_metrics(self, market_data: MarketData) -> MarketMetrics:
        """Calculate comprehensive market metrics."""

    @abstractmethod
    async def analyze_regime_transitions(self,
                                         market_data: MarketData,
                                         window: int = 252) -> Dict[str, float]:
        """Analyze market regime transitions."""

    @abstractmethod
    async def calculate_technical_indicators(self,
                                             market_data: MarketData) -> TechnicalIndicators:
        """Calculate technical indicators."""

@dataclass
class RiskAssessment:
    """Risk assessment metrics and analysis."""
    volatility: float               # Annualized historical volatility
    var_95: float                   # Value at Risk at the 95% level
    cvar_95: float                  # Conditional Value at Risk at the 95% level
    tail_risk: float                # Tail-risk measure
    correlation_risk: float         # Correlation-risk measure
    liquidity_risk: float           # Liquidity-risk measure
    regime_risk: float              # Risk tied to the current market regime
    systemic_risk: float            # Systemic-risk measure
    stress_test_results: Dict[str, float]  # Stress-test results

    def __post_init__(self):
        """Validate fields after initialization."""
        if not 0 <= self.volatility <= 1:
            raise ValueError("Volatility must be between 0 and 1")
        if not -1 <= self.var_95 <= 0:
            raise ValueError("VaR must be negative and >= -1")
        if not -1 <= self.cvar_95 <= 0:
            raise ValueError("CVaR must be negative and >= -1")

    def get_risk_score(self) -> float:
        """Calculate overall risk score between 0 and 1."""
        weights = {
            'volatility': 0.3,
            'var': 0.2,
            'cvar': 0.15,
            'tail': 0.1,
            'correlation': 0.05,
            'liquidity': 0.1,
            'regime': 0.05,
            'systemic': 0.05
        }
        score = (
            weights['volatility'] * self.volatility +
            weights['var'] * abs(self.var_95) +
            weights['cvar'] * abs(self.cvar_95) +
            weights['tail'] * self.tail_risk +
            weights['correlation'] * self.correlation_risk +
            weights['liquidity'] * self.liquidity_risk +
            weights['regime'] * self.regime_risk +
            weights['systemic'] * self.systemic_risk
        )
        return min(max(score, 0), 1)
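
    # Worked example (illustrative numbers, not a recommendation): with
    # volatility=0.25, var_95=-0.08, cvar_95=-0.12 and the five remaining
    # components all at 0.10, the score is
    #   0.3*0.25 + 0.2*0.08 + 0.15*0.12 + (0.1+0.05+0.1+0.05+0.05)*0.10 = 0.144,
    # which lands comfortably inside the [0, 1] clamp.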

    def get_risk_breakdown(self) -> Dict[str, float]:
        """Get a detailed breakdown of risk components."""
        return {
            'volatility_risk': self.volatility,
            'market_risk': abs(self.var_95),
            'tail_risk': self.tail_risk,
            'correlation_risk': self.correlation_risk,
            'liquidity_risk': self.liquidity_risk,
            'regime_risk': self.regime_risk,
            'systemic_risk': self.systemic_risk
        }

    def get_stress_test_results(self) -> Dict[str, float]:
        """Get results of stress-test scenarios."""
        return self.stress_test_results

    def get_risk_alerts(self) -> List[Dict[str, Any]]:
        """Generate risk alerts based on thresholds."""
        alerts = []
        # Check risk thresholds
        if self.volatility > 0.3:
            alerts.append({
                'type': 'volatility',
                'level': 'high',
                'message': 'Volatility exceeds normal levels'
            })
        if abs(self.var_95) > 0.2:
            alerts.append({
                'type': 'var',
                'level': 'critical',
                'message': 'Value at Risk above critical threshold'
            })
        if self.liquidity_risk > 0.7:
            alerts.append({
                'type': 'liquidity',
                'level': 'warning',
                'message': 'High liquidity risk detected'
            })
        return alerts

    def to_dict(self) -> Dict[str, float]:
        """Convert risk assessment to dictionary format."""
        return {
            'volatility': self.volatility,
            'var_95': self.var_95,
            'cvar_95': self.cvar_95,
            'tail_risk': self.tail_risk,
            'correlation_risk': self.correlation_risk,
            'liquidity_risk': self.liquidity_risk,
            'regime_risk': self.regime_risk,
            'systemic_risk': self.systemic_risk,
            'risk_score': self.get_risk_score()
        }
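
# Usage sketch for the dataclass above (values are illustrative only):
#
#   assessment = RiskAssessment(
#       volatility=0.25, var_95=-0.08, cvar_95=-0.12, tail_risk=0.2,
#       correlation_risk=0.3, liquidity_risk=0.4, regime_risk=0.1,
#       systemic_risk=0.2, stress_test_results={'2008_replay': -0.35},
#   )
#   assessment.get_risk_score()   # -> ~0.199
#   assessment.get_risk_alerts()  # -> [] (no threshold breached here)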

@dataclass
class MarketAnalysisConfig:
    """Configuration for market analysis."""
    volatility_window: int = 63            # ~3 months
    correlation_window: int = 252          # 1 year
    regime_detection_lookback: int = 504   # 2 years
    min_data_points: int = 20
    confidence_level: float = 0.95
    anomaly_threshold: float = 0.95
    use_robust_estimators: bool = True
    ml_parameters: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        if self.ml_parameters is None:
            self.ml_parameters = {
                'n_estimators': 100,
                'random_state': 42,
                'contamination': 'auto'
            }

    def validate(self) -> bool:
        """Validate configuration parameters."""
        return (
            self.volatility_window > 0
            and self.correlation_window > 0
            and self.regime_detection_lookback > 0
            and 0 < self.confidence_level < 1
        )
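
# Example: a shorter-horizon configuration (parameter choices are illustrative):
#
#   config = MarketAnalysisConfig(volatility_window=21, correlation_window=63)
#   config.validate()     # -> True
#   config.ml_parameters  # -> {'n_estimators': 100, 'random_state': 42,
#                         #     'contamination': 'auto'}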

@dataclass
class MarketAnalysisResult:
    """Container for market analysis results."""
    conditions: MarketConditions
    metrics: MarketMetrics
    technical_indicators: TechnicalIndicators
    anomalies: List[Dict[str, Any]]
    timestamp: datetime
    analysis_duration: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert results to dictionary format."""
        return {
            'conditions': {
                'regime': self.conditions.current_regime.value,
                'volatility': self.conditions.volatility_level,
                'sentiment': self.conditions.market_sentiment,
                'timestamp': self.timestamp.isoformat()
            },
            'metrics': {
                'volatility': self.metrics.volatility,
                'tail_risk': self.metrics.tail_risk,
                'liquidity': self.metrics.liquidity_score
            },
            'technical': {
                'rsi': self.technical_indicators.rsi,
                'macd': self.technical_indicators.macd,
                'moving_averages': self.technical_indicators.moving_averages
            },
            'anomalies': self.anomalies
        }

logger = logging.getLogger(__name__)

def ensure_psd(matrix: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
    """Ensure a matrix is positive semi-definite by clipping its eigenvalues."""
    try:
        eigenvals, eigenvecs = np.linalg.eigh(matrix)
        eigenvals = np.maximum(eigenvals, epsilon)
        return eigenvecs @ np.diag(eigenvals) @ eigenvecs.T
    except Exception as e:
        logger.error(f"Error ensuring PSD: {e}")
        return matrix
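
# Example: a symmetric matrix with a negative eigenvalue gets its spectrum
# clipped back to the epsilon floor, while already-PSD input passes through
# essentially unchanged:
#
#   m = np.array([[1.0, 2.0], [2.0, 1.0]])   # eigenvalues 3 and -1
#   np.linalg.eigvalsh(ensure_psd(m)).min()  # -> ~1e-6, no longer negative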

def calculate_dynamic_correlation(returns: pd.DataFrame,
                                  window: int = 63,
                                  lambda_param: float = 0.94) -> np.ndarray:
    """Calculate dynamic correlation with exponential weighting."""
    try:
        weights = np.array([lambda_param ** (window - i - 1) for i in range(window)])
        weights = weights / np.sum(weights)
        returns_window = returns.iloc[-window:]
        weighted_returns = returns_window * np.sqrt(weights).reshape(-1, 1)
        corr_matrix = np.corrcoef(weighted_returns.T)
        return ensure_psd(corr_matrix)
    except Exception as e:
        logger.error(f"Error calculating dynamic correlation: {e}")
        return np.eye(returns.shape[1])
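
# Usage sketch (synthetic returns; illustrative only):
#
#   rng = np.random.default_rng(0)
#   rets = pd.DataFrame(rng.normal(0.0, 0.01, size=(252, 4)),
#                       columns=['A', 'B', 'C', 'D'])
#   corr = calculate_dynamic_correlation(rets, window=63, lambda_param=0.94)
#   corr.shape  # -> (4, 4), with recent observations weighted most heavily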

def fit_gpd(excesses: np.ndarray) -> Tuple[float, float]:
    """Fit a Generalized Pareto Distribution to threshold excesses."""
    try:
        from scipy.stats import genpareto
        # Excesses are measured from the threshold, so fix the location at 0
        shape, _, scale = genpareto.fit(excesses, floc=0)
        return shape, scale
    except Exception:
        # Conservative fallback: light tail with empirical scale
        return 0.1, float(np.std(excesses))

def calculate_gpd_var(params: Tuple[float, float],
                      threshold: float,
                      n_samples: int,
                      confidence: float) -> float:
    """Calculate Value at Risk from fitted GPD parameters (peaks over threshold)."""
    shape, scale = params
    exceeding_prob = confidence
    if shape != 0:
        return threshold + (scale / shape) * ((n_samples * exceeding_prob) ** (-shape) - 1)
    return threshold - scale * np.log(n_samples * exceeding_prob)
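
# Worked peaks-over-threshold sketch (synthetic losses; illustrative only):
# take the excesses above a high empirical threshold, fit the GPD, then turn
# the fitted parameters into a tail VaR estimate.
#
#   losses = pd.Series(np.abs(np.random.default_rng(1).standard_t(4, 2000)) * 0.01)
#   u = float(losses.quantile(0.95))
#   excesses = (losses[losses > u] - u).to_numpy()
#   shape, scale = fit_gpd(excesses)
#   var_99 = calculate_gpd_var((shape, scale), threshold=u,
#                              n_samples=len(excesses), confidence=0.99)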

class MarketAnalyzerImpl(MarketAnalyzer):
    def __init__(self,
                 ml_analyzer: Optional[MLAnalyzerProtocol] = None,
                 config: Optional[MarketAnalysisConfig] = None):
        # MLEnhancedAnalyzer is expected to be provided elsewhere in the
        # package and to satisfy MLAnalyzerProtocol
        self.ml_analyzer = ml_analyzer or MLEnhancedAnalyzer()
        self.config = config or MarketAnalysisConfig()
        self.logger = setup_logger(__name__)
        # Risk metrics handler
        self.risk_metrics = EnhancedRiskMetrics()
        # Technical indicators
        self.tech_indicators = TechnicalIndicators()
        # Models for complex analysis
        self.regime_model = GaussianMixture(
            n_components=len(MarketRegime),
            random_state=42,
            covariance_type='full'
        )
        self.scaler = StandardScaler()
        # Cache
        self._cache = {}
        self._last_update = datetime.now()

    async def analyze_market_conditions(self,
                                        market_data: MarketData,
                                        additional_data: Optional[Dict[str, Any]] = None) -> MarketConditions:
        """Analyze current market conditions"""
        try:
            if not self._validate_market_data(market_data):
                return MarketConditions.get_default()
            processed_data = self._preprocess_market_data(market_data)
            # ML analysis
            ml_output = await self.ml_analyzer.analyze_market_conditions(processed_data)
            regime = MarketRegime[ml_output['regime'].upper()]
            volatility = self.risk_metrics.calculate_volatility(processed_data)
            sentiment = ml_output['sentiment'].get('overall', 0.5)
            # Risk metrics
            risk_metrics = self.risk_metrics.calculate_all_metrics(processed_data)
            correlation = self._analyze_correlation_structure(processed_data)
            liquidity = self._analyze_liquidity_conditions(processed_data)
            # Technical analysis
            technicals = self.tech_indicators.calculate_all(processed_data)
            return MarketConditions(
                current_regime=regime,
                volatility_level=volatility,
                market_sentiment=sentiment,
                technical_indicators=technicals,
                risk_metrics=risk_metrics,
                correlation_structure=correlation,
                liquidity_conditions=liquidity,
                timestamp=datetime.now()
            )
        except Exception as e:
            self.logger.error(f"Error in market analysis: {str(e)}")
            return MarketConditions.get_default()

    def _preprocess_market_data(self, data: MarketData) -> MarketData:
        """Preprocess market data"""
        processed = data.copy()
        # Handle missing values (forward-fill, then back-fill leading gaps)
        processed = processed.ffill().bfill()
        # Calculate returns and volatility
        processed['returns'] = processed['close'].pct_change()
        processed['realized_vol'] = self.risk_metrics.calculate_rolling_volatility(processed['returns'])
        # Normalize volume against its 21-day average
        if 'volume' in processed.columns:
            processed['normalized_volume'] = processed['volume'] / processed['volume'].rolling(window=21).mean()
        return processed

    def _analyze_correlation_structure(self, data: MarketData) -> Dict[str, float]:
        """Analyze correlation structure"""
        try:
            returns = data.select_dtypes(include=[np.number]).pct_change().dropna()
            correlation = returns.corr()
            # Correlation matrices are symmetric, so use the symmetric solver
            eigenvalues = np.linalg.eigvalsh(correlation)
            return {
                'avg_correlation': float(correlation.mean().mean()),
                'max_correlation': float(correlation.max().max()),
                'effective_rank': float(np.sum(eigenvalues) ** 2 / np.sum(eigenvalues ** 2))
            }
        except Exception as e:
            self.logger.error(f"Error in correlation analysis: {str(e)}")
            return self._get_default_correlation_metrics()

    def _analyze_liquidity_conditions(self, data: MarketData) -> Dict[str, float]:
        """Analyze market liquidity"""
        try:
            if 'volume' not in data.columns:
                return {'liquidity_score': 0.5}
            volume_score = self._calculate_volume_score(data)
            spread_score = self._calculate_spread_score(data)
            resilience = self._calculate_market_resilience(data)
            return {
                'volume_score': volume_score,
                'spread_score': spread_score,
                'market_resilience': resilience,
                'liquidity_score': float(np.mean([volume_score, spread_score, resilience]))
            }
        except Exception as e:
            self.logger.error(f"Error in liquidity analysis: {str(e)}")
            return {'liquidity_score': 0.5}

    def _validate_market_data(self, data: MarketData) -> bool:
        """Validate market data structure"""
        if data is None or data.empty:
            return False
        required_cols = {'open', 'high', 'low', 'close', 'volume'}
        if not required_cols.issubset(data.columns):
            return False
        if data[list(required_cols)].isnull().any().any():
            return False
        return True

    def _calculate_volume_score(self, data: MarketData) -> float:
        """Calculate volume-based liquidity score"""
        volume = data['volume']
        avg_volume = volume.rolling(window=20).mean()
        volume_trend = avg_volume.pct_change(20)
        trend_score = 0.5 + 0.5 * np.tanh(volume_trend.iloc[-1])
        # The coefficient of variation can exceed 1, so clamp to keep the score in [0, 1]
        stability_score = max(0.0, 1 - volume.std() / volume.mean())
        return float(np.mean([trend_score, stability_score]))

    def _calculate_spread_score(self, data: MarketData) -> float:
        """Calculate spread-based liquidity score"""
        if not all(col in data.columns for col in ['bid', 'ask']):
            return 0.5
        spread = (data['ask'] - data['bid']) / data['bid']
        avg_spread = spread.rolling(window=20).mean()
        return float(1 - np.tanh(avg_spread.iloc[-1] * 10))

    def _calculate_market_resilience(self, data: MarketData) -> float:
        """Calculate resilience as one minus the mean absolute return autocorrelation"""
        returns = data['returns'].dropna()
        acf = sm.tsa.acf(returns, nlags=5)
        return float(1 - np.mean(np.abs(acf[1:])))

    async def detect_anomalies(self,
                               market_data: MarketData,
                               threshold: Optional[float] = None) -> List[Dict[str, Any]]:
        """Detect market anomalies using ML"""
        try:
            # Fall back to the configured threshold when none is given
            threshold = threshold if threshold is not None else self.config.anomaly_threshold
            features = self._prepare_anomaly_features(market_data)
            scaled_features = self.scaler.fit_transform(features)
            # ML-based anomaly detection
            anomalies = await self.ml_analyzer.detect_anomalies(
                scaled_features,
                threshold=threshold
            )
            # Enrich detection results
            for anomaly in anomalies:
                anomaly['severity'] = self.risk_metrics.calculate_severity_score(
                    anomaly['metrics']
                )
                anomaly['confidence'] = self.ml_analyzer.calculate_detection_confidence(
                    anomaly['metrics']
                )
            return anomalies
        except Exception as e:
            self.logger.error(f"Error in anomaly detection: {e}")
            return []

    def _prepare_anomaly_features(self, data: MarketData) -> np.ndarray:
        """Prepare features for anomaly detection"""
        returns = data['returns']
        volatility = self.risk_metrics.calculate_rolling_volatility(returns)
        skewness = returns.rolling(window=20).skew()
        kurtosis = returns.rolling(window=20).kurt()
        features = np.column_stack([
            returns,
            volatility,
            skewness,
            kurtosis
        ])
        # Rolling windows leave NaNs at the start; drop those rows before scaling
        return features[~np.isnan(features).any(axis=1)]
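
    # Local fallback sketch (assumption: running without the external ML
    # analyzer). The IsolationForest imported at the top can score the same
    # feature matrix directly; the parameters mirror MarketAnalysisConfig.ml_parameters:
    #
    #   features = self._prepare_anomaly_features(market_data)
    #   forest = IsolationForest(n_estimators=100, contamination='auto',
    #                            random_state=42)
    #   labels = forest.fit_predict(features)  # -1 marks an anomalous row
    #   anomalous_rows = np.where(labels == -1)[0]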

    def _update_cache(self, result: MarketAnalysisResult):
        """Update analysis cache"""
        current_time = datetime.now()
        cache_key = current_time.strftime('%Y-%m-%d_%H')
        self._cache[cache_key] = {
            'conditions': result.conditions,
            'metrics': result.metrics,
            'technical_indicators': result.technical_indicators,
            'timestamp': current_time
        }
        self._clean_old_cache(max_age_hours=24)

    def _clean_old_cache(self, max_age_hours: int):
        """Remove cache entries older than max_age_hours"""
        current_time = datetime.now()
        keys_to_remove = [
            key for key, value in self._cache.items()
            if (current_time - value['timestamp']).total_seconds() / 3600 > max_age_hours
        ]
        for key in keys_to_remove:
            del self._cache[key]

    async def calculate_all_metrics(self, market_data: MarketData) -> MarketMetrics:
        """Calculate comprehensive market metrics"""
        try:
            returns = market_data['returns'].dropna()
            # Core volatility and risk metrics
            volatility = self.risk_metrics.calculate_conditional_volatility(returns)
            skewness = float(returns.skew())
            kurtosis = float(returns.kurt())
            tail_risk = self.risk_metrics.calculate_tail_risk(returns)
            # Liquidity analysis
            liquidity_score = self._analyze_liquidity_conditions(market_data)['liquidity_score']
            # Correlation analysis
            correlation_structure = self._analyze_correlation_structure(market_data)
            # Regime probabilities (fit the mixture model lazily on first use)
            regime_features = self._extract_regime_features(market_data)
            if not hasattr(self.regime_model, 'weights_'):
                self.regime_model.fit(regime_features)
            regime_probs = self.regime_model.predict_proba(
                regime_features[-1].reshape(1, -1)
            )[0]
            regime_probabilities = {
                regime.value: float(prob)
                for regime, prob in zip(MarketRegime, regime_probs)
            }
            return MarketMetrics(
                volatility=volatility,
                skewness=skewness,
                kurtosis=kurtosis,
                tail_risk=tail_risk,
                liquidity_score=liquidity_score,
                correlation_structure=correlation_structure,
                regime_probabilities=regime_probabilities
            )
        except Exception as e:
            self.logger.error(f"Error calculating market metrics: {e}")
            return MarketMetrics.get_default()
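
    # Sketch of a GARCH(1,1) conditional-volatility estimate via the arch
    # package imported above (whether EnhancedRiskMetrics uses this internally
    # is an assumption; shown here for reference):
    #
    #   am = arch_model(returns * 100, vol='Garch', p=1, q=1)
    #   res = am.fit(disp='off')
    #   cond_vol = res.conditional_volatility / 100  # back to return units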

    def _extract_regime_features(self, market_data: MarketData) -> np.ndarray:
        """Extract features for regime detection"""
        returns = market_data['returns'].dropna()
        volatility = self.risk_metrics.calculate_rolling_volatility(returns)
        skewness = returns.rolling(window=20).skew()
        kurtosis = returns.rolling(window=20).kurt()
        momentum = returns.rolling(window=20).mean()
        features = np.column_stack([
            returns,
            volatility,
            skewness,
            kurtosis,
            momentum
        ])
        # Drop warm-up rows with NaNs before scaling
        features = features[~np.isnan(features).any(axis=1)]
        return self.scaler.fit_transform(features)
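
    # Fitting sketch: the scaled feature matrix feeds the GaussianMixture
    # regime model directly (illustrative; calculate_all_metrics fits it
    # lazily on first use):
    #
    #   features = self._extract_regime_features(market_data)
    #   self.regime_model.fit(features)
    #   probs = self.regime_model.predict_proba(features[-1].reshape(1, -1))[0]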

    async def analyze_regime_transitions(self, market_data: MarketData, window: int = 252) -> Dict[str, float]:
        """Analyze market regime transitions"""
        try:
            features = self._extract_regime_features(market_data)
            if not hasattr(self.regime_model, 'weights_'):
                self.regime_model.fit(features)
            regime_probabilities = self.regime_model.predict_proba(features)
            # Compare current probabilities with those one window earlier
            current_regime = regime_probabilities[-1]
            prev_regime = regime_probabilities[-window] if len(regime_probabilities) > window else None
            transitions = {}
            for i, regime in enumerate(MarketRegime):
                if prev_regime is not None:
                    transition_prob = current_regime[i] - prev_regime[i]
                else:
                    transition_prob = 0.0
                transitions[regime.value] = float(transition_prob)
            return transitions
        except Exception as e:
            self.logger.error(f"Error analyzing regime transitions: {e}")
            return {regime.value: 1.0 / len(MarketRegime) for regime in MarketRegime}

    async def calculate_technical_indicators(self, market_data: MarketData) -> TechnicalIndicators:
        """Calculate technical indicators"""
        try:
            price = market_data['close']
            volume = market_data.get('volume', pd.Series(1, index=market_data.index))
            # Calculate individual indicators
            rsi = self.tech_indicators.calculate_rsi(price)
            macd = self.tech_indicators.calculate_macd(price)
            bbands = self.tech_indicators.calculate_bollinger_bands(price)
            momentum = self.tech_indicators.calculate_momentum_indicators(price)
            volume_profile = self._analyze_volume_profile(price, volume)
            moving_averages = self.tech_indicators.calculate_moving_averages(price)
            return TechnicalIndicators(
                rsi=rsi,
                macd=macd,
                bollinger=bbands,
                moving_averages=moving_averages,
                volume_profile=volume_profile,
                momentum_indicators=momentum
            )
        except Exception as e:
            self.logger.error(f"Error calculating technical indicators: {e}")
            return TechnicalIndicators.get_default()

    def _analyze_volume_profile(self, price: pd.Series, volume: pd.Series) -> Dict[str, float]:
        """Advanced volume profile analysis"""
        try:
            vwap = (price * volume).sum() / volume.sum()
            volume_ma = volume.rolling(window=20).mean()
            volume_std = volume.rolling(window=20).std()
            obv = (np.sign(price.diff()) * volume).cumsum()
            volume_percentiles = np.percentile(volume, [25, 50, 75])
            return {
                'vwap': float(vwap),
                'volume_ma': float(volume_ma.iloc[-1]),
                'volume_std': float(volume_std.iloc[-1]),
                'volume_skew': float(volume.skew()),
                'obv_trend': float(obv.diff(20).iloc[-1]),
                'volume_25th': float(volume_percentiles[0]),
                'volume_median': float(volume_percentiles[1]),
                'volume_75th': float(volume_percentiles[2])
            }
        except Exception as e:
            self.logger.error(f"Error analyzing volume profile: {e}")
            return {
                'vwap': float(price.mean()),
                'volume_ma': float(volume.mean())
            }
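
    # Mini example of the OBV construction above (illustrative): closes
    # [10, 11, 10, 12] with volumes [100, 150, 120, 200] give signed volumes
    # [nan, +150, -120, +200], so the cumulative OBV series ends at
    # 150 - 120 + 200 = 230.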

    def _determine_rate_environment(self, data: MarketData) -> str:
        """Determine interest rate environment"""
        try:
            rate_changes = data['close'].pct_change(20)  # 20-day rate changes
            recent_trend = rate_changes.iloc[-20:].mean()
            if recent_trend > 0.001:  # 10 bps threshold
                return 'RISING'
            elif recent_trend < -0.001:
                return 'FALLING'
            else:
                return 'STABLE'
        except Exception as e:
            self.logger.error(f"Error determining rate environment: {e}")
            return 'STABLE'

    def _estimate_inflation_rate(self, data: MarketData) -> float:
        """Estimate inflation rate from market data"""
        try:
            # This would normally use actual inflation data;
            # here we use a simple proxy based on price trends
            price_changes = data['close'].pct_change(252)  # Annual changes
            return float(price_changes.mean())
        except Exception as e:
            self.logger.error(f"Error estimating inflation: {e}")
            return 0.02  # Default to 2%

    def _estimate_gdp_growth(self, data: MarketData) -> float:
        """Estimate GDP growth from market indicators"""
        try:
            # This would normally use actual GDP data;
            # here we use a simple proxy based on market returns
            returns = data['returns'].rolling(window=252).mean() * 252
            return float(returns.iloc[-1])
        except Exception as e:
            self.logger.error(f"Error estimating GDP growth: {e}")
            return 0.015  # Default to 1.5%

    def _calculate_leading_indicators(self, data: MarketData) -> Dict[str, float]:
        """Calculate leading economic indicators"""
        try:
            returns = data['returns']
            volume = data['volume']
            # Rolling statistics are Series, so take the latest value before casting
            indicators = {
                'trend_strength': float(returns.rolling(window=252).mean().iloc[-1] * 252),
                'volatility_regime': float(returns.rolling(window=63).std().iloc[-1] * np.sqrt(252)),
                'volume_trend': float((volume / volume.shift(20) - 1).mean()),
                'momentum': float(returns.rolling(window=20).mean().iloc[-1] * 252)
            }
            return indicators
        except Exception as e:
            self.logger.error(f"Error calculating leading indicators: {e}")
            return {
                'trend_strength': 0.0,
                'volatility_regime': 0.15,
                'volume_trend': 0.0,
                'momentum': 0.0
            }

    def _get_default_correlation_metrics(self) -> Dict[str, float]:
        """Get default correlation metrics"""
        return {
            'avg_correlation': 0.0,
            'max_correlation': 1.0,
            'min_correlation': -1.0,
            'correlation_dispersion': 0.5,
            'largest_eigenvalue': 1.0,
            'eigenvalue_dispersion': 0.0,
            'effective_rank': 1.0
        }
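
# End-to-end sketch (assumes the package's relative imports resolve and that an
# ml_analyzer satisfying MLAnalyzerProtocol is supplied; illustrative only):
#
#   import asyncio
#
#   async def main():
#       idx = pd.date_range('2023-01-02', periods=300, freq='B')
#       rng = np.random.default_rng(7)
#       close = 100 * np.exp(np.cumsum(rng.normal(0, 0.01, len(idx))))
#       data = pd.DataFrame({
#           'open': close, 'high': close * 1.01, 'low': close * 0.99,
#           'close': close, 'volume': rng.integers(1_000, 10_000, len(idx)),
#       }, index=idx)
#       analyzer = MarketAnalyzerImpl(ml_analyzer=my_ml_analyzer)  # hypothetical analyzer
#       conditions = await analyzer.analyze_market_conditions(data)
#       print(conditions.current_regime, conditions.volatility_level)
#
#   asyncio.run(main())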