|
|
""" |
|
|
Cache manager for storing predictions and uploaded data |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from typing import Dict, Optional |
|
|
from datetime import datetime, timedelta |
|
|
import pandas as pd |
|
|
|
|
|
from config.constants import MAX_PREDICTION_HISTORY |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class CacheManager:
    """
    Manages caching of predictions and data to improve performance.

    Holds an in-memory, count-bounded history of prediction results and
    a dict of uploaded datasets keyed by filename. Predictions are
    trimmed by count (``MAX_PREDICTION_HISTORY``); uploaded data is
    pruned by age via :meth:`clear_old_data`. Not thread-safe.
    """

    def __init__(self):
        # Ordered prediction history; oldest entries are dropped once
        # the list exceeds ``max_predictions`` (see store_prediction).
        self.predictions = []
        # Maps filename -> {'data': DataFrame, 'timestamp': datetime}.
        self.uploaded_data = {}
        self.max_predictions = MAX_PREDICTION_HISTORY

    def store_prediction(
        self,
        data_hash: str,
        horizon: int,
        confidence_levels: list,
        result: Dict
    ) -> None:
        """
        Store a prediction result.

        Args:
            data_hash: Hash of the input data
            horizon: Forecast horizon used
            confidence_levels: Confidence levels used
            result: Prediction result dictionary
        """
        prediction_entry = {
            'data_hash': data_hash,
            'horizon': horizon,
            'confidence_levels': confidence_levels,
            'result': result,
            'timestamp': datetime.now()
        }

        self.predictions.append(prediction_entry)

        # Trim to the newest max_predictions entries so the cache
        # cannot grow without bound.
        if len(self.predictions) > self.max_predictions:
            self.predictions = self.predictions[-self.max_predictions:]

        logger.debug(f"Stored prediction, cache size: {len(self.predictions)}")

    def get_prediction(
        self,
        data_hash: str,
        horizon: int,
        confidence_levels: list
    ) -> Optional[Dict]:
        """
        Retrieve a cached prediction if available.

        Args:
            data_hash: Hash of the input data
            horizon: Forecast horizon
            confidence_levels: Confidence levels

        Returns:
            Cached prediction result or None
        """
        # Scan newest-first so the most recent matching entry wins.
        for entry in reversed(self.predictions):
            if (entry['data_hash'] == data_hash and
                    entry['horizon'] == horizon and
                    entry['confidence_levels'] == confidence_levels):

                logger.info("Cache hit for prediction")
                return entry['result']

        logger.debug("Cache miss for prediction")
        return None

    def store_data(self, filename: str, data: pd.DataFrame) -> None:
        """
        Store uploaded data.

        Overwrites any previous entry with the same filename.

        Args:
            filename: Name of the uploaded file
            data: DataFrame containing the data
        """
        self.uploaded_data[filename] = {
            'data': data,
            'timestamp': datetime.now()
        }

        # FIX: message previously contained the literal placeholder
        # "(unknown)" instead of interpolating the filename.
        logger.info(f"Stored data for {filename}")

    def get_data(self, filename: str) -> Optional[pd.DataFrame]:
        """
        Retrieve uploaded data.

        Args:
            filename: Name of the file

        Returns:
            DataFrame or None
        """
        if filename in self.uploaded_data:
            return self.uploaded_data[filename]['data']
        return None

    def clear_old_data(self, max_age_hours: int = 24) -> None:
        """
        Clear uploaded data older than specified hours.

        NOTE: only uploaded datasets are age-pruned; the prediction
        history is bounded by count in store_prediction() instead.

        Args:
            max_age_hours: Maximum age in hours
        """
        cutoff = datetime.now() - timedelta(hours=max_age_hours)

        # Collect keys first; deleting while iterating a dict is invalid.
        old_files = [
            filename for filename, entry in self.uploaded_data.items()
            if entry['timestamp'] < cutoff
        ]

        for filename in old_files:
            del self.uploaded_data[filename]

        if old_files:
            logger.info(f"Cleared {len(old_files)} old data entries")

    def clear_all(self) -> None:
        """Clear all cached data"""
        self.predictions.clear()
        self.uploaded_data.clear()
        logger.info("Cleared all cache")

    def get_stats(self) -> Dict:
        """Get cache statistics

        Returns:
            Dict with 'num_predictions', 'num_datasets' and
            'total_memory_mb' keys.
        """
        return {
            'num_predictions': len(self.predictions),
            'num_datasets': len(self.uploaded_data),
            'total_memory_mb': self._estimate_memory()
        }

    def _estimate_memory(self) -> float:
        """Estimate memory usage in MB (rough estimate).

        Only counts pandas objects (forecast DataFrames inside cached
        results, plus uploaded DataFrames); dict/list overhead is ignored.
        Returns 0.0 if estimation fails for any reason.
        """
        try:
            total_bytes = 0

            # Cached results may carry a 'forecast' DataFrame.
            for entry in self.predictions:
                if 'forecast' in entry['result']:
                    total_bytes += entry['result']['forecast'].memory_usage(deep=True).sum()

            for entry in self.uploaded_data.values():
                total_bytes += entry['data'].memory_usage(deep=True).sum()

            return total_bytes / (1024 * 1024)
        except Exception as e:
            # Best-effort metric: never let a stats call crash the caller.
            logger.warning(f"Failed to estimate memory: {str(e)}")
            return 0.0
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton: importers share this one cache instance.
cache_manager = CacheManager()
|
|
|