Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from dataclasses import asdict, dataclass, field | |
| from datetime import datetime, timedelta | |
| from functools import lru_cache | |
| from typing import Any, Dict, List, Optional, Tuple, Union, Set | |
| import numpy as np | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| try: | |
| from transformers import PhiForCausalLM | |
| except ImportError: | |
| pass | |
| try: | |
| from peft import PeftModel, PeftConfig | |
| except ImportError: | |
| pass | |
# Configure process-wide logging once at import time, then expose the
# application's root "DeepVest" logger to the rest of the module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("DeepVest")
| # Configuration manager | |
@dataclass
class DeepVestConfig:
    """Configuration for the DeepVest system.

    Declared as a dataclass so that keyword overrides such as
    ``DeepVestConfig(debug_mode=True)`` are accepted and ``__post_init__``
    actually runs (a plain class never calls ``__post_init__``).
    """
    # Model configuration
    model_path: str = "financial_profiler_model"
    use_quantization: bool = True
    cache_size: int = 100  # read by DeepVestLLM as its maximum cache entry count
    # Database configuration
    db_path: str = "profiles_db"
    use_persistent_storage: bool = True
    # Analysis configuration
    default_horizon: int = 5
    default_risk_tolerance: int = 3
    min_recommendation_count: int = 3
    # Performance settings
    enable_caching: bool = True
    batch_size: int = 4
    parallel_processing: bool = True
    # Market data integration
    fetch_market_data: bool = False
    market_data_api_key: Optional[str] = None
    # Debug settings
    debug_mode: bool = False
    log_prompts: bool = False

    def __post_init__(self):
        """Apply configuration side effects after initialization.

        When ``debug_mode`` is enabled, the "DeepVest" logger is switched to
        DEBUG level. No other validation is performed here.
        """
        if self.debug_mode:
            logging.getLogger("DeepVest").setLevel(logging.DEBUG)
| # Core data models | |
@dataclass
class FinancialAnalysis:
    """Detailed financial analysis of an investor profile.

    Must be a dataclass: the mutable attributes use ``field(default_factory=...)``,
    which only produces fresh per-instance dicts under ``@dataclass``.
    Ratio-like attributes (``income_stability``, ``savings_capacity``,
    ``risk_capacity``, ``liquidity_needs``) are expected in [0, 1]; see
    :meth:`validate`.
    """
    income_stability: float = 0.5
    debt_ratio: float = 0.0
    savings_capacity: float = 0.0
    emergency_fund_months: float = 3.0
    risk_capacity: float = 0.5
    investment_horizon: int = 5
    liquidity_needs: float = 0.2
    # Label -> amount mappings
    income_sources: Dict[str, float] = field(default_factory=dict)
    fixed_expenses: Dict[str, float] = field(default_factory=dict)
    variable_expenses: Dict[str, float] = field(default_factory=dict)
    credit_score: Optional[int] = None
    tax_bracket: Optional[float] = None

    def validate(self) -> bool:
        """Return True when all attributes are within their allowed ranges.

        Any exception raised while checking (e.g. comparing a non-numeric
        value) is logged and reported as invalid (False).
        """
        try:
            validations = [
                0 <= self.income_stability <= 1,
                self.debt_ratio >= 0,
                0 <= self.savings_capacity <= 1,
                self.emergency_fund_months >= 0,
                0 <= self.risk_capacity <= 1,
                self.investment_horizon > 0,
                0 <= self.liquidity_needs <= 1
            ]
            # Optional fields are only checked when provided.
            if self.credit_score is not None:
                validations.append(300 <= self.credit_score <= 850)
            if self.tax_bracket is not None:
                validations.append(0 <= self.tax_bracket <= 1)
            return all(validations)
        except Exception as e:
            logger.error(f"Error validating financial analysis: {e}")
            return False

    def calculate_total_expenses(self) -> float:
        """Return the sum of all fixed and variable (monthly) expenses."""
        return (
            sum(self.fixed_expenses.values()) +
            sum(self.variable_expenses.values())
        )

    def calculate_savings_rate(self) -> float:
        """Return ``savings_capacity`` divided by total income (0.0 if no income).

        NOTE(review): ``validate`` treats ``savings_capacity`` as a ratio in
        [0, 1], while this divides it by an income *amount* — confirm units.
        """
        total_income = sum(self.income_sources.values())
        if total_income == 0:
            return 0.0
        return self.savings_capacity / total_income

    def get_financial_health_score(self) -> float:
        """Return an overall financial health score clamped to [0, 1].

        Weighted blend of income stability (30%), debt (25%), savings (25%)
        and emergency fund coverage (20%).
        """
        stability_weight = 0.3
        debt_weight = 0.25
        savings_weight = 0.25
        emergency_weight = 0.2
        # Debt score: 1 with no debt, falling to 0 at a 50% debt ratio.
        debt_score = max(0, 1 - (self.debt_ratio / 0.5))
        # Emergency score: saturates at 6 months of coverage.
        emergency_score = min(1, self.emergency_fund_months / 6)
        # Savings score: saturates at a savings capacity of 1/3.
        savings_score = min(1, self.savings_capacity * 3)
        health_score = (
            stability_weight * self.income_stability +
            debt_weight * debt_score +
            savings_weight * savings_score +
            emergency_weight * emergency_score
        )
        return min(1.0, max(0.0, health_score))
@dataclass
class FamilyAnalysis:
    """Detailed analysis of family situation.

    Must be a dataclass so the ``field(default_factory=...)`` attributes become
    fresh per-instance containers.
    """
    family_size: int = 1
    dependents: int = 0
    life_stage: str = 'undefined'
    # Event dicts; an event is only considered by get_next_life_event if it has a 'date' key.
    future_events: List[Dict] = field(default_factory=list)
    monthly_obligations: float = 0.0
    insurance_coverage: Dict[str, float] = field(default_factory=dict)
    education_needs: Dict[str, float] = field(default_factory=dict)
    healthcare_needs: Dict[str, float] = field(default_factory=dict)
    retirement_plans: Dict[str, Any] = field(default_factory=dict)
    estate_plans: Dict[str, Any] = field(default_factory=dict)

    def validate(self) -> bool:
        """Return True when sizes and amounts are non-negative (family_size >= 1).

        Exceptions during checking are logged and reported as invalid.
        """
        try:
            validations = [
                self.family_size >= 1,
                self.dependents >= 0,
                self.monthly_obligations >= 0,
                all(value >= 0 for value in self.insurance_coverage.values()),
                all(value >= 0 for value in self.education_needs.values()),
                all(value >= 0 for value in self.healthcare_needs.values())
            ]
            return all(validations)
        except Exception as e:
            logger.error(f"Error validating family analysis: {e}")
            return False

    def calculate_total_obligations(self) -> float:
        """Return monthly obligations plus insurance, education and healthcare amounts.

        NOTE(review): insurance_coverage values are summed as if they were
        monthly costs — confirm they are premiums rather than coverage amounts.
        """
        return (
            self.monthly_obligations +
            sum(self.insurance_coverage.values()) +
            sum(self.education_needs.values()) +
            sum(self.healthcare_needs.values())
        )

    def get_next_life_event(self) -> Optional[Dict]:
        """Return the event with the smallest 'date' value, or None.

        Events without a 'date' key are ignored. Past events are not filtered
        out; dates must be mutually comparable.
        """
        if not self.future_events:
            return None
        dated_events = [e for e in self.future_events if 'date' in e]
        if not dated_events:
            return None
        return min(dated_events, key=lambda x: x['date'])

    def calculate_family_complexity(self) -> float:
        """Return a family-complexity score in [0, 1].

        Adds contributions from family size, dependents (capped at 0.5),
        planned events (capped at 0.3) and special education/healthcare needs (0.2).
        """
        base_complexity = min(1, (self.family_size - 1) * 0.2)
        dependent_complexity = min(0.5, self.dependents * 0.1)
        events_complexity = min(0.3, len(self.future_events) * 0.05)
        special_needs = len(self.healthcare_needs) > 0 or len(self.education_needs) > 0
        special_complexity = 0.2 if special_needs else 0
        return min(1, base_complexity + dependent_complexity + events_complexity + special_complexity)
@dataclass
class InvestmentGoals:
    """Detailed investment goals, keyed by goal id.

    Must be a dataclass so the ``field(default_factory=...)`` attributes become
    fresh per-instance containers.
    """
    primary_goals: List[Dict] = field(default_factory=list)
    timeframes: Dict[str, int] = field(default_factory=dict)  # goal -> years
    required_returns: Dict[str, float] = field(default_factory=dict)
    priority_order: List[str] = field(default_factory=list)
    target_amounts: Dict[str, float] = field(default_factory=dict)
    # NOTE(review): validate() treats these values as fractions in [0, 1], but
    # calculate_goal_progress / estimate_goal_feasibility use them as amounts.
    goal_progress: Dict[str, float] = field(default_factory=dict)
    risk_tolerance_by_goal: Dict[str, int] = field(default_factory=dict)
    milestones: Dict[str, List[Dict]] = field(default_factory=dict)

    def validate(self) -> bool:
        """Return True when targets, returns, tolerances and progress are in range."""
        try:
            if self.target_amounts and any(amount < 0 for amount in self.target_amounts.values()):
                return False
            if self.required_returns and any(ret < -1 or ret > 2 for ret in self.required_returns.values()):
                return False
            if self.risk_tolerance_by_goal and any(tol < 1 or tol > 5 for tol in self.risk_tolerance_by_goal.values()):
                return False
            if self.goal_progress and any(prog < 0 or prog > 1 for prog in self.goal_progress.values()):
                return False
            return True
        except Exception as e:
            logger.error(f"Error validating investment goals: {e}")
            return False

    def calculate_total_target_amount(self) -> float:
        """Return the sum of target amounts across all goals."""
        return sum(self.target_amounts.values())

    def get_highest_priority_goal(self) -> Optional[str]:
        """Return the first goal in ``priority_order``, or None if empty."""
        return self.priority_order[0] if self.priority_order else None

    def calculate_weighted_risk_tolerance(self) -> float:
        """Return risk tolerance weighted by each goal's target amount.

        Goals without an explicit tolerance count as 3; returns 3.0 when there
        is nothing to weight.
        """
        if not self.risk_tolerance_by_goal or not self.target_amounts:
            return 3.0
        total_amount = self.calculate_total_target_amount()
        if total_amount == 0:
            return 3.0
        return sum(
            self.risk_tolerance_by_goal.get(goal, 3) * amount / total_amount
            for goal, amount in self.target_amounts.items()
        )

    def calculate_goal_progress(self, goal_id: str) -> float:
        """Return progress / target for ``goal_id``.

        Returns 0.0 when the goal is unknown, and 1.0 when its target is
        zero or negative (nothing left to reach) instead of dividing by zero.
        """
        if goal_id not in self.target_amounts or goal_id not in self.goal_progress:
            return 0.0
        target = self.target_amounts[goal_id]
        if target <= 0:
            return 1.0
        return self.goal_progress[goal_id] / target

    def get_upcoming_milestones(self, days: int = 90) -> List[Dict]:
        """Return milestones dated within the next ``days`` days, sorted by date.

        Each result is the milestone dict plus a 'goal' key. Milestones whose
        'date' is not a ``datetime`` (missing, a string, ...) are skipped
        rather than raising ``TypeError`` on comparison.
        """
        upcoming = []
        now = datetime.now()
        cutoff = now + timedelta(days=days)
        for goal, milestones in self.milestones.items():
            for milestone in milestones:
                date = milestone.get('date')
                if isinstance(date, datetime) and now <= date <= cutoff:
                    upcoming.append({
                        'goal': goal,
                        **milestone
                    })
        return sorted(upcoming, key=lambda x: x['date'])

    def estimate_goal_feasibility(self,
                                  monthly_contribution: float,
                                  expected_return: float) -> Dict[str, float]:
        """Estimate, per goal, the fraction of the target reachable (capped at 1.0).

        Args:
            monthly_contribution: amount added every month.
            expected_return: annual return, compounded monthly.

        A zero return uses simple accumulation (no division by zero); a goal
        with a non-positive target is reported as fully feasible (1.0).
        """
        feasibility: Dict[str, float] = {}
        monthly_rate = expected_return / 12
        for goal_id, target in self.target_amounts.items():
            current_amount = self.goal_progress.get(goal_id, 0)
            # Timeframes are in years (hence * 12); default to 5 years, matching
            # the 5-year horizon defaults used elsewhere.
            months = self.timeframes.get(goal_id, 5) * 12
            if months <= 0:
                feasibility[goal_id] = 0.0
                continue
            if target <= 0:
                feasibility[goal_id] = 1.0
                continue
            growth = (1 + monthly_rate) ** months
            future_current = current_amount * growth
            if monthly_rate == 0:
                future_contributions = monthly_contribution * months
            else:
                # Future value of an ordinary annuity.
                future_contributions = monthly_contribution * (growth - 1) / monthly_rate
            expected_amount = future_current + future_contributions
            feasibility[goal_id] = min(1.0, expected_amount / target)
        return feasibility
@dataclass
class ProfileAnalysisResult:
    """Complete result of LLM profile analysis.

    Must be a dataclass: the first six attributes are required constructor
    arguments and ``__post_init__`` validates ``risk_score`` (0-1) and
    ``investment_horizon`` (> 0), raising ``ValueError`` otherwise.
    """
    # Basic metrics
    risk_score: float
    investment_horizon: int
    primary_goals: List[str]
    # Detailed analyses
    constraints: Dict[str, Any]
    recommendations: List[str]
    explanation: str
    # Advanced metrics
    risk_decomposition: Dict[str, float] = field(default_factory=dict)
    goal_feasibility: Dict[str, float] = field(default_factory=dict)
    investment_style: Dict[str, float] = field(default_factory=dict)
    market_sensitivity: Dict[str, float] = field(default_factory=dict)
    # Recommended allocations (name -> weight)
    asset_allocation: Dict[str, float] = field(default_factory=dict)
    geographic_allocation: Dict[str, float] = field(default_factory=dict)
    # Alerts and attention points
    alerts: List[Dict[str, Any]] = field(default_factory=list)
    attention_points: List[str] = field(default_factory=list)
    # Context and adaptability
    market_context: Dict[str, Any] = field(default_factory=dict)
    adaptability_score: float = 0.5
    stress_test_results: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        """Validate the basic metrics; raises ValueError when out of range."""
        if not 0 <= self.risk_score <= 1:
            raise ValueError("Risk score must be between 0 and 1")
        if self.investment_horizon <= 0:
            raise ValueError("Investment horizon must be positive")

    def get_risk_category(self) -> str:
        """Map ``risk_score`` to one of five (French) category labels in 0.2 steps."""
        if self.risk_score < 0.2:
            return "Très conservateur"
        elif self.risk_score < 0.4:
            return "Conservateur"
        elif self.risk_score < 0.6:
            return "Modéré"
        elif self.risk_score < 0.8:
            return "Dynamique"
        else:
            return "Très dynamique"

    def get_critical_alerts(self) -> List[Dict[str, Any]]:
        """Return alerts whose 'severity' is 'critical'."""
        return [alert for alert in self.alerts if alert.get('severity') == 'critical']

    def get_major_constraints(self) -> List[str]:
        """Return names of numeric (or boolean) constraints whose value exceeds 0.7.

        Booleans are included because ``True > 0.7``.
        """
        return [
            constraint for constraint, value in self.constraints.items()
            if isinstance(value, (bool, int, float)) and value > 0.7
        ]

    def get_primary_investment_style(self) -> str:
        """Return the style with the highest weight, or "Balanced" if none."""
        if not self.investment_style:
            return "Balanced"
        return max(self.investment_style.items(), key=lambda x: x[1])[0]

    def calculate_overall_feasibility(self) -> float:
        """Return the mean goal feasibility, or 0.5 when none is recorded."""
        if not self.goal_feasibility:
            return 0.5
        return sum(self.goal_feasibility.values()) / len(self.goal_feasibility)

    def get_risk_factors(self) -> List[Tuple[str, float]]:
        """Return (factor, score) pairs from ``risk_decomposition``, highest first."""
        return sorted(
            self.risk_decomposition.items(),
            key=lambda x: x[1],
            reverse=True
        )

    def get_market_adaptability(self) -> Dict[str, float]:
        """Return adaptability per market condition, capped at 1.0.

        Computed as ``adaptability_score * (1 + sensitivity)``.
        """
        adaptability = {}
        for condition, sensitivity in self.market_sensitivity.items():
            adaptability[condition] = min(
                self.adaptability_score * (1 + sensitivity),
                1.0
            )
        return adaptability

    def generate_one_page_summary(self) -> str:
        """Return a short Markdown summary (in French) of the analysis.

        Lists at most three goals, risk factors, recommendations and attention
        points, plus the full asset allocation sorted by weight.
        """
        risk_category = self.get_risk_category()
        summary = [
            f"# Profil d'Investissement: {risk_category}",
            f"Score de risque: {self.risk_score:.2f}/1.00 | Horizon: {self.investment_horizon} ans",
            "",
            "## Objectifs Principaux",
            *[f"- {goal}" for goal in self.primary_goals[:3]],
            "",
            "## Allocation d'Actifs Recommandée",
            *[f"- {asset}: {weight:.1%}" for asset, weight in
              sorted(self.asset_allocation.items(), key=lambda x: x[1], reverse=True)],
            "",
            "## Facteurs de Risque Principaux",
            *[f"- {factor}: {score:.1%}" for factor, score in self.get_risk_factors()[:3]],
            "",
            "## Recommandations Clés",
            *[f"- {rec}" for rec in self.recommendations[:3]],
            "",
            "## Points d'Attention",
            *[f"- {point}" for point in self.attention_points[:3]],
        ]
        return "\n".join(summary)

    def to_dict(self) -> Dict[str, Any]:
        """Return the analysis as a nested, JSON-friendly dictionary."""
        return {
            'risk_profile': {
                'score': self.risk_score,
                'category': self.get_risk_category(),
                'decomposition': self.risk_decomposition,
                'stress_tests': self.stress_test_results
            },
            'investment_profile': {
                'horizon': self.investment_horizon,
                'style': self.investment_style,
                'adaptability': self.adaptability_score,
                'market_sensitivity': self.market_sensitivity
            },
            'goals_analysis': {
                'primary_goals': self.primary_goals,
                'feasibility': self.goal_feasibility,
                'constraints': self.constraints
            },
            'recommendations': {
                'suggestions': self.recommendations,
                'attention_points': self.attention_points,
                'alerts': self.alerts
            },
            'allocations': {
                'assets': self.asset_allocation,
                'geography': self.geographic_allocation
            },
            'market_context': self.market_context,
            'explanation': self.explanation
        }
