# src/monitoring/notification_system.py from ..core.market_states import MarketConditions from typing import Callable, List, Dict import pandas as pd from datetime import datetime class NotificationSystem: def __init__(self): self.subscribers = set() self.notification_rules = {} self.alert_history = [] def add_notification_rule(self, event_type: str, condition: Callable, message_template: str): self.notification_rules[event_type] = { 'condition': condition, 'template': message_template } def check_portfolio_alerts(self, portfolio: pd.DataFrame, market_conditions: MarketConditions): alerts = [] for event_type, rule in self.notification_rules.items(): if rule['condition'](portfolio, market_conditions): alert = self._create_alert(event_type, rule['template']) alerts.append(alert) self.alert_history.append(alert) return alerts def _create_alert(self, event_type: str, template: str) -> Dict: return { 'type': event_type, 'message': template, 'timestamp': datetime.now() }