Sunday, September 21, 2025

LLMS OPTIMIZE THEMSELVES (Beyond Manual Prompt Engineering)

 

Video Content Summary

This video explains OPRO (Optimization by PROmpting), a framework from the "Large Language Models as Optimizers" paper that allows large language models (LLMs) to automatically improve their own prompts. We'll break down how this works using a simple sentiment analysis example, showing how an average prompt can be transformed into a highly specific and effective one without any manual effort.

Discover how this "self-optimizing" loop, powered by a meta-prompt and a feedback system, can achieve better results than prompts written by human experts. We'll also dive into the latest research in this field, exploring how this concept is paving the way for a new era of AI systems that can continuously improve themselves.
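To make the idea concrete before the full implementation in the Code section, here is a minimal sketch of the loop described above. It is illustrative only: propose_candidates and score_on_validation_set are placeholder names for the meta-prompted optimizer LLM and the accuracy-based evaluator; any LLM client could back them.

# Minimal sketch of an OPRO-style self-optimizing prompt loop (illustrative names).
def optimize_prompt(propose_candidates, score_on_validation_set, seed_prompts, rounds=3):
    # history holds (prompt, score) pairs that are fed back into the meta-prompt
    history = [(p, score_on_validation_set(p)) for p in seed_prompts]
    for _ in range(rounds):
        # 1) Optimizer LLM reads the scored history and proposes new instructions
        candidates = propose_candidates(history)
        # 2) Evaluator scores each candidate on a small labelled validation set
        for prompt in candidates:
            history.append((prompt, score_on_validation_set(prompt)))
        # 3) Keep only the best prompts so the next meta-prompt sees strong examples
        history = sorted(history, key=lambda x: x[1], reverse=True)[:16]
    return max(history, key=lambda x: x[1])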

Hashtags

#AI #LLM #PromptEngineering #SelfImprovingAI #AIautomation #MachineLearning #DeepLearning #TechExplained #ArtificialIntelligence #GoogleDeepMind

Tutorial:



Code:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
OPRO-style prompt optimization (sentiment classification) using the Groq API
(default model: llama-3.1-8b-instant).

Features:
- RateLimiter ensures a minimum gap between API calls (prevents "too quick" bursts)
- Robust retry with exponential backoff + jitter on 429/5xx
- Friendly 400 error messages with full Groq error text
- Safer defaults for max_tokens and proposals_per_round
- Optional sleeps inside scoring to further smooth request cadence

Usage:
pip install requests
export GROQ_API_KEY=...
python opro_sentiment_groq.py
"""

import os
import time
import json
import re
import random
import requests
from dataclasses import dataclass
from typing import List, Dict, Tuple, Any

# -----------------------------
# Configuration for Groq API
# -----------------------------
@dataclass
class GroqConfig:
    # Default to an empty key so the missing-key check in GroqClient can fail fast.
    api_key: str = os.getenv("GROQ_API_KEY", "")
    endpoint: str = os.getenv("GROQ_ENDPOINT", "https://api.groq.com/openai/v1/chat/completions")
    model: str = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")
    timeout: int = int(os.getenv("GROQ_TIMEOUT", "60"))

    # Rate/Retry tuning
    min_interval_s: float = float(os.getenv("GROQ_MIN_INTERVAL_S", "0.8"))  # min gap between calls
    max_retries: int = int(os.getenv("GROQ_MAX_RETRIES", "5"))
    backoff_base_s: float = float(os.getenv("GROQ_BACKOFF_BASE_S", "0.8"))
    backoff_jitter_s: float = float(os.getenv("GROQ_BACKOFF_JITTER_S", "0.4"))

# -----------------------------
# Simple rate limiter
# -----------------------------
class RateLimiter:
    def __init__(self, min_interval_s: float = 0.8):
        self.min_interval_s = min_interval_s
        self._last = 0.0

    def wait(self):
        now = time.time()
        gap = now - self._last
        if gap < self.min_interval_s:
            time.sleep(self.min_interval_s - gap)
        self._last = time.time()

# -----------------------------
# Groq Client (OpenAI-compatible)
# -----------------------------
class GroqClient:
    def __init__(self, cfg: GroqConfig):
        self.cfg = cfg
        if not self.cfg.api_key:
            raise RuntimeError("GROQ_API_KEY is required.")
        # Ensure endpoint points to /chat/completions
        if not self.cfg.endpoint.endswith("/chat/completions"):
            self.cfg.endpoint = self.cfg.endpoint.rstrip("/") + "/chat/completions"
        self.session = requests.Session()
        self.rate = RateLimiter(self.cfg.min_interval_s)

    def _headers(self):
        return {
            "Authorization": f"Bearer {self.cfg.api_key}",
            "Content-Type": "application/json",
            "User-Agent": "opro-groq-example/1.0",
        }

    def chat(self, messages: List[Dict[str, str]], temperature: float = 0.2,
             max_tokens: int = 128, top_p: float = 1.0, stop: List[str] = None) -> str:
        """
        Resilient chat with:
        - min interval spacing
        - retries with exponential backoff on 429/5xx
        - friendly 400 error messages (no blind tracebacks)
        """
        payload = {
            "model": self.cfg.model,
            "messages": messages,
            "temperature": float(temperature),
            "top_p": float(top_p),
            "max_tokens": int(max_tokens),
        }
        if stop:
            payload["stop"] = stop

        # Light sanity checks to avoid 400s
        if not payload["messages"] or not isinstance(payload["messages"], list):
            raise ValueError("messages must be a non-empty list")
        if payload["max_tokens"] <= 0:
            raise ValueError("max_tokens must be > 0")
        if not isinstance(payload["model"], str) or not payload["model"]:
            raise ValueError("model must be a non-empty string")

        attempt = 0
        while True:
            attempt += 1
            self.rate.wait()  # space out calls
            try:
                r = self.session.post(
                    self.cfg.endpoint,
                    headers=self._headers(),
                    json=payload,
                    timeout=self.cfg.timeout,
                )
            except requests.RequestException as e:
                # Network-level error -> backoff unless we've exhausted retries
                if attempt <= self.cfg.max_retries:
                    self._sleep_backoff(attempt)
                    continue
                raise RuntimeError(f"Network error after {attempt} attempts: {e}") from e

            if r.status_code == 200:
                data = r.json()
                try:
                    return data["choices"][0]["message"]["content"].strip()
                except Exception as e:
                    raise RuntimeError(f"Unexpected response schema: {json.dumps(data)[:500]}") from e

            # Friendly handling for 400 (Bad Request) to show Groq error text
            if r.status_code == 400:
                try:
                    err = r.json()
                    msg = err.get("error", {}).get("message") or err
                except Exception:
                    msg = r.text
                raise ValueError(f"400 Bad Request from Groq: {msg}")

            # Retryable statuses
            if r.status_code in (408, 409, 429, 500, 502, 503, 504):
                if attempt <= self.cfg.max_retries:
                    self._sleep_backoff(attempt, rate_or_server=r.status_code)
                    continue
                try:
                    err = r.json()
                    msg = err.get("error", {}).get("message") or err
                except Exception:
                    msg = r.text
                raise RuntimeError(f"HTTP {r.status_code} after {attempt} attempts: {msg}")

            # Non-retryable unexpected status
            try:
                err = r.json()
                msg = err.get("error", {}).get("message") or err
            except Exception:
                msg = r.text
            raise RuntimeError(f"HTTP {r.status_code}: {msg}")

    def _sleep_backoff(self, attempt: int, rate_or_server: int = None):
        base = self.cfg.backoff_base_s * (2 ** (attempt - 1))
        jitter = random.uniform(0, self.cfg.backoff_jitter_s)
        # Cap the backoff to something sane
        sleep_s = min(base + jitter, 10.0)
        if rate_or_server:
            print(f"[retry {attempt}] got {rate_or_server}; sleeping {sleep_s:.2f}s...")
        else:
            print(f"[retry {attempt}] network error; sleeping {sleep_s:.2f}s...")
        time.sleep(sleep_s)

# -----------------------------
# Seed history from the user
# -----------------------------
prompt_history: List[Dict[str, Any]] = [
{"prompt": "Classify the following text's sentiment. Respond with 'positive', 'negative', or 'neutral'.", "score": 0.67},
{"prompt": "Analyze the text for emotional tone. Provide a single-word classification: 'positive', 'negative', or 'neutral'.", "score": 0.83},
{"prompt": "Determine the sentiment of the provided text. The possible categories are 'positive', 'negative', and 'neutral'.", "score": 0.83},
{"prompt": "What is the sentiment of the text? Your options are positive, negative, or neutral.", "score": 0.67},
{"prompt": "Please assign a sentiment label to the text. Choose from 'positive', 'negative', or 'neutral'.", "score": 0.83},
{"prompt": "Identify the sentiment. Select one: 'positive', 'negative', 'neutral'.", "score": 0.83},
{"prompt": "Strictly classify the sentiment. Output must be only one of: positive, negative, neutral.", "score": 0.83},
{"prompt": "Classify the sentiment of the text below. Be specific and use only one of the following words: positive, negative, neutral.", "score": 0.83},
{"prompt": "Determine the emotional valence of the text. Respond with positive, negative, or neutral.", "score": 0.67},
{"prompt": "Provide the sentiment classification for the text. Use only the terms positive, negative, or neutral.", "score": 0.83},
{"prompt": "Categorize the text's sentiment. The only valid outputs are 'positive', 'negative', or 'neutral'.", "score": 1.00},
{"prompt": "For the following text, is the sentiment positive, negative, or neutral? Respond with only the chosen word.", "score": 1.00},
{"prompt": "Analyze the text for its core sentiment. Respond with 'positive', 'negative', or 'neutral'. No other words.", "score": 1.00},
{"prompt": "Sentiment analysis: positive, negative, or neutral?", "score": 0.67},
{"prompt": "What is the sentiment of the text? Answer with one word: 'positive', 'negative', or 'neutral'.", "score": 0.83},
{"prompt": "Classify the sentiment of the text. Output must be a single word: positive, negative, or neutral.", "score": 1.00},
{"prompt": "Perform sentiment classification on the text. Return only the class name: positive, negative, or neutral.", "score": 1.00},
{"prompt": "The sentiment of the text is... (positive/negative/neutral).", "score": 0.83},
{"prompt": "Determine if the text has a positive, negative, or neutral sentiment.", "score": 0.67},
{"prompt": "Analyze the text and classify its sentiment as 'positive', 'negative', or 'neutral'. Respond with only the sentiment label.", "score": 1.00},
]

# -----------------------------
# Validation data (the evaluator)
# -----------------------------
validation_data: List[Tuple[str, str]] = [
("I love this product, it's amazing!", "positive"),
("The customer service was terrible.", "negative"),
("The movie was okay, not great.", "neutral"),
("This is the best day ever!", "positive"),
("I am so frustrated with the service.", "negative"),
("The delivery was on time.", "neutral"),
]

VALID_LABELS = {"positive", "negative", "neutral"}

def normalize_label(text: str) -> str:
    """Extract a clean single-word label from model output."""
    t = text.strip().lower()
    for token in re.split(r"[\s\.\,\!\:\;\-\_\/\|\(\)\[\]\{\}]+", t):
        if token in VALID_LABELS:
            return token
    if "posit" in t: return "positive"
    if "negat" in t: return "negative"
    if "neutr" in t: return "neutral"
    return t  # may be invalid; caller will handle

def classify_once(llm: GroqClient, instruction: str, text: str) -> str:
    """Run the classifier model once with the given instruction and text."""
    sys = (
        "You are a strict sentiment classifier. "
        "You must output exactly one word: positive, negative, or neutral. "
        "No punctuation, no extra words."
    )
    user = f"{instruction}\n\nText: {text}\nSentiment:"
    out = llm.chat(
        messages=[{"role": "system", "content": sys},
                  {"role": "user", "content": user}],
        temperature=0.0,  # deterministic for scoring
        max_tokens=4,
        top_p=1.0,
        stop=["\n"]  # small guard to keep it to one word
    )
    return normalize_label(out)

def score_prompt(llm: GroqClient, instruction: str, sleep_between_calls: float = 0.25) -> float:
    """Accuracy on the small validation set (with optional inter-call sleep)."""
    correct = 0
    for text, gold in validation_data:
        pred = classify_once(llm, instruction, text)
        if pred == gold:
            correct += 1
        # Gentle pacing to avoid bursty scoring calls
        if sleep_between_calls > 0:
            time.sleep(sleep_between_calls)
    return correct / len(validation_data)

# -----------------------------
# OPRO meta-prompt & proposal
# -----------------------------
def build_meta_prompt(task_desc: str,
                      history: List[Dict[str, Any]],
                      num_proposals: int) -> str:
    header = (
        "You are optimizing an instruction for a sentiment classification task.\n"
        "Goal: maximize accuracy on a small validation set.\n\n"
        "You will see previous candidate instructions with their scores (0.00–1.00).\n"
        "Propose NEW instructions that are likely to achieve even higher accuracy.\n"
        "Constraints for each proposed instruction:\n"
        " • Demand a single-word output: positive, negative, or neutral.\n"
        " • Discourage extra text or punctuation.\n"
        " • Single line only (no numbering, quotes, or code blocks).\n"
        "Return exactly {k} new instructions, each on its own line, NOTHING ELSE.\n"
    ).format(k=num_proposals)

    lines = ["Previous (score → instruction):"]
    for h in sorted(history, key=lambda x: x["score"], reverse=True):
        lines.append(f"{h['score']:.2f} → {h['prompt']}")

    tail = (
        f"\nTask description:\n{task_desc}\n\n"
        f"Now propose {num_proposals} new instructions as specified."
    )
    return header + "\n".join(lines) + tail

def parse_proposals(raw: str) -> List[str]:
    cands = []
    # Remove code fences if the model disobeys
    raw = re.sub(r"^```.*?\n|\n```$", "", raw, flags=re.DOTALL)
    for ln in raw.splitlines():
        ln = ln.strip()
        if not ln:
            continue
        ln = re.sub(r"^(\d+[\.\)]\s*|\-\s*|\*\s*)", "", ln).strip()
        if ln and len(ln) < 300:
            cands.append(ln)
    return cands

def propose_instructions(optimizer: GroqClient,
                         history: List[Dict[str, Any]],
                         num_proposals: int = 6) -> List[str]:
    task_desc = (
        "Given any English text, classify its sentiment as exactly one of: "
        "positive, negative, or neutral."
    )
    meta_prompt = build_meta_prompt(task_desc, history, num_proposals)
    sys = "You are an expert prompt engineer and optimizer."
    out = optimizer.chat(
        messages=[{"role": "system", "content": sys},
                  {"role": "user", "content": meta_prompt}],
        temperature=0.7,  # exploration
        max_tokens=256,   # safe upper bound
        top_p=1.0
    )
    return parse_proposals(out)

# -----------------------------
# OPRO prompt optimization loop
# -----------------------------
def run_opro(optimizer: GroqClient,
             evaluator: GroqClient,
             seed_history: List[Dict[str, Any]],
             rounds: int = 2,
             proposals_per_round: int = 6,
             keep_top_k: int = 16) -> Tuple[List[Dict[str, Any]], Dict[str, float]]:
    history = list(seed_history)
    seen = set(h["prompt"] for h in history)

    for r in range(1, rounds + 1):
        print(f"\n=== OPRO Round {r} ===")
        # Propose
        proposals = propose_instructions(optimizer, history, num_proposals=proposals_per_round)

        # Dedup
        unique_props = [p for p in proposals if p not in seen]
        if not unique_props:
            print("No novel proposals received; try increasing temperature or num_proposals.")
            break

        # Score
        new_items = []
        for instr in unique_props:
            try:
                acc = score_prompt(evaluator, instr, sleep_between_calls=0.3)
            except ValueError as ve:
                # Likely a 400 with helpful message; print & skip this candidate
                print(f"[skip: 400] {ve}")
                continue
            except Exception as e:
                print(f"[skip: error] {e}")
                continue

            new_items.append({"prompt": instr, "score": acc})
            print(f"[{acc:.2f}] {instr}")
            seen.add(instr)

        # Update history and keep top-k
        history.extend(new_items)
        history = sorted(history, key=lambda x: x["score"], reverse=True)[:keep_top_k]

        # Gentle pause between rounds to avoid bursts
        time.sleep(1.2)

    best = max(history, key=lambda x: x["score"])
    return history, {"best_prompt": best["prompt"], "best_score": best["score"]}

# -----------------------------
# Main
# -----------------------------
def main():
    cfg = GroqConfig()
    client = GroqClient(cfg)

    optimizer = client
    evaluator = client

    # Re-score seed prompts so all scores are from THIS evaluator & dataset
    rescored_history = []
    print("Scoring seed prompts on validation set...")
    for h in prompt_history:
        try:
            acc = score_prompt(evaluator, h["prompt"], sleep_between_calls=0.25)
        except ValueError as ve:
            # A 400 typically means malformed instruction; keep original score but warn
            print(f"[warn seed 400] {ve}")
            acc = h.get("score", 0.0)
        except Exception as e:
            print(f"[warn seed error] {e}")
            acc = h.get("score", 0.0)
        rescored_history.append({"prompt": h["prompt"], "score": acc})
        print(f"[{acc:.2f}] {h['prompt']}")
        # Small pause between seeds
        time.sleep(0.2)

    # Run
    updated_history, summary = run_opro(
        optimizer=optimizer,
        evaluator=evaluator,
        seed_history=rescored_history,
        rounds=2,                # tune as needed
        proposals_per_round=6,   # lower = fewer/friendlier requests
        keep_top_k=16
    )

    print("\n=== Leaderboard (top prompts) ===")
    for h in sorted(updated_history, key=lambda x: x["score"], reverse=True):
        print(f"{h['score']:.2f} :: {h['prompt']}")

    print("\n=== Best discovered instruction ===")
    print(f"Score: {summary['best_score']:.2f}")
    print(f"Prompt: {summary['best_prompt']}")

if __name__ == "__main__":
    main()

References:

  1. Yang, C., Wang, X., Lu, Y., Liu, H., Le, Q. V., Zhou, D., & Chen, X. (2024). Large language models as optimizers. In The Twelfth International Conference on Learning Representations (ICLR).
  2. Zhang, T., Yuan, J., & Avestimehr, S. (2024). Revisiting OPRO: The limitations of small-scale LLMs as optimizers. arXiv preprint arXiv:2405.10276.
  3. Guo, Q., Wang, R., Guo, J., Li, B., Song, K., Tan, X., ... & Yang, Y. (2023). Connecting large language models with evolutionary algorithms yields powerful prompt optimizers (EvoPrompt). arXiv preprint arXiv:2309.08532.
  4. Tao, Z., Lin, T.-E., Chen, X., Li, H., Wu, Y., Li, Y., Jin, Z., Huang, F., Tao, D., & Zhou, J. (2024). A survey on self-evolution of large language models. arXiv preprint arXiv:2404.14387.

Wednesday, August 27, 2025

This AI FIXES Its Own Mistakes?! Agentic LLMs & Self-Improving Prompts Explained

Introduction.

In this tutorial, we break down the future of AI assistants by exploring Agentic LLMs and Self-Improving Prompts—two techniques that transform chatbots from passive answer machines into reliable, evidence-backed problem solvers. You’ll learn how Agentic LLMs plan, call tools, and fetch real data, while Self-Improving Prompts add a reflection and repair loop that makes answers safer, more consistent, and audit-ready. From finance to healthcare, discover why these methods are the secret to building trustworthy AI systems for high-stakes, real-world use.
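As a rough mental model before the full fraud-triage example below, the pattern is a draft → critique → repair loop layered on top of tool calls. The sketch below uses placeholder functions (draft_with_tools, critique, apply_guardrails); the working code that follows implements each piece against the Groq API.

# Minimal sketch of the agentic "self-improving" loop (placeholder functions).
def resolve_case(alert_text, draft_with_tools, critique, apply_guardrails):
    # 1) The agent plans and calls tools (KYC, transactions, risk signals, ...) to build a draft answer
    draft = draft_with_tools(alert_text)
    # 2) A critic pass audits the draft against a rubric and proposes a corrected version
    review = critique(draft)
    improved = review.get("improved_draft") or draft
    # 3) Programmatic guardrails enforce hard rules the LLM might miss (e.g., approval gates)
    return apply_guardrails(improved)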

Video Tutorial.


Code.

import os, json, requests, time, math, datetime, textwrap
from typing import Any, Dict, List

# ======================
# Config
# ======================
ENDPOINT = "https://api.groq.com/openai/v1/chat/completions"
MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant") # known-good public model
# Default to an empty string so the missing-key check below actually triggers.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
TEMPERATURE = float(os.getenv("GROQ_TEMPERATURE", "0.2"))
TODAY = os.getenv("TODAY", "2025-08-23")

if not GROQ_API_KEY:
    raise SystemExit("Please set GROQ_API_KEY in this shell. Example: export GROQ_API_KEY='YOUR_REAL_KEY'")

def _mask(k: str) -> str:
    return "<EMPTY>" if not k else f"{k[:4]}…{k[-4:]} (len={len(k)})"

print("== GROQ CONFIG ==")
print("Endpoint:", ENDPOINT)
print("Model :", MODEL)
print("Key :", _mask(GROQ_API_KEY))

# ======================
# API Request part
# ======================
def _post(payload: dict) -> dict:
    headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # Send the JSON body explicitly so the payload is encoded exactly as built
    r = requests.post(ENDPOINT, headers=headers, data=json.dumps(payload), timeout=60)
    if not r.ok:
        print("\n--- LLM API ERROR ---")
        print("Status:", r.status_code)
        try:
            print("Body:", r.json())
        except Exception:
            print("Body:", r.text)
        r.raise_for_status()
    return r.json()

# ======================
# Smoke test
# ======================
def smoke_test():
    payload = {
        "model": MODEL,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Reply with the single word: pong"},
        ],
    }
    resp = _post(payload)
    msg = resp["choices"][0]["message"]["content"].strip()
    print("Smoke test:", msg)
    if "pong" not in msg.lower():
        print("Warning: unexpected smoke test response. Check model/endpoint if issues persist.")

# ======================
# --- Synthetic data
# ======================
CUSTOMERS = {
"CUST-1001": {
"customer_id": "CUST-1001",
"name": "Arjun Mehta",
"dob": "1990-02-14",
"risk_tier": "Low",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-111", "ACCT-112"]
},
"CUST-2002": {
"customer_id": "CUST-2002",
"name": "Priya Nair",
"dob": "1986-11-02",
"risk_tier": "Medium",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-221"]
},
"CUST-3003": {
"customer_id": "CUST-3003",
"name": "Mohammed Rahman",
"dob": "1978-07-29",
"risk_tier": "High",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-331"]
}
}

ACCOUNTS = {
"ACCT-111": {"account_id": "ACCT-111", "customer_id": "CUST-1001", "type": "debit_card"},
"ACCT-112": {"account_id": "ACCT-112", "customer_id": "CUST-1001", "type": "savings"},
"ACCT-221": {"account_id": "ACCT-221", "customer_id": "CUST-2002", "type": "checking"},
"ACCT-331": {"account_id": "ACCT-331", "customer_id": "CUST-3003", "type": "checking"},
}

TXNS = {
"ACCT-111": [
{"ts": "2025-08-22T18:15:00", "amount": 7999, "currency": "INR", "mcc": "5812", "merchant": "Cafe Brew", "lat": 19.119, "lon": 72.846, "country": "IN"},
{"ts": "2025-08-22T21:05:00", "amount": 108000, "currency": "INR", "mcc": "6011", "merchant": "ATM Withdrawal", "lat": 19.118, "lon": 72.847, "country": "IN"},
{"ts": "2025-08-23T01:20:00", "amount": 149999, "currency": "INR", "mcc": "4829", "merchant": "Money Transfer", "lat": 28.556, "lon": 77.100, "country": "IN"},
{"ts": "2025-08-23T01:55:00", "amount": 149900, "currency": "INR", "mcc": "4829", "merchant": "Money Transfer", "lat": 28.556, "lon": 77.100, "country": "IN"},
],
"ACCT-221": [
{"ts": "2025-08-22T10:05:00", "amount": 9500, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T15:25:00", "amount": 9700, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T19:05:00", "amount": 9800, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T22:35:00", "amount": 9900, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-23T00:05:00", "amount": 10000, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
],
"ACCT-331": [
{"ts": "2025-08-21T12:00:00", "amount": 400000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
{"ts": "2025-08-22T09:30:00", "amount": 385000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
{"ts": "2025-08-23T02:40:00", "amount": 410000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
],
}

SANCTIONS = {
"individuals": [
{"name": "Mohammed Rahman", "dob": "1978-07-29", "country": "PK"},
{"name": "Rahul Sharma", "dob": "1982-05-18", "country": "IN"}
],
"entities": []
}

HIGH_RISK_MCC = {"4829", "6011"}
CTR_REPORTING_THRESHOLD = 100000

# ======================
# Utilities
# ======================
def haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
    return 2 * R * math.asin(math.sqrt(a))

def parse_ts(ts: str) -> datetime.datetime:
    return datetime.datetime.fromisoformat(ts)

def hours_between(a: str, b: str) -> float:
    return abs((parse_ts(b) - parse_ts(a)).total_seconds()) / 3600.0

def last_geojump_km(txns: List[Dict[str, Any]]) -> float:
    if len(txns) < 2:
        return 0.0
    last2 = sorted(txns, key=lambda x: x["ts"])[-2:]
    (a, b) = last2
    return haversine_km(a["lat"], a["lon"], b["lat"], b["lon"])

def near_threshold_structuring(txns: List[Dict[str, Any]], threshold: int, window_hours: float = 24.0) -> Dict[str, Any]:
    txns_sorted = sorted(txns, key=lambda x: x["ts"])
    recent = [t for t in txns_sorted if hours_between(t["ts"], f"{TODAY}T00:00:00") <= window_hours]
    near = [t for t in recent if 0.85 * threshold <= t["amount"] <= threshold]
    return {"count": len(near), "sum": sum(t["amount"] for t in near), "examples": near[:3]}

def velocity_spend(txns: List[Dict[str, Any]], hours_window: float = 6.0) -> Dict[str, Any]:
    cutoff = parse_ts(f"{TODAY}T00:00:00") - datetime.timedelta(hours=hours_window)
    recent = [t for t in txns if parse_ts(t["ts"]) >= cutoff]
    return {"count": len(recent), "sum": sum(t["amount"] for t in recent)}

def mcc_risk(txns: List[Dict[str, Any]]) -> Dict[str, Any]:
    risky = [t for t in txns if t["mcc"] in HIGH_RISK_MCC]
    return {"risky_count": len(risky), "examples": risky[:3]}

def sanctions_name_match(name: str, dob: str) -> Dict[str, Any]:
    for p in SANCTIONS["individuals"]:
        if p["name"].lower() == name.lower() and p["dob"] == dob:
            return {"hit": True, "record": p}
    return {"hit": False}

# ======================
# Tools (simulated)
# ======================
def tool_get_customer(customer_id: str) -> Dict[str, Any]:
    time.sleep(0.02)
    c = CUSTOMERS.get(customer_id)
    return {"ok": bool(c), "data": c}

def tool_get_accounts(customer_id: str) -> Dict[str, Any]:
    time.sleep(0.02)
    c = CUSTOMERS.get(customer_id)
    if not c:
        return {"ok": False, "error": "customer not found"}
    return {"ok": True, "data": [ACCOUNTS[aid] for aid in c["account_ids"]]}

def tool_get_transactions(account_id: str, hours: int = 168) -> Dict[str, Any]:
    time.sleep(0.02)
    tx = TXNS.get(account_id, [])
    return {"ok": True, "data": tx}

def tool_compute_risk_signals(account_id: str) -> Dict[str, Any]:
    time.sleep(0.02)
    tx = TXNS.get(account_id, [])
    geo_jump = last_geojump_km(tx)
    vel = velocity_spend(tx, 6.0)
    mcc = mcc_risk(tx)
    struct = near_threshold_structuring(tx, CTR_REPORTING_THRESHOLD, 24.0)
    return {"ok": True, "data": {"geo_jump_km_last2": geo_jump, "velocity_6h": vel, "mcc_risk": mcc, "structuring_24h": struct}}

def tool_check_sanctions(name: str, dob: str) -> Dict[str, Any]:
    time.sleep(0.02)
    return {"ok": True, "data": sanctions_name_match(name, dob)}

# ======================
# LLM call
# ======================
def call_llm(messages: List[Dict[str, str]], tools=None, tool_choice="auto") -> Dict[str, Any]:
    payload = {"model": MODEL, "temperature": TEMPERATURE, "messages": messages}
    if tools is not None:
        payload["tools"] = tools
        # Only send tool_choice when tools are present; otherwise the API rejects the request
        if tool_choice is not None:
            payload["tool_choice"] = tool_choice
    return _post(payload)

# ======================
# Tool schemas (function-calling)
# ======================
TOOLS = [
{"type":"function","function":{"name":"tool_get_customer","description":"Fetch KYC summary by customer_id.","parameters":{"type":"object","properties":{"customer_id":{"type":"string"}},"required":["customer_id"]}}},
{"type":"function","function":{"name":"tool_get_accounts","description":"List accounts for a customer.","parameters":{"type":"object","properties":{"customer_id":{"type":"string"}},"required":["customer_id"]}}},
{"type":"function","function":{"name":"tool_get_transactions","description":"Fetch recent transactions for an account.","parameters":{"type":"object","properties":{"account_id":{"type":"string"},"hours":{"type":"integer","default":168}},"required":["account_id"]}}},
{"type":"function","function":{"name":"tool_compute_risk_signals","description":"Compute velocity, MCC, structuring, and geo-jump features.","parameters":{"type":"object","properties":{"account_id":{"type":"string"}},"required":["account_id"]}}},
{"type":"function","function":{"name":"tool_check_sanctions","description":"Check simple sanctions/PEP name+dob match.","parameters":{"type":"object","properties":{"name":{"type":"string"},"dob":{"type":"string"}},"required":["name","dob"]}}}
]

# ======================
# System prompts (Resolver & Critic)
# ======================
RESOLVER_SYSTEM = """\
You are FinCrimeResolver v1 — a precise Fraud/AML case triage agent.

OBJECTIVE
- Given an alert describing a suspicious pattern, call tools to fetch KYC, accounts, transactions, risk signals, and sanctions status.
- Produce a structured case disposition with evidence and safe actions.

STRICT OUTPUT SCHEMA (JSON ONLY):
{
  "alert_id": "<string>",
  "customer_id": "<string>",
  "primary_account": "<string>",
  "hypothesis": "<string>",
  "confidence": <float 0..1>,
  "evidence": ["<bullet points>"],
  "signals": {
    "geo_jump_km_last2": <float>,
    "velocity_6h": {"count": <int>, "sum": <float>},
    "mcc_risk": {"risky_count": <int>, "examples": [<tx>]},
    "structuring_24h": {"count": <int>, "sum": <float>, "examples": [<tx>]},
    "sanctions_hit": true/false
  },
  "actions": {
    "immediate": ["<nondestructive steps: contact, soft-block card, VI call, additional auth>"],
    "with_approval": ["<disruptive steps: hard block, law enforcement escalation, SAR filing draft>"]
  },
  "case_notes": "<short narrative for case system>",
  "needs_followup": ["<specific missing information to request>"]
}

MANDATORY BEHAVIOR
- ALWAYS call tools: tool_get_customer, tool_get_accounts, tool_get_transactions (for primary), tool_compute_risk_signals (for primary), and tool_check_sanctions.
- Prefer conservative, reversible actions if confidence < 0.8.
- No hallucinated data; use only tool outputs.
- You are not giving legal advice; decisions must be reviewed by a human analyst.
"""

CRITIC_SYSTEM = """\
You are FinCrimeCritic v1 — strict auditor for triage quality.

Validate the draft JSON using this rubric:
1) All required tools were effectively used (KYC/accounts/txns/signals/sanctions).
2) Evidence references concrete signals (velocity, MCC risk, geo jump, structuring).
3) Actions are SAFE given confidence (disruptive steps only under 'with_approval' if confidence < 0.8).
4) Case notes are clear and minimal.
5) Needs_followup is specific (e.g., confirm travel, verify device, merchant receipts).

OUTPUT (JSON ONLY):
{
  "ok": true/false,
  "findings": ["<specific gap>"],
  "improved_draft": { <corrected JSON per schema> }
}
If destructive actions were listed under 'immediate' with confidence < 0.8, move them to 'with_approval' and justify.
"""

# ======================
# LLM orchestration
# ======================
def tool_router(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    if name == "tool_get_customer": return tool_get_customer(**args)
    if name == "tool_get_accounts": return tool_get_accounts(**args)
    if name == "tool_get_transactions": return tool_get_transactions(**args)
    if name == "tool_compute_risk_signals": return tool_compute_risk_signals(**args)
    if name == "tool_check_sanctions": return tool_check_sanctions(**args)
    return {"ok": False, "error": f"Unknown tool {name}"}

def safe_json_loads(s: str) -> Any:
    try:
        return json.loads(s)
    except Exception:
        import re
        m = re.search(r"\{.*\}", s, flags=re.DOTALL)
        if m:
            try:
                return json.loads(m.group(0))
            except Exception:
                pass
        return {"_raw": s, "_error": "Could not parse JSON"}

def llm_draft(alert_text: str) -> Dict[str, Any]:
    messages = [
        {"role": "system", "content": RESOLVER_SYSTEM},
        {"role": "user", "content": f"TODAY: {TODAY}\nALERT:\n{alert_text}\nOutput strictly JSON per schema."}
    ]
    # Tool-use loop (OpenAI-compatible)
    while True:
        resp = call_llm(messages, tools=TOOLS, tool_choice="auto")
        msg = resp["choices"][0]["message"]
        tcs = msg.get("tool_calls")
        if tcs:
            # Keep the assistant message that requested the tools, then echo each result back
            messages.append(msg)
            for tc in tcs:
                fn = tc["function"]["name"]
                args = json.loads(tc["function"]["arguments"])
                result = tool_router(fn, args)
                # tool message echoes back to the model
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": json.dumps(result)
                })
            continue
        return safe_json_loads((msg.get("content") or "").strip())

def llm_critic(draft_json: Dict[str, Any]) -> Dict[str, Any]:
    messages = [
        {"role": "system", "content": CRITIC_SYSTEM},
        {"role": "user", "content": f"Evaluate and fix if needed:\n{json.dumps(draft_json, ensure_ascii=False, indent=2)}"}
    ]
    resp = call_llm(messages)
    return safe_json_loads(resp["choices"][0]["message"]["content"].strip())

# ======================
# Programmatic guardrails
# ======================
def guardrails(final_json: Dict[str, Any]) -> Dict[str, Any]:
    try:
        conf = float(final_json.get("confidence", 0))
        actions = final_json.get("actions", {"immediate": [], "with_approval": []})
        immediate = actions.get("immediate", [])
        with_approval = actions.get("with_approval", [])

        destructive_keywords = ["hard block", "close account", "freeze funds", "law enforcement", "report", "SAR", "FIR", "police"]
        if conf < 0.8:
            keep, move = [], []
            for step in immediate:
                # Case-insensitive match so acronyms like SAR/FIR are caught
                if any(k.lower() in step.lower() for k in destructive_keywords):
                    move.append(step)
                else:
                    keep.append(step)
            if move:
                final_json["actions"]["immediate"] = keep
                final_json["actions"]["with_approval"] = list(dict.fromkeys(with_approval + move))
                final_json.setdefault("evidence", []).append(
                    "Moved potentially disruptive steps to 'with_approval' because confidence < 0.8."
                )
    except Exception:
        pass
    return final_json

# ======================
# Demo Cases
# ======================
CASES = [
{
"title": "ALRT-901: Sudden geo jump + high-risk money transfers",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-901
CUSTOMER_ID: CUST-1001
PRIMARY_ACCOUNT: ACCT-111
CONTEXT: Card used in Mumbai yesterday evening; within ~4 hours, two large money transfers originated from Delhi location.
SYMPTOMS: High-risk MCC (4829), geo jump > 1000km in short interval; possible account takeover.
""").strip()
},
{
"title": "ALRT-902: Multiple deposits near CTR threshold (structuring)",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-902
CUSTOMER_ID: CUST-2002
PRIMARY_ACCOUNT: ACCT-221
CONTEXT: Repeated cash-like deposits under INR 100,000 clustered in < 24h.
SYMPTOMS: Pattern suggests potential structuring to avoid reporting thresholds.
""").strip()
},
{
"title": "ALRT-903: Sanctions name collision false positive?",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-903
CUSTOMER_ID: CUST-3003
PRIMARY_ACCOUNT: ACCT-331
CONTEXT: Customer name appears similar to a listed person. Cross-border wires to AE corridor observed.
SYMPTOMS: Possible PEP/sanctions match; need DOB verification and corridor risk assessment.
""").strip()
},
]

# ======================
# Orchestrator
# ======================
def run_case(alert_text: str) -> Dict[str, Any]:
    draft = llm_draft(alert_text)
    critic = llm_critic(draft)
    # Fall back to the draft if the critic returned no (or an empty) improved version
    improved = critic.get("improved_draft") or draft
    final = guardrails(improved)
    return {"draft": draft, "critic": critic, "final": final}

def main():
    smoke_test()
    for case in CASES:
        print("\n" + "=" * 120)
        print("CASE:", case["title"])
        out = run_case(case["alert"])
        print("\n--- DRAFT -------------------")
        print(json.dumps(out["draft"], ensure_ascii=False, indent=2))
        print("\n--- CRITIC ------------------")
        print(json.dumps(out["critic"], ensure_ascii=False, indent=2))
        print("\n--- FINAL -------------------")
        print(json.dumps(out["final"], ensure_ascii=False, indent=2))

if __name__ == "__main__":
    main()

Key References

1. Self-Reflection in LLM Agents
M. Renze and E. Guven, "Self‑Reflection in LLM Agents: Effects on Problem‑Solving Performance," arXiv, May 2024.

2. Self-Refine: Iterative Refinement with Self-Feedback
A. Madaan et al., "Self‑Refine: Iterative Refinement with Self‑Feedback," arXiv, Mar. 2023. 

3. Reflexion: Language Agents with Verbal Reinforcement Learning
N. Shinn et al., "Reflexion: Language Agents with Verbal Reinforcement Learning," arXiv, Oct. 2023.

4. Promptbreeder: Self-Referential Self-Improvement Via Prompt Evolution
C. Fernando et al., "Promptbreeder: Self‑Referential Self‑Improvement Via Prompt Evolution," arXiv, Sep. 2023.

5. Agentic Large Language Models: A Survey (Self-Reflection)
“Agentic Large Language Models, a survey,” Leiden University, Mar. 2025.