@dataclass
class PersonalProfile:
    """Complete and dynamic investor profile with advanced analysis.

    Construction runs ``__post_init__``: a profile id is generated when none is
    given, missing sub-analyses are replaced with default instances, and the
    profile is validated (``ValueError`` on invalid values). ``update`` returns
    a new profile rather than modifying this one's fields.
    """
    # Identifiers and metadata
    id: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    version: int = 1
    # Base profile (ranges enforced in _validate_profile)
    risk_tolerance: int = 3
    investment_horizon: int = 5
    income_stability: int = 3
    financial_knowledge: int = 3
    investment_goals: List[str] = field(default_factory=list)
    esg_preferences: bool = False
    constraints: Dict[str, Any] = field(default_factory=dict)
    # Personal information
    age: int = 30
    annual_income: float = 50000.0
    total_assets: float = 0.0
    total_debt: float = 0.0
    monthly_savings: float = 0.0  # monthly savings amount (French: "épargne mensuelle")
    occupation: str = ""
    industry: str = ""
    education_level: str = ""
    marital_status: str = "single"
    number_of_dependents: int = 0
    tax_status: str = ""
    residential_status: str = "owner"
    # Detailed analyses (default instances are created in __post_init__)
    financial_analysis: Optional[FinancialAnalysis] = None
    family_analysis: Optional[FamilyAnalysis] = None
    investment_objectives: Optional[InvestmentGoals] = None
    profile_description: Optional[str] = None
    # Dynamic metrics and history
    risk_metrics: Dict[str, float] = field(default_factory=dict)
    performance_metrics: Dict[str, float] = field(default_factory=dict)
    historical_changes: List[Dict[str, Any]] = field(default_factory=list)
    last_review: Optional[datetime] = None
    # Investment preferences
    investment_preferences: Dict[str, Any] = field(default_factory=lambda: {
        'asset_classes': [],
        'sectors': [],
        'regions': [],
        'strategies': [],
        'excluded_investments': []
    })
    # LLM analysis results
    llm_analysis_results: Optional[ProfileAnalysisResult] = None
    risk_score: float = 0.5
    adaptability_score: float = 0.5
    complexity_tolerance: float = 0.5

    def __post_init__(self):
        """Fill in a generated id and default sub-analyses, then validate."""
        if self.id is None:
            self.id = self._generate_profile_id()
        if self.financial_analysis is None:
            self.financial_analysis = FinancialAnalysis()
        if self.family_analysis is None:
            self.family_analysis = FamilyAnalysis()
        if self.investment_objectives is None:
            self.investment_objectives = InvestmentGoals()
        self._validate_profile()

    def _generate_profile_id(self) -> str:
        """Return "PROFILE_" + the first 16 hex chars of a SHA-256 digest.

        The digest covers age, income, risk tolerance, occupation and the
        current timestamp, so repeated calls generally yield different ids.
        """
        components = [
            str(self.age),
            str(self.annual_income),
            str(self.risk_tolerance),
            self.occupation,
            datetime.now().isoformat()
        ]
        unique_string = "_".join(components)
        return f"PROFILE_{hashlib.sha256(unique_string.encode()).hexdigest()[:16]}"

    def _validate_profile(self):
        """Validate all profile fields and sub-analyses.

        Raises:
            ValueError: describing the first failed check.
        """
        validations = [
            (1 <= self.risk_tolerance <= 5, "Risk tolerance must be between 1 and 5"),
            (1 <= self.investment_horizon <= 50, "Investment horizon must be between 1 and 50 years"),
            (1 <= self.income_stability <= 5, "Income stability must be between 1 and 5"),
            (1 <= self.financial_knowledge <= 5, "Financial knowledge must be between 1 and 5"),
            (18 <= self.age <= 120, "Age must be between 18 and 120"),
            (self.annual_income >= 0, "Annual income cannot be negative"),
            (self.total_assets >= 0, "Total assets cannot be negative"),
            (self.total_debt >= 0, "Total debt cannot be negative"),
            (0 <= self.risk_score <= 1, "Risk score must be between 0 and 1"),
            (0 <= self.adaptability_score <= 1, "Adaptability score must be between 0 and 1"),
            (0 <= self.complexity_tolerance <= 1, "Complexity tolerance must be between 0 and 1")
        ]
        # Asset classes must be strings.
        if self.investment_preferences:
            for asset_class in self.investment_preferences.get('asset_classes', []):
                if not isinstance(asset_class, str):
                    raise ValueError("Invalid asset class format")
        # NOTE: an empty goal list is allowed (it is the default); the former
        # "at least one goal" check could never fire and has been removed.
        for validation, message in validations:
            if not validation:
                raise ValueError(message)
        if self.financial_analysis and not self.financial_analysis.validate():
            raise ValueError("Invalid financial analysis")
        if self.family_analysis and not self.family_analysis.validate():
            raise ValueError("Invalid family analysis")
        if self.investment_objectives and not self.investment_objectives.validate():
            raise ValueError("Invalid investment objectives")

    def calculate_comprehensive_risk_metrics(self) -> Dict[str, float]:
        """Compute risk metrics, store them in ``self.risk_metrics`` and return them.

        Returns an empty dict (and logs) if any computation fails.
        """
        try:
            risk_metrics = {
                # inf signals "debt with no income".
                'total_debt_ratio': self.total_debt / self.annual_income if self.annual_income > 0 else float('inf'),
                'emergency_fund_ratio': self.financial_analysis.emergency_fund_months / 6 if self.financial_analysis else 0,
                'risk_capacity': self.financial_analysis.risk_capacity if self.financial_analysis else 0.5,
                'investment_diversification': len(self.investment_goals) / 5 if self.investment_goals else 0,
                'debt_service_ratio': self._calculate_debt_service_ratio(),
                'savings_rate': self._calculate_savings_rate(),
                'liquidity_ratio': self._calculate_liquidity_ratio(),
                'investment_concentration': self._calculate_investment_concentration()
            }
            if self.llm_analysis_results:
                risk_metrics.update({
                    'llm_risk_score': self.llm_analysis_results.risk_score,
                    'adaptability': self.llm_analysis_results.adaptability_score
                })
            financial_health = self.financial_analysis.get_financial_health_score() if self.financial_analysis else 0.5
            # Clamp the debt term at 0 so a zero income (ratio = inf) or debt
            # exceeding income cannot push the capacity to -inf / below zero.
            debt_headroom = max(0.0, 1 - risk_metrics['total_debt_ratio'])
            risk_metrics['overall_risk_capacity'] = (
                risk_metrics['risk_capacity'] * 0.5 +
                debt_headroom * 0.3 +
                financial_health * 0.2
            )
            self.risk_metrics = risk_metrics
            return risk_metrics
        except Exception as e:
            logger.error(f"Error calculating risk metrics: {e}")
            return {}

    def _calculate_debt_service_ratio(self) -> float:
        """Return monthly debt payments divided by monthly income.

        Uses fixed_expenses['debt_payments'] (a number or a dict of numbers)
        when present, otherwise estimates payments as 1% of total debt.
        """
        try:
            if not self.financial_analysis or self.annual_income == 0:
                return 0.0
            if isinstance(self.financial_analysis.fixed_expenses, dict) and 'debt_payments' in self.financial_analysis.fixed_expenses:
                if isinstance(self.financial_analysis.fixed_expenses['debt_payments'], dict):
                    monthly_debt_payment = sum(self.financial_analysis.fixed_expenses['debt_payments'].values())
                else:
                    monthly_debt_payment = self.financial_analysis.fixed_expenses['debt_payments']
            else:
                # Rough estimate: 1% of outstanding debt per month.
                monthly_debt_payment = self.total_debt * 0.01
            monthly_income = self.annual_income / 12
            return monthly_debt_payment / monthly_income if monthly_income > 0 else 0
        except Exception as e:
            logger.error(f"Error calculating debt service ratio: {e}")
            return 0.0

    def _calculate_savings_rate(self) -> float:
        """Return 12 * savings_capacity / annual_income (0.0 without data)."""
        try:
            if not self.financial_analysis or self.annual_income == 0:
                return 0.0
            return (self.financial_analysis.savings_capacity * 12) / self.annual_income
        except Exception as e:
            logger.error(f"Error calculating savings rate: {e}")
            return 0.0

    def _calculate_liquidity_ratio(self) -> float:
        """Return liquid amounts divided by total assets (0.0 without data).

        NOTE(review): the "liquid" amounts are read from income_sources under
        the keys 'savings', 'investments' and 'cash' — confirm this is intended.
        """
        try:
            if not self.financial_analysis or self.total_assets == 0:
                return 0.0
            liquid_assets = sum(
                amount for type_, amount in self.financial_analysis.income_sources.items()
                if type_ in ['savings', 'investments', 'cash']
            )
            return liquid_assets / self.total_assets
        except Exception as e:
            logger.error(f"Error calculating liquidity ratio: {e}")
            return 0.0

    def _calculate_investment_concentration(self) -> float:
        """Return the Herfindahl-Hirschman index of equally weighted asset classes.

        With n preferred classes this is 1/n; 1.0 when none are listed.
        """
        try:
            if not self.investment_preferences or not self.investment_preferences.get('asset_classes'):
                return 1.0
            asset_classes = self.investment_preferences['asset_classes']
            weights = [1/len(asset_classes)] * len(asset_classes)
            return sum(w * w for w in weights)
        except Exception as e:
            logger.error(f"Error calculating investment concentration: {e}")
            return 1.0

    def update(self, updates: Dict[str, Any]) -> 'PersonalProfile':
        """Return a new profile with ``updates`` applied.

        Keys that are not profile fields are ignored. The change is appended
        to this profile's ``historical_changes`` (and carried into the new
        profile), ``version`` is incremented and ``last_updated`` refreshed.

        Raises:
            ValueError: if the updated profile fails validation. Any error is
            logged and re-raised.
        """
        import copy
        from dataclasses import fields

        try:
            # Record history first so the new profile inherits the entry.
            self._save_change_history(updates)
            # Field-by-field deep copy. asdict() must not be used here: it
            # converts nested analyses into plain dicts, which then break
            # construction/validation (dict has no .validate()).
            current_data = {
                f.name: copy.deepcopy(getattr(self, f.name))
                for f in fields(self)
            }
            for key, value in updates.items():
                if key in current_data:
                    current_data[key] = value
            current_data['version'] += 1
            current_data['last_updated'] = datetime.now()
            updated_profile = self.__class__(**current_data)
            updated_profile._validate_profile()
            return updated_profile
        except Exception as e:
            logger.error(f"Error updating profile: {e}")
            raise

    def _save_change_history(self, updates: Dict[str, Any]):
        """Append a record of ``updates`` (with previous values) to the history."""
        change_record = {
            'timestamp': datetime.now().isoformat(),
            'changes': updates,
            'previous_values': {
                key: getattr(self, key)
                for key in updates.keys()
                if hasattr(self, key)
            },
            'version': self.version
        }
        self.historical_changes.append(change_record)

    def get_change_history(self, field: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return all change records, or only those touching ``field``."""
        if field:
            return [
                change for change in self.historical_changes
                if field in change['changes']
            ]
        return self.historical_changes

    def export_json(self, filepath: str, include_detailed_analysis: bool = False):
        """Write the profile to ``filepath`` as JSON.

        With ``include_detailed_analysis`` the full field dump (``asdict``) is
        written, which is the format ``from_json`` reads back; otherwise the
        summarized ``to_dict`` view. Non-JSON values are stringified.
        """
        try:
            # dirname is '' for a bare filename; os.makedirs('') would raise.
            directory = os.path.dirname(filepath)
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(
                    asdict(self) if include_detailed_analysis else self.to_dict(),
                    f, ensure_ascii=False, indent=2, default=str
                )
            logger.info(f"Profile exported successfully to {filepath}")
        except Exception as e:
            logger.error(f"Error exporting profile to {filepath}: {e}")
            raise

    def to_dict(self) -> Dict[str, Any]:
        """Return a summarized, JSON-friendly view of the profile."""
        return {
            'id': self.id,
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'last_updated': self.last_updated.isoformat(),
            'basic_info': {
                'age': self.age,
                'annual_income': self.annual_income,
                'total_assets': self.total_assets,
                'total_debt': self.total_debt,
                'marital_status': self.marital_status,
                'dependents': self.number_of_dependents
            },
            'investment_profile': {
                'risk_tolerance': self.risk_tolerance,
                'investment_horizon': self.investment_horizon,
                'financial_knowledge': self.financial_knowledge,
                'income_stability': self.income_stability,
                'goals': self.investment_goals,
                'esg_preferences': self.esg_preferences
            },
            'risk_metrics': self.risk_metrics,
            'llm_analysis': self.llm_analysis_results.to_dict() if self.llm_analysis_results else None
        }

    @classmethod
    def from_json(cls, filepath: str) -> 'PersonalProfile':
        """Load a profile from a JSON file written with full field data.

        NOTE(review): only the ``asdict`` layout (``export_json(...,
        include_detailed_analysis=True)``) round-trips; the summarized
        ``to_dict`` layout has different keys and will fail to load.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for date_field in ['created_at', 'last_updated', 'last_review']:
                if date_field in data and data[date_field]:
                    data[date_field] = datetime.fromisoformat(data[date_field])
            if 'financial_analysis' in data and data['financial_analysis']:
                data['financial_analysis'] = FinancialAnalysis(**data['financial_analysis'])
            if 'family_analysis' in data and data['family_analysis']:
                data['family_analysis'] = FamilyAnalysis(**data['family_analysis'])
            if 'investment_objectives' in data and data['investment_objectives']:
                data['investment_objectives'] = InvestmentGoals(**data['investment_objectives'])
            if 'llm_analysis_results' in data and data['llm_analysis_results']:
                data['llm_analysis_results'] = ProfileAnalysisResult(**data['llm_analysis_results'])
            profile = cls(**data)
            profile._validate_profile()
            return profile
        except Exception as e:
            logger.error(f"Error loading profile from {filepath}: {e}")
            raise
| class DeepVestLLM: | |
| """Advanced LLM integration using fine-tuned financial advisor model""" | |
| def __init__(self, config: DeepVestConfig): | |
| """Initialize with configuration""" | |
| self.config = config | |
| self.logger = logging.getLogger("DeepVest.LLM") | |
| # Initialize model and tokenizer | |
| self.model = None | |
| self.tokenizer = None | |
| self.device = self._detect_optimal_device() | |
| # Load model if available | |
| if os.path.exists(config.model_path): | |
| self.load_model(config.model_path) # Pass the model_path argument | |
| else: | |
| self.logger.warning(f"Model path {config.model_path} not found, using fallback mode") | |
| # Initialize cache | |
| self.generation_cache = {} | |
| self.max_cache_entries = config.cache_size | |
| def _detect_optimal_device(self) -> str: | |
| """Detect the optimal device for model inference""" | |
| if torch.cuda.is_available(): | |
| self.logger.info("CUDA GPU available, using CUDA") | |
| return "cuda" | |
| elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): | |
| self.logger.info("Apple MPS available, using MPS") | |
| return "mps" | |
| else: | |
| self.logger.info("No GPU available, using CPU") | |
| return "cpu" | |
def load_model(self, model_path=None):
    """Load the fine-tuned causal LM and its tokenizer.

    Args:
        model_path: Directory containing the model; ``self.config.model_path``
            is used when this is falsy.

    Returns:
        True on success, False on any failure (the error is logged, not raised).
        The outcome is also stored in ``self.model_loaded``, the flag that
        ``generate_text`` checks before using the model.

    Side effects:
        May create or patch ``config.json`` inside the model directory (see
        ``_ensure_model_config``). ``self.model`` and ``self.tokenizer`` are only
        replaced once a complete pair has loaded, so a failed call never leaves
        a mismatched tokenizer/model behind.
    """
    try:
        path_to_use = model_path if model_path else self.config.model_path
        self.logger.info(f"Loading fine-tuned model from {path_to_use}")
        device = getattr(self, "device", None) or self._detect_optimal_device()
        # float16 as before on accelerators; many CPU kernels lack Half support.
        dtype = torch.float32 if device == "cpu" else torch.float16
        self._ensure_model_config(path_to_use)
        tokenizer, model = self._load_model_and_tokenizer(path_to_use, dtype)
        self.tokenizer, self.model = tokenizer, model
        self.model_loaded = True
        self.logger.info("Model loaded successfully")
        return True
    except Exception as e:
        self.logger.error(f"Error loading model: {e}")
        # Nothing was reassigned, so a previously loaded pair (if any) stays usable.
        self.model_loaded = bool(getattr(self, "model_loaded", False))
        return False


