コンピュータアクセスシステム

自律型エージェントシステムのデバッグと可観測性


サイレントに失敗する自律型エージェントは、エージェントがないよりも悪い。従来の関数が例外をスローすると、スタックトレースが得られる。エージェントが20回のツール呼び出しと3回のモデル呼び出しにわたって間違った道をたどると、間違った回答が得られる——そして明確な説明はない。

エージェントのデバッグには異なるメンタルモデルが必要だ。システムは決定論的なパスを実行しているのではなく、一連の意思決定を行っている。可観測性とは、これらの決定を捕捉することを意味する——入力と出力だけでなく、それらを結ぶ推論も含めて。

なぜ従来のデバッグがエージェントに機能しないのか

標準的なロギングは何が起きたかを捕捉する。エージェントの可観測性にはなぜを捕捉することが必要だ——モデルが何を結論づけたか、どのツールを選びなぜか、どの中間状態から作業していたか。

失敗モードも異なる:

  • サイレントハルシネーション:エージェントが不確実性を示さずに自信を持って間違った答えを出す。
  • 意思決定のドリフト:各ステップはローカルには合理的に見えるが、シーケンス全体が目標から外れていく。
  • ツールの誤使用:エージェントが正しいツールを微妙に間違ったパラメーターで呼び出す。
  • 無限ループ:エージェントが失敗したアプローチを再試行し続けてスタックする。
  • コンテキスト汚染:早い段階での悪い出力が後続のすべての推論を汚染する。

これらのどれも例外を生成しない。間違った動作を生み出し、それは完全な実行トレースを再構築したときにのみ見えてくる。

エージェントの意思決定のための構造化ロギング

最初のステップは、すべてのエージェントインタラクションを構造化ログでラップすることだ。生のAPIレスポンスをログに記録するのではなく、セマンティックイベントをログに記録する。

import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Any
import anthropic
client = anthropic.Anthropic()
@dataclass
class AgentEvent:
trace_id: str
step: int
event_type: str # "llm_call", "tool_call", "tool_result", "decision", "error"
model: str | None
input_tokens: int | None
output_tokens: int | None
latency_ms: float | None
content: dict[str, Any]
timestamp: float
def log_event(event: AgentEvent):
print(json.dumps(asdict(event))) # ログシンクに置き換える
class TracedAgent:
def __init__(self, trace_id: str | None = None):
self.trace_id = trace_id or str(uuid.uuid4())
self.step = 0
self.tools = []
def add_tool(self, name: str, description: str, input_schema: dict):
self.tools.append({
"name": name,
"description": description,
"input_schema": input_schema
})
def call(self, messages: list[dict], system: str = "") -> str:
self.step += 1
start = time.monotonic()
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=4096,
system=system,
tools=self.tools,
messages=messages
)
latency_ms = (time.monotonic() - start) * 1000
log_event(AgentEvent(
trace_id=self.trace_id,
step=self.step,
event_type="llm_call",
model="claude-opus-4-6",
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
latency_ms=latency_ms,
content={
"stop_reason": response.stop_reason,
"text_blocks": [b.text for b in response.content if b.type == "text"],
"tool_calls": [
{"name": b.name, "input": b.input}
for b in response.content if b.type == "tool_use"
]
},
timestamp=time.time()
))
return response

完全なトレースの構築

単一のログ行では不十分だ——各決定をその結果と結びつける完全な実行トレースが必要になる:

from typing import Callable
def run_traced_agent(
task: str,
tools: dict[str, Callable],
tool_schemas: list[dict],
system: str,
max_steps: int = 20,
) -> dict:
agent = TracedAgent()
for schema in tool_schemas:
agent.add_tool(**schema)
messages = [{"role": "user", "content": task}]
trace = {"trace_id": agent.trace_id, "task": task, "steps": []}
step_count = 0
while step_count < max_steps:
step_count += 1
response = agent.call(messages, system=system)
step_record = {
"step": step_count,
"stop_reason": response.stop_reason,
"model_output": [],
"tool_results": []
}
if response.stop_reason == "end_turn":
for block in response.content:
if block.type == "text":
step_record["model_output"].append(block.text)
trace["steps"].append(step_record)
trace["final_answer"] = step_record["model_output"][-1] if step_record["model_output"] else ""
break
tool_results = []
for block in response.content:
if block.type == "tool_use":
step_record["model_output"].append({
"tool": block.name,
"input": block.input
})
tool_fn = tools.get(block.name)
if not tool_fn:
result = f"エラー: 不明なツール '{block.name}'"
else:
try:
result = tool_fn(**block.input)
except Exception as e:
result = f"ツールエラー: {e}"
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(result)
})
step_record["tool_results"].append({
"tool": block.name,
"result_preview": str(result)[:200]
})
trace["steps"].append(step_record)
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
else:
trace["error"] = f"最大ステップ数を超過 ({max_steps})"
return trace

