# app.py
#import numpy as np
#import pandas as pd
#import torch
#import gradio as gr
#import matplotlib
#matplotlib.use("Agg")  # headless backend for Spaces
#import matplotlib.pyplot as plt
#from chronos import ChronosPipeline
#
#MODEL_ID = "amazon/chronos-t5-large"
#PREDICTION_LENGTH = 12
#NUM_SAMPLES = 100  # increase for smoother quantiles (slower)
#
#device = "cuda" if torch.cuda.is_available() else "cpu"
#dtype = torch.bfloat16 if device == "cuda" else torch.float32
#
## Load once at startup (HF Spaces cache between runs)
#pipe = ChronosPipeline.from_pretrained(
#    MODEL_ID,
#    device_map="auto",  # uses GPU if available
#    torch_dtype=dtype,
#)
#
#def run_forecast_and_evaluate():
#    # 1) Load univariate example data
#    df = pd.read_csv(
#        "https://raw.githubusercontent.com/AileenNielsen/TimeSeriesAnalysisWithPython/master/data/AirPassengers.csv"
#    )
#    y = df["#Passengers"].astype(float).to_numpy()
#    n = len(y)
#
#    if n <= PREDICTION_LENGTH + 5:
#        raise gr.Error("Time series too short for a holdout evaluation.")
#
#    # 2) Holdout split: forecast the last 12 points
#    y_train = y[: n - PREDICTION_LENGTH]
#    y_test = y[n - PREDICTION_LENGTH :]
#
#    context = torch.tensor(y_train, dtype=torch.float32)
#    fcst = pipe.predict(context, prediction_length=PREDICTION_LENGTH, num_samples=NUM_SAMPLES)  # [1, S, H]
#    samples = fcst[0].cpu().numpy()  # (S, H)
#
#    # 3) Summaries & metrics
#    p10, p50, p90 = np.quantile(samples, [0.1, 0.5, 0.9], axis=0)
#
#    # Point forecast = median
#    mse = float(np.mean((p50 - y_test) ** 2))
#    rmse = float(np.sqrt(mse))
#
#    # Percent versions (relative to the mean of the true holdout)
#    mean_y = float(np.mean(y_test))
#    rmse_pct = float(100.0 * rmse / mean_y)       # RMSE as % of mean
#    mse_pct = float(100.0 * mse / (mean_y ** 2))  # MSE as % of mean^2
#
#    # (Optional) MAPE if you ever want it:
#    # mape_pct = float(100.0 * np.mean(np.abs((p50 - y_test) / y_test)))
#
#    # 4) Plot: history + forecast horizon vs ground truth
#    fig = plt.figure(figsize=(9, 4))
#    x_hist = np.arange(len(y_train))
#    x_fcst = np.arange(len(y_train), len(y_train) + PREDICTION_LENGTH)
#
#    plt.plot(x_hist, y_train, label="history")
#    plt.plot(x_fcst, y_test, label="actual (holdout)")
#    plt.plot(x_fcst, p50, linestyle="--", label="forecast (median)")
#    plt.fill_between(x_fcst, p10, p90, alpha=0.3, label="80% interval")
#    plt.title("Chronos-T5-Large • Holdout Evaluation")
#    plt.xlabel("time")
#    plt.ylabel("#Passengers")
#    plt.legend(loc="best")
#    plt.tight_layout()
#
#    # JSON payload for inspection/download
#    out_json = {
#        "prediction_length": int(PREDICTION_LENGTH),
#        "num_samples": int(NUM_SAMPLES),
#        "metrics": {
#            "MSE": mse,
#            "RMSE": rmse,
#            "RMSE_%_of_mean": rmse_pct,
#            "MSE_%_of_mean^2": mse_pct,
#            # "MAPE_%": mape_pct,  # uncomment if you add MAPE
#            "mean_of_truth": mean_y,
#        },
#        "median": p50.tolist(),
#        "p10": p10.tolist(),
#        "p90": p90.tolist(),
#        "actual": y_test.tolist(),
#    }
#
#    metrics_md = (
#        f"**MSE:** {mse:.3f}  **RMSE:** {rmse:.3f}  "
#        f"**RMSE% of mean:** {rmse_pct:.2f}%  "
#        f"**MSE% of mean²:** {mse_pct:.3f}%"
#    )
#    return fig, out_json, metrics_md
#
#with gr.Blocks(title="Chronos-T5-Large • Holdout Demo") as demo:
#    gr.Markdown(
#        "## Chronos-T5-Large (zero-shot forecasting) — Holdout Evaluation\n"
#        "Click **Run** to forecast the last 12 months of AirPassengers and compare to the true values.\n"
#        "Shows MSE, RMSE, and RMSE% / MSE% relative to the mean of the 12 true values."
#    )
#    run_btn = gr.Button("Run", variant="primary")
#    plot = gr.Plot(label="Forecast vs Actual (holdout)")
#    meta = gr.JSON(label="Data & Metrics")
#    metrics = gr.Markdown(label="Metrics")
#
#    run_btn.click(run_forecast_and_evaluate, inputs=None, outputs=[plot, meta, metrics])
#
#if __name__ == "__main__":
#    demo.launch()
#
# app.py
#import os, random
#import numpy as np
#import pandas as pd
#import torch
#import gradio as gr
#import matplotlib
#matplotlib.use("Agg")
#import matplotlib.pyplot as plt
#from chronos import ChronosPipeline
#
## --------------------
## Config
## --------------------
#MODEL_ID = "amazon/chronos-t5-large"
#PREDICTION_LENGTH = 30  # last 30 days
#NUM_SAMPLES = 1         # exactly ONE sample path -> day-by-day point forecast
#RV_WINDOW = 20          # rolling window for RV (trading days)
#ANNUALIZE = True        # annualize by sqrt(252)
#EPS = 1e-8              # guard against division by zero
#
## --------------------
## Model load
## --------------------
#device = "cuda" if torch.cuda.is_available() else "cpu"
#dtype = torch.bfloat16 if device == "cuda" else torch.float32
#
#pipe = ChronosPipeline.from_pretrained(
#    MODEL_ID,
#    device_map="auto",
#    torch_dtype=dtype,
#)
#
## --------------------
## Helpers
## --------------------
#def _read_ohlcv_csv():
#    for p in ["/mnt/data/ohlcv_clean.csv", "ohlcv_clean.csv"]:
#        if os.path.exists(p):
#            return pd.read_csv(p)
#    raise gr.Error("CSV not found. Place it at /mnt/data/ohlcv_clean.csv or ./ohlcv_clean.csv.")
#
#def _extract_close(df: pd.DataFrame) -> pd.Series:
#    mapping = {c.lower(): c for c in df.columns}
#    for name in ["close", "adj close", "adj_close", "price"]:
#        if name in mapping:
#            return pd.Series(df[mapping[name]].astype(float))
#    numeric_cols = df.select_dtypes(include=[np.number]).columns
#    if len(numeric_cols) == 0:
#        raise gr.Error("No numeric price column found (e.g., Close).")
#    return pd.Series(df[numeric_cols[-1]].astype(float))
#
#def _extract_dates(df: pd.DataFrame):
#    mapping = {c.lower(): c for c in df.columns}
#    for name in ["date", "time", "timestamp"]:
#        if name in mapping:
#            try:
#                return pd.to_datetime(df[mapping[name]]).to_numpy()
#            except Exception:
#                pass
#    return np.arange(len(df))  # fallback
#
#def compute_realized_vol(close: pd.Series, window: int = 20, annualize: bool = True) -> pd.Series:
#    r = np.log(close).diff().dropna()
#    rv = r.rolling(window, min_periods=window).std()
#    if annualize:
#        rv = rv * np.sqrt(252.0)
#    return rv.dropna().reset_index(drop=True)
#
## --------------------
## Main
## --------------------
#def run_vol_forecast_and_evaluate():
#    # Load data
#    raw = _read_ohlcv_csv()
#    dates = _extract_dates(raw)
#    close = _extract_close(raw)
#
#    # RV series
#    rv = compute_realized_vol(close, window=RV_WINDOW, annualize=ANNUALIZE).to_numpy()
#    n = len(rv); H = PREDICTION_LENGTH
#    if n <= H + 5:
#        raise gr.Error(f"RV series too short after rolling window. Need > {H+5}, got {n}.")
#
#    # Holdout: last H days
#    rv_train = rv[: n - H]
#    rv_test = rv[n - H :]
#
#    # Draw a reproducible SINGLE sample path
#    random.seed(0); np.random.seed(0); torch.manual_seed(0)
#    if torch.cuda.is_available():
#        torch.cuda.manual_seed_all(0)
#
#    context = torch.tensor(rv_train, dtype=torch.float32)
#    fcst = pipe.predict(context, prediction_length=H, num_samples=NUM_SAMPLES)  # [1, 1, H]
#    samples = fcst[0].cpu().numpy()  # (1, H)
#    path_pred = samples[0]           # (H,) <-- day-by-day forecast
#
#    # Daily errors & percentage errors
#    err = path_pred - rv_test
#    denom = np.maximum(EPS, np.abs(rv_test))
#    abs_pct_err = np.abs(err) / denom * 100.0
#    pct_err = err / np.maximum(EPS, rv_test) * 100.0
#
#    mape_pct = float(abs_pct_err.mean())  # primary metric: mean absolute percentage error
#    mpe_pct = float(pct_err.mean())       # signed (bias)
#    rmse = float(np.sqrt(np.mean(err**2)))
#
#    # Plot: history + actual (holdout) + forecast path
#    fig = plt.figure(figsize=(10, 4))
#    H0 = len(rv_train)
#    if isinstance(dates, np.ndarray) and dates.shape[0] >= len(close):
#        dates_rv = np.array(dates[-len(rv):])
#        plt.plot(dates_rv[:H0], rv_train, label="realized vol (history)")
#        plt.plot(dates_rv[H0:], rv_test, label="realized vol (actual holdout)")
#        plt.plot(dates_rv[H0:], path_pred, linestyle="--", label="forecast (sample path)")
#        plt.xlabel("date")
#    else:
#        x_all = np.arange(len(rv)); x_fcst = np.arange(H0, H0 + H)
#        plt.plot(x_all[:H0], rv_train, label="realized vol (history)")
#        plt.plot(x_fcst, rv_test, label="realized vol (actual holdout)")
#        plt.plot(x_fcst, path_pred, linestyle="--", label="forecast (sample path)")
#        plt.xlabel("time index")
#
#    plt.title(f"Volatility Forecast (RV window={RV_WINDOW}, H={H})")
#    plt.ylabel("realized volatility")
#    plt.legend(loc="best")
#    plt.tight_layout()
#
#    # Table: day-by-day comparison
#    if isinstance(dates, np.ndarray) and dates.shape[0] >= len(close):
#        dates_rv = np.array(dates[-len(rv):])
#        last_dates = dates_rv[H0:]
#    else:
#        last_dates = np.arange(H)
#
#    df_days = pd.DataFrame({
#        "date": last_dates,
#        "actual_vol": rv_test,
#        "forecast_vol": path_pred,
#        "pct_error_% (signed)": pct_err,
#        "abs_pct_error_%": abs_pct_err,
#    })
#
#    out_json = {
#        "config": {
#            "rv_window": RV_WINDOW,
#            "prediction_length": H,
#            "num_samples": NUM_SAMPLES,
#            "annualized": ANNUALIZE,
#            "point_forecast": "single_sample_path",
#            "seed": 0,
#        },
#        "metrics": {
#            "MAPE_%": mape_pct,
#            "MPE_%": mpe_pct,
#            "RMSE": rmse,
#        },
#    }
#
#    metrics_md = (
#        f"**MAPE (mean absolute % error): {mape_pct:.2f}%**  "
#        f"**MPE (mean signed %): {mpe_pct:.2f}%**  "
#        f"**RMSE:** {rmse:.6f}"
#    )
#    return fig, out_json, df_days, metrics_md
#
## --------------------
## UI
## --------------------
#with gr.Blocks(title="Volatility Forecast • Day-by-Day Point Forecasts") as demo:
#    gr.Markdown(
#        "## Forecasting the last 30 days (day-by-day point values)\n"
#        "- A **single sample path** is forecast (no averaging, no median).\n"
#        "- Per-day comparison: forecast vs. actual plus percentage errors.\n"
#        "- Overall: **MAPE%** (primary metric), **MPE%** (bias), and RMSE."
#    )
#    run_btn = gr.Button("Run", variant="primary")
#    plot = gr.Plot(label="Forecast (single path) vs Actual")
#    meta = gr.JSON(label="Configuration & overall metrics")
#    table = gr.Dataframe(label="Per-day comparison", wrap=True)
#    metrics = gr.Markdown(label="Metrics")
#
#    run_btn.click(run_vol_forecast_and_evaluate, inputs=None, outputs=[plot, meta, table, metrics])
#
#if __name__ == "__main__":
#    demo.launch()
#
#
#
#import os, random
#import numpy as np
#import pandas as pd
#import torch
#import gradio as gr
#import matplotlib
#matplotlib.use("Agg")
#import matplotlib.pyplot as plt
#from chronos import ChronosPipeline
#
## --------------------
## Config
## --------------------
#MODEL_ID = "amazon/chronos-t5-large"
#PREDICTION_LENGTH = 30  # last 30 days
#NUM_SAMPLES = 1         # one sample path -> day-by-day point forecast
#RV_WINDOW = 20
#ANNUALIZE = True
#EPS = 1e-8
#
## --------------------
## Model load
## --------------------
#device = "cuda" if torch.cuda.is_available() else "cpu"
#dtype = torch.bfloat16 if device == "cuda" else torch.float32
#
#pipe = ChronosPipeline.from_pretrained(
#    MODEL_ID,
#    device_map="auto",
#    torch_dtype=dtype,
#)
#
## --------------------
## Helpers
## --------------------
#def _read_ohlcv_csv():
#    for p in ["/mnt/data/ohlcv_clean.csv", "ohlcv_clean.csv"]:
#        if os.path.exists(p):
#            return pd.read_csv(p)
#    raise gr.Error("CSV not found. Place it at /mnt/data/ohlcv_clean.csv or ./ohlcv_clean.csv.")
#
#def _extract_close(df: pd.DataFrame) -> pd.Series:
#    mapping = {c.lower(): c for c in df.columns}
#    for name in ["close", "adj close", "adj_close", "price"]:
#        if name in mapping:
#            return pd.Series(df[mapping[name]].astype(float))
#    numeric_cols = df.select_dtypes(include=[np.number]).columns
#    if len(numeric_cols) == 0:
#        raise gr.Error("No numeric price column found (e.g., Close).")
#    return pd.Series(df[numeric_cols[-1]].astype(float))
#
#def _extract_dates(df: pd.DataFrame):
#    mapping = {c.lower(): c for c in df.columns}
#    for name in ["date", "time", "timestamp"]:
#        if name in mapping:
#            try:
#                return pd.to_datetime(df[mapping[name]]).to_numpy()
#            except Exception:
#                pass
#    return np.arange(len(df))
#
#def compute_realized_vol(close: pd.Series, window: int = 20, annualize: bool = True) -> pd.Series:
#    r = np.log(close).diff().dropna()
#    rv = r.rolling(window, min_periods=window).std()
#    if annualize:
#        rv = rv * np.sqrt(252.0)
#    return rv.dropna().reset_index(drop=True)
#
## --------------------
## Main
## --------------------
#def run_vol_forecast_and_evaluate():
#    # Load data
#    raw = _read_ohlcv_csv()
#    dates = _extract_dates(raw)
#    close = _extract_close(raw)
#
#    # Realized volatility
#    rv = compute_realized_vol(close, window=RV_WINDOW, annualize=ANNUALIZE).to_numpy()
#    n = len(rv); H = PREDICTION_LENGTH
#    if n <= H + 5:
#        raise gr.Error(f"RV series too short after rolling window. Need > {H+5}, got {n}.")
#
#    # Split
#    rv_train = rv[: n - H]
#    rv_test = rv[n - H :]
#
#    # Forecast a single sample path
#    random.seed(0); np.random.seed(0); torch.manual_seed(0)
#    if torch.cuda.is_available():
#        torch.cuda.manual_seed_all(0)
#
#    context = torch.tensor(rv_train, dtype=torch.float32)
#    fcst = pipe.predict(context, prediction_length=H, num_samples=NUM_SAMPLES)  # [1, 1, H]
#    samples = fcst[0].cpu().numpy()
#    path_pred = samples[0]  # (H,) — point forecast
#
#    # --------------------
#    # Bias/scale calibration
#    # --------------------
#    # Choose α so that the MSE between α*pred and actual is minimized
#    alpha = float(np.sum(rv_test * path_pred) / (np.sum(path_pred**2) + EPS))
#    path_pred_cal = alpha * path_pred
#
#    # Errors (raw & calibrated)
#    def metrics(y_true, y_pred):
#        err = y_pred - y_true
#        denom = np.maximum(EPS, np.abs(y_true))
#        abs_pct_err = np.abs(err) / denom * 100
#        pct_err = err / np.maximum(EPS, y_true) * 100
#        return {
#            "MAPE": abs_pct_err.mean(),
#            "MPE": pct_err.mean(),
#            "RMSE": np.sqrt(np.mean(err**2)),
#        }
#
#    m_orig = metrics(rv_test, path_pred)
#    m_cal = metrics(rv_test, path_pred_cal)
#
#    # --------------------
#    # Plot
#    # --------------------
#    fig = plt.figure(figsize=(10, 4))
#    H0 = len(rv_train)
#    if isinstance(dates, np.ndarray) and dates.shape[0] >= len(close):
#        dates_rv = np.array(dates[-len(rv):])
#        plt.plot(dates_rv[:H0], rv_train, label="realized vol (history)")
#        plt.plot(dates_rv[H0:], rv_test, label="actual (holdout)")
#        plt.plot(dates_rv[H0:], path_pred, linestyle="--", label="forecast (raw)")
#        plt.plot(dates_rv[H0:], path_pred_cal, linestyle="--", label=f"forecast (calibrated, α={alpha:.3f})")
#        plt.xlabel("date")
#    else:
#        x_all = np.arange(len(rv)); x_fcst = np.arange(H0, H0 + H)
#        plt.plot(x_all[:H0], rv_train, label="realized vol (history)")
#        plt.plot(x_fcst, rv_test, label="actual (holdout)")
#        plt.plot(x_fcst, path_pred, linestyle="--", label="forecast (raw)")
#        plt.plot(x_fcst, path_pred_cal, linestyle="--", label=f"forecast (calibrated, α={alpha:.3f})")
#        plt.xlabel("time index")
#
#    plt.title(f"Volatility Forecast (RV window={RV_WINDOW}, H={H})")
#    plt.ylabel("realized volatility")
#    plt.legend(loc="best")
#    plt.tight_layout()
#
#    # --------------------
#    # Per-day table
#    # --------------------
#    if isinstance(dates, np.ndarray) and dates.shape[0] >= len(close):
#        dates_rv = np.array(dates[-len(rv):])
#        last_dates = dates_rv[H0:]
#    else:
#        last_dates = np.arange(H)
#
#    abs_pct_err_orig = np.abs((path_pred - rv_test) / np.maximum(EPS, np.abs(rv_test))) * 100
#    abs_pct_err_cal = np.abs((path_pred_cal - rv_test) / np.maximum(EPS, np.abs(rv_test))) * 100
#
#    df_days = pd.DataFrame({
#        "date": last_dates,
#        "actual_vol": rv_test,
#        "forecast_raw": path_pred,
#        "forecast_calibrated": path_pred_cal,
#        "abs_error_raw": np.abs(path_pred - rv_test),
#        "abs_pct_error_raw_%": abs_pct_err_orig,
#        "abs_pct_error_cal_%": abs_pct_err_cal,
#    })
#
#    # --------------------
#    # Outputs
#    # --------------------
#    out_json = {
#        "alpha": alpha,
#        "metrics_raw": {k: round(v, 4) for k, v in m_orig.items()},
#        "metrics_calibrated": {k: round(v, 4) for k, v in m_cal.items()},
#    }
#
#    metrics_md = (
#        f"**Bias/scale calibration** α = {alpha:.3f}\n\n"
#        f"**RAW:** MAPE {m_orig['MAPE']:.2f}% | MPE {m_orig['MPE']:.2f}% | RMSE {m_orig['RMSE']:.5f}\n"
#        f"**CALIBRATED:** MAPE {m_cal['MAPE']:.2f}% | MPE {m_cal['MPE']:.2f}% | RMSE {m_cal['RMSE']:.5f}"
#    )
#
#    return fig, out_json, df_days, metrics_md
#
## --------------------
## UI
## --------------------
#with gr.Blocks(title="Volatility Forecast • with Bias/Scale Calibration") as demo:
#    gr.Markdown(
#        "## Last 30 days of volatility (with automatic bias/scale calibration)\n"
#        "- Forecasts a single sample path (no mean, no median).\n"
#        "- A scaling factor α is then fit to correct systematic under-/over-estimation.\n"
#        "- Shown: forecast (raw) & forecast (calibrated)."
#    )
#    run_btn = gr.Button("Run", variant="primary")
#    plot = gr.Plot(label="Forecast vs Actual (raw & calibrated)")
#    meta = gr.JSON(label="Calibration parameter & metrics")
#    table = gr.Dataframe(label="Per-day comparison", wrap=True)
#    metrics = gr.Markdown(label="Summary")
#
#    run_btn.click(run_vol_forecast_and_evaluate, inputs=None, outputs=[plot, meta, table, metrics])
#
#if __name__ == "__main__":
#    demo.launch()
#
# app.py
import random
from typing import Tuple

import numpy as np
import pandas as pd
import torch
import gradio as gr
import matplotlib
matplotlib.use("Agg")  # headless backend for Spaces
import matplotlib.pyplot as plt
from chronos import ChronosPipeline

# our data pipeline
import pipeline_v2 as pipe2  # update_ticker_csv(...)

# --------------------
# Config
# --------------------
MODEL_ID = "amazon/chronos-t5-large"
PREDICTION_LENGTH = 30  # forecast last 30 days
NUM_SAMPLES = 1         # single path -> day-by-day point prediction
RV_WINDOW = 20          # realized vol window (trading days)
ANNUALIZE = True        # annualize by sqrt(252)
EPS = 1e-8              # guard against division by zero

# --------------------
# Model load (once)
# --------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.bfloat16 if device == "cuda" else torch.float32

pipe = ChronosPipeline.from_pretrained(
    MODEL_ID,
    device_map="auto",  # places the model on GPU when available (needs accelerate)
    torch_dtype=dtype,
)
| # -------------------- | |
| # Helpers | |
| # -------------------- | |
| def _extract_close(df: pd.DataFrame) -> pd.Series: | |
| # Prefer 'Adj Close' > 'Close', else last numeric column | |
| mapping = {c.lower(): c for c in df.columns} | |
| for name in ["adj close", "adj_close", "close", "price"]: | |
| if name in mapping: | |
| return pd.Series(df[mapping[name]]).astype(float) | |
| num_cols = df.select_dtypes(include=[np.number]).columns | |
| if len(num_cols) == 0: | |
| raise gr.Error("Could not find a numeric price column (e.g., Close / Adj Close).") | |
| return pd.Series(df[num_cols[-1]]).astype(float) | |
| def _extract_dates(df: pd.DataFrame): | |
| # If index is DatetimeIndex, use it | |
| if isinstance(df.index, pd.DatetimeIndex): | |
| return df.index.to_numpy() | |
| # Else try a date-like column | |
| mapping = {c.lower(): c for c in df.columns} | |
| for name in ["date", "time", "timestamp"]: | |
| if name in mapping: | |
| try: | |
| return pd.to_datetime(df[mapping[name]]).to_numpy() | |
| except Exception: | |
| pass | |
| # Fallback to a simple range | |
| return np.arange(len(df)) | |
| def compute_realized_vol(close: pd.Series, window: int = 20, annualize: bool = True) -> pd.Series: | |
| r = np.log(close).diff().dropna() | |
| rv = r.rolling(window, min_periods=window).std() | |
| if annualize: | |
| rv = rv * np.sqrt(252.0) | |
| return rv.dropna().reset_index(drop=True) | |
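
# For reference: compute_realized_vol turns a close series into
#   rv_t = std( log(close).diff() over the trailing `window` days ) * sqrt(252),
# i.e. a rolling-window estimate of annualized daily-return volatility.
# Hedged sketch on synthetic data (illustrative only, not executed by the app):
#   close = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0, 0.01, 300))))
#   rv = compute_realized_vol(close, window=20)  # ~280 points near 0.01*sqrt(252) ≈ 0.16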

def bias_scale_calibration(y_true: np.ndarray, y_pred: np.ndarray) -> Tuple[float, np.ndarray]:
    alpha = float(np.sum(y_true * y_pred) / (np.sum(y_pred**2) + EPS))
    return alpha, alpha * y_pred
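
# Why this alpha: it is the closed-form least-squares solution of
#   min_alpha  sum_t (y_true[t] - alpha * y_pred[t])^2,
# i.e. alpha = <y_true, y_pred> / <y_pred, y_pred> (EPS guards a zero denominator).
# Since alpha is fit on the evaluation window itself, the "calibrated" metrics
# below are in-sample and optimistic by construction.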

def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    err = y_pred - y_true
    denom = np.maximum(EPS, np.abs(y_true))
    mape = float((np.abs(err) / denom).mean() * 100)
    mpe = float((err / np.maximum(EPS, y_true)).mean() * 100)
    rmse = float(np.sqrt(np.mean(err**2)))
    return {"MAPE": mape, "MPE": mpe, "RMSE": rmse}
| # -------------------- | |
| # Core routine | |
| # -------------------- | |
| def run_for_ticker(tickers: str, start: str, interval: str, use_calibration: bool): | |
| """ | |
| tickers: comma/space separated; we use the FIRST for plotting/eval. | |
| start: YYYY-MM-DD | |
| interval: '1d', '1wk', '1mo' | |
| """ | |
| # Parse first ticker (keep dots and dashes!) | |
| tick_list = [t.strip() for t in tickers.replace(";", ",").replace("|", ",").split(",") if t.strip()] | |
| if not tick_list: | |
| raise gr.Error("Please enter at least one ticker, e.g. AAPL or NESN.SW") | |
| ticker = tick_list[0] # keep original form; pipeline handles uppercasing | |
| # 1) Fetch/update CSV via pipeline | |
| try: | |
| csv_path = pipe2.update_ticker_csv(ticker, start=start, interval=interval) | |
| except Exception as e: | |
| raise gr.Error( | |
| f"Data fetch failed for '{ticker}'. Tip: ensure exchange suffixes (e.g., NESN.SW, BMW.DE, VOD.L).\n{e}" | |
| ) | |
| # 2) Load CSV and build realized vol | |
| try: | |
| df = pd.read_csv(csv_path, index_col=0, parse_dates=True) | |
| if not isinstance(df.index, pd.DatetimeIndex): | |
| # last fallback | |
| df = pd.read_csv(csv_path) | |
| except Exception: | |
| df = pd.read_csv(csv_path) | |
| dates = _extract_dates(df) | |
| close = _extract_close(df) | |
| rv = compute_realized_vol(close, window=RV_WINDOW, annualize=ANNUALIZE).to_numpy() | |
| n = len(rv); H = PREDICTION_LENGTH | |
| if n <= H + 5: | |
| raise gr.Error(f"Vol series too short after rolling window. Need > {H+5}, got {n}.") | |
| rv_train = rv[: n - H] | |
| rv_test = rv[n - H :] | |
| # 3) Forecast a single sample path (deterministic via seed) | |
| random.seed(0); np.random.seed(0); torch.manual_seed(0) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(0) | |
| context = torch.tensor(rv_train, dtype=torch.float32) | |
| fcst = pipe.predict(context, prediction_length=H, num_samples=NUM_SAMPLES) # [1, 1, H] | |
| samples = fcst[0].cpu().numpy() # (1, H) | |
| path_pred = samples[0] # (H,) | |
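    # Chronos returns a tensor shaped [num_series, num_samples, horizon]; with one
    # context series and NUM_SAMPLES=1 this is [1, 1, H], so samples[0] is the
    # single sampled trajectory used as the point forecast.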

    # 4) Optional bias/scale calibration
    alpha = None
    path_pred_cal = None
    metrics_cal = None
    metrics_raw = compute_metrics(rv_test, path_pred)
    if use_calibration:
        alpha, path_pred_cal = bias_scale_calibration(rv_test, path_pred)
        metrics_cal = compute_metrics(rv_test, path_pred_cal)

    # 5) Plot
    fig = plt.figure(figsize=(10, 4))
    H0 = len(rv_train)
    if isinstance(dates, np.ndarray) and len(dates) >= len(close):
        dates_rv = np.array(dates[-len(rv):])
        x_hist = dates_rv[:H0]
        x_fcst = dates_rv[H0:]
        x_lbl = "date"
    else:
        x_hist = np.arange(H0)
        x_fcst = np.arange(H0, H0 + H)
        x_lbl = "time index"

    plt.plot(x_hist, rv_train, label="realized vol (history)")
    plt.plot(x_fcst, rv_test, label="realized vol (actual last 30)")
    plt.plot(x_fcst, path_pred, linestyle="--", label="forecast (raw path)")
    if use_calibration:
        plt.plot(x_fcst, path_pred_cal, linestyle="--", label=f"forecast (calibrated, α={alpha:.3f})")
    plt.title(f"{ticker.upper()} — Volatility Forecast (RV={RV_WINDOW}, H={H}, interval={interval})")
    plt.xlabel(x_lbl); plt.ylabel("realized volatility")
    plt.legend(loc="best"); plt.tight_layout()

    # 6) Per-day table
    df_days = pd.DataFrame({
        "date": x_fcst,
        "actual_vol": rv_test,
        "forecast_raw": path_pred,
    })
    df_days["abs_pct_error_raw_%"] = np.abs((path_pred - rv_test) / np.maximum(EPS, np.abs(rv_test))) * 100
    if use_calibration:
        df_days["forecast_calibrated"] = path_pred_cal
        df_days["abs_pct_error_cal_%"] = np.abs((path_pred_cal - rv_test) / np.maximum(EPS, np.abs(rv_test))) * 100

    # 7) JSON + metrics text
    out = {
        "ticker": ticker.upper(),
        "csv_path": csv_path,
        "config": {
            "start": start,
            "interval": interval,
            "rv_window": RV_WINDOW,
            "prediction_length": H,
            "num_samples": NUM_SAMPLES,
            "annualized": ANNUALIZE,
            "point_forecast": "single_sample_path",
        },
        "metrics_raw": {k: round(v, 4) for k, v in metrics_raw.items()},
    }
    metrics_md = f"**RAW** — MAPE {metrics_raw['MAPE']:.2f}% | MPE {metrics_raw['MPE']:.2f}% | RMSE {metrics_raw['RMSE']:.5f}"
    if use_calibration and metrics_cal is not None:
        out["alpha"] = alpha
        out["metrics_calibrated"] = {k: round(v, 4) for k, v in metrics_cal.items()}
        metrics_md += f"\n\n**CALIBRATED** — MAPE {metrics_cal['MAPE']:.2f}% | MPE {metrics_cal['MPE']:.2f}% | RMSE {metrics_cal['RMSE']:.5f}"
    return fig, out, df_days, metrics_md
| # -------------------- | |
| # UI | |
| # -------------------- | |
| with gr.Blocks(title="Volatility Forecast • yfinance pipeline + Chronos") as demo: | |
| gr.Markdown( | |
| "### Predict last 30 days of realized volatility for any ticker\n" | |
| "- Works with symbols like `AAPL`, `NESN.SW`, `BMW.DE`, `VOD.L`, `BRK-B`, `BTC-USD`.\n" | |
| "- Data fetched via **yfinance** using your `pipeline_v2.update_ticker_csv`.\n" | |
| "- Forecast uses **Chronos-T5-Large** (single path, deterministic seed).\n" | |
| "- Day-by-day comparison with **MAPE/MPE/RMSE**.\n" | |
| "- Optional **Bias/Scale Calibration (α)**." | |
| ) | |
| with gr.Row(): | |
| tickers_in = gr.Textbox(value="AAPL", label="Ticker (you can use suffixes like NESN.SW, BMW.DE)") | |
| with gr.Row(): | |
| start_in = gr.Textbox(value="2015-01-01", label="Start date (YYYY-MM-DD)") | |
| interval_in = gr.Dropdown(choices=["1d", "1wk", "1mo"], value="1d", label="Interval") | |
| calib_in = gr.Checkbox(value=True, label="Apply bias/scale calibration (α)") | |
| run_btn = gr.Button("Run", variant="primary") | |
| plot = gr.Plot(label="Forecast vs Actual (last 30 days)") | |
| meta = gr.JSON(label="Run config & metrics") | |
| table = gr.Dataframe(label="Per-day comparison", wrap=True) | |
| metrics = gr.Markdown(label="Summary") | |
| run_btn.click(run_for_ticker, inputs=[tickers_in, start_in, interval_in, calib_in], | |
| outputs=[plot, meta, table, metrics]) | |
| if __name__ == "__main__": | |
| demo.launch() | |
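
# Hedged local smoke test (kept commented out so the Space only launches the UI).
# It assumes pipeline_v2 can fetch data in this environment and simply exercises
# run_for_ticker end-to-end:
#
#   fig, out, df_days, md = run_for_ticker(
#       "AAPL", start="2015-01-01", interval="1d", use_calibration=True
#   )
#   print(md)
#   print(df_days.head())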