In traditional liquidity risk models, banks often get back a single output: a score, a label, maybe a traffic light. Useful, but shallow. It doesn’t answer the real questions regulators and risk teams care about:
- What exactly is going wrong?
- When did the trouble really start?
- Which signals should we watch next time?
The Graph-of-Thought (GoT) framework for financial liquidity risk tackles this gap by turning an LLM into an investigative reasoning engine, not just a predictor.
From Raw Signals to “Red-Flag Moments”
The pipeline starts with time-series data across many banks: how fast deposits are leaving, how stressed the liquidity buffers are, how concentrated the depositor base is, how unusual current behaviour looks relative to the peer cohort, and so on.
First, the system learns what “normal” looks like from the peer group. Instead of hard-coded thresholds, it uses cohort statistics to define adaptive cut-offs for what counts as “abnormally high outflow”, “unusual loss behaviour”, or “regime shift”.
For the target bank, it then scans the timeline and marks red-flag moments: time periods where the bank’s behaviour is significantly off from the cohort. These become the “symptoms” that any good explanation must cover.
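As a condensed sketch of this step, here is the quantile logic on a single feature with toy data (the complete listing at the end of the post applies it across the whole feature set):

```python
# Toy sketch: cohort-adaptive red-flagging on one feature.
# The data here is random placeholder; the 0.20 floor matches the listing below.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
peers = pd.DataFrame({"deposit_outflow_rate": rng.uniform(0.0, 0.10, 500)})
target = pd.DataFrame({
    "t": np.arange(36),
    "deposit_outflow_rate": rng.uniform(0.0, 0.30, 36),
})

# Adaptive cut-off: the peer cohort's 90th percentile, floored at 0.20.
outflow_high = max(0.20, float(peers["deposit_outflow_rate"].quantile(0.90)))

# Red-flag moments: periods where the target bank breaches the cut-off.
red_flags = target[target["deposit_outflow_rate"] >= outflow_high]
print(f"cut-off = {outflow_high:.3f}, flagged periods: {red_flags['t'].tolist()}")
```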
Hypotheses as Nodes in a Reasoning Graph
Now the Graph-of-Thoughts engine comes in.
Instead of jumping to a single narrative, the LLM is asked to propose several coarse hypotheses:
- “Losses on long-term positions are spooking depositors.”
- “A few large depositors are driving outflows.”
- “Negative news and social chatter are accelerating panic.”
Each hypothesis becomes a node in a graph. Edges represent relationships: a refined version of the same idea, a lateral variant, or a hypothesis that builds on others.
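A minimal sketch of that structure, using the same networkx node and edge conventions as the listing below (the hypothesis texts here are illustrative):

```python
# Toy hypothesis graph: nodes carry hypothesis text, edges carry a type.
import networkx as nx

G = nx.DiGraph()
G.add_node(0, level=0, text="Losses on long-term positions are spooking depositors")
G.add_node(1, level=0, text="A few large depositors are driving outflows")
G.add_node(2, level=1, text="HTM losses plus concentrated outflows reinforce each other")

G.add_edge(0, 2, etype="refine")   # sharper version of a coarse idea
G.add_edge(1, 2, etype="support")  # lateral hypothesis it builds on

for u, v, d in G.edges(data=True):
    print(f"{u} --{d['etype']}--> {v}: {G.nodes[v]['text']}")
```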
The system then scores every hypothesis against the data (a toy version of the scoring follows this list):
- How many red-flag moments does it explain?
- Does it light up early, or only after the worst stress appears?
- Is it compact and plausible, or bloated and vague?
- Is it robust when we perturb the data (tail coverage, not just average)?
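Here is that scoring in miniature. It mirrors the multiplicative utility in the listing below but drops the prior and cost terms for brevity; the exponents and tail factor come from the listing, while the inputs are illustrative:

```python
# Toy utility: multiplicative, so any weak dimension drags the score down.
def toy_utility(cov_mean, cov_cvar05, temporal, simplicity, plausibility):
    if cov_mean <= 0:
        return 0.0
    # Reward hypotheses whose worst-case (CVaR) coverage under bootstrap
    # perturbation stays close to their average coverage.
    tail_factor = min(1.6, 0.7 + 0.4 * cov_cvar05 / cov_mean)
    return (cov_mean ** 0.48) * (temporal ** 0.18) * (simplicity ** 0.12) \
        * (plausibility ** 0.10) * tail_factor

print(toy_utility(0.6, 0.5, 0.8, 0.9, 0.8))  # robust tail coverage -> higher score
print(toy_utility(0.6, 0.1, 0.8, 0.9, 0.8))  # same mean, fragile tail -> lower
```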
Good hypotheses are refined further; weak ones are pruned. Over time, the graph grows into a map of competing narratives, some strong, some quickly discarded.
Selecting the Most Defensible Story
In the final stage, the engine traces back through this graph to extract the best explanation path: a short sequence of hypotheses that together offer a clear, data-aligned story of how liquidity stress emerged.
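The extraction itself is a simple parent-pointer walk; here is a minimal sketch of the same trace-back logic used by run_got in the listing below:

```python
# Toy trace-back: follow parent pointers from the best node to its root.
nodes = {
    0: {"text": "Duration risk on HTM portfolio", "parent": None},
    1: {"text": "HTM losses trigger early, concentrated outflows", "parent": 0},
    2: {"text": "Outflows drain HQLA; LCR comes under pressure", "parent": 1},
}
best_id = 2
path = [best_id]
while nodes[path[-1]]["parent"] is not None:
    path.append(nodes[path[-1]]["parent"])
path.reverse()
print(" -> ".join(nodes[i]["text"] for i in path))
```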
The result is not just: “Risk = 0.82”.
It is something like:
“Rising losses on long-dated assets coincided with early deposit outflows from a concentrated segment. This was followed by a surge in negative chatter and visible pressure on liquid buffers, explaining most of the severe stress periods we observe.”
Because the full graph—nodes, edges, scores—is available, human experts can inspect the reasoning, see which alternatives were rejected, and challenge or refine the story.
A Complete Working Code Example
# got_finance_liquidity_risk_llm.py
import os, json, math
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
# Headless-safe plotting
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import networkx as nx
try:
import requests
except Exception:
requests = None
# -------------------------------
# Groq API configuration
# -------------------------------
api_key: str = os.getenv("GROQ_API_KEY", "PLEASE USE YOUR OWN KEY")
endpoint: str = os.getenv("GROQ_ENDPOINT", "https://api.groq.com/openai/v1/chat/completions")
model: str = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")
timeout: int = int(os.getenv("GROQ_TIMEOUT", "60"))
HEADERS = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
USE_LLM = str(os.getenv("USE_LLM", "1")).strip() in {"1","true","True","YES","yes"}
LLM_STRICT = str(os.getenv("LLM_STRICT", "0")).strip() in {"1","true","True","YES","yes"}
LLM_MAX_CALLS = int(os.getenv("LLM_MAX_CALLS", "12"))
# --- LLM cost knobs (default: ~1/4 calls) ---
SEED_LIMIT = int(os.getenv("SEED_LIMIT", "2")) # how many seeds to use (1 LLM call returns many; we slice)
REFINE_ROOTS_LIMIT = int(os.getenv("REFINE_ROOTS_LIMIT", "1")) # how many root nodes we refine via LLM
REFINE_LIMIT = int(os.getenv("REFINE_LIMIT", "2")) # how many refinements to keep from LLM output
LLM_USE_PLAUS = int(os.getenv("LLM_USE_PLAUS", "0")) # 0=off by default to save calls; 1=enable LLM plausibility
LLM_CALLS = 0
LLM_LOG = []
def _default_out_dir():
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
script_dir = os.getcwd()
return os.path.join(script_dir, "outputs")
OUT_DIR = os.path.abspath(os.getenv("OUT_DIR", _default_out_dir()))
os.makedirs(OUT_DIR, exist_ok=True)
# Transcript controls
LLM_VERBOSE = str(os.getenv("LLM_VERBOSE", "1")).strip() in {"1","true","True","YES","yes"}
LLM_TRANSCRIPT_PATH = os.getenv("LLM_TRANSCRIPT", os.path.join(OUT_DIR, "llm_transcript.txt"))
def _append_transcript(title: str, payload: str):
try:
with open(LLM_TRANSCRIPT_PATH, "a", encoding="utf-8") as f:
f.write(f"\n===== {title} =====\n")
f.write(payload)
f.write("\n")
except Exception as _e:
pass
def _fmt_messages(messages):
parts = []
for m in messages:
role = m.get("role", "?")
content = m.get("content","")
parts.append(f"[{role.upper()}]\n{content}\n")
return "\n".join(parts)
# -------------------------------
# Synthetic data for demo
# -------------------------------
def gen_synth(n_banks=5, T=36, seed=11):
rng = np.random.default_rng(seed)
banks = []
base_names = [f"BANK-{i}" for i in range(n_banks-1)] + ["NEOBANK-Q4"]
for name in base_names:
df = pd.DataFrame({
"t": np.arange(T),
"rate_shock": np.clip(0.5*np.sin(np.arange(T)/4.0)+0.6, 0, None),
})
df["htm_loss_ratio"] = np.clip(0.02 + 0.1*df["rate_shock"] + rng.normal(0,0.01,T), 0, 0.5)
df["deposit_concentration"] = np.clip(0.3 + rng.normal(0,0.03,T), 0, 1)
df["social_chatter_intensity"] = np.clip(0.1 + 0.1*df["rate_shock"] + rng.normal(0,0.02,T), 0, 1)
df["deposit_outflow_rate"] = np.clip(0.02 + 0.05*df["social_chatter_intensity"] + rng.normal(0,0.01,T), 0, 1)
df["LCR"] = np.clip(1.2 - 0.6*df["deposit_outflow_rate"] - 0.3*df["htm_loss_ratio"] + rng.normal(0,0.03,T), 0, 2)
df["name"] = name
if name == "NEOBANK-Q4":
df.loc[df["t"]>=18, "social_chatter_intensity"] += 0.25
df["deposit_outflow_rate"] = np.clip(
0.03 + 0.15*df["social_chatter_intensity"] + 0.05*df["deposit_concentration"]
+ rng.normal(0,0.015,T), 0, 1)
df["LCR"] = np.clip(
1.1 - 0.9*df["deposit_outflow_rate"] - 0.5*df["htm_loss_ratio"] + rng.normal(0,0.03,T), 0, 2)
df["deposit_concentration"] = np.clip(df["deposit_concentration"] + 0.1, 0, 1)
banks.append(df)
return pd.concat(banks, ignore_index=True)
DATA = gen_synth()
FEATURES = ["deposit_outflow_rate","LCR","htm_loss_ratio","deposit_concentration","social_chatter_intensity"]
# -------------------------------
# Cohort adaptation + OOD
# -------------------------------
def cohort_stats(df_all: pd.DataFrame, bank: str) -> Dict[str, Dict[str, float]]:
peers = df_all[df_all["name"]!=bank]
stats = {}
for c in FEATURES:
q10 = float(peers[c].quantile(0.10))
q25 = float(peers[c].quantile(0.25))
q50 = float(peers[c].quantile(0.50))
q75 = float(peers[c].quantile(0.75))
q90 = float(peers[c].quantile(0.90))
iqr = max(1e-6, q75 - q25)
stats[c] = {"q10":q10,"q25":q25,"q50":q50,"q75":q75,"q90":q90,"iqr":iqr}
return stats
def adaptive_thresholds(stats: Dict[str, Dict[str, float]]) -> Dict[str, float]:
outflow_high = max(0.20, stats["deposit_outflow_rate"]["q90"])
lcr_min = min(1.00, stats["LCR"]["q10"])
htm_loss_high = max(0.15, stats["htm_loss_ratio"]["q90"])
conc_high = max(0.55, stats["deposit_concentration"]["q90"])
chatter_high = max(0.50, stats["social_chatter_intensity"]["q90"])
return {"outflow_high": outflow_high, "lcr_min": lcr_min, "htm_loss_high": htm_loss_high,
"concentration_high": conc_high, "chatter_high": chatter_high}
def robust_z(x, med, iqr):
return abs(x - med) / (iqr + 1e-9)
def compute_ood_scores(df_bank: pd.DataFrame, stats: Dict[str, Dict[str, float]]) -> pd.Series:
zs = []
for c in FEATURES:
med = stats[c]["q50"]; iqr = stats[c]["iqr"]
zs.append( robust_z(df_bank[c].values, med, iqr) )
zs = np.vstack(zs)
return pd.Series(zs.mean(axis=0), index=df_bank.index, name="ood_score")
# -------------------------------
# Flags + weighted coverage
# -------------------------------
def compute_flags(df: pd.DataFrame, thr: Dict[str, float], ood_score: pd.Series) -> pd.DataFrame:
f = pd.DataFrame({
"name": df["name"],
"t": df["t"],
"outflow_flag": (df["deposit_outflow_rate"] >= thr["outflow_high"]).astype(int),
"lcr_breach": (df["LCR"] < thr["lcr_min"]).astype(int),
"htm_flag": (df["htm_loss_ratio"] >= thr["htm_loss_high"]).astype(int),
"conc_flag": (df["deposit_concentration"] >= thr["concentration_high"]).astype(int),
"chatter_flag": (df["social_chatter_intensity"] >= thr["chatter_high"]).astype(int),
"ood_score": ood_score.values
})
cutoff = max(3.0, float(np.quantile(f["ood_score"], 0.90)))
f["ood_flag"] = (f["ood_score"] >= cutoff).astype(int)
f["any_flag"] = f[["outflow_flag","lcr_breach","htm_flag","conc_flag","chatter_flag","ood_flag"]].sum(axis=1).astype(int)
return f
def point_weight(flag_row: pd.Series) -> float:
return 1.0 + 0.5*flag_row["lcr_breach"] + 0.25*flag_row["outflow_flag"] + 0.25*flag_row["chatter_flag"] + 0.5*flag_row["ood_flag"]
def weighted_coverage(indices: List[int], flags: pd.DataFrame) -> float:
if len(flags)==0: return 0.0
w_all = flags.apply(point_weight, axis=1).values
w_cov = w_all[indices] if len(indices)>0 else np.array([])
return float(w_cov.sum() / max(1e-9, w_all.sum()))
# -------------------------------
# LLM helpers
# -------------------------------
def groq_chat(messages: List[Dict], temperature=0.2, max_tokens=500) -> str:
global LLM_CALLS
if not USE_LLM:
raise RuntimeError("LLM disabled (USE_LLM=0).")
if requests is None:
raise RuntimeError("The 'requests' package is required for Groq calls.")
if LLM_CALLS >= LLM_MAX_CALLS:
raise RuntimeError("LLM call budget exceeded.")
payload = {"model": model, "messages": messages, "temperature": float(temperature), "max_tokens": int(max_tokens), "stream": False}
# Log request
_req = f"ENDPOINT: {endpoint}\nMODEL: {model}\nTEMPERATURE: {temperature}\nMAX_TOKENS: {max_tokens}\n\n" + _fmt_messages(messages)
if LLM_VERBOSE:
print("\n=== LLM REQUEST ===")
print(_req)
_append_transcript("LLM REQUEST", _req)
try:
resp = requests.post(endpoint, headers=HEADERS, json=payload, timeout=timeout)
resp.raise_for_status()
data = resp.json()
LLM_CALLS += 1
text = data["choices"][0]["message"]["content"]
LLM_LOG.append({"messages": messages, "response": text})
if LLM_VERBOSE:
print("\n=== LLM RESPONSE ===")
print(text)
_append_transcript("LLM RESPONSE", text)
return text
except Exception as e:
LLM_LOG.append({"messages": messages, "error": str(e)})
if LLM_VERBOSE:
print("\n=== LLM ERROR ===")
print(str(e))
_append_transcript("LLM ERROR", str(e))
if LLM_STRICT:
raise
# Fallback
fb = _llm_fallback(messages)
if LLM_VERBOSE:
print("\n=== LLM FALLBACK ===")
print(fb)
_append_transcript("LLM FALLBACK", fb)
return fb
def _llm_fallback(messages: List[Dict]) -> str:
# A deterministic, informative fallback for offline runs.
last = messages[-1]["content"] if messages else ""
if "Propose" in last or "hypotheses" in last.lower():
_append_transcript("LLM FALLBACK REQUEST", _fmt_messages(messages))
return "\n".join([
"- Deposit flight risk driven by social chatter",
"- Duration risk on HTM portfolio causing confidence loss",
"- Concentrated deposits in SME/VC causing rapid outflows",
"- Liquidity coverage ratio breach from compounding stresses",
"- Confidence spiral (chatter + outflows + LCR dip)",
"- Regime shift (OOD): behavior deviates from peer cohort",
])
    if "Refine" in last:
_append_transcript("LLM FALLBACK REQUEST", _fmt_messages(messages))
out = "\n".join([
"- Deposit flight driven by social chatter after guidance cut; concentration amplifies",
"- SME/VC segment-led deposit flight; weekend acceleration pattern",
"- Duration risk interacts with social chatter to accelerate outflows",
"- LCR breach: HQLA drawdown concurrent with deposit spike",
])
_append_transcript("LLM FALLBACK", out)
return out
if "rate plausibility" in last.lower() or "critic" in last.lower():
return "plausibility:0.78; notes: plausible under rising-rate and chatter conditions; red flags: concentration, OOD spikes"
_append_transcript("LLM FALLBACK", "OK")
return "OK"
def parse_bullets(text: str, limit: int=7) -> List[str]:
lines = [ln.strip("-• ").strip() for ln in text.splitlines() if ln.strip()]
uniq = []
for l in lines:
if l and l not in uniq:
uniq.append(l)
return uniq[:limit]
# -------------------------------
# GoT explainers and critic
# -------------------------------
def explain_indices(hypo: str, df: pd.DataFrame, flags: pd.DataFrame) -> List[int]:
idx = []
h = hypo.lower()
for i in range(len(df)):
fb = flags.loc[i]
if "deposit flight" in h and fb["outflow_flag"]==1: idx.append(i)
if "liquidity coverage" in h and fb["lcr_breach"]==1: idx.append(i)
if "duration risk" in h and fb["htm_flag"]==1: idx.append(i)
if "concentrated deposits" in h and fb["conc_flag"]==1: idx.append(i)
if "social chatter" in h and fb["chatter_flag"]==1: idx.append(i)
if "contagion" in h and (fb["outflow_flag"]==1 or fb["lcr_breach"]==1): idx.append(i)
if "confidence spiral" in h and (fb["outflow_flag"]==1 or fb["chatter_flag"]==1 or fb["lcr_breach"]==1): idx.append(i)
if "regime shift" in h or "ood" in h:
if fb.get("ood_flag",0)==1: idx.append(i)
return sorted(list(set(idx)))
def temporal_concordance(indices: List[int], df: pd.DataFrame) -> float:
if not indices: return 0.0
ts = sorted(df.loc[indices, "t"].tolist()); span = max(ts) - min(ts) + 1e-9
return 1.0 / (1.0 + span/12.0)
def simplicity_penalty(text: str) -> float:
wc = len(text.split()); return 1.0 / (1.0 + max(0, wc-12)/12.0)
def prior(text: str) -> float:
h = text.lower(); p = 0.10
if "duration risk" in h: p += 0.10
if "concentrated deposits" in h: p += 0.08
if "social chatter" in h: p += 0.08
if "contagion" in h: p += 0.05
if "confidence spiral" in h: p += 0.07
if "regime shift" in h or "ood" in h: p += 0.05
return min(p, 0.45)
def bootstrap_metrics(hypo: str, df: pd.DataFrame, flags: pd.DataFrame, B=30, seed=123) -> Dict[str, float]:
rng = np.random.default_rng(seed)
N = len(df)
idx = explain_indices(hypo, df, flags)
if len(idx)==0:
return {"cov_mean":0.0, "cov_cvar05":0.0, "tmp":0.0, "indices": []}
covs = []
for _ in range(B):
sample_idx = rng.integers(low=0, high=N, size=N)
f_boot = flags.iloc[sample_idx].reset_index(drop=True)
df_boot = df.iloc[sample_idx].reset_index(drop=True)
idx_boot = explain_indices(hypo, df_boot, f_boot)
covs.append( weighted_coverage(idx_boot, f_boot) )
covs = np.array(covs)
cov_mean = float(covs.mean())
k = max(1, int(0.05*B))
cov_cvar05 = float(np.sort(covs)[:k].mean())
tmp = temporal_concordance(idx, df)
return {"cov_mean":cov_mean, "cov_cvar05":cov_cvar05, "tmp":tmp, "indices":idx}
LLM_PLAUS_CACHE = {}
def llm_plausibility(hypo: str, context: Dict) -> float:
# Gate by flag; when off, avoid any LLM call and return prior-like value
if not LLM_USE_PLAUS:
return 0.8
if not USE_LLM:
return 0.8 # offline fallback
# Cache to avoid duplicate calls on identical hypotheses
key = (hypo.strip().lower(), tuple(sorted(context.get('thresholds',{}).items())))
if key in LLM_PLAUS_CACHE:
return LLM_PLAUS_CACHE[key]
prompt = [
{"role":"system","content":"You are a risk analyst. Given a hypothesis about a bank's liquidity stress and structured signals, rate plausibility 0-1 and supply brief notes. Respond like: plausibility:0.76; notes: ..."},
{"role":"user","content": f"Signals schema: {json.dumps(context['schema'], indent=2)}\nThresholds: {json.dumps(context['thresholds'], indent=2)}\nOOD cutoff: {context['ood_cutoff']:.3f}\nHypothesis: {hypo}\nRate plausibility and add 1-2 red flags to watch."}
]
text = groq_chat(prompt, temperature=0.2, max_tokens=120)
# parse "plausibility:x" if present
x = 0.75
for tok in text.split(";"):
if "plausibility" in tok:
try:
x = float(tok.split(":")[1].strip())
except Exception:
pass
x = float(np.clip(x, 0.1, 0.99))
LLM_PLAUS_CACHE[key] = x
return x
def utility(cov_mean, cov_cvar05, tmp, simp, prior_p, plaus, cost):
tail_factor = 0.0 if cov_mean<=0 else min(1.6, 0.7 + 0.4*(cov_cvar05 / max(1e-9, cov_mean)))
# Include LLM plausibility as an additional multiplicative term
u = (cov_mean**0.48) * (tmp**0.18) * (simp**0.12) * (prior_p**0.12) * (plaus**0.10) * tail_factor
return u - 0.0035*cost
def critic_score(hypo: str, context: Dict) -> Dict:
df = context["df"]; flags = context["flags"]
boot = bootstrap_metrics(hypo, df, flags, B=int(os.getenv("BOOTSTRAP_B","30")))
simp = simplicity_penalty(hypo)
pr = prior(hypo)
plaus = llm_plausibility(hypo, context)
cost = 18 + 2*len(hypo.split())
util = utility(boot["cov_mean"], boot["cov_cvar05"], boot["tmp"], simp, pr, plaus, cost)
return {"indices": boot["indices"], "cov_mean": boot["cov_mean"], "cov_cvar05": boot["cov_cvar05"], "tmp": boot["tmp"],
"simp": simp, "prior": pr, "plaus": plaus, "cost": cost, "utility": util}
# -------------------------------
# LLM-driven proposal/refinement
# -------------------------------
def propose_coarse_hypotheses(context: Dict) -> List[str]:
if USE_LLM:
prompt = [
{"role":"system","content":"Propose 5–7 diverse, non-overlapping coarse hypotheses for a bank's liquidity stress. Prefer short, specific phrases. Include at least one regime-shift/OOD angle."},
{"role":"user","content": f"Bank: {context['bank']}\nSignals summary: {json.dumps(context['schema'], indent=2)}\nAdaptive thresholds: {json.dumps(context['thresholds'], indent=2)}"}
]
txt = groq_chat(prompt, temperature=0.3, max_tokens=220)
return parse_bullets(txt, limit=7)
# offline fallback
return parse_bullets(_llm_fallback([{"role":"user","content":"Propose"}]), limit=7)
def refine_hypothesis(hypo: str, context: Dict) -> List[str]:
if USE_LLM:
prompt = [
{"role":"system","content":"Refine the hypothesis into 2–4 concrete sub-hypotheses that would explain more flagged points. Each should mention measurable signals or co-movements."},
{"role":"user","content": f"Hypothesis: {hypo}\nSignals summary: {json.dumps(context['schema'], indent=2)}\nWhat refinements should we consider?"}
]
txt = groq_chat(prompt, temperature=0.2, max_tokens=220)
return parse_bullets(txt, limit=4)
return parse_bullets(_llm_fallback([{"role":"user","content":"Refine"}]), limit=4)
# -------------------------------
# Controller
# -------------------------------
def run_got(context: Dict, beam=4, max_nodes=80, cov_stop=0.7):
G = nx.DiGraph()
nid = 0
open_list: List[Tuple[float,int]] = []
def add_node(text, level, parent=None):
nonlocal nid
sc = critic_score(text, context)
t = {"id": nid, "level": level, "text": text, "covers": sc["indices"], "score": sc["utility"], "cost": sc["cost"],
"parent": parent, "meta": {"cov_mean": sc["cov_mean"], "cov_cvar05": sc["cov_cvar05"], "tmp": sc["tmp"],
"simp": sc["simp"], "prior": sc["prior"], "plaus": sc["plaus"]}}
G.add_node(nid, **t)
if parent is not None:
G.add_edge(parent, nid, etype="refine" if level > G.nodes[parent]["level"] else "support")
nid += 1
return G.nodes[nid-1]
# seeds (LLM call happens once; we then slice to SEED_LIMIT)
_seeds_all = propose_coarse_hypotheses(context)
for h in _seeds_all[:SEED_LIMIT]:
t = add_node(h, level=0, parent=None)
open_list.append((-t["score"], t["id"]))
best = None
log = []
refined_roots = 0
while open_list and G.number_of_nodes() < max_nodes:
open_list.sort()
open_list = open_list[:beam]
_, cur_id = open_list.pop(0)
cur = G.nodes[cur_id]
log.append({"expand_id": cur_id, "text": cur["text"], "score": cur["score"], "cov_mean": cur["meta"]["cov_mean"],
"cov_cvar05": cur["meta"]["cov_cvar05"], "plaus": cur["meta"]["plaus"], "level": cur["level"]})
if best is None or cur["score"] > best["score"]:
best = cur
if cur["meta"]["cov_mean"] >= cov_stop and cur["level"]>=1:
best = cur
break
children = []
if cur["level"] == 0:
if refined_roots < REFINE_ROOTS_LIMIT:
_ref = refine_hypothesis(cur["text"], context)
for r in _ref[:REFINE_LIMIT]:
children.append(add_node(r, level=1, parent=cur_id))
refined_roots += 1
# else: skip LLM refinement for this root to save calls
else:
txt = cur["text"].lower()
if "deposit flight" in txt and "social chatter" not in txt:
children.append(add_node(cur["text"]+"; social chatter accelerant", level=1, parent=cur_id))
if "duration risk" in txt and "lcr" not in txt:
children.append(add_node(cur["text"]+"; LCR pressure from HQLA drawdown", level=1, parent=cur_id))
if "concentrated deposits" in txt and "confidence spiral" not in txt:
children.append(add_node(cur["text"]+"; confidence spiral from top-20 depositors", level=1, parent=cur_id))
if "regime shift" in txt and "deposit flight" not in txt:
children.append(add_node(cur["text"]+"; deposit flight co-moves with OOD spikes", level=1, parent=cur_id))
for ch in children:
open_list.append((-ch["score"], ch["id"]))
# path
path = [best["id"]]
p = best["parent"]
while p is not None:
path.append(p)
p = G.nodes[p]["parent"]
path = list(reversed(path))
# DataFrames
nodes = []
for n, a in G.nodes(data=True):
nodes.append({"id": n, "level": a["level"], "text": a["text"], "score": round(a["score"],5),
"cov_mean": round(a["meta"]["cov_mean"],4), "cov_cvar05": round(a["meta"]["cov_cvar05"],4),
"temporal": round(a["meta"]["tmp"],4), "prior": round(a["meta"]["prior"],4),
"plaus": round(a["meta"]["plaus"],4), "parent": a["parent"], "on_best_path": int(n in path)})
edges = [{"src":u,"dst":v,"etype":ed.get("etype","link")} for u,v,ed in G.edges(data=True)]
return pd.DataFrame(nodes).sort_values(["on_best_path","score"], ascending=[False,False]), pd.DataFrame(edges), pd.DataFrame(log), path, best
# -------------------------------
# Architecture export
# -------------------------------
def build_got_architecture_spec():
components = [
{"id":"ClientInput", "label":"Problem Context", "layer":0},
{"id":"SchemaProfiler", "label":"Schema Profiler", "layer":0},
{"id":"SeedProposer", "label":"Seed Proposer (LLM/Groq)", "layer":1},
{"id":"HypothesisBank", "label":"Hypothesis Memory", "layer":1},
{"id":"Refiner", "label":"Refiner (LLM)", "layer":2},
{"id":"LateralComposer", "label":"Lateral Composer", "layer":2},
{"id":"EvidenceBinder", "label":"Evidence Binder (flags+OOD)", "layer":2},
{"id":"LLMCritic", "label":"LLM Critic (plausibility)", "layer":3},
{"id":"RuleVerifier", "label":"Rule Verifier (coverage/CVaR)", "layer":3},
{"id":"Controller", "label":"Controller (Beam)", "layer":4},
{"id":"Selector", "label":"Selector (Best Path/Subgraph)", "layer":5},
{"id":"Renderer", "label":"Renderer (CSV/JSON/PNG)", "layer":5},
{"id":"AuditLogger", "label":"Audit Logger (Trust/LLM log)", "layer":5},
]
edges = [
("ClientInput","SeedProposer","context"),
("SchemaProfiler","SeedProposer","schema"),
("SeedProposer","Refiner","coarse_thoughts"),
("Refiner","EvidenceBinder","hypotheses"),
("EvidenceBinder","RuleVerifier","flags"),
("Refiner","LLMCritic","hypotheses"),
("LLMCritic","RuleVerifier","plausibility"),
("RuleVerifier","Controller","utilities"),
("Controller","Refiner","expand"),
("Controller","Selector","final"),
("Selector","Renderer","outputs"),
("Selector","AuditLogger","trace"),
]
return components, edges
def export_architecture(out_dir=OUT_DIR):
comps, edges = build_got_architecture_spec()
pd.DataFrame(comps).to_csv(os.path.join(out_dir,"got_arch_nodes.csv"), index=False)
pd.DataFrame([{"src":s,"dst":d,"etype":e} for s,d,e in edges]).to_csv(os.path.join(out_dir,"got_arch_edges.csv"), index=False)
# Draw
G = nx.DiGraph()
for c in comps: G.add_node(c["id"], **c)
for s,d,e in edges: G.add_edge(s,d, etype=e)
layers = {}
for c in comps: layers.setdefault(c["layer"], []).append(c["id"])
pos = {}
x_gap, y_gap = 3.0, 1.6
for layer, ids in layers.items():
ids_sorted = sorted(ids)
y0 = - (len(ids_sorted)-1)*y_gap/2.0
for i, nid in enumerate(ids_sorted):
pos[nid] = (layer*x_gap, y0 + i*y_gap)
plt.figure(figsize=(16,8))
nx.draw_networkx_edges(G, pos, arrows=True)
nx.draw_networkx_nodes(G, pos, node_size=1500)
labels = {n: G.nodes[n]["label"] for n in G.nodes}
nx.draw_networkx_labels(G, pos, labels=labels, font_size=8)
edge_labels = {(u,v): G.edges[u,v]["etype"] for u,v in G.edges}
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=6)
plt.axis('off'); plt.tight_layout()
plt.savefig(os.path.join(out_dir,"got_architecture_finance.png"), dpi=200); plt.close()
# -------------------------------
# Schema summary + early warning
# -------------------------------
def summarize_schema(df: pd.DataFrame) -> Dict:
summ = {}
for c in FEATURES + ["rate_shock"]:
if c in df.columns:
summ[c] = {"min": float(df[c].min()), "max": float(df[c].max()), "mean": float(df[c].mean())}
return summ
def early_warning_lead(indices: List[int], flags: pd.DataFrame, lead=2) -> float:
t_breach = flags.index[flags["lcr_breach"]==1].tolist()
if not t_breach: return 0.0
hits = 0; sidx = set(indices)
for t in t_breach:
if sidx & set(range(max(0,t-lead), t+1)): hits += 1
return hits / len(t_breach)
# -------------------------------
# Main
# -------------------------------
def main():
BANK = "NEOBANK-Q4"
df_bank = DATA[DATA["name"]==BANK].reset_index(drop=True)
stats = cohort_stats(DATA, BANK)
thr = adaptive_thresholds(stats)
ood = compute_ood_scores(df_bank, stats)
flags_bank = compute_flags(df_bank, thr, ood)
context = {
"bank": BANK,
"df": df_bank.copy(),
"flags": flags_bank.copy(),
"schema": summarize_schema(df_bank),
"thresholds": thr,
"ood_cutoff": float(np.quantile(flags_bank["ood_score"], 0.90)),
}
df_nodes, df_edges, df_log, best_path, best = run_got(context, beam=6, max_nodes=110, cov_stop=0.7)
# Write outputs
os.makedirs(OUT_DIR, exist_ok=True)
nodes_p = os.path.join(OUT_DIR, f"got_nodes_{BANK}.csv")
edges_p = os.path.join(OUT_DIR, f"got_edges_{BANK}.csv")
log_p = os.path.join(OUT_DIR, f"got_log_{BANK}.csv")
df_nodes.to_csv(nodes_p, index=False)
df_edges.to_csv(edges_p, index=False)
df_log.to_csv(log_p, index=False)
summary = {
"bank": BANK,
"best_hypothesis": best["text"],
"best_score": round(best["score"],5),
"best_cov_mean": round(best["meta"]["cov_mean"],4),
"best_cov_cvar05": round(best["meta"]["cov_cvar05"],4),
"best_temporal": round(best["meta"]["tmp"],4),
"best_plausibility": round(best["meta"]["plaus"],4),
"alerts_explained": int(len(best["covers"])),
"total_points": int(len(context["df"])),
"best_path_node_ids": best_path
}
with open(os.path.join(OUT_DIR,"summary.json"), "w") as f:
json.dump(summary, f, indent=2)
trust = {
"thresholds": thr,
"ood": {"min": float(flags_bank['ood_score'].min()), "max": float(flags_bank['ood_score'].max()),
"p90": float(np.quantile(flags_bank['ood_score'],0.90)), "ood_rate": float(flags_bank['ood_flag'].mean())},
"early_warning_lead@2": early_warning_lead(best["covers"], flags_bank, lead=2),
"llm": {"enabled": USE_LLM, "calls": LLM_CALLS, "max_calls": LLM_MAX_CALLS}
}
with open(os.path.join(OUT_DIR,"trust_report.json"), "w") as f:
json.dump(trust, f, indent=2)
# LLM call log (for auditability)
with open(os.path.join(OUT_DIR,"llm_call_log.json"), "w") as f:
json.dump(LLM_LOG, f, indent=2, default=str)
export_architecture(OUT_DIR)
print("OK (LLM-capable):", os.path.abspath(OUT_DIR))
print("LLM transcript saved to:", LLM_TRANSCRIPT_PATH)
if __name__ == "__main__":
main()
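To run it, set GROQ_API_KEY (or set USE_LLM=0 to exercise the deterministic offline fallback) and execute the script. The node/edge/log CSVs, summary.json, trust_report.json, the LLM call log and transcript, and the architecture diagram are written to ./outputs by default (override with OUT_DIR).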