Debugging and Observability in Autonomous Agent Systems
An autonomous agent that fails silently is worse than no agent at all. When a traditional function throws an exception, you get a stack trace. When an agent takes a wrong turn across twenty tool calls and three model invocations, you get a wrong answer — and no obvious explanation.
Debugging agents requires a different mental model. The system isn’t executing a deterministic path; it’s making a series of decisions. Observability means capturing those decisions — not just inputs and outputs, but the reasoning that connects them.
Why Traditional Debugging Fails for Agents
Standard logging captures what happened. Agent observability requires capturing why — what the model concluded, which tool it chose and why, and what intermediate state it was working from.
The failure modes are different too:
- Silent hallucination: The agent confidently produces a wrong answer without signaling uncertainty.
- Decision drift: Each step looks reasonable locally, but the sequence drifts away from the goal.
- Tool misuse: The agent calls the right tool with subtly wrong parameters.
- Infinite loops: The agent gets stuck retrying a failing approach.
- Context poisoning: Bad output from an early step corrupts all subsequent reasoning.
None of these produce an exception. They produce wrong behavior that’s only visible when you reconstruct the full execution trace.
Structured Logging for Agent Decisions
The first step is wrapping every agent interaction in structured logs. Don’t log raw API responses — log semantic events.
```python
import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Any

import anthropic

client = anthropic.Anthropic()

@dataclass
class AgentEvent:
    trace_id: str
    step: int
    event_type: str  # "llm_call", "tool_call", "tool_result", "decision", "error"
    model: str | None
    input_tokens: int | None
    output_tokens: int | None
    latency_ms: float | None
    content: dict[str, Any]
    timestamp: float

def log_event(event: AgentEvent):
    print(json.dumps(asdict(event)))  # Replace with your log sink

class TracedAgent:
    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.step = 0
        self.tools = []

    def add_tool(self, name: str, description: str, input_schema: dict):
        self.tools.append({
            "name": name,
            "description": description,
            "input_schema": input_schema
        })

    def call(self, messages: list[dict], system: str = "") -> anthropic.types.Message:
        self.step += 1
        start = time.monotonic()

        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=4096,
            system=system,
            tools=self.tools,
            messages=messages
        )

        latency_ms = (time.monotonic() - start) * 1000

        log_event(AgentEvent(
            trace_id=self.trace_id,
            step=self.step,
            event_type="llm_call",
            model="claude-opus-4-6",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency_ms,
            content={
                "stop_reason": response.stop_reason,
                "text_blocks": [b.text for b in response.content if b.type == "text"],
                "tool_calls": [
                    {"name": b.name, "input": b.input}
                    for b in response.content if b.type == "tool_use"
                ]
            },
            timestamp=time.time()
        ))

        return response
```

Every LLM call now emits a structured event with the trace ID, step number, token counts, latency, and the model's decision (text output or tool calls). These events are the raw material for everything else.
Building a Complete Trace
A single log line isn’t enough — you need the full execution trace that connects each decision to its outcome. Build a trace accumulator that records the entire agent loop:
```python
from typing import Callable

def run_traced_agent(
    task: str,
    tools: dict[str, Callable],
    tool_schemas: list[dict],
    system: str,
    max_steps: int = 20,
) -> dict:
    """
    Runs a full agent loop with complete tracing.
    Returns the final answer and the execution trace.
    """
    agent = TracedAgent()
    for schema in tool_schemas:
        agent.add_tool(**schema)

    messages = [{"role": "user", "content": task}]
    trace = {"trace_id": agent.trace_id, "task": task, "steps": []}
    step_count = 0

    while step_count < max_steps:
        step_count += 1
        response = agent.call(messages, system=system)

        step_record = {
            "step": step_count,
            "stop_reason": response.stop_reason,
            "model_output": [],
            "tool_results": []
        }

        if response.stop_reason == "end_turn":
            for block in response.content:
                if block.type == "text":
                    step_record["model_output"].append(block.text)
            trace["steps"].append(step_record)
            trace["final_answer"] = step_record["model_output"][-1] if step_record["model_output"] else ""
            break

        # Handle tool calls
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                step_record["model_output"].append({
                    "tool": block.name,
                    "input": block.input
                })

                tool_fn = tools.get(block.name)
                if not tool_fn:
                    result = f"Error: unknown tool '{block.name}'"
                    step_record["error"] = result  # surface the failure in the trace
                    log_event(AgentEvent(
                        trace_id=agent.trace_id, step=step_count, event_type="error",
                        model=None, input_tokens=None, output_tokens=None, latency_ms=None,
                        content={"error": result, "tool": block.name},
                        timestamp=time.time()
                    ))
                else:
                    t_start = time.monotonic()
                    try:
                        result = tool_fn(**block.input)
                        t_ms = (time.monotonic() - t_start) * 1000
                        log_event(AgentEvent(
                            trace_id=agent.trace_id, step=step_count, event_type="tool_result",
                            model=None, input_tokens=None, output_tokens=None, latency_ms=t_ms,
                            content={"tool": block.name, "input": block.input,
                                     "result": str(result)[:500]},
                            timestamp=time.time()
                        ))
                    except Exception as e:
                        result = f"Tool error: {e}"
                        step_record["error"] = result  # surface the failure in the trace
                        log_event(AgentEvent(
                            trace_id=agent.trace_id, step=step_count, event_type="error",
                            model=None, input_tokens=None, output_tokens=None, latency_ms=None,
                            content={"tool": block.name, "input": block.input, "error": str(e)},
                            timestamp=time.time()
                        ))

                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(result)
                })
                step_record["tool_results"].append({
                    "tool": block.name,
                    "result_preview": str(result)[:200]
                })

        trace["steps"].append(step_record)
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
    else:
        trace["error"] = f"exceeded max_steps ({max_steps})"

    return trace
```

The returned trace is the complete execution record. Store it alongside the final answer so you can replay or inspect any run.
Loop Detection
Infinite loops are a common failure mode. Detect them by fingerprinting each LLM call’s tool invocation pattern:
```python
def detect_loop(trace: dict, window: int = 4) -> bool:
    """
    Returns True if the last `window` steps show the same tool call
    pattern — a strong signal for an infinite loop.
    """
    steps = trace["steps"]
    if len(steps) < window:
        return False

    def step_signature(step: dict) -> str:
        tools_called = sorted(
            t["tool"]
            for t in step.get("model_output", [])
            if isinstance(t, dict) and "tool" in t
        )
        return "|".join(tools_called)

    recent = [step_signature(s) for s in steps[-window:]]
    # If all recent steps have the same non-empty tool signature, it's a loop
    if len(set(recent)) == 1 and recent[0]:
        return True

    # Also check for alternating two-step loops (A-B-A-B)
    if len(steps) >= 4:
        pattern = [step_signature(s) for s in steps[-4:]]
        if pattern[0] and pattern[1] and pattern[0] == pattern[2] and pattern[1] == pattern[3]:
            return True

    return False
```

Call this inside your agent loop and break early if a loop is detected, then log it as a structured error event.
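Wired into the loop, the check can short-circuit a stuck run before it burns through `max_steps`. A self-contained sketch with a simplified single-pattern detector and a synthetic trace that repeats the same tool call (the `search` tool and the trace shape are illustrative, not from a real run):

```python
def same_signature_loop(trace: dict, window: int = 4) -> bool:
    # Simplified stand-in for detect_loop: flag when the last `window`
    # steps all share the same non-empty tool signature.
    steps = trace["steps"]
    if len(steps) < window:
        return False
    sigs = [
        "|".join(sorted(t["tool"] for t in s["model_output"]
                        if isinstance(t, dict) and "tool" in t))
        for s in steps[-window:]
    ]
    return len(set(sigs)) == 1 and bool(sigs[0])

trace = {"trace_id": "demo", "steps": []}
for step in range(1, 9):
    # ... agent.call() and tool execution would happen here ...
    trace["steps"].append({"model_output": [{"tool": "search", "input": {}}]})
    if same_signature_loop(trace):
        trace["error"] = f"loop detected at step {step}"
        break

print(trace["error"])  # loop detected at step 4
```

Breaking early like this turns a twenty-step timeout into a four-step failure with an explicit error in the trace.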
Metrics to Track in Production
Once you have structured logs, derive aggregate metrics per-agent and per-task-type:
```python
def compute_trace_metrics(trace: dict) -> dict:
    steps = trace["steps"]
    errors = [s for s in steps if "error" in s]

    tool_calls_by_name: Counter = Counter()
    for step in steps:
        for output in step.get("model_output", []):
            if isinstance(output, dict) and "tool" in output:
                tool_calls_by_name[output["tool"]] += 1

    # Latency lives in the raw log events — pull it from there (or pass it in)
    return {
        "trace_id": trace["trace_id"],
        "total_steps": len(steps),
        "error_steps": len(errors),
        "tool_call_distribution": dict(tool_calls_by_name),
        "completed": "final_answer" in trace,
        "loop_detected": detect_loop(trace),
        "error_messages": [s.get("error") for s in errors],
    }
```

At scale, push these metrics to your time-series database (Prometheus, Datadog, CloudWatch). Key signals to alert on:
- Loop rate > 5% of runs — the agent is getting stuck
- Error rate per tool > threshold — a tool is broken
- Average step count trending up — tasks are getting harder or prompts are degrading
- p99 latency spike — a model endpoint is slow
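The first three signals can be evaluated in a scheduled job before pushing to your metrics backend. A stdlib-only sketch over a batch of compute_trace_metrics outputs — the field names follow the metrics dict above, but `alert_signals`, the 5% threshold, and the batch itself are illustrative:

```python
def alert_signals(metrics_batch: list[dict], loop_rate_threshold: float = 0.05) -> dict:
    # Aggregate per-run metrics into fleet-level signals worth alerting on.
    n = len(metrics_batch)
    loop_rate = sum(m["loop_detected"] for m in metrics_batch) / n
    error_steps = sum(m["error_steps"] for m in metrics_batch)
    total_steps = sum(m["total_steps"] for m in metrics_batch)
    return {
        "loop_rate": loop_rate,
        "loop_alert": loop_rate > loop_rate_threshold,
        "tool_error_rate": error_steps / total_steps if total_steps else 0.0,
        "avg_steps": total_steps / n,
    }

# Ten synthetic runs: one looped and had a tool error.
batch = [{"loop_detected": i == 0, "error_steps": 1 if i == 0 else 0,
          "total_steps": 5} for i in range(10)]
signals = alert_signals(batch)
print(signals["loop_alert"], round(signals["tool_error_rate"], 3))  # True 0.02
```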
OpenTelemetry Integration
For teams already using OpenTelemetry, emit agent traces as spans. This lets you correlate agent behavior with the rest of your infrastructure:
```python
from opentelemetry import trace as otel_trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Setup — replace ConsoleSpanExporter with your OTLP exporter
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
otel_trace.set_tracer_provider(provider)
tracer = otel_trace.get_tracer("agent")

def run_with_otel(task: str, tools: dict, tool_schemas: list, system: str):
    with tracer.start_as_current_span("agent.run") as root_span:
        root_span.set_attribute("agent.task", task[:200])

        agent = TracedAgent()
        for schema in tool_schemas:
            agent.add_tool(**schema)

        messages = [{"role": "user", "content": task}]

        for step in range(20):
            with tracer.start_as_current_span(f"agent.step.{step}") as step_span:
                response = agent.call(messages, system=system)
                step_span.set_attribute("llm.stop_reason", response.stop_reason)
                step_span.set_attribute("llm.input_tokens", response.usage.input_tokens)
                step_span.set_attribute("llm.output_tokens", response.usage.output_tokens)

                if response.stop_reason == "end_turn":
                    break

                for block in response.content:
                    if block.type == "tool_use":
                        with tracer.start_as_current_span(f"tool.{block.name}") as tool_span:
                            tool_span.set_attribute("tool.name", block.name)
                            tool_span.set_attribute("tool.input", json.dumps(block.input)[:500])
                            # ... execute tool and append results to messages
```

Every agent run becomes a distributed trace you can visualize in Jaeger, Grafana Tempo, or Honeycomb.
A Debug Harness for Local Testing
During development, use a debug harness that lets you step through an agent run interactively:
```python
import os

def debug_run(task: str, tools: dict, tool_schemas: list, system: str):
    """
    Interactive debug mode: pause after each step to inspect state.
    Set DEBUG_AGENT=1 in your environment to enable.
    """
    debug = os.getenv("DEBUG_AGENT") == "1"

    trace = run_traced_agent(task, tools, tool_schemas, system)

    for i, step in enumerate(trace["steps"]):
        print(f"\n{'='*60}")
        print(f"STEP {i+1} | stop_reason: {step['stop_reason']}")
        print(f"{'='*60}")

        if step["model_output"]:
            print("Model output:")
            for output in step["model_output"]:
                print(f"  {json.dumps(output, indent=2)}")

        if step["tool_results"]:
            print("Tool results:")
            for result in step["tool_results"]:
                print(f"  [{result['tool']}]: {result['result_preview']}")

        if debug:
            input("\nPress Enter to continue...")

    print(f"\n{'='*60}")
    if "final_answer" in trace:
        print(f"FINAL ANSWER:\n{trace['final_answer']}")
    else:
        print(f"ERROR: {trace.get('error', 'unknown')}")

    metrics = compute_trace_metrics(trace)
    print(f"\nMETRICS: {json.dumps(metrics, indent=2)}")
    return trace
```

Run with `DEBUG_AGENT=1 python agent.py` to pause at each step. Run without it for non-interactive traces in CI or staging.
PII Redaction in Logs
Agent logs often contain sensitive data. Before emitting to any external system, redact it:
```python
import re

PII_PATTERNS = [
    (re.compile(r'\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b'), '[EMAIL]'),
    (re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'), '[PHONE]'),
    (re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'), '[CARD]'),
    (re.compile(r'\bsk-[a-zA-Z0-9]{20,}\b'), '[API_KEY]'),
]

def redact(text: str) -> str:
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text

def safe_log_event(event: AgentEvent):
    safe = asdict(event)
    safe["content"] = json.loads(redact(json.dumps(safe["content"])))
    log_event(AgentEvent(**safe))  # or print directly
```

Apply redaction in the log emission layer, not at the application layer. The agent still sees the real data — only the logs are sanitized.
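Regex-based redaction is easy to get subtly wrong, so it's worth pinning the patterns down with a quick check. The patterns are repeated here so the snippet runs standalone; the sample values are made up:

```python
import re

# Same patterns as above, repeated so this snippet runs standalone.
PII_PATTERNS = [
    (re.compile(r'\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b'), '[EMAIL]'),
    (re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'), '[PHONE]'),
    (re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'), '[CARD]'),
    (re.compile(r'\bsk-[a-zA-Z0-9]{20,}\b'), '[API_KEY]'),
]

def redact(text: str) -> str:
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text

sample = "Contact alice@example.com or 555-123-4567, key sk-abcdefghijklmnopqrstuv"
print(redact(sample))  # Contact [EMAIL] or [PHONE], key [API_KEY]
```

Checks like these belong in your test suite: every new PII pattern should ship with a positive and a negative example.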
Post-Mortem Analysis
When an agent run fails in production, your trace gives you everything you need:
1. Load the trace by `trace_id` from your log store
2. Find the divergence point — the first step where the model output looks unexpected
3. Check tool inputs/outputs at that step — was a tool returning bad data?
4. Re-run the trace up to that step with the same inputs to reproduce the failure
5. Inspect the full message history at the failing step to see what context the model had
The structured logs from run_traced_agent contain everything needed for steps 1–4. For step 5, add an option to dump the raw messages list at each step:
```python
import os  # at the top of your agent module

# Inside the agent loop in run_traced_agent, after each step:
if os.getenv("DUMP_MESSAGES") == "1":
    with open(f"trace_{agent.trace_id}_messages.json", "w") as f:
        json.dump(messages, f, indent=2, default=str)
```

This creates a replayable artifact. Given the same messages at step N, the model will produce similar output — making failures reproducible even in non-deterministic systems.
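A minimal replay helper can then slice the dumped artifact back to any step. This sketch assumes the message layout run_traced_agent produces (one assistant message and one tool-result message appended per completed step, after the initial task message); the artifact below is synthetic, standing in for a loaded dump file:

```python
def messages_up_to_step(messages: list[dict], step: int) -> list[dict]:
    # run_traced_agent appends two messages per completed step (assistant
    # tool calls, then tool results), so the context the model saw at
    # step N is the first 1 + 2*(N-1) entries.
    return messages[: 1 + 2 * (step - 1)]

# Synthetic dumped artifact: the task plus two completed steps.
messages = [
    {"role": "user", "content": "task"},
    {"role": "assistant", "content": "step-1 tool calls"},
    {"role": "user", "content": "step-1 tool results"},
    {"role": "assistant", "content": "step-2 tool calls"},
    {"role": "user", "content": "step-2 tool results"},
]
replay = messages_up_to_step(messages, 3)  # reproduce the context of step 3
print(len(replay))  # 5
```

Feed the sliced prefix back into the same model call to reproduce the decision at the failing step.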
What to Measure
Three metrics matter most for production agent health:
Task completion rate — what fraction of runs reach final_answer vs. hitting max_steps or an error. Baseline this per task type; a research task completing in 10 steps isn’t comparable to a simple lookup in 2.
Token cost per task — sum input_tokens + output_tokens across all steps. Track this over time. A 20% increase in cost with no change in completion rate usually signals prompt degradation or a task complexity shift.
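The token cost rollup can be computed directly from the `llm_call` events emitted earlier; a sketch over parsed log lines (the event dicts mirror AgentEvent fields, but the helper name and the numbers are synthetic):

```python
def token_cost_by_trace(events: list[dict]) -> dict[str, dict]:
    # Sum input/output tokens across every llm_call event, per trace.
    totals: dict[str, dict] = {}
    for e in events:
        if e["event_type"] == "llm_call":
            t = totals.setdefault(e["trace_id"], {"input": 0, "output": 0})
            t["input"] += e["input_tokens"]
            t["output"] += e["output_tokens"]
    return totals

events = [
    {"trace_id": "a", "event_type": "llm_call", "input_tokens": 1000, "output_tokens": 200},
    {"trace_id": "a", "event_type": "tool_result"},
    {"trace_id": "a", "event_type": "llm_call", "input_tokens": 1500, "output_tokens": 300},
    {"trace_id": "b", "event_type": "llm_call", "input_tokens": 800, "output_tokens": 100},
]
print(token_cost_by_trace(events))
# {'a': {'input': 2500, 'output': 500}, 'b': {'input': 800, 'output': 100}}
```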
Tool error rate — error_steps / total_steps. Spikes in this metric point directly to a broken tool or API. Since tool errors cascade into model confusion, fixing the tool almost always improves completion rate too.
Observability in agent systems isn’t optional — it’s the difference between a system you can iterate on and one you can only restart when it breaks. Start with structured events and trace IDs. Add loop detection. Push metrics. The investment pays off the first time you have a production failure and can reconstruct exactly what happened instead of guessing.
Related Articles
- Agent Error Recovery: 5 Patterns for Production Reliability
- Multi-Agent Patterns: Orchestrators, Workers, and Pipelines
- Tool Use Patterns: Building Reliable Agent-Tool Interfaces
- State Machines and Agents: Building Reliable Workflows with LangGraph