# chronos2-forecasting / services / cache_manager.py
# (uploaded by abhaypratapsingh111 via huggingface_hub, revision 33ccadb)
"""
Cache manager for storing predictions and uploaded data
"""
import logging
from typing import Dict, Optional
from datetime import datetime, timedelta
import pandas as pd
from config.constants import MAX_PREDICTION_HISTORY
logger = logging.getLogger(__name__)
class CacheManager:
    """
    In-memory cache for prediction results and uploaded datasets.

    Predictions live in an append-ordered list bounded by
    MAX_PREDICTION_HISTORY (oldest entries are trimmed on insert);
    uploaded datasets are keyed by filename. Not thread-safe.
    """

    def __init__(self):
        # Oldest-first list of prediction entries; trimmed on every insert
        # so it never exceeds self.max_predictions.
        self.predictions = []
        # Maps filename -> {'data': DataFrame, 'timestamp': datetime}.
        self.uploaded_data = {}
        self.max_predictions = MAX_PREDICTION_HISTORY

    def store_prediction(
        self,
        data_hash: str,
        horizon: int,
        confidence_levels: list,
        result: Dict
    ):
        """
        Store a prediction result, evicting the oldest entries when the
        cache exceeds its size limit.

        Args:
            data_hash: Hash of the input data
            horizon: Forecast horizon used
            confidence_levels: Confidence levels used
            result: Prediction result dictionary
        """
        self.predictions.append({
            'data_hash': data_hash,
            'horizon': horizon,
            'confidence_levels': confidence_levels,
            'result': result,
            'timestamp': datetime.now(),
        })
        # Keep only the most recent predictions.
        if len(self.predictions) > self.max_predictions:
            self.predictions = self.predictions[-self.max_predictions:]
        logger.debug("Stored prediction, cache size: %d", len(self.predictions))

    def get_prediction(
        self,
        data_hash: str,
        horizon: int,
        confidence_levels: list
    ) -> Optional[Dict]:
        """
        Retrieve a cached prediction if available.

        Scans newest-first so the most recent matching entry wins.

        Args:
            data_hash: Hash of the input data
            horizon: Forecast horizon
            confidence_levels: Confidence levels

        Returns:
            Cached prediction result, or None on a miss
        """
        for entry in reversed(self.predictions):
            if (entry['data_hash'] == data_hash and
                    entry['horizon'] == horizon and
                    entry['confidence_levels'] == confidence_levels):
                logger.info("Cache hit for prediction")
                return entry['result']
        logger.debug("Cache miss for prediction")
        return None

    def store_data(self, filename: str, data: pd.DataFrame):
        """
        Store an uploaded dataset, replacing any previous entry for the
        same filename.

        Args:
            filename: Name of the uploaded file
            data: DataFrame containing the data
        """
        self.uploaded_data[filename] = {
            'data': data,
            'timestamp': datetime.now(),
        }
        # BUG FIX: original logged a literal "(unknown)" placeholder instead
        # of the filename; use lazy %-args per logging best practice.
        logger.info("Stored data for %s", filename)

    def get_data(self, filename: str) -> Optional[pd.DataFrame]:
        """
        Retrieve a previously uploaded dataset.

        Args:
            filename: Name of the file

        Returns:
            DataFrame, or None if the filename was never stored
        """
        # Single dict lookup instead of `in` check + index.
        entry = self.uploaded_data.get(filename)
        return entry['data'] if entry is not None else None

    def clear_old_data(self, max_age_hours: int = 24):
        """
        Clear cached entries older than the specified age.

        Evicts both stale uploaded datasets and stale prediction entries.
        (The original evicted only datasets, although prediction entries
        carry a timestamp recorded for exactly this purpose.)

        Args:
            max_age_hours: Maximum age in hours
        """
        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        # Collect keys first: deleting while iterating a dict is an error.
        old_files = [
            filename for filename, entry in self.uploaded_data.items()
            if entry['timestamp'] < cutoff
        ]
        for filename in old_files:
            del self.uploaded_data[filename]
        # Drop stale predictions as well.
        before = len(self.predictions)
        self.predictions = [
            entry for entry in self.predictions
            if entry['timestamp'] >= cutoff
        ]
        stale_predictions = before - len(self.predictions)
        if old_files:
            logger.info("Cleared %d old data entries", len(old_files))
        if stale_predictions:
            logger.info("Cleared %d old prediction entries", stale_predictions)

    def clear_all(self):
        """Clear all cached predictions and datasets."""
        self.predictions.clear()
        self.uploaded_data.clear()
        logger.info("Cleared all cache")

    def get_stats(self) -> Dict:
        """
        Get cache statistics.

        Returns:
            Dict with entry counts and a rough memory estimate in MB.
        """
        return {
            'num_predictions': len(self.predictions),
            'num_datasets': len(self.uploaded_data),
            'total_memory_mb': self._estimate_memory(),
        }

    def _estimate_memory(self) -> float:
        """
        Estimate cache memory usage in MB (rough, best-effort).

        Only DataFrame payloads are counted; dict/list overhead is ignored.

        Returns:
            Estimated size in MB, or 0.0 if any entry cannot be sized.
        """
        try:
            total_bytes = 0
            # Estimate prediction cache size.
            for entry in self.predictions:
                # NOTE(review): assumes result['forecast'], when present, is
                # a pandas object exposing memory_usage — confirm with the
                # prediction producer.
                forecast = entry['result'].get('forecast')
                if forecast is not None:
                    total_bytes += forecast.memory_usage(deep=True).sum()
            # Estimate data cache size.
            for entry in self.uploaded_data.values():
                total_bytes += entry['data'].memory_usage(deep=True).sum()
            return total_bytes / (1024 * 1024)
        except Exception as e:
            # Best-effort: a sizing failure must never break get_stats().
            logger.warning("Failed to estimate memory: %s", e)
            return 0.0
# Global cache instance: module-level singleton shared by importers of this
# module. Not thread-safe — callers serialize access themselves if needed.
cache_manager = CacheManager()