自律型エージェントシステムのデバッグと可観測性
サイレントに失敗する自律型エージェントは、エージェントがないよりも悪い。従来の関数が例外をスローすると、スタックトレースが得られる。エージェントが20回のツール呼び出しと3回のモデル呼び出しにわたって間違った道をたどると、間違った回答が得られる——そして明確な説明はない。
エージェントのデバッグには異なるメンタルモデルが必要だ。システムは決定論的なパスを実行しているのではなく、一連の意思決定を行っている。可観測性とは、これらの決定を捕捉することを意味する——入力と出力だけでなく、それらを結ぶ推論も含めて。
なぜ従来のデバッグがエージェントに機能しないのか
標準的なロギングは何が起きたかを捕捉する。エージェントの可観測性にはなぜを捕捉することが必要だ——モデルが何を結論づけたか、どのツールを選びなぜか、どの中間状態から作業していたか。
失敗モードも異なる:
- サイレントハルシネーション:エージェントが不確実性を示さずに自信を持って間違った答えを出す。
- 意思決定のドリフト:各ステップはローカルには合理的に見えるが、シーケンス全体が目標から外れていく。
- ツールの誤使用:エージェントが正しいツールを微妙に間違ったパラメーターで呼び出す。
- 無限ループ:エージェントが失敗したアプローチを再試行し続けてスタックする。
- コンテキスト汚染:早い段階での悪い出力が後続のすべての推論を汚染する。
これらのどれも例外を生成しない。間違った動作を生み出し、それは完全な実行トレースを再構築したときにのみ見えてくる。
エージェントの意思決定のための構造化ロギング
最初のステップは、すべてのエージェントインタラクションを構造化ログでラップすることだ。生のAPIレスポンスをログに記録するのではなく、セマンティックイベントをログに記録する。
import jsonimport timeimport uuidfrom dataclasses import dataclass, asdictfrom typing import Anyimport anthropic
client = anthropic.Anthropic()
@dataclassclass AgentEvent: trace_id: str step: int event_type: str # "llm_call", "tool_call", "tool_result", "decision", "error" model: str | None input_tokens: int | None output_tokens: int | None latency_ms: float | None content: dict[str, Any] timestamp: float
def log_event(event: AgentEvent): print(json.dumps(asdict(event))) # ログシンクに置き換える
class TracedAgent: def __init__(self, trace_id: str | None = None): self.trace_id = trace_id or str(uuid.uuid4()) self.step = 0 self.tools = []
def add_tool(self, name: str, description: str, input_schema: dict): self.tools.append({ "name": name, "description": description, "input_schema": input_schema })
def call(self, messages: list[dict], system: str = "") -> str: self.step += 1 start = time.monotonic()
response = client.messages.create( model="claude-opus-4-6", max_tokens=4096, system=system, tools=self.tools, messages=messages )
latency_ms = (time.monotonic() - start) * 1000
log_event(AgentEvent( trace_id=self.trace_id, step=self.step, event_type="llm_call", model="claude-opus-4-6", input_tokens=response.usage.input_tokens, output_tokens=response.usage.output_tokens, latency_ms=latency_ms, content={ "stop_reason": response.stop_reason, "text_blocks": [b.text for b in response.content if b.type == "text"], "tool_calls": [ {"name": b.name, "input": b.input} for b in response.content if b.type == "tool_use" ] }, timestamp=time.time() ))
return response完全なトレースの構築
単一のログ行では不十分だ——各決定をその結果と結びつける完全な実行トレースが必要になる:
from typing import Callable
def run_traced_agent( task: str, tools: dict[str, Callable], tool_schemas: list[dict], system: str, max_steps: int = 20,) -> dict: agent = TracedAgent() for schema in tool_schemas: agent.add_tool(**schema)
messages = [{"role": "user", "content": task}] trace = {"trace_id": agent.trace_id, "task": task, "steps": []} step_count = 0
while step_count < max_steps: step_count += 1 response = agent.call(messages, system=system)
step_record = { "step": step_count, "stop_reason": response.stop_reason, "model_output": [], "tool_results": [] }
if response.stop_reason == "end_turn": for block in response.content: if block.type == "text": step_record["model_output"].append(block.text) trace["steps"].append(step_record) trace["final_answer"] = step_record["model_output"][-1] if step_record["model_output"] else "" break
tool_results = [] for block in response.content: if block.type == "tool_use": step_record["model_output"].append({ "tool": block.name, "input": block.input })
tool_fn = tools.get(block.name) if not tool_fn: result = f"エラー: 不明なツール '{block.name}'" else: try: result = tool_fn(**block.input) except Exception as e: result = f"ツールエラー: {e}"
tool_results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(result) }) step_record["tool_results"].append({ "tool": block.name, "result_preview": str(result)[:200] })
trace["steps"].append(step_record) messages.append({"role": "assistant", "content": response.content}) messages.append({"role": "user", "content": tool_results})
else: trace["error"] = f"最大ステップ数を超過 ({max_steps})"
return traceループ検出
無限ループは一般的な失敗モードだ。各LLM呼び出しのツール呼び出しパターンのフィンガープリントを取ることで検出できる:
def detect_loop(trace: dict, window: int = 4) -> bool: steps = trace["steps"] if len(steps) < window: return False
def step_signature(step: dict) -> str: tools_called = sorted( t["tool"] if isinstance(t, dict) else t for t in step.get("model_output", []) if isinstance(t, dict) and "tool" in t ) return "|".join(tools_called)
recent = [step_signature(s) for s in steps[-window:]] if len(set(recent)) == 1 and recent[0]: return True
if len(steps) >= 4: pattern = [step_signature(s) for s in steps[-4:]] if pattern[0] == pattern[2] and pattern[1] == pattern[3]: return True
return False本番環境で追跡すべきメトリクス
from collections import Counter
def compute_trace_metrics(trace: dict) -> dict: steps = trace["steps"] errors = [s for s in steps if "error" in s]
tool_calls_by_name: Counter = Counter() for step in steps: for output in step.get("model_output", []): if isinstance(output, dict) and "tool" in output: tool_calls_by_name[output["tool"]] += 1
return { "trace_id": trace["trace_id"], "total_steps": len(steps), "error_steps": len(errors), "tool_call_distribution": dict(tool_calls_by_name), "completed": "final_answer" in trace, "loop_detected": detect_loop(trace), }アラートの主要シグナル:
- ループ率 > 5% — エージェントがスタックしている
- ツールごとのエラー率 > しきい値 — ツールが壊れている
- 平均ステップ数の上昇傾向 — タスクが難しくなっている
- p99レイテンシのスパイク — モデルエンドポイントが遅い
OpenTelemetryとの統合
すでにOpenTelemetryを使用しているチームは、エージェントトレースをスパンとして送出できる:
from opentelemetry import trace as otel_trace
tracer = otel_trace.get_tracer("agent")
def run_with_otel(task: str, tools: dict, tool_schemas: list, system: str): with tracer.start_as_current_span("agent.run") as root_span: root_span.set_attribute("agent.task", task[:200])
agent = TracedAgent() for schema in tool_schemas: agent.add_tool(**schema)
messages = [{"role": "user", "content": task}]
for step in range(20): with tracer.start_as_current_span(f"agent.step.{step}") as step_span: response = agent.call(messages, system=system) step_span.set_attribute("llm.stop_reason", response.stop_reason) step_span.set_attribute("llm.input_tokens", response.usage.input_tokens)
if response.stop_reason == "end_turn": breakログにおけるPII削除
エージェントログには機密データが含まれることが多い。外部システムに送出する前に削除する:
import re
PII_PATTERNS = [ (re.compile(r'\b[\w.+-]+@[\w-]+\.[a-z]{2,}\b'), '[メール]'), (re.compile(r'\b0\d{1,4}[-\s]?\d{1,4}[-\s]?\d{4}\b'), '[電話番号]'), (re.compile(r'\bsk-[a-zA-Z0-9]{20,}\b'), '[APIキー]'),]
def redact(text: str) -> str: for pattern, replacement in PII_PATTERNS: text = pattern.sub(replacement, text) return text最も重要な3つのメトリクス
タスク完了率 — 何割の実行がfinal_answerに到達するか、max_stepsまたはエラーで終わるかの比率。タスクの種類ごとにベースラインを設定する。
タスクあたりのトークンコスト — すべてのステップにわたるinput_tokens + output_tokensの合計。完了率が変わらずにコストが20%増加するのは、通常プロンプトの劣化を示す。
ツールエラー率 — error_steps / total_steps。このメトリクスのスパイクは壊れたツールやAPIを直接指している。
エージェントシステムにおける可観測性はオプションではない——イテレーションできるシステムと、壊れたときにリスタートするだけのシステムの違いだ。構造化されたイベントとトレースIDから始めよう。ループ検出を追加する。メトリクスをプッシュする。本番環境で最初の障害が発生して、推測するのではなく何が起きたかを正確に再構築できるとき、この投資の価値がわかる。
関連記事
- エージェントのエラー回復:本番環境の信頼性のための5つのパターン
- マルチエージェントパターン:オーケストレーター、ワーカー、パイプライン
- ツール使用パターン:信頼性の高いエージェント-ツールインターフェースの構築
- 状態機械とエージェント:LangGraphで信頼性の高いワークフローを構築する