Sunday, November 30, 2025

Hypergraph Agentic AI Co-Pilot: Why Traditional Graphs Break in Real Incidents

Background.

“Most AI copilots today think in straight lines. Real business problems live in messy webs.”

When a big company faces a major incident or a risky change, it’s never just one thing.

  • It’s many services interacting,

  • under a special campaign (like Black Friday),

  • in a specific region (EU vs US),

  • with experiments running,

  • touching databases, teams, SLA metrics, and revenue.

Today’s “agentic AI” systems generally take one of two forms:

  1. A single LLM that calls tools in a straight flow (like a decision tree), or

  2. A graph of many agents passing messages node-to-node.

Both are powerful, but they mostly assume pairwise connections:
“service A → service B”, “team X owns service A”, “campaign Y affects region Z”.

The real world is different:

The problem itself is a group:
“Black Friday + EU + Checkout + Payments + DB Cluster A + New Pricing Experiment + Revenue Metric.”

And that “group” is one unit of meaning, not many separate edges.
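To put a number on this, here is a small sketch (an illustration only, separate from the demo code later in this post) that counts how many pairwise edges a normal graph needs to connect every member of the seven-entity group above. The entity labels are copied from the example; the variable names are invented for the illustration.

```python
from itertools import combinations

# The seven entities from the example scenario above.
scenario = [
    "Black Friday", "EU", "Checkout", "Payments",
    "DB Cluster A", "New Pricing Experiment", "Revenue Metric",
]

# A plain graph needs one edge per unordered pair of entities.
pairwise_edges = list(combinations(scenario, 2))

print(len(scenario))        # 7 entities, held together by 1 hyperedge
print(len(pairwise_edges))  # 21 pairwise edges in a plain graph
```

One hyperedge becomes 21 separate links, and none of those links says that the seven entities belonged to the same situation.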


Simple analogy: Graph vs Hypergraph

A simple analogy makes the difference concrete.

  • A graph is like your phone’s call log:

    • one person calls another person, one-to-one.

  • A hypergraph is like a WhatsApp group:

    • one group contains many people at once.

If you try to represent a WhatsApp group using only 1-to-1 calls, you have to explode it into:

  • A calls B,

  • A calls C,

  • B calls C,

  • C calls D, …

You lose the idea that

“All these people were together in one conversation, at the same time, talking about the same issue.”

That is exactly the problem with using normal graphs for complex incidents and changes.
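A minimal sketch of that loss, assuming we flatten every group into its pairs (often called a clique expansion). The function name `clique_expansion` and the people A, B, C are only for illustration. One group chat with three people and three separate one-to-one calls end up as exactly the same set of pairs, so the pairwise view cannot tell them apart.

```python
from itertools import combinations

def clique_expansion(groups):
    """Flatten each group into the set of unordered pairs it implies."""
    pairs = set()
    for group in groups:
        pairs.update(frozenset(p) for p in combinations(sorted(group), 2))
    return pairs

# Scenario 1: one group chat with three people.
one_group = [{"A", "B", "C"}]
# Scenario 2: three separate one-to-one calls.
three_calls = [{"A", "B"}, {"A", "C"}, {"B", "C"}]

same_pairs = clique_expansion(one_group) == clique_expansion(three_calls)
print(same_pairs)  # True: the pairwise view cannot distinguish the two
```

A hypergraph keeps `one_group` as a single edge, so the "everyone was in the same conversation" fact survives.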

  • Graph = many pairwise links.

  • Hypergraph = one edge that directly connects many things together.

In our world:

  • Nodes = services, metrics, experiments, regions, teams, infra.

  • Hyperedge = “one real incident or change scenario” connecting many nodes.
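As a rough sketch of this mapping, a hyperedge can be stored as a named set of node ids, and a question can be matched against it by counting shared nodes. The node and edge ids below are borrowed from the demo listing later in this post, but the member sets are trimmed down and `rank_hyperedges` is a hypothetical helper written only for this illustration.

```python
from typing import Dict, List, Set, Tuple

# Trimmed-down versions of two hyperedges (illustration only).
hyperedges: Dict[str, Set[str]] = {
    "INC_BF_EU_CHECKOUT": {
        "svc_checkout", "svc_payments", "region_eu",
        "feature_black_friday", "feature_new_pricing", "metric_revenue",
    },
    "INC_US_SEARCH_SLOW": {"svc_search", "region_us", "metric_search_latency"},
}

def rank_hyperedges(seeds: Set[str]) -> List[Tuple[str, int]]:
    """Rank hyperedges by how many seed nodes each one contains."""
    scores = {eid: len(members & seeds) for eid, members in hyperedges.items()}
    hits = [(eid, score) for eid, score in scores.items() if score > 0]
    return sorted(hits, key=lambda item: item[1], reverse=True)

ranked = rank_hyperedges({"svc_checkout", "region_eu", "feature_black_friday"})
print(ranked)  # [('INC_BF_EU_CHECKOUT', 3)]
```

The whole scenario comes back as one unit, together with every entity it involves, instead of a pile of unrelated edges.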

Part-1: Why Graphs Fail in Real Incidents: Building a Hypergraph Agentic AI Co-Pilot (with Live Code)


Part-2: Why Graphs Fail in Real Incidents: Building a Hypergraph Agentic AI Co-Pilot (with Live Code)



Source Code.

import os
import json
import time
import textwrap
from dataclasses import dataclass, field
from typing import Dict, List, Set, Any, Tuple

import requests # make sure requests is installed: pip install requests


# -------------------------------
# Groq API configuration (as given)
# -------------------------------
api_key: str = os.getenv(
    "GROQ_API_KEY",
    "PLEASE USE YOUR OWN API KEY",
)
endpoint: str = os.getenv(
    "GROQ_ENDPOINT",
    "https://api.groq.com/openai/v1/chat/completions",
)
model: str = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")
timeout: int = int(os.getenv("GROQ_TIMEOUT", "60"))
HEADERS = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
USE_LLM = str(os.getenv("USE_LLM", "1")).strip() in {"1", "true", "True", "YES", "yes"}
LLM_STRICT = str(os.getenv("LLM_STRICT", "0")).strip() in {"1", "true", "True", "YES", "yes"}
LLM_MAX_CALLS = int(os.getenv("LLM_MAX_CALLS", "12"))

# --- LLM cost knobs (we'll obey the limit; others are optional here) ---
SEED_LIMIT = int(os.getenv("SEED_LIMIT", "2"))
REFINE_ROOTS_LIMIT = int(os.getenv("REFINE_ROOTS_LIMIT", "1"))
REFINE_LIMIT = int(os.getenv("REFINE_LIMIT", "2"))
LLM_USE_PLAUS = int(os.getenv("LLM_USE_PLAUS", "0")) # not heavily used here, but wired in

LLM_CALLS = 0
LLM_LOG: List[Dict[str, Any]] = []


# ===============================
# 1. Hypergraph Data Structures
# ===============================

@dataclass
class Node:
    id: str
    type: str  # e.g., "service", "metric", "infra", "region", "feature", "team", "kpi"
    name: str
    attrs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Hyperedge:
    id: str
    type: str  # e.g., "incident", "change", "journey"
    node_ids: List[str]
    summary: str = ""
    attrs: Dict[str, Any] = field(default_factory=dict)


class Hypergraph:
    def __init__(self, nodes: Dict[str, Node], edges: Dict[str, Hyperedge]):
        self.nodes = nodes
        self.edges = edges
        # Inverted index: node id -> ids of the hyperedges that contain it.
        self.node_to_edges: Dict[str, Set[str]] = {}
        for eid, e in edges.items():
            for nid in e.node_ids:
                self.node_to_edges.setdefault(nid, set()).add(eid)

    def edge_scores_for_seed_nodes(self, seeds: Set[str]) -> Dict[str, int]:
        """
        Score each hyperedge by how many seed nodes it contains.
        Returns: dict[edge_id, score]
        """
        scores: Dict[str, int] = {}
        for nid in seeds:
            for eid in self.node_to_edges.get(nid, []):
                scores[eid] = scores.get(eid, 0) + 1
        return scores

    def describe_edge(self, edge_id: str) -> str:
        """
        Human-readable description of a hyperedge: summary + entity list.
        """
        e = self.edges[edge_id]
        parts = [f"Hyperedge {e.id} ({e.type}): {e.summary or '[no summary]'}", "Entities:"]
        for nid in e.node_ids:
            n = self.nodes.get(nid)
            if n:
                parts.append(f" - {n.type}: {n.id} ({n.name})")
            else:
                parts.append(f" - {nid} (missing node)")
        return "\n".join(parts)


# ===============================
# 2. LLM Clients: Groq + FakeLLM
# ===============================

class GroqLLM:
    """
    Wrapper around Groq chat completion API with call limiting.
    """

    def __init__(self, endpoint: str, model: str, headers: Dict[str, str], timeout: int):
        self.endpoint = endpoint
        self.model = model
        self.headers = headers
        self.timeout = timeout

    def chat(self, messages: List[Dict[str, str]], temperature: float = 0.2,
             max_tokens: int = 512) -> str:
        global LLM_CALLS, LLM_LOG

        if not USE_LLM:
            # If disabled, we never actually call the remote API.
            return FakeLLM().chat(messages)

        if LLM_CALLS >= LLM_MAX_CALLS:
            warn_msg = (
                f"[LLM] Call limit reached ({LLM_MAX_CALLS}). "
                "Returning a fallback message instead of calling the API."
            )
            print(warn_msg)
            if LLM_STRICT:
                raise RuntimeError(warn_msg)
            # Fallback: simple summary of the last user message.
            last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
            return "[LLM call limit reached]\nLast user message was:\n" + last_user[:300]

        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        # Count the attempt before sending, so failed calls also use up budget.
        LLM_CALLS += 1
        call_meta = {
            "timestamp": time.time(),
            "model": self.model,
            "num_messages": len(messages),
        }

        try:
            resp = requests.post(
                self.endpoint,
                headers=self.headers,
                json=payload,
                timeout=self.timeout,
            )
            resp.raise_for_status()
            data = resp.json()
            call_meta["status"] = "ok"
            LLM_LOG.append(call_meta)
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            call_meta["status"] = f"error: {e}"
            LLM_LOG.append(call_meta)
            print(f"[LLM ERROR] {e}")
            # Fallback to FakeLLM on error
            return FakeLLM().chat(messages)


class FakeLLM:
    """
    Simple fallback that fabricates sensible-looking text,
    useful when you run without a real LLM.
    """

    def chat(self, messages: List[Dict[str, str]], temperature: float = 0.2,
             max_tokens: int = 512) -> str:
        user_msg = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
        return (
            "[FakeLLM simulated answer]\n"
            "I read the following request:\n\n"
            + textwrap.shorten(user_msg, width=400, placeholder="...")
            + "\n\n(This is a stub. Plug in Groq to get real reasoning.)"
        )


# ===============================
# 3. Hypergraph Agentic System
# ===============================

class HypergraphAgenticSystem:
    def __init__(self, hypergraph: Hypergraph, llm_client):
        self.hg = hypergraph
        self.llm = llm_client
        # Keyword-to-node mapping for seed extraction (simple but effective)
        self.keyword_to_nodes: Dict[str, Set[str]] = self._build_keyword_index()

    # ---------- Keyword index ----------

    def _build_keyword_index(self) -> Dict[str, Set[str]]:
        """
        Build a simple keyword->node_ids mapping.
        In a real system, you'd use more robust entity linking.
        """
        mapping: Dict[str, Set[str]] = {}

        def add(kw: str, nid: str):
            mapping.setdefault(kw, set()).add(nid)

        for nid, node in self.hg.nodes.items():
            name_low = node.name.lower()
            # naive heuristic: map individual words in the name to this node
            for token in name_low.replace("/", " ").replace("-", " ").split():
                if len(token) >= 3:
                    add(token, nid)

        # Explicit synonyms
        synonyms = {
            "checkout": {"svc_checkout", "metric_checkout_latency"},
            "cart": {"svc_checkout"},
            "payment": {"svc_payments", "metric_payment_error"},
            "payments": {"svc_payments", "metric_payment_error"},
            "search": {"svc_search", "metric_search_latency"},
            "recommendation": {"svc_reco", "metric_reco_ctr"},
            "reco": {"svc_reco", "metric_reco_ctr"},
            "eu": {"region_eu"},
            "europe": {"region_eu"},
            "us": {"region_us"},
            "usa": {"region_us"},
            "apac": {"region_apac"},
            "asia": {"region_apac"},
            "black friday": {"feature_black_friday"},
            "sale": {"feature_black_friday"},
            "new pricing": {"feature_new_pricing"},
            "one-click": {"feature_one_click_checkout"},
            "one click": {"feature_one_click_checkout"},
            "free shipping": {"feature_free_shipping"},
            "revenue": {"metric_revenue"},
            "conversion": {"metric_conversion"},
        }
        for kw, nids in synonyms.items():
            for nid in nids:
                add(kw, nid)

        return mapping

    @staticmethod
    def _normalize(text: str) -> str:
        """
        Lowercase, turn punctuation into spaces, and pad with spaces so that
        keywords can be matched as whole words or phrases.
        """
        cleaned = "".join(ch if ch.isalnum() else " " for ch in text.lower())
        return " " + " ".join(cleaned.split()) + " "

    # ---------- Seed extraction ----------

    def extract_seed_nodes(self, query: str) -> Set[str]:
        """
        Map natural language query to a set of seed nodes in the hypergraph.
        Simple approach: keyword mapping + optional LLM expansion.
        """
        q = self._normalize(query)
        seeds: Set[str] = set()

        # 1) Keyword matching on whole words/phrases.
        #    Plain substring matching would let "us" match "status" or "because".
        for kw, nids in self.keyword_to_nodes.items():
            if self._normalize(kw) in q:
                seeds.update(nids)

        # 2) If no seeds, fall back to all services (so something happens)
        if not seeds:
            for nid, node in self.hg.nodes.items():
                if node.type == "service":
                    seeds.add(nid)

        # 3) Optionally: LLM-based hint expansion (if enabled)
        if USE_LLM and LLM_USE_PLAUS:
            # Ask LLM: "Which of these node types seem relevant?"
            # To keep it cheap, we only do a single call.
            node_summaries = [
                f"{nid}: {node.type} - {node.name}"
                for nid, node in self.hg.nodes.items()
            ]
            system_msg = (
                "You are a system that helps pick relevant nodes from a list "
                "for a given incident/change question. "
                "You MUST return a JSON list of node IDs only."
            )
            user_msg = (
                f"Question:\n{query}\n\n"
                "Here is the list of available nodes with their types:\n"
                + "\n".join(node_summaries)
                + "\n\nReturn a JSON array of up to 10 node IDs that look relevant."
            )
            try:
                resp = self.llm.chat(
                    [
                        {"role": "system", "content": system_msg},
                        {"role": "user", "content": user_msg},
                    ],
                    temperature=0.1,
                    max_tokens=256,
                )
                # try to parse JSON
                start = resp.find("[")
                end = resp.rfind("]")
                if start != -1 and end != -1 and end > start:
                    json_str = resp[start : end + 1]
                    suggested = json.loads(json_str)
                    if isinstance(suggested, list):
                        for nid in suggested:
                            if isinstance(nid, str) and nid in self.hg.nodes:
                                seeds.add(nid)
            except Exception as e:
                print(f"[Seed expansion LLM error] {e}")

        # Limit number of seeds if configured.
        # Sets are unordered, so sort first to make the cut deterministic.
        if len(seeds) > SEED_LIMIT > 0:
            seeds = set(sorted(seeds)[:SEED_LIMIT])

        return seeds

# ---------- Scenario specialist prompt builder ----------

    def _build_entity_list(self, edge: Hyperedge) -> str:
        """
        Build a readable entity list grouped by type.
        """
        groups: Dict[str, List[str]] = {}
        for nid in edge.node_ids:
            node = self.hg.nodes.get(nid)
            if not node:
                continue
            groups.setdefault(node.type, []).append(
                f"- {nid} ({node.name})"
            )

        order = ["service", "metric", "infra", "region", "feature", "team", "kpi"]
        lines: List[str] = []
        for t in order:
            if t in groups:
                label = t.capitalize() + "s" if t != "kpi" else "KPIs"
                lines.append(f"- {label}:")
                lines.extend(" " + line for line in groups[t])

        # include any other types
        for t, lst in groups.items():
            if t not in order:
                lines.append(f"- {t.capitalize()}s:")
                lines.extend(" " + line for line in lst)

        return "\n".join(lines)

    def build_scenario_specialist_prompts(
        self, edge: Hyperedge, user_query: str, recent_context: str = "(none)"
    ) -> Tuple[str, str]:
        """
        Return (system_prompt, user_prompt) for a scenario specialist agent.
        """
        system_prompt = (
            "You are a scenario specialist agent for one hyperedge in a hypergraph "
            "representing complex incidents and changes in a large system.\n\n"
            "Your job:\n"
            "- Look only at the scenario assigned to you (the hyperedge).\n"
            "- Combine its entities (services, metrics, infra, region, features, teams, KPIs)\n"
            " with the user’s question.\n"
            "- Propose a clear local explanation of what might be happening and\n"
            " local actions that help.\n\n"
            "Output structure:\n"
            "1. Local_hypothesis: (2–5 bullet points)\n"
            "2. Local_actions: (3–7 bullet points, very concrete)\n"
            "3. Confidence: (one of: low / medium / high)\n"
        )

        entity_list = self._build_entity_list(edge)

        # The prompt body is kept at column 0 so no indentation leaks into it.
        user_prompt = f"""
User question:
{user_query}

You are now responsible for this scenario (hyperedge):

Hyperedge ID: {edge.id}
Type: {edge.type}
Summary: {edge.summary or "[no summary]"}

Entities involved:
{entity_list}

(Optional) Recent context:
{recent_context}

Please respond following the required structure:

1. Local_hypothesis:
- ...
2. Local_actions:
- ...
3. Confidence:
- ...
""".strip()

        return system_prompt, user_prompt

# ---------- Main pipeline ----------

    def answer_query(self, query: str, top_k_edges: int = 3) -> str:
        """
        Full pipeline:
        - extract seeds
        - retrieve hyperedges
        - run scenario agents
        - global planner
        - critic
        - reporter
        """
        print("\n[Stage 1] Query understanding -> seed nodes")
        seeds = self.extract_seed_nodes(query)
        print(" Seed nodes:", seeds)

        print("\n[Stage 2] Hyperedge retrieval & ranking")
        scores = self.hg.edge_scores_for_seed_nodes(seeds)
        if not scores:
            return "No relevant scenarios (hyperedges) found for this query."

        sorted_edges = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
        selected_ids = [eid for eid, _ in sorted_edges[:top_k_edges]]
        print(" Selected hyperedges:", selected_ids)

        # Scenario agents
        print("\n[Stage 3] Scenario specialist agents")
        scenario_reports: Dict[str, str] = {}
        for eid in selected_ids:
            edge = self.hg.edges[eid]
            sys_p, usr_p = self.build_scenario_specialist_prompts(edge, query)
            messages = [
                {"role": "system", "content": sys_p},
                {"role": "user", "content": usr_p},
            ]
            print(f" Calling scenario agent for {eid}...")
            resp = self.llm.chat(messages, temperature=0.2, max_tokens=512)
            scenario_reports[eid] = resp

        # Global planner
        print("\n[Stage 4] Global planner agent")
        planner_system = (
            "You are a global planner agent.\n"
            "You receive several local analyses from scenario-specialist agents, "
            "each of which covers one hyperedge (incident/change/journey pattern).\n\n"
            "Your tasks:\n"
            "- Merge their local hypotheses into one global root-cause view.\n"
            "- Merge their suggestions into one coherent, ordered action plan.\n"
            "- Highlight business impact in simple language.\n\n"
            "Output structure:\n"
            "1. Global_root_cause: (3–7 bullet points)\n"
            "2. Unified_action_plan: (ordered list of steps, 5–10 items max)\n"
            "3. Business_impact: (2–5 bullet points)\n"
        )
        reports_text = []
        for eid, text in scenario_reports.items():
            hdr = f"[Scenario {eid}]\n"
            reports_text.append(hdr + text)
        planner_user = f"""
User question:
{query}

Local reports from scenario agents:
{chr(10).join(reports_text)}

Please respond using the requested structure.
""".strip()
        global_plan = self.llm.chat(
            [
                {"role": "system", "content": planner_system},
                {"role": "user", "content": planner_user},
            ],
            temperature=0.2,
            max_tokens=800,
        )

        # Critic
        print("\n[Stage 5] Safety / risk critic agent")
        critic_system = (
            "You are a careful risk and safety critic.\n"
            "Given a proposed global plan, you must:\n"
            "- Identify any risky, conflicting, or incomplete steps.\n"
            "- Suggest minimal corrections or additions.\n"
            "- Output a corrected plan if needed.\n\n"
            "Output structure:\n"
            "1. Critique: (2–5 bullet points)\n"
            "2. Corrected_plan: (if changes are needed; otherwise restate the plan briefly)\n"
        )
        critic_user = f"""
User question:
{query}

Proposed global plan:
{global_plan}

Please respond using the requested structure.
""".strip()
        critiqued_plan = self.llm.chat(
            [
                {"role": "system", "content": critic_system},
                {"role": "user", "content": critic_user},
            ],
            temperature=0.25,
            max_tokens=800,
        )

        # Reporter
        print("\n[Stage 6] Reporter agent (final answer)")
        reporter_system = (
            "You are a reporting agent.\n"
            "Given a technical plan and critique, produce two views:\n"
            "A. Executive summary for senior management (3–7 bullets, non-technical).\n"
            "B. Technical summary for engineers (clear, structured, refer to services/metrics/etc.).\n"
            "Be concise but specific.\n"
        )
        reporter_user = f"""
User question:
{query}

Plan + Critique:
{critiqued_plan}

Please produce:

A. Executive summary
B. Technical summary
""".strip()
        final_answer = self.llm.chat(
            [
                {"role": "system", "content": reporter_system},
                {"role": "user", "content": reporter_user},
            ],
            temperature=0.25,
            max_tokens=1000,
        )

        return final_answer


# ===============================
# 4. Build a Synthetic Hypergraph
# ===============================

def build_synthetic_hypergraph() -> Hypergraph:
    """
    Build a small synthetic hypergraph with:
    - 33 nodes (services, metrics, infra, regions, features, teams, KPIs)
    - 13 hyperedges (incidents, changes, journeys)
    tuned around checkout/payments/search/reco examples.
    """
    nodes: Dict[str, Node] = {}

    def add_node(nid: str, ntype: str, name: str, **attrs):
        nodes[nid] = Node(id=nid, type=ntype, name=name, attrs=attrs)

    # Services
    add_node("svc_checkout", "service", "Checkout Service")
    add_node("svc_payments", "service", "Payments Service")
    add_node("svc_search", "service", "Search Service")
    add_node("svc_reco", "service", "Recommendation Service")
    add_node("svc_user", "service", "User Profile Service")
    add_node("svc_orders", "service", "Orders Service")

    # Metrics
    add_node("metric_checkout_latency", "metric", "Checkout Latency (p95)")
    add_node("metric_payment_error", "metric", "Payment Error Rate")
    add_node("metric_search_latency", "metric", "Search Latency (p95)")
    add_node("metric_reco_ctr", "metric", "Recommendation CTR")
    add_node("metric_revenue", "metric", "Hourly Revenue")
    add_node("metric_conversion", "metric", "Checkout Conversion Rate")

    # Infra
    add_node("infra_db_checkout", "infra", "Checkout DB Cluster")
    add_node("infra_db_payments", "infra", "Payments DB Cluster")
    add_node("infra_cache_main", "infra", "Main Cache Cluster")
    add_node("infra_kafka_orders", "infra", "Orders Kafka")

    # Regions
    add_node("region_eu", "region", "EU Region")
    add_node("region_us", "region", "US Region")
    add_node("region_apac", "region", "APAC Region")

    # Features / Experiments
    add_node("feature_black_friday", "feature", "Black Friday Mega Sale")
    add_node("feature_new_pricing", "feature", "New Pricing Experiment")
    add_node("feature_one_click_checkout", "feature", "One-Click Checkout")
    add_node("feature_reco_experiment", "feature", "Personalized Reco Experiment")
    add_node("feature_free_shipping", "feature", "Free Shipping Campaign")

    # Teams
    add_node("team_checkout", "team", "Checkout Team")
    add_node("team_payments", "team", "Payments Team")
    add_node("team_search", "team", "Search Team")
    add_node("team_reco", "team", "Recommendation Team")
    add_node("team_infra", "team", "Infrastructure Team")
    add_node("team_growth", "team", "Growth / Experimentation Team")
    add_node("team_sre", "team", "SRE Team")

    # KPIs
    add_node("kpi_revenue", "kpi", "Daily Revenue")
    add_node("kpi_conversion", "kpi", "Site-wide Conversion Rate")

    edges: Dict[str, Hyperedge] = {}

    def add_edge(eid: str, etype: str, node_ids: List[str], summary: str):
        edges[eid] = Hyperedge(id=eid, type=etype, node_ids=node_ids, summary=summary)

    # --- Incidents ---

    add_edge(
        "INC_BF_EU_CHECKOUT",
        "incident",
        [
            "svc_checkout", "svc_payments",
            "infra_db_checkout", "infra_db_payments", "infra_cache_main",
            "metric_checkout_latency", "metric_payment_error",
            "metric_revenue", "metric_conversion",
            "region_eu",
            "feature_black_friday", "feature_new_pricing",
            "team_checkout", "team_payments", "team_infra", "team_sre",
            "kpi_revenue", "kpi_conversion",
        ],
        "Black Friday EU checkout slowdown and payment failures after enabling New Pricing.",
    )

    add_edge(
        "INC_US_SEARCH_SLOW",
        "incident",
        [
            "svc_search", "infra_cache_main", "metric_search_latency",
            "region_us",
            "team_search", "team_infra", "team_sre",
        ],
        "US search latency incident due to cache pressure.",
    )

    add_edge(
        "INC_APAC_PAYMENTS_TIMEOUT",
        "incident",
        [
            "svc_payments", "infra_db_payments",
            "metric_payment_error", "metric_revenue",
            "region_apac",
            "team_payments", "team_infra", "team_sre",
            "kpi_revenue",
        ],
        "APAC payments timeouts during flash sale.",
    )

    add_edge(
        "INC_EU_RECO_DROP",
        "incident",
        [
            "svc_reco", "svc_search",
            "metric_reco_ctr", "metric_conversion",
            "region_eu",
            "feature_reco_experiment",
            "team_reco", "team_search", "team_growth",
            "kpi_conversion",
        ],
        "EU recommendation CTR drop linked with new recommendation experiment.",
    )

    add_edge(
        "INC_US_CHECKOUT_DB_LOCK",
        "incident",
        [
            "svc_checkout", "infra_db_checkout", "metric_checkout_latency",
            "region_us",
            "team_checkout", "team_infra", "team_sre",
        ],
        "US checkout latency spike due to DB lock contention.",
    )

    # --- Changes (deployments / experiments) ---

    add_edge(
        "CHG_NEW_PRICING_EU",
        "change",
        [
            "svc_checkout", "svc_payments",
            "feature_new_pricing",
            "region_eu",
            "team_checkout", "team_payments", "team_growth",
            "metric_revenue", "metric_conversion",
            "kpi_revenue", "kpi_conversion",
        ],
        "Rollout of New Pricing experiment in EU affecting checkout and payments.",
    )

    add_edge(
        "CHG_FREE_SHIPPING_US",
        "change",
        [
            "svc_checkout",
            "feature_free_shipping",
            "region_us",
            "team_checkout", "team_growth",
            "metric_revenue", "metric_conversion",
            "kpi_revenue", "kpi_conversion",
        ],
        "Free shipping campaign rollout in US.",
    )

    add_edge(
        "CHG_ONECLICK_EU",
        "change",
        [
            "svc_checkout", "svc_user",
            "feature_one_click_checkout",
            "region_eu",
            "team_checkout", "team_growth",
            "metric_checkout_latency", "metric_conversion",
            "kpi_conversion",
        ],
        "One-click checkout rollout for EU users.",
    )

    add_edge(
        "CHG_RECO_EXP_GLOBAL",
        "change",
        [
            "svc_reco",
            "feature_reco_experiment",
            "region_eu", "region_us", "region_apac",
            "team_reco", "team_growth",
            "metric_reco_ctr", "metric_conversion",
            "kpi_conversion",
        ],
        "Global rollout of personalized recommendation experiment.",
    )

    # --- Journeys (business journeys) ---

    add_edge(
        "JRN_EU_HIGH_VALUE_CHECKOUT",
        "journey",
        [
            "svc_search", "svc_reco", "svc_checkout", "svc_payments", "svc_orders",
            "metric_checkout_latency", "metric_payment_error",
            "metric_revenue", "metric_conversion",
            "region_eu",
            "feature_black_friday", "feature_one_click_checkout",
            "team_checkout", "team_payments", "team_reco", "team_search",
            "kpi_revenue", "kpi_conversion",
        ],
        "High-value EU checkout journey during sales events.",
    )

    add_edge(
        "JRN_US_ONBOARDING_CHECKOUT",
        "journey",
        [
            "svc_user", "svc_checkout", "svc_payments",
            "metric_checkout_latency", "metric_payment_error", "metric_conversion",
            "region_us",
            "team_checkout", "team_payments", "team_growth",
            "kpi_conversion",
        ],
        "US new-user onboarding and first checkout journey.",
    )

    # A couple more incidents for richness

    add_edge(
        "INC_EU_CACHE_PRESSURE",
        "incident",
        [
            "svc_search", "svc_reco",
            "infra_cache_main",
            "metric_search_latency", "metric_reco_ctr",
            "region_eu",
            "team_search", "team_reco", "team_infra", "team_sre",
        ],
        "EU-wide cache pressure impacting search and recommendations.",
    )

    add_edge(
        "INC_GLOBAL_KAFKA_LAG",
        "incident",
        [
            "svc_orders", "infra_kafka_orders",
            "metric_revenue",
            "region_eu", "region_us", "region_apac",
            "team_infra", "team_sre",
            "kpi_revenue",
        ],
        "Global Kafka lag causing delayed order processing.",
    )

    return Hypergraph(nodes=nodes, edges=edges)


# ===============================
# 5. Main / CLI
# ===============================

def main():
    print("=== Hypergraph-based Agentic AI Demo ===")
    print("Building synthetic hypergraph...")
    hg = build_synthetic_hypergraph()
    print(f" Nodes: {len(hg.nodes)}")
    print(f" Hyperedges: {len(hg.edges)}")

    # Choose LLM backend
    if USE_LLM:
        print("\n[Config] Using Groq LLM backend")
        llm_client = GroqLLM(endpoint=endpoint, model=model, headers=HEADERS, timeout=timeout)
    else:
        print("\n[Config] USE_LLM=0 -> Using FakeLLM (no external calls)")
        llm_client = FakeLLM()

    system = HypergraphAgenticSystem(hg, llm_client)

    # Interactive loop
    print("\nEnter a natural language query about incidents/changes.")
    print("Press Enter with empty line to use a sample query.")
    print("Type 'exit' to quit.\n")

    while True:
        user_query = input("Your query> ").strip()
        if not user_query:
            user_query = (
                "Since we launched the new pricing experiment during the Black Friday sale in EU, "
                "checkout has become slow and payment failures increased. "
                "What is happening, and what should we do in the next 60 minutes?"
            )
            print(f"Using sample query:\n {user_query}\n")

        if user_query.lower() in {"exit", "quit"}:
            break

        answer = system.answer_query(user_query, top_k_edges=3)

        print("\n=== FINAL ANSWER ===\n")
        print(answer)
        print("\n====================\n")

        # Show LLM call stats
        print(f"[LLM] Total calls so far: {LLM_CALLS} (limit: {LLM_MAX_CALLS})")
        print("You can ask another question, or type 'exit' to quit.\n")


if __name__ == "__main__":
    main()
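
A quick note on cost. Reading the pipeline above, one run of answer_query makes at most one chat call per selected hyperedge, plus one call each for the planner, the critic, and the reporter, plus one more if the optional seed expansion is switched on. The sketch below only does that arithmetic; `max_calls_per_query` is a hypothetical helper for illustration and is not part of the listing.

```python
def max_calls_per_query(top_k_edges: int, seed_expansion: bool = False) -> int:
    """Upper bound on chat calls made by one answer_query run."""
    fixed_stages = 3  # planner, critic, reporter
    return top_k_edges + fixed_stages + (1 if seed_expansion else 0)

per_query = max_calls_per_query(top_k_edges=3)
full_queries = 12 // per_query  # 12 is the default LLM_MAX_CALLS

print(per_query)     # 6
print(full_queries)  # 2
```

So with the default settings, the call limit is reached after roughly two full queries in one session. After that, GroqLLM returns its fallback message (or raises, if LLM_STRICT is set). Raise LLM_MAX_CALLS if you want a longer session.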



