""" Forecast evaluation metrics """ import logging from typing import Dict, Any import numpy as np import pandas as pd logger = logging.getLogger(__name__) def calculate_metrics( actual: pd.Series, forecast: pd.Series, include_percentage: bool = True ) -> Dict[str, float]: """ Calculate forecast accuracy metrics Args: actual: Actual values forecast: Forecasted values include_percentage: Include percentage-based metrics Returns: Dictionary of metrics """ try: # Ensure same length min_len = min(len(actual), len(forecast)) actual = actual.iloc[:min_len].values forecast = forecast.iloc[:min_len].values # Remove NaN values mask = ~(np.isnan(actual) | np.isnan(forecast)) actual = actual[mask] forecast = forecast[mask] if len(actual) == 0: return {'error': 'No valid values for metric calculation'} metrics = {} # Mean Absolute Error metrics['MAE'] = float(np.mean(np.abs(actual - forecast))) # Root Mean Squared Error metrics['RMSE'] = float(np.sqrt(np.mean((actual - forecast) ** 2))) # Mean Error (bias) metrics['ME'] = float(np.mean(forecast - actual)) if include_percentage: # Mean Absolute Percentage Error # Avoid division by zero mask_nonzero = actual != 0 if mask_nonzero.any(): mape = np.mean(np.abs((actual[mask_nonzero] - forecast[mask_nonzero]) / actual[mask_nonzero])) * 100 metrics['MAPE'] = float(mape) # Symmetric MAPE denominator = (np.abs(actual) + np.abs(forecast)) / 2 mask_nonzero = denominator != 0 if mask_nonzero.any(): smape = np.mean(np.abs(actual[mask_nonzero] - forecast[mask_nonzero]) / denominator[mask_nonzero]) * 100 metrics['sMAPE'] = float(smape) # R-squared ss_res = np.sum((actual - forecast) ** 2) ss_tot = np.sum((actual - np.mean(actual)) ** 2) if ss_tot != 0: metrics['R2'] = float(1 - (ss_res / ss_tot)) return metrics except Exception as e: logger.error(f"Error calculating metrics: {str(e)}", exc_info=True) return {'error': str(e)} def calculate_coverage( actual: pd.Series, lower_bound: pd.Series, upper_bound: pd.Series ) -> float: """ Calculate coverage of prediction intervals Args: actual: Actual values lower_bound: Lower bound of prediction interval upper_bound: Upper bound of prediction interval Returns: Coverage percentage (0-100) """ try: # Ensure same length min_len = min(len(actual), len(lower_bound), len(upper_bound)) actual = actual.iloc[:min_len].values lower_bound = lower_bound.iloc[:min_len].values upper_bound = upper_bound.iloc[:min_len].values # Count values within bounds within_bounds = (actual >= lower_bound) & (actual <= upper_bound) coverage = np.mean(within_bounds) * 100 return float(coverage) except Exception as e: logger.error(f"Error calculating coverage: {str(e)}", exc_info=True) return 0.0 def calculate_interval_width( lower_bound: pd.Series, upper_bound: pd.Series ) -> Dict[str, float]: """ Calculate statistics about prediction interval width Args: lower_bound: Lower bound of prediction interval upper_bound: Upper bound of prediction interval Returns: Dictionary with width statistics """ try: widths = upper_bound - lower_bound return { 'mean_width': float(widths.mean()), 'median_width': float(widths.median()), 'min_width': float(widths.min()), 'max_width': float(widths.max()), 'std_width': float(widths.std()) } except Exception as e: logger.error(f"Error calculating interval width: {str(e)}", exc_info=True) return {} def format_metric(value: float, metric_name: str) -> str: """ Format metric value for display Args: value: Metric value metric_name: Name of the metric Returns: Formatted string """ if metric_name in ['MAPE', 'sMAPE', 'R2']: return f"{value:.2f}%" elif metric_name in ['MAE', 'RMSE', 'ME']: if abs(value) >= 1000: return f"{value:,.2f}" else: return f"{value:.4f}" else: return f"{value:.4f}" def summarize_forecast_quality( forecast_df: pd.DataFrame, confidence_levels: list ) -> Dict[str, Any]: """ Summarize the quality of a forecast Args: forecast_df: DataFrame with forecast results confidence_levels: List of confidence levels Returns: Summary dictionary """ try: summary = { 'horizon': len(forecast_df), 'forecast_range': { 'min': float(forecast_df['forecast'].min()), 'max': float(forecast_df['forecast'].max()), 'mean': float(forecast_df['forecast'].mean()) } } # Analyze interval widths for each confidence level interval_widths = {} for cl in confidence_levels: lower_col = f'lower_{cl}' upper_col = f'upper_{cl}' if lower_col in forecast_df.columns and upper_col in forecast_df.columns: width = (forecast_df[upper_col] - forecast_df[lower_col]).mean() interval_widths[f'{cl}%'] = float(width) summary['interval_widths'] = interval_widths return summary except Exception as e: logger.error(f"Error summarizing forecast: {str(e)}", exc_info=True) return {}