# ALM_LLM / app.py
# Author: AshenH
# Commit: "Update app.py" (3ef1e5c, verified)
import os
import sys
import traceback
from pathlib import Path
from typing import List, Tuple, Any
import duckdb
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg") # headless for Spaces
import matplotlib.pyplot as plt
import gradio as gr
# =========================
# Basic configuration
# =========================
# Page title shown at the top of the dashboard.
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"

# Fully-qualified object names: the source table, and the normalized view
# that this app creates on top of it.
TABLE_FQN = "my_db.main.masterdataset_v"
VIEW_FQN = "my_db.main.positions_v"

# Lower-case product codes classified as 'Assets' when bucketing positions.
PRODUCT_ASSETS = [
    "loan",
    "overdraft",
    "advances",
    "bills",
    "bill",
    "tbond",
    "t-bond",
    "tbill",
    "t-bill",
    "repo_asset",
    "assets",
]

# Lower-case product codes classified as 'SoF' (sources of funds).
PRODUCT_SOF = [
    "fd",
    "term_deposit",
    "td",
    "savings",
    "current",
    "call",
    "repo_liab",
]
# =========================
# Helpers
# =========================
def connect_md() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection using the ``MOTHERDUCK_TOKEN`` env variable.

    Returns:
        The connection returned by ``duckdb.connect`` for an ``md:`` URL that
        carries the token as the ``motherduck_token`` query parameter.

    Raises:
        RuntimeError: If ``MOTHERDUCK_TOKEN`` is unset or blank.
    """
    # Strip surrounding whitespace so a stray newline in the configured secret
    # cannot end up inside the connection string.
    token = os.environ.get("MOTHERDUCK_TOKEN", "").strip()
    if not token:
        # In a real environment, this token should be securely managed.
        raise RuntimeError(
            "MOTHERDUCK_TOKEN is not set. Add it in Space → Settings → Secrets."
        )
    return duckdb.connect(f"md:?motherduck_token={token}")
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
    """Return the lower-cased column names of ``table_fqn``.

    ``DESCRIBE`` is attempted first. If anything in that path raises, the
    names are read from ``information_schema.columns`` instead, matching the
    catalog, schema and table parts obtained by splitting ``table_fqn`` on dots.
    """
    # Prepared up front; only executed when the DESCRIBE path fails.
    fallback_sql = f"""
SELECT lower(column_name) AS col
FROM information_schema.columns
WHERE table_catalog = split_part('{table_fqn}', '.', 1)
AND table_schema = split_part('{table_fqn}', '.', 2)
AND table_name = split_part('{table_fqn}', '.', 3)
"""
    try:
        described = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
        if "column_name" in described.columns:
            key = "column_name"
        else:
            # Fall back to whatever the first column of the DESCRIBE output is.
            key = described.columns[0]
        return [str(name).lower() for name in described[key].tolist()]
    except Exception:
        catalog_rows = conn.execute(fallback_sql).fetchdf()
        return catalog_rows["col"].tolist()
def build_view_sql(existing_cols: List[str]) -> str:
    """Build the ``CREATE OR REPLACE VIEW`` statement for the normalized view.

    Each wanted column is selected as-is when its lower-cased name appears in
    ``existing_cols``; otherwise a typed NULL placeholder is projected under
    the same name. A derived ``bucket`` column classifies ``product`` as
    'SoF', 'Assets' or 'Unknown'.
    """
    numeric_names = ("Portfolio_value", "Interest_rate", "days_to_maturity", "months")

    def _projection(name: str) -> str:
        # Present in the source table: select it directly.
        if name.lower() in existing_cols:
            return name
        # Missing: emit a NULL of a matching type so the view schema is stable.
        null_type = "DOUBLE" if name in numeric_names else "VARCHAR"
        return f"CAST(NULL AS {null_type}) AS {name}"

    projections = [
        _projection(name)
        for name in (
            "as_of_date",
            "product",
            "months",
            "segments",
            "currency",
            "Portfolio_value",
            "Interest_rate",
            "days_to_maturity",
        )
    ]

    sof_list = ", ".join(f"'{code}'" for code in PRODUCT_SOF)
    asset_list = ", ".join(f"'{code}'" for code in PRODUCT_ASSETS)
    bucket_case = "".join(
        (
            "CASE ",
            f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' ",
            f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' ",
            "ELSE 'Unknown' END AS bucket",
        )
    )
    projections.append(bucket_case)

    select_sql = ",\n ".join(projections)
    return f"""