def _ensure_model_config(self, path):
    """Make sure ``<path>/config.json`` exists and declares a Phi architecture.

    An existing file is only rewritten when ``model_type`` or ``architectures``
    had to be added. A missing file is created with a minimal Phi-1.5 style
    config.
    NOTE(review): this writes into the model directory; confirm that is
    acceptable for read-only or adapter-only checkpoints.
    """
    config_file = os.path.join(path, "config.json")
    if os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as f:
            model_config = json.load(f)
        changed = False
        if 'model_type' not in model_config:
            self.logger.warning("Adding missing model_type to config.json")
            model_config['model_type'] = "phi"  # the fine-tuned base is Phi-1.5
            changed = True
        if 'architectures' not in model_config:
            model_config['architectures'] = ["PhiForCausalLM"]
            changed = True
        if not changed:
            return
    else:
        self.logger.warning(f"config.json not found in {path}, creating one")
        model_config = {
            "model_type": "phi",
            "architectures": ["PhiForCausalLM"],
            "bos_token_id": 1,
            "eos_token_id": 2,
            "pad_token_id": 0,
            "hidden_size": 2048,
            "vocab_size": 51200,
            "torch_dtype": "float16"
        }
    with open(config_file, "w", encoding="utf-8") as f:
        json.dump(model_config, f, indent=2)


def _load_model_and_tokenizer(self, path, dtype):
    """Try each loading strategy in turn and return ``(tokenizer, model)``.

    Order: ``AutoModelForCausalLM`` on *path*, then ``PhiForCausalLM``, then a
    PEFT/LoRA adapter in *path* applied to ``microsoft/phi-1_5``. Raises
    RuntimeError describing all three failures when none works. The tokenizer
    gets ``pad_token = eos_token`` when it has no pad token.
    """
    common = dict(trust_remote_code=True, torch_dtype=dtype, device_map="auto")
    try:
        tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(path, low_cpu_mem_usage=True, **common)
    except Exception as model_error:
        self.logger.warning(f"First loading attempt failed: {model_error}")
        try:
            from transformers import PhiForCausalLM
            tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True)
            model = PhiForCausalLM.from_pretrained(path, low_cpu_mem_usage=True, **common)
        except Exception as specific_error:
            self.logger.warning(f"Second loading attempt failed: {specific_error}")
            try:
                from peft import PeftModel
                base_model_path = "microsoft/phi-1_5"
                base_model = AutoModelForCausalLM.from_pretrained(base_model_path, **common)
                tokenizer = AutoTokenizer.from_pretrained(base_model_path, trust_remote_code=True)
                model = PeftModel.from_pretrained(base_model, path)
            except Exception as peft_error:
                self.logger.error(f"All loading attempts failed. Last error: {peft_error}")
                raise RuntimeError(
                    f"Could not load model: {model_error} -> {specific_error} -> {peft_error}"
                ) from peft_error
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return tokenizer, model
def _get_cache_key(self, prompt: str) -> str:
    """Return the MD5 hex digest of *prompt* (UTF-8), used as a generation-cache key."""
    prompt_bytes = prompt.encode()
    digest = hashlib.md5(prompt_bytes)
    return digest.hexdigest()
def _manage_cache(self):
    """Evict the oldest generation-cache entries once the cache is over capacity.

    Entries look like ``{'response': ..., 'timestamp': datetime}``. They are
    ordered by ``timestamp``, which ``generate_text`` refreshes on every hit.
    When ``self.generation_cache`` exceeds ``self.max_cache_entries``, the cache
    is trimmed back to the limit. It is trimmed by at least 20% of the limit so
    eviction does not run on every insert.
    """
    overflow = len(self.generation_cache) - self.max_cache_entries
    if overflow <= 0:
        return
    # int(limit * 0.2) alone is 0 for limits below 5, which disabled eviction entirely.
    entries_to_remove = max(overflow, int(self.max_cache_entries * 0.2))
    oldest_first = sorted(self.generation_cache.items(), key=lambda item: item[1]['timestamp'])
    for key, _ in oldest_first[:entries_to_remove]:
        del self.generation_cache[key]
    self.logger.debug(f"Cache cleanup: removed {entries_to_remove} entries")
async def generate_text(self, prompt: str, max_length: int = 1500) -> str:
    """Generate a completion for *prompt* with the loaded model.

    Args:
        prompt: Instruction prompt, e.g. from ``prepare_analysis_prompt``.
            It is truncated to 4096 tokens.
        max_length: Forwarded to ``model.generate(max_length=...)``. In Hugging
            Face this bounds prompt plus generated tokens.

    Returns:
        The decoded text after the last ``[/INST]`` marker (when present).
        ``_fallback_generation`` output is returned instead (and not cached)
        in two cases: no model is ready, or generation raises.

    Caching: when ``config.enable_caching`` is set, successful generations are
    cached under a key derived from both *prompt* and *max_length*. A cache hit
    refreshes the entry's timestamp, which ``_manage_cache`` uses for eviction.

    NOTE(review): ``model.generate`` runs synchronously here and blocks the
    event loop for the whole generation.
    """
    cache_key = None
    if self.config.enable_caching:
        # The same prompt with a different max_length produces a different output.
        cache_key = self._get_cache_key(f"{max_length}|{prompt}")
        cache_entry = self.generation_cache.get(cache_key)
        if cache_entry is not None:
            cache_entry['timestamp'] = datetime.now()  # refresh access time
            self.logger.debug("Using cached response")
            return cache_entry['response']
    if self.config.log_prompts:
        self.logger.debug(f"Prompt: {prompt}")
    # load_model/__init__ record the load outcome in `model_loaded`; if the flag
    # was never set, rely on both model and tokenizer being present.
    model_ready = (
        getattr(self, 'model_loaded', True)
        and self.model is not None
        and self.tokenizer is not None
    )
    if not model_ready:
        return await self._fallback_generation(prompt)
    try:
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
        # Follow the model's own device (device_map="auto" may not match self.device).
        if hasattr(self.model, 'device'):
            device = self.model.device
            inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=max_length,
                do_sample=True,
                temperature=0.7,
                top_p=0.92,
                top_k=50,
                repetition_penalty=1.15,
                num_beams=4,
                pad_token_id=self.tokenizer.eos_token_id
            )
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # The decoded sequence echoes the prompt; keep only what follows it.
        if "[/INST]" in response:
            response = response.split("[/INST]")[-1].strip()
        if cache_key is not None:
            self.generation_cache[cache_key] = {
                'response': response,
                'timestamp': datetime.now()
            }
            self._manage_cache()
        return response
    except Exception as e:
        self.logger.error(f"Error generating text: {e}")
        return await self._fallback_generation(prompt)
async def _fallback_generation(self, prompt: str) -> str:
    """Build a templated French analysis from values found in *prompt*.

    Used when no model is loaded or generation fails. Two values are read from
    the prompt text: the risk tolerance (first number after "risque", skipping
    a "(1-5)" scale hint) and the horizon (first number after "horizon"). Their
    defaults are 3 and 5.

    The tolerance is divided by 5 and clamped to [0, 1]. Allocation
    percentages are rounded and the cash share absorbs rounding, so the three
    weights always add up to 100%.

    Returns:
        A numbered response in the same section layout the analyzer's parsers read.
    """
    self.logger.info("Using fallback generation")
    risk_tolerance = 3
    investment_horizon = 5
    # The analysis prompt reads "Tolérance au risque (1-5): 4"; without skipping the
    # "(1-5)" hint the scale bound "1" was taken as the tolerance.
    risk_match = re.search(r'[Rr]isque(?:\s*\(\s*\d+\s*-\s*\d+\s*\))?[^\d\n]*?(\d+)', prompt)
    if risk_match:
        risk_tolerance = int(risk_match.group(1))
    horizon_match = re.search(r'[Hh]orizon.*?(\d+)', prompt)
    if horizon_match:
        investment_horizon = int(horizon_match.group(1))
    # Clamp so out-of-range tolerances cannot yield negative bond/cash weights.
    risk_score = min(max(risk_tolerance / 5, 0.0), 1.0)
    equity_pct = round(risk_score * 100)
    bond_pct = round((1 - risk_score) * 80)
    cash_pct = max(0, 100 - equity_pct - bond_pct)
    fallback_response = f"""1. Score de risque: {risk_score:.2f} - Basé sur la tolérance au risque déclarée.
2. Horizon d'investissement recommandé: {investment_horizon} ans.
3. Objectifs prioritaires: Épargne générale, Croissance du capital.
4. Contraintes: Limitation classique de diversification.
5. Recommandations d'allocation:
- Actions: {equity_pct}%
- Obligations: {bond_pct}%
- Liquidités: {cash_pct}%
6. Recommandations:
- Diversifier votre portefeuille
- Investir régulièrement
- Maintenir une épargne de sécurité suffisante
7. Points d'attention:
- Réévaluer régulièrement votre profil de risque
- Ajuster votre allocation selon l'évolution de votre situation
- Prévoir une réserve d'urgence adéquate
"""
    return fallback_response
def prepare_analysis_prompt(self, profile_data: Dict[str, Any]) -> str:
    """Build the French ``[INST] ... [/INST]`` prompt asking for a full profile analysis.

    Missing scalar fields render as "Non spécifié" or the shown defaults. List
    fields (goals, sectors) and the constraints mapping may be missing or None
    and may contain non-string items; they render as comma-separated text.
    Constraints list only the keys whose value is truthy.
    """
    def _joined(values) -> str:
        # Tolerate None and non-string items (numbers, enums) instead of raising.
        return ", ".join(str(value) for value in (values or []))

    goals = _joined(profile_data.get('investment_goals'))
    preferences = profile_data.get('investment_preferences') or {}
    sectors = _joined(preferences.get('sectors'))
    constraints = profile_data.get('constraints') or {}
    active_constraints = _joined([key for key, enabled in constraints.items() if enabled])
    esg = "Oui" if profile_data.get('esg_preferences', False) else "Non"
    return f"""[INST] Je suis un conseiller financier expert. Analyse ce profil d'investisseur de manière exhaustive.
Profil détaillé :
- Âge: {profile_data.get('age', 'Non spécifié')} ans
- Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}€
- Situation familiale: {profile_data.get('marital_status', 'Non spécifié')}
- Dépendants: {profile_data.get('number_of_dependents', 0)}
- Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}€
- Dette totale: {profile_data.get('total_debt', 'Non spécifié')}€
- Patrimoine total: {profile_data.get('total_assets', 'Non spécifié')}€
- Objectifs: {goals}
- Tolérance au risque (1-5): {profile_data.get('risk_tolerance', 3)}
- Horizon d'investissement: {profile_data.get('investment_horizon', 5)} ans
- Connaissances financières (1-5): {profile_data.get('financial_knowledge', 3)}
- Stabilité professionnelle: {profile_data.get('income_stability', 3)}
- Préférences ESG: {esg}
- Secteurs préférés: {sectors}
- Contraintes spécifiques: {active_constraints}
Format de réponse requis:
1. Score de risque (entre 0 et 1): [score] - Justification détaillée
2. Horizon d'investissement recommandé: [années] - Justification
3. Objectifs prioritaires: Liste ordonnée avec justification
4. Contraintes et limitations: Analyse détaillée des contraintes
5. Recommandations d'allocation:
- Par classe d'actifs (%)
- Par zone géographique (%)
- Par stratégie d'investissement
6. Analyse des risques:
- Décomposition des risques
- Points de vigilance
- Stress tests recommandés
7. Plan d'action détaillé:
- Court terme (0-2 ans)
- Moyen terme (2-5 ans)
- Long terme (5+ ans)
8. Recommandations spécifiques:
- Investissements recommandés
- Stratégies de diversification
- Protection du capital
9. Explication détaillée:
- Raisonnement complet
- Points d'attention particuliers
- Opportunités identifiées
Analyse complète: [/INST]"""
def prepare_description_prompt(self, profile_data: Dict[str, Any]) -> str:
    """Build the French ``[INST] ... [/INST]`` prompt asking for a profile description.

    Missing scalar fields render as "Non spécifié" or the shown defaults. The
    goal list may be missing or None and may contain non-string items; it
    renders as comma-separated text.
    """
    # Tolerate None and non-string goals instead of raising inside str.join.
    goals = ", ".join(str(goal) for goal in (profile_data.get('investment_goals') or []))
    return f"""[INST] En tant que conseiller financier expert, génère une description détaillée et professionnelle de ce profil d'investisseur.
Profil complet:
- Âge: {profile_data.get('age', 'Non spécifié')} ans
- Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}€
- Situation familiale: {profile_data.get('marital_status', 'Non spécifié')}
- Objectifs: {goals}
- Tolérance au risque: {profile_data.get('risk_tolerance', 3)}/5
- Horizon: {profile_data.get('investment_horizon', 5)} ans
- Connaissances financières: {profile_data.get('financial_knowledge', 3)}/5
- Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}€
- Patrimoine: {profile_data.get('total_assets', 'Non spécifié')}€
- Dette: {profile_data.get('total_debt', 'Non spécifié')}€
- Secteur d'activité: {profile_data.get('industry', 'Non spécifié')}
- Stabilité professionnelle: {profile_data.get('income_stability', 3)}/5
La description doit couvrir:
1. Analyse du profil de risque et des objectifs
2. Evaluation de la capacité financière
3. Identification des contraintes et opportunités
4. Recommandations d'investissement personnalisées
5. Points de vigilance et attention particulière
6. Stratégie de diversification recommandée
7. Projection et scénarios d'évolution
Description: [/INST]"""
async def generate_batch(self, prompts: List[str], max_length: int = 1500) -> List[str]:
    """Generate one response per prompt, running the model in chunks of ``config.batch_size``.

    Responses are returned in the same order as *prompts*. When no model is
    ready, every prompt gets ``_fallback_generation`` output. When generation
    for one chunk raises, only that chunk falls back; other chunks still use
    the model.

    NOTE(review): ``model.generate`` runs synchronously and blocks the event loop.
    """
    # Same readiness rule as generate_text; `model_loaded` may be unset.
    model_ready = (
        getattr(self, 'model_loaded', True)
        and self.model is not None
        and self.tokenizer is not None
    )
    if not model_ready:
        return [await self._fallback_generation(prompt) for prompt in prompts]
    # Inputs must live where the model is (device_map="auto" may differ from self.device).
    device = getattr(self.model, 'device', self.device)
    batch_size = max(1, int(self.config.batch_size))  # a step of 0 would make range() raise
    responses: List[str] = []
    for start in range(0, len(prompts), batch_size):
        batch_prompts = prompts[start:start + batch_size]
        try:
            responses.extend(self._generate_chunk(batch_prompts, device, max_length))
        except Exception as e:
            self.logger.error(f"Error generating batch: {e}")
            responses.extend([await self._fallback_generation(p) for p in batch_prompts])
    return responses


def _generate_chunk(self, batch_prompts, device, max_length):
    """Run one padded ``generate`` call and return the decoded responses in order."""
    previous_side = self.tokenizer.padding_side
    # Decoder-only models must be left-padded for batched generation; with right
    # padding the pad tokens end up between each prompt and its continuation.
    self.tokenizer.padding_side = "left"
    try:
        batch_inputs = self.tokenizer(
            batch_prompts,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=4096
        )
    finally:
        self.tokenizer.padding_side = previous_side
    batch_inputs = {k: v.to(device) for k, v in batch_inputs.items()}
    with torch.no_grad():
        outputs = self.model.generate(
            **batch_inputs,
            max_length=max_length,
            do_sample=True,
            temperature=0.7,
            top_p=0.92,
            top_k=50,
            repetition_penalty=1.15,
            pad_token_id=self.tokenizer.eos_token_id
        )
    decoded = []
    for output in outputs:
        text = self.tokenizer.decode(output, skip_special_tokens=True)
        # Drop the echoed prompt.
        if "[/INST]" in text:
            text = text.split("[/INST]")[-1].strip()
        decoded.append(text)
    return decoded
| class LLMProfileAnalyzer: | |
| """Advanced profile analyzer using fine-tuned financial LLM""" | |
def __init__(self, config: Optional[DeepVestConfig] = None):
    """Create an analyzer that drives a DeepVestLLM built from *config*.

    Args:
        config: Configuration to use; when falsy (normally: not supplied) a
            fresh ``DeepVestConfig()`` is created.
    """
    effective_config = config if config else DeepVestConfig()
    self.config = effective_config
    self.logger = logging.getLogger("DeepVest.Analyzer")
    self.llm = DeepVestLLM(effective_config)
    # In-memory analysis cache: key -> {'result': ..., 'timestamp': datetime}.
    self.analysis_cache: Dict[str, Dict[str, Any]] = {}
    self.max_cache_size = effective_config.cache_size
def _get_cache_key(self, profile_data: Dict[str, Any]) -> str:
    """Return an MD5 hex digest identifying *profile_data* in the analysis cache.

    The key covers the fields rendered into the analysis prompt and those read
    by ``_enrich_analysis`` (e.g. ``financial_knowledge``, ``monthly_savings``).
    Profiles that differ in any of them therefore never share a cached result.
    List fields may be None or contain non-string items.
    """
    def _joined(values) -> str:
        return ','.join(str(value) for value in (values or []))

    preferences = profile_data.get('investment_preferences') or {}
    constraints = profile_data.get('constraints') or {}
    key_components = [
        str(profile_data.get('age', '')),
        str(profile_data.get('risk_tolerance', '')),
        str(profile_data.get('investment_horizon', '')),
        _joined(profile_data.get('investment_goals')),
        str(profile_data.get('annual_income', '')),
        str(profile_data.get('total_assets', '')),
        str(profile_data.get('total_debt', '')),
        # The fields below also change the prompt or the enrichment but were
        # previously left out of the key, letting distinct profiles collide.
        str(profile_data.get('marital_status', '')),
        str(profile_data.get('number_of_dependents', '')),
        str(profile_data.get('monthly_savings', '')),
        str(profile_data.get('financial_knowledge', '')),
        str(profile_data.get('income_stability', '')),
        str(profile_data.get('esg_preferences', '')),
        _joined(preferences.get('sectors')),
        _joined(sorted(str(key) for key, enabled in constraints.items() if enabled)),
    ]
    return hashlib.md5('_'.join(key_components).encode()).hexdigest()
def _manage_cache(self):
    """Evict the oldest analysis-cache entries once the cache is over capacity.

    Entries look like ``{'result': ..., 'timestamp': datetime}`` and are ordered
    by ``timestamp``. When ``self.analysis_cache`` exceeds
    ``self.max_cache_size``, the cache is trimmed back to the limit. It is
    trimmed by at least 20% of the limit so eviction does not run on every insert.
    """
    overflow = len(self.analysis_cache) - self.max_cache_size
    if overflow <= 0:
        return
    # int(limit * 0.2) alone is 0 for limits below 5, which disabled eviction entirely.
    num_to_remove = max(overflow, int(self.max_cache_size * 0.2))
    oldest_first = sorted(self.analysis_cache.items(), key=lambda item: item[1]['timestamp'])
    for key, _ in oldest_first[:num_to_remove]:
        del self.analysis_cache[key]
async def analyze_profile(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
    """Analyse one investor profile with the LLM, falling back to defaults on failure.

    Pipeline:
      1. Cache lookup.
      2. Prompt, then generation, then parsing.
      3. Validation; if it fails, the default analysis is used instead.
      4. Enrichment.
      5. Cache store.

    A cache hit refreshes the entry's timestamp, so ``_manage_cache`` evicts
    least-recently-used entries, like the generation cache in DeepVestLLM.
    Any exception is logged and ``_get_default_analysis(profile_data)`` is returned.
    """
    try:
        cache_key = self._get_cache_key(profile_data)
        cached = self.analysis_cache.get(cache_key) if self.config.enable_caching else None
        if cached is not None:
            cached['timestamp'] = datetime.now()  # mark as recently used
            self.logger.info(f"Using cached analysis for profile {cache_key}")
            return cached['result']
        self.logger.info(f"Starting profile analysis for ID: {profile_data.get('id', 'unknown')}")
        prompt = self.llm.prepare_analysis_prompt(profile_data)
        response = await self.llm.generate_text(prompt)
        analysis_result = self._parse_response(response, profile_data)
        # _validate_results also normalises values in place (score clamp, horizon default).
        if not self._validate_results(analysis_result):
            self.logger.warning("Analysis results validation failed, using default values")
            analysis_result = self._get_default_analysis(profile_data)
        enriched_analysis = self._enrich_analysis(analysis_result, profile_data)
        if self.config.enable_caching:
            self.analysis_cache[cache_key] = {
                'result': enriched_analysis,
                'timestamp': datetime.now()
            }
            self._manage_cache()
        self.logger.info("Profile analysis completed successfully")
        return enriched_analysis
    except Exception as e:
        self.logger.error(f"Error in profile analysis: {e}")
        return self._get_default_analysis(profile_data)
async def batch_analyze_profiles(self, profiles: List[Dict[str, Any]]) -> List[ProfileAnalysisResult]:
    """Analyse several profiles, returning one result per profile in input order.

    Cached profiles are answered from the cache (refreshing their timestamp).
    The remaining profiles go through one ``llm.generate_batch`` call and the
    same parse, validate and enrich steps as ``analyze_profile``.

    Fallback rules:
      * A profile whose processing raises gets a default analysis at its own position.
      * Any slot still empty (e.g. fewer responses than prompts) also gets a default.
      * If the whole batch fails, every profile gets a default analysis.
    """
    try:
        # Pre-sized so every result lands at its profile's index. Previously cached
        # results were appended compactly and then overwritten by index-based
        # inserts, losing or misordering results.
        results: List[Optional[ProfileAnalysisResult]] = [None] * len(profiles)
        pending: List[Tuple[int, Dict[str, Any], str]] = []
        for index, profile in enumerate(profiles):
            cache_key = self._get_cache_key(profile)
            cached = self.analysis_cache.get(cache_key) if self.config.enable_caching else None
            if cached is not None:
                cached['timestamp'] = datetime.now()
                results[index] = cached['result']
            else:
                pending.append((index, profile, cache_key))
        if pending:
            prompts = [self.llm.prepare_analysis_prompt(profile) for _, profile, _ in pending]
            responses = await self.llm.generate_batch(prompts)
            for (index, profile, cache_key), response in zip(pending, responses):
                try:
                    analysis_result = self._parse_response(response, profile)
                    if not self._validate_results(analysis_result):
                        analysis_result = self._get_default_analysis(profile)
                    enriched_analysis = self._enrich_analysis(analysis_result, profile)
                    if self.config.enable_caching:
                        self.analysis_cache[cache_key] = {
                            'result': enriched_analysis,
                            'timestamp': datetime.now()
                        }
                    results[index] = enriched_analysis
                except Exception as e:
                    self.logger.error(f"Error processing profile {index}: {e}")
                    results[index] = self._get_default_analysis(profile)
            if self.config.enable_caching:
                self._manage_cache()
        return [
            result if result is not None else self._get_default_analysis(profiles[index])
            for index, result in enumerate(results)
        ]
    except Exception as e:
        self.logger.error(f"Error in batch profile analysis: {e}")
        return [self._get_default_analysis(profile) for profile in profiles]
def _parse_response(self, response: str, original_data: Dict[str, Any]) -> ProfileAnalysisResult:
    """Turn raw LLM text into a ProfileAnalysisResult.

    The response is split into lines, and each field is produced by its
    ``_extract_*`` / ``_calculate_*`` helper. Any exception is logged and
    ``_get_default_analysis(original_data)`` is returned instead.
    """
    try:
        lines = response.split('\n')
        asset_allocation = self._extract_asset_allocation(lines)
        geo_allocation = self._extract_geographic_allocation(lines)
        fields = {
            'risk_score': self._extract_risk_score(lines),
            'investment_horizon': self._extract_horizon(lines),
            'primary_goals': self._extract_goals(lines),
            'constraints': self._extract_constraints(lines),
            'recommendations': self._extract_recommendations(lines),
            'explanation': self._extract_explanation(lines),
            'risk_decomposition': self._extract_risk_decomposition(lines),
            'goal_feasibility': self._extract_goal_feasibility(lines),
            'investment_style': self._extract_investment_style(lines),
            'market_sensitivity': self._extract_market_sensitivity(lines),
            'alerts': self._extract_alerts(lines),
            'attention_points': self._extract_attention_points(lines),
            'market_context': self._extract_market_context(lines),
            'adaptability_score': self._calculate_adaptability_score(lines),
            'stress_test_results': self._extract_stress_test_results(lines),
            'asset_allocation': asset_allocation,
            'geographic_allocation': geo_allocation,
        }
        return ProfileAnalysisResult(**fields)
    except Exception as e:
        self.logger.error(f"Error parsing response: {e}")
        return self._get_default_analysis(original_data)
def _validate_results(self, analysis_result: ProfileAnalysisResult) -> bool:
    """Sanity-check a parsed analysis, repairing what can be repaired in place.

    In place, this method:
      * clamps ``risk_score`` into [0, 1];
      * resets a non-positive ``investment_horizon`` to 5;
      * appends an attention point when a risk above 0.7 meets a horizon under 5 years.

    Returns:
        False when ``primary_goals`` or ``recommendations`` is empty, or when
        any check raises (the error is logged); True otherwise.
    """
    try:
        score = analysis_result.risk_score
        if not 0 <= score <= 1:
            analysis_result.risk_score = min(max(score, 0), 1)
        if analysis_result.investment_horizon <= 0:
            analysis_result.investment_horizon = 5
        if not (analysis_result.primary_goals and analysis_result.recommendations):
            return False
        risky_and_short = analysis_result.risk_score > 0.7 and analysis_result.investment_horizon < 5
        if risky_and_short:
            self.logger.warning("Inconsistency detected: high risk with short horizon")
            analysis_result.attention_points.append(
                "Attention: profil à risque élevé avec horizon court, prudence recommandée"
            )
        return True
    except Exception as e:
        self.logger.error(f"Error validating analysis results: {e}")
        return False
def _enrich_analysis(self,
                     analysis: ProfileAnalysisResult,
                     profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
    """Adjust the risk score with profile factors and fill in missing sections.

    Mutates and returns *analysis*. Adjustments added to the parsed risk score:
      * age: ``max(0, (65 - age) / 65) * 0.2`` (default age 30);
      * financial knowledge: ``(knowledge - 3) * 0.1`` (default 3);
      * debt: -0.1 if debt/income > 0.4, else +0.1 (only when income > 0);
      * goals: +0.05 per goal, capped at +0.2.

    The adjustments are summed and the result clamped to [0, 1] once. Each
    factor is recorded in ``risk_decomposition``. Three sections are filled
    only when empty: asset/geographic allocation, market sensitivity, and goal
    feasibility (the last only via ``investment_objectives.calculate_goal_feasibility``).
    On error the analysis is logged and returned as-is, possibly partly updated.
    """
    def _field(key: str, default):
        # An explicit None behaves like a missing key so the arithmetic cannot fail on it.
        value = profile_data.get(key)
        return default if value is None else value

    try:
        age = _field('age', 30)
        age_adjustment = max(0, (65 - age) / 65) * 0.2
        knowledge = _field('financial_knowledge', 3)
        knowledge_adjustment = (knowledge - 3) * 0.1
        income = _field('annual_income', 0)
        debt = _field('total_debt', 0)
        financial_adjustment = 0  # replaces the former `in locals()` check
        if income > 0:
            financial_adjustment = -0.1 if debt / income > 0.4 else 0.1
        goals_diversification = min(len(profile_data.get('investment_goals') or []) * 0.05, 0.2)

        # Clamp once after summing: clamping after each step made the score
        # depend on the order in which the adjustments were applied.
        final_risk_score = analysis.risk_score
        for adjustment in (age_adjustment, knowledge_adjustment, financial_adjustment, goals_diversification):
            final_risk_score += adjustment
        final_risk_score = max(0, min(1, final_risk_score))

        analysis.risk_score = final_risk_score
        analysis.risk_decomposition.update({
            'age_factor': age_adjustment,
            'knowledge_factor': knowledge_adjustment,
            'financial_stability': financial_adjustment,
            'goals_diversification': goals_diversification
        })
        if not analysis.asset_allocation:
            analysis.asset_allocation = self._generate_default_allocation(final_risk_score)
        if not analysis.geographic_allocation:
            analysis.geographic_allocation = self._generate_default_geo_allocation()
        if not analysis.market_sensitivity:
            # Heuristic sensitivities derived from the adjusted risk score.
            analysis.market_sensitivity = {
                'market_beta': final_risk_score * 0.8 + 0.2,
                'interest_rate': max(0, 0.7 - final_risk_score * 0.4),
                'economic_cycle': 0.4 + final_risk_score * 0.3,
                'inflation': max(0, 0.6 - final_risk_score * 0.2)
            }
        if not analysis.goal_feasibility and profile_data.get('investment_objectives'):
            objectives = profile_data.get('investment_objectives')
            if hasattr(objectives, 'calculate_goal_feasibility'):
                monthly_contribution = _field('monthly_savings', 0)
                expected_return = 0.03 + final_risk_score * 0.04  # simple linear return model
                analysis.goal_feasibility = objectives.calculate_goal_feasibility(
                    monthly_contribution=monthly_contribution,
                    expected_return=expected_return
                )
        return analysis
    except Exception as e:
        self.logger.error(f"Error enriching analysis: {e}")
        return analysis
def _get_default_analysis(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
    """Build a rule-based fallback analysis from the raw profile fields.

    ``analyze_profile`` returns this from its error handler, so it must not
    raise on odd input. Fallbacks applied:
      * a missing, None or non-numeric ``risk_tolerance`` becomes 3;
      * the score (tolerance / 5) is clamped to [0, 1];
      * a missing/None horizon becomes 5;
      * a missing or empty goal list becomes ``["Épargne générale"]``.
    """
    raw_tolerance = profile_data.get('risk_tolerance', 3)
    try:
        risk_tolerance = float(3 if raw_tolerance is None else raw_tolerance)
    except (TypeError, ValueError):
        risk_tolerance = 3.0
    # Clamped so allocation weights derived from it stay within [0, 1].
    risk_score = min(max(risk_tolerance / 5, 0.0), 1.0)

    investment_horizon = profile_data.get('investment_horizon')
    if investment_horizon is None:
        investment_horizon = 5

    goals = profile_data.get('investment_goals')
    if isinstance(goals, str):
        goals = [goals]
    primary_goals = list(goals) if isinstance(goals, (list, tuple)) and goals else ["Épargne générale"]

    return ProfileAnalysisResult(
        risk_score=risk_score,
        investment_horizon=investment_horizon,
        primary_goals=primary_goals,
        constraints={},
        recommendations=[
            "Construire un portefeuille diversifié",
            "Maintenir une réserve d'urgence",
            "Investir régulièrement"
        ],
        explanation="Analyse par défaut basée sur les données fournies.",
        asset_allocation=self._generate_default_allocation(risk_score),
        geographic_allocation=self._generate_default_geo_allocation()
    )
def _generate_default_allocation(self, risk_score: float) -> Dict[str, float]:
    """Return a three-asset fallback allocation driven by *risk_score*.

    Equities ("Actions") receive the score. The remainder is split 80/20
    between bonds ("Obligations") and cash ("Liquidités"). The score is first
    clamped to [0, 1] so that no weight can become negative.
    """
    score = min(max(risk_score, 0.0), 1.0)
    remainder = 1 - score
    return {
        'Actions': score,
        'Obligations': remainder * 0.8,
        'Liquidités': remainder * 0.2
    }
def _generate_default_geo_allocation(self) -> Dict[str, float]:
    """Return the fixed fallback regional split: 40/30/20/10 across four regions."""
    regions = ('Europe', 'Amérique du Nord', 'Asie-Pacifique', 'Marchés émergents')
    weights = (0.4, 0.3, 0.2, 0.1)
    return dict(zip(regions, weights))
| # Extraction methods for parsing LLM response | |
def _extract_risk_score(self, sections: List[str]) -> float:
    """Return the risk score from the "1. Score de risque" line, clamped to [0, 1].

    Only the text after the label's first colon is searched (or after the
    label itself when there is no colon). This keeps the section number "1."
    and a "(entre 0 et 1)" hint from being read as the score; previously every
    parsed score came out as 1.0. Decimal commas ("0,4") are accepted.

    Returns 0.5 when the line or a number is missing.
    """
    label = "1. Score de risque"
    risk_line = next((s for s in sections if s.startswith(label)), "")
    if not risk_line:
        return 0.5
    _, sep, tail = risk_line.partition(":")
    value_text = tail if sep else risk_line[len(label):]
    match = re.search(r'\d+(?:[.,]\d+)?|[.,]\d+', value_text)
    if not match:
        return 0.5
    try:
        score = float(match.group().replace(',', '.'))
    except ValueError:
        return 0.5
    return min(max(score, 0.0), 1.0)
def _extract_horizon(self, sections: List[str]) -> int:
    """Return the recommended horizon in years from the "2. Horizon" line.

    The first integer after the label's first colon (or after the label when
    there is no colon) is used, so the section number "2." is no longer
    mistaken for the horizon.

    Returns 5 when the line or a number is missing.
    """
    label = "2. Horizon"
    horizon_line = next((s for s in sections if s.startswith(label)), "")
    if not horizon_line:
        return 5
    _, sep, tail = horizon_line.partition(":")
    value_text = tail if sep else horizon_line[len(label):]
    match = re.search(r'\d+', value_text)
    return int(match.group()) if match else 5
def _extract_goals(self, sections: List[str]) -> List[str]:
    """Return the prioritised goals listed on the "3. Objectifs" line.

    Items are read from the text after the first colon. They are split on list
    markers only: "•", a dash used as a bullet, or "1." / "2)". Digits and
    hyphens inside a goal ("Retraite dans 20 ans", "court-terme") are kept.

    Returns [] when the line is missing or has no colon.
    """
    goals_section = next((s for s in sections if s.startswith("3. Objectifs")), "")
    _, sep, goals_text = goals_section.partition(":")
    if not sep:
        return []
    parts = re.split(r'\s*(?:•|(?<!\S)[-–]\s|(?<!\S)\d+[.)](?!\d))\s*', goals_text.strip())
    return [part.strip() for part in parts if part.strip()]
def _extract_constraints(self, sections: List[str]) -> Dict[str, Any]:
    """Derive constraint flags from keywords on the "4. Contraintes" line.

    The text after the first colon is scanned (the whole line when there is no
    colon), case-insensitively. Each keyword group that matches sets its flag
    to True; flags that do not match are omitted.

    Returns {} when the line is missing.
    """
    constraints_section = next((s for s in sections if s.startswith("4. Contraintes")), "")
    if not constraints_section:
        return {}
    # First colon only: later colons belong to the content ("Besoin de liquidité: élevé").
    text = constraints_section.split(":", 1)[-1].strip()
    keyword_flags = (
        ('liquidity_needed', r'liquidité|liquide|court terme'),
        ('esg_focus', r'esg|durable|responsable|éthique'),
        ('income_focus', r'revenu|dividende|rente'),
        ('capital_protection', r'protection|capital|garanti'),
        ('tax_optimization', r'fiscalité|impôt|tax'),
        ('inheritance_planning', r'succession|transmission|héritage'),
    )
    return {flag: True for flag, pattern in keyword_flags if re.search(pattern, text, re.I)}
def _extract_recommendations(self, sections: List[str]) -> List[str]:
    """Return recommendations from section 8, else section 7, else generic defaults.

    For the chosen section, items come from two places: the text after the
    header's first colon, and the bullet lines ("-", "–", "•", "*") that
    directly follow the header (blank lines skipped, any other line ends the
    list). Text is split on list markers only, so numbers and hyphens inside
    an item are kept. The header label itself is never returned as a
    recommendation, as happened before.
    """
    marker_split = re.compile(r'\s*(?:•|(?<!\S)[-–*]\s|(?<!\S)\d+[.)](?!\d))\s*')

    def section_items(prefix: str) -> List[str]:
        # Collect the items of the first section whose header starts with prefix.
        start = next((i for i, line in enumerate(sections) if line.startswith(prefix)), None)
        if start is None:
            return []
        chunks = [sections[start].partition(":")[2]]
        for line in sections[start + 1:]:
            stripped = line.strip()
            if not stripped:
                continue
            if stripped[0] not in "-–•*":
                break  # next numbered section or free text ends the list
            chunks.append(stripped.lstrip("-–•* "))
        items = []
        for chunk in chunks:
            items.extend(part.strip() for part in marker_split.split(chunk.strip()) if part.strip())
        return items

    recommendations = section_items("8. Recommandations")
    if not recommendations:
        recommendations = section_items("7. Plan d'action")
    if not recommendations:
        recommendations = [
            "Diversifier le portefeuille selon le profil de risque",
            "Maintenir une épargne de sécurité",
            "Adopter une stratégie d'investissement régulier"
        ]
    return recommendations
def _extract_explanation(self, sections: List[str]) -> str:
    """Return the section-9 explanation, or a summary stitched from other sections.

    Steps:
      1. Use the text after the first colon of the "9. Explication" line, if non-empty.
      2. Otherwise, join with blank lines the first line starting with each of
         "1.", "2.", "3.", "4." and "8." that is present.
      3. Otherwise, return a generic sentence.
    """
    header = next((line for line in sections if line.startswith("9. Explication")), "")
    body = header.split(":", 1)[-1].strip() if header else ""
    if body:
        return body
    pieces = []
    for number in (1, 2, 3, 4, 8):
        prefix = f"{number}."
        hit = next((line for line in sections if line.startswith(prefix)), "")
        if hit:
            pieces.append(hit)
    return "\n\n".join(pieces) if pieces else "Analyse basée sur le profil d'investisseur fourni."
def _extract_asset_allocation(self, sections: List[str]) -> Dict[str, float]:
    """Parse "<asset class> <number>%" pairs into fractional weights.

    Where pairs are read from:
      * First, the first line mentioning "classe d'actifs".
      * If that yields nothing, every line mentioning allocation / répartition /
        portefeuille. A class seen again overwrites its earlier value.

    Numbers accept a decimal comma. Unparsable numbers (e.g. "1.2.3") are
    skipped rather than aborting the whole parse with ValueError. Weights are
    rescaled to sum to 1 when their total lies outside [0.99, 1.01].
    """
    pair_pattern = re.compile(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%')

    def collect(line: str, into: Dict[str, float]) -> None:
        # Add every parsable "<name> <pct>%" pair found in line to into.
        for match in pair_pattern.finditer(line):
            try:
                percentage = float(match.group(2).replace(',', '.'))
            except ValueError:
                continue
            into[match.group(1).strip()] = percentage / 100

    allocation: Dict[str, float] = {}
    allocation_section = next((s for s in sections if "classe d'actifs" in s.lower()), "")
    if allocation_section:
        collect(allocation_section, allocation)
    if not allocation:
        for section in sections:
            if re.search(r'allocation|répartition|portefeuille', section, re.I):
                collect(section, allocation)
    total = sum(allocation.values())
    if 0 < total < 0.99 or total > 1.01:
        allocation = {k: v / total for k, v in allocation.items()}
    return allocation
def _extract_geographic_allocation(self, sections: List[str]) -> Dict[str, float]:
    """Parse "<region> <number>%" pairs into fractional weights.

    Where pairs are read from:
      * First, the first line mentioning "géographique".
      * If that yields nothing, every line mentioning zone / région /
        international / pays. A region seen again overwrites its earlier value.

    Numbers accept a decimal comma. Unparsable numbers (e.g. "1.2.3") are
    skipped rather than aborting the whole parse with ValueError. Weights are
    rescaled to sum to 1 when their total lies outside [0.99, 1.01].
    """
    pair_pattern = re.compile(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%')

    def collect(line: str, into: Dict[str, float]) -> None:
        # Add every parsable "<name> <pct>%" pair found in line to into.
        for match in pair_pattern.finditer(line):
            try:
                percentage = float(match.group(2).replace(',', '.'))
            except ValueError:
                continue
            into[match.group(1).strip()] = percentage / 100

    allocation: Dict[str, float] = {}
    geo_section = next((s for s in sections if "géographique" in s.lower()), "")
    if geo_section:
        collect(geo_section, allocation)
    if not allocation:
        for section in sections:
            if re.search(r'zone|région|international|pays', section, re.I):
                collect(section, allocation)
    total = sum(allocation.values())
    if 0 < total < 0.99 or total > 1.01:
        allocation = {k: v / total for k, v in allocation.items()}
    return allocation
| def _extract_risk_decomposition(self, sections: List[str]) -> Dict[str, float]: | |
| """Extract risk decomposition""" | |
| risk_section = next((s for s in sections if "Analyse des risques" in s), "") | |
| decomposition = {} | |
| if risk_section: | |
| try: | |
| market_risk = re.search(r'marché[^\d]*(\d+(?:\.\d+)?)', risk_section) | |
| if market_risk: | |
| decomposition['market_risk'] = float(market_risk.group(1)) / 100 if float(market_risk.group(1)) > 1 else float(market_risk.group(1)) | |
| credit_risk = re.search(r'crédit[^\d]*(\d+(?:\.\d+)?)', risk_section) | |
| if credit_risk: | |
| decomposition['credit_risk'] = float(credit_risk.group(1)) / 100 if float(credit_risk.group(1)) > 1 else float(credit_risk.group(1)) | |
| liquidity_risk = re.search(r'liquidité[^\d]*(\d+(?:\.\d+)?)', risk_section) | |
| if liquidity_risk: | |
| decomposition['liquidity_risk'] = float(liquidity_risk.group(1)) / 100 if float(liquidity_risk.group(1)) > 1 else float(liquidity_risk.group(1)) | |
| operational_risk = re.search(r'opérationnel[^\d]*(\d+(?:\.\d+)?)', risk_section) | |
| if operational_risk: | |
| decomposition['operational_risk'] = float(operational_risk.group(1)) / 100 if float(operational_risk.group(1)) > 1 else float(operational_risk.group(1)) | |
| except ValueError: | |
| pass | |
| # Default values if nothing found | |
| if not decomposition: | |
| decomposition = { | |
| 'market_risk': 0.4, | |
| 'credit_risk': 0.2, | |
| 'liquidity_risk': 0.2, | |
| 'operational_risk': 0.2 | |
| } | |
| return decomposition | |
| def _extract_goal_feasibility(self, sections: List[str]) -> Dict[str, float]: | |
| """Extract goal feasibility assessment""" | |
| goals_section = next((s for s in sections if "Objectifs prioritaires" in s), "") | |
| feasibility = {} | |
| if goals_section: | |
| goal_matches = re.finditer(r'([^:]+):\s*(\d+(?:\.\d+)?)', goals_section) | |
| for match in goal_matches: | |
| goal = match.group(1).strip() | |
| score = float(match.group(2)) | |
| feasibility[goal] = min(max(score / 100 if score > 1 else score, 0), 1) | |
| return feasibility | |
| def _extract_investment_style(self, sections: List[str]) -> Dict[str, float]: | |
| """Determine recommended investment style""" | |
| style_weights = { | |
| 'value': 0.0, | |
| 'growth': 0.0, | |
| 'income': 0.0, | |
| 'momentum': 0.0, | |
| 'quality': 0.0 | |
| } | |
| recommendations = next((s for s in sections if "Recommandations" in s), "") | |
| if recommendations: | |
| style_weights['value'] = len(re.findall(r'valeur|sous-évalué|value', recommendations, re.I)) * 0.2 | |
| style_weights['growth'] = len(re.findall(r'croissance|growth|innovation', recommendations, re.I)) * 0.2 | |
| style_weights['income'] = len(re.findall(r'revenu|dividende|income', recommendations, re.I)) * 0.2 | |
| style_weights['momentum'] = len(re.findall(r'momentum|tendance|trend', recommendations, re.I)) * 0.2 | |
| style_weights['quality'] = len(re.findall(r'qualité|quality|solide', recommendations, re.I)) * 0.2 | |
| # Normalize weights | |
| total = sum(style_weights.values()) | |
| if total > 0: | |
| style_weights = {k: v/total for k, v in style_weights.items()} | |
| return style_weights | |
| def _extract_market_sensitivity(self, sections: List[str]) -> Dict[str, float]: | |
| """Analyze sensitivity to market conditions""" | |
| sensitivity = { | |
| 'market_beta': 0.0, | |
| 'interest_rate': 0.0, | |
| 'economic_cycle': 0.0, | |
| 'inflation': 0.0 | |
| } | |
| analysis_text = "\n".join(sections) | |
| # Analyze mentions and context | |
| if 'beta' in analysis_text.lower(): | |
| beta_match = re.search(r'beta[^\d]*(\d+(?:\.\d+)?)', analysis_text, re.I) | |
| if beta_match: | |
| sensitivity['market_beta'] = float(beta_match.group(1)) | |
| if 'taux' in analysis_text.lower(): | |
| sensitivity['interest_rate'] = len(re.findall(r'taux|rate', analysis_text, re.I)) * 0.1 | |
| if 'cycle' in analysis_text.lower(): | |
| sensitivity['economic_cycle'] = len(re.findall(r'cycle|économique', analysis_text, re.I)) * 0.1 | |
| if 'inflation' in analysis_text.lower(): | |
| sensitivity['inflation'] = len(re.findall(r'inflation|prix', analysis_text, re.I)) * 0.1 | |
| # Normalize sensitivities | |
| max_value = max(sensitivity.values()) | |
| if max_value > 0: | |
| sensitivity = {k: v/max_value for k, v in sensitivity.items()} | |
| return sensitivity | |
| def _extract_alerts(self, sections: List[str]) -> List[Dict[str, Any]]: | |
| """Extract alerts and critical attention points""" | |
| alerts = [] | |
| attention_section = next((s for s in sections if "Points de vigilance" in s), "") | |
| if attention_section: | |
| alert_patterns = [ | |
| (r'(!+|ATTENTION|ALERTE)[^\n]*', 'critical'), | |
| (r'attention|vigilance[^\n]*', 'warning'), | |
| (r'noter|remarquer[^\n]*', 'info') | |
| ] | |
| for pattern, severity in alert_patterns: | |
| matches = re.finditer(pattern, attention_section, re.I) | |
| for match in matches: | |
| alerts.append({ | |
| 'severity': severity, | |
| 'message': match.group(0).strip(), | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return alerts | |
| def _extract_attention_points(self, sections: List[str]) -> List[str]: | |
| """Extract specific attention points""" | |
| attention_points = [] | |
| attention_section = next((s for s in sections if "Points d'attention" in s or "Points de vigilance" in s), "") | |
| if attention_section: | |
| points = re.split(r'[•\-\d]+\.?\s*', attention_section) | |
| attention_points = [p.strip() for p in points if p.strip() and not p.startswith("Points")] | |
| return attention_points | |
| def _extract_market_context(self, sections: List[str]) -> Dict[str, Any]: | |
| """Extract relevant market context""" | |
| context = {} | |
| market_section = next((s for s in sections if "Contexte de marché" in s or "conditions de marché" in s), "") | |
| if market_section: | |
| if re.search(r'baiss|bear|négatif', market_section, re.I): | |
| context['market_regime'] = 'bear' | |
| elif re.search(r'hauss|bull|positif', market_section, re.I): | |
| context['market_regime'] = 'bull' | |
| else: | |
| context['market_regime'] = 'neutral' | |
| # Volatility analysis | |
| if re.search(r'volatile|instable', market_section, re.I): | |
| context['volatility'] = 'high' | |
| elif re.search(r'stable|calme', market_section, re.I): | |
| context['volatility'] = 'low' | |
| else: | |
| context['volatility'] = 'moderate' | |
| # Market sentiment | |
| context['sentiment'] = self._analyze_market_sentiment(market_section) | |
| return context | |
| def _calculate_adaptability_score(self, sections: List[str]) -> float: | |
| """Calculate profile adaptability score""" | |
| score = 0.5 # Base score | |
| text = "\n".join(sections) | |
| # Positive factors | |
| positive_factors = [ | |
| (r'flex|adapt|ajust', 0.1), | |
| (r'divers|équilibr', 0.1), | |
| (r'long terme|patient', 0.1), | |
| (r'expérience|connaissance', 0.1), | |
| (r'stable|régulier', 0.1) | |
| ] | |
| # Negative factors | |
| negative_factors = [ | |
| (r'rigid|fix|strict', -0.1), | |
| (r'risqu|peur|craint', -0.1), | |
| (r'court terme|urgent', -0.1), | |
| (r'limité|contraint', -0.1), | |
| (r'instable|irrégulier', -0.1) | |
| ] | |
| # Calculate score | |
| for pattern, weight in positive_factors + negative_factors: | |
| if re.search(pattern, text, re.I): | |
| score = min(max(score + weight, 0), 1) | |
| return score | |
| def _analyze_market_sentiment(self, text: str) -> str: | |
| """Analyze market sentiment""" | |
| positive_count = len(re.findall(r'positif|optimiste|favorable|hausse', text, re.I)) | |
| negative_count = len(re.findall(r'négatif|pessimiste|défavorable|baisse', text, re.I)) | |
| if positive_count > negative_count: | |
| return 'positive' | |
| elif negative_count > positive_count: | |
| return 'negative' | |
| return 'neutral' | |
| def _extract_stress_test_results(self, sections: List[str]) -> Dict[str, float]: | |
| """Extract stress test results""" | |
| results = {} | |
| stress_section = next((s for s in sections if "Stress tests" in s), "") | |
| if stress_section: | |
| scenarios = re.finditer(r'([^:]+):\s*-?\d+(?:\.\d+)?%', stress_section) | |
| for scenario in scenarios: | |
| name = scenario.group(1).strip() | |
| try: | |
| impact = float(re.search(r'-?\d+(?:\.\d+)?', scenario.group(0)).group(0)) | |
| results[name] = impact / 100 # Convert to decimal | |
| except (ValueError, AttributeError): | |
| continue | |
| # Default scenarios if nothing found | |
| if not results: | |
| results = { | |
| 'market_crash': -0.30, | |
| 'interest_rate_shock': -0.15, | |
| 'currency_crisis': -0.20, | |
| 'inflation_shock': -0.10 | |
| } | |
| return results | |
async def generate_profile_description(self, profile_data: Dict[str, Any]) -> str:
    """Generate a natural-language description of an investor profile.

    Builds a prompt with ``self.llm.prepare_description_prompt`` and awaits
    ``self.llm.generate_text``.  "[/INST]" markers and surrounding
    whitespace are removed from the output.

    Args:
        profile_data: Raw profile fields.

    Returns:
        The generated description.  If generation raises, or produces
        nothing once cleaned, ``self._generate_default_description`` is
        returned instead.
    """
    try:
        prompt = self.llm.prepare_description_prompt(profile_data)
        description = await self.llm.generate_text(prompt)
        description = description.replace("[/INST]", "").strip()
    except Exception as e:
        self.logger.error(f"Error generating profile description: {e}")
        return self._generate_default_description(profile_data)
    # An empty answer is useless to the caller; fall back like on failure
    if not description:
        self.logger.warning("Empty profile description generated; using default description")
        return self._generate_default_description(profile_data)
    return description
| def _generate_default_description(self, profile_data: Dict[str, Any]) -> str: | |
| """Generate default description if generation fails""" | |
| age = profile_data.get('age', 'Non spécifié') | |
| income = profile_data.get('annual_income', 'Non spécifié') | |
| goals = ", ".join(profile_data.get('investment_goals', ["épargne générale"])) | |
| risk = profile_data.get('risk_tolerance', 3) | |
| horizon = profile_data.get('investment_horizon', 5) | |
| risk_profile = "prudent" if risk <= 2 else "modéré" if risk <= 3 else "dynamique" | |
| return f"""Profil investisseur de {age} ans avec un revenu annuel de {income}€. | |
| Ce profil présente une tolérance au risque {risk_profile} (niveau {risk}/5) et un horizon d'investissement de {horizon} ans. | |
| Ses principaux objectifs sont: {goals}. | |
| Une stratégie d'investissement diversifiée est recommandée, adaptée à son niveau de risque {risk_profile}. | |
| L'allocation d'actifs devra tenir compte de ses objectifs spécifiques tout en respectant son horizon d'investissement. | |
| """ | |
| class DeepVestProfiler: | |
| """Advanced investor profiler with fine-tuned AI integration""" | |
def __init__(self, config: Optional[DeepVestConfig] = None):
    """Set up the profiler.

    Args:
        config: Profiler configuration; a default ``DeepVestConfig`` is
            created when none (or a falsy value) is given.
    """
    self.config = config if config else DeepVestConfig()
    self.logger = logging.getLogger("DeepVest.Profiler")
    self.llm_analyzer = LLMProfileAnalyzer(self.config)
    # Profiles cached in memory, keyed by profile id
    self.profiles = {}
    if not self.config.use_persistent_storage:
        return
    # Make sure the on-disk profile store exists
    self._ensure_db_directory()
| def _ensure_db_directory(self): | |
| """Ensure database directory exists""" | |
| if not os.path.exists(self.config.db_path): | |
| os.makedirs(self.config.db_path) | |
| self.logger.info(f"Created database directory: {self.config.db_path}") | |
async def create_profile(self, profile_data: Dict[str, Any]) -> PersonalProfile:
    """Create, analyse, validate and store a new investor profile.

    Args:
        profile_data: Raw profile fields; the caller's dict is not modified.
            ``épargne_mensuelle`` (or ``épargne mensuelle``) is accepted as
            an alias of ``monthly_savings``.  Missing fields get the
            defaults passed to ``PersonalProfile`` below.

    Returns:
        The new profile.  It is cached in ``self.profiles`` and, when
        persistent storage is enabled, saved with ``_save_profile_to_db``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        self.logger.info("Creating new investor profile")
        data = profile_data.copy()
        # French aliases for monthly savings (first one present wins)
        for alias in ('épargne_mensuelle', 'épargne mensuelle'):
            if alias in data:
                data['monthly_savings'] = data.pop(alias)
                break

        analysis = await self.llm_analyzer.analyze_profile(data)
        description = await self.llm_analyzer.generate_profile_description(data)

        profile = PersonalProfile(
            id=data.get('id'),
            # Risk and preference inputs
            risk_tolerance=data.get('risk_tolerance', 3),
            investment_horizon=data.get('investment_horizon', 5),
            income_stability=data.get('income_stability', 3),
            financial_knowledge=data.get('financial_knowledge', 3),
            investment_goals=data.get('investment_goals', []),
            esg_preferences=data.get('esg_preferences', False),
            constraints=data.get('constraints', {}),
            # Personal and financial situation
            age=data.get('age', 30),
            annual_income=data.get('annual_income', 0),
            total_assets=data.get('total_assets', 0),
            total_debt=data.get('total_debt', 0),
            monthly_savings=data.get('monthly_savings', 0),
            occupation=data.get('occupation', ''),
            industry=data.get('industry', ''),
            education_level=data.get('education_level', ''),
            marital_status=data.get('marital_status', 'single'),
            number_of_dependents=data.get('number_of_dependents', data.get('dependents', 0)),
            # Outputs of the LLM analysis
            llm_analysis_results=analysis,
            risk_score=analysis.risk_score,
            adaptability_score=analysis.adaptability_score,
            complexity_tolerance=data.get('complexity_tolerance', 0.5),
            profile_description=description,
        )

        await self._add_detailed_analysis(profile, data)
        profile._validate_profile()
        profile.calculate_comprehensive_risk_metrics()

        self.profiles[profile.id] = profile
        if self.config.use_persistent_storage:
            self._save_profile_to_db(profile)

        self.logger.info(f"Profile created successfully with ID: {profile.id}")
        return profile
    except Exception as e:
        self.logger.error(f"Error creating profile: {e}")
        raise
async def _add_detailed_analysis(self, profile: PersonalProfile, data: Dict[str, Any]):
    """Attach financial, family and goal sub-analyses to *profile*.

    Each sub-analysis is built independently and is best effort.  If one
    raises, a warning is logged, that attribute is not assigned, and the
    remaining sub-analyses still run.

    Args:
        profile: Profile being built.  Its ``financial_analysis``,
            ``family_analysis`` and ``investment_objectives`` attributes are
            assigned.
        data: The alias-normalised raw input that *profile* was built from.
    """
    # Financial analysis
    try:
        profile.financial_analysis = FinancialAnalysis(
            income_stability=data.get('income_stability', 3) / 5,
            debt_ratio=profile.total_debt / profile.annual_income if profile.annual_income > 0 else 0,
            # Annual savings as a share of annual income
            savings_capacity=data.get('monthly_savings', 0) / data.get('annual_income', 1) * 12 if data.get('annual_income', 0) > 0 else 0,
            emergency_fund_months=data.get('emergency_fund_months', 3),
            risk_capacity=profile.llm_analysis_results.risk_score if profile.llm_analysis_results else 0.5,
            investment_horizon=profile.investment_horizon,
            liquidity_needs=data.get('liquidity_needs', 0.2),
            income_sources=data.get('income_sources', {}),
            fixed_expenses=data.get('fixed_expenses', {}),
            variable_expenses=data.get('variable_expenses', {})
        )
    except Exception as e:
        self.logger.warning(f"Error creating detailed financial analysis: {e}")
    # Family analysis.  The dependent count comes from the profile, which
    # resolved "number_of_dependents" (with "dependents" as fallback).
    # Reading only data["dependents"] ignored callers that used the
    # canonical key.
    try:
        dependents = profile.number_of_dependents or 0
        partner = 1 if data.get('marital_status') in ['married', 'partnered'] else 0
        profile.family_analysis = FamilyAnalysis(
            family_size=1 + partner + dependents,
            dependents=dependents,
            life_stage=self._determine_life_stage(profile.age),
            future_events=data.get('future_events', []),
            monthly_obligations=data.get('monthly_obligations', 0),
            insurance_coverage=data.get('insurance_coverage', {}),
            education_needs=data.get('education_needs', {}),
            healthcare_needs=data.get('healthcare_needs', {})
        )
    except Exception as e:
        self.logger.warning(f"Error creating detailed family analysis: {e}")
    # Investment objectives; goal priority follows list order (1 = first)
    try:
        profile.investment_objectives = InvestmentGoals(
            primary_goals=[{'name': goal, 'priority': i + 1} for i, goal in enumerate(profile.investment_goals)],
            timeframes=data.get('goal_timeframes', {}),
            required_returns=data.get('required_returns', {}),
            priority_order=profile.investment_goals,
            target_amounts=data.get('goal_amounts', {}),
            goal_progress=data.get('goal_progress', {}),
            risk_tolerance_by_goal=data.get('risk_by_goal', {})
        )
    except Exception as e:
        self.logger.warning(f"Error creating detailed investment objectives: {e}")
| def _determine_life_stage(self, age: int) -> str: | |
| """Determine life stage based on age""" | |
| if age < 25: | |
| return "early_career" | |
| elif age < 35: | |
| return "career_building" | |
| elif age < 45: | |
| return "family_building" | |
| elif age < 55: | |
| return "peak_earning" | |
| elif age < 65: | |
| return "pre_retirement" | |
| else: | |
| return "retirement" | |
| def _save_profile_to_db(self, profile: PersonalProfile): | |
| """Save profile to persistent storage""" | |
| try: | |
| # Generate filename | |
| filename = f"{profile.id}.json" | |
| filepath = os.path.join(self.config.db_path, filename) | |
| # Export as JSON | |
| profile.export_json(filepath, include_detailed_analysis=True) | |
| self.logger.info(f"Profile saved to database: {filepath}") | |
| except Exception as e: | |
| self.logger.error(f"Error saving profile to database: {e}") | |
async def get_profile(self, profile_id: str) -> Optional[PersonalProfile]:
    """Return the profile with *profile_id*, or ``None`` if it is unknown.

    The in-memory cache is checked first.  On a miss, when persistent
    storage is enabled, ``<db_path>/<profile_id>.json`` is loaded with
    ``PersonalProfile.from_json`` and cached.  Ids containing a path
    component are rejected so nothing outside ``db_path`` is read.  Errors
    are logged and reported as ``None``.
    """
    try:
        # Check in-memory cache first
        if profile_id in self.profiles:
            self.logger.debug(f"Profile {profile_id} found in memory cache")
            return self.profiles[profile_id]
        if self.config.use_persistent_storage:
            name = f"{profile_id}"
            if os.path.basename(name) != name:
                self.logger.warning(f"Rejected unsafe profile ID: {profile_id!r}")
                return None
            filepath = os.path.join(self.config.db_path, f"{name}.json")
            if os.path.exists(filepath):
                self.logger.info(f"Loading profile {profile_id} from disk storage")
                profile = PersonalProfile.from_json(filepath)
                self.profiles[profile_id] = profile
                return profile
        self.logger.warning(f"Profile {profile_id} not found")
        return None
    except Exception as e:
        self.logger.error(f"Error retrieving profile {profile_id}: {e}")
        return None
async def update_profile(self, profile_id: str, updates: Dict[str, Any]) -> PersonalProfile:
    """Apply *updates* to an existing profile, re-analyse it if needed and store it.

    Args:
        profile_id: Identifier of the profile to update.
        updates: New field values.  ``épargne_mensuelle`` /
            ``épargne mensuelle`` are mapped to ``monthly_savings``.  Keys
            that are not constructor fields of the profile are ignored, as
            are ``id`` and ``basic_info``.

    Returns:
        A new profile object with the updates applied; the previous object
        is left unchanged.  It replaces the cached entry and, when
        persistent storage is enabled, is saved to disk.

    Raises:
        ValueError: If no profile with *profile_id* exists.
        Exception: Any other error is logged and re-raised.
    """
    from dataclasses import asdict, fields, replace
    try:
        profile = await self.get_profile(profile_id)
        if not profile:
            raise ValueError(f"Profile with ID {profile_id} not found")

        processed_updates = updates.copy()
        for alias in ('épargne_mensuelle', 'épargne mensuelle'):
            if alias in processed_updates:
                processed_updates['monthly_savings'] = processed_updates.pop(alias)
                break

        # dataclasses.replace keeps nested dataclasses (e.g. the LLM analysis)
        # as objects.  Rebuilding from asdict() silently turned them into
        # plain dicts.  Changing the id would orphan the cache entry and the
        # stored file, so it is not updatable.
        init_fields = {f.name for f in fields(profile) if f.init}
        changes = {
            key: value for key, value in processed_updates.items()
            if key in init_fields and key not in ('id', 'basic_info')
        }
        updated_profile = replace(profile, **changes)

        if hasattr(updated_profile, '_validate_profile'):
            updated_profile._validate_profile()

        # Re-run the LLM analysis only when an input it depends on changed,
        # or when none is attached yet.
        if self._are_changes_significant(changes) or getattr(updated_profile, 'llm_analysis_results', None) is None:
            # Like create_profile, the analyzer is given a plain dict
            # (assumed to be the input it expects - confirm against
            # LLMProfileAnalyzer.analyze_profile).
            analysis_result = await self.llm_analyzer.analyze_profile(asdict(updated_profile))
            updated_profile.llm_analysis_results = analysis_result
            updated_profile.risk_score = analysis_result.risk_score
            updated_profile.adaptability_score = analysis_result.adaptability_score
            if hasattr(updated_profile, 'calculate_comprehensive_risk_metrics'):
                updated_profile.calculate_comprehensive_risk_metrics()

        # Refresh the cache so later get_profile calls see the new data
        self.profiles[profile_id] = updated_profile
        if self.config.use_persistent_storage:
            self._save_profile_to_db(updated_profile)

        self.logger.info(f"Profile {profile_id} updated successfully")
        return updated_profile
    except Exception as e:
        self.logger.error(f"Error updating profile {profile_id}: {str(e)}")
        raise
| def _are_changes_significant(self, updates: Dict[str, Any]) -> bool: | |
| """Determine if profile changes require re-analysis""" | |
| significant_fields = { | |
| 'risk_tolerance', 'investment_horizon', 'age', 'annual_income', | |
| 'total_assets', 'total_debt', 'investment_goals', 'marital_status', | |
| 'number_of_dependents', 'esg_preferences' | |
| } | |
| return any(field in updates for field in significant_fields) | |
async def delete_profile(self, profile_id: str) -> bool:
    """Delete a profile from memory and, if enabled, from disk.

    Returns:
        True on success, including when the profile did not exist.  False
        when an error occurred or when the id contains a path component:
        such an id would point outside ``db_path`` and could otherwise
        delete an arbitrary ``.json`` file.
    """
    try:
        self.profiles.pop(profile_id, None)
        if self.config.use_persistent_storage:
            name = f"{profile_id}"
            if os.path.basename(name) != name:
                self.logger.error(f"Refusing to delete file for unsafe profile ID: {profile_id!r}")
                return False
            filepath = os.path.join(self.config.db_path, f"{name}.json")
            # EAFP: avoids the race between exists() and remove()
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass
        self.logger.info(f"Profile {profile_id} deleted successfully")
        return True
    except Exception as e:
        self.logger.error(f"Error deleting profile {profile_id}: {e}")
        return False
async def list_profiles(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """List a summary of every known profile, optionally filtered.

    When persistent storage is enabled, all stored profiles are first loaded
    into memory.

    Args:
        filter_criteria: Optional criteria, as understood by
            ``_profile_matches_criteria``.

    Returns:
        One summary dict per matching profile (id, age, risk_score,
        investment_horizon, creation_date, last_updated, goals).  A profile
        whose summary cannot be built is skipped with a warning.  An
        unexpected failure yields an empty list.
    """
    try:
        if self.config.use_persistent_storage:
            self._load_all_profiles_to_memory()
        profiles_list = []
        for profile in self.profiles.values():
            if filter_criteria and not self._profile_matches_criteria(profile, filter_criteria):
                continue
            # Isolate each profile: one malformed entry (e.g. created_at
            # None) used to make the whole listing return [].
            try:
                profiles_list.append({
                    'id': profile.id,
                    'age': profile.age,
                    'risk_score': profile.risk_score,
                    'investment_horizon': profile.investment_horizon,
                    'creation_date': profile.created_at.isoformat(),
                    'last_updated': profile.last_updated.isoformat(),
                    'goals': profile.investment_goals
                })
            except (AttributeError, TypeError, ValueError) as e:
                self.logger.warning(f"Skipping malformed profile {getattr(profile, 'id', '?')}: {e}")
        return profiles_list
    except Exception as e:
        self.logger.error(f"Error listing profiles: {e}")
        return []
| def _profile_matches_criteria(self, profile: PersonalProfile, criteria: Dict[str, Any]) -> bool: | |
| """Vérifie si un profil correspond aux critères de filtre""" | |
| for field, value in criteria.items(): | |
| if not hasattr(profile, field): | |
| return False | |
| profile_value = getattr(profile, field) | |
| # Handle range criteria | |
| if isinstance(value, dict) and ('min' in value or 'max' in value): | |
| if 'min' in value and profile_value < value['min']: | |
| return False | |
| if 'max' in value and profile_value > value['max']: | |
| return False | |
| # Handle list criteria (any match) | |
| elif isinstance(value, list): | |
| if profile_value not in value: | |
| return False | |
| # Direct comparison | |
| elif profile_value != value: | |
| return False | |
| return True | |
def _load_all_profiles_to_memory(self):
    """Load every ``*.json`` profile in ``db_path`` that is not cached yet.

    Does nothing unless persistent storage is enabled.  A file that fails
    to load is logged and skipped; other errors are logged and swallowed.
    """
    if not self.config.use_persistent_storage:
        return
    try:
        db_path = self.config.db_path
        if not os.path.exists(db_path):
            self.logger.warning(f"Database directory {db_path} not found")
            return
        json_files = [f for f in os.listdir(db_path) if f.endswith('.json')]
        self.logger.info(f"Found {len(json_files)} profile files in database")
        for json_file in json_files:
            # Strip only the trailing suffix.  str.replace removed every
            # ".json" occurrence, producing an id that get_profile's
            # f"{id}.json" could not find again.
            profile_id = json_file[:-len('.json')]
            if profile_id in self.profiles:
                continue
            filepath = os.path.join(db_path, json_file)
            try:
                self.profiles[profile_id] = PersonalProfile.from_json(filepath)
            except Exception as e:
                self.logger.error(f"Error loading profile from {filepath}: {e}")
    except Exception as e:
        self.logger.error(f"Error loading profiles from disk: {e}")
async def batch_analyze_profiles(self, profile_ids: List[str]) -> Dict[str, Any]:
    """Analyse several profiles together to surface common trends.

    Args:
        profile_ids: Ids to analyse.  Unknown ids are ignored; duplicates
            are analysed once.

    Returns:
        Dict with ``age_groups``, ``risk_distribution``, ``goal_trends``,
        ``profile_clusters`` (empty unless at least 5 profiles were found)
        and ``total_profiles_analyzed``.  ``{}`` when no profile was found;
        ``{'error': message}`` when the analysis failed.
    """
    try:
        profiles = []
        # dict.fromkeys de-duplicates while keeping the caller's order, so a
        # repeated id does not skew the statistics
        for profile_id in dict.fromkeys(profile_ids):
            profile = await self.get_profile(profile_id)
            if profile:
                profiles.append(profile)
        if not profiles:
            self.logger.warning("No valid profiles found for batch analysis")
            return {}
        clusters = await self._cluster_profiles(profiles) if len(profiles) >= 5 else {}
        return {
            'age_groups': self._group_profiles_by_age(profiles),
            'risk_distribution': self._group_profiles_by_risk(profiles),
            'goal_trends': self._analyze_goal_trends(profiles),
            'profile_clusters': clusters,
            'total_profiles_analyzed': len(profiles)
        }
    except Exception as e:
        self.logger.error(f"Error in batch profile analysis: {e}")
        return {'error': str(e)}
| def _group_profiles_by_age(self, profiles: List[PersonalProfile]) -> Dict[str, int]: | |
| """Groupe les profils par tranche d'âge""" | |
| age_groups = { | |
| '18-25': 0, | |
| '26-35': 0, | |
| '36-45': 0, | |
| '46-55': 0, | |
| '56-65': 0, | |
| '65+': 0 | |
| } | |
| for profile in profiles: | |
| age = profile.age | |
| if age <= 25: | |
| age_groups['18-25'] += 1 | |
| elif age <= 35: | |
| age_groups['26-35'] += 1 | |
| elif age <= 45: | |
| age_groups['36-45'] += 1 | |
| elif age <= 55: | |
| age_groups['46-55'] += 1 | |
| elif age <= 65: | |
| age_groups['56-65'] += 1 | |
| else: | |
| age_groups['65+'] += 1 | |
| return age_groups | |
| def _group_profiles_by_risk(self, profiles: List[PersonalProfile]) -> Dict[str, int]: | |
| """Groupe les profils par niveau de risque""" | |
| risk_groups = { | |
| 'Très conservateur': 0, | |
| 'Conservateur': 0, | |
| 'Modéré': 0, | |
| 'Dynamique': 0, | |
| 'Très dynamique': 0 | |
| } | |
| for profile in profiles: | |
| risk_score = profile.risk_score | |
| if risk_score < 0.2: | |
| risk_groups['Très conservateur'] += 1 | |
| elif risk_score < 0.4: | |
| risk_groups['Conservateur'] += 1 | |
| elif risk_score < 0.6: | |
| risk_groups['Modéré'] += 1 | |
| elif risk_score < 0.8: | |
| risk_groups['Dynamique'] += 1 | |
| else: | |
| risk_groups['Très dynamique'] += 1 | |
| return risk_groups | |
| def _analyze_goal_trends(self, profiles: List[PersonalProfile]) -> Dict[str, Dict[str, Any]]: | |
| """Analyse les tendances dans les objectifs d'investissement""" | |
| all_goals = {} | |
| # Count goals | |
| for profile in profiles: | |
| for goal in profile.investment_goals: | |
| if goal not in all_goals: | |
| all_goals[goal] = { | |
| 'count': 0, | |
| 'avg_risk_score': 0, | |
| 'avg_investment_horizon': 0, | |
| 'profiles': [] | |
| } | |
| all_goals[goal]['count'] += 1 | |
| all_goals[goal]['avg_risk_score'] += profile.risk_score | |
| all_goals[goal]['avg_investment_horizon'] += profile.investment_horizon | |
| all_goals[goal]['profiles'].append(profile.id) | |
| # Calculate averages | |
| for goal, data in all_goals.items(): | |
| count = data['count'] | |
| if count > 0: | |
| data['avg_risk_score'] /= count | |
| data['avg_investment_horizon'] /= count | |
| data['percentage'] = (count / len(profiles)) * 100 | |
| return all_goals | |
| async def _cluster_profiles(self, profiles: List[PersonalProfile]) -> Dict[str, List[str]]: | |
| """Regroupe les profils en clusters basés sur leurs caractéristiques""" | |
| try: | |
| # Extract features for clustering | |
| features = [] | |
| profile_ids = [] | |
| for profile in profiles: | |
| # Create feature vector | |
| feature_vector = [ | |
| profile.risk_score, | |
| profile.investment_horizon / 10, # Normalize horizon | |
| profile.age / 100, # Normalize age | |
| profile.financial_knowledge / 5, # Normalize knowledge | |
| profile.income_stability / 5, # Normalize stability | |
| profile.total_assets / 1000000 if profile.total_assets > 0 else 0, # Normalize assets | |
| profile.total_debt / 1000000 if profile.total_debt > 0 else 0, # Normalize debt | |
| len(profile.investment_goals) / 5 # Normalize goal count | |
| ] | |
| features.append(feature_vector) | |
| profile_ids.append(profile.id) | |
| # Skip clustering if too few profiles | |
| if len(features) < 3: | |
| return {"cluster_1": profile_ids} | |
| # Perform k-means clustering | |
| from sklearn.cluster import KMeans | |
| from sklearn.preprocessing import StandardScaler | |
| # Standardize features | |
| scaler = StandardScaler() | |
| features_scaled = scaler.fit_transform(features) | |
| # Determine optimal number of clusters (between 2 and 5) | |
| n_clusters = min(max(2, len(profiles) // 3), 5) | |
| # Perform clustering | |
| kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) | |
| clusters = kmeans.fit_predict(features_scaled) | |
| # Organize profiles by cluster | |
| cluster_dict = {} | |
| for i, cluster_id in enumerate(clusters): | |
| cluster_name = f"cluster_{cluster_id + 1}" | |
| if cluster_name not in cluster_dict: | |
| cluster_dict[cluster_name] = [] | |
| cluster_dict[cluster_name].append(profile_ids[i]) | |
| return cluster_dict | |
| except Exception as e: | |
| self.logger.error(f"Error in profile clustering: {e}") | |
| return {"cluster_error": [p.id for p in profiles]} | |
def generate_asset_allocation(self, risk_score: float, investment_horizon: int, esg_preferences: bool = False) -> Dict[str, Any]:
    """Generate an asset allocation from a risk score and an investment horizon.

    The score may be expressed on a 1-5 scale or on a 0-1 scale:

    - Integer scores (e.g. a 1-5 ``risk_tolerance`` rating) are always read
      on the 1-5 scale and divided by 5.
    - Float scores above 1 are read the same way.
    - Float scores up to 1 are used as-is.

    The normalized score is then clamped to [0, 1].

    Args:
        risk_score: Risk score (1-5 or 0-1).
        investment_horizon: Investment horizon in years.
            - Horizons under 5 years move up to 0.15 out of 'Actions'.
            - Horizons over 15 years move up to 0.10 into 'Actions'.
            - The moved weight is offset 70% on 'Obligations' and 30% on
              'Liquidités'.
        esg_preferences: When true, up to 0.3 of the 'Actions' weight is moved
            to an 'Actions ESG' line.

    Returns:
        ``{'asset_allocation': {asset class: weight},
        'risk_profile': {'score': normalized score, 'category': label}}``.
        Weights are clipped at 0 and rescaled to sum to 1. If anything raises,
        the error is logged and a fixed moderate allocation is returned.
    """
    import numbers

    try:
        # Integer scores are 1-5 ratings. Without this check a rating of 1
        # (the most cautious) was read as 1.0 on the 0-1 scale and classified
        # "Très dynamique".
        if isinstance(risk_score, numbers.Integral) or risk_score > 1:
            normalized_risk = risk_score / 5.0
        else:
            normalized_risk = risk_score
        normalized_risk = min(max(normalized_risk, 0.0), 1.0)

        # Each tier is (exclusive upper bound on the score, category, base weights).
        # Scores at or above the last bound, and NaN, fall into "Très dynamique".
        tiers = (
            (0.2, "Très conservateur", {'Actions': 0.15, 'Obligations': 0.65, 'Liquidités': 0.20}),
            (0.4, "Conservateur", {'Actions': 0.30, 'Obligations': 0.55, 'Liquidités': 0.15}),
            (0.6, "Modéré", {'Actions': 0.50, 'Obligations': 0.40, 'Liquidités': 0.10}),
            (0.8, "Dynamique", {'Actions': 0.70, 'Obligations': 0.25, 'Liquidités': 0.05}),
        )
        risk_category = "Très dynamique"
        allocation = {'Actions': 0.85, 'Obligations': 0.10, 'Liquidités': 0.05}
        for upper_bound, category, weights in tiers:
            if normalized_risk < upper_bound:
                risk_category, allocation = category, dict(weights)
                break

        # Horizon adjustment: less equity for short horizons, more for long ones
        if investment_horizon < 5:
            horizon_factor = -0.15
        elif investment_horizon > 15:
            horizon_factor = 0.10
        else:
            horizon_factor = 0
        if horizon_factor != 0:
            action_shift = min(allocation['Actions'], abs(horizon_factor)) * (1 if horizon_factor > 0 else -1)
            allocation['Actions'] += action_shift
            # Offset the equity move on bonds (70%) and cash (30%)
            allocation['Obligations'] -= action_shift * 0.7
            allocation['Liquidités'] -= action_shift * 0.3

        # ESG preference: carve up to 0.3 of equity into an ESG line
        if esg_preferences:
            esg_allocation = min(allocation['Actions'], 0.3)
            allocation['Actions'] -= esg_allocation
            allocation['Actions ESG'] = esg_allocation

        # Clip negatives, then rescale so the weights sum to 1
        allocation = {k: max(0, v) for k, v in allocation.items()}
        total = sum(allocation.values())
        allocation = {k: v / total for k, v in allocation.items()}

        return {
            'asset_allocation': allocation,
            'risk_profile': {
                'score': normalized_risk,
                'category': risk_category
            }
        }
    except Exception as e:
        self.logger.error(f"Error generating asset allocation: {str(e)}")
        # Fixed moderate allocation on error
        return {
            'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1},
            'risk_profile': {'score': 0.5, 'category': 'Modéré'}
        }
async def generate_investment_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
    """Generate a personalised investment strategy for a profile.

    Combines several parts:

    - the allocation from ``self.generate_asset_allocation``;
    - up to five recommendations derived from the profile's age, goals and
      financial situation;
    - savings targets;
    - the products returned by ``self._recommend_products``.

    Args:
        profile: Investor profile. Must be truthy and have a truthy ``id``.

    Returns:
        Strategy dict with ``profile_id``, ``timestamp`` (ISO 8601),
        ``risk_profile``, ``asset_allocation``, ``recommendations``,
        ``targets`` and ``investment_products``. If anything raises
        (including an invalid profile), the error is logged and a generic
        moderate strategy without ``profile_id``/``timestamp``/``targets`` is
        returned instead.
    """
    max_recommendations = 5
    try:
        if not profile or not profile.id:
            raise ValueError("Invalid profile")

        # Monthly savings: profile attribute first, then the financial
        # analysis. A missing or None value counts as 0 so the comparison
        # further down cannot raise.
        monthly_savings = getattr(profile, 'monthly_savings', 0) or 0
        analysis = getattr(profile, 'financial_analysis', None)
        if monthly_savings == 0 and hasattr(analysis, 'monthly_savings'):
            monthly_savings = analysis.monthly_savings or 0

        # Out-of-range tolerances fall back to the middle of the 1-5 scale
        risk_score = profile.risk_tolerance if 1 <= profile.risk_tolerance <= 5 else 3
        asset_allocation = self.generate_asset_allocation(
            risk_score=risk_score,
            investment_horizon=profile.investment_horizon,
            esg_preferences=profile.esg_preferences
        )

        # General advice: baseline, then age-based, then goal-based
        recommendations = ["Diversifier le portefeuille selon le profil de risque"]
        if profile.age < 30:
            recommendations.append("Privilégier les investissements à fort potentiel de croissance")
            recommendations.append("Adopter une stratégie d'investissement régulier (DCA)")
        elif profile.age < 50:
            recommendations.append("Équilibrer croissance et sécurité dans votre allocation")
            recommendations.append("Réviser annuellement votre stratégie fiscale")
        else:
            recommendations.append("Sécuriser progressivement votre capital")
            recommendations.append("Optimiser votre stratégie en vue de la retraite")
        if "Retraite" in profile.investment_goals:
            recommendations.append("Maximiser les versements sur votre plan de retraite")
        if "Achat immobilier" in profile.investment_goals:
            recommendations.append("Constituer un apport pour votre projet immobilier")
        if "Études des enfants" in profile.investment_goals:
            recommendations.append("Mettre en place une épargne dédiée aux études des enfants")

        # Financial-health alerts
        alerts = []
        if profile.total_debt > profile.annual_income:
            alerts.append("Prioriser le remboursement des dettes à taux élevé")
        if monthly_savings < (profile.annual_income / 12) * 0.1:
            alerts.append("Augmenter votre taux d'épargne mensuel")

        # Cap at five items, trimming general advice first. Previously the
        # alerts were appended last and were the first to be cut. Output is
        # unchanged whenever there are five or fewer items in total.
        recommendations = recommendations[:max_recommendations - len(alerts)] + alerts

        strategy = {
            'profile_id': profile.id,
            'timestamp': datetime.now().isoformat(),
            'risk_profile': asset_allocation['risk_profile'],
            'asset_allocation': asset_allocation['asset_allocation'],
            'recommendations': recommendations,
            'targets': {
                # At least 15% of gross monthly income, or current savings if higher
                'monthly_savings': max(profile.annual_income * 0.15 / 12, monthly_savings),
                # annual_income / 6 is two months of income.
                # NOTE(review): confirm this is the intended emergency-fund size.
                'emergency_fund': profile.annual_income / 6
            },
            'investment_products': self._recommend_products(profile, asset_allocation)
        }
        self.logger.info(f"Generated investment strategy for profile {profile.id}")
        return strategy
    except Exception as e:
        self.logger.error(f"Error generating investment strategy: {str(e)}")
        # Generic strategy on error
        return {
            'risk_profile': {'category': 'Modéré'},
            'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1},
            'recommendations': [
                "Diversifier le portefeuille selon le profil de risque",
                "Maintenir une épargne de sécurité",
                "Adopter une stratégie d'investissement régulier"
            ]
        }
| def _refine_asset_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]: | |
| """Raffine l'allocation d'actifs basée sur des contraintes supplémentaires""" | |
| refined = allocation.copy() | |
| # Apply ESG preference adjustments | |
| if profile.esg_preferences and 'Actions' in refined: | |
| # Reduce regular stocks and add ESG stocks | |
| regular_stocks = refined.get('Actions', 0) | |
| if regular_stocks > 0.3: | |
| refined['Actions'] = regular_stocks * 0.7 | |
| refined['Actions ESG'] = regular_stocks * 0.3 | |
| # Apply liquidity needs adjustment | |
| if hasattr(profile, 'financial_analysis') and profile.financial_analysis: | |
| liquidity_needs = profile.financial_analysis.liquidity_needs | |
| if liquidity_needs > 0.3 and 'Liquidités' in refined: | |
| # Ensure minimum cash allocation | |
| min_cash = max(0.15, liquidity_needs) | |
| if refined['Liquidités'] < min_cash: | |
| # Reduce other allocations proportionally | |
| shortfall = min_cash - refined['Liquidités'] | |
| for asset, weight in list(refined.items()): | |
| if asset != 'Liquidités': | |
| reduction = weight * (shortfall / (1 - refined['Liquidités'])) | |
| refined[asset] = max(0, weight - reduction) | |
| refined['Liquidités'] = min_cash | |
| # Normalize to ensure sum is 1.0 | |
| total = sum(refined.values()) | |
| if abs(total - 1.0) > 0.01: | |
| refined = {k: v/total for k, v in refined.items()} | |
| return refined | |
| def _refine_geographic_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]: | |
| """Raffine l'allocation géographique basée sur des contraintes supplémentaires""" | |
| # Use provided allocation or default | |
| if not allocation: | |
| allocation = { | |
| 'Europe': 0.4, | |
| 'Amérique du Nord': 0.3, | |
| 'Asie-Pacifique': 0.2, | |
| 'Pays émergents': 0.1 | |
| } | |
| refined = allocation.copy() | |
| # Apply profile-specific adjustments | |
| if profile.risk_score < 0.3: | |
| # More conservative profile - reduce emerging markets | |
| if 'Pays émergents' in refined: | |
| emerging = refined['Pays émergents'] | |
| refined['Pays émergents'] = emerging * 0.5 | |
| # Redistribute to developed markets | |
| for region in ['Europe', 'Amérique du Nord']: | |
| if region in refined: | |
| refined[region] += emerging * 0.25 | |
| elif profile.risk_score > 0.7: | |
| # More aggressive profile - increase emerging markets | |
| if 'Pays émergents' in refined and refined['Pays émergents'] < 0.2: | |
| increase = min(0.1, 0.2 - refined['Pays émergents']) | |
| refined['Pays émergents'] += increase | |
| # Take from developed markets | |
| for region in ['Europe', 'Amérique du Nord']: | |
| if region in refined: | |
| refined[region] -= increase / 2 | |
| # Normalize to ensure sum is 1.0 | |
| total = sum(refined.values()) | |
| if abs(total - 1.0) > 0.01: | |
| refined = {k: v/total for k, v in refined.items()} | |
| return refined | |
| def _generate_implementation_steps(self, strategy: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Génère des étapes d'implémentation pratiques pour la stratégie""" | |
| steps = [] | |
| # Step 1: Set up accounts | |
| steps.append({ | |
| 'order': 1, | |
| 'title': 'Mise en place des comptes', | |
| 'description': 'Ouvrir les comptes nécessaires pour implémenter la stratégie', | |
| 'actions': [ | |
| 'Ouvrir un compte-titres classique pour les investissements généraux', | |
| 'Mettre en place un PEA pour les actions européennes', | |
| 'Considérer une assurance-vie pour les investissements à long terme' | |
| ] | |
| }) | |
| # Step 2: Initial asset allocation | |
| asset_allocation = strategy.get('asset_allocation', {}) | |
| allocation_actions = [] | |
| for asset, weight in asset_allocation.items(): | |
| allocation_actions.append(f'Allouer {weight:.1%} du portefeuille à la classe d\'actifs "{asset}"') | |
| steps.append({ | |
| 'order': 2, | |
| 'title': 'Allocation initiale des actifs', | |
| 'description': 'Répartir les investissements selon l\'allocation d\'actifs recommandée', | |
| 'actions': allocation_actions | |
| }) | |
| # Step 3: Implementation schedule | |
| steps.append({ | |
| 'order': 3, | |
| 'title': 'Calendrier d\'implémentation', | |
| 'description': 'Établir un calendrier progressif pour l\'implémentation de la stratégie', | |
| 'actions': [ | |
| 'Mois 1: Mise en place des comptes et investissement de 25% du capital', | |
| 'Mois 2-3: Investissement de 25% supplémentaires', | |
| 'Mois 4-6: Investissement du capital restant', | |
| 'Trimestriel: Rééquilibrage du portefeuille selon l\'allocation cible' | |
| ] | |
| }) | |
| # Step 4: Monitoring and adjustments | |
| steps.append({ | |
| 'order': 4, | |
| 'title': 'Suivi et ajustements', | |
| 'description': 'Établir un processus de suivi régulier de la stratégie', | |
| 'actions': [ | |
| 'Suivre mensuellement la performance du portefeuille', | |
| 'Vérifier trimestriellement l\'alignement avec l\'allocation cible', | |
| 'Réévaluer annuellement la stratégie complète', | |
| 'Ajuster en fonction des changements de situation personnelle' | |
| ] | |
| }) | |
| return steps | |
def _determine_rebalancing_schedule(self, profile: PersonalProfile) -> Dict[str, Any]:
    """Choose how often, and by which method, the portfolio is rebalanced.

    - Quarterly with a 5% drift threshold when ``risk_score > 0.7`` or
      ``financial_knowledge >= 4``.
    - Otherwise yearly with a 10% threshold when ``risk_score < 0.3`` or
      ``financial_knowledge <= 2``.
    - Otherwise half-yearly with a 7.5% threshold.

    Args:
        profile: Reads ``risk_score`` and ``financial_knowledge``.

    Returns:
        Dict with ``frequency`` (French label), ``drift_threshold``,
        ``method`` ('Threshold-based' when knowledge >= 3, else
        'Calendar-based') and ``next_scheduled``. The latter is a
        ``datetime``, not a string: now plus the period of the chosen
        frequency.
    """
    risk = profile.risk_score
    knowledge = profile.financial_knowledge

    if risk > 0.7 or knowledge >= 4:
        cadence, drift = 'Trimestrielle', 0.05
    elif risk < 0.3 or knowledge <= 2:
        cadence, drift = 'Annuelle', 0.1
    else:
        cadence, drift = 'Semestrielle', 0.075

    method = 'Threshold-based' if knowledge >= 3 else 'Calendar-based'
    next_date = datetime.now() + self._frequency_to_timedelta(cadence)
    return {
        'frequency': cadence,
        'drift_threshold': drift,
        'method': method,
        'next_scheduled': next_date,
    }
| def _frequency_to_timedelta(self, frequency: str) -> timedelta: | |
| """Convertit une fréquence textuelle en objet timedelta""" | |
| if frequency == 'Mensuelle': | |
| return timedelta(days=30) | |
| elif frequency == 'Trimestrielle': | |
| return timedelta(days=90) | |
| elif frequency == 'Semestrielle': | |
| return timedelta(days=180) | |
| elif frequency == 'Annuelle': | |
| return timedelta(days=365) | |
| else: | |
| return timedelta(days=90) # Default to quarterly | |
| def _calculate_expected_performance(self, allocation: Dict[str, float], years: int) -> Dict[str, Any]: | |
| """Calcule les performances attendues basées sur l'allocation et l'horizon""" | |
| # Define return and volatility assumptions by asset class | |
| assumptions = { | |
| 'Actions': {'return': 0.07, 'volatility': 0.18}, | |
| 'Actions ESG': {'return': 0.065, 'volatility': 0.17}, | |
| 'Obligations': {'return': 0.03, 'volatility': 0.05}, | |
| 'Immobilier': {'return': 0.05, 'volatility': 0.15}, | |
| 'Liquidités': {'return': 0.01, 'volatility': 0.01}, | |
| 'Or': {'return': 0.04, 'volatility': 0.16}, | |
| 'Matières premières': {'return': 0.04, 'volatility': 0.2}, | |
| 'Cryptomonnaies': {'return': 0.15, 'volatility': 0.6} | |
| } | |
| # Calculate weighted expected return and volatility | |
| expected_return = 0 | |
| expected_volatility = 0 | |
| for asset, weight in allocation.items(): | |
| if asset in assumptions: | |
| expected_return += assumptions[asset]['return'] * weight | |
| expected_volatility += assumptions[asset]['volatility'] * weight | |
| # Apply time horizon adjustments (slightly higher returns for longer horizons) | |
| if years > 10: | |
| expected_return *= 1.05 | |
| elif years < 5: | |
| expected_return *= 0.95 | |
| # Calculate compound return | |
| compound_return = (1 + expected_return) ** years - 1 | |
| # Calculate different scenarios | |
| return { | |
| 'expected_annual_return': expected_return, | |
| 'expected_volatility': expected_volatility, | |
| 'expected_total_return': compound_return, | |
| 'scenarios': { | |
| 'conservative': compound_return * 0.7, | |
| 'expected': compound_return, | |
| 'optimistic': compound_return * 1.3 | |
| }, | |
| 'assumptions': { | |
| 'time_horizon': years, | |
| 'asset_assumptions': assumptions | |
| } | |
| } | |
async def _generate_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
    """Build a rule-based strategy for when LLM analysis is unavailable.

    Combines the default risk-based allocation with a fixed geographic split
    and generic recommendations and attention points. Declared async, but
    nothing in the body is awaited.

    Args:
        profile: Reads ``risk_tolerance``, ``investment_horizon`` and
            ``investment_goals``.

    Returns:
        Strategy dict with:
            - ``risk_profile``: score ``risk_tolerance / 5`` and its category.
            - ``investment_horizon``.
            - ``primary_goals``: the first three goals.
            - ``asset_allocation`` and ``geographic_allocation``.
            - ``recommendations`` and ``attention_points``.
    """
    allocation = self._generate_default_allocation(profile)

    regions = {
        'Europe': 0.4,
        'Amérique du Nord': 0.3,
        'Asie-Pacifique': 0.2,
        'Pays émergents': 0.1
    }

    advice = [
        "Diversifier votre portefeuille selon votre profil de risque",
        "Maintenir une épargne de sécurité équivalente à 3-6 mois de dépenses",
        "Investir régulièrement pour lisser l'exposition au risque de marché",
        "Rééquilibrer périodiquement votre portefeuille pour maintenir l'allocation cible"
    ]

    watch_list = [
        "Surveiller l'évolution de votre tolérance au risque avec le temps",
        "Ajuster votre stratégie en fonction des changements importants de votre situation personnelle",
        "Rester discipliné face aux fluctuations du marché"
    ]

    score = profile.risk_tolerance / 5
    return {
        'risk_profile': {
            'score': score,
            'category': self._get_risk_category(score)
        },
        'investment_horizon': profile.investment_horizon,
        'primary_goals': profile.investment_goals[:3],
        'asset_allocation': allocation,
        'geographic_allocation': regions,
        'recommendations': advice,
        'attention_points': watch_list
    }
| def _generate_default_allocation(self, profile: PersonalProfile) -> Dict[str, float]: | |
| """Génère une allocation par défaut basée sur le profil de risque""" | |
| risk_score = profile.risk_tolerance / 5 | |
| # Base allocation components | |
| stocks = risk_score | |
| bonds = (1 - risk_score) * 0.8 | |
| cash = (1 - risk_score) * 0.2 | |
| # Additional components for higher risk profiles | |
| if risk_score > 0.6: | |
| # Add alternatives for high risk profiles | |
| alternatives = stocks * 0.15 | |
| stocks -= alternatives | |
| return { | |
| 'Actions': stocks, | |
| 'Obligations': bonds, | |
| 'Liquidités': cash, | |
| 'Actifs alternatifs': alternatives | |
| } | |
| else: | |
| return { | |
| 'Actions': stocks, | |
| 'Obligations': bonds, | |
| 'Liquidités': cash | |
| } | |
| def _get_risk_category(self, risk_score: float) -> str: | |
| """Détermine la catégorie de risque basée sur le score""" | |
| if risk_score < 0.2: | |
| return "Très conservateur" | |
| elif risk_score < 0.4: | |
| return "Conservateur" | |
| elif risk_score < 0.6: | |
| return "Modéré" | |
| elif risk_score < 0.8: | |
| return "Dynamique" | |
| else: | |
| return "Très dynamique" | |
def _generate_simple_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
    """Return a minimal strategy used when normal generation fails.

    The equity/bond/cash split depends only on ``risk_tolerance``:

    - <= 2: 30/50/20
    - <= 3: 50/40/10
    - otherwise: 70/20/10

    Args:
        profile: Reads ``risk_tolerance`` and ``investment_horizon``.

    Returns:
        Dict with:
            - ``risk_profile``: score ``risk_tolerance / 5`` and its category.
            - ``investment_horizon``.
            - ``asset_allocation``.
            - ``recommendations``: three generic items.
    """
    tolerance = profile.risk_tolerance

    if tolerance <= 2:
        mix = (0.3, 0.5, 0.2)
    elif tolerance <= 3:
        mix = (0.5, 0.4, 0.1)
    else:
        mix = (0.7, 0.2, 0.1)
    allocation = dict(zip(('Actions', 'Obligations', 'Liquidités'), mix))

    score = tolerance / 5
    return {
        'risk_profile': {
            'score': score,
            'category': self._get_risk_category(score)
        },
        'investment_horizon': profile.investment_horizon,
        'asset_allocation': allocation,
        'recommendations': [
            "Diversifier votre portefeuille",
            "Investir régulièrement",
            "Maintenir une épargne de sécurité"
        ]
    }