from __future__ import annotations import asyncio import hashlib import json import logging import os import re from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple, Union, Set import numpy as np import torch from transformers import AutoTokenizer, AutoModelForCausalLM try: from transformers import PhiForCausalLM except ImportError: pass try: from peft import PeftModel, PeftConfig except ImportError: pass # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("DeepVest") # Configuration manager @dataclass class DeepVestConfig: """Configuration for the DeepVest system""" # Model configuration model_path: str = "financial_profiler_model" use_quantization: bool = True cache_size: int = 100 # Database configuration db_path: str = "profiles_db" use_persistent_storage: bool = True # Analysis configuration default_horizon: int = 5 default_risk_tolerance: int = 3 min_recommendation_count: int = 3 # Performance settings enable_caching: bool = True batch_size: int = 4 parallel_processing: bool = True # Market data integration fetch_market_data: bool = False market_data_api_key: Optional[str] = None # Debug settings debug_mode: bool = False log_prompts: bool = False def __post_init__(self): """Validate configuration after initialization""" if self.debug_mode: logging.getLogger("DeepVest").setLevel(logging.DEBUG) # Core data models @dataclass class FinancialAnalysis: """Detailed financial analysis of an investor profile""" income_stability: float = 0.5 debt_ratio: float = 0.0 savings_capacity: float = 0.0 emergency_fund_months: float = 3.0 risk_capacity: float = 0.5 investment_horizon: int = 5 liquidity_needs: float = 0.2 income_sources: Dict[str, float] = field(default_factory=dict) fixed_expenses: Dict[str, float] = field(default_factory=dict) variable_expenses: Dict[str, float] = field(default_factory=dict) credit_score: Optional[int] = None tax_bracket: Optional[float] = None def validate(self) -> bool: """Validate financial analysis attributes""" try: validations = [ 0 <= self.income_stability <= 1, self.debt_ratio >= 0, 0 <= self.savings_capacity <= 1, self.emergency_fund_months >= 0, 0 <= self.risk_capacity <= 1, self.investment_horizon > 0, 0 <= self.liquidity_needs <= 1 ] if self.credit_score is not None: validations.append(300 <= self.credit_score <= 850) if self.tax_bracket is not None: validations.append(0 <= self.tax_bracket <= 1) return all(validations) except Exception as e: logger.error(f"Error validating financial analysis: {e}") return False def calculate_total_expenses(self) -> float: """Calculate total monthly expenses""" return ( sum(self.fixed_expenses.values()) + sum(self.variable_expenses.values()) ) def calculate_savings_rate(self) -> float: """Calculate savings rate as a percentage of income""" total_income = sum(self.income_sources.values()) if total_income == 0: return 0 return self.savings_capacity / total_income def get_financial_health_score(self) -> float: """Calculate overall financial health score (0-1)""" # Calculate components with different weights stability_weight = 0.3 debt_weight = 0.25 savings_weight = 0.25 emergency_weight = 0.2 # Calculate debt score (lower is better) debt_score = max(0, 1 - (self.debt_ratio / 0.5)) # Calculate emergency fund score (higher is better) emergency_score = min(1, self.emergency_fund_months / 6) # Calculate savings score savings_score = min(1, self.savings_capacity * 3) # Calculate overall score health_score = ( stability_weight * self.income_stability + debt_weight * debt_score + savings_weight * savings_score + emergency_weight * emergency_score ) return min(1, max(0, health_score)) @dataclass class FamilyAnalysis: """Detailed analysis of family situation""" family_size: int = 1 dependents: int = 0 life_stage: str = 'undefined' future_events: List[Dict] = field(default_factory=list) monthly_obligations: float = 0.0 insurance_coverage: Dict[str, float] = field(default_factory=dict) education_needs: Dict[str, float] = field(default_factory=dict) healthcare_needs: Dict[str, float] = field(default_factory=dict) retirement_plans: Dict[str, Any] = field(default_factory=dict) estate_plans: Dict[str, Any] = field(default_factory=dict) def validate(self) -> bool: """Validate family analysis attributes""" try: validations = [ self.family_size >= 1, self.dependents >= 0, self.monthly_obligations >= 0, all(value >= 0 for value in self.insurance_coverage.values()), all(value >= 0 for value in self.education_needs.values()), all(value >= 0 for value in self.healthcare_needs.values()) ] return all(validations) except Exception as e: logger.error(f"Error validating family analysis: {e}") return False def calculate_total_obligations(self) -> float: """Calculate total monthly obligations""" return ( self.monthly_obligations + sum(self.insurance_coverage.values()) + sum(self.education_needs.values()) + sum(self.healthcare_needs.values()) ) def get_next_life_event(self) -> Optional[Dict]: """Get the next upcoming life event""" if not self.future_events: return None # Sort events by date and return the closest one future_events = [e for e in self.future_events if 'date' in e] if not future_events: return None return min(future_events, key=lambda x: x['date']) def calculate_family_complexity(self) -> float: """Calculate family situation complexity (0-1)""" # Base complexity based on family size base_complexity = min(1, (self.family_size - 1) * 0.2) # Additional complexity for dependents dependent_complexity = min(0.5, self.dependents * 0.1) # Additional complexity for upcoming events events_complexity = min(0.3, len(self.future_events) * 0.05) # Additional complexity for special needs special_needs = len(self.healthcare_needs) > 0 or len(self.education_needs) > 0 special_complexity = 0.2 if special_needs else 0 return min(1, base_complexity + dependent_complexity + events_complexity + special_complexity) @dataclass class InvestmentGoals: """Detailed investment goals""" primary_goals: List[Dict] = field(default_factory=list) timeframes: Dict[str, int] = field(default_factory=dict) required_returns: Dict[str, float] = field(default_factory=dict) priority_order: List[str] = field(default_factory=list) target_amounts: Dict[str, float] = field(default_factory=dict) goal_progress: Dict[str, float] = field(default_factory=dict) risk_tolerance_by_goal: Dict[str, int] = field(default_factory=dict) milestones: Dict[str, List[Dict]] = field(default_factory=dict) def validate(self) -> bool: """Validate investment goals attributes""" try: if self.target_amounts and any(amount < 0 for amount in self.target_amounts.values()): return False if self.required_returns and any(ret < -1 or ret > 2 for ret in self.required_returns.values()): return False if self.risk_tolerance_by_goal and any(tol < 1 or tol > 5 for tol in self.risk_tolerance_by_goal.values()): return False if self.goal_progress and any(prog < 0 or prog > 1 for prog in self.goal_progress.values()): return False return True except Exception as e: logger.error(f"Error validating investment goals: {e}") return False def calculate_total_target_amount(self) -> float: """Calculate total target amount across all goals""" return sum(self.target_amounts.values()) def get_highest_priority_goal(self) -> Optional[str]: """Get the highest priority goal""" return self.priority_order[0] if self.priority_order else None def calculate_weighted_risk_tolerance(self) -> float: """Calculate risk tolerance weighted by goal amount""" if not self.risk_tolerance_by_goal or not self.target_amounts: return 3.0 # Default value total_amount = self.calculate_total_target_amount() if total_amount == 0: return 3.0 weighted_tolerance = sum( self.risk_tolerance_by_goal.get(goal, 3) * amount / total_amount for goal, amount in self.target_amounts.items() ) return weighted_tolerance def calculate_goal_progress(self, goal_id: str) -> float: """Calculate progress percentage for a specific goal""" if goal_id not in self.target_amounts or goal_id not in self.goal_progress: return 0.0 return self.goal_progress[goal_id] / self.target_amounts[goal_id] def get_upcoming_milestones(self, days: int = 90) -> List[Dict]: """Get upcoming milestones within specified days""" upcoming = [] now = datetime.now() cutoff = now + timedelta(days=days) for goal, milestones in self.milestones.items(): for milestone in milestones: date = milestone.get('date') if date and now <= date <= cutoff: upcoming.append({ 'goal': goal, **milestone }) return sorted(upcoming, key=lambda x: x['date']) def estimate_goal_feasibility(self, monthly_contribution: float, expected_return: float) -> Dict[str, float]: """Estimate feasibility of goals based on contributions and returns""" feasibility = {} for goal_id, target in self.target_amounts.items(): # Get current progress current_amount = self.goal_progress.get(goal_id, 0) # Get timeframe in months months = self.timeframes.get(goal_id, 60) * 12 if months <= 0: feasibility[goal_id] = 0 continue # Calculate future value of current amount future_current = current_amount * (1 + expected_return/12) ** months # Calculate future value of monthly contributions future_contributions = monthly_contribution * ((1 + expected_return/12) ** months - 1) / (expected_return/12) # Calculate total expected amount expected_amount = future_current + future_contributions # Calculate feasibility as percentage of target feasibility[goal_id] = min(1.0, expected_amount / target) return feasibility @dataclass class ProfileAnalysisResult: """Complete result of LLM profile analysis""" # Basic metrics risk_score: float investment_horizon: int primary_goals: List[str] # Detailed analyses constraints: Dict[str, Any] recommendations: List[str] explanation: str # Advanced metrics risk_decomposition: Dict[str, float] = field(default_factory=dict) goal_feasibility: Dict[str, float] = field(default_factory=dict) investment_style: Dict[str, float] = field(default_factory=dict) market_sensitivity: Dict[str, float] = field(default_factory=dict) # Recommended allocations asset_allocation: Dict[str, float] = field(default_factory=dict) geographic_allocation: Dict[str, float] = field(default_factory=dict) # Alerts and attention points alerts: List[Dict[str, Any]] = field(default_factory=list) attention_points: List[str] = field(default_factory=list) # Context and adaptability market_context: Dict[str, Any] = field(default_factory=dict) adaptability_score: float = 0.5 stress_test_results: Dict[str, float] = field(default_factory=dict) def __post_init__(self): """Post-initialization validation""" if not 0 <= self.risk_score <= 1: raise ValueError("Risk score must be between 0 and 1") if self.investment_horizon <= 0: raise ValueError("Investment horizon must be positive") def get_risk_category(self) -> str: """Determine risk category based on score""" if self.risk_score < 0.2: return "Très conservateur" elif self.risk_score < 0.4: return "Conservateur" elif self.risk_score < 0.6: return "Modéré" elif self.risk_score < 0.8: return "Dynamique" else: return "Très dynamique" def get_critical_alerts(self) -> List[Dict[str, Any]]: """Get critical alerts only""" return [alert for alert in self.alerts if alert.get('severity') == 'critical'] def get_major_constraints(self) -> List[str]: """Identify major constraints""" return [ constraint for constraint, value in self.constraints.items() if isinstance(value, (bool, int, float)) and value > 0.7 ] def get_primary_investment_style(self) -> str: """Determine primary investment style""" if not self.investment_style: return "Balanced" return max(self.investment_style.items(), key=lambda x: x[1])[0] def calculate_overall_feasibility(self) -> float: """Calculate overall goal feasibility""" if not self.goal_feasibility: return 0.5 return sum(self.goal_feasibility.values()) / len(self.goal_feasibility) def get_risk_factors(self) -> List[Tuple[str, float]]: """Get main risk factors sorted by importance""" return sorted( self.risk_decomposition.items(), key=lambda x: x[1], reverse=True ) def get_market_adaptability(self) -> Dict[str, float]: """Evaluate adaptability to market conditions""" adaptability = {} for condition, sensitivity in self.market_sensitivity.items(): adaptability[condition] = min( self.adaptability_score * (1 + sensitivity), 1.0 ) return adaptability def generate_one_page_summary(self) -> str: """Generate a concise one-page summary of the analysis""" risk_category = self.get_risk_category() summary = [ f"# Profil d'Investissement: {risk_category}", f"Score de risque: {self.risk_score:.2f}/1.00 | Horizon: {self.investment_horizon} ans", "", "## Objectifs Principaux", *[f"- {goal}" for goal in self.primary_goals[:3]], "", "## Allocation d'Actifs Recommandée", *[f"- {asset}: {weight:.1%}" for asset, weight in sorted(self.asset_allocation.items(), key=lambda x: x[1], reverse=True)], "", "## Facteurs de Risque Principaux", *[f"- {factor}: {score:.1%}" for factor, score in self.get_risk_factors()[:3]], "", "## Recommandations Clés", *[f"- {rec}" for rec in self.recommendations[:3]], "", "## Points d'Attention", *[f"- {point}" for point in self.attention_points[:3]], ] return "\n".join(summary) def to_dict(self) -> Dict[str, Any]: """Convert analysis to dictionary""" return { 'risk_profile': { 'score': self.risk_score, 'category': self.get_risk_category(), 'decomposition': self.risk_decomposition, 'stress_tests': self.stress_test_results }, 'investment_profile': { 'horizon': self.investment_horizon, 'style': self.investment_style, 'adaptability': self.adaptability_score, 'market_sensitivity': self.market_sensitivity }, 'goals_analysis': { 'primary_goals': self.primary_goals, 'feasibility': self.goal_feasibility, 'constraints': self.constraints }, 'recommendations': { 'suggestions': self.recommendations, 'attention_points': self.attention_points, 'alerts': self.alerts }, 'allocations': { 'assets': self.asset_allocation, 'geography': self.geographic_allocation }, 'market_context': self.market_context, 'explanation': self.explanation } @dataclass class PersonalProfile: """Complete and dynamic investor profile with advanced analysis""" # Identifiers and metadata id: Optional[str] = None created_at: datetime = field(default_factory=datetime.now) last_updated: datetime = field(default_factory=datetime.now) version: int = 1 # Base profile risk_tolerance: int = 3 investment_horizon: int = 5 income_stability: int = 3 financial_knowledge: int = 3 investment_goals: List[str] = field(default_factory=list) esg_preferences: bool = False constraints: Dict[str, Any] = field(default_factory=dict) # Personal information age: int = 30 annual_income: float = 50000.0 total_assets: float = 0.0 total_debt: float = 0.0 monthly_savings: float = 0.0 # Added this field for the épargne mensuelle occupation: str = "" industry: str = "" education_level: str = "" marital_status: str = "single" number_of_dependents: int = 0 tax_status: str = "" residential_status: str = "owner" # Detailed analyses financial_analysis: Optional[FinancialAnalysis] = None family_analysis: Optional[FamilyAnalysis] = None investment_objectives: Optional[InvestmentGoals] = None profile_description: Optional[str] = None # Dynamic metrics and history risk_metrics: Dict[str, float] = field(default_factory=dict) performance_metrics: Dict[str, float] = field(default_factory=dict) historical_changes: List[Dict[str, Any]] = field(default_factory=list) last_review: Optional[datetime] = None # Investment preferences investment_preferences: Dict[str, Any] = field(default_factory=lambda: { 'asset_classes': [], 'sectors': [], 'regions': [], 'strategies': [], 'excluded_investments': [] }) # LLM analysis results llm_analysis_results: Optional[ProfileAnalysisResult] = None risk_score: float = 0.5 adaptability_score: float = 0.5 complexity_tolerance: float = 0.5 def __post_init__(self): """Post-initialization setup""" # Generate ID if not provided if self.id is None: self.id = self._generate_profile_id() # Initialize analyses if not provided if self.financial_analysis is None: self.financial_analysis = FinancialAnalysis() if self.family_analysis is None: self.family_analysis = FamilyAnalysis() if self.investment_objectives is None: self.investment_objectives = InvestmentGoals() # Validate profile self._validate_profile() def _generate_profile_id(self) -> str: """Generate unique profile ID""" components = [ str(self.age), str(self.annual_income), str(self.risk_tolerance), self.occupation, datetime.now().isoformat() ] unique_string = "_".join(components) return f"PROFILE_{hashlib.sha256(unique_string.encode()).hexdigest()[:16]}" def _validate_profile(self): """Comprehensive profile validation""" validations = [ (1 <= self.risk_tolerance <= 5, "Risk tolerance must be between 1 and 5"), (1 <= self.investment_horizon <= 50, "Investment horizon must be between 1 and 50 years"), (1 <= self.income_stability <= 5, "Income stability must be between 1 and 5"), (1 <= self.financial_knowledge <= 5, "Financial knowledge must be between 1 and 5"), (18 <= self.age <= 120, "Age must be between 18 and 120"), (self.annual_income >= 0, "Annual income cannot be negative"), (self.total_assets >= 0, "Total assets cannot be negative"), (self.total_debt >= 0, "Total debt cannot be negative"), (0 <= self.risk_score <= 1, "Risk score must be between 0 and 1"), (0 <= self.adaptability_score <= 1, "Adaptability score must be between 0 and 1"), (0 <= self.complexity_tolerance <= 1, "Complexity tolerance must be between 0 and 1") ] # Advanced validation if self.investment_preferences: for asset_class in self.investment_preferences.get('asset_classes', []): if not isinstance(asset_class, str): raise ValueError("Invalid asset class format") if self.investment_goals and len(self.investment_goals) == 0: raise ValueError("At least one investment goal must be specified") # Check basic constraints for validation, message in validations: if not validation: raise ValueError(message) # Check detailed analyses if self.financial_analysis and not self.financial_analysis.validate(): raise ValueError("Invalid financial analysis") if self.family_analysis and not self.family_analysis.validate(): raise ValueError("Invalid family analysis") if self.investment_objectives and not self.investment_objectives.validate(): raise ValueError("Invalid investment objectives") def calculate_comprehensive_risk_metrics(self) -> Dict[str, float]: """Calculate comprehensive risk metrics""" try: risk_metrics = { 'total_debt_ratio': self.total_debt / self.annual_income if self.annual_income > 0 else float('inf'), 'emergency_fund_ratio': self.financial_analysis.emergency_fund_months / 6 if self.financial_analysis else 0, 'risk_capacity': self.financial_analysis.risk_capacity if self.financial_analysis else 0.5, 'investment_diversification': len(self.investment_goals) / 5 if self.investment_goals else 0, 'debt_service_ratio': self._calculate_debt_service_ratio(), 'savings_rate': self._calculate_savings_rate(), 'liquidity_ratio': self._calculate_liquidity_ratio(), 'investment_concentration': self._calculate_investment_concentration() } # Add LLM scores if self.llm_analysis_results: risk_metrics.update({ 'llm_risk_score': self.llm_analysis_results.risk_score, 'adaptability': self.llm_analysis_results.adaptability_score }) # Calculate overall risk capacity financial_health = self.financial_analysis.get_financial_health_score() if self.financial_analysis else 0.5 risk_metrics['overall_risk_capacity'] = ( risk_metrics['risk_capacity'] * 0.5 + (1 - risk_metrics['total_debt_ratio']) * 0.3 + financial_health * 0.2 ) self.risk_metrics = risk_metrics return risk_metrics except Exception as e: logger.error(f"Error calculating risk metrics: {e}") return {} def _calculate_debt_service_ratio(self) -> float: """Calculate debt service ratio""" try: if not self.financial_analysis or self.annual_income == 0: return 0.0 if isinstance(self.financial_analysis.fixed_expenses, dict) and 'debt_payments' in self.financial_analysis.fixed_expenses: if isinstance(self.financial_analysis.fixed_expenses['debt_payments'], dict): monthly_debt_payment = sum(self.financial_analysis.fixed_expenses['debt_payments'].values()) else: monthly_debt_payment = self.financial_analysis.fixed_expenses['debt_payments'] else: # Estimate monthly debt payment using total debt and a standard amortization monthly_debt_payment = self.total_debt * 0.01 # Rough estimate of 1% monthly payment monthly_income = self.annual_income / 12 return monthly_debt_payment / monthly_income if monthly_income > 0 else 0 except Exception as e: logger.error(f"Error calculating debt service ratio: {e}") return 0.0 def _calculate_savings_rate(self) -> float: """Calculate savings rate""" try: if not self.financial_analysis or self.annual_income == 0: return 0.0 return (self.financial_analysis.savings_capacity * 12) / self.annual_income except Exception as e: logger.error(f"Error calculating savings rate: {e}") return 0.0 def _calculate_liquidity_ratio(self) -> float: """Calculate liquidity ratio""" try: if not self.financial_analysis or self.total_assets == 0: return 0.0 liquid_assets = sum( amount for type_, amount in self.financial_analysis.income_sources.items() if type_ in ['savings', 'investments', 'cash'] ) return liquid_assets / self.total_assets except Exception as e: logger.error(f"Error calculating liquidity ratio: {e}") return 0.0 def _calculate_investment_concentration(self) -> float: """Calculate investment concentration using HHI""" try: if not self.investment_preferences or not self.investment_preferences.get('asset_classes'): return 1.0 asset_classes = self.investment_preferences['asset_classes'] weights = [1/len(asset_classes)] * len(asset_classes) return sum(w * w for w in weights) except Exception as e: logger.error(f"Error calculating investment concentration: {e}") return 1.0 def update(self, updates: Dict[str, Any]) -> 'PersonalProfile': """Update profile with new information""" try: # Create copy of current profile data current_data = asdict(self) # Save change history self._save_change_history(updates) # Update fields for key, value in updates.items(): if key in current_data: current_data[key] = value # Update version and timestamps current_data['version'] += 1 current_data['last_updated'] = datetime.now() # Create new profile with updated data updated_profile = self.__class__(**current_data) # Validate updated profile updated_profile._validate_profile() return updated_profile except Exception as e: logger.error(f"Error updating profile: {e}") raise def _save_change_history(self, updates: Dict[str, Any]): """Record change history""" change_record = { 'timestamp': datetime.now().isoformat(), 'changes': updates, 'previous_values': { key: getattr(self, key) for key in updates.keys() if hasattr(self, key) }, 'version': self.version } self.historical_changes.append(change_record) def get_change_history(self, field: Optional[str] = None) -> List[Dict[str, Any]]: """Get change history for optional field""" if field: return [ change for change in self.historical_changes if field in change['changes'] ] return self.historical_changes def export_json(self, filepath: str, include_detailed_analysis: bool = False): """Export profile to JSON file""" try: # Make sure directory exists os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(filepath, 'w', encoding='utf-8') as f: json.dump( asdict(self) if include_detailed_analysis else self.to_dict(), f, ensure_ascii=False, indent=2, default=str ) logger.info(f"Profile exported successfully to {filepath}") except Exception as e: logger.error(f"Error exporting profile to {filepath}: {e}") raise def to_dict(self) -> Dict[str, Any]: """Convert profile to dictionary representation""" return { 'id': self.id, 'version': self.version, 'created_at': self.created_at.isoformat(), 'last_updated': self.last_updated.isoformat(), 'basic_info': { 'age': self.age, 'annual_income': self.annual_income, 'total_assets': self.total_assets, 'total_debt': self.total_debt, 'marital_status': self.marital_status, 'dependents': self.number_of_dependents }, 'investment_profile': { 'risk_tolerance': self.risk_tolerance, 'investment_horizon': self.investment_horizon, 'financial_knowledge': self.financial_knowledge, 'income_stability': self.income_stability, 'goals': self.investment_goals, 'esg_preferences': self.esg_preferences }, 'risk_metrics': self.risk_metrics, 'llm_analysis': self.llm_analysis_results.to_dict() if self.llm_analysis_results else None } @classmethod def from_json(cls, filepath: str) -> 'PersonalProfile': """Load profile from JSON file""" try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) # Convert date strings to datetime objects for date_field in ['created_at', 'last_updated', 'last_review']: if date_field in data and data[date_field]: data[date_field] = datetime.fromisoformat(data[date_field]) # Reconstruct complex objects if 'financial_analysis' in data and data['financial_analysis']: data['financial_analysis'] = FinancialAnalysis(**data['financial_analysis']) if 'family_analysis' in data and data['family_analysis']: data['family_analysis'] = FamilyAnalysis(**data['family_analysis']) if 'investment_objectives' in data and data['investment_objectives']: data['investment_objectives'] = InvestmentGoals(**data['investment_objectives']) if 'llm_analysis_results' in data and data['llm_analysis_results']: data['llm_analysis_results'] = ProfileAnalysisResult(**data['llm_analysis_results']) # Create and validate profile profile = cls(**data) profile._validate_profile() return profile except Exception as e: logger.error(f"Error loading profile from {filepath}: {e}") raise class DeepVestLLM: """Advanced LLM integration using fine-tuned financial advisor model""" def __init__(self, config: DeepVestConfig): """Initialize with configuration""" self.config = config self.logger = logging.getLogger("DeepVest.LLM") # Initialize model and tokenizer self.model = None self.tokenizer = None self.device = self._detect_optimal_device() # Load model if available if os.path.exists(config.model_path): self.load_model(config.model_path) # Pass the model_path argument else: self.logger.warning(f"Model path {config.model_path} not found, using fallback mode") # Initialize cache self.generation_cache = {} self.max_cache_entries = config.cache_size def _detect_optimal_device(self) -> str: """Detect the optimal device for model inference""" if torch.cuda.is_available(): self.logger.info("CUDA GPU available, using CUDA") return "cuda" elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): self.logger.info("Apple MPS available, using MPS") return "mps" else: self.logger.info("No GPU available, using CPU") return "cpu" def load_model(self, model_path=None): """Load the fine-tuned model from the configured path""" try: path_to_use = model_path if model_path else self.config.model_path self.logger.info(f"Loading fine-tuned model from {path_to_use}") # Determine the best device to use if torch.cuda.is_available(): self.logger.info("CUDA available, using GPU") device = "cuda" elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): self.logger.info("Apple MPS available, using MPS") device = "mps" else: self.logger.info("Using CPU") device = "cpu" # Check if config.json exists and has required fields config_file = os.path.join(path_to_use, "config.json") if os.path.exists(config_file): with open(config_file, 'r') as f: config = json.load(f) # If no model_type, add one for Phi model if 'model_type' not in config: self.logger.warning("Adding missing model_type to config.json") config['model_type'] = "phi" # Use phi since you're fine-tuning Phi-1.5 # Add architecture if missing if 'architectures' not in config: config['architectures'] = ["PhiForCausalLM"] # Save the modified config with open(config_file, 'w') as f: json.dump(config, f, indent=2) else: # Create a basic config.json if it doesn't exist self.logger.warning(f"config.json not found in {path_to_use}, creating one") config = { "model_type": "phi", "architectures": ["PhiForCausalLM"], "bos_token_id": 1, "eos_token_id": 2, "pad_token_id": 0, "hidden_size": 2048, "vocab_size": 51200, "torch_dtype": "float16" } with open(config_file, 'w') as f: json.dump(config, f, indent=2) # Try different loading approaches try: # First attempt: direct loading with AutoModelForCausalLM self.tokenizer = AutoTokenizer.from_pretrained( path_to_use, trust_remote_code=True ) self.model = AutoModelForCausalLM.from_pretrained( path_to_use, trust_remote_code=True, torch_dtype=torch.float16, device_map="auto", low_cpu_mem_usage=True ) except Exception as model_error: self.logger.warning(f"First loading attempt failed: {model_error}") # Second attempt: try loading with specific model class try: from transformers import PhiForCausalLM self.tokenizer = AutoTokenizer.from_pretrained(path_to_use, trust_remote_code=True) self.model = PhiForCausalLM.from_pretrained( path_to_use, trust_remote_code=True, torch_dtype=torch.float16, device_map="auto", low_cpu_mem_usage=True ) except Exception as specific_error: self.logger.warning(f"Second loading attempt failed: {specific_error}") # Third attempt: try loading as a PEFT/LoRA model try: from peft import PeftModel, PeftConfig # Check if we need to load the base model first base_model_path = "microsoft/phi-1_5" base_model = AutoModelForCausalLM.from_pretrained( base_model_path, trust_remote_code=True, torch_dtype=torch.float16, device_map="auto" ) self.tokenizer = AutoTokenizer.from_pretrained(base_model_path, trust_remote_code=True) self.model = PeftModel.from_pretrained(base_model, path_to_use) except Exception as peft_error: # All attempts failed self.logger.error(f"All loading attempts failed. Last error: {peft_error}") raise Exception(f"Could not load model: {model_error} -> {specific_error} -> {peft_error}") # Make sure the tokenizer has a pad token if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token self.logger.info(f"Model loaded successfully") return True except Exception as e: self.logger.error(f"Error loading model: {e}") return False def _get_cache_key(self, prompt: str) -> str: """Generate cache key for a prompt""" return hashlib.md5(prompt.encode()).hexdigest() def _manage_cache(self): """Manage cache size""" if len(self.generation_cache) > self.max_cache_entries: # Sort by access timestamp and remove oldest sorted_entries = sorted( self.generation_cache.items(), key=lambda x: x[1]['timestamp'] ) # Remove oldest 20% entries_to_remove = int(self.max_cache_entries * 0.2) for i in range(entries_to_remove): if i < len(sorted_entries): del self.generation_cache[sorted_entries[i][0]] self.logger.debug(f"Cache cleanup: removed {entries_to_remove} entries") async def generate_text(self, prompt: str, max_length: int = 1500) -> str: """Generate text using the loaded model""" # Check cache if enabled if self.config.enable_caching: cache_key = self._get_cache_key(prompt) if cache_key in self.generation_cache: cache_entry = self.generation_cache[cache_key] cache_entry['timestamp'] = datetime.now() # Update access timestamp self.logger.debug("Using cached response") return cache_entry['response'] if self.config.log_prompts: self.logger.debug(f"Prompt: {prompt}") # Check if model_loaded is True to confirm successful loading if hasattr(self, 'model_loaded') and self.model_loaded and self.model and self.tokenizer: # Generate with loaded model try: inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096) # Handle device mapping - use the model's device rather than explicitly moving inputs if hasattr(self.model, 'device'): device = self.model.device inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs = self.model.generate( **inputs, max_length=max_length, do_sample=True, temperature=0.7, top_p=0.92, top_k=50, repetition_penalty=1.15, num_beams=4, pad_token_id=self.tokenizer.eos_token_id ) response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Extract actual response from the prompt if "[/INST]" in response: response = response.split("[/INST]")[-1].strip() # Cache the result if enabled if self.config.enable_caching: self.generation_cache[cache_key] = { 'response': response, 'timestamp': datetime.now() } self._manage_cache() return response except Exception as e: self.logger.error(f"Error generating text: {e}") return await self._fallback_generation(prompt) else: # Use fallback generation self.logger.info("Using fallback generation") return await self._fallback_generation(prompt) async def _fallback_generation(self, prompt: str) -> str: """Fallback generation when model is unavailable""" self.logger.info("Using fallback generation") # Extract key information from prompt risk_tolerance = 3 investment_horizon = 5 # Extract risk tolerance if present risk_match = re.search(r'[Rr]isque.*?(\d+)', prompt) if risk_match: risk_tolerance = int(risk_match.group(1)) # Extract investment horizon if present horizon_match = re.search(r'[Hh]orizon.*?(\d+)', prompt) if horizon_match: investment_horizon = int(horizon_match.group(1)) # Generate simple structured response risk_score = risk_tolerance / 5 fallback_response = f"""1. Score de risque: {risk_score:.2f} - Basé sur la tolérance au risque déclarée. 2. Horizon d'investissement recommandé: {investment_horizon} ans. 3. Objectifs prioritaires: Épargne générale, Croissance du capital. 4. Contraintes: Limitation classique de diversification. 5. Recommandations d'allocation: - Actions: {int(risk_score * 100)}% - Obligations: {int((1-risk_score) * 80)}% - Liquidités: {int((1-risk_score) * 20)}% 6. Recommandations: - Diversifier votre portefeuille - Investir régulièrement - Maintenir une épargne de sécurité suffisante 7. Points d'attention: - Réévaluer régulièrement votre profil de risque - Ajuster votre allocation selon l'évolution de votre situation - Prévoir une réserve d'urgence adéquate """ return fallback_response def prepare_analysis_prompt(self, profile_data: Dict[str, Any]) -> str: """Prepare analysis prompt from profile data""" return f"""[INST] Je suis un conseiller financier expert. Analyse ce profil d'investisseur de manière exhaustive. Profil détaillé : - Âge: {profile_data.get('age', 'Non spécifié')} ans - Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}€ - Situation familiale: {profile_data.get('marital_status', 'Non spécifié')} - Dépendants: {profile_data.get('number_of_dependents', 0)} - Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}€ - Dette totale: {profile_data.get('total_debt', 'Non spécifié')}€ - Patrimoine total: {profile_data.get('total_assets', 'Non spécifié')}€ - Objectifs: {", ".join(profile_data.get('investment_goals', []))} - Tolérance au risque (1-5): {profile_data.get('risk_tolerance', 3)} - Horizon d'investissement: {profile_data.get('investment_horizon', 5)} ans - Connaissances financières (1-5): {profile_data.get('financial_knowledge', 3)} - Stabilité professionnelle: {profile_data.get('income_stability', 3)} - Préférences ESG: {"Oui" if profile_data.get('esg_preferences', False) else "Non"} - Secteurs préférés: {", ".join(profile_data.get('investment_preferences', {}).get('sectors', []))} - Contraintes spécifiques: {", ".join(str(k) for k, v in profile_data.get('constraints', {}).items() if v)} Format de réponse requis: 1. Score de risque (entre 0 et 1): [score] - Justification détaillée 2. Horizon d'investissement recommandé: [années] - Justification 3. Objectifs prioritaires: Liste ordonnée avec justification 4. Contraintes et limitations: Analyse détaillée des contraintes 5. Recommandations d'allocation: - Par classe d'actifs (%) - Par zone géographique (%) - Par stratégie d'investissement 6. Analyse des risques: - Décomposition des risques - Points de vigilance - Stress tests recommandés 7. Plan d'action détaillé: - Court terme (0-2 ans) - Moyen terme (2-5 ans) - Long terme (5+ ans) 8. Recommandations spécifiques: - Investissements recommandés - Stratégies de diversification - Protection du capital 9. Explication détaillée: - Raisonnement complet - Points d'attention particuliers - Opportunités identifiées Analyse complète: [/INST]""" def prepare_description_prompt(self, profile_data: Dict[str, Any]) -> str: """Prepare description prompt from profile data""" return f"""[INST] En tant que conseiller financier expert, génère une description détaillée et professionnelle de ce profil d'investisseur. Profil complet: - Âge: {profile_data.get('age', 'Non spécifié')} ans - Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}€ - Situation familiale: {profile_data.get('marital_status', 'Non spécifié')} - Objectifs: {", ".join(profile_data.get('investment_goals', []))} - Tolérance au risque: {profile_data.get('risk_tolerance', 3)}/5 - Horizon: {profile_data.get('investment_horizon', 5)} ans - Connaissances financières: {profile_data.get('financial_knowledge', 3)}/5 - Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}€ - Patrimoine: {profile_data.get('total_assets', 'Non spécifié')}€ - Dette: {profile_data.get('total_debt', 'Non spécifié')}€ - Secteur d'activité: {profile_data.get('industry', 'Non spécifié')} - Stabilité professionnelle: {profile_data.get('income_stability', 3)}/5 La description doit couvrir: 1. Analyse du profil de risque et des objectifs 2. Evaluation de la capacité financière 3. Identification des contraintes et opportunités 4. Recommandations d'investissement personnalisées 5. Points de vigilance et attention particulière 6. Stratégie de diversification recommandée 7. Projection et scénarios d'évolution Description: [/INST]""" async def generate_batch(self, prompts: List[str], max_length: int = 1500) -> List[str]: """Generate responses for multiple prompts in parallel""" if not self.model or not self.tokenizer: # Fallback for batch generation return [await self._fallback_generation(prompt) for prompt in prompts] responses = [] # Process in batches batch_size = self.config.batch_size for i in range(0, len(prompts), batch_size): batch_prompts = prompts[i:i+batch_size] batch_inputs = self.tokenizer( batch_prompts, return_tensors="pt", padding=True, truncation=True, max_length=4096 ) batch_inputs = {k: v.to(self.device) for k, v in batch_inputs.items()} with torch.no_grad(): outputs = self.model.generate( **batch_inputs, max_length=max_length, do_sample=True, temperature=0.7, top_p=0.92, top_k=50, repetition_penalty=1.15, pad_token_id=self.tokenizer.eos_token_id ) for j, output in enumerate(outputs): response = self.tokenizer.decode(output, skip_special_tokens=True) if "[/INST]" in response: response = response.split("[/INST]")[-1].strip() responses.append(response) return responses class LLMProfileAnalyzer: """Advanced profile analyzer using fine-tuned financial LLM""" def __init__(self, config: Optional[DeepVestConfig] = None): self.config = config or DeepVestConfig() self.logger = logging.getLogger("DeepVest.Analyzer") self.llm = DeepVestLLM(self.config) # Initialize cache self.analysis_cache = {} self.max_cache_size = self.config.cache_size def _get_cache_key(self, profile_data: Dict[str, Any]) -> str: """Generate unique cache key for a profile""" key_components = [ str(profile_data.get('age', '')), str(profile_data.get('risk_tolerance', '')), str(profile_data.get('investment_horizon', '')), ','.join(profile_data.get('investment_goals', [])), str(profile_data.get('annual_income', '')), str(profile_data.get('total_assets', '')), str(profile_data.get('total_debt', '')) ] return hashlib.md5('_'.join(key_components).encode()).hexdigest() def _manage_cache(self): """Manage cache size by removing oldest entries""" if len(self.analysis_cache) > self.max_cache_size: # Sort by timestamp and remove oldest items = sorted( self.analysis_cache.items(), key=lambda x: x[1]['timestamp'] ) # Remove oldest 20% num_to_remove = int(self.max_cache_size * 0.2) for i in range(num_to_remove): if i < len(items): del self.analysis_cache[items[i][0]] async def analyze_profile(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult: """Comprehensive profile analysis with recommendations""" try: # Check cache first cache_key = self._get_cache_key(profile_data) if cache_key in self.analysis_cache and self.config.enable_caching: self.logger.info(f"Using cached analysis for profile {cache_key}") return self.analysis_cache[cache_key]['result'] self.logger.info(f"Starting profile analysis for ID: {profile_data.get('id', 'unknown')}") # Prepare prompt prompt = self.llm.prepare_analysis_prompt(profile_data) # Generate response response = await self.llm.generate_text(prompt) # Parse and analyze response analysis_result = self._parse_response(response, profile_data) # Validate results if not self._validate_results(analysis_result): self.logger.warning("Analysis results validation failed, using default values") analysis_result = self._get_default_analysis(profile_data) # Enrich analysis with additional calculations enriched_analysis = self._enrich_analysis(analysis_result, profile_data) # Cache the result if self.config.enable_caching: self.analysis_cache[cache_key] = { 'result': enriched_analysis, 'timestamp': datetime.now() } self._manage_cache() self.logger.info("Profile analysis completed successfully") return enriched_analysis except Exception as e: self.logger.error(f"Error in profile analysis: {e}") return self._get_default_analysis(profile_data) async def batch_analyze_profiles(self, profiles: List[Dict[str, Any]]) -> List[ProfileAnalysisResult]: """Analyze multiple profiles in batch""" try: results = [] uncached_profiles = [] uncached_indices = [] # Check cache first for each profile for i, profile in enumerate(profiles): cache_key = self._get_cache_key(profile) if cache_key in self.analysis_cache and self.config.enable_caching: results.append(self.analysis_cache[cache_key]['result']) else: uncached_profiles.append(profile) uncached_indices.append(i) # If all profiles were cached, return results if not uncached_profiles: return results # Prepare prompts for uncached profiles prompts = [self.llm.prepare_analysis_prompt(profile) for profile in uncached_profiles] # Generate responses in batch responses = await self.llm.generate_batch(prompts) # Parse and process responses for i, (profile, response) in enumerate(zip(uncached_profiles, responses)): try: # Parse response analysis_result = self._parse_response(response, profile) # Validate and enrich if not self._validate_results(analysis_result): analysis_result = self._get_default_analysis(profile) enriched_analysis = self._enrich_analysis(analysis_result, profile) # Cache the result if self.config.enable_caching: cache_key = self._get_cache_key(profile) self.analysis_cache[cache_key] = { 'result': enriched_analysis, 'timestamp': datetime.now() } # Insert at correct position insert_index = uncached_indices[i] # Extend results list if needed while len(results) <= insert_index: results.append(None) results[insert_index] = enriched_analysis except Exception as e: self.logger.error(f"Error processing profile {i}: {e}") results.append(self._get_default_analysis(profile)) # Manage cache after batch processing if self.config.enable_caching: self._manage_cache() # Fill any remaining None values with defaults for i, result in enumerate(results): if result is None: results[i] = self._get_default_analysis(profiles[i]) return results except Exception as e: self.logger.error(f"Error in batch profile analysis: {e}") return [self._get_default_analysis(profile) for profile in profiles] def _parse_response(self, response: str, original_data: Dict[str, Any]) -> ProfileAnalysisResult: """Parse LLM response into structured data""" try: sections = response.split('\n') # Extract asset allocation asset_allocation = self._extract_asset_allocation(sections) geo_allocation = self._extract_geographic_allocation(sections) return ProfileAnalysisResult( risk_score=self._extract_risk_score(sections), investment_horizon=self._extract_horizon(sections), primary_goals=self._extract_goals(sections), constraints=self._extract_constraints(sections), recommendations=self._extract_recommendations(sections), explanation=self._extract_explanation(sections), risk_decomposition=self._extract_risk_decomposition(sections), goal_feasibility=self._extract_goal_feasibility(sections), investment_style=self._extract_investment_style(sections), market_sensitivity=self._extract_market_sensitivity(sections), alerts=self._extract_alerts(sections), attention_points=self._extract_attention_points(sections), market_context=self._extract_market_context(sections), adaptability_score=self._calculate_adaptability_score(sections), stress_test_results=self._extract_stress_test_results(sections), asset_allocation=asset_allocation, geographic_allocation=geo_allocation ) except Exception as e: self.logger.error(f"Error parsing response: {e}") return self._get_default_analysis(original_data) def _validate_results(self, analysis_result: ProfileAnalysisResult) -> bool: """Validate analysis results for consistency""" try: # Check essential values if not (0 <= analysis_result.risk_score <= 1): analysis_result.risk_score = min(max(analysis_result.risk_score, 0), 1) if analysis_result.investment_horizon <= 0: analysis_result.investment_horizon = 5 if not analysis_result.primary_goals: return False if not analysis_result.recommendations: return False # Check consistency between risk and horizon if analysis_result.risk_score > 0.7 and analysis_result.investment_horizon < 5: self.logger.warning("Inconsistency detected: high risk with short horizon") analysis_result.attention_points.append( "Attention: profil à risque élevé avec horizon court, prudence recommandée" ) return True except Exception as e: self.logger.error(f"Error validating analysis results: {e}") return False def _enrich_analysis(self, analysis: ProfileAnalysisResult, profile_data: Dict[str, Any]) -> ProfileAnalysisResult: """Enrich analysis with additional calculations""" try: # Calculate adjusted risk score base_risk_score = analysis.risk_score adjustments = [] # Age adjustment age = profile_data.get('age', 30) age_adjustment = max(0, (65 - age) / 65) * 0.2 adjustments.append(age_adjustment) # Knowledge adjustment knowledge = profile_data.get('financial_knowledge', 3) knowledge_adjustment = (knowledge - 3) * 0.1 adjustments.append(knowledge_adjustment) # Financial stability adjustment income = profile_data.get('annual_income', 0) debt = profile_data.get('total_debt', 0) if income > 0: debt_ratio = debt / income financial_adjustment = -0.1 if debt_ratio > 0.4 else 0.1 adjustments.append(financial_adjustment) # Goals diversification adjustment goals_diversification = min(len(profile_data.get('investment_goals', [])) * 0.05, 0.2) adjustments.append(goals_diversification) # Apply adjustments final_risk_score = base_risk_score for adj in adjustments: final_risk_score = max(0, min(1, final_risk_score + adj)) # Update analysis analysis.risk_score = final_risk_score analysis.risk_decomposition.update({ 'age_factor': age_adjustment, 'knowledge_factor': knowledge_adjustment, 'financial_stability': financial_adjustment if 'financial_adjustment' in locals() else 0, 'goals_diversification': goals_diversification }) # Add default allocation if missing if not analysis.asset_allocation: analysis.asset_allocation = self._generate_default_allocation(final_risk_score) if not analysis.geographic_allocation: analysis.geographic_allocation = self._generate_default_geo_allocation() # Calculate market adaptability score if not analysis.market_sensitivity: # Create default market sensitivity based on risk score analysis.market_sensitivity = { 'market_beta': final_risk_score * 0.8 + 0.2, 'interest_rate': max(0, 0.7 - final_risk_score * 0.4), 'economic_cycle': 0.4 + final_risk_score * 0.3, 'inflation': max(0, 0.6 - final_risk_score * 0.2) } # Calculate goal feasibility if missing if not analysis.goal_feasibility and profile_data.get('investment_objectives'): objectives = profile_data.get('investment_objectives') if hasattr(objectives, 'calculate_goal_feasibility'): monthly_contribution = profile_data.get('monthly_savings', 0) expected_return = 0.03 + final_risk_score * 0.04 # Simple expected return model analysis.goal_feasibility = objectives.calculate_goal_feasibility( monthly_contribution=monthly_contribution, expected_return=expected_return ) return analysis except Exception as e: self.logger.error(f"Error enriching analysis: {e}") return analysis def _get_default_analysis(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult: """Generate default analysis when LLM fails""" risk_score = profile_data.get('risk_tolerance', 3) / 5 return ProfileAnalysisResult( risk_score=risk_score, investment_horizon=profile_data.get('investment_horizon', 5), primary_goals=profile_data.get('investment_goals', ["Épargne générale"]), constraints={}, recommendations=[ "Construire un portefeuille diversifié", "Maintenir une réserve d'urgence", "Investir régulièrement" ], explanation="Analyse par défaut basée sur les données fournies.", asset_allocation=self._generate_default_allocation(risk_score), geographic_allocation=self._generate_default_geo_allocation() ) def _generate_default_allocation(self, risk_score: float) -> Dict[str, float]: """Generate default asset allocation based on risk score""" # Simple allocation model based on risk score stocks = risk_score bonds = (1 - risk_score) * 0.8 cash = (1 - risk_score) * 0.2 return { 'Actions': stocks, 'Obligations': bonds, 'Liquidités': cash } def _generate_default_geo_allocation(self) -> Dict[str, float]: """Generate default geographic allocation""" return { 'Europe': 0.4, 'Amérique du Nord': 0.3, 'Asie-Pacifique': 0.2, 'Marchés émergents': 0.1 } # Extraction methods for parsing LLM response def _extract_risk_score(self, sections: List[str]) -> float: """Extract risk score from text""" risk_line = next((s for s in sections if s.startswith("1. Score de risque")), "") try: matches = re.findall(r'(\d*\.?\d+)', risk_line) score = float(matches[0]) if matches else 0.5 return min(max(score, 0), 1) except (IndexError, ValueError): return 0.5 def _extract_horizon(self, sections: List[str]) -> int: """Extract investment horizon""" horizon_line = next((s for s in sections if s.startswith("2. Horizon")), "") try: matches = re.findall(r'(\d+)', horizon_line) return int(matches[0]) if matches else 5 except (IndexError, ValueError): return 5 def _extract_goals(self, sections: List[str]) -> List[str]: """Extract prioritized goals""" goals_section = next((s for s in sections if s.startswith("3. Objectifs")), "") if not goals_section: return [] try: goals_text = goals_section.split(":")[-1].strip() goals = re.split(r'[•\-\d]+\.?\s*', goals_text) return [g.strip() for g in goals if g.strip()] except Exception: return [] def _extract_constraints(self, sections: List[str]) -> Dict[str, Any]: """Extract constraints and limitations""" constraints_section = next((s for s in sections if s.startswith("4. Contraintes")), "") if not constraints_section: return {} constraints = {} try: text = constraints_section.split(":")[-1].strip() # Analyze key constraints if re.search(r'liquidité|liquide|court terme', text, re.I): constraints['liquidity_needed'] = True if re.search(r'esg|durable|responsable|éthique', text, re.I): constraints['esg_focus'] = True if re.search(r'revenu|dividende|rente', text, re.I): constraints['income_focus'] = True if re.search(r'protection|capital|garanti', text, re.I): constraints['capital_protection'] = True if re.search(r'fiscalité|impôt|tax', text, re.I): constraints['tax_optimization'] = True if re.search(r'succession|transmission|héritage', text, re.I): constraints['inheritance_planning'] = True return constraints except Exception: return {} def _extract_recommendations(self, sections: List[str]) -> List[str]: """Extract specific recommendations""" recommendations = [] rec_section = next((s for s in sections if s.startswith("8. Recommandations")), "") if rec_section: parts = re.split(r'[•\-\d]+\.?\s*', rec_section) for part in parts: clean_part = part.strip() if clean_part and not clean_part.startswith("8. Recommandations"): recommendations.append(clean_part) # If no recommendations found, check action plan if not recommendations: plan_section = next((s for s in sections if s.startswith("7. Plan d'action")), "") if plan_section: parts = re.split(r'[•\-\d]+\.?\s*', plan_section) for part in parts: clean_part = part.strip() if clean_part and not clean_part.startswith("7. Plan"): recommendations.append(clean_part) # Default recommendations if still empty if not recommendations: recommendations = [ "Diversifier le portefeuille selon le profil de risque", "Maintenir une épargne de sécurité", "Adopter une stratégie d'investissement régulier" ] return recommendations def _extract_explanation(self, sections: List[str]) -> str: """Extract detailed explanation""" explanation_section = next((s for s in sections if s.startswith("9. Explication")), "") if explanation_section: explanation = explanation_section.split(":", 1)[-1].strip() if explanation: return explanation # Build explanation from multiple sections if not found combined_explanation = [] for section_num in [1, 2, 3, 4, 8]: section = next((s for s in sections if s.startswith(f"{section_num}.")), "") if section: combined_explanation.append(section) if combined_explanation: return "\n\n".join(combined_explanation) return "Analyse basée sur le profil d'investisseur fourni." def _extract_asset_allocation(self, sections: List[str]) -> Dict[str, float]: """Extract asset allocation recommendations""" allocation_section = next((s for s in sections if "classe d'actifs" in s.lower()), "") allocation = {} if allocation_section: matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', allocation_section) for match in matches: asset_class = match.group(1).strip() percentage = float(match.group(2).replace(',', '.')) allocation[asset_class] = percentage / 100 # Look elsewhere if not found if not allocation: for section in sections: if re.search(r'allocation|répartition|portefeuille', section, re.I): matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', section) for match in matches: asset_class = match.group(1).strip() percentage = float(match.group(2).replace(',', '.')) allocation[asset_class] = percentage / 100 # Normalize weights if needed total = sum(allocation.values()) if 0 < total < 0.99 or total > 1.01: allocation = {k: v/total for k, v in allocation.items()} return allocation def _extract_geographic_allocation(self, sections: List[str]) -> Dict[str, float]: """Extract geographic allocation recommendations""" geo_section = next((s for s in sections if "géographique" in s.lower()), "") allocation = {} if geo_section: matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', geo_section) for match in matches: region = match.group(1).strip() percentage = float(match.group(2).replace(',', '.')) allocation[region] = percentage / 100 # Look elsewhere if not found if not allocation: for section in sections: if re.search(r'zone|région|international|pays', section, re.I): matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', section) for match in matches: region = match.group(1).strip() percentage = float(match.group(2).replace(',', '.')) allocation[region] = percentage / 100 # Normalize weights if needed total = sum(allocation.values()) if 0 < total < 0.99 or total > 1.01: allocation = {k: v/total for k, v in allocation.items()} return allocation def _extract_risk_decomposition(self, sections: List[str]) -> Dict[str, float]: """Extract risk decomposition""" risk_section = next((s for s in sections if "Analyse des risques" in s), "") decomposition = {} if risk_section: try: market_risk = re.search(r'marché[^\d]*(\d+(?:\.\d+)?)', risk_section) if market_risk: decomposition['market_risk'] = float(market_risk.group(1)) / 100 if float(market_risk.group(1)) > 1 else float(market_risk.group(1)) credit_risk = re.search(r'crédit[^\d]*(\d+(?:\.\d+)?)', risk_section) if credit_risk: decomposition['credit_risk'] = float(credit_risk.group(1)) / 100 if float(credit_risk.group(1)) > 1 else float(credit_risk.group(1)) liquidity_risk = re.search(r'liquidité[^\d]*(\d+(?:\.\d+)?)', risk_section) if liquidity_risk: decomposition['liquidity_risk'] = float(liquidity_risk.group(1)) / 100 if float(liquidity_risk.group(1)) > 1 else float(liquidity_risk.group(1)) operational_risk = re.search(r'opérationnel[^\d]*(\d+(?:\.\d+)?)', risk_section) if operational_risk: decomposition['operational_risk'] = float(operational_risk.group(1)) / 100 if float(operational_risk.group(1)) > 1 else float(operational_risk.group(1)) except ValueError: pass # Default values if nothing found if not decomposition: decomposition = { 'market_risk': 0.4, 'credit_risk': 0.2, 'liquidity_risk': 0.2, 'operational_risk': 0.2 } return decomposition def _extract_goal_feasibility(self, sections: List[str]) -> Dict[str, float]: """Extract goal feasibility assessment""" goals_section = next((s for s in sections if "Objectifs prioritaires" in s), "") feasibility = {} if goals_section: goal_matches = re.finditer(r'([^:]+):\s*(\d+(?:\.\d+)?)', goals_section) for match in goal_matches: goal = match.group(1).strip() score = float(match.group(2)) feasibility[goal] = min(max(score / 100 if score > 1 else score, 0), 1) return feasibility def _extract_investment_style(self, sections: List[str]) -> Dict[str, float]: """Determine recommended investment style""" style_weights = { 'value': 0.0, 'growth': 0.0, 'income': 0.0, 'momentum': 0.0, 'quality': 0.0 } recommendations = next((s for s in sections if "Recommandations" in s), "") if recommendations: style_weights['value'] = len(re.findall(r'valeur|sous-évalué|value', recommendations, re.I)) * 0.2 style_weights['growth'] = len(re.findall(r'croissance|growth|innovation', recommendations, re.I)) * 0.2 style_weights['income'] = len(re.findall(r'revenu|dividende|income', recommendations, re.I)) * 0.2 style_weights['momentum'] = len(re.findall(r'momentum|tendance|trend', recommendations, re.I)) * 0.2 style_weights['quality'] = len(re.findall(r'qualité|quality|solide', recommendations, re.I)) * 0.2 # Normalize weights total = sum(style_weights.values()) if total > 0: style_weights = {k: v/total for k, v in style_weights.items()} return style_weights def _extract_market_sensitivity(self, sections: List[str]) -> Dict[str, float]: """Analyze sensitivity to market conditions""" sensitivity = { 'market_beta': 0.0, 'interest_rate': 0.0, 'economic_cycle': 0.0, 'inflation': 0.0 } analysis_text = "\n".join(sections) # Analyze mentions and context if 'beta' in analysis_text.lower(): beta_match = re.search(r'beta[^\d]*(\d+(?:\.\d+)?)', analysis_text, re.I) if beta_match: sensitivity['market_beta'] = float(beta_match.group(1)) if 'taux' in analysis_text.lower(): sensitivity['interest_rate'] = len(re.findall(r'taux|rate', analysis_text, re.I)) * 0.1 if 'cycle' in analysis_text.lower(): sensitivity['economic_cycle'] = len(re.findall(r'cycle|économique', analysis_text, re.I)) * 0.1 if 'inflation' in analysis_text.lower(): sensitivity['inflation'] = len(re.findall(r'inflation|prix', analysis_text, re.I)) * 0.1 # Normalize sensitivities max_value = max(sensitivity.values()) if max_value > 0: sensitivity = {k: v/max_value for k, v in sensitivity.items()} return sensitivity def _extract_alerts(self, sections: List[str]) -> List[Dict[str, Any]]: """Extract alerts and critical attention points""" alerts = [] attention_section = next((s for s in sections if "Points de vigilance" in s), "") if attention_section: alert_patterns = [ (r'(!+|ATTENTION|ALERTE)[^\n]*', 'critical'), (r'attention|vigilance[^\n]*', 'warning'), (r'noter|remarquer[^\n]*', 'info') ] for pattern, severity in alert_patterns: matches = re.finditer(pattern, attention_section, re.I) for match in matches: alerts.append({ 'severity': severity, 'message': match.group(0).strip(), 'timestamp': datetime.now().isoformat() }) return alerts def _extract_attention_points(self, sections: List[str]) -> List[str]: """Extract specific attention points""" attention_points = [] attention_section = next((s for s in sections if "Points d'attention" in s or "Points de vigilance" in s), "") if attention_section: points = re.split(r'[•\-\d]+\.?\s*', attention_section) attention_points = [p.strip() for p in points if p.strip() and not p.startswith("Points")] return attention_points def _extract_market_context(self, sections: List[str]) -> Dict[str, Any]: """Extract relevant market context""" context = {} market_section = next((s for s in sections if "Contexte de marché" in s or "conditions de marché" in s), "") if market_section: if re.search(r'baiss|bear|négatif', market_section, re.I): context['market_regime'] = 'bear' elif re.search(r'hauss|bull|positif', market_section, re.I): context['market_regime'] = 'bull' else: context['market_regime'] = 'neutral' # Volatility analysis if re.search(r'volatile|instable', market_section, re.I): context['volatility'] = 'high' elif re.search(r'stable|calme', market_section, re.I): context['volatility'] = 'low' else: context['volatility'] = 'moderate' # Market sentiment context['sentiment'] = self._analyze_market_sentiment(market_section) return context def _calculate_adaptability_score(self, sections: List[str]) -> float: """Calculate profile adaptability score""" score = 0.5 # Base score text = "\n".join(sections) # Positive factors positive_factors = [ (r'flex|adapt|ajust', 0.1), (r'divers|équilibr', 0.1), (r'long terme|patient', 0.1), (r'expérience|connaissance', 0.1), (r'stable|régulier', 0.1) ] # Negative factors negative_factors = [ (r'rigid|fix|strict', -0.1), (r'risqu|peur|craint', -0.1), (r'court terme|urgent', -0.1), (r'limité|contraint', -0.1), (r'instable|irrégulier', -0.1) ] # Calculate score for pattern, weight in positive_factors + negative_factors: if re.search(pattern, text, re.I): score = min(max(score + weight, 0), 1) return score def _analyze_market_sentiment(self, text: str) -> str: """Analyze market sentiment""" positive_count = len(re.findall(r'positif|optimiste|favorable|hausse', text, re.I)) negative_count = len(re.findall(r'négatif|pessimiste|défavorable|baisse', text, re.I)) if positive_count > negative_count: return 'positive' elif negative_count > positive_count: return 'negative' return 'neutral' def _extract_stress_test_results(self, sections: List[str]) -> Dict[str, float]: """Extract stress test results""" results = {} stress_section = next((s for s in sections if "Stress tests" in s), "") if stress_section: scenarios = re.finditer(r'([^:]+):\s*-?\d+(?:\.\d+)?%', stress_section) for scenario in scenarios: name = scenario.group(1).strip() try: impact = float(re.search(r'-?\d+(?:\.\d+)?', scenario.group(0)).group(0)) results[name] = impact / 100 # Convert to decimal except (ValueError, AttributeError): continue # Default scenarios if nothing found if not results: results = { 'market_crash': -0.30, 'interest_rate_shock': -0.15, 'currency_crisis': -0.20, 'inflation_shock': -0.10 } return results async def generate_profile_description(self, profile_data: Dict[str, Any]) -> str: """Generate detailed profile description""" try: # Prepare prompt prompt = self.llm.prepare_description_prompt(profile_data) # Generate description description = await self.llm.generate_text(prompt) # Clean and format description = description.replace("[/INST]", "").strip() return description except Exception as e: self.logger.error(f"Error generating profile description: {e}") return self._generate_default_description(profile_data) def _generate_default_description(self, profile_data: Dict[str, Any]) -> str: """Generate default description if generation fails""" age = profile_data.get('age', 'Non spécifié') income = profile_data.get('annual_income', 'Non spécifié') goals = ", ".join(profile_data.get('investment_goals', ["épargne générale"])) risk = profile_data.get('risk_tolerance', 3) horizon = profile_data.get('investment_horizon', 5) risk_profile = "prudent" if risk <= 2 else "modéré" if risk <= 3 else "dynamique" return f"""Profil investisseur de {age} ans avec un revenu annuel de {income}€. Ce profil présente une tolérance au risque {risk_profile} (niveau {risk}/5) et un horizon d'investissement de {horizon} ans. Ses principaux objectifs sont: {goals}. Une stratégie d'investissement diversifiée est recommandée, adaptée à son niveau de risque {risk_profile}. L'allocation d'actifs devra tenir compte de ses objectifs spécifiques tout en respectant son horizon d'investissement. """ class DeepVestProfiler: """Advanced investor profiler with fine-tuned AI integration""" def __init__(self, config: Optional[DeepVestConfig] = None): """Initialize profiler with configuration""" self.config = config or DeepVestConfig() self.logger = logging.getLogger("DeepVest.Profiler") self.llm_analyzer = LLMProfileAnalyzer(self.config) self.profiles = {} # In-memory profile storage # Initialize database directory if persistent storage is enabled if self.config.use_persistent_storage: self._ensure_db_directory() def _ensure_db_directory(self): """Ensure database directory exists""" if not os.path.exists(self.config.db_path): os.makedirs(self.config.db_path) self.logger.info(f"Created database directory: {self.config.db_path}") async def create_profile(self, profile_data: Dict[str, Any]) -> PersonalProfile: """Create complete investor profile with LLM analysis""" try: self.logger.info("Creating new investor profile") # Process input data and handle French field names processed_data = profile_data.copy() # Map French field names for monthly savings if 'épargne_mensuelle' in processed_data: processed_data['monthly_savings'] = processed_data.pop('épargne_mensuelle') elif 'épargne mensuelle' in processed_data: processed_data['monthly_savings'] = processed_data.pop('épargne mensuelle') # Perform LLM analysis llm_analysis = await self.llm_analyzer.analyze_profile(processed_data) # Generate profile description profile_description = await self.llm_analyzer.generate_profile_description(processed_data) # Build profile profile = PersonalProfile( # Identifiers id=processed_data.get('id'), # Base profile risk_tolerance=processed_data.get('risk_tolerance', 3), investment_horizon=processed_data.get('investment_horizon', 5), income_stability=processed_data.get('income_stability', 3), financial_knowledge=processed_data.get('financial_knowledge', 3), investment_goals=processed_data.get('investment_goals', []), esg_preferences=processed_data.get('esg_preferences', False), constraints=processed_data.get('constraints', {}), # Personal information age=processed_data.get('age', 30), annual_income=processed_data.get('annual_income', 0), total_assets=processed_data.get('total_assets', 0), total_debt=processed_data.get('total_debt', 0), monthly_savings=processed_data.get('monthly_savings', 0), # Added monthly_savings occupation=processed_data.get('occupation', ''), industry=processed_data.get('industry', ''), education_level=processed_data.get('education_level', ''), marital_status=processed_data.get('marital_status', 'single'), number_of_dependents=processed_data.get('number_of_dependents', processed_data.get('dependents', 0)), # LLM analysis results llm_analysis_results=llm_analysis, risk_score=llm_analysis.risk_score, adaptability_score=llm_analysis.adaptability_score, complexity_tolerance=processed_data.get('complexity_tolerance', 0.5), profile_description=profile_description ) # Add detailed analyses await self._add_detailed_analysis(profile, processed_data) # Validate profile profile._validate_profile() # Calculate comprehensive risk metrics profile.calculate_comprehensive_risk_metrics() # Store in memory self.profiles[profile.id] = profile # Store in database if enabled if self.config.use_persistent_storage: self._save_profile_to_db(profile) self.logger.info(f"Profile created successfully with ID: {profile.id}") return profile except Exception as e: self.logger.error(f"Error creating profile: {e}") raise async def _add_detailed_analysis(self, profile: PersonalProfile, data: Dict[str, Any]): """Add detailed analyses to profile""" # Financial analysis try: profile.financial_analysis = FinancialAnalysis( income_stability=data.get('income_stability', 3) / 5, debt_ratio=profile.total_debt / profile.annual_income if profile.annual_income > 0 else 0, savings_capacity=data.get('monthly_savings', 0) / data.get('annual_income', 1) * 12 if data.get('annual_income', 0) > 0 else 0, emergency_fund_months=data.get('emergency_fund_months', 3), risk_capacity=profile.llm_analysis_results.risk_score if profile.llm_analysis_results else 0.5, investment_horizon=profile.investment_horizon, liquidity_needs=data.get('liquidity_needs', 0.2), income_sources=data.get('income_sources', {}), fixed_expenses=data.get('fixed_expenses', {}), variable_expenses=data.get('variable_expenses', {}) ) except Exception as e: self.logger.warning(f"Error creating detailed financial analysis: {e}") # Family analysis try: profile.family_analysis = FamilyAnalysis( family_size=1 + (1 if data.get('marital_status') in ['married', 'partnered'] else 0) + data.get('dependents', 0), dependents=data.get('dependents', 0), life_stage=self._determine_life_stage(profile.age), future_events=data.get('future_events', []), monthly_obligations=data.get('monthly_obligations', 0), insurance_coverage=data.get('insurance_coverage', {}), education_needs=data.get('education_needs', {}), healthcare_needs=data.get('healthcare_needs', {}) ) except Exception as e: self.logger.warning(f"Error creating detailed family analysis: {e}") # Investment objectives try: profile.investment_objectives = InvestmentGoals( primary_goals=[{'name': goal, 'priority': i+1} for i, goal in enumerate(profile.investment_goals)], timeframes=data.get('goal_timeframes', {}), required_returns=data.get('required_returns', {}), priority_order=profile.investment_goals, target_amounts=data.get('goal_amounts', {}), goal_progress=data.get('goal_progress', {}), risk_tolerance_by_goal=data.get('risk_by_goal', {}) ) except Exception as e: self.logger.warning(f"Error creating detailed investment objectives: {e}") def _determine_life_stage(self, age: int) -> str: """Determine life stage based on age""" if age < 25: return "early_career" elif age < 35: return "career_building" elif age < 45: return "family_building" elif age < 55: return "peak_earning" elif age < 65: return "pre_retirement" else: return "retirement" def _save_profile_to_db(self, profile: PersonalProfile): """Save profile to persistent storage""" try: # Generate filename filename = f"{profile.id}.json" filepath = os.path.join(self.config.db_path, filename) # Export as JSON profile.export_json(filepath, include_detailed_analysis=True) self.logger.info(f"Profile saved to database: {filepath}") except Exception as e: self.logger.error(f"Error saving profile to database: {e}") async def get_profile(self, profile_id: str) -> Optional[PersonalProfile]: """Récupérer un profil par son ID""" try: # Check in-memory cache first if profile_id in self.profiles: self.logger.debug(f"Profile {profile_id} found in memory cache") return self.profiles[profile_id] # If not in memory and persistent storage is enabled, try to load from disk if self.config.use_persistent_storage: filepath = os.path.join(self.config.db_path, f"{profile_id}.json") if os.path.exists(filepath): self.logger.info(f"Loading profile {profile_id} from disk storage") profile = PersonalProfile.from_json(filepath) # Add to memory cache self.profiles[profile_id] = profile return profile self.logger.warning(f"Profile {profile_id} not found") return None except Exception as e: self.logger.error(f"Error retrieving profile {profile_id}: {e}") return None async def update_profile(self, profile_id: str, updates: Dict[str, Any]) -> PersonalProfile: """Met à jour un profil existant avec de nouvelles informations""" try: # Récupérer le profil existant profile = await self.get_profile(profile_id) if not profile: raise ValueError(f"Profile with ID {profile_id} not found") # Process updates for French field names processed_updates = updates.copy() if 'épargne_mensuelle' in processed_updates: processed_updates['monthly_savings'] = processed_updates.pop('épargne_mensuelle') elif 'épargne mensuelle' in processed_updates: processed_updates['monthly_savings'] = processed_updates.pop('épargne mensuelle') # Use dataclasses.asdict to properly convert dataclass to dict from dataclasses import asdict profile_dict = asdict(profile) # Remove fields that shouldn't be passed to constructor fields_to_remove = ['basic_info'] for field in fields_to_remove: if field in profile_dict: del profile_dict[field] # Apply the updates for key, value in processed_updates.items(): if key in profile_dict: profile_dict[key] = value # Recréer un objet PersonalProfile à partir du dictionnaire mis à jour # We need to handle potential unexpected arguments # Use only the fields that PersonalProfile accepts from inspect import signature profile_params = signature(PersonalProfile).parameters filtered_dict = {k: v for k, v in profile_dict.items() if k in profile_params} updated_profile = PersonalProfile(**filtered_dict) # Validate the updated profile if hasattr(updated_profile, '_validate_profile'): updated_profile._validate_profile() # Reanalyze the profile analysis_result = await self.analyzer.analyze_profile(updated_profile) updated_profile.llm_analysis_results = analysis_result # Sauvegarder le profil mis à jour await self._save_profile(updated_profile) self.logger.info(f"Profile {profile_id} updated successfully") return updated_profile except Exception as e: self.logger.error(f"Error updating profile {profile_id}: {str(e)}") raise def _are_changes_significant(self, updates: Dict[str, Any]) -> bool: """Determine if profile changes require re-analysis""" significant_fields = { 'risk_tolerance', 'investment_horizon', 'age', 'annual_income', 'total_assets', 'total_debt', 'investment_goals', 'marital_status', 'number_of_dependents', 'esg_preferences' } return any(field in updates for field in significant_fields) async def delete_profile(self, profile_id: str) -> bool: """Supprimer un profil""" try: # Remove from memory if profile_id in self.profiles: del self.profiles[profile_id] # Remove from persistent storage if enabled if self.config.use_persistent_storage: filepath = os.path.join(self.config.db_path, f"{profile_id}.json") if os.path.exists(filepath): os.remove(filepath) self.logger.info(f"Profile {profile_id} deleted successfully") return True except Exception as e: self.logger.error(f"Error deleting profile {profile_id}: {e}") return False async def list_profiles(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """Liste tous les profils disponibles avec critères de filtrage optionnels""" try: profiles_list = [] # Load all profiles from disk if persistent storage is enabled if self.config.use_persistent_storage: self._load_all_profiles_to_memory() # Filter and convert profiles to basic info dictionaries for profile_id, profile in self.profiles.items(): if filter_criteria and not self._profile_matches_criteria(profile, filter_criteria): continue # Extract basic profile info profile_info = { 'id': profile.id, 'age': profile.age, 'risk_score': profile.risk_score, 'investment_horizon': profile.investment_horizon, 'creation_date': profile.created_at.isoformat(), 'last_updated': profile.last_updated.isoformat(), 'goals': profile.investment_goals } profiles_list.append(profile_info) return profiles_list except Exception as e: self.logger.error(f"Error listing profiles: {e}") return [] def _profile_matches_criteria(self, profile: PersonalProfile, criteria: Dict[str, Any]) -> bool: """Vérifie si un profil correspond aux critères de filtre""" for field, value in criteria.items(): if not hasattr(profile, field): return False profile_value = getattr(profile, field) # Handle range criteria if isinstance(value, dict) and ('min' in value or 'max' in value): if 'min' in value and profile_value < value['min']: return False if 'max' in value and profile_value > value['max']: return False # Handle list criteria (any match) elif isinstance(value, list): if profile_value not in value: return False # Direct comparison elif profile_value != value: return False return True def _load_all_profiles_to_memory(self): """Charge tous les profils depuis le stockage persistant vers la mémoire""" if not self.config.use_persistent_storage: return try: # Get all JSON files in the database directory db_path = self.config.db_path if not os.path.exists(db_path): self.logger.warning(f"Database directory {db_path} not found") return json_files = [f for f in os.listdir(db_path) if f.endswith('.json')] self.logger.info(f"Found {len(json_files)} profile files in database") # Load each file that isn't already in memory for json_file in json_files: profile_id = json_file.replace('.json', '') if profile_id not in self.profiles: filepath = os.path.join(db_path, json_file) try: profile = PersonalProfile.from_json(filepath) self.profiles[profile_id] = profile except Exception as e: self.logger.error(f"Error loading profile from {filepath}: {e}") except Exception as e: self.logger.error(f"Error loading profiles from disk: {e}") async def batch_analyze_profiles(self, profile_ids: List[str]) -> Dict[str, Any]: """Analyse par lots de plusieurs profils pour détection de tendances""" try: profiles = [] profile_map = {} # Load all requested profiles for profile_id in profile_ids: profile = await self.get_profile(profile_id) if profile: profiles.append(profile) profile_map[profile_id] = profile if not profiles: self.logger.warning("No valid profiles found for batch analysis") return {} # Extract common trends and insights age_groups = self._group_profiles_by_age(profiles) risk_groups = self._group_profiles_by_risk(profiles) goal_trends = self._analyze_goal_trends(profiles) # Perform clustering if we have enough profiles clusters = {} if len(profiles) >= 5: clusters = await self._cluster_profiles(profiles) return { 'age_groups': age_groups, 'risk_distribution': risk_groups, 'goal_trends': goal_trends, 'profile_clusters': clusters, 'total_profiles_analyzed': len(profiles) } except Exception as e: self.logger.error(f"Error in batch profile analysis: {e}") return {'error': str(e)} def _group_profiles_by_age(self, profiles: List[PersonalProfile]) -> Dict[str, int]: """Groupe les profils par tranche d'âge""" age_groups = { '18-25': 0, '26-35': 0, '36-45': 0, '46-55': 0, '56-65': 0, '65+': 0 } for profile in profiles: age = profile.age if age <= 25: age_groups['18-25'] += 1 elif age <= 35: age_groups['26-35'] += 1 elif age <= 45: age_groups['36-45'] += 1 elif age <= 55: age_groups['46-55'] += 1 elif age <= 65: age_groups['56-65'] += 1 else: age_groups['65+'] += 1 return age_groups def _group_profiles_by_risk(self, profiles: List[PersonalProfile]) -> Dict[str, int]: """Groupe les profils par niveau de risque""" risk_groups = { 'Très conservateur': 0, 'Conservateur': 0, 'Modéré': 0, 'Dynamique': 0, 'Très dynamique': 0 } for profile in profiles: risk_score = profile.risk_score if risk_score < 0.2: risk_groups['Très conservateur'] += 1 elif risk_score < 0.4: risk_groups['Conservateur'] += 1 elif risk_score < 0.6: risk_groups['Modéré'] += 1 elif risk_score < 0.8: risk_groups['Dynamique'] += 1 else: risk_groups['Très dynamique'] += 1 return risk_groups def _analyze_goal_trends(self, profiles: List[PersonalProfile]) -> Dict[str, Dict[str, Any]]: """Analyse les tendances dans les objectifs d'investissement""" all_goals = {} # Count goals for profile in profiles: for goal in profile.investment_goals: if goal not in all_goals: all_goals[goal] = { 'count': 0, 'avg_risk_score': 0, 'avg_investment_horizon': 0, 'profiles': [] } all_goals[goal]['count'] += 1 all_goals[goal]['avg_risk_score'] += profile.risk_score all_goals[goal]['avg_investment_horizon'] += profile.investment_horizon all_goals[goal]['profiles'].append(profile.id) # Calculate averages for goal, data in all_goals.items(): count = data['count'] if count > 0: data['avg_risk_score'] /= count data['avg_investment_horizon'] /= count data['percentage'] = (count / len(profiles)) * 100 return all_goals async def _cluster_profiles(self, profiles: List[PersonalProfile]) -> Dict[str, List[str]]: """Regroupe les profils en clusters basés sur leurs caractéristiques""" try: # Extract features for clustering features = [] profile_ids = [] for profile in profiles: # Create feature vector feature_vector = [ profile.risk_score, profile.investment_horizon / 10, # Normalize horizon profile.age / 100, # Normalize age profile.financial_knowledge / 5, # Normalize knowledge profile.income_stability / 5, # Normalize stability profile.total_assets / 1000000 if profile.total_assets > 0 else 0, # Normalize assets profile.total_debt / 1000000 if profile.total_debt > 0 else 0, # Normalize debt len(profile.investment_goals) / 5 # Normalize goal count ] features.append(feature_vector) profile_ids.append(profile.id) # Skip clustering if too few profiles if len(features) < 3: return {"cluster_1": profile_ids} # Perform k-means clustering from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler # Standardize features scaler = StandardScaler() features_scaled = scaler.fit_transform(features) # Determine optimal number of clusters (between 2 and 5) n_clusters = min(max(2, len(profiles) // 3), 5) # Perform clustering kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) clusters = kmeans.fit_predict(features_scaled) # Organize profiles by cluster cluster_dict = {} for i, cluster_id in enumerate(clusters): cluster_name = f"cluster_{cluster_id + 1}" if cluster_name not in cluster_dict: cluster_dict[cluster_name] = [] cluster_dict[cluster_name].append(profile_ids[i]) return cluster_dict except Exception as e: self.logger.error(f"Error in profile clustering: {e}") return {"cluster_error": [p.id for p in profiles]} def generate_asset_allocation(self, risk_score: float, investment_horizon: int, esg_preferences: bool = False) -> Dict[str, Any]: """Génère une allocation d'actifs basée sur le profil de risque et l'horizon Args: risk_score: Score de risque (1-5 ou 0-1) investment_horizon: Horizon d'investissement en années esg_preferences: Indique si l'investisseur a des préférences ESG Returns: Dictionnaire contenant l'allocation d'actifs et le profil de risque """ try: # Normaliser le score de risque entre 0 et 1 s'il est sur une échelle différente normalized_risk = risk_score if normalized_risk > 1: # Si le score est sur une échelle de 1-5 normalized_risk = risk_score / 5.0 # Définir les catégories de risque risk_category = "" if normalized_risk < 0.2: risk_category = "Très conservateur" elif normalized_risk < 0.4: risk_category = "Conservateur" elif normalized_risk < 0.6: risk_category = "Modéré" elif normalized_risk < 0.8: risk_category = "Dynamique" else: risk_category = "Très dynamique" # Allocation de base selon le profil de risque allocation = {} if risk_category == "Très conservateur": allocation = { 'Actions': 0.15, 'Obligations': 0.65, 'Liquidités': 0.20 } elif risk_category == "Conservateur": allocation = { 'Actions': 0.30, 'Obligations': 0.55, 'Liquidités': 0.15 } elif risk_category == "Modéré": allocation = { 'Actions': 0.50, 'Obligations': 0.40, 'Liquidités': 0.10 } elif risk_category == "Dynamique": allocation = { 'Actions': 0.70, 'Obligations': 0.25, 'Liquidités': 0.05 } else: # Très dynamique allocation = { 'Actions': 0.85, 'Obligations': 0.10, 'Liquidités': 0.05 } # Ajustement selon l'horizon horizon_factor = 0 if investment_horizon < 5: # Réduire les actions pour les horizons courts horizon_factor = -0.15 elif investment_horizon > 15: # Augmenter les actions pour les horizons longs horizon_factor = 0.10 # Appliquer l'ajustement basé sur l'horizon if horizon_factor != 0: action_shift = min(allocation['Actions'], abs(horizon_factor)) * (1 if horizon_factor > 0 else -1) allocation['Actions'] += action_shift # Répartir le changement sur les obligations et liquidités remaining_shift = -action_shift obligation_shift = remaining_shift * 0.7 liquidity_shift = remaining_shift * 0.3 allocation['Obligations'] += obligation_shift allocation['Liquidités'] += liquidity_shift # Ajustement pour les préférences ESG if esg_preferences: esg_allocation = min(allocation['Actions'], 0.3) allocation['Actions'] -= esg_allocation allocation['Actions ESG'] = esg_allocation # S'assurer que tous les pourcentages sont positifs allocation = {k: max(0, v) for k, v in allocation.items()} # Normaliser pour que la somme soit égale à 1 total = sum(allocation.values()) allocation = {k: v / total for k, v in allocation.items()} # Retourner l'allocation avec la catégorie de risque return { 'asset_allocation': allocation, 'risk_profile': { 'score': normalized_risk, 'category': risk_category } } except Exception as e: self.logger.error(f"Error generating asset allocation: {str(e)}") # Allocation par défaut en cas d'erreur return { 'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1}, 'risk_profile': {'score': 0.5, 'category': 'Modéré'} } async def generate_investment_strategy(self, profile: PersonalProfile) -> Dict[str, Any]: """Génère une stratégie d'investissement personnalisée pour un profil""" try: # Vérifier que le profil est valide if not profile or not profile.id: raise ValueError("Invalid profile") # Obtenir monthly_savings avec fallback pour éviter les erreurs monthly_savings = getattr(profile, 'monthly_savings', 0) if monthly_savings == 0 and hasattr(profile, 'financial_analysis') and hasattr(profile.financial_analysis, 'monthly_savings'): monthly_savings = profile.financial_analysis.monthly_savings # Calculer l'allocation d'actifs risk_score = profile.risk_tolerance if 1 <= profile.risk_tolerance <= 5 else 3 asset_allocation = self.generate_asset_allocation( risk_score=risk_score, investment_horizon=profile.investment_horizon, esg_preferences=profile.esg_preferences ) # Personnaliser les recommandations en fonction du profil recommendations = [] # Recommandation de base pour tous recommendations.append("Diversifier le portefeuille selon le profil de risque") # Recommandations spécifiques selon le profil if profile.age < 30: recommendations.append("Privilégier les investissements à fort potentiel de croissance") recommendations.append("Adopter une stratégie d'investissement régulier (DCA)") elif profile.age < 50: recommendations.append("Équilibrer croissance et sécurité dans votre allocation") recommendations.append("Réviser annuellement votre stratégie fiscale") else: recommendations.append("Sécuriser progressivement votre capital") recommendations.append("Optimiser votre stratégie en vue de la retraite") # Recommandations selon les objectifs if "Retraite" in profile.investment_goals: recommendations.append("Maximiser les versements sur votre plan de retraite") if "Achat immobilier" in profile.investment_goals: recommendations.append("Constituer un apport pour votre projet immobilier") if "Études des enfants" in profile.investment_goals: recommendations.append("Mettre en place une épargne dédiée aux études des enfants") # Recommandations spécifiques à la situation financière if profile.total_debt > profile.annual_income: recommendations.append("Prioriser le remboursement des dettes à taux élevé") if monthly_savings < (profile.annual_income / 12) * 0.1: recommendations.append("Augmenter votre taux d'épargne mensuel") # Limiter à 5 recommandations principales if len(recommendations) > 5: recommendations = recommendations[:5] # Construire la stratégie complète strategy = { 'profile_id': profile.id, 'timestamp': datetime.now().isoformat(), 'risk_profile': asset_allocation['risk_profile'], 'asset_allocation': asset_allocation['asset_allocation'], 'recommendations': recommendations, 'targets': { 'monthly_savings': max(profile.annual_income * 0.15 / 12, monthly_savings), 'emergency_fund': profile.annual_income / 6 }, 'investment_products': self._recommend_products(profile, asset_allocation) } self.logger.info(f"Generated investment strategy for profile {profile.id}") return strategy except Exception as e: self.logger.error(f"Error generating investment strategy: {str(e)}") # Stratégie par défaut en cas d'erreur return { 'risk_profile': {'category': 'Modéré'}, 'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1}, 'recommendations': [ "Diversifier le portefeuille selon le profil de risque", "Maintenir une épargne de sécurité", "Adopter une stratégie d'investissement régulier" ] } def _refine_asset_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]: """Raffine l'allocation d'actifs basée sur des contraintes supplémentaires""" refined = allocation.copy() # Apply ESG preference adjustments if profile.esg_preferences and 'Actions' in refined: # Reduce regular stocks and add ESG stocks regular_stocks = refined.get('Actions', 0) if regular_stocks > 0.3: refined['Actions'] = regular_stocks * 0.7 refined['Actions ESG'] = regular_stocks * 0.3 # Apply liquidity needs adjustment if hasattr(profile, 'financial_analysis') and profile.financial_analysis: liquidity_needs = profile.financial_analysis.liquidity_needs if liquidity_needs > 0.3 and 'Liquidités' in refined: # Ensure minimum cash allocation min_cash = max(0.15, liquidity_needs) if refined['Liquidités'] < min_cash: # Reduce other allocations proportionally shortfall = min_cash - refined['Liquidités'] for asset, weight in list(refined.items()): if asset != 'Liquidités': reduction = weight * (shortfall / (1 - refined['Liquidités'])) refined[asset] = max(0, weight - reduction) refined['Liquidités'] = min_cash # Normalize to ensure sum is 1.0 total = sum(refined.values()) if abs(total - 1.0) > 0.01: refined = {k: v/total for k, v in refined.items()} return refined def _refine_geographic_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]: """Raffine l'allocation géographique basée sur des contraintes supplémentaires""" # Use provided allocation or default if not allocation: allocation = { 'Europe': 0.4, 'Amérique du Nord': 0.3, 'Asie-Pacifique': 0.2, 'Pays émergents': 0.1 } refined = allocation.copy() # Apply profile-specific adjustments if profile.risk_score < 0.3: # More conservative profile - reduce emerging markets if 'Pays émergents' in refined: emerging = refined['Pays émergents'] refined['Pays émergents'] = emerging * 0.5 # Redistribute to developed markets for region in ['Europe', 'Amérique du Nord']: if region in refined: refined[region] += emerging * 0.25 elif profile.risk_score > 0.7: # More aggressive profile - increase emerging markets if 'Pays émergents' in refined and refined['Pays émergents'] < 0.2: increase = min(0.1, 0.2 - refined['Pays émergents']) refined['Pays émergents'] += increase # Take from developed markets for region in ['Europe', 'Amérique du Nord']: if region in refined: refined[region] -= increase / 2 # Normalize to ensure sum is 1.0 total = sum(refined.values()) if abs(total - 1.0) > 0.01: refined = {k: v/total for k, v in refined.items()} return refined def _generate_implementation_steps(self, strategy: Dict[str, Any]) -> List[Dict[str, Any]]: """Génère des étapes d'implémentation pratiques pour la stratégie""" steps = [] # Step 1: Set up accounts steps.append({ 'order': 1, 'title': 'Mise en place des comptes', 'description': 'Ouvrir les comptes nécessaires pour implémenter la stratégie', 'actions': [ 'Ouvrir un compte-titres classique pour les investissements généraux', 'Mettre en place un PEA pour les actions européennes', 'Considérer une assurance-vie pour les investissements à long terme' ] }) # Step 2: Initial asset allocation asset_allocation = strategy.get('asset_allocation', {}) allocation_actions = [] for asset, weight in asset_allocation.items(): allocation_actions.append(f'Allouer {weight:.1%} du portefeuille à la classe d\'actifs "{asset}"') steps.append({ 'order': 2, 'title': 'Allocation initiale des actifs', 'description': 'Répartir les investissements selon l\'allocation d\'actifs recommandée', 'actions': allocation_actions }) # Step 3: Implementation schedule steps.append({ 'order': 3, 'title': 'Calendrier d\'implémentation', 'description': 'Établir un calendrier progressif pour l\'implémentation de la stratégie', 'actions': [ 'Mois 1: Mise en place des comptes et investissement de 25% du capital', 'Mois 2-3: Investissement de 25% supplémentaires', 'Mois 4-6: Investissement du capital restant', 'Trimestriel: Rééquilibrage du portefeuille selon l\'allocation cible' ] }) # Step 4: Monitoring and adjustments steps.append({ 'order': 4, 'title': 'Suivi et ajustements', 'description': 'Établir un processus de suivi régulier de la stratégie', 'actions': [ 'Suivre mensuellement la performance du portefeuille', 'Vérifier trimestriellement l\'alignement avec l\'allocation cible', 'Réévaluer annuellement la stratégie complète', 'Ajuster en fonction des changements de situation personnelle' ] }) return steps def _determine_rebalancing_schedule(self, profile: PersonalProfile) -> Dict[str, Any]: """Détermine le calendrier de rééquilibrage optimal""" # Base frequency on risk profile and investment knowledge if profile.risk_score > 0.7 or profile.financial_knowledge >= 4: frequency = 'Trimestrielle' threshold = 0.05 # 5% drift threshold elif profile.risk_score < 0.3 or profile.financial_knowledge <= 2: frequency = 'Annuelle' threshold = 0.1 # 10% drift threshold else: frequency = 'Semestrielle' threshold = 0.075 # 7.5% drift threshold return { 'frequency': frequency, 'drift_threshold': threshold, 'method': 'Threshold-based' if profile.financial_knowledge >= 3 else 'Calendar-based', 'next_scheduled': datetime.now() + self._frequency_to_timedelta(frequency) } def _frequency_to_timedelta(self, frequency: str) -> timedelta: """Convertit une fréquence textuelle en objet timedelta""" if frequency == 'Mensuelle': return timedelta(days=30) elif frequency == 'Trimestrielle': return timedelta(days=90) elif frequency == 'Semestrielle': return timedelta(days=180) elif frequency == 'Annuelle': return timedelta(days=365) else: return timedelta(days=90) # Default to quarterly def _calculate_expected_performance(self, allocation: Dict[str, float], years: int) -> Dict[str, Any]: """Calcule les performances attendues basées sur l'allocation et l'horizon""" # Define return and volatility assumptions by asset class assumptions = { 'Actions': {'return': 0.07, 'volatility': 0.18}, 'Actions ESG': {'return': 0.065, 'volatility': 0.17}, 'Obligations': {'return': 0.03, 'volatility': 0.05}, 'Immobilier': {'return': 0.05, 'volatility': 0.15}, 'Liquidités': {'return': 0.01, 'volatility': 0.01}, 'Or': {'return': 0.04, 'volatility': 0.16}, 'Matières premières': {'return': 0.04, 'volatility': 0.2}, 'Cryptomonnaies': {'return': 0.15, 'volatility': 0.6} } # Calculate weighted expected return and volatility expected_return = 0 expected_volatility = 0 for asset, weight in allocation.items(): if asset in assumptions: expected_return += assumptions[asset]['return'] * weight expected_volatility += assumptions[asset]['volatility'] * weight # Apply time horizon adjustments (slightly higher returns for longer horizons) if years > 10: expected_return *= 1.05 elif years < 5: expected_return *= 0.95 # Calculate compound return compound_return = (1 + expected_return) ** years - 1 # Calculate different scenarios return { 'expected_annual_return': expected_return, 'expected_volatility': expected_volatility, 'expected_total_return': compound_return, 'scenarios': { 'conservative': compound_return * 0.7, 'expected': compound_return, 'optimistic': compound_return * 1.3 }, 'assumptions': { 'time_horizon': years, 'asset_assumptions': assumptions } } async def _generate_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]: """Génère une stratégie de repli lorsque l'analyse LLM n'est pas disponible""" # Generate asset allocation based on risk profile asset_allocation = self._generate_default_allocation(profile) # Standard geographic allocation geographic_allocation = { 'Europe': 0.4, 'Amérique du Nord': 0.3, 'Asie-Pacifique': 0.2, 'Pays émergents': 0.1 } # Default recommendations recommendations = [ "Diversifier votre portefeuille selon votre profil de risque", "Maintenir une épargne de sécurité équivalente à 3-6 mois de dépenses", "Investir régulièrement pour lisser l'exposition au risque de marché", "Rééquilibrer périodiquement votre portefeuille pour maintenir l'allocation cible" ] # Default attention points attention_points = [ "Surveiller l'évolution de votre tolérance au risque avec le temps", "Ajuster votre stratégie en fonction des changements importants de votre situation personnelle", "Rester discipliné face aux fluctuations du marché" ] return { 'risk_profile': { 'score': profile.risk_tolerance / 5, 'category': self._get_risk_category(profile.risk_tolerance / 5) }, 'investment_horizon': profile.investment_horizon, 'primary_goals': profile.investment_goals[:3], 'asset_allocation': asset_allocation, 'geographic_allocation': geographic_allocation, 'recommendations': recommendations, 'attention_points': attention_points } def _generate_default_allocation(self, profile: PersonalProfile) -> Dict[str, float]: """Génère une allocation par défaut basée sur le profil de risque""" risk_score = profile.risk_tolerance / 5 # Base allocation components stocks = risk_score bonds = (1 - risk_score) * 0.8 cash = (1 - risk_score) * 0.2 # Additional components for higher risk profiles if risk_score > 0.6: # Add alternatives for high risk profiles alternatives = stocks * 0.15 stocks -= alternatives return { 'Actions': stocks, 'Obligations': bonds, 'Liquidités': cash, 'Actifs alternatifs': alternatives } else: return { 'Actions': stocks, 'Obligations': bonds, 'Liquidités': cash } def _get_risk_category(self, risk_score: float) -> str: """Détermine la catégorie de risque basée sur le score""" if risk_score < 0.2: return "Très conservateur" elif risk_score < 0.4: return "Conservateur" elif risk_score < 0.6: return "Modéré" elif risk_score < 0.8: return "Dynamique" else: return "Très dynamique" def _generate_simple_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]: """Génère une stratégie simplifiée en cas d'erreur""" risk_level = profile.risk_tolerance if risk_level <= 2: allocation = {'Actions': 0.3, 'Obligations': 0.5, 'Liquidités': 0.2} elif risk_level <= 3: allocation = {'Actions': 0.5, 'Obligations': 0.4, 'Liquidités': 0.1} else: allocation = {'Actions': 0.7, 'Obligations': 0.2, 'Liquidités': 0.1} return { 'risk_profile': { 'score': risk_level / 5, 'category': self._get_risk_category(risk_level / 5) }, 'investment_horizon': profile.investment_horizon, 'asset_allocation': allocation, 'recommendations': [ "Diversifier votre portefeuille", "Investir régulièrement", "Maintenir une épargne de sécurité" ] }