CREATE OR REPLACE VIEW {VIEW_FQN} AS
SELECT
{select_sql}
FROM {TABLE_FQN};
"""
def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
    """Validate the mandatory source columns, then (re)create the normalized view.

    Raises:
        RuntimeError: If ``cols`` lacks any of ``product``,
            ``portfolio_value`` or ``days_to_maturity``.
    """
    required = {"product", "portfolio_value", "days_to_maturity"}
    missing = required - set(cols)
    if missing:
        raise RuntimeError(
            f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
        )
    view_sql = build_view_sql(cols)
    conn.execute(view_sql)
def safe_num(x) -> float:
    """Coerce ``x`` to ``float``, mapping missing or unusable values to ``0.0``.

    ``None``, values that ``float()`` rejects, and NaN in any form (Python
    float, NumPy scalar of any width, ``Decimal('NaN')``, the string
    ``"nan"``) all return ``0.0``. Everything else returns ``float(x)``.
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / objects without __float__; ValueError: unparsable
        # strings; OverflowError: integers too large for a float.
        return 0.0
    # Test for NaN after conversion so the check does not depend on the
    # input's concrete type.
    return 0.0 if np.isnan(value) else value
def zeros_like_index(index) -> pd.Series:
    """Return a Series of integer zeros with one entry per label in ``index``."""
    zero_values = [0] * len(index)
    return pd.Series(data=zero_values, index=index)
def plot_ladder(df: pd.DataFrame):
    """Draw the maturity ladder as a bar chart and return the figure.

    Args:
        df: Frame with ``time_bucket``, ``bucket`` and ``Amount (LKR Mn)``
            columns. ``None`` or an empty frame produces a "No data" figure.

    Returns:
        A matplotlib ``Figure``. Assets are drawn as positive bars and SoF as
        negative bars over the four fixed time buckets. Any exception raised
        while building the chart is rendered as text in the figure instead of
        propagating.
    """
    # Figures are built directly rather than through pyplot: pyplot keeps a
    # global reference to every figure until it is closed, so a long-running
    # server that creates one per request would leak them.
    from matplotlib.figure import Figure

    try:
        if df is None or df.empty:
            fig = Figure(figsize=(7, 3))
            ax = fig.subplots()
            ax.text(0.5, 0.5, "No data", ha="center", va="center")
            ax.axis("off")
            return fig

        # Standard liquidity buckets, in display order.
        order = ["T+1", "T+2..7", "T+8..30", "T+31+"]
        # Reindex first and fill afterwards, so time buckets absent from the
        # data become zero-height bars rather than NaN rows.
        pivot = (
            df.pivot(index="time_bucket", columns="bucket", values="Amount (LKR Mn)")
            .reindex(order)
            .fillna(0)
        )

        fig = Figure(figsize=(7, 4))
        ax = fig.subplots()
        assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index)
        sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index)
        ax.bar(pivot.index, assets, label="Assets", color="#4CAF50")
        # SoF is plotted below the axis to contrast with assets.
        ax.bar(pivot.index, -sof, label="SoF", color="#FF9800")
        ax.axhline(0, color="gray", lw=1)
        ax.set_ylabel("LKR (Mn)")
        ax.set_title("Maturity Ladder (Assets vs SoF)")
        ax.legend()
        fig.tight_layout()
        return fig
    except Exception as e:
        # Best effort: show the failure inside the chart area instead of
        # breaking the whole dashboard refresh.
        fig = Figure(figsize=(7, 3))
        ax = fig.subplots()
        ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left")
        ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True)
        ax.axis("off")
        return fig
# =========================
# Query fragments
# =========================
# T+1 liquidity KPIs: assets, sources of funds and their net gap for positions
# with days_to_maturity <= 1. The sums use stressed_pv (not the raw
# Portfolio_value) so the KPIs reflect the active scenario, in line with the
# ladder and gap-driver queries that read the same stressed view.
KPI_SQL = """
SELECT
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN stressed_pv END),0) AS assets_t1,
COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN stressed_pv END),0) AS sof_t1,
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN stressed_pv END),0)
- COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN stressed_pv END),0) AS net_gap_t1
FROM positions_v_stressed;
"""
# Maturity ladder: stressed amounts (in millions) grouped by time bucket and
# Assets/SoF bucket. A plain string literal; there is nothing to interpolate.
LADDER_SQL = """
SELECT
CASE
WHEN days_to_maturity <= 1 THEN 'T+1'
WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7'
WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30'
ELSE 'T+31+'
END AS time_bucket,
bucket,
SUM(stressed_pv) / 1000000.0 AS "Amount (LKR Mn)"
FROM positions_v_stressed
GROUP BY 1,2
ORDER BY 1,2;
"""
# T+1 gap drivers: stressed amounts (in millions) per product and bucket for
# positions with days_to_maturity <= 1, largest first. A plain string literal;
# there is nothing to interpolate.
GAP_DRIVERS_SQL = """
SELECT
product,
bucket,
SUM(stressed_pv) / 1000000.0 AS "Amount (LKR Mn)"
FROM positions_v_stressed
WHERE days_to_maturity <= 1
GROUP BY 1, 2
ORDER BY 3 DESC;
"""
def get_duration_components_sql(cols: List[str], default_yield: float = 0.05) -> str:
    """Build the query returning per-bucket PV and PV-weighted modified duration.

    Args:
        cols: Lower-cased column names of the source table. ``"months"`` adds a
            months-based tenor fallback; ``"interest_rate"`` makes the query
            use the per-position rate as the yield.
        default_yield: Decimal yield (0.05 = 5%) used when the rate column is
            absent, or NULL for an individual position.

    Returns:
        SQL over ``positions_v_stressed`` producing one row per bucket
        ('Assets' / 'SoF') with ``total_pv`` and ``weighted_duration_sum``.
    """
    # Tenor in years, used as a proxy for Macaulay duration: prefer
    # days_to_maturity, then months when available, else a tiny placeholder
    # tenor so positions with unknown maturity contribute almost no duration.
    tenor_branches = ["WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"]
    if "months" in cols:
        tenor_branches.append("WHEN months IS NOT NULL THEN months/12.0")
    t_expr = "CASE " + " ".join(tenor_branches) + " ELSE 0.0001 END"

    # Yield as a decimal. COALESCE keeps positions with a NULL rate in the
    # weighted sum; without it their duration is NULL and SUM() silently drops
    # them while total_pv still counts them, understating the duration.
    if "interest_rate" in cols:
        y_expr = f"COALESCE(Interest_rate/100.0, {default_yield})"
    else:
        y_expr = f"{default_yield}"

    return f"""
WITH irr_calcs AS (
SELECT
bucket,
stressed_pv,
-- Approximate Modified Duration = (Time / (1 + Yield))
({t_expr}) / (1 + {y_expr}) AS mod_dur
FROM positions_v_stressed
WHERE bucket IN ('Assets', 'SoF')
)
SELECT
bucket,
SUM(stressed_pv) AS total_pv,
SUM(stressed_pv * mod_dur) AS weighted_duration_sum
FROM irr_calcs
GROUP BY bucket;
"""
def get_nii_sensitivity_sql() -> str:
    """Return the query for the 1-year repricing gap.

    The query sums ``stressed_pv`` of positions with ``days_to_maturity <= 365``
    per bucket, then returns a single row with the Assets total, the SoF total
    and their difference (``repricing_gap``). The gap is a simplified input
    for estimating the change in NII.
    """
    nii_sql = """
WITH repricing_volume AS (
SELECT
bucket,
-- Assume repricing happens within 1 year (365 days)
SUM(CASE WHEN days_to_maturity <= 365 THEN stressed_pv ELSE 0 END) AS repricing_pv
FROM positions_v_stressed
WHERE bucket IN ('Assets', 'SoF')
GROUP BY bucket
)
SELECT
COALESCE(SUM(CASE WHEN bucket = 'Assets' THEN repricing_pv ELSE 0 END), 0) AS assets_repricing_pv,
COALESCE(SUM(CASE WHEN bucket = 'SoF' THEN repricing_pv ELSE 0 END), 0) AS liabilities_repricing_pv,
-- Repricing Gap = Repricing Assets - Repricing Liabilities
(COALESCE(SUM(CASE WHEN bucket = 'Assets' THEN repricing_pv ELSE 0 END), 0) -
COALESCE(SUM(CASE WHEN bucket = 'SoF' THEN repricing_pv ELSE 0 END), 0)) AS repricing_gap
FROM repricing_volume;
"""
    return nii_sql
# =========================
# Dashboard callback
# =========================
def run_dashboard(scenario: str, runoff_pct: float, rate_shock_bps_input: float, nii_shock_bps: float) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]:
    """Recompute every dashboard output for the selected scenario.

    Args:
        scenario: One of "Baseline", "Liquidity Stress: High Deposit Runoff"
            or "IRR Stress: Rate Shock".
        runoff_pct: Percentage of SoF balances removed; applied only in the
            liquidity-stress scenario.
        rate_shock_bps_input: Parallel shock in basis points for the EVE
            calculation; applied only in the IRR-stress scenario.
        nii_shock_bps: Shock in basis points applied to the 1-year repricing
            gap for the NII estimate (used in every scenario).

    Returns:
        status, as_of, a1_text, a2_text, a3_text, figure, ladder_df,
        irr_df (BPV), nii_df, explain_text, drivers_df.
        On any failure the status carries the error and traceback, and the
        remaining outputs are placeholders; the function itself never raises.
    """
    conn = None
    try:
        conn = connect_md()

        # 1) Discover columns & ensure the normalized view exists.
        cols = discover_columns(conn, TABLE_FQN)
        ensure_view(conn, cols)

        # --- Scenario application ---
        stressed_view_fqn = "positions_v_stressed"
        runoff_factor = 1.0   # share of SoF balances that remains
        rate_shock_bps = 0.0  # drives the EVE impact
        if scenario == "Liquidity Stress: High Deposit Runoff" and runoff_pct > 0:
            runoff_factor = (100.0 - runoff_pct) / 100.0
        elif scenario == "IRR Stress: Rate Shock" and rate_shock_bps_input != 0:
            rate_shock_bps = rate_shock_bps_input

        # Temporary view carrying the scenario-adjusted PV and rate.
        # NOTE: the rate shock only feeds derived metrics, not the stored PV.
        sof_list = ", ".join(f"'{p}'" for p in PRODUCT_SOF)
        scenario_sql = f"""
            CREATE OR REPLACE TEMP VIEW {stressed_view_fqn} AS
            SELECT
                *,
                -- Apply runoff only to liabilities (SoF)
                CASE WHEN lower(product) IN ({sof_list})
                     THEN Portfolio_value * {runoff_factor}
                     ELSE Portfolio_value
                END AS stressed_pv,
                -- Shocked rate in percent (bps / 100)
                Interest_rate + ({rate_shock_bps} / 100.0) AS stressed_ir
            FROM {VIEW_FQN};
        """
        conn.execute(scenario_sql)

        # 2) As-of date (optional column).
        as_of = "N/A"
        if "as_of_date" in cols:
            tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
            if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
                as_of = str(tmp["d"].iloc[0])[:10]

        # 3) KPIs (liquidity gap).
        kpi = conn.execute(KPI_SQL).fetchdf()
        assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0
        sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0
        net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0

        # 4) Ladder and gap drivers.
        ladder = conn.execute(LADDER_SQL).fetchdf()
        drivers = conn.execute(GAP_DRIVERS_SQL).fetchdf()

        # 5) Duration gap & BPV (IRR - EVE).
        duration_components = conn.execute(get_duration_components_sql(cols)).fetchdf()
        asset_rows = duration_components[duration_components["bucket"] == "Assets"]
        liab_rows = duration_components[duration_components["bucket"] == "SoF"]
        pv_assets = asset_rows["total_pv"].sum()
        pv_liab = liab_rows["total_pv"].sum()
        wd_assets = asset_rows["weighted_duration_sum"].sum()
        wd_liab = liab_rows["weighted_duration_sum"].sum()

        mod_dur_assets = wd_assets / pv_assets if pv_assets > 0 else 0.0
        mod_dur_liab = wd_liab / pv_liab if pv_liab > 0 else 0.0
        # L/A ratio (liabilities / assets).
        l_a_ratio = pv_liab / pv_assets if pv_assets > 0 else 0.0
        # Duration Gap = D_A - D_L × (L/A)
        duration_gap = mod_dur_assets - (mod_dur_liab * l_a_ratio)

        # Net BPV / DV01: net PV-weighted duration scaled to one basis point.
        net_bpv = (wd_assets - wd_liab) * 0.0001
        # Value moves inversely to rates: with a positive net BPV a rate rise
        # lowers EVE, hence the minus sign
        # (dEVE ~= -(sum(PV*D)_assets - sum(PV*D)_liabilities) * dy).
        eve_impact = -net_bpv * rate_shock_bps

        irr_df = pd.DataFrame({
            "Metric": ["Assets Mod. Duration (Yrs)", "Liabilities Mod. Duration (Yrs)", "Duration Gap (Yrs)", "Net BPV (LKR)"],
            "Value": [mod_dur_assets, mod_dur_liab, duration_gap, net_bpv],
        })
        irr_df["Value"] = irr_df["Value"].map("{:,.4f}".format)

        # 6) NII sensitivity (IRR - NII).
        nii_data = conn.execute(get_nii_sensitivity_sql()).fetchdf()
        assets_repricing_pv = safe_num(nii_data["assets_repricing_pv"].iloc[0])
        liabilities_repricing_pv = safe_num(nii_data["liabilities_repricing_pv"].iloc[0])
        repricing_gap = safe_num(nii_data["repricing_gap"].iloc[0])
        # NII delta = repricing gap * (rate shock / 10000).
        nii_delta = repricing_gap * (nii_shock_bps / 10000.0)

        # Signed format (":+") so a negative shock reads "-100", not "+-100".
        nii_df = pd.DataFrame({
            "Metric": [
                "Assets Repricing (LKR Mn)",
                "Liabilities Repricing (LKR Mn)",
                "1-Year Repricing Gap (LKR Mn)",
                f"NII Delta ({nii_shock_bps:+.0f}bps Shock) (LKR Mn)",
            ],
            "Value": [
                assets_repricing_pv / 1000000.0,
                liabilities_repricing_pv / 1000000.0,
                repricing_gap / 1000000.0,
                nii_delta / 1000000.0,
            ],
        })
        nii_df["Value"] = nii_df["Value"].map("{:,.2f}".format)

        # 7) Format output dataframes for the UI.
        if "Amount (LKR Mn)" in ladder.columns:
            ladder_display = ladder.copy()
            ladder_display["Amount (LKR Mn)"] = ladder_display["Amount (LKR Mn)"].map("{:,.2f}".format)
        else:
            ladder_display = pd.DataFrame()
        if "Amount (LKR Mn)" in drivers.columns:
            drivers_display = drivers.copy()
            drivers_display["Amount (LKR Mn)"] = drivers_display["Amount (LKR Mn)"].map("{:,.2f}".format)
        else:
            drivers_display = pd.DataFrame()

        # 8) Chart (built from the unformatted numeric ladder).
        fig = plot_ladder(ladder)

        # 9) Explanations.
        assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}"
        sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}"
        net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}"
        gap_sign_str = "positive (surplus)" if net_gap >= 0 else "negative (deficit)"
        a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**."
        a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**."
        a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**."

        # "Why" text: the drivers query is ordered by amount descending, so
        # the first row per bucket is that bucket's largest contributor.
        sof_drivers = drivers[drivers["bucket"] == "SoF"]
        asset_drivers = drivers[drivers["bucket"] == "Assets"]
        top_sof_prod = sof_drivers.iloc[0] if not sof_drivers.empty else None
        top_asset_prod = asset_drivers.iloc[0] if not asset_drivers.empty else None

        parts = [
            "### Liquidity Gap Analysis (T+1)\n",
            f"The T+1 Net Liquidity Gap is **LKR {net_gap_mn_str} Mn** ({gap_sign_str}).\n\n",
        ]
        if top_sof_prod is not None:
            parts.append(
                f"* **Largest Outflow:** From `{top_sof_prod['product']}` at **LKR {top_sof_prod['Amount (LKR Mn)']:,.2f} Mn**.\n"
            )
        if top_asset_prod is not None:
            parts.append(
                f"* **Largest Inflow:** From `{top_asset_prod['product']}` at **LKR {top_asset_prod['Amount (LKR Mn)']:,.2f} Mn**.\n"
            )

        parts.append("\n### Interest Rate Risk (IRR) Analysis\n")
        nii_delta_mn = safe_num(nii_delta / 1000000.0)
        repricing_gap_mn = safe_num(repricing_gap / 1000000.0)
        parts.append(
            f"* **NII Sensitivity:** Based on the 1-Year Repricing Gap (LKR {repricing_gap_mn:,.2f} Mn), a **{nii_shock_bps:+.0f} bps** rate shock suggests a **LKR {nii_delta_mn:,.2f} Mn** change in 1-year Net Interest Income.\n"
        )
        eve_impact_mn = safe_num(eve_impact / 1000000.0)
        parts.append(
            f"* **EVE Sensitivity:** The Duration Gap is **{duration_gap:,.2f} years**. A **{rate_shock_bps:+.0f} bps** parallel rate shock is projected to change the portfolio's Economic Value (EVE) by **LKR {eve_impact_mn:,.2f} Mn**."
        )
        if scenario != "Baseline":
            parts.append(f"\n\n**SCENARIO ACTIVE:** Results reflect the '{scenario}' scenario.")
        explain_text = "".join(parts)

        status = f"✅ OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})"
        return (
            status,
            as_of,
            a1_text,
            a2_text,
            a3_text,
            fig,
            ladder_display,
            irr_df,
            nii_df,
            explain_text,
            drivers_display,
        )
    except Exception as e:
        # Top-level UI boundary: report the failure in the status box instead
        # of letting the callback crash.
        error_text = f"❌ Error: {e}\n\n{traceback.format_exc()}"
        # The token is embedded in the connection string, so a driver error
        # could echo it; never show the secret in the UI.
        token = os.environ.get("MOTHERDUCK_TOKEN", "").strip()
        if token:
            error_text = error_text.replace(token, "***")
        empty_df = pd.DataFrame()
        fig = plot_ladder(empty_df)
        return (
            error_text,
            "N/A",
            "0",
            "0",
            "0",
            fig,
            empty_df,
            empty_df,
            empty_df,
            "Analysis could not be performed.",
            empty_df,
        )
    finally:
        # A new connection is opened per refresh; release it so repeated
        # clicks do not accumulate open connections.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # A failed close must not replace the result being returned.
                pass
# =========================
# Build Gradio UI
# =========================
# NOTE(review): the layout nesting below was reconstructed from a flattened
# source; confirm the grouping of the KPI row and of the explanation panel.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` → `{VIEW_FQN}`")
    status = gr.Textbox(label="Status", interactive=False, lines=8)

    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh/Calculate", variant="primary")
        theme_btn = gr.Button("🌗 Toggle Theme")

    # Client-side only: toggles the 'dark' class on <html>; no Python callback.
    theme_btn.click(
        None,
        None,
        js="() => { document.querySelector('html').classList.toggle('dark'); }"
    )

    with gr.Row():
        # --- Left Column: Controls and Explanations ---
        with gr.Column(scale=1):
            scenario_dd = gr.Dropdown(
                label="Select Stress Scenario",
                choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock"],
                value="Baseline"
            )
            with gr.Accordion("Stress Scenario Parameters", open=True):
                runoff_slider = gr.Slider(
                    label="Deposit Runoff (%)",
                    minimum=0, maximum=100, step=5, value=20,
                    info="For Liquidity Stress: Percentage of key deposits that run off."
                )
                shock_slider = gr.Slider(
                    label="EVE Rate Shock (bps)",
                    minimum=-500, maximum=500, step=25, value=200,
                    info="For IRR Stress: Parallel shift in the yield curve for EVE (Duration) calculation."
                )
                nii_shock_slider = gr.Slider(
                    label="NII Rate Shock (bps)",
                    minimum=-500, maximum=500, step=25, value=100,
                    info="For NII Sensitivity: Shock applied to 1-Year Repricing Gap."
                )
            explain_text = gr.Markdown("Analysis of the T+1 gap and IRR will appear here...")

        # --- Right Column: KPIs, Charts, and Tables ---
        with gr.Column(scale=3):
            with gr.Row():
                as_of = gr.Textbox(label="As of date", interactive=False)
                a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...")
                a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...")
                a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...")
            chart = gr.Plot(label="Maturity Ladder")
            with gr.Tabs():
                with gr.TabItem("Liquidity Gap Detail"):
                    ladder_df = gr.Dataframe(
                        headers=["Time Bucket", "Bucket", "Amount (LKR Mn)"],
                        type="pandas"
                    )
                with gr.TabItem("T+1 Gap Drivers"):
                    drivers_df = gr.Dataframe(
                        headers=["Product", "Bucket", "Amount (LKR Mn)"],
                        type="pandas"
                    )
                with gr.TabItem("IRR - EVE (Duration Gap)"):
                    irr_df = gr.Dataframe(
                        headers=["Metric", "Value"],
                        type="pandas"
                    )
                with gr.TabItem("IRR - NII (Repricing Gap)"):
                    nii_df = gr.Dataframe(
                        headers=["Metric", "Value"],
                        type="pandas"
                    )

    # The output order must match the tuple returned by run_dashboard.
    refresh_btn.click(
        fn=run_dashboard,
        inputs=[scenario_dd, runoff_slider, shock_slider, nii_shock_slider],
        outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, nii_df, explain_text, drivers_df],
    )

if __name__ == "__main__":
    demo.launch()