ループ検出

無限ループは一般的な失敗モードだ。各LLM呼び出しのツール呼び出しパターンのフィンガープリントを取ることで検出できる:

def detect_loop(trace: dict, window: int = 4) -> bool:
steps = trace["steps"]
if len(steps) < window:
return False
def step_signature(step: dict) -> str:
tools_called = sorted(
t["tool"] if isinstance(t, dict) else t
for t in step.get("model_output", [])
if isinstance(t, dict) and "tool" in t
)
return "|".join(tools_called)
recent = [step_signature(s) for s in steps[-window:]]
if len(set(recent)) == 1 and recent[0]:
return True
if len(steps) >= 4:
pattern = [step_signature(s) for s in steps[-4:]]
if pattern[0] == pattern[2] and pattern[1] == pattern[3]:
return True
return False

本番環境で追跡すべきメトリクス

from collections import Counter
def compute_trace_metrics(trace: dict) -> dict:
steps = trace["steps"]
errors = [s for s in steps if "error" in s]
tool_calls_by_name: Counter = Counter()
for step in steps:
for output in step.get("model_output", []):
if isinstance(output, dict) and "tool" in output:
tool_calls_by_name[output["tool"]] += 1
return {
"trace_id": trace["trace_id"],
"total_steps": len(steps),
"error_steps": len(errors),
"tool_call_distribution": dict(tool_calls_by_name),
"completed": "final_answer" in trace,
"loop_detected": detect_loop(trace),
}

アラートの主要シグナル:

  • ループ率 > 5% — エージェントがスタックしている
  • ツールごとのエラー率 > しきい値 — ツールが壊れている
  • 平均ステップ数の上昇傾向 — タスクが難しくなっている
  • p99レイテンシのスパイク — モデルエンドポイントが遅い

OpenTelemetryとの統合

すでにOpenTelemetryを使用しているチームは、エージェントトレースをスパンとして送出できる:

from opentelemetry import trace as otel_trace
tracer = otel_trace.get_tracer("agent")
def run_with_otel(task: str, tools: dict, tool_schemas: list, system: str):
with tracer.start_as_current_span("agent.run") as root_span:
root_span.set_attribute("agent.task", task[:200])
agent = TracedAgent()
for schema in tool_schemas:
agent.add_tool(**schema)
messages = [{"role": "user", "content": task}]
for step in range(20):
with tracer.start_as_current_span(f"agent.step.{step}") as step_span:
response = agent.call(messages, system=system)
step_span.set_attribute("llm.stop_reason", response.stop_reason)
step_span.set_attribute("llm.input_tokens", response.usage.input_tokens)
if response.stop_reason == "end_turn":
break

ログにおけるPII削除

エージェントログには機密データが含まれることが多い。外部システムに送出する前に削除する:

import re
PII_PATTERNS = [
(re.compile(r'\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b'), '[メール]'),
(re.compile(r'\b0\d{1,4}[-\s]?\d{1,4}[-\s]?\d{4}\b'), '[電話番号]'),
(re.compile(r'\bsk-[a-zA-Z0-9]{20,}\b'), '[APIキー]'),
]
def redact(text: str) -> str:
for pattern, replacement in PII_PATTERNS:
text = pattern.sub(replacement, text)
return text

最も重要な3つのメトリクス

タスク完了率 — 何割の実行がfinal_answerに到達するか、max_stepsまたはエラーで終わるかの比率。タスクの種類ごとにベースラインを設定する。

タスクあたりのトークンコスト — すべてのステップにわたるinput_tokens + output_tokensの合計。完了率が変わらずにコストが20%増加するのは、通常プロンプトの劣化を示す。

ツールエラー率error_steps / total_steps。このメトリクスのスパイクは壊れたツールやAPIを直接指している。


エージェントシステムにおける可観測性はオプションではない——イテレーションできるシステムと、壊れたときにリスタートするだけのシステムの違いだ。構造化されたイベントとトレースIDから始めよう。ループ検出を追加する。メトリクスをプッシュする。本番環境で最初の障害が発生して、推測するのではなく何が起きたかを正確に再構築できるとき、この投資の価値がわかる。


関連記事