ProfilingAI / src /core /profiling.py
Sandrine Guétin
Version propre de DeepVest
2106f78
raw
history blame
134 kB
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import re
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, Union, Set
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
try:
from transformers import PhiForCausalLM
except ImportError:
pass
try:
from peft import PeftModel, PeftConfig
except ImportError:
pass
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("DeepVest")
# Configuration manager
@dataclass
class DeepVestConfig:
"""Configuration for the DeepVest system"""
# Model configuration
model_path: str = "financial_profiler_model"
use_quantization: bool = True
cache_size: int = 100
# Database configuration
db_path: str = "profiles_db"
use_persistent_storage: bool = True
# Analysis configuration
default_horizon: int = 5
default_risk_tolerance: int = 3
min_recommendation_count: int = 3
# Performance settings
enable_caching: bool = True
batch_size: int = 4
parallel_processing: bool = True
# Market data integration
fetch_market_data: bool = False
market_data_api_key: Optional[str] = None
# Debug settings
debug_mode: bool = False
log_prompts: bool = False
def __post_init__(self):
"""Validate configuration after initialization"""
if self.debug_mode:
logging.getLogger("DeepVest").setLevel(logging.DEBUG)
# Core data models
@dataclass
class FinancialAnalysis:
"""Detailed financial analysis of an investor profile"""
income_stability: float = 0.5
debt_ratio: float = 0.0
savings_capacity: float = 0.0
emergency_fund_months: float = 3.0
risk_capacity: float = 0.5
investment_horizon: int = 5
liquidity_needs: float = 0.2
income_sources: Dict[str, float] = field(default_factory=dict)
fixed_expenses: Dict[str, float] = field(default_factory=dict)
variable_expenses: Dict[str, float] = field(default_factory=dict)
credit_score: Optional[int] = None
tax_bracket: Optional[float] = None
def validate(self) -> bool:
"""Validate financial analysis attributes"""
try:
validations = [
0 <= self.income_stability <= 1,
self.debt_ratio >= 0,
0 <= self.savings_capacity <= 1,
self.emergency_fund_months >= 0,
0 <= self.risk_capacity <= 1,
self.investment_horizon > 0,
0 <= self.liquidity_needs <= 1
]
if self.credit_score is not None:
validations.append(300 <= self.credit_score <= 850)
if self.tax_bracket is not None:
validations.append(0 <= self.tax_bracket <= 1)
return all(validations)
except Exception as e:
logger.error(f"Error validating financial analysis: {e}")
return False
def calculate_total_expenses(self) -> float:
"""Calculate total monthly expenses"""
return (
sum(self.fixed_expenses.values()) +
sum(self.variable_expenses.values())
)
def calculate_savings_rate(self) -> float:
"""Calculate savings rate as a percentage of income"""
total_income = sum(self.income_sources.values())
if total_income == 0:
return 0
return self.savings_capacity / total_income
def get_financial_health_score(self) -> float:
"""Calculate overall financial health score (0-1)"""
# Calculate components with different weights
stability_weight = 0.3
debt_weight = 0.25
savings_weight = 0.25
emergency_weight = 0.2
# Calculate debt score (lower is better)
debt_score = max(0, 1 - (self.debt_ratio / 0.5))
# Calculate emergency fund score (higher is better)
emergency_score = min(1, self.emergency_fund_months / 6)
# Calculate savings score
savings_score = min(1, self.savings_capacity * 3)
# Calculate overall score
health_score = (
stability_weight * self.income_stability +
debt_weight * debt_score +
savings_weight * savings_score +
emergency_weight * emergency_score
)
return min(1, max(0, health_score))
@dataclass
class FamilyAnalysis:
"""Detailed analysis of family situation"""
family_size: int = 1
dependents: int = 0
life_stage: str = 'undefined'
future_events: List[Dict] = field(default_factory=list)
monthly_obligations: float = 0.0
insurance_coverage: Dict[str, float] = field(default_factory=dict)
education_needs: Dict[str, float] = field(default_factory=dict)
healthcare_needs: Dict[str, float] = field(default_factory=dict)
retirement_plans: Dict[str, Any] = field(default_factory=dict)
estate_plans: Dict[str, Any] = field(default_factory=dict)
def validate(self) -> bool:
"""Validate family analysis attributes"""
try:
validations = [
self.family_size >= 1,
self.dependents >= 0,
self.monthly_obligations >= 0,
all(value >= 0 for value in self.insurance_coverage.values()),
all(value >= 0 for value in self.education_needs.values()),
all(value >= 0 for value in self.healthcare_needs.values())
]
return all(validations)
except Exception as e:
logger.error(f"Error validating family analysis: {e}")
return False
def calculate_total_obligations(self) -> float:
"""Calculate total monthly obligations"""
return (
self.monthly_obligations +
sum(self.insurance_coverage.values()) +
sum(self.education_needs.values()) +
sum(self.healthcare_needs.values())
)
def get_next_life_event(self) -> Optional[Dict]:
"""Get the next upcoming life event"""
if not self.future_events:
return None
# Sort events by date and return the closest one
future_events = [e for e in self.future_events if 'date' in e]
if not future_events:
return None
return min(future_events, key=lambda x: x['date'])
def calculate_family_complexity(self) -> float:
"""Calculate family situation complexity (0-1)"""
# Base complexity based on family size
base_complexity = min(1, (self.family_size - 1) * 0.2)
# Additional complexity for dependents
dependent_complexity = min(0.5, self.dependents * 0.1)
# Additional complexity for upcoming events
events_complexity = min(0.3, len(self.future_events) * 0.05)
# Additional complexity for special needs
special_needs = len(self.healthcare_needs) > 0 or len(self.education_needs) > 0
special_complexity = 0.2 if special_needs else 0
return min(1, base_complexity + dependent_complexity + events_complexity + special_complexity)
@dataclass
class InvestmentGoals:
"""Detailed investment goals"""
primary_goals: List[Dict] = field(default_factory=list)
timeframes: Dict[str, int] = field(default_factory=dict)
required_returns: Dict[str, float] = field(default_factory=dict)
priority_order: List[str] = field(default_factory=list)
target_amounts: Dict[str, float] = field(default_factory=dict)
goal_progress: Dict[str, float] = field(default_factory=dict)
risk_tolerance_by_goal: Dict[str, int] = field(default_factory=dict)
milestones: Dict[str, List[Dict]] = field(default_factory=dict)
def validate(self) -> bool:
"""Validate investment goals attributes"""
try:
if self.target_amounts and any(amount < 0 for amount in self.target_amounts.values()):
return False
if self.required_returns and any(ret < -1 or ret > 2 for ret in self.required_returns.values()):
return False
if self.risk_tolerance_by_goal and any(tol < 1 or tol > 5 for tol in self.risk_tolerance_by_goal.values()):
return False
if self.goal_progress and any(prog < 0 or prog > 1 for prog in self.goal_progress.values()):
return False
return True
except Exception as e:
logger.error(f"Error validating investment goals: {e}")
return False
def calculate_total_target_amount(self) -> float:
"""Calculate total target amount across all goals"""
return sum(self.target_amounts.values())
def get_highest_priority_goal(self) -> Optional[str]:
"""Get the highest priority goal"""
return self.priority_order[0] if self.priority_order else None
def calculate_weighted_risk_tolerance(self) -> float:
"""Calculate risk tolerance weighted by goal amount"""
if not self.risk_tolerance_by_goal or not self.target_amounts:
return 3.0 # Default value
total_amount = self.calculate_total_target_amount()
if total_amount == 0:
return 3.0
weighted_tolerance = sum(
self.risk_tolerance_by_goal.get(goal, 3) * amount / total_amount
for goal, amount in self.target_amounts.items()
)
return weighted_tolerance
def calculate_goal_progress(self, goal_id: str) -> float:
"""Calculate progress percentage for a specific goal"""
if goal_id not in self.target_amounts or goal_id not in self.goal_progress:
return 0.0
return self.goal_progress[goal_id] / self.target_amounts[goal_id]
def get_upcoming_milestones(self, days: int = 90) -> List[Dict]:
"""Get upcoming milestones within specified days"""
upcoming = []
now = datetime.now()
cutoff = now + timedelta(days=days)
for goal, milestones in self.milestones.items():
for milestone in milestones:
date = milestone.get('date')
if date and now <= date <= cutoff:
upcoming.append({
'goal': goal,
**milestone
})
return sorted(upcoming, key=lambda x: x['date'])
def estimate_goal_feasibility(self,
monthly_contribution: float,
expected_return: float) -> Dict[str, float]:
"""Estimate feasibility of goals based on contributions and returns"""
feasibility = {}
for goal_id, target in self.target_amounts.items():
# Get current progress
current_amount = self.goal_progress.get(goal_id, 0)
# Get timeframe in months
months = self.timeframes.get(goal_id, 60) * 12
if months <= 0:
feasibility[goal_id] = 0
continue
# Calculate future value of current amount
future_current = current_amount * (1 + expected_return/12) ** months
# Calculate future value of monthly contributions
future_contributions = monthly_contribution * ((1 + expected_return/12) ** months - 1) / (expected_return/12)
# Calculate total expected amount
expected_amount = future_current + future_contributions
# Calculate feasibility as percentage of target
feasibility[goal_id] = min(1.0, expected_amount / target)
return feasibility
@dataclass
class ProfileAnalysisResult:
"""Complete result of LLM profile analysis"""
# Basic metrics
risk_score: float
investment_horizon: int
primary_goals: List[str]
# Detailed analyses
constraints: Dict[str, Any]
recommendations: List[str]
explanation: str
# Advanced metrics
risk_decomposition: Dict[str, float] = field(default_factory=dict)
goal_feasibility: Dict[str, float] = field(default_factory=dict)
investment_style: Dict[str, float] = field(default_factory=dict)
market_sensitivity: Dict[str, float] = field(default_factory=dict)
# Recommended allocations
asset_allocation: Dict[str, float] = field(default_factory=dict)
geographic_allocation: Dict[str, float] = field(default_factory=dict)
# Alerts and attention points
alerts: List[Dict[str, Any]] = field(default_factory=list)
attention_points: List[str] = field(default_factory=list)
# Context and adaptability
market_context: Dict[str, Any] = field(default_factory=dict)
adaptability_score: float = 0.5
stress_test_results: Dict[str, float] = field(default_factory=dict)
def __post_init__(self):
"""Post-initialization validation"""
if not 0 <= self.risk_score <= 1:
raise ValueError("Risk score must be between 0 and 1")
if self.investment_horizon <= 0:
raise ValueError("Investment horizon must be positive")
def get_risk_category(self) -> str:
"""Determine risk category based on score"""
if self.risk_score < 0.2:
return "Très conservateur"
elif self.risk_score < 0.4:
return "Conservateur"
elif self.risk_score < 0.6:
return "Modéré"
elif self.risk_score < 0.8:
return "Dynamique"
else:
return "Très dynamique"
def get_critical_alerts(self) -> List[Dict[str, Any]]:
"""Get critical alerts only"""
return [alert for alert in self.alerts if alert.get('severity') == 'critical']
def get_major_constraints(self) -> List[str]:
"""Identify major constraints"""
return [
constraint for constraint, value in self.constraints.items()
if isinstance(value, (bool, int, float)) and value > 0.7
]
def get_primary_investment_style(self) -> str:
"""Determine primary investment style"""
if not self.investment_style:
return "Balanced"
return max(self.investment_style.items(), key=lambda x: x[1])[0]
def calculate_overall_feasibility(self) -> float:
"""Calculate overall goal feasibility"""
if not self.goal_feasibility:
return 0.5
return sum(self.goal_feasibility.values()) / len(self.goal_feasibility)
def get_risk_factors(self) -> List[Tuple[str, float]]:
"""Get main risk factors sorted by importance"""
return sorted(
self.risk_decomposition.items(),
key=lambda x: x[1],
reverse=True
)
def get_market_adaptability(self) -> Dict[str, float]:
"""Evaluate adaptability to market conditions"""
adaptability = {}
for condition, sensitivity in self.market_sensitivity.items():
adaptability[condition] = min(
self.adaptability_score * (1 + sensitivity),
1.0
)
return adaptability
def generate_one_page_summary(self) -> str:
"""Generate a concise one-page summary of the analysis"""
risk_category = self.get_risk_category()
summary = [
f"# Profil d'Investissement: {risk_category}",
f"Score de risque: {self.risk_score:.2f}/1.00 | Horizon: {self.investment_horizon} ans",
"",
"## Objectifs Principaux",
*[f"- {goal}" for goal in self.primary_goals[:3]],
"",
"## Allocation d'Actifs Recommandée",
*[f"- {asset}: {weight:.1%}" for asset, weight in
sorted(self.asset_allocation.items(), key=lambda x: x[1], reverse=True)],
"",
"## Facteurs de Risque Principaux",
*[f"- {factor}: {score:.1%}" for factor, score in self.get_risk_factors()[:3]],
"",
"## Recommandations Clés",
*[f"- {rec}" for rec in self.recommendations[:3]],
"",
"## Points d'Attention",
*[f"- {point}" for point in self.attention_points[:3]],
]
return "\n".join(summary)
def to_dict(self) -> Dict[str, Any]:
"""Convert analysis to dictionary"""
return {
'risk_profile': {
'score': self.risk_score,
'category': self.get_risk_category(),
'decomposition': self.risk_decomposition,
'stress_tests': self.stress_test_results
},
'investment_profile': {
'horizon': self.investment_horizon,
'style': self.investment_style,
'adaptability': self.adaptability_score,
'market_sensitivity': self.market_sensitivity
},
'goals_analysis': {
'primary_goals': self.primary_goals,
'feasibility': self.goal_feasibility,
'constraints': self.constraints
},
'recommendations': {
'suggestions': self.recommendations,
'attention_points': self.attention_points,
'alerts': self.alerts
},
'allocations': {
'assets': self.asset_allocation,
'geography': self.geographic_allocation
},
'market_context': self.market_context,
'explanation': self.explanation
}
@dataclass
class PersonalProfile:
"""Complete and dynamic investor profile with advanced analysis"""
# Identifiers and metadata
id: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)
version: int = 1
# Base profile
risk_tolerance: int = 3
investment_horizon: int = 5
income_stability: int = 3
financial_knowledge: int = 3
investment_goals: List[str] = field(default_factory=list)
esg_preferences: bool = False
constraints: Dict[str, Any] = field(default_factory=dict)
# Personal information
age: int = 30
annual_income: float = 50000.0
total_assets: float = 0.0
total_debt: float = 0.0
monthly_savings: float = 0.0 # Added this field for the épargne mensuelle
occupation: str = ""
industry: str = ""
education_level: str = ""
marital_status: str = "single"
number_of_dependents: int = 0
tax_status: str = ""
residential_status: str = "owner"
# Detailed analyses
financial_analysis: Optional[FinancialAnalysis] = None
family_analysis: Optional[FamilyAnalysis] = None
investment_objectives: Optional[InvestmentGoals] = None
profile_description: Optional[str] = None
# Dynamic metrics and history
risk_metrics: Dict[str, float] = field(default_factory=dict)
performance_metrics: Dict[str, float] = field(default_factory=dict)
historical_changes: List[Dict[str, Any]] = field(default_factory=list)
last_review: Optional[datetime] = None
# Investment preferences
investment_preferences: Dict[str, Any] = field(default_factory=lambda: {
'asset_classes': [],
'sectors': [],
'regions': [],
'strategies': [],
'excluded_investments': []
})
# LLM analysis results
llm_analysis_results: Optional[ProfileAnalysisResult] = None
risk_score: float = 0.5
adaptability_score: float = 0.5
complexity_tolerance: float = 0.5
def __post_init__(self):
"""Post-initialization setup"""
# Generate ID if not provided
if self.id is None:
self.id = self._generate_profile_id()
# Initialize analyses if not provided
if self.financial_analysis is None:
self.financial_analysis = FinancialAnalysis()
if self.family_analysis is None:
self.family_analysis = FamilyAnalysis()
if self.investment_objectives is None:
self.investment_objectives = InvestmentGoals()
# Validate profile
self._validate_profile()
def _generate_profile_id(self) -> str:
"""Generate unique profile ID"""
components = [
str(self.age),
str(self.annual_income),
str(self.risk_tolerance),
self.occupation,
datetime.now().isoformat()
]
unique_string = "_".join(components)
return f"PROFILE_{hashlib.sha256(unique_string.encode()).hexdigest()[:16]}"
def _validate_profile(self):
"""Comprehensive profile validation"""
validations = [
(1 <= self.risk_tolerance <= 5, "Risk tolerance must be between 1 and 5"),
(1 <= self.investment_horizon <= 50, "Investment horizon must be between 1 and 50 years"),
(1 <= self.income_stability <= 5, "Income stability must be between 1 and 5"),
(1 <= self.financial_knowledge <= 5, "Financial knowledge must be between 1 and 5"),
(18 <= self.age <= 120, "Age must be between 18 and 120"),
(self.annual_income >= 0, "Annual income cannot be negative"),
(self.total_assets >= 0, "Total assets cannot be negative"),
(self.total_debt >= 0, "Total debt cannot be negative"),
(0 <= self.risk_score <= 1, "Risk score must be between 0 and 1"),
(0 <= self.adaptability_score <= 1, "Adaptability score must be between 0 and 1"),
(0 <= self.complexity_tolerance <= 1, "Complexity tolerance must be between 0 and 1")
]
# Advanced validation
if self.investment_preferences:
for asset_class in self.investment_preferences.get('asset_classes', []):
if not isinstance(asset_class, str):
raise ValueError("Invalid asset class format")
if self.investment_goals and len(self.investment_goals) == 0:
raise ValueError("At least one investment goal must be specified")
# Check basic constraints
for validation, message in validations:
if not validation:
raise ValueError(message)
# Check detailed analyses
if self.financial_analysis and not self.financial_analysis.validate():
raise ValueError("Invalid financial analysis")
if self.family_analysis and not self.family_analysis.validate():
raise ValueError("Invalid family analysis")
if self.investment_objectives and not self.investment_objectives.validate():
raise ValueError("Invalid investment objectives")
def calculate_comprehensive_risk_metrics(self) -> Dict[str, float]:
"""Calculate comprehensive risk metrics"""
try:
risk_metrics = {
'total_debt_ratio': self.total_debt / self.annual_income if self.annual_income > 0 else float('inf'),
'emergency_fund_ratio': self.financial_analysis.emergency_fund_months / 6 if self.financial_analysis else 0,
'risk_capacity': self.financial_analysis.risk_capacity if self.financial_analysis else 0.5,
'investment_diversification': len(self.investment_goals) / 5 if self.investment_goals else 0,
'debt_service_ratio': self._calculate_debt_service_ratio(),
'savings_rate': self._calculate_savings_rate(),
'liquidity_ratio': self._calculate_liquidity_ratio(),
'investment_concentration': self._calculate_investment_concentration()
}
# Add LLM scores
if self.llm_analysis_results:
risk_metrics.update({
'llm_risk_score': self.llm_analysis_results.risk_score,
'adaptability': self.llm_analysis_results.adaptability_score
})
# Calculate overall risk capacity
financial_health = self.financial_analysis.get_financial_health_score() if self.financial_analysis else 0.5
risk_metrics['overall_risk_capacity'] = (
risk_metrics['risk_capacity'] * 0.5 +
(1 - risk_metrics['total_debt_ratio']) * 0.3 +
financial_health * 0.2
)
self.risk_metrics = risk_metrics
return risk_metrics
except Exception as e:
logger.error(f"Error calculating risk metrics: {e}")
return {}
def _calculate_debt_service_ratio(self) -> float:
"""Calculate debt service ratio"""
try:
if not self.financial_analysis or self.annual_income == 0:
return 0.0
if isinstance(self.financial_analysis.fixed_expenses, dict) and 'debt_payments' in self.financial_analysis.fixed_expenses:
if isinstance(self.financial_analysis.fixed_expenses['debt_payments'], dict):
monthly_debt_payment = sum(self.financial_analysis.fixed_expenses['debt_payments'].values())
else:
monthly_debt_payment = self.financial_analysis.fixed_expenses['debt_payments']
else:
# Estimate monthly debt payment using total debt and a standard amortization
monthly_debt_payment = self.total_debt * 0.01 # Rough estimate of 1% monthly payment
monthly_income = self.annual_income / 12
return monthly_debt_payment / monthly_income if monthly_income > 0 else 0
except Exception as e:
logger.error(f"Error calculating debt service ratio: {e}")
return 0.0
def _calculate_savings_rate(self) -> float:
"""Calculate savings rate"""
try:
if not self.financial_analysis or self.annual_income == 0:
return 0.0
return (self.financial_analysis.savings_capacity * 12) / self.annual_income
except Exception as e:
logger.error(f"Error calculating savings rate: {e}")
return 0.0
def _calculate_liquidity_ratio(self) -> float:
"""Calculate liquidity ratio"""
try:
if not self.financial_analysis or self.total_assets == 0:
return 0.0
liquid_assets = sum(
amount for type_, amount in self.financial_analysis.income_sources.items()
if type_ in ['savings', 'investments', 'cash']
)
return liquid_assets / self.total_assets
except Exception as e:
logger.error(f"Error calculating liquidity ratio: {e}")
return 0.0
def _calculate_investment_concentration(self) -> float:
"""Calculate investment concentration using HHI"""
try:
if not self.investment_preferences or not self.investment_preferences.get('asset_classes'):
return 1.0
asset_classes = self.investment_preferences['asset_classes']
weights = [1/len(asset_classes)] * len(asset_classes)
return sum(w * w for w in weights)
except Exception as e:
logger.error(f"Error calculating investment concentration: {e}")
return 1.0
def update(self, updates: Dict[str, Any]) -> 'PersonalProfile':
"""Update profile with new information"""
try:
# Create copy of current profile data
current_data = asdict(self)
# Save change history
self._save_change_history(updates)
# Update fields
for key, value in updates.items():
if key in current_data:
current_data[key] = value
# Update version and timestamps
current_data['version'] += 1
current_data['last_updated'] = datetime.now()
# Create new profile with updated data
updated_profile = self.__class__(**current_data)
# Validate updated profile
updated_profile._validate_profile()
return updated_profile
except Exception as e:
logger.error(f"Error updating profile: {e}")
raise
def _save_change_history(self, updates: Dict[str, Any]):
"""Record change history"""
change_record = {
'timestamp': datetime.now().isoformat(),
'changes': updates,
'previous_values': {
key: getattr(self, key)
for key in updates.keys()
if hasattr(self, key)
},
'version': self.version
}
self.historical_changes.append(change_record)
def get_change_history(self, field: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get change history for optional field"""
if field:
return [
change for change in self.historical_changes
if field in change['changes']
]
return self.historical_changes
def export_json(self, filepath: str, include_detailed_analysis: bool = False):
"""Export profile to JSON file"""
try:
# Make sure directory exists
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(
asdict(self) if include_detailed_analysis else self.to_dict(),
f, ensure_ascii=False, indent=2, default=str
)
logger.info(f"Profile exported successfully to {filepath}")
except Exception as e:
logger.error(f"Error exporting profile to {filepath}: {e}")
raise
def to_dict(self) -> Dict[str, Any]:
"""Convert profile to dictionary representation"""
return {
'id': self.id,
'version': self.version,
'created_at': self.created_at.isoformat(),
'last_updated': self.last_updated.isoformat(),
'basic_info': {
'age': self.age,
'annual_income': self.annual_income,
'total_assets': self.total_assets,
'total_debt': self.total_debt,
'marital_status': self.marital_status,
'dependents': self.number_of_dependents
},
'investment_profile': {
'risk_tolerance': self.risk_tolerance,
'investment_horizon': self.investment_horizon,
'financial_knowledge': self.financial_knowledge,
'income_stability': self.income_stability,
'goals': self.investment_goals,
'esg_preferences': self.esg_preferences
},
'risk_metrics': self.risk_metrics,
'llm_analysis': self.llm_analysis_results.to_dict() if self.llm_analysis_results else None
}
@classmethod
def from_json(cls, filepath: str) -> 'PersonalProfile':
"""Load profile from JSON file"""
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
# Convert date strings to datetime objects
for date_field in ['created_at', 'last_updated', 'last_review']:
if date_field in data and data[date_field]:
data[date_field] = datetime.fromisoformat(data[date_field])
# Reconstruct complex objects
if 'financial_analysis' in data and data['financial_analysis']:
data['financial_analysis'] = FinancialAnalysis(**data['financial_analysis'])
if 'family_analysis' in data and data['family_analysis']:
data['family_analysis'] = FamilyAnalysis(**data['family_analysis'])
if 'investment_objectives' in data and data['investment_objectives']:
data['investment_objectives'] = InvestmentGoals(**data['investment_objectives'])
if 'llm_analysis_results' in data and data['llm_analysis_results']:
data['llm_analysis_results'] = ProfileAnalysisResult(**data['llm_analysis_results'])
# Create and validate profile
profile = cls(**data)
profile._validate_profile()
return profile
except Exception as e:
logger.error(f"Error loading profile from {filepath}: {e}")
raise
class DeepVestLLM:
"""Advanced LLM integration using fine-tuned financial advisor model"""
def __init__(self, config: DeepVestConfig):
"""Initialize with configuration"""
self.config = config
self.logger = logging.getLogger("DeepVest.LLM")
# Initialize model and tokenizer
self.model = None
self.tokenizer = None
self.device = self._detect_optimal_device()
# Load model if available
if os.path.exists(config.model_path):
self.load_model(config.model_path) # Pass the model_path argument
else:
self.logger.warning(f"Model path {config.model_path} not found, using fallback mode")
# Initialize cache
self.generation_cache = {}
self.max_cache_entries = config.cache_size
def _detect_optimal_device(self) -> str:
"""Detect the optimal device for model inference"""
if torch.cuda.is_available():
self.logger.info("CUDA GPU available, using CUDA")
return "cuda"
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
self.logger.info("Apple MPS available, using MPS")
return "mps"
else:
self.logger.info("No GPU available, using CPU")
return "cpu"
def load_model(self, model_path=None):
"""Load the fine-tuned model from the configured path"""
try:
path_to_use = model_path if model_path else self.config.model_path
self.logger.info(f"Loading fine-tuned model from {path_to_use}")
# Determine the best device to use
if torch.cuda.is_available():
self.logger.info("CUDA available, using GPU")
device = "cuda"
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
self.logger.info("Apple MPS available, using MPS")
device = "mps"
else:
self.logger.info("Using CPU")
device = "cpu"
# Check if config.json exists and has required fields
config_file = os.path.join(path_to_use, "config.json")
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
# If no model_type, add one for Phi model
if 'model_type' not in config:
self.logger.warning("Adding missing model_type to config.json")
config['model_type'] = "phi" # Use phi since you're fine-tuning Phi-1.5
# Add architecture if missing
if 'architectures' not in config:
config['architectures'] = ["PhiForCausalLM"]
# Save the modified config
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
else:
# Create a basic config.json if it doesn't exist
self.logger.warning(f"config.json not found in {path_to_use}, creating one")
config = {
"model_type": "phi",
"architectures": ["PhiForCausalLM"],
"bos_token_id": 1,
"eos_token_id": 2,
"pad_token_id": 0,
"hidden_size": 2048,
"vocab_size": 51200,
"torch_dtype": "float16"
}
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
# Try different loading approaches
try:
# First attempt: direct loading with AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(
path_to_use,
trust_remote_code=True
)
self.model = AutoModelForCausalLM.from_pretrained(
path_to_use,
trust_remote_code=True,
torch_dtype=torch.float16,
device_map="auto",
low_cpu_mem_usage=True
)
except Exception as model_error:
self.logger.warning(f"First loading attempt failed: {model_error}")
# Second attempt: try loading with specific model class
try:
from transformers import PhiForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(path_to_use, trust_remote_code=True)
self.model = PhiForCausalLM.from_pretrained(
path_to_use,
trust_remote_code=True,
torch_dtype=torch.float16,
device_map="auto",
low_cpu_mem_usage=True
)
except Exception as specific_error:
self.logger.warning(f"Second loading attempt failed: {specific_error}")
# Third attempt: try loading as a PEFT/LoRA model
try:
from peft import PeftModel, PeftConfig
# Check if we need to load the base model first
base_model_path = "microsoft/phi-1_5"
base_model = AutoModelForCausalLM.from_pretrained(
base_model_path,
trust_remote_code=True,
torch_dtype=torch.float16,
device_map="auto"
)
self.tokenizer = AutoTokenizer.from_pretrained(base_model_path, trust_remote_code=True)
self.model = PeftModel.from_pretrained(base_model, path_to_use)
except Exception as peft_error:
# All attempts failed
self.logger.error(f"All loading attempts failed. Last error: {peft_error}")
raise Exception(f"Could not load model: {model_error} -> {specific_error} -> {peft_error}")
# Make sure the tokenizer has a pad token
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.logger.info(f"Model loaded successfully")
return True
except Exception as e:
self.logger.error(f"Error loading model: {e}")
return False
def _get_cache_key(self, prompt: str) -> str:
"""Generate cache key for a prompt"""
return hashlib.md5(prompt.encode()).hexdigest()
def _manage_cache(self):
"""Manage cache size"""
if len(self.generation_cache) > self.max_cache_entries:
# Sort by access timestamp and remove oldest
sorted_entries = sorted(
self.generation_cache.items(),
key=lambda x: x[1]['timestamp']
)
# Remove oldest 20%
entries_to_remove = int(self.max_cache_entries * 0.2)
for i in range(entries_to_remove):
if i < len(sorted_entries):
del self.generation_cache[sorted_entries[i][0]]
self.logger.debug(f"Cache cleanup: removed {entries_to_remove} entries")
async def generate_text(self, prompt: str, max_length: int = 1500) -> str:
"""Generate text using the loaded model"""
# Check cache if enabled
if self.config.enable_caching:
cache_key = self._get_cache_key(prompt)
if cache_key in self.generation_cache:
cache_entry = self.generation_cache[cache_key]
cache_entry['timestamp'] = datetime.now() # Update access timestamp
self.logger.debug("Using cached response")
return cache_entry['response']
if self.config.log_prompts:
self.logger.debug(f"Prompt: {prompt}")
# Check if model_loaded is True to confirm successful loading
if hasattr(self, 'model_loaded') and self.model_loaded and self.model and self.tokenizer:
# Generate with loaded model
try:
inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096)
# Handle device mapping - use the model's device rather than explicitly moving inputs
if hasattr(self.model, 'device'):
device = self.model.device
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=max_length,
do_sample=True,
temperature=0.7,
top_p=0.92,
top_k=50,
repetition_penalty=1.15,
num_beams=4,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Extract actual response from the prompt
if "[/INST]" in response:
response = response.split("[/INST]")[-1].strip()
# Cache the result if enabled
if self.config.enable_caching:
self.generation_cache[cache_key] = {
'response': response,
'timestamp': datetime.now()
}
self._manage_cache()
return response
except Exception as e:
self.logger.error(f"Error generating text: {e}")
return await self._fallback_generation(prompt)
else:
# Use fallback generation
self.logger.info("Using fallback generation")
return await self._fallback_generation(prompt)
async def _fallback_generation(self, prompt: str) -> str:
"""Fallback generation when model is unavailable"""
self.logger.info("Using fallback generation")
# Extract key information from prompt
risk_tolerance = 3
investment_horizon = 5
# Extract risk tolerance if present
risk_match = re.search(r'[Rr]isque.*?(\d+)', prompt)
if risk_match:
risk_tolerance = int(risk_match.group(1))
# Extract investment horizon if present
horizon_match = re.search(r'[Hh]orizon.*?(\d+)', prompt)
if horizon_match:
investment_horizon = int(horizon_match.group(1))
# Generate simple structured response
risk_score = risk_tolerance / 5
fallback_response = f"""1. Score de risque: {risk_score:.2f} - Basé sur la tolérance au risque déclarée.
2. Horizon d'investissement recommandé: {investment_horizon} ans.
3. Objectifs prioritaires: Épargne générale, Croissance du capital.
4. Contraintes: Limitation classique de diversification.
5. Recommandations d'allocation:
- Actions: {int(risk_score * 100)}%
- Obligations: {int((1-risk_score) * 80)}%
- Liquidités: {int((1-risk_score) * 20)}%
6. Recommandations:
- Diversifier votre portefeuille
- Investir régulièrement
- Maintenir une épargne de sécurité suffisante
7. Points d'attention:
- Réévaluer régulièrement votre profil de risque
- Ajuster votre allocation selon l'évolution de votre situation
- Prévoir une réserve d'urgence adéquate
"""
return fallback_response
def prepare_analysis_prompt(self, profile_data: Dict[str, Any]) -> str:
"""Prepare analysis prompt from profile data"""
return f"""[INST] Je suis un conseiller financier expert. Analyse ce profil d'investisseur de manière exhaustive.
Profil détaillé :
- Âge: {profile_data.get('age', 'Non spécifié')} ans
- Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}
- Situation familiale: {profile_data.get('marital_status', 'Non spécifié')}
- Dépendants: {profile_data.get('number_of_dependents', 0)}
- Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}
- Dette totale: {profile_data.get('total_debt', 'Non spécifié')}
- Patrimoine total: {profile_data.get('total_assets', 'Non spécifié')}
- Objectifs: {", ".join(profile_data.get('investment_goals', []))}
- Tolérance au risque (1-5): {profile_data.get('risk_tolerance', 3)}
- Horizon d'investissement: {profile_data.get('investment_horizon', 5)} ans
- Connaissances financières (1-5): {profile_data.get('financial_knowledge', 3)}
- Stabilité professionnelle: {profile_data.get('income_stability', 3)}
- Préférences ESG: {"Oui" if profile_data.get('esg_preferences', False) else "Non"}
- Secteurs préférés: {", ".join(profile_data.get('investment_preferences', {}).get('sectors', []))}
- Contraintes spécifiques: {", ".join(str(k) for k, v in profile_data.get('constraints', {}).items() if v)}
Format de réponse requis:
1. Score de risque (entre 0 et 1): [score] - Justification détaillée
2. Horizon d'investissement recommandé: [années] - Justification
3. Objectifs prioritaires: Liste ordonnée avec justification
4. Contraintes et limitations: Analyse détaillée des contraintes
5. Recommandations d'allocation:
- Par classe d'actifs (%)
- Par zone géographique (%)
- Par stratégie d'investissement
6. Analyse des risques:
- Décomposition des risques
- Points de vigilance
- Stress tests recommandés
7. Plan d'action détaillé:
- Court terme (0-2 ans)
- Moyen terme (2-5 ans)
- Long terme (5+ ans)
8. Recommandations spécifiques:
- Investissements recommandés
- Stratégies de diversification
- Protection du capital
9. Explication détaillée:
- Raisonnement complet
- Points d'attention particuliers
- Opportunités identifiées
Analyse complète: [/INST]"""
def prepare_description_prompt(self, profile_data: Dict[str, Any]) -> str:
"""Prepare description prompt from profile data"""
return f"""[INST] En tant que conseiller financier expert, génère une description détaillée et professionnelle de ce profil d'investisseur.
Profil complet:
- Âge: {profile_data.get('age', 'Non spécifié')} ans
- Revenu annuel: {profile_data.get('annual_income', 'Non spécifié')}
- Situation familiale: {profile_data.get('marital_status', 'Non spécifié')}
- Objectifs: {", ".join(profile_data.get('investment_goals', []))}
- Tolérance au risque: {profile_data.get('risk_tolerance', 3)}/5
- Horizon: {profile_data.get('investment_horizon', 5)} ans
- Connaissances financières: {profile_data.get('financial_knowledge', 3)}/5
- Épargne mensuelle: {profile_data.get('monthly_savings', 'Non spécifié')}
- Patrimoine: {profile_data.get('total_assets', 'Non spécifié')}
- Dette: {profile_data.get('total_debt', 'Non spécifié')}
- Secteur d'activité: {profile_data.get('industry', 'Non spécifié')}
- Stabilité professionnelle: {profile_data.get('income_stability', 3)}/5
La description doit couvrir:
1. Analyse du profil de risque et des objectifs
2. Evaluation de la capacité financière
3. Identification des contraintes et opportunités
4. Recommandations d'investissement personnalisées
5. Points de vigilance et attention particulière
6. Stratégie de diversification recommandée
7. Projection et scénarios d'évolution
Description: [/INST]"""
async def generate_batch(self, prompts: List[str], max_length: int = 1500) -> List[str]:
"""Generate responses for multiple prompts in parallel"""
if not self.model or not self.tokenizer:
# Fallback for batch generation
return [await self._fallback_generation(prompt) for prompt in prompts]
responses = []
# Process in batches
batch_size = self.config.batch_size
for i in range(0, len(prompts), batch_size):
batch_prompts = prompts[i:i+batch_size]
batch_inputs = self.tokenizer(
batch_prompts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=4096
)
batch_inputs = {k: v.to(self.device) for k, v in batch_inputs.items()}
with torch.no_grad():
outputs = self.model.generate(
**batch_inputs,
max_length=max_length,
do_sample=True,
temperature=0.7,
top_p=0.92,
top_k=50,
repetition_penalty=1.15,
pad_token_id=self.tokenizer.eos_token_id
)
for j, output in enumerate(outputs):
response = self.tokenizer.decode(output, skip_special_tokens=True)
if "[/INST]" in response:
response = response.split("[/INST]")[-1].strip()
responses.append(response)
return responses
class LLMProfileAnalyzer:
"""Advanced profile analyzer using fine-tuned financial LLM"""
def __init__(self, config: Optional[DeepVestConfig] = None):
self.config = config or DeepVestConfig()
self.logger = logging.getLogger("DeepVest.Analyzer")
self.llm = DeepVestLLM(self.config)
# Initialize cache
self.analysis_cache = {}
self.max_cache_size = self.config.cache_size
def _get_cache_key(self, profile_data: Dict[str, Any]) -> str:
"""Generate unique cache key for a profile"""
key_components = [
str(profile_data.get('age', '')),
str(profile_data.get('risk_tolerance', '')),
str(profile_data.get('investment_horizon', '')),
','.join(profile_data.get('investment_goals', [])),
str(profile_data.get('annual_income', '')),
str(profile_data.get('total_assets', '')),
str(profile_data.get('total_debt', ''))
]
return hashlib.md5('_'.join(key_components).encode()).hexdigest()
def _manage_cache(self):
"""Manage cache size by removing oldest entries"""
if len(self.analysis_cache) > self.max_cache_size:
# Sort by timestamp and remove oldest
items = sorted(
self.analysis_cache.items(),
key=lambda x: x[1]['timestamp']
)
# Remove oldest 20%
num_to_remove = int(self.max_cache_size * 0.2)
for i in range(num_to_remove):
if i < len(items):
del self.analysis_cache[items[i][0]]
async def analyze_profile(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
"""Comprehensive profile analysis with recommendations"""
try:
# Check cache first
cache_key = self._get_cache_key(profile_data)
if cache_key in self.analysis_cache and self.config.enable_caching:
self.logger.info(f"Using cached analysis for profile {cache_key}")
return self.analysis_cache[cache_key]['result']
self.logger.info(f"Starting profile analysis for ID: {profile_data.get('id', 'unknown')}")
# Prepare prompt
prompt = self.llm.prepare_analysis_prompt(profile_data)
# Generate response
response = await self.llm.generate_text(prompt)
# Parse and analyze response
analysis_result = self._parse_response(response, profile_data)
# Validate results
if not self._validate_results(analysis_result):
self.logger.warning("Analysis results validation failed, using default values")
analysis_result = self._get_default_analysis(profile_data)
# Enrich analysis with additional calculations
enriched_analysis = self._enrich_analysis(analysis_result, profile_data)
# Cache the result
if self.config.enable_caching:
self.analysis_cache[cache_key] = {
'result': enriched_analysis,
'timestamp': datetime.now()
}
self._manage_cache()
self.logger.info("Profile analysis completed successfully")
return enriched_analysis
except Exception as e:
self.logger.error(f"Error in profile analysis: {e}")
return self._get_default_analysis(profile_data)
async def batch_analyze_profiles(self, profiles: List[Dict[str, Any]]) -> List[ProfileAnalysisResult]:
"""Analyze multiple profiles in batch"""
try:
results = []
uncached_profiles = []
uncached_indices = []
# Check cache first for each profile
for i, profile in enumerate(profiles):
cache_key = self._get_cache_key(profile)
if cache_key in self.analysis_cache and self.config.enable_caching:
results.append(self.analysis_cache[cache_key]['result'])
else:
uncached_profiles.append(profile)
uncached_indices.append(i)
# If all profiles were cached, return results
if not uncached_profiles:
return results
# Prepare prompts for uncached profiles
prompts = [self.llm.prepare_analysis_prompt(profile) for profile in uncached_profiles]
# Generate responses in batch
responses = await self.llm.generate_batch(prompts)
# Parse and process responses
for i, (profile, response) in enumerate(zip(uncached_profiles, responses)):
try:
# Parse response
analysis_result = self._parse_response(response, profile)
# Validate and enrich
if not self._validate_results(analysis_result):
analysis_result = self._get_default_analysis(profile)
enriched_analysis = self._enrich_analysis(analysis_result, profile)
# Cache the result
if self.config.enable_caching:
cache_key = self._get_cache_key(profile)
self.analysis_cache[cache_key] = {
'result': enriched_analysis,
'timestamp': datetime.now()
}
# Insert at correct position
insert_index = uncached_indices[i]
# Extend results list if needed
while len(results) <= insert_index:
results.append(None)
results[insert_index] = enriched_analysis
except Exception as e:
self.logger.error(f"Error processing profile {i}: {e}")
results.append(self._get_default_analysis(profile))
# Manage cache after batch processing
if self.config.enable_caching:
self._manage_cache()
# Fill any remaining None values with defaults
for i, result in enumerate(results):
if result is None:
results[i] = self._get_default_analysis(profiles[i])
return results
except Exception as e:
self.logger.error(f"Error in batch profile analysis: {e}")
return [self._get_default_analysis(profile) for profile in profiles]
def _parse_response(self, response: str, original_data: Dict[str, Any]) -> ProfileAnalysisResult:
"""Parse LLM response into structured data"""
try:
sections = response.split('\n')
# Extract asset allocation
asset_allocation = self._extract_asset_allocation(sections)
geo_allocation = self._extract_geographic_allocation(sections)
return ProfileAnalysisResult(
risk_score=self._extract_risk_score(sections),
investment_horizon=self._extract_horizon(sections),
primary_goals=self._extract_goals(sections),
constraints=self._extract_constraints(sections),
recommendations=self._extract_recommendations(sections),
explanation=self._extract_explanation(sections),
risk_decomposition=self._extract_risk_decomposition(sections),
goal_feasibility=self._extract_goal_feasibility(sections),
investment_style=self._extract_investment_style(sections),
market_sensitivity=self._extract_market_sensitivity(sections),
alerts=self._extract_alerts(sections),
attention_points=self._extract_attention_points(sections),
market_context=self._extract_market_context(sections),
adaptability_score=self._calculate_adaptability_score(sections),
stress_test_results=self._extract_stress_test_results(sections),
asset_allocation=asset_allocation,
geographic_allocation=geo_allocation
)
except Exception as e:
self.logger.error(f"Error parsing response: {e}")
return self._get_default_analysis(original_data)
def _validate_results(self, analysis_result: ProfileAnalysisResult) -> bool:
"""Validate analysis results for consistency"""
try:
# Check essential values
if not (0 <= analysis_result.risk_score <= 1):
analysis_result.risk_score = min(max(analysis_result.risk_score, 0), 1)
if analysis_result.investment_horizon <= 0:
analysis_result.investment_horizon = 5
if not analysis_result.primary_goals:
return False
if not analysis_result.recommendations:
return False
# Check consistency between risk and horizon
if analysis_result.risk_score > 0.7 and analysis_result.investment_horizon < 5:
self.logger.warning("Inconsistency detected: high risk with short horizon")
analysis_result.attention_points.append(
"Attention: profil à risque élevé avec horizon court, prudence recommandée"
)
return True
except Exception as e:
self.logger.error(f"Error validating analysis results: {e}")
return False
def _enrich_analysis(self,
analysis: ProfileAnalysisResult,
profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
"""Enrich analysis with additional calculations"""
try:
# Calculate adjusted risk score
base_risk_score = analysis.risk_score
adjustments = []
# Age adjustment
age = profile_data.get('age', 30)
age_adjustment = max(0, (65 - age) / 65) * 0.2
adjustments.append(age_adjustment)
# Knowledge adjustment
knowledge = profile_data.get('financial_knowledge', 3)
knowledge_adjustment = (knowledge - 3) * 0.1
adjustments.append(knowledge_adjustment)
# Financial stability adjustment
income = profile_data.get('annual_income', 0)
debt = profile_data.get('total_debt', 0)
if income > 0:
debt_ratio = debt / income
financial_adjustment = -0.1 if debt_ratio > 0.4 else 0.1
adjustments.append(financial_adjustment)
# Goals diversification adjustment
goals_diversification = min(len(profile_data.get('investment_goals', [])) * 0.05, 0.2)
adjustments.append(goals_diversification)
# Apply adjustments
final_risk_score = base_risk_score
for adj in adjustments:
final_risk_score = max(0, min(1, final_risk_score + adj))
# Update analysis
analysis.risk_score = final_risk_score
analysis.risk_decomposition.update({
'age_factor': age_adjustment,
'knowledge_factor': knowledge_adjustment,
'financial_stability': financial_adjustment if 'financial_adjustment' in locals() else 0,
'goals_diversification': goals_diversification
})
# Add default allocation if missing
if not analysis.asset_allocation:
analysis.asset_allocation = self._generate_default_allocation(final_risk_score)
if not analysis.geographic_allocation:
analysis.geographic_allocation = self._generate_default_geo_allocation()
# Calculate market adaptability score
if not analysis.market_sensitivity:
# Create default market sensitivity based on risk score
analysis.market_sensitivity = {
'market_beta': final_risk_score * 0.8 + 0.2,
'interest_rate': max(0, 0.7 - final_risk_score * 0.4),
'economic_cycle': 0.4 + final_risk_score * 0.3,
'inflation': max(0, 0.6 - final_risk_score * 0.2)
}
# Calculate goal feasibility if missing
if not analysis.goal_feasibility and profile_data.get('investment_objectives'):
objectives = profile_data.get('investment_objectives')
if hasattr(objectives, 'calculate_goal_feasibility'):
monthly_contribution = profile_data.get('monthly_savings', 0)
expected_return = 0.03 + final_risk_score * 0.04 # Simple expected return model
analysis.goal_feasibility = objectives.calculate_goal_feasibility(
monthly_contribution=monthly_contribution,
expected_return=expected_return
)
return analysis
except Exception as e:
self.logger.error(f"Error enriching analysis: {e}")
return analysis
def _get_default_analysis(self, profile_data: Dict[str, Any]) -> ProfileAnalysisResult:
"""Generate default analysis when LLM fails"""
risk_score = profile_data.get('risk_tolerance', 3) / 5
return ProfileAnalysisResult(
risk_score=risk_score,
investment_horizon=profile_data.get('investment_horizon', 5),
primary_goals=profile_data.get('investment_goals', ["Épargne générale"]),
constraints={},
recommendations=[
"Construire un portefeuille diversifié",
"Maintenir une réserve d'urgence",
"Investir régulièrement"
],
explanation="Analyse par défaut basée sur les données fournies.",
asset_allocation=self._generate_default_allocation(risk_score),
geographic_allocation=self._generate_default_geo_allocation()
)
def _generate_default_allocation(self, risk_score: float) -> Dict[str, float]:
"""Generate default asset allocation based on risk score"""
# Simple allocation model based on risk score
stocks = risk_score
bonds = (1 - risk_score) * 0.8
cash = (1 - risk_score) * 0.2
return {
'Actions': stocks,
'Obligations': bonds,
'Liquidités': cash
}
def _generate_default_geo_allocation(self) -> Dict[str, float]:
"""Generate default geographic allocation"""
return {
'Europe': 0.4,
'Amérique du Nord': 0.3,
'Asie-Pacifique': 0.2,
'Marchés émergents': 0.1
}
# Extraction methods for parsing LLM response
def _extract_risk_score(self, sections: List[str]) -> float:
"""Extract risk score from text"""
risk_line = next((s for s in sections if s.startswith("1. Score de risque")), "")
try:
matches = re.findall(r'(\d*\.?\d+)', risk_line)
score = float(matches[0]) if matches else 0.5
return min(max(score, 0), 1)
except (IndexError, ValueError):
return 0.5
def _extract_horizon(self, sections: List[str]) -> int:
"""Extract investment horizon"""
horizon_line = next((s for s in sections if s.startswith("2. Horizon")), "")
try:
matches = re.findall(r'(\d+)', horizon_line)
return int(matches[0]) if matches else 5
except (IndexError, ValueError):
return 5
def _extract_goals(self, sections: List[str]) -> List[str]:
"""Extract prioritized goals"""
goals_section = next((s for s in sections if s.startswith("3. Objectifs")), "")
if not goals_section:
return []
try:
goals_text = goals_section.split(":")[-1].strip()
goals = re.split(r'[•\-\d]+\.?\s*', goals_text)
return [g.strip() for g in goals if g.strip()]
except Exception:
return []
def _extract_constraints(self, sections: List[str]) -> Dict[str, Any]:
"""Extract constraints and limitations"""
constraints_section = next((s for s in sections if s.startswith("4. Contraintes")), "")
if not constraints_section:
return {}
constraints = {}
try:
text = constraints_section.split(":")[-1].strip()
# Analyze key constraints
if re.search(r'liquidité|liquide|court terme', text, re.I):
constraints['liquidity_needed'] = True
if re.search(r'esg|durable|responsable|éthique', text, re.I):
constraints['esg_focus'] = True
if re.search(r'revenu|dividende|rente', text, re.I):
constraints['income_focus'] = True
if re.search(r'protection|capital|garanti', text, re.I):
constraints['capital_protection'] = True
if re.search(r'fiscalité|impôt|tax', text, re.I):
constraints['tax_optimization'] = True
if re.search(r'succession|transmission|héritage', text, re.I):
constraints['inheritance_planning'] = True
return constraints
except Exception:
return {}
def _extract_recommendations(self, sections: List[str]) -> List[str]:
"""Extract specific recommendations"""
recommendations = []
rec_section = next((s for s in sections if s.startswith("8. Recommandations")), "")
if rec_section:
parts = re.split(r'[•\-\d]+\.?\s*', rec_section)
for part in parts:
clean_part = part.strip()
if clean_part and not clean_part.startswith("8. Recommandations"):
recommendations.append(clean_part)
# If no recommendations found, check action plan
if not recommendations:
plan_section = next((s for s in sections if s.startswith("7. Plan d'action")), "")
if plan_section:
parts = re.split(r'[•\-\d]+\.?\s*', plan_section)
for part in parts:
clean_part = part.strip()
if clean_part and not clean_part.startswith("7. Plan"):
recommendations.append(clean_part)
# Default recommendations if still empty
if not recommendations:
recommendations = [
"Diversifier le portefeuille selon le profil de risque",
"Maintenir une épargne de sécurité",
"Adopter une stratégie d'investissement régulier"
]
return recommendations
def _extract_explanation(self, sections: List[str]) -> str:
"""Extract detailed explanation"""
explanation_section = next((s for s in sections if s.startswith("9. Explication")), "")
if explanation_section:
explanation = explanation_section.split(":", 1)[-1].strip()
if explanation:
return explanation
# Build explanation from multiple sections if not found
combined_explanation = []
for section_num in [1, 2, 3, 4, 8]:
section = next((s for s in sections if s.startswith(f"{section_num}.")), "")
if section:
combined_explanation.append(section)
if combined_explanation:
return "\n\n".join(combined_explanation)
return "Analyse basée sur le profil d'investisseur fourni."
def _extract_asset_allocation(self, sections: List[str]) -> Dict[str, float]:
"""Extract asset allocation recommendations"""
allocation_section = next((s for s in sections if "classe d'actifs" in s.lower()), "")
allocation = {}
if allocation_section:
matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', allocation_section)
for match in matches:
asset_class = match.group(1).strip()
percentage = float(match.group(2).replace(',', '.'))
allocation[asset_class] = percentage / 100
# Look elsewhere if not found
if not allocation:
for section in sections:
if re.search(r'allocation|répartition|portefeuille', section, re.I):
matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', section)
for match in matches:
asset_class = match.group(1).strip()
percentage = float(match.group(2).replace(',', '.'))
allocation[asset_class] = percentage / 100
# Normalize weights if needed
total = sum(allocation.values())
if 0 < total < 0.99 or total > 1.01:
allocation = {k: v/total for k, v in allocation.items()}
return allocation
def _extract_geographic_allocation(self, sections: List[str]) -> Dict[str, float]:
"""Extract geographic allocation recommendations"""
geo_section = next((s for s in sections if "géographique" in s.lower()), "")
allocation = {}
if geo_section:
matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', geo_section)
for match in matches:
region = match.group(1).strip()
percentage = float(match.group(2).replace(',', '.'))
allocation[region] = percentage / 100
# Look elsewhere if not found
if not allocation:
for section in sections:
if re.search(r'zone|région|international|pays', section, re.I):
matches = re.finditer(r'([A-Za-zÀ-ÖØ-öø-ÿ\s]+)[\s:]+([\d.,]+)\s*%', section)
for match in matches:
region = match.group(1).strip()
percentage = float(match.group(2).replace(',', '.'))
allocation[region] = percentage / 100
# Normalize weights if needed
total = sum(allocation.values())
if 0 < total < 0.99 or total > 1.01:
allocation = {k: v/total for k, v in allocation.items()}
return allocation
def _extract_risk_decomposition(self, sections: List[str]) -> Dict[str, float]:
"""Extract risk decomposition"""
risk_section = next((s for s in sections if "Analyse des risques" in s), "")
decomposition = {}
if risk_section:
try:
market_risk = re.search(r'marché[^\d]*(\d+(?:\.\d+)?)', risk_section)
if market_risk:
decomposition['market_risk'] = float(market_risk.group(1)) / 100 if float(market_risk.group(1)) > 1 else float(market_risk.group(1))
credit_risk = re.search(r'crédit[^\d]*(\d+(?:\.\d+)?)', risk_section)
if credit_risk:
decomposition['credit_risk'] = float(credit_risk.group(1)) / 100 if float(credit_risk.group(1)) > 1 else float(credit_risk.group(1))
liquidity_risk = re.search(r'liquidité[^\d]*(\d+(?:\.\d+)?)', risk_section)
if liquidity_risk:
decomposition['liquidity_risk'] = float(liquidity_risk.group(1)) / 100 if float(liquidity_risk.group(1)) > 1 else float(liquidity_risk.group(1))
operational_risk = re.search(r'opérationnel[^\d]*(\d+(?:\.\d+)?)', risk_section)
if operational_risk:
decomposition['operational_risk'] = float(operational_risk.group(1)) / 100 if float(operational_risk.group(1)) > 1 else float(operational_risk.group(1))
except ValueError:
pass
# Default values if nothing found
if not decomposition:
decomposition = {
'market_risk': 0.4,
'credit_risk': 0.2,
'liquidity_risk': 0.2,
'operational_risk': 0.2
}
return decomposition
def _extract_goal_feasibility(self, sections: List[str]) -> Dict[str, float]:
"""Extract goal feasibility assessment"""
goals_section = next((s for s in sections if "Objectifs prioritaires" in s), "")
feasibility = {}
if goals_section:
goal_matches = re.finditer(r'([^:]+):\s*(\d+(?:\.\d+)?)', goals_section)
for match in goal_matches:
goal = match.group(1).strip()
score = float(match.group(2))
feasibility[goal] = min(max(score / 100 if score > 1 else score, 0), 1)
return feasibility
def _extract_investment_style(self, sections: List[str]) -> Dict[str, float]:
"""Determine recommended investment style"""
style_weights = {
'value': 0.0,
'growth': 0.0,
'income': 0.0,
'momentum': 0.0,
'quality': 0.0
}
recommendations = next((s for s in sections if "Recommandations" in s), "")
if recommendations:
style_weights['value'] = len(re.findall(r'valeur|sous-évalué|value', recommendations, re.I)) * 0.2
style_weights['growth'] = len(re.findall(r'croissance|growth|innovation', recommendations, re.I)) * 0.2
style_weights['income'] = len(re.findall(r'revenu|dividende|income', recommendations, re.I)) * 0.2
style_weights['momentum'] = len(re.findall(r'momentum|tendance|trend', recommendations, re.I)) * 0.2
style_weights['quality'] = len(re.findall(r'qualité|quality|solide', recommendations, re.I)) * 0.2
# Normalize weights
total = sum(style_weights.values())
if total > 0:
style_weights = {k: v/total for k, v in style_weights.items()}
return style_weights
def _extract_market_sensitivity(self, sections: List[str]) -> Dict[str, float]:
"""Analyze sensitivity to market conditions"""
sensitivity = {
'market_beta': 0.0,
'interest_rate': 0.0,
'economic_cycle': 0.0,
'inflation': 0.0
}
analysis_text = "\n".join(sections)
# Analyze mentions and context
if 'beta' in analysis_text.lower():
beta_match = re.search(r'beta[^\d]*(\d+(?:\.\d+)?)', analysis_text, re.I)
if beta_match:
sensitivity['market_beta'] = float(beta_match.group(1))
if 'taux' in analysis_text.lower():
sensitivity['interest_rate'] = len(re.findall(r'taux|rate', analysis_text, re.I)) * 0.1
if 'cycle' in analysis_text.lower():
sensitivity['economic_cycle'] = len(re.findall(r'cycle|économique', analysis_text, re.I)) * 0.1
if 'inflation' in analysis_text.lower():
sensitivity['inflation'] = len(re.findall(r'inflation|prix', analysis_text, re.I)) * 0.1
# Normalize sensitivities
max_value = max(sensitivity.values())
if max_value > 0:
sensitivity = {k: v/max_value for k, v in sensitivity.items()}
return sensitivity
def _extract_alerts(self, sections: List[str]) -> List[Dict[str, Any]]:
"""Extract alerts and critical attention points"""
alerts = []
attention_section = next((s for s in sections if "Points de vigilance" in s), "")
if attention_section:
alert_patterns = [
(r'(!+|ATTENTION|ALERTE)[^\n]*', 'critical'),
(r'attention|vigilance[^\n]*', 'warning'),
(r'noter|remarquer[^\n]*', 'info')
]
for pattern, severity in alert_patterns:
matches = re.finditer(pattern, attention_section, re.I)
for match in matches:
alerts.append({
'severity': severity,
'message': match.group(0).strip(),
'timestamp': datetime.now().isoformat()
})
return alerts
def _extract_attention_points(self, sections: List[str]) -> List[str]:
"""Extract specific attention points"""
attention_points = []
attention_section = next((s for s in sections if "Points d'attention" in s or "Points de vigilance" in s), "")
if attention_section:
points = re.split(r'[•\-\d]+\.?\s*', attention_section)
attention_points = [p.strip() for p in points if p.strip() and not p.startswith("Points")]
return attention_points
def _extract_market_context(self, sections: List[str]) -> Dict[str, Any]:
"""Extract relevant market context"""
context = {}
market_section = next((s for s in sections if "Contexte de marché" in s or "conditions de marché" in s), "")
if market_section:
if re.search(r'baiss|bear|négatif', market_section, re.I):
context['market_regime'] = 'bear'
elif re.search(r'hauss|bull|positif', market_section, re.I):
context['market_regime'] = 'bull'
else:
context['market_regime'] = 'neutral'
# Volatility analysis
if re.search(r'volatile|instable', market_section, re.I):
context['volatility'] = 'high'
elif re.search(r'stable|calme', market_section, re.I):
context['volatility'] = 'low'
else:
context['volatility'] = 'moderate'
# Market sentiment
context['sentiment'] = self._analyze_market_sentiment(market_section)
return context
def _calculate_adaptability_score(self, sections: List[str]) -> float:
"""Calculate profile adaptability score"""
score = 0.5 # Base score
text = "\n".join(sections)
# Positive factors
positive_factors = [
(r'flex|adapt|ajust', 0.1),
(r'divers|équilibr', 0.1),
(r'long terme|patient', 0.1),
(r'expérience|connaissance', 0.1),
(r'stable|régulier', 0.1)
]
# Negative factors
negative_factors = [
(r'rigid|fix|strict', -0.1),
(r'risqu|peur|craint', -0.1),
(r'court terme|urgent', -0.1),
(r'limité|contraint', -0.1),
(r'instable|irrégulier', -0.1)
]
# Calculate score
for pattern, weight in positive_factors + negative_factors:
if re.search(pattern, text, re.I):
score = min(max(score + weight, 0), 1)
return score
def _analyze_market_sentiment(self, text: str) -> str:
"""Analyze market sentiment"""
positive_count = len(re.findall(r'positif|optimiste|favorable|hausse', text, re.I))
negative_count = len(re.findall(r'négatif|pessimiste|défavorable|baisse', text, re.I))
if positive_count > negative_count:
return 'positive'
elif negative_count > positive_count:
return 'negative'
return 'neutral'
def _extract_stress_test_results(self, sections: List[str]) -> Dict[str, float]:
"""Extract stress test results"""
results = {}
stress_section = next((s for s in sections if "Stress tests" in s), "")
if stress_section:
scenarios = re.finditer(r'([^:]+):\s*-?\d+(?:\.\d+)?%', stress_section)
for scenario in scenarios:
name = scenario.group(1).strip()
try:
impact = float(re.search(r'-?\d+(?:\.\d+)?', scenario.group(0)).group(0))
results[name] = impact / 100 # Convert to decimal
except (ValueError, AttributeError):
continue
# Default scenarios if nothing found
if not results:
results = {
'market_crash': -0.30,
'interest_rate_shock': -0.15,
'currency_crisis': -0.20,
'inflation_shock': -0.10
}
return results
async def generate_profile_description(self, profile_data: Dict[str, Any]) -> str:
"""Generate detailed profile description"""
try:
# Prepare prompt
prompt = self.llm.prepare_description_prompt(profile_data)
# Generate description
description = await self.llm.generate_text(prompt)
# Clean and format
description = description.replace("[/INST]", "").strip()
return description
except Exception as e:
self.logger.error(f"Error generating profile description: {e}")
return self._generate_default_description(profile_data)
def _generate_default_description(self, profile_data: Dict[str, Any]) -> str:
"""Generate default description if generation fails"""
age = profile_data.get('age', 'Non spécifié')
income = profile_data.get('annual_income', 'Non spécifié')
goals = ", ".join(profile_data.get('investment_goals', ["épargne générale"]))
risk = profile_data.get('risk_tolerance', 3)
horizon = profile_data.get('investment_horizon', 5)
risk_profile = "prudent" if risk <= 2 else "modéré" if risk <= 3 else "dynamique"
return f"""Profil investisseur de {age} ans avec un revenu annuel de {income}€.
Ce profil présente une tolérance au risque {risk_profile} (niveau {risk}/5) et un horizon d'investissement de {horizon} ans.
Ses principaux objectifs sont: {goals}.
Une stratégie d'investissement diversifiée est recommandée, adaptée à son niveau de risque {risk_profile}.
L'allocation d'actifs devra tenir compte de ses objectifs spécifiques tout en respectant son horizon d'investissement.
"""
class DeepVestProfiler:
"""Advanced investor profiler with fine-tuned AI integration"""
def __init__(self, config: Optional[DeepVestConfig] = None):
"""Initialize profiler with configuration"""
self.config = config or DeepVestConfig()
self.logger = logging.getLogger("DeepVest.Profiler")
self.llm_analyzer = LLMProfileAnalyzer(self.config)
self.profiles = {} # In-memory profile storage
# Initialize database directory if persistent storage is enabled
if self.config.use_persistent_storage:
self._ensure_db_directory()
def _ensure_db_directory(self):
"""Ensure database directory exists"""
if not os.path.exists(self.config.db_path):
os.makedirs(self.config.db_path)
self.logger.info(f"Created database directory: {self.config.db_path}")
async def create_profile(self, profile_data: Dict[str, Any]) -> PersonalProfile:
"""Create complete investor profile with LLM analysis"""
try:
self.logger.info("Creating new investor profile")
# Process input data and handle French field names
processed_data = profile_data.copy()
# Map French field names for monthly savings
if 'épargne_mensuelle' in processed_data:
processed_data['monthly_savings'] = processed_data.pop('épargne_mensuelle')
elif 'épargne mensuelle' in processed_data:
processed_data['monthly_savings'] = processed_data.pop('épargne mensuelle')
# Perform LLM analysis
llm_analysis = await self.llm_analyzer.analyze_profile(processed_data)
# Generate profile description
profile_description = await self.llm_analyzer.generate_profile_description(processed_data)
# Build profile
profile = PersonalProfile(
# Identifiers
id=processed_data.get('id'),
# Base profile
risk_tolerance=processed_data.get('risk_tolerance', 3),
investment_horizon=processed_data.get('investment_horizon', 5),
income_stability=processed_data.get('income_stability', 3),
financial_knowledge=processed_data.get('financial_knowledge', 3),
investment_goals=processed_data.get('investment_goals', []),
esg_preferences=processed_data.get('esg_preferences', False),
constraints=processed_data.get('constraints', {}),
# Personal information
age=processed_data.get('age', 30),
annual_income=processed_data.get('annual_income', 0),
total_assets=processed_data.get('total_assets', 0),
total_debt=processed_data.get('total_debt', 0),
monthly_savings=processed_data.get('monthly_savings', 0), # Added monthly_savings
occupation=processed_data.get('occupation', ''),
industry=processed_data.get('industry', ''),
education_level=processed_data.get('education_level', ''),
marital_status=processed_data.get('marital_status', 'single'),
number_of_dependents=processed_data.get('number_of_dependents', processed_data.get('dependents', 0)),
# LLM analysis results
llm_analysis_results=llm_analysis,
risk_score=llm_analysis.risk_score,
adaptability_score=llm_analysis.adaptability_score,
complexity_tolerance=processed_data.get('complexity_tolerance', 0.5),
profile_description=profile_description
)
# Add detailed analyses
await self._add_detailed_analysis(profile, processed_data)
# Validate profile
profile._validate_profile()
# Calculate comprehensive risk metrics
profile.calculate_comprehensive_risk_metrics()
# Store in memory
self.profiles[profile.id] = profile
# Store in database if enabled
if self.config.use_persistent_storage:
self._save_profile_to_db(profile)
self.logger.info(f"Profile created successfully with ID: {profile.id}")
return profile
except Exception as e:
self.logger.error(f"Error creating profile: {e}")
raise
async def _add_detailed_analysis(self, profile: PersonalProfile, data: Dict[str, Any]):
"""Add detailed analyses to profile"""
# Financial analysis
try:
profile.financial_analysis = FinancialAnalysis(
income_stability=data.get('income_stability', 3) / 5,
debt_ratio=profile.total_debt / profile.annual_income if profile.annual_income > 0 else 0,
savings_capacity=data.get('monthly_savings', 0) / data.get('annual_income', 1) * 12 if data.get('annual_income', 0) > 0 else 0,
emergency_fund_months=data.get('emergency_fund_months', 3),
risk_capacity=profile.llm_analysis_results.risk_score if profile.llm_analysis_results else 0.5,
investment_horizon=profile.investment_horizon,
liquidity_needs=data.get('liquidity_needs', 0.2),
income_sources=data.get('income_sources', {}),
fixed_expenses=data.get('fixed_expenses', {}),
variable_expenses=data.get('variable_expenses', {})
)
except Exception as e:
self.logger.warning(f"Error creating detailed financial analysis: {e}")
# Family analysis
try:
profile.family_analysis = FamilyAnalysis(
family_size=1 + (1 if data.get('marital_status') in ['married', 'partnered'] else 0) + data.get('dependents', 0),
dependents=data.get('dependents', 0),
life_stage=self._determine_life_stage(profile.age),
future_events=data.get('future_events', []),
monthly_obligations=data.get('monthly_obligations', 0),
insurance_coverage=data.get('insurance_coverage', {}),
education_needs=data.get('education_needs', {}),
healthcare_needs=data.get('healthcare_needs', {})
)
except Exception as e:
self.logger.warning(f"Error creating detailed family analysis: {e}")
# Investment objectives
try:
profile.investment_objectives = InvestmentGoals(
primary_goals=[{'name': goal, 'priority': i+1} for i, goal in enumerate(profile.investment_goals)],
timeframes=data.get('goal_timeframes', {}),
required_returns=data.get('required_returns', {}),
priority_order=profile.investment_goals,
target_amounts=data.get('goal_amounts', {}),
goal_progress=data.get('goal_progress', {}),
risk_tolerance_by_goal=data.get('risk_by_goal', {})
)
except Exception as e:
self.logger.warning(f"Error creating detailed investment objectives: {e}")
def _determine_life_stage(self, age: int) -> str:
"""Determine life stage based on age"""
if age < 25:
return "early_career"
elif age < 35:
return "career_building"
elif age < 45:
return "family_building"
elif age < 55:
return "peak_earning"
elif age < 65:
return "pre_retirement"
else:
return "retirement"
def _save_profile_to_db(self, profile: PersonalProfile):
"""Save profile to persistent storage"""
try:
# Generate filename
filename = f"{profile.id}.json"
filepath = os.path.join(self.config.db_path, filename)
# Export as JSON
profile.export_json(filepath, include_detailed_analysis=True)
self.logger.info(f"Profile saved to database: {filepath}")
except Exception as e:
self.logger.error(f"Error saving profile to database: {e}")
async def get_profile(self, profile_id: str) -> Optional[PersonalProfile]:
"""Récupérer un profil par son ID"""
try:
# Check in-memory cache first
if profile_id in self.profiles:
self.logger.debug(f"Profile {profile_id} found in memory cache")
return self.profiles[profile_id]
# If not in memory and persistent storage is enabled, try to load from disk
if self.config.use_persistent_storage:
filepath = os.path.join(self.config.db_path, f"{profile_id}.json")
if os.path.exists(filepath):
self.logger.info(f"Loading profile {profile_id} from disk storage")
profile = PersonalProfile.from_json(filepath)
# Add to memory cache
self.profiles[profile_id] = profile
return profile
self.logger.warning(f"Profile {profile_id} not found")
return None
except Exception as e:
self.logger.error(f"Error retrieving profile {profile_id}: {e}")
return None
async def update_profile(self, profile_id: str, updates: Dict[str, Any]) -> PersonalProfile:
"""Met à jour un profil existant avec de nouvelles informations"""
try:
# Récupérer le profil existant
profile = await self.get_profile(profile_id)
if not profile:
raise ValueError(f"Profile with ID {profile_id} not found")
# Process updates for French field names
processed_updates = updates.copy()
if 'épargne_mensuelle' in processed_updates:
processed_updates['monthly_savings'] = processed_updates.pop('épargne_mensuelle')
elif 'épargne mensuelle' in processed_updates:
processed_updates['monthly_savings'] = processed_updates.pop('épargne mensuelle')
# Use dataclasses.asdict to properly convert dataclass to dict
from dataclasses import asdict
profile_dict = asdict(profile)
# Remove fields that shouldn't be passed to constructor
fields_to_remove = ['basic_info']
for field in fields_to_remove:
if field in profile_dict:
del profile_dict[field]
# Apply the updates
for key, value in processed_updates.items():
if key in profile_dict:
profile_dict[key] = value
# Recréer un objet PersonalProfile à partir du dictionnaire mis à jour
# We need to handle potential unexpected arguments
# Use only the fields that PersonalProfile accepts
from inspect import signature
profile_params = signature(PersonalProfile).parameters
filtered_dict = {k: v for k, v in profile_dict.items() if k in profile_params}
updated_profile = PersonalProfile(**filtered_dict)
# Validate the updated profile
if hasattr(updated_profile, '_validate_profile'):
updated_profile._validate_profile()
# Reanalyze the profile
analysis_result = await self.analyzer.analyze_profile(updated_profile)
updated_profile.llm_analysis_results = analysis_result
# Sauvegarder le profil mis à jour
await self._save_profile(updated_profile)
self.logger.info(f"Profile {profile_id} updated successfully")
return updated_profile
except Exception as e:
self.logger.error(f"Error updating profile {profile_id}: {str(e)}")
raise
def _are_changes_significant(self, updates: Dict[str, Any]) -> bool:
"""Determine if profile changes require re-analysis"""
significant_fields = {
'risk_tolerance', 'investment_horizon', 'age', 'annual_income',
'total_assets', 'total_debt', 'investment_goals', 'marital_status',
'number_of_dependents', 'esg_preferences'
}
return any(field in updates for field in significant_fields)
async def delete_profile(self, profile_id: str) -> bool:
"""Supprimer un profil"""
try:
# Remove from memory
if profile_id in self.profiles:
del self.profiles[profile_id]
# Remove from persistent storage if enabled
if self.config.use_persistent_storage:
filepath = os.path.join(self.config.db_path, f"{profile_id}.json")
if os.path.exists(filepath):
os.remove(filepath)
self.logger.info(f"Profile {profile_id} deleted successfully")
return True
except Exception as e:
self.logger.error(f"Error deleting profile {profile_id}: {e}")
return False
async def list_profiles(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""Liste tous les profils disponibles avec critères de filtrage optionnels"""
try:
profiles_list = []
# Load all profiles from disk if persistent storage is enabled
if self.config.use_persistent_storage:
self._load_all_profiles_to_memory()
# Filter and convert profiles to basic info dictionaries
for profile_id, profile in self.profiles.items():
if filter_criteria and not self._profile_matches_criteria(profile, filter_criteria):
continue
# Extract basic profile info
profile_info = {
'id': profile.id,
'age': profile.age,
'risk_score': profile.risk_score,
'investment_horizon': profile.investment_horizon,
'creation_date': profile.created_at.isoformat(),
'last_updated': profile.last_updated.isoformat(),
'goals': profile.investment_goals
}
profiles_list.append(profile_info)
return profiles_list
except Exception as e:
self.logger.error(f"Error listing profiles: {e}")
return []
def _profile_matches_criteria(self, profile: PersonalProfile, criteria: Dict[str, Any]) -> bool:
"""Vérifie si un profil correspond aux critères de filtre"""
for field, value in criteria.items():
if not hasattr(profile, field):
return False
profile_value = getattr(profile, field)
# Handle range criteria
if isinstance(value, dict) and ('min' in value or 'max' in value):
if 'min' in value and profile_value < value['min']:
return False
if 'max' in value and profile_value > value['max']:
return False
# Handle list criteria (any match)
elif isinstance(value, list):
if profile_value not in value:
return False
# Direct comparison
elif profile_value != value:
return False
return True
def _load_all_profiles_to_memory(self):
"""Charge tous les profils depuis le stockage persistant vers la mémoire"""
if not self.config.use_persistent_storage:
return
try:
# Get all JSON files in the database directory
db_path = self.config.db_path
if not os.path.exists(db_path):
self.logger.warning(f"Database directory {db_path} not found")
return
json_files = [f for f in os.listdir(db_path) if f.endswith('.json')]
self.logger.info(f"Found {len(json_files)} profile files in database")
# Load each file that isn't already in memory
for json_file in json_files:
profile_id = json_file.replace('.json', '')
if profile_id not in self.profiles:
filepath = os.path.join(db_path, json_file)
try:
profile = PersonalProfile.from_json(filepath)
self.profiles[profile_id] = profile
except Exception as e:
self.logger.error(f"Error loading profile from {filepath}: {e}")
except Exception as e:
self.logger.error(f"Error loading profiles from disk: {e}")
async def batch_analyze_profiles(self, profile_ids: List[str]) -> Dict[str, Any]:
"""Analyse par lots de plusieurs profils pour détection de tendances"""
try:
profiles = []
profile_map = {}
# Load all requested profiles
for profile_id in profile_ids:
profile = await self.get_profile(profile_id)
if profile:
profiles.append(profile)
profile_map[profile_id] = profile
if not profiles:
self.logger.warning("No valid profiles found for batch analysis")
return {}
# Extract common trends and insights
age_groups = self._group_profiles_by_age(profiles)
risk_groups = self._group_profiles_by_risk(profiles)
goal_trends = self._analyze_goal_trends(profiles)
# Perform clustering if we have enough profiles
clusters = {}
if len(profiles) >= 5:
clusters = await self._cluster_profiles(profiles)
return {
'age_groups': age_groups,
'risk_distribution': risk_groups,
'goal_trends': goal_trends,
'profile_clusters': clusters,
'total_profiles_analyzed': len(profiles)
}
except Exception as e:
self.logger.error(f"Error in batch profile analysis: {e}")
return {'error': str(e)}
def _group_profiles_by_age(self, profiles: List[PersonalProfile]) -> Dict[str, int]:
"""Groupe les profils par tranche d'âge"""
age_groups = {
'18-25': 0,
'26-35': 0,
'36-45': 0,
'46-55': 0,
'56-65': 0,
'65+': 0
}
for profile in profiles:
age = profile.age
if age <= 25:
age_groups['18-25'] += 1
elif age <= 35:
age_groups['26-35'] += 1
elif age <= 45:
age_groups['36-45'] += 1
elif age <= 55:
age_groups['46-55'] += 1
elif age <= 65:
age_groups['56-65'] += 1
else:
age_groups['65+'] += 1
return age_groups
def _group_profiles_by_risk(self, profiles: List[PersonalProfile]) -> Dict[str, int]:
"""Groupe les profils par niveau de risque"""
risk_groups = {
'Très conservateur': 0,
'Conservateur': 0,
'Modéré': 0,
'Dynamique': 0,
'Très dynamique': 0
}
for profile in profiles:
risk_score = profile.risk_score
if risk_score < 0.2:
risk_groups['Très conservateur'] += 1
elif risk_score < 0.4:
risk_groups['Conservateur'] += 1
elif risk_score < 0.6:
risk_groups['Modéré'] += 1
elif risk_score < 0.8:
risk_groups['Dynamique'] += 1
else:
risk_groups['Très dynamique'] += 1
return risk_groups
def _analyze_goal_trends(self, profiles: List[PersonalProfile]) -> Dict[str, Dict[str, Any]]:
"""Analyse les tendances dans les objectifs d'investissement"""
all_goals = {}
# Count goals
for profile in profiles:
for goal in profile.investment_goals:
if goal not in all_goals:
all_goals[goal] = {
'count': 0,
'avg_risk_score': 0,
'avg_investment_horizon': 0,
'profiles': []
}
all_goals[goal]['count'] += 1
all_goals[goal]['avg_risk_score'] += profile.risk_score
all_goals[goal]['avg_investment_horizon'] += profile.investment_horizon
all_goals[goal]['profiles'].append(profile.id)
# Calculate averages
for goal, data in all_goals.items():
count = data['count']
if count > 0:
data['avg_risk_score'] /= count
data['avg_investment_horizon'] /= count
data['percentage'] = (count / len(profiles)) * 100
return all_goals
async def _cluster_profiles(self, profiles: List[PersonalProfile]) -> Dict[str, List[str]]:
"""Regroupe les profils en clusters basés sur leurs caractéristiques"""
try:
# Extract features for clustering
features = []
profile_ids = []
for profile in profiles:
# Create feature vector
feature_vector = [
profile.risk_score,
profile.investment_horizon / 10, # Normalize horizon
profile.age / 100, # Normalize age
profile.financial_knowledge / 5, # Normalize knowledge
profile.income_stability / 5, # Normalize stability
profile.total_assets / 1000000 if profile.total_assets > 0 else 0, # Normalize assets
profile.total_debt / 1000000 if profile.total_debt > 0 else 0, # Normalize debt
len(profile.investment_goals) / 5 # Normalize goal count
]
features.append(feature_vector)
profile_ids.append(profile.id)
# Skip clustering if too few profiles
if len(features) < 3:
return {"cluster_1": profile_ids}
# Perform k-means clustering
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
# Standardize features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# Determine optimal number of clusters (between 2 and 5)
n_clusters = min(max(2, len(profiles) // 3), 5)
# Perform clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
clusters = kmeans.fit_predict(features_scaled)
# Organize profiles by cluster
cluster_dict = {}
for i, cluster_id in enumerate(clusters):
cluster_name = f"cluster_{cluster_id + 1}"
if cluster_name not in cluster_dict:
cluster_dict[cluster_name] = []
cluster_dict[cluster_name].append(profile_ids[i])
return cluster_dict
except Exception as e:
self.logger.error(f"Error in profile clustering: {e}")
return {"cluster_error": [p.id for p in profiles]}
def generate_asset_allocation(self, risk_score: float, investment_horizon: int, esg_preferences: bool = False) -> Dict[str, Any]:
"""Génère une allocation d'actifs basée sur le profil de risque et l'horizon
Args:
risk_score: Score de risque (1-5 ou 0-1)
investment_horizon: Horizon d'investissement en années
esg_preferences: Indique si l'investisseur a des préférences ESG
Returns:
Dictionnaire contenant l'allocation d'actifs et le profil de risque
"""
try:
# Normaliser le score de risque entre 0 et 1 s'il est sur une échelle différente
normalized_risk = risk_score
if normalized_risk > 1: # Si le score est sur une échelle de 1-5
normalized_risk = risk_score / 5.0
# Définir les catégories de risque
risk_category = ""
if normalized_risk < 0.2:
risk_category = "Très conservateur"
elif normalized_risk < 0.4:
risk_category = "Conservateur"
elif normalized_risk < 0.6:
risk_category = "Modéré"
elif normalized_risk < 0.8:
risk_category = "Dynamique"
else:
risk_category = "Très dynamique"
# Allocation de base selon le profil de risque
allocation = {}
if risk_category == "Très conservateur":
allocation = {
'Actions': 0.15,
'Obligations': 0.65,
'Liquidités': 0.20
}
elif risk_category == "Conservateur":
allocation = {
'Actions': 0.30,
'Obligations': 0.55,
'Liquidités': 0.15
}
elif risk_category == "Modéré":
allocation = {
'Actions': 0.50,
'Obligations': 0.40,
'Liquidités': 0.10
}
elif risk_category == "Dynamique":
allocation = {
'Actions': 0.70,
'Obligations': 0.25,
'Liquidités': 0.05
}
else: # Très dynamique
allocation = {
'Actions': 0.85,
'Obligations': 0.10,
'Liquidités': 0.05
}
# Ajustement selon l'horizon
horizon_factor = 0
if investment_horizon < 5:
# Réduire les actions pour les horizons courts
horizon_factor = -0.15
elif investment_horizon > 15:
# Augmenter les actions pour les horizons longs
horizon_factor = 0.10
# Appliquer l'ajustement basé sur l'horizon
if horizon_factor != 0:
action_shift = min(allocation['Actions'], abs(horizon_factor)) * (1 if horizon_factor > 0 else -1)
allocation['Actions'] += action_shift
# Répartir le changement sur les obligations et liquidités
remaining_shift = -action_shift
obligation_shift = remaining_shift * 0.7
liquidity_shift = remaining_shift * 0.3
allocation['Obligations'] += obligation_shift
allocation['Liquidités'] += liquidity_shift
# Ajustement pour les préférences ESG
if esg_preferences:
esg_allocation = min(allocation['Actions'], 0.3)
allocation['Actions'] -= esg_allocation
allocation['Actions ESG'] = esg_allocation
# S'assurer que tous les pourcentages sont positifs
allocation = {k: max(0, v) for k, v in allocation.items()}
# Normaliser pour que la somme soit égale à 1
total = sum(allocation.values())
allocation = {k: v / total for k, v in allocation.items()}
# Retourner l'allocation avec la catégorie de risque
return {
'asset_allocation': allocation,
'risk_profile': {
'score': normalized_risk,
'category': risk_category
}
}
except Exception as e:
self.logger.error(f"Error generating asset allocation: {str(e)}")
# Allocation par défaut en cas d'erreur
return {
'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1},
'risk_profile': {'score': 0.5, 'category': 'Modéré'}
}
async def generate_investment_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
"""Génère une stratégie d'investissement personnalisée pour un profil"""
try:
# Vérifier que le profil est valide
if not profile or not profile.id:
raise ValueError("Invalid profile")
# Obtenir monthly_savings avec fallback pour éviter les erreurs
monthly_savings = getattr(profile, 'monthly_savings', 0)
if monthly_savings == 0 and hasattr(profile, 'financial_analysis') and hasattr(profile.financial_analysis, 'monthly_savings'):
monthly_savings = profile.financial_analysis.monthly_savings
# Calculer l'allocation d'actifs
risk_score = profile.risk_tolerance if 1 <= profile.risk_tolerance <= 5 else 3
asset_allocation = self.generate_asset_allocation(
risk_score=risk_score,
investment_horizon=profile.investment_horizon,
esg_preferences=profile.esg_preferences
)
# Personnaliser les recommandations en fonction du profil
recommendations = []
# Recommandation de base pour tous
recommendations.append("Diversifier le portefeuille selon le profil de risque")
# Recommandations spécifiques selon le profil
if profile.age < 30:
recommendations.append("Privilégier les investissements à fort potentiel de croissance")
recommendations.append("Adopter une stratégie d'investissement régulier (DCA)")
elif profile.age < 50:
recommendations.append("Équilibrer croissance et sécurité dans votre allocation")
recommendations.append("Réviser annuellement votre stratégie fiscale")
else:
recommendations.append("Sécuriser progressivement votre capital")
recommendations.append("Optimiser votre stratégie en vue de la retraite")
# Recommandations selon les objectifs
if "Retraite" in profile.investment_goals:
recommendations.append("Maximiser les versements sur votre plan de retraite")
if "Achat immobilier" in profile.investment_goals:
recommendations.append("Constituer un apport pour votre projet immobilier")
if "Études des enfants" in profile.investment_goals:
recommendations.append("Mettre en place une épargne dédiée aux études des enfants")
# Recommandations spécifiques à la situation financière
if profile.total_debt > profile.annual_income:
recommendations.append("Prioriser le remboursement des dettes à taux élevé")
if monthly_savings < (profile.annual_income / 12) * 0.1:
recommendations.append("Augmenter votre taux d'épargne mensuel")
# Limiter à 5 recommandations principales
if len(recommendations) > 5:
recommendations = recommendations[:5]
# Construire la stratégie complète
strategy = {
'profile_id': profile.id,
'timestamp': datetime.now().isoformat(),
'risk_profile': asset_allocation['risk_profile'],
'asset_allocation': asset_allocation['asset_allocation'],
'recommendations': recommendations,
'targets': {
'monthly_savings': max(profile.annual_income * 0.15 / 12, monthly_savings),
'emergency_fund': profile.annual_income / 6
},
'investment_products': self._recommend_products(profile, asset_allocation)
}
self.logger.info(f"Generated investment strategy for profile {profile.id}")
return strategy
except Exception as e:
self.logger.error(f"Error generating investment strategy: {str(e)}")
# Stratégie par défaut en cas d'erreur
return {
'risk_profile': {'category': 'Modéré'},
'asset_allocation': {'Actions': 0.6, 'Obligations': 0.3, 'Liquidités': 0.1},
'recommendations': [
"Diversifier le portefeuille selon le profil de risque",
"Maintenir une épargne de sécurité",
"Adopter une stratégie d'investissement régulier"
]
}
def _refine_asset_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]:
"""Raffine l'allocation d'actifs basée sur des contraintes supplémentaires"""
refined = allocation.copy()
# Apply ESG preference adjustments
if profile.esg_preferences and 'Actions' in refined:
# Reduce regular stocks and add ESG stocks
regular_stocks = refined.get('Actions', 0)
if regular_stocks > 0.3:
refined['Actions'] = regular_stocks * 0.7
refined['Actions ESG'] = regular_stocks * 0.3
# Apply liquidity needs adjustment
if hasattr(profile, 'financial_analysis') and profile.financial_analysis:
liquidity_needs = profile.financial_analysis.liquidity_needs
if liquidity_needs > 0.3 and 'Liquidités' in refined:
# Ensure minimum cash allocation
min_cash = max(0.15, liquidity_needs)
if refined['Liquidités'] < min_cash:
# Reduce other allocations proportionally
shortfall = min_cash - refined['Liquidités']
for asset, weight in list(refined.items()):
if asset != 'Liquidités':
reduction = weight * (shortfall / (1 - refined['Liquidités']))
refined[asset] = max(0, weight - reduction)
refined['Liquidités'] = min_cash
# Normalize to ensure sum is 1.0
total = sum(refined.values())
if abs(total - 1.0) > 0.01:
refined = {k: v/total for k, v in refined.items()}
return refined
def _refine_geographic_allocation(self, allocation: Dict[str, float], profile: PersonalProfile) -> Dict[str, float]:
"""Raffine l'allocation géographique basée sur des contraintes supplémentaires"""
# Use provided allocation or default
if not allocation:
allocation = {
'Europe': 0.4,
'Amérique du Nord': 0.3,
'Asie-Pacifique': 0.2,
'Pays émergents': 0.1
}
refined = allocation.copy()
# Apply profile-specific adjustments
if profile.risk_score < 0.3:
# More conservative profile - reduce emerging markets
if 'Pays émergents' in refined:
emerging = refined['Pays émergents']
refined['Pays émergents'] = emerging * 0.5
# Redistribute to developed markets
for region in ['Europe', 'Amérique du Nord']:
if region in refined:
refined[region] += emerging * 0.25
elif profile.risk_score > 0.7:
# More aggressive profile - increase emerging markets
if 'Pays émergents' in refined and refined['Pays émergents'] < 0.2:
increase = min(0.1, 0.2 - refined['Pays émergents'])
refined['Pays émergents'] += increase
# Take from developed markets
for region in ['Europe', 'Amérique du Nord']:
if region in refined:
refined[region] -= increase / 2
# Normalize to ensure sum is 1.0
total = sum(refined.values())
if abs(total - 1.0) > 0.01:
refined = {k: v/total for k, v in refined.items()}
return refined
def _generate_implementation_steps(self, strategy: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Génère des étapes d'implémentation pratiques pour la stratégie"""
steps = []
# Step 1: Set up accounts
steps.append({
'order': 1,
'title': 'Mise en place des comptes',
'description': 'Ouvrir les comptes nécessaires pour implémenter la stratégie',
'actions': [
'Ouvrir un compte-titres classique pour les investissements généraux',
'Mettre en place un PEA pour les actions européennes',
'Considérer une assurance-vie pour les investissements à long terme'
]
})
# Step 2: Initial asset allocation
asset_allocation = strategy.get('asset_allocation', {})
allocation_actions = []
for asset, weight in asset_allocation.items():
allocation_actions.append(f'Allouer {weight:.1%} du portefeuille à la classe d\'actifs "{asset}"')
steps.append({
'order': 2,
'title': 'Allocation initiale des actifs',
'description': 'Répartir les investissements selon l\'allocation d\'actifs recommandée',
'actions': allocation_actions
})
# Step 3: Implementation schedule
steps.append({
'order': 3,
'title': 'Calendrier d\'implémentation',
'description': 'Établir un calendrier progressif pour l\'implémentation de la stratégie',
'actions': [
'Mois 1: Mise en place des comptes et investissement de 25% du capital',
'Mois 2-3: Investissement de 25% supplémentaires',
'Mois 4-6: Investissement du capital restant',
'Trimestriel: Rééquilibrage du portefeuille selon l\'allocation cible'
]
})
# Step 4: Monitoring and adjustments
steps.append({
'order': 4,
'title': 'Suivi et ajustements',
'description': 'Établir un processus de suivi régulier de la stratégie',
'actions': [
'Suivre mensuellement la performance du portefeuille',
'Vérifier trimestriellement l\'alignement avec l\'allocation cible',
'Réévaluer annuellement la stratégie complète',
'Ajuster en fonction des changements de situation personnelle'
]
})
return steps
def _determine_rebalancing_schedule(self, profile: PersonalProfile) -> Dict[str, Any]:
"""Détermine le calendrier de rééquilibrage optimal"""
# Base frequency on risk profile and investment knowledge
if profile.risk_score > 0.7 or profile.financial_knowledge >= 4:
frequency = 'Trimestrielle'
threshold = 0.05 # 5% drift threshold
elif profile.risk_score < 0.3 or profile.financial_knowledge <= 2:
frequency = 'Annuelle'
threshold = 0.1 # 10% drift threshold
else:
frequency = 'Semestrielle'
threshold = 0.075 # 7.5% drift threshold
return {
'frequency': frequency,
'drift_threshold': threshold,
'method': 'Threshold-based' if profile.financial_knowledge >= 3 else 'Calendar-based',
'next_scheduled': datetime.now() + self._frequency_to_timedelta(frequency)
}
def _frequency_to_timedelta(self, frequency: str) -> timedelta:
"""Convertit une fréquence textuelle en objet timedelta"""
if frequency == 'Mensuelle':
return timedelta(days=30)
elif frequency == 'Trimestrielle':
return timedelta(days=90)
elif frequency == 'Semestrielle':
return timedelta(days=180)
elif frequency == 'Annuelle':
return timedelta(days=365)
else:
return timedelta(days=90) # Default to quarterly
def _calculate_expected_performance(self, allocation: Dict[str, float], years: int) -> Dict[str, Any]:
"""Calcule les performances attendues basées sur l'allocation et l'horizon"""
# Define return and volatility assumptions by asset class
assumptions = {
'Actions': {'return': 0.07, 'volatility': 0.18},
'Actions ESG': {'return': 0.065, 'volatility': 0.17},
'Obligations': {'return': 0.03, 'volatility': 0.05},
'Immobilier': {'return': 0.05, 'volatility': 0.15},
'Liquidités': {'return': 0.01, 'volatility': 0.01},
'Or': {'return': 0.04, 'volatility': 0.16},
'Matières premières': {'return': 0.04, 'volatility': 0.2},
'Cryptomonnaies': {'return': 0.15, 'volatility': 0.6}
}
# Calculate weighted expected return and volatility
expected_return = 0
expected_volatility = 0
for asset, weight in allocation.items():
if asset in assumptions:
expected_return += assumptions[asset]['return'] * weight
expected_volatility += assumptions[asset]['volatility'] * weight
# Apply time horizon adjustments (slightly higher returns for longer horizons)
if years > 10:
expected_return *= 1.05
elif years < 5:
expected_return *= 0.95
# Calculate compound return
compound_return = (1 + expected_return) ** years - 1
# Calculate different scenarios
return {
'expected_annual_return': expected_return,
'expected_volatility': expected_volatility,
'expected_total_return': compound_return,
'scenarios': {
'conservative': compound_return * 0.7,
'expected': compound_return,
'optimistic': compound_return * 1.3
},
'assumptions': {
'time_horizon': years,
'asset_assumptions': assumptions
}
}
async def _generate_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
"""Génère une stratégie de repli lorsque l'analyse LLM n'est pas disponible"""
# Generate asset allocation based on risk profile
asset_allocation = self._generate_default_allocation(profile)
# Standard geographic allocation
geographic_allocation = {
'Europe': 0.4,
'Amérique du Nord': 0.3,
'Asie-Pacifique': 0.2,
'Pays émergents': 0.1
}
# Default recommendations
recommendations = [
"Diversifier votre portefeuille selon votre profil de risque",
"Maintenir une épargne de sécurité équivalente à 3-6 mois de dépenses",
"Investir régulièrement pour lisser l'exposition au risque de marché",
"Rééquilibrer périodiquement votre portefeuille pour maintenir l'allocation cible"
]
# Default attention points
attention_points = [
"Surveiller l'évolution de votre tolérance au risque avec le temps",
"Ajuster votre stratégie en fonction des changements importants de votre situation personnelle",
"Rester discipliné face aux fluctuations du marché"
]
return {
'risk_profile': {
'score': profile.risk_tolerance / 5,
'category': self._get_risk_category(profile.risk_tolerance / 5)
},
'investment_horizon': profile.investment_horizon,
'primary_goals': profile.investment_goals[:3],
'asset_allocation': asset_allocation,
'geographic_allocation': geographic_allocation,
'recommendations': recommendations,
'attention_points': attention_points
}
def _generate_default_allocation(self, profile: PersonalProfile) -> Dict[str, float]:
"""Génère une allocation par défaut basée sur le profil de risque"""
risk_score = profile.risk_tolerance / 5
# Base allocation components
stocks = risk_score
bonds = (1 - risk_score) * 0.8
cash = (1 - risk_score) * 0.2
# Additional components for higher risk profiles
if risk_score > 0.6:
# Add alternatives for high risk profiles
alternatives = stocks * 0.15
stocks -= alternatives
return {
'Actions': stocks,
'Obligations': bonds,
'Liquidités': cash,
'Actifs alternatifs': alternatives
}
else:
return {
'Actions': stocks,
'Obligations': bonds,
'Liquidités': cash
}
def _get_risk_category(self, risk_score: float) -> str:
"""Détermine la catégorie de risque basée sur le score"""
if risk_score < 0.2:
return "Très conservateur"
elif risk_score < 0.4:
return "Conservateur"
elif risk_score < 0.6:
return "Modéré"
elif risk_score < 0.8:
return "Dynamique"
else:
return "Très dynamique"
def _generate_simple_fallback_strategy(self, profile: PersonalProfile) -> Dict[str, Any]:
"""Génère une stratégie simplifiée en cas d'erreur"""
risk_level = profile.risk_tolerance
if risk_level <= 2:
allocation = {'Actions': 0.3, 'Obligations': 0.5, 'Liquidités': 0.2}
elif risk_level <= 3:
allocation = {'Actions': 0.5, 'Obligations': 0.4, 'Liquidités': 0.1}
else:
allocation = {'Actions': 0.7, 'Obligations': 0.2, 'Liquidités': 0.1}
return {
'risk_profile': {
'score': risk_level / 5,
'category': self._get_risk_category(risk_level / 5)
},
'investment_horizon': profile.investment_horizon,
'asset_allocation': allocation,
'recommendations': [
"Diversifier votre portefeuille",
"Investir régulièrement",
"Maintenir une épargne de sécurité"
]
}