
Debugging Multi-Agent CrewAI Workflows

In the previous post, we made agents persistent with custom memory systems. That creates a new problem: agents now remember things and act across runs, and we have no idea why they did what they did.

Multi-agent debugging is uniquely painful. A bug in a single function has a stack trace. A bug in a multi-agent workflow looks like: agent B produced the wrong analysis. Why? Was the research from agent A incomplete? Did agent B call the wrong tool? Did it hallucinate a result and skip the tool entirely? Did the task context pass incorrectly?

Without visibility into each decision, you’re guessing. This post gives you the tools to stop guessing.

Why Multi-Agent Debugging Is Different

Single-agent failures are relatively transparent: the model produced bad output. Multi-agent failures are compound: the failure at step 4 may have originated at step 1. Diagnosing it requires understanding:

  1. What each agent received as context (did the previous task output arrive correctly?)
  2. Which tools were called, with what inputs, and what they returned
  3. What the LLM “thought” it should do at each step
  4. Where tokens were spent (sometimes the real bug is a context window overrun)

CrewAI provides several layers to answer these questions.

Layer 1: Verbose Logging

The simplest tool. Enable it on both Agent and Crew.

from crewai import Agent, Task, Crew

researcher = Agent(
    role="Senior Research Analyst",
    goal="Uncover accurate company information",
    backstory="You specialize in market research with 10 years experience.",
    verbose=True,          # logs this agent's reasoning and tool calls
    llm="openai/gpt-4o"
)

# analyst, writer, and the three tasks are defined the same way (omitted here)
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, financial_task, summary_task],
    verbose=True           # logs crew-level orchestration
)

verbose=True on an agent logs every reasoning step: what the agent decided to do, what tool it called, what that tool returned. On the crew, it adds orchestration logs: task assignment, handoffs, and completion events.

Always enable it during development and disable it in production: verbose output is enormous, and because tool inputs and outputs go to stdout it can expose sensitive data.
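
One way to avoid hard-coding the flag is to derive it from the environment. The sketch below assumes a hypothetical CREW_DEBUG variable (not something CrewAI reads itself) and only computes a boolean that you would pass as verbose= to Agent and Crew.

```python
import os

def debug_enabled(env=None) -> bool:
    """Return True when the (hypothetical) CREW_DEBUG variable holds a truthy value."""
    env = os.environ if env is None else env
    return env.get("CREW_DEBUG", "").strip().lower() in {"1", "true", "yes"}

# Pass this wherever the examples hard-code verbose=True, e.g.
# Agent(..., verbose=VERBOSE) and Crew(..., verbose=VERBOSE).
VERBOSE = debug_enabled()
```

With this in place, production runs stay quiet by default and a single environment variable turns logging back on when you need to reproduce a problem.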

Layer 2: Callbacks

Verbose output hits stdout. Callbacks give you structured, programmatic access to the same events—without modifying agent behavior.

step_callback

Called after every agent step (a step is one LLM reasoning + optional tool call cycle).

from datetime import datetime

def log_agent_step(step_output):
    """Called after each agent step with the step result."""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Step completed:")
    print(f"  Output: {str(step_output)[:200]}...")

crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, financial_task, summary_task],
    step_callback=log_agent_step
)

The step_output argument is the result of that step. Its exact type varies across CrewAI versions (it may be a string or an object wrapping the agent’s reasoning or the tool result), so converting it with str() before logging is the safe choice. This is useful for spotting the exact moment things go wrong: if a tool returns an error, the callback fires right after that step and you see it.
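
If you want more than a print statement, a callable object can keep the steps in memory. The following is a minimal sketch, not a CrewAI API: StepRecorder is a hypothetical helper that stores a preview of each step plus the time elapsed since the previous one, assuming the crew accepts any callable as step_callback.

```python
import time

class StepRecorder:
    """Callable step callback that records each step and the gap since the previous one."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._last = clock()
        self.steps = []

    def __call__(self, step_output):
        now = self._clock()
        self.steps.append({
            "index": len(self.steps) + 1,
            "seconds_since_previous": now - self._last,
            "preview": str(step_output)[:200],   # str() works whatever the step type is
        })
        self._last = now

    def slowest(self):
        """Return the recorded step with the longest gap, or None if nothing was recorded."""
        return max(self.steps, key=lambda s: s["seconds_since_previous"], default=None)
```

You would create recorder = StepRecorder(), pass step_callback=recorder to the crew, and inspect recorder.slowest() after the run to see which step the agent spent the most time on.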

task_callback

Called after each complete task finishes. Receives a TaskOutput object.

from crewai import TaskOutput

def on_task_complete(task_output: TaskOutput):
    """Called after each task completes."""
    print(f"\n{'='*50}")
    print(f"Task agent:  {task_output.agent}")
    print(f"Task desc:   {task_output.description[:80]}...")
    print(f"Output raw:  {task_output.raw[:300]}...")
    print(f"Format:      {task_output.output_format}")
    print(f"{'='*50}\n")

crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, financial_task, summary_task],
    task_callback=on_task_complete
)

TaskOutput attributes you’ll use most:

  • task_output.raw: unformatted string output of the task
  • task_output.agent: role of the agent that ran it
  • task_output.description: the task’s description
  • task_output.pydantic: populated if the task used output_pydantic=
  • task_output.json_dict: populated if the task used output_json=
  • task_output.messages: the LLM messages from the final execution

You can also access task output after the run via the task object itself:

result = crew.kickoff()

# Access individual task outputs after run
print(research_task.output.raw)
print(financial_task.output.agent)
print(summary_task.output.raw)

This is useful for post-run analysis: compare each task’s output to understand how context flowed.
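
A quick side-by-side view makes that comparison easier. The helper below is a sketch that works on plain strings, so you would feed it the .raw values yourself; the name summarize_outputs and the labels are illustrative.

```python
def summarize_outputs(outputs: dict[str, str], preview_chars: int = 60) -> list[str]:
    """Build one line per task: label, output length, and a whitespace-collapsed preview."""
    lines = []
    for name, raw in outputs.items():
        text = raw or ""
        preview = " ".join(text.split())[:preview_chars]
        lines.append(f"{name:<12} {len(text):>6} chars  {preview}")
    return lines

# Example call after a run (task objects as in the snippet above):
# for line in summarize_outputs({
#     "research": research_task.output.raw,
#     "financial": financial_task.output.raw,
#     "summary": summary_task.output.raw,
# }):
#     print(line)
```

A task whose output is unexpectedly short or empty is usually the first place to look when a downstream task goes wrong.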

before_kickoff_callbacks and after_kickoff_callbacks

For crew-level lifecycle hooks, use the list-based callback params:

from datetime import datetime

def before_run(inputs):
    print(f"Crew starting at {datetime.now().isoformat()}")
    # Load external state, reset counters, etc.
    return inputs   # the returned inputs are what the crew runs with

def after_run(result):
    print(f"Crew finished. Final output length: {len(str(result))}")
    # Persist results, send alerts, etc.
    return result   # the returned value becomes the kickoff result

crew = Crew(
    agents=[...],
    tasks=[...],
    before_kickoff_callbacks=[before_run],
    after_kickoff_callbacks=[after_run]
)

These are lists—you can register multiple callbacks for the same event.

Layer 3: Persistent Logs

Callbacks are great during development, but anything they print disappears when the process exits. For production debugging, write logs to disk.

crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, financial_task, summary_task],
    output_log_file="crew_run.json"   # .json suffix = JSON format
)

output_log_file accepts:

  • True → saves to logs.txt
  • "filename.txt" → saves as plain text
  • "filename.json" → saves as structured JSON (much easier to parse)
  • None (default) → no file logging

JSON format is strongly recommended. You get structured records you can query:

import json

with open("crew_run.json") as f:
    logs = json.load(f)

# Find all tool calls that returned errors
errors = [
    entry for entry in logs
    if "Error" in str(entry.get("output", ""))
]

for err in errors:
    print(f"Task: {err.get('task')}")
    print(f"Error: {str(err.get('output'))[:200]}")
    print()
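
Beyond filtering for errors, counting entries by a field gives a fast overview of a run. This is a sketch under the assumption that each log entry is a dict; the field names in the sample ("task", "status") are hypothetical, so check the keys in your own log file first.

```python
from collections import Counter

def count_by(entries: list[dict], key: str) -> Counter:
    """Count log entries by the value stored under `key` ("<missing>" when absent)."""
    return Counter(str(entry.get(key, "<missing>")) for entry in entries)

# Hypothetical entries; real field names depend on the CrewAI version that wrote the file.
sample = [
    {"task": "research", "status": "started"},
    {"task": "research", "status": "completed"},
    {"task": "analysis", "status": "started"},
]
print(count_by(sample, "status"))
```

A task that shows up as started but never completed is a good candidate for where the run stalled.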

Combine output_log_file with timestamps in your log path to avoid overwriting:

from datetime import datetime

log_path = f"logs/crew_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

crew = Crew(
    agents=[...],
    tasks=[...],
    output_log_file=log_path
)

Layer 4: Usage Metrics

Agents are expensive. A workflow that works correctly but costs $40 per run in tokens isn’t production-ready.

After any crew.kickoff(), access token usage:

result = crew.kickoff()

# Access token usage breakdown
metrics = crew.usage_metrics
print(metrics)

usage_metrics reflects LLM usage across all tasks in the crew. Use this to:

  • Identify runaway tasks: one task consuming 80% of tokens signals a context problem
  • Compare runs: after changing agent backstories or task descriptions, compare token counts
  • Set budgets: if a run exceeds N tokens, log a warning or alert

A practical wrapper:

from typing import Optional

def run_with_metrics(crew: Crew, inputs: Optional[dict] = None) -> tuple:
    """Run crew and return (result, metrics)."""
    result = crew.kickoff(inputs=inputs)
    metrics = crew.usage_metrics
    return result, metrics

result, metrics = run_with_metrics(crew)
print(f"Usage metrics: {metrics}")
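
The budget and run-comparison ideas can be sketched with two small functions. They take plain integers, on the assumption that you extract a total token count from the metrics object yourself (for example from a total_tokens attribute, if your version exposes one); check_budget, percent_change, and the 0.8 warning ratio are illustrative choices.

```python
def check_budget(total_tokens: int, budget: int, warn_ratio: float = 0.8) -> str:
    """Classify a run as "ok", "warn", or "over" relative to a token budget."""
    if budget <= 0:
        raise ValueError("budget must be positive")
    if total_tokens > budget:
        return "over"
    if total_tokens >= budget * warn_ratio:
        return "warn"
    return "ok"

def percent_change(before: int, after: int) -> float:
    """Relative change in token usage between two runs, in percent."""
    if before <= 0:
        raise ValueError("before must be positive")
    return (after - before) / before * 100.0
```

Logging the check_budget result after every run gives you an early warning before costs drift, and percent_change makes the effect of a prompt or backstory edit concrete.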

Layer 5: Event Listeners

For advanced observability—shipping events to a monitoring system, building dashboards, or triggering alerts—use CrewAI’s event bus.

# Import path depends on the CrewAI version; older releases used crewai.utilities.events
from crewai.events import (
    BaseEventListener,
    crewai_event_bus,
    ToolUsageStartedEvent,
    ToolUsageFinishedEvent,
    TaskStartedEvent,
    TaskCompletedEvent,
    ToolUsageErrorEvent,
    AgentExecutionErrorEvent,
)

class DebugListener(BaseEventListener):
    """Custom listener that captures all events for debugging."""
    
    def __init__(self):
        super().__init__()
        self.events = []
    
    def setup_listeners(self, crewai_event_bus):
        # Tool execution events
        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_start(source, event: ToolUsageStartedEvent):
            self.events.append({
                "type": "tool_start",
                "timestamp": event.timestamp,
                "data": vars(event)
            })
        
        @crewai_event_bus.on(ToolUsageFinishedEvent)
        def on_tool_finish(source, event: ToolUsageFinishedEvent):
            self.events.append({
                "type": "tool_finish",
                "timestamp": event.timestamp,
                "data": vars(event)
            })
        
        # Task events
        @crewai_event_bus.on(TaskStartedEvent)
        def on_task_start(source, event: TaskStartedEvent):
            print(f"[EVENT] Task started: {getattr(event, 'task_name', 'unknown')}")
        
        @crewai_event_bus.on(TaskCompletedEvent)
        def on_task_complete(source, event: TaskCompletedEvent):
            print(f"[EVENT] Task completed: {getattr(event, 'task_name', 'unknown')}")
        
        # Error events — highest priority
        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_error(source, event: ToolUsageErrorEvent):
            print(f"[ERROR] Tool error: {getattr(event, 'error_message', str(event))}")
        
        @crewai_event_bus.on(AgentExecutionErrorEvent)
        def on_agent_error(source, event: AgentExecutionErrorEvent):
            print(f"[ERROR] Agent error: {getattr(event, 'error_message', str(event))}")

# Instantiate in the same file as your crew — keeps it in memory
debug_listener = DebugListener()

# Run crew — events fire automatically
result = crew.kickoff()

# Inspect captured events after run
print(f"Total events captured: {len(debug_listener.events)}")
tool_calls = [e for e in debug_listener.events if e["type"] == "tool_start"]
print(f"Tool calls made: {len(tool_calls)}")

The event system covers: crew execution lifecycle, agent operations, task lifecycle, tool usage (start/finish/error/validation), LLM calls and streaming, memory operations, and agent-to-agent delegation. It’s the most complete view into what happened.
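
Once events are captured, you can derive timings from them. The sketch below works on the dict shape that DebugListener stores (type, timestamp, data) and assumes two things that may not hold in your version: that timestamp is a datetime and that data contains a tool_name key.

```python
from datetime import datetime

def tool_durations(events: list[dict]) -> list[tuple[str, float]]:
    """Pair each tool_start with the next tool_finish for the same tool name."""
    open_calls: dict[str, list[datetime]] = {}
    durations = []
    for event in sorted(events, key=lambda e: e["timestamp"]):
        name = str(event["data"].get("tool_name", "unknown"))
        if event["type"] == "tool_start":
            open_calls.setdefault(name, []).append(event["timestamp"])
        elif event["type"] == "tool_finish" and open_calls.get(name):
            started = open_calls[name].pop(0)   # oldest unmatched start for this tool
            durations.append((name, (event["timestamp"] - started).total_seconds()))
    return durations
```

Calling tool_durations(debug_listener.events) after a run lists how long each completed tool call took; starts without a matching finish are simply left out, which is itself a hint that a call failed or hung.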

Diagnosing Common Failures

Failure 1: Agent Ignores a Tool

Symptom: Task output doesn’t reflect what the tool would have returned. Agent seems to have “made up” the answer.

Diagnosis: Check step_callback output or event listener tool events. If no tool call fires, the agent decided it already knew the answer—a hallucination pattern.

Fix: Make the task description explicit: “You MUST use the fetch_financial_data tool to retrieve current metrics. Do not estimate or recall values.” Also check the tool’s docstring—vague descriptions lead to agents skipping tools.
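
To turn this diagnosis into an automatic check, you can compare the tools you expected against the tool events that actually fired. This sketch reads the event dicts collected by the DebugListener from Layer 5 and assumes the event data carries a tool_name key; missing_tools is a hypothetical helper.

```python
def missing_tools(required: set[str], events: list[dict]) -> set[str]:
    """Return the required tool names that never appear in a tool_start event."""
    called = {
        str(e["data"].get("tool_name"))
        for e in events
        if e.get("type") == "tool_start"
    }
    return required - called
```

If missing_tools({"fetch_financial_data"}, debug_listener.events) comes back non-empty, the agent answered without calling the tool and the output should not be trusted.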

Failure 2: Context Not Flowing Between Tasks

Symptom: Task B output ignores everything Task A produced. Agent B starts fresh.

Diagnosis: In task_callback, print each task_output.raw. If Task A produced output but Task B didn’t reference it, check that context=[task_a] is set on task B.

# Missing context — B doesn't see A's output
task_b = Task(description="...", agent=analyst)

# Correct — B receives A's output as context
task_b = Task(description="...", agent=analyst, context=[task_a])

Fix: Add context= dependencies. Also check task descriptions—if Task B’s description doesn’t tell the agent to use prior context, it may ignore it even when provided.
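
Reading two outputs side by side does not scale, so a crude automated signal can help. The function below is a rough heuristic, not a measure of reasoning quality: it reports what fraction of the longer words in the upstream output reappear downstream. The name carried_over and the five-character cutoff are arbitrary choices.

```python
import re

def carried_over(upstream: str, downstream: str, min_len: int = 5) -> float:
    """Fraction of distinctive upstream words (>= min_len chars) that reappear downstream."""
    def words(text: str) -> set[str]:
        return {w for w in re.findall(r"[a-z0-9]+", text.lower()) if len(w) >= min_len}

    source = words(upstream)
    if not source:
        return 0.0
    return len(source & words(downstream)) / len(source)
```

A value near zero for carried_over(task_a.output.raw, task_b.output.raw) suggests Task B never saw, or never used, what Task A produced.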

Failure 3: Tool Returns Error, Agent Continues Anyway

Symptom: Tool returns "Error: API timeout", but the final output looks like the analysis ran fine. The agent invented results.

Diagnosis: Set a step_callback that checks for error strings:

def check_for_errors(step_output):
    output_str = str(step_output)
    if "Error:" in output_str:
        print(f"⚠ TOOL ERROR DETECTED: {output_str[:300]}")
        # Optionally raise to halt the crew

crew = Crew(..., step_callback=check_for_errors)

Fix: Improve tool error messages to be unambiguous. An agent seeing "Error: could not fetch data" may rationalize around it. Seeing "CRITICAL: Required data unavailable. Cannot proceed without this data." is harder to ignore.
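
One way to apply this consistently is a decorator around the plain Python function behind each tool. The sketch below is an assumption-laden example: fail_loudly is a hypothetical helper, the tool body is a stand-in that always times out, and how the wrapper composes with CrewAI’s own tool registration is not covered here.

```python
import functools

def fail_loudly(tool_fn):
    """Wrap a tool function so any exception becomes an unambiguous failure message."""
    @functools.wraps(tool_fn)
    def wrapper(*args, **kwargs):
        try:
            return tool_fn(*args, **kwargs)
        except Exception as exc:
            return (
                f"CRITICAL: {tool_fn.__name__} failed ({type(exc).__name__}: {exc}). "
                "Required data unavailable. Do not estimate or invent values; "
                "report this failure instead."
            )
    return wrapper

@fail_loudly
def fetch_financial_data(ticker: str) -> str:
    # Hypothetical tool body; a real one would call an API here.
    raise TimeoutError("API timeout")
```

Because every failure now starts with the same CRITICAL prefix, the step_callback check above can match on that prefix instead of a generic "Error:" substring.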

Failure 4: Token Budget Exhaustion

Symptom: Later tasks produce truncated or low-quality output. Agent stops reasoning mid-way through complex tasks.

Diagnosis: Check crew.usage_metrics after each run. If a single task consumes most tokens, inspect that task’s context chain—are you passing too many previous task outputs via context=?

Fix: Be selective with context=. Pass only the tasks whose output is actually needed, not every upstream task. For long research outputs, add a summarization step before passing downstream.
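
Where a full summarization step is too heavy, trimming is a cheaper stand-in. The sketch below is not summarization: it simply keeps the start and end of a long text and marks the cut, using a rough four-characters-per-token estimate that is an assumption rather than a real tokenizer.

```python
def trim_to_budget(text: str, max_tokens: int, chars_per_token: int = 4) -> str:
    """Trim text to roughly max_tokens, keeping the start and end and marking the cut."""
    limit = max_tokens * chars_per_token
    if len(text) <= limit:
        return text
    marker = "\n[... trimmed ...]\n"
    keep = max(limit - len(marker), 0)
    head = keep - keep // 3          # about two thirds from the start
    tail = keep // 3                 # about one third from the end
    return text[:head] + marker + (text[-tail:] if tail else "")
```

Keeping the tail matters because research outputs often end with conclusions; the visible marker also tells the downstream agent that material was removed.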

Failure 5: Infinite Agent Loops

Symptom: Crew runs indefinitely. Agent keeps retrying a tool or re-reasoning without producing output.

Diagnosis and fix: Set max_iter on the agent to cap reasoning iterations, and keep verbose on so the loop is visible in the logs:

researcher = Agent(
    role="Senior Research Analyst",
    goal="...",
    max_iter=10,     # stop after 10 reasoning iterations (default is 20)
    verbose=True
)

With verbose enabled you’ll see the iteration count in logs. Common cause: tool returns empty results and agent loops trying different queries.
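
You can also spot a loop from recorded step text rather than by reading logs. This sketch assumes you have collected step previews as strings (for instance via a step_callback) and flags any step that repeats; repeated_steps and the threshold of three are illustrative.

```python
from collections import Counter

def repeated_steps(step_previews: list[str], threshold: int = 3) -> dict[str, int]:
    """Return normalized step texts that occur at least `threshold` times."""
    counts = Counter(" ".join(s.lower().split()) for s in step_previews)
    return {text: n for text, n in counts.items() if n >= threshold}
```

Normalizing case and whitespace catches near-identical retries; an agent that varies its query slightly each time will slip past this check, so treat an empty result as "no obvious loop" rather than proof there is none.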

Structured Debug Harness

For repeatable debugging sessions, encapsulate everything into a debug wrapper:

import json
import os
from datetime import datetime
from typing import Optional

from crewai import Crew, TaskOutput

class CrewDebugHarness:
    """Run a crew with full debug instrumentation."""

    def __init__(self, crew: Crew, log_dir: str = "./debug_logs"):
        self.crew = crew
        self.log_dir = log_dir
        self.steps = []
        self.tasks = []

        # Wire callbacks
        self.crew.step_callback = self._on_step
        self.crew.task_callback = self._on_task

    def _on_step(self, step_output):
        entry = {
            "time": datetime.now().isoformat(),
            "output": str(step_output)[:500]
        }
        self.steps.append(entry)

        if "Error" in str(step_output):
            print(f"⚠ Step error at {entry['time']}: {entry['output'][:200]}")

    def _on_task(self, task_output: TaskOutput):
        entry = {
            "time": datetime.now().isoformat(),
            "agent": task_output.agent,
            "description": task_output.description[:100],
            "output_length": len(task_output.raw),
            "format": str(task_output.output_format)
        }
        self.tasks.append(entry)
        print(f"✓ Task done [{task_output.agent}] - {len(task_output.raw)} chars output")

    def run(self, inputs: Optional[dict] = None) -> str:
        os.makedirs(self.log_dir, exist_ok=True)
        
        start = datetime.now()
        result = self.crew.kickoff(inputs=inputs) if inputs else self.crew.kickoff()
        duration = (datetime.now() - start).total_seconds()
        
        # Build debug report
        report = {
            "run_time": start.isoformat(),
            "duration_seconds": duration,
            "total_steps": len(self.steps),
            "tasks_completed": len(self.tasks),
            "usage_metrics": str(self.crew.usage_metrics),
            "tasks": self.tasks,
            "steps": self.steps
        }
        
        report_path = os.path.join(
            self.log_dir,
            f"debug_{start.strftime('%Y%m%d_%H%M%S')}.json"
        )
        with open(report_path, "w") as f:
            json.dump(report, f, indent=2)
        
        print(f"\nDebug report: {report_path}")
        print(f"Duration: {duration:.1f}s | Steps: {len(self.steps)} | Tasks: {len(self.tasks)}")
        
        return str(result)

# Usage
harness = CrewDebugHarness(crew)
result = harness.run()

This captures every step and task, logs errors inline, writes a timestamped JSON report, and prints a summary. Run it once per debugging session; compare reports across runs to track regressions.
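
Comparing reports can be automated too. The sketch below reads two of the JSON reports written by the harness and returns the difference in their numeric summary fields; load_report and compare_reports are hypothetical helpers, and the field names are the ones the harness writes.

```python
import json

def load_report(path: str) -> dict:
    """Load one debug report written by CrewDebugHarness."""
    with open(path) as f:
        return json.load(f)

def compare_reports(old: dict, new: dict) -> dict[str, float]:
    """Return new-minus-old deltas for the numeric summary fields of two reports."""
    fields = ("duration_seconds", "total_steps", "tasks_completed")
    return {field: new.get(field, 0) - old.get(field, 0) for field in fields}
```

A jump in total_steps with the same tasks_completed usually means an agent started retrying or looping after your last change.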

What’s Next

With debugging covered, you now have full visibility into what your crew does, what it costs, and where it fails. The next post tackles performance optimization—reducing token usage, cutting latency, and making agents efficient enough for production workloads.


This is part 4 of the CrewAI series. Previous: Part 1: Getting Started, Part 2: Building Custom Tools, Part 3: Memory and State Management