ProfilingAI / src /monitoring /notification_system.py
Sandrine Guétin
Version propre de DeepVest
2106f78
raw
history blame
1.28 kB
# src/monitoring/notification_system.py
from ..core.market_states import MarketConditions
from typing import Callable, List, Dict
import pandas as pd
from datetime import datetime
class NotificationSystem:
def __init__(self):
self.subscribers = set()
self.notification_rules = {}
self.alert_history = []
def add_notification_rule(self, event_type: str, condition: Callable,
message_template: str):
self.notification_rules[event_type] = {
'condition': condition,
'template': message_template
}
def check_portfolio_alerts(self, portfolio: pd.DataFrame,
market_conditions: MarketConditions):
alerts = []
for event_type, rule in self.notification_rules.items():
if rule['condition'](portfolio, market_conditions):
alert = self._create_alert(event_type, rule['template'])
alerts.append(alert)
self.alert_history.append(alert)
return alerts
def _create_alert(self, event_type: str, template: str) -> Dict:
return {
'type': event_type,
'message': template,
'timestamp': datetime.now()
}