Wednesday, August 27, 2025

This AI FIXES Its Own Mistakes?! Agentic LLMs & Self-Improving Prompts Explained

Introduction.

In this tutorial, we explore Agentic LLMs and Self-Improving Prompts: two techniques that turn chatbots from passive answer machines into reliable, evidence-backed problem solvers. You'll learn how Agentic LLMs plan, call tools, and fetch real data, while Self-Improving Prompts add a reflection-and-repair loop that makes answers safer, more consistent, and audit-ready. From finance to healthcare, these methods are key to building trustworthy AI systems for high-stakes, real-world use.
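
At its core, the self-improving pattern built below is a simple loop: a resolver drafts an answer, a critic reviews it, and the revised draft is repaired before it is returned. Here is a minimal, illustrative sketch of that loop; `llm()` is a placeholder for any chat-completion call and is not a real library function:

# Minimal sketch of a draft -> critique -> repair loop (illustrative only;
# llm() stands in for any OpenAI-compatible chat-completion call).
def answer_with_reflection(question: str) -> str:
    draft = llm(f"Answer carefully:\n{question}")
    critique = llm(f"List concrete problems with this answer, or reply OK:\n{draft}")
    if critique.strip().upper() == "OK":
        return draft
    # Repair step: rewrite the draft using the critic's findings
    return llm(f"Rewrite the answer fixing these problems:\n{critique}\n\nOriginal:\n{draft}")

The full implementation below adds tool calling, a strict JSON schema, and programmatic guardrails on top of this loop.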

Video Tutorial.


Code.

import os, json, requests, time, math, datetime, textwrap
from typing import Any, Dict, List

# ======================
# Config
# ======================
ENDPOINT = "https://api.groq.com/openai/v1/chat/completions"
MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant") # known-good public model
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "USE YOUR OWN KEYS")
TEMPERATURE = float(os.getenv("GROQ_TEMPERATURE", "0.2"))
TODAY = os.getenv("TODAY", "2025-08-23")

if not GROQ_API_KEY or GROQ_API_KEY == "USE YOUR OWN KEYS":
    raise SystemExit("Please set GROQ_API_KEY in this shell. Example: export GROQ_API_KEY='YOUR_REAL_KEY'")

def _mask(k: str) -> str:
    return "<EMPTY>" if not k else f"{k[:4]}{k[-4:]} (len={len(k)})"

print("== GROQ CONFIG ==")
print("Endpoint:", ENDPOINT)
print("Model :", MODEL)
print("Key :", _mask(GROQ_API_KEY))

# ======================
# API Request part
# ======================
def _post(payload: dict) -> dict:
    headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # Send the body as an explicit JSON string
    r = requests.post(ENDPOINT, headers=headers, data=json.dumps(payload), timeout=60)
    if not r.ok:
        print("\n--- LLM API ERROR ---")
        print("Status:", r.status_code)
        try:
            print("Body:", r.json())
        except Exception:
            print("Body:", r.text)
        r.raise_for_status()
    return r.json()

# ======================
# Smoke test (minimal request to verify endpoint, key, and model)
# ======================
def smoke_test():
    payload = {
        "model": MODEL,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Reply with the single word: pong"},
        ],
    }
    resp = _post(payload)
    msg = resp["choices"][0]["message"]["content"].strip()
    print("Smoke test:", msg)
    if "pong" not in msg.lower():
        print("Warning: unexpected smoke test response. Check model/endpoint if issues persist.")

# ======================
# --- Synthetic data
# ======================
CUSTOMERS = {
"CUST-1001": {
"customer_id": "CUST-1001",
"name": "Arjun Mehta",
"dob": "1990-02-14",
"risk_tier": "Low",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-111", "ACCT-112"]
},
"CUST-2002": {
"customer_id": "CUST-2002",
"name": "Priya Nair",
"dob": "1986-11-02",
"risk_tier": "Medium",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-221"]
},
"CUST-3003": {
"customer_id": "CUST-3003",
"name": "Mohammed Rahman",
"dob": "1978-07-29",
"risk_tier": "High",
"country": "IN",
"kyc_status": "Verified",
"account_ids": ["ACCT-331"]
}
}

ACCOUNTS = {
"ACCT-111": {"account_id": "ACCT-111", "customer_id": "CUST-1001", "type": "debit_card"},
"ACCT-112": {"account_id": "ACCT-112", "customer_id": "CUST-1001", "type": "savings"},
"ACCT-221": {"account_id": "ACCT-221", "customer_id": "CUST-2002", "type": "checking"},
"ACCT-331": {"account_id": "ACCT-331", "customer_id": "CUST-3003", "type": "checking"},
}

TXNS = {
"ACCT-111": [
{"ts": "2025-08-22T18:15:00", "amount": 7999, "currency": "INR", "mcc": "5812", "merchant": "Cafe Brew", "lat": 19.119, "lon": 72.846, "country": "IN"},
{"ts": "2025-08-22T21:05:00", "amount": 108000, "currency": "INR", "mcc": "6011", "merchant": "ATM Withdrawal", "lat": 19.118, "lon": 72.847, "country": "IN"},
{"ts": "2025-08-23T01:20:00", "amount": 149999, "currency": "INR", "mcc": "4829", "merchant": "Money Transfer", "lat": 28.556, "lon": 77.100, "country": "IN"},
{"ts": "2025-08-23T01:55:00", "amount": 149900, "currency": "INR", "mcc": "4829", "merchant": "Money Transfer", "lat": 28.556, "lon": 77.100, "country": "IN"},
],
"ACCT-221": [
{"ts": "2025-08-22T10:05:00", "amount": 9500, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T15:25:00", "amount": 9700, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T19:05:00", "amount": 9800, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-22T22:35:00", "amount": 9900, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
{"ts": "2025-08-23T00:05:00", "amount": 10000, "currency": "INR", "mcc": "5411", "merchant": "Grocery World", "lat": 12.971, "lon": 77.594, "country": "IN"},
],
"ACCT-331": [
{"ts": "2025-08-21T12:00:00", "amount": 400000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
{"ts": "2025-08-22T09:30:00", "amount": 385000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
{"ts": "2025-08-23T02:40:00", "amount": 410000, "currency": "INR", "mcc": "4829", "merchant": "Wire Transfer", "lat": 25.204, "lon": 55.271, "country": "AE"},
],
}

SANCTIONS = {
"individuals": [
{"name": "Mohammed Rahman", "dob": "1978-07-29", "country": "PK"},
{"name": "Rahul Sharma", "dob": "1982-05-18", "country": "IN"}
],
"entities": []
}

HIGH_RISK_MCC = {"4829", "6011"}
CTR_REPORTING_THRESHOLD = 100000

# ======================
# Utilities
# ======================
def haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371  # Earth radius in km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1))*math.cos(math.radians(lat2))*math.sin(dlon/2)**2
    return 2*R*math.asin(math.sqrt(a))

def parse_ts(ts: str) -> datetime.datetime:
    return datetime.datetime.fromisoformat(ts)

def hours_between(a: str, b: str) -> float:
    return abs((parse_ts(b) - parse_ts(a)).total_seconds())/3600.0

def last_geojump_km(txns: List[Dict[str, Any]]) -> float:
    if len(txns) < 2:
        return 0.0
    last2 = sorted(txns, key=lambda x: x["ts"])[-2:]
    (a, b) = last2
    return haversine_km(a["lat"], a["lon"], b["lat"], b["lon"])

def near_threshold_structuring(txns: List[Dict[str, Any]], threshold: int, window_hours: float = 24.0) -> Dict[str, Any]:
    txns_sorted = sorted(txns, key=lambda x: x["ts"])
    recent = [t for t in txns_sorted if hours_between(t["ts"], f"{TODAY}T00:00:00") <= window_hours]
    near = [t for t in recent if 0.85*threshold <= t["amount"] <= threshold]
    return {"count": len(near), "sum": sum(t["amount"] for t in near), "examples": near[:3]}

def velocity_spend(txns: List[Dict[str, Any]], hours_window: float = 6.0) -> Dict[str, Any]:
    cutoff = parse_ts(f"{TODAY}T00:00:00") - datetime.timedelta(hours=hours_window)
    recent = [t for t in txns if parse_ts(t["ts"]) >= cutoff]
    return {"count": len(recent), "sum": sum(t["amount"] for t in recent)}

def mcc_risk(txns: List[Dict[str, Any]]) -> Dict[str, Any]:
    risky = [t for t in txns if t["mcc"] in HIGH_RISK_MCC]
    return {"risky_count": len(risky), "examples": risky[:3]}

def sanctions_name_match(name: str, dob: str) -> Dict[str, Any]:
    for p in SANCTIONS["individuals"]:
        if p["name"].lower() == name.lower() and p["dob"] == dob:
            return {"hit": True, "record": p}
    return {"hit": False}

# ======================
# Tools (simulated)
# ======================
def tool_get_customer(customer_id: str) -> Dict[str, Any]:
    time.sleep(0.02)  # simulate I/O latency
    c = CUSTOMERS.get(customer_id)
    return {"ok": bool(c), "data": c}

def tool_get_accounts(customer_id: str) -> Dict[str, Any]:
    time.sleep(0.02)
    c = CUSTOMERS.get(customer_id)
    if not c:
        return {"ok": False, "error": "customer not found"}
    return {"ok": True, "data": [ACCOUNTS[aid] for aid in c["account_ids"]]}

def tool_get_transactions(account_id: str, hours: int = 168) -> Dict[str, Any]:
    time.sleep(0.02)
    tx = TXNS.get(account_id, [])
    return {"ok": True, "data": tx}

def tool_compute_risk_signals(account_id: str) -> Dict[str, Any]:
    time.sleep(0.02)
    tx = TXNS.get(account_id, [])
    geo_jump = last_geojump_km(tx)
    vel = velocity_spend(tx, 6.0)
    mcc = mcc_risk(tx)
    struct = near_threshold_structuring(tx, CTR_REPORTING_THRESHOLD, 24.0)
    return {"ok": True, "data": {"geo_jump_km_last2": geo_jump, "velocity_6h": vel, "mcc_risk": mcc, "structuring_24h": struct}}

def tool_check_sanctions(name: str, dob: str) -> Dict[str, Any]:
    time.sleep(0.02)
    return {"ok": True, "data": sanctions_name_match(name, dob)}

# ======================
# LLM call
# ======================
def call_llm(messages: List[Dict[str, str]], tools=None, tool_choice="auto") -> Dict[str, Any]:
    payload = {"model": MODEL, "temperature": TEMPERATURE, "messages": messages}
    if tools is not None:
        payload["tools"] = tools
        # Only send tool_choice when tools are present; it is meaningless (and may be rejected) without them.
        if tool_choice is not None:
            payload["tool_choice"] = tool_choice
    return _post(payload)

# ======================
# Tool schemas (function-calling)
# ======================
TOOLS = [
{"type":"function","function":{"name":"tool_get_customer","description":"Fetch KYC summary by customer_id.","parameters":{"type":"object","properties":{"customer_id":{"type":"string"}},"required":["customer_id"]}}},
{"type":"function","function":{"name":"tool_get_accounts","description":"List accounts for a customer.","parameters":{"type":"object","properties":{"customer_id":{"type":"string"}},"required":["customer_id"]}}},
{"type":"function","function":{"name":"tool_get_transactions","description":"Fetch recent transactions for an account.","parameters":{"type":"object","properties":{"account_id":{"type":"string"},"hours":{"type":"integer","default":168}},"required":["account_id"]}}},
{"type":"function","function":{"name":"tool_compute_risk_signals","description":"Compute velocity, MCC, structuring, and geo-jump features.","parameters":{"type":"object","properties":{"account_id":{"type":"string"}},"required":["account_id"]}}},
{"type":"function","function":{"name":"tool_check_sanctions","description":"Check simple sanctions/PEP name+dob match.","parameters":{"type":"object","properties":{"name":{"type":"string"},"dob":{"type":"string"}},"required":["name","dob"]}}}
]

# ======================
# System prompts (Resolver & Critic)
# ======================
RESOLVER_SYSTEM = """\
You are FinCrimeResolver v1 — a precise Fraud/AML case triage agent.

OBJECTIVE
- Given an alert describing a suspicious pattern, call tools to fetch KYC, accounts, transactions, risk signals, and sanctions status.
- Produce a structured case disposition with evidence and safe actions.

STRICT OUTPUT SCHEMA (JSON ONLY):
{
"alert_id": "<string>",
"customer_id": "<string>",
"primary_account": "<string>",
"hypothesis": "<string>",
"confidence": <float 0..1>,
"evidence": ["<bullet points>"],
"signals": {
"geo_jump_km_last2": <float>,
"velocity_6h": {"count": <int>, "sum": <float>},
"mcc_risk": {"risky_count": <int>, "examples": [<tx>]},
"structuring_24h": {"count": <int>, "sum": <float>, "examples": [<tx>]},
"sanctions_hit": true/false
},
"actions": {
"immediate": ["<nondestructive steps: contact, soft-block card, VI call, additional auth>"],
"with_approval": ["<disruptive steps: hard block, law enforcement escalation, SAR filing draft>"]
},
"case_notes": "<short narrative for case system>",
"needs_followup": ["<specific missing information to request>"]
}

MANDATORY BEHAVIOR
- ALWAYS call tools: tool_get_customer, tool_get_accounts, tool_get_transactions (for primary), tool_compute_risk_signals (for primary), and tool_check_sanctions.
- Prefer conservative, reversible actions if confidence < 0.8.
- No hallucinated data; use only tool outputs.
- You are not giving legal advice; decisions must be reviewed by a human analyst.
"""

CRITIC_SYSTEM = """\
You are FinCrimeCritic v1 — strict auditor for triage quality.

Validate the draft JSON using this rubric:
1) All required tools were effectively used (KYC/accounts/txns/signals/sanctions).
2) Evidence references concrete signals (velocity, MCC risk, geo jump, structuring).
3) Actions are SAFE given confidence (disruptive steps only under 'with_approval' if confidence < 0.8).
4) Case notes are clear and minimal.
5) Needs_followup is specific (e.g., confirm travel, verify device, merchant receipts).

OUTPUT (JSON ONLY):
{
"ok": true/false,
"findings": ["<specific gap>"],
"improved_draft": { <corrected JSON per schema> }
}
If destructive actions were listed under 'immediate' with confidence < 0.8, move them to 'with_approval' and justify.
"""

# ======================
# LLM orchestration
# ======================
def tool_router(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    if name == "tool_get_customer": return tool_get_customer(**args)
    if name == "tool_get_accounts": return tool_get_accounts(**args)
    if name == "tool_get_transactions": return tool_get_transactions(**args)
    if name == "tool_compute_risk_signals": return tool_compute_risk_signals(**args)
    if name == "tool_check_sanctions": return tool_check_sanctions(**args)
    return {"ok": False, "error": f"Unknown tool {name}"}

def safe_json_loads(s: str) -> Any:
    try:
        return json.loads(s)
    except Exception:
        import re
        m = re.search(r"\{.*\}", s, flags=re.DOTALL)
        if m:
            try:
                return json.loads(m.group(0))
            except Exception:
                pass
        return {"_raw": s, "_error": "Could not parse JSON"}

def llm_draft(alert_text: str) -> Dict[str, Any]:
    messages = [
        {"role": "system", "content": RESOLVER_SYSTEM},
        {"role": "user", "content": f"TODAY: {TODAY}\nALERT:\n{alert_text}\nOutput strictly JSON per schema."}
    ]
    # Tool-use loop (OpenAI-compatible function calling)
    while True:
        resp = call_llm(messages, tools=TOOLS, tool_choice="auto")
        msg = resp["choices"][0]["message"]
        tcs = msg.get("tool_calls")
        if tcs:
            # Echo the assistant's tool-call message back before appending tool results
            messages.append(msg)
            for tc in tcs:
                fn = tc["function"]["name"]
                args = json.loads(tc["function"]["arguments"])
                result = tool_router(fn, args)
                # Each tool result is returned to the model as a 'tool' message
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": json.dumps(result)
                })
            continue
        return safe_json_loads(msg.get("content", "").strip())

def llm_critic(draft_json: Dict[str, Any]) -> Dict[str, Any]:
    messages = [
        {"role": "system", "content": CRITIC_SYSTEM},
        {"role": "user", "content": f"Evaluate and fix if needed:\n{json.dumps(draft_json, ensure_ascii=False, indent=2)}"}
    ]
    resp = call_llm(messages)
    return safe_json_loads(resp["choices"][0]["message"]["content"].strip())

# ======================
# Programmatic guardrails
# ======================
def guardrails(final_json: Dict[str, Any]) -> Dict[str, Any]:
    try:
        conf = float(final_json.get("confidence", 0))
        actions = final_json.get("actions", {"immediate": [], "with_approval": []})
        immediate = actions.get("immediate", [])
        with_approval = actions.get("with_approval", [])

        # Keywords kept lowercase so they match the lowercased steps below
        destructive_keywords = ["hard block", "close account", "freeze funds", "law enforcement", "report", "sar", "fir", "police"]
        if conf < 0.8:
            keep, move = [], []
            for step in immediate:
                if any(k in step.lower() for k in destructive_keywords):
                    move.append(step)
                else:
                    keep.append(step)
            if move:
                final_json["actions"]["immediate"] = keep
                final_json["actions"]["with_approval"] = list(dict.fromkeys(with_approval + move))
                final_json.setdefault("evidence", []).append(
                    "Moved potentially disruptive steps to 'with_approval' because confidence < 0.8."
                )
    except Exception:
        pass
    return final_json

# ======================
# Demo Cases
# ======================
CASES = [
{
"title": "ALRT-901: Sudden geo jump + high-risk money transfers",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-901
CUSTOMER_ID: CUST-1001
PRIMARY_ACCOUNT: ACCT-111
CONTEXT: Card used in Mumbai yesterday evening; within ~4 hours, two large money transfers originated from Delhi location.
SYMPTOMS: High-risk MCC (4829), geo jump > 1000km in short interval; possible account takeover.
""").strip()
},
{
"title": "ALRT-902: Multiple deposits near CTR threshold (structuring)",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-902
CUSTOMER_ID: CUST-2002
PRIMARY_ACCOUNT: ACCT-221
CONTEXT: Repeated cash-like deposits under INR 100,000 clustered in < 24h.
SYMPTOMS: Pattern suggests potential structuring to avoid reporting thresholds.
""").strip()
},
{
"title": "ALRT-903: Sanctions name collision false positive?",
"alert": textwrap.dedent("""
ALERT_ID: ALRT-903
CUSTOMER_ID: CUST-3003
PRIMARY_ACCOUNT: ACCT-331
CONTEXT: Customer name appears similar to a listed person. Cross-border wires to AE corridor observed.
SYMPTOMS: Possible PEP/sanctions match; need DOB verification and corridor risk assessment.
""").strip()
},
]

# ======================
# Orchestrator
# ======================
def run_case(alert_text: str) -> Dict[str, Any]:
    draft = llm_draft(alert_text)
    critic = llm_critic(draft)
    improved = critic.get("improved_draft", draft)
    final = guardrails(improved)
    return {"draft": draft, "critic": critic, "final": final}

def main():
    smoke_test()
    for case in CASES:
        print("\n" + "="*120)
        print("CASE:", case["title"])
        out = run_case(case["alert"])
        print("\n--- DRAFT -------------------")
        print(json.dumps(out["draft"], ensure_ascii=False, indent=2))
        print("\n--- CRITIC ------------------")
        print(json.dumps(out["critic"], ensure_ascii=False, indent=2))
        print("\n--- FINAL -------------------")
        print(json.dumps(out["final"], ensure_ascii=False, indent=2))

if __name__ == "__main__":
    main()
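
The guardrail layer is easy to sanity-check on its own: a low-confidence draft that lists a disruptive step under "immediate" is rewritten so that the step requires approval. A small, hypothetical usage sketch follows; it assumes the script above has been saved as fincrime_triage.py, a filename that is not part of the post:

# Guardrail behavior in isolation (no LLM calls needed).
import fincrime_triage as ft  # hypothetical module name for the script above

draft = {
    "confidence": 0.55,  # below the 0.8 threshold
    "actions": {
        "immediate": ["Hard block the card", "Call the customer to verify recent activity"],
        "with_approval": [],
    },
}
print(ft.guardrails(draft))
# "Hard block the card" moves to "with_approval"; the customer call stays under "immediate",
# and an explanatory note is appended to "evidence".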

Key References

1. Self-Reflection in LLM Agents
M. Renze and E. Guven, "Self-Reflection in LLM Agents: Effects on Problem-Solving Performance," arXiv, May 2024.

2. Self-Refine: Iterative Refinement with Self-Feedback
A. Madaan et al., "Self-Refine: Iterative Refinement with Self-Feedback," arXiv, Mar. 2023.

3. Reflexion: Language Agents with Verbal Reinforcement Learning
N. Shinn et al., "Reflexion: Language Agents with Verbal Reinforcement Learning," arXiv, Oct. 2023.

4. Promptbreeder: Self-Referential Self-Improvement Via Prompt Evolution
C. Fernando et al., "Promptbreeder: Self-Referential Self-Improvement Via Prompt Evolution," arXiv, Sep. 2023.

5. Agentic Large Language Models: A Survey (Self-Reflection)
"Agentic Large Language Models, a survey," Leiden University, Mar. 2025.

Monday, August 11, 2025

LLM as a Judge EXPLAINED! 🏆 Fair AI Rankings with BTL, Elo & Bias Busting Secrets!

 

Content Summary

🔥 Learn how to make Large Language Models (LLMs) your ultimate fair judges!
In this step-by-step tutorial, we’ll go from beginner-friendly basics to research-grade techniques for building an unbiased, mathematically grounded evaluation pipeline.

You’ll learn:

  • What is LLM-as-a-Judge and why it’s a game-changer for model evaluation.

  • Bradley–Terry–Luce (BTL) for global rankings from pairwise matches.

  • Elo Rating for live, online leaderboards.

  • Wilson Score Confidence Interval to measure ranking reliability (see the quick example after this list).

  • Bias detection & mitigation — position bias, verbosity bias, self-enhancement, and more.

  • Working Python Code using the Groq API with llama3-70b-8192.

  • How to combine BTL + Elo + Wilson CI in a complete evaluation pipeline.

  • Visual explanations, analogies, and a flowchart for your own projects.
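
Two of the formulas above deserve a quick numerical intuition check before the full pipeline. With only 10 judged trials in which B beats A 8 times, the observed win rate is 80%, yet the Wilson 95% interval is still wide (roughly 49% to 94%), which is why the report below prints the interval next to the win rate. A minimal sketch using the same formulas as the code (values are approximate; z = 1.96 is assumed):

import math

def wilson_ci(wins: int, n: int, z: float = 1.96):
    # Wilson 95% interval for a binomial proportion (same formula as the pipeline below).
    p = wins / n
    denom = 1 + z**2 / n
    center = p + z**2 / (2 * n)
    rad = z * math.sqrt((p * (1 - p) + z**2 / (4 * n)) / n)
    return ((center - rad) / denom, (center + rad) / denom)

def elo_expected(Ra: float, Rb: float) -> float:
    # Expected score of A against B under the Elo model.
    return 1.0 / (1.0 + 10 ** ((Rb - Ra) / 400.0))

print(wilson_ci(8, 10))          # roughly (0.49, 0.94): wide, despite an 80% win rate
print(elo_expected(1500, 1500))  # 0.5: equal ratings mean even odds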

Working Code:

import os
import json
import math
import time
import random
import statistics
import dataclasses
from typing import Dict, List, Tuple, Any, Optional
import requests

# -------- Groq API Config --------
USE_REAL_LLM = True # Set False for mock/test (no network)
ENDPOINT = "https://api.groq.com/openai/v1/chat/completions"
MODEL = "llama3-70b-8192" # Or "llama3-8b-8192"
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "Use your own key here")

# ---------- Utilities ----------

def seed_everything(seed: int = 123):
    random.seed(seed)

def wilson_ci(wins: int, n: int, z: float = 1.96) -> Tuple[float, float]:
    if n == 0:
        return (0.0, 1.0)
    p = wins / n
    denom = 1.0 + z**2 / n
    num = p + z**2 / (2 * n)
    rad = z * math.sqrt((p * (1 - p) + z**2 / (4 * n)) / n)
    lo = (num - rad) / denom
    hi = (num + rad) / denom
    return (max(0.0, lo), min(1.0, hi))

# ---------- Judge prompt ----------

JUDGE_SYSTEM_PROMPT = """You are a strict, fair evaluation judge.
Follow the rubric exactly. Do NOT reward verbosity.
If both candidates are poor or indistinguishable, output TIE.
Always produce VALID JSON ONLY (no extra text)."""

def build_pairwise_user_prompt(task: str, rubric: Dict[str, str], candA: str, candB: str) -> str:
    rubric_lines = "\n".join([f"- {k} (0-5): {v}" for k, v in rubric.items()])
    prompt = f"""TASK:
{task}

RUBRIC:
{rubric_lines}

CANDIDATE A:
{candA}

CANDIDATE B:
{candB}

INSTRUCTIONS:
1) Briefly justify per dimension (1–2 sentences each).
2) Output JSON ONLY with fields:
{{
"scores": {{
"A": {{"Correctness": x, "Faithfulness": x, "Completeness": x, "Clarity": x, "Safety": x}},
"B": {{"Correctness": x, "Faithfulness": x, "Completeness": x, "Clarity": x, "Safety": x}}
}},
"winner": "A" | "B" | "TIE",
"rationale": "<1-3 sentence summary>"
}}
Note: x must be numbers in [0,5]."""
    return prompt

def default_rubric() -> Dict[str, str]:
    return {
        "Correctness": "Factually correct and logically sound.",
        "Faithfulness": "Grounded in the given input/context; no hallucinations.",
        "Completeness": "Covers all requested aspects and edge cases.",
        "Clarity": "Clear, concise, well-structured writing.",
        "Safety": "No policy-violating or harmful content.",
    }

# ---------- Groq LLM call ----------

class LLMError(Exception):
    pass

def call_groq_chat(messages: List[Dict[str, str]], temperature: float = 0.0, max_retries: int = 3, timeout: int = 60) -> str:
    if not USE_REAL_LLM:
        return mock_llm(messages)

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {GROQ_API_KEY}",
    }
    payload = {
        "model": MODEL,
        "messages": messages,
        "temperature": temperature,
        "response_format": {"type": "text"},
    }

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.post(ENDPOINT, headers=headers, json=payload, timeout=timeout)
            if resp.status_code != 200:
                last_error = f"HTTP {resp.status_code}: {resp.text[:500]}"
                time.sleep(1.2 * attempt)  # simple linear backoff before retrying
                continue
            data = resp.json()
            content = data["choices"][0]["message"]["content"]
            return content
        except Exception as e:
            last_error = str(e)
            time.sleep(1.2 * attempt)

    raise LLMError(f"Groq API failed after {max_retries} attempts. Last error: {last_error}")

def mock_llm(messages: List[Dict[str, str]]) -> str:
    # Deterministic offline judge: favors the candidate that mentions causes ("because") more often.
    user_msg = [m for m in messages if m["role"] == "user"][-1]["content"]
    a_start = user_msg.find("CANDIDATE A:")
    b_start = user_msg.find("CANDIDATE B:")
    a_text = user_msg[a_start:b_start]
    b_text = user_msg[b_start:]
    a_because = a_text.lower().count("because")
    b_because = b_text.lower().count("because")

    if a_because == b_because:
        winner = "TIE"
    else:
        winner = "A" if a_because > b_because else "B"

    j = {
        "scores": {
            "A": {"Correctness": 4, "Faithfulness": 3, "Completeness": 3, "Clarity": 5, "Safety": 5},
            "B": {"Correctness": 5, "Faithfulness": 5, "Completeness": 4, "Clarity": 4, "Safety": 5},
        },
        "winner": winner,
        "rationale": "Choice based on cause-focused content; minor clarity tradeoffs."
    }
    return json.dumps(j)

# ---------- Parsing & validation ----------

def parse_judge_json(s: str) -> Dict[str, Any]:
    try:
        obj = json.loads(s)
    except json.JSONDecodeError as e:
        start = s.find("{")
        end = s.rfind("}")
        if start != -1 and end != -1 and end > start:
            obj = json.loads(s[start:end + 1])
        else:
            raise ValueError(f"Judge did not return valid JSON. Raw:\n{s[:500]}") from e

    if "scores" not in obj or "winner" not in obj:
        raise ValueError(f"Missing keys in judge JSON. Got keys: {list(obj.keys())}")

    if obj["winner"] not in ["A", "B", "TIE"]:
        raise ValueError(f"Invalid winner: {obj['winner']}")

    for side in ["A", "B"]:
        if side not in obj["scores"]:
            raise ValueError(f"Missing scores for {side}")
        for k in ["Correctness", "Faithfulness", "Completeness", "Clarity", "Safety"]:
            v = obj["scores"][side].get(k, None)
            if not isinstance(v, (int, float)):
                raise ValueError(f"Score for {side}.{k} must be a number, got {v}")
            if not (0 <= float(v) <= 5):
                raise ValueError(f"Score for {side}.{k} out of range [0,5]: {v}")

    return obj

# ---------- Judging core ----------

@dataclasses.dataclass
class JudgeResult:
    winner: str
    scores: Dict[str, Dict[str, float]]
    rationale: str
    order: Tuple[str, str]

def judge_pair(task: str, rubric: Dict[str, str], A: str, B: str,
               temperature: float = 0.0, swap: bool = False) -> JudgeResult:
    # Optionally swap the candidates to control for position bias
    candA, candB = (B, A) if swap else (A, B)
    order = ("B", "A") if swap else ("A", "B")

    messages = [
        {"role": "system", "content": JUDGE_SYSTEM_PROMPT},
        {"role": "user", "content": build_pairwise_user_prompt(task, rubric, candA, candB)},
    ]
    raw = call_groq_chat(messages, temperature=temperature)
    obj = parse_judge_json(raw)

    if swap:
        # Map the verdict back to the original (unswapped) labels
        winner_map = {"A": "B", "B": "A", "TIE": "TIE"}
        obj["winner"] = winner_map[obj["winner"]]
        obj["scores"] = {"A": obj["scores"]["B"], "B": obj["scores"]["A"]}

    rationale = obj.get("rationale", "")
    return JudgeResult(winner=obj["winner"], scores=obj["scores"], rationale=rationale, order=order)

def run_pairwise_trials(task: str, rubric: Dict[str, str], A: str, B: str,
                        trials: int = 10, seed: int = 123) -> List[JudgeResult]:
    seed_everything(seed)
    results: List[JudgeResult] = []
    for t in range(trials):
        swap = (random.random() < 0.5)  # randomize presentation order each trial
        res = judge_pair(task, rubric, A, B, temperature=0.0, swap=swap)
        results.append(res)
    return results

# ---------- Aggregation: Elo & Bradley–Terry ----------

def elo_update(Ra: float, Rb: float, Sa: float, K: float = 16.0) -> Tuple[float, float]:
    Ea = 1.0 / (1.0 + 10 ** ((Rb - Ra) / 400.0))  # expected score for A
    Ra_new = Ra + K * (Sa - Ea)
    Rb_new = Rb + K * ((1 - Sa) - (1 - Ea))
    return Ra_new, Rb_new

def aggregate_elo(results: List[JudgeResult], R_init: float = 1500.0, K: float = 16.0) -> Tuple[float, float]:
    Ra, Rb = R_init, R_init
    for r in results:
        if r.winner == "A":
            Sa = 1.0
        elif r.winner == "B":
            Sa = 0.0
        else:
            Sa = 0.5  # tie counts as half a win for each side
        Ra, Rb = elo_update(Ra, Rb, Sa, K=K)
    return Ra, Rb

def fit_btl_from_pairwise(results: List[JudgeResult]) -> Tuple[float, float]:
    # Two-item Bradley-Terry fit with +0.5 smoothing; ties are ignored
    w_AB = sum(1 for r in results if r.winner == "A")
    w_BA = sum(1 for r in results if r.winner == "B")
    n = w_AB + w_BA
    if n == 0:
        return 0.0, 0.0
    num = w_BA + 0.5
    den = w_AB + 0.5
    delta = math.log(num / den)
    beta_A = -0.5 * delta
    beta_B = +0.5 * delta
    return beta_A, beta_B

# ---------- Reporting ----------

@dataclasses.dataclass
class PairwiseReport:
    n: int
    wins_A: int
    wins_B: int
    ties: int
    winrate_B: float
    ci_B: Tuple[float, float]
    elo_A: float
    elo_B: float
    btl_A: float
    btl_B: float

def summarize_results(results: List[JudgeResult]) -> PairwiseReport:
    n = len(results)
    wins_A = sum(1 for r in results if r.winner == "A")
    wins_B = sum(1 for r in results if r.winner == "B")
    ties = n - wins_A - wins_B
    winrate_B = 0.0 if (wins_A + wins_B) == 0 else wins_B / (wins_A + wins_B)
    ci_B = wilson_ci(wins_B, wins_A + wins_B)
    elo_A, elo_B = aggregate_elo(results, R_init=1500.0, K=16.0)
    btl_A, btl_B = fit_btl_from_pairwise(results)
    return PairwiseReport(n, wins_A, wins_B, ties, winrate_B, ci_B, elo_A, elo_B, btl_A, btl_B)

def print_report(report: PairwiseReport, example_rationales: List[str]):
    print("\n=== Pairwise Judge Summary ===")
    print(f"Total trials: {report.n}")
    print(f"Wins A / Wins B / Ties: {report.wins_A} / {report.wins_B} / {report.ties}")
    wr = f"{100*report.winrate_B:.1f}%" if report.wins_A + report.wins_B > 0 else "NA"
    print(f"Win rate (B over A): {wr} (Wilson 95% CI: [{100*report.ci_B[0]:.1f}%, {100*report.ci_B[1]:.1f}%])")
    print(f"Elo ratings: A={report.elo_A:.1f}, B={report.elo_B:.1f}")
    print(f"BTL latent scores: A={report.btl_A:.3f}, B={report.btl_B:.3f}\n")
    if example_rationales:
        print("Example judge rationales:")
        for r in example_rationales[:3]:
            print(f"- {r}")

# ---------- Example tasks & runner ----------

EXAMPLE_TASK = (
"Summarize the paragraph in 1–2 sentences focusing on the *causes*."
)
EXAMPLE_PARAGRAPH = (
"Yesterday, heavy rain flooded several city streets. "
"Drainage systems were clogged due to poor maintenance. "
"As a result, traffic delays lasted for hours."
)
CANDIDATE_A = "Several city streets flooded yesterday, causing long traffic delays."
CANDIDATE_B = "Streets flooded yesterday because drainage systems were poorly maintained and clogged, causing long delays."

def evaluate_one_pair(task: str, context: Optional[str], A: str, B: str,
                      trials: int = 10, seed: int = 123) -> None:
    rubric = default_rubric()
    task_full = f"{task}\n\nCONTEXT:\n{context}" if context else task
    print("\n" + "="*80)
    print(f"Task:\n{task_full}\n")
    print("Candidate A:\n", A, "\n")
    print("Candidate B:\n", B, "\n")
    results = run_pairwise_trials(task_full, rubric, A, B, trials=trials, seed=seed)
    report = summarize_results(results)
    rationales = [r.rationale for r in results if r.rationale][:5]
    print_report(report, rationales)
    print("="*80 + "\n")

if __name__ == "__main__":
    USE_REAL_LLM = True
    evaluate_one_pair(EXAMPLE_TASK, EXAMPLE_PARAGRAPH, CANDIDATE_A, CANDIDATE_B, trials=6, seed=42)
    print("Done.")

References:

  1. Zheng, L. et al. (2023). "Judging LLM-as-a-Judge with MT-Bench and Chatbot Arena." arXiv. https://arxiv.org/abs/2306.05685
  2. Shi, L. et al. (2024). "Judging the Judges: A Systematic Study of Position Bias in LLM-as-a-Judge." arXiv. https://arxiv.org/abs/2406.07791
  3. Gu, J. et al. (2024). "A Survey on LLM-as-a-Judge." arXiv. https://arxiv.org/abs/2411.15594
  4. Bradley, R. A., & Terry, M. E. (1952). "Rank Analysis of Incomplete Block Designs: The Method of Paired Comparisons." Biometrika. https://academic.oup.com/biomet/article-abstract/39/3-4/324/326091
  5. Elo, A. E. (1978). The Rating of Chessplayers, Past and Present. https://archive.org/details/ratingofchesspla00unse
  6. Wilson, E. B. (1927). "Probable Inference, the Law of Succession, and Statistical Inference." JASA. https://www.med.mcgill.ca/epidemiology/Hanley/bios601/Proportion/wilson_jasa_1927.pdf
  7. Huang, T.-K., & Lin, C.-J. (2006). "Generalized Bradley-Terry Models and Multi-class Probability Estimates." https://www.csie.ntu.edu.tw/~cjlin/papers/generalBT.pdf
  8. Li, D. et al. (2024). "From Generation to Judgment: Opportunities and Challenges of LLM-as-a-Judge." (survey) arXiv. https://arxiv.org/abs/2411.16594