Spaces:
Sleeping
Sleeping
| # src/monitoring/notification_system.py | |
| from ..core.market_states import MarketConditions | |
| from typing import Callable, List, Dict | |
| import pandas as pd | |
| from datetime import datetime | |
| class NotificationSystem: | |
| def __init__(self): | |
| self.subscribers = set() | |
| self.notification_rules = {} | |
| self.alert_history = [] | |
| def add_notification_rule(self, event_type: str, condition: Callable, | |
| message_template: str): | |
| self.notification_rules[event_type] = { | |
| 'condition': condition, | |
| 'template': message_template | |
| } | |
| def check_portfolio_alerts(self, portfolio: pd.DataFrame, | |
| market_conditions: MarketConditions): | |
| alerts = [] | |
| for event_type, rule in self.notification_rules.items(): | |
| if rule['condition'](portfolio, market_conditions): | |
| alert = self._create_alert(event_type, rule['template']) | |
| alerts.append(alert) | |
| self.alert_history.append(alert) | |
| return alerts | |
| def _create_alert(self, event_type: str, template: str) -> Dict: | |
| return { | |
| 'type': event_type, | |
| 'message': template, | |
| 'timestamp': datetime.now() | |
| } |