Table of Contents
- Why Agent Coordination Matters
- Core Coordination Patterns
- Architecture for Multi-Agent Claude Deployments
- State Management and Handoff
- Tool Orchestration and Validation
- Failure Scenarios and Recovery
- Observability and Monitoring
- Production Implementation Guide
- Real-World Patterns and Trade-offs
- Next Steps
Why Agent Coordination Matters
Building a single Claude agent is straightforward. Running multiple Claude agents in production that actually work together—that’s where most teams hit the wall.
When you deploy agents at scale, you’re not just managing one conversation loop. You’re orchestrating task dependencies, routing decisions between agents, managing shared state, handling failures gracefully, and keeping costs under control. Get the coordination wrong and you’ll ship agents that hallucinate their way into loops, miss critical context, or blow through your token budget on redundant work.
This guide covers the specific patterns that work in production. We’ve built these with teams at PADISO across financial services, insurance, and SaaS platforms where agent failures cost real money. We’ll walk through the architecture, show you the code patterns that prevent common failures, and explain the trade-offs you’ll face when scaling from prototype to production.
The core problem: Claude agents need clear handoff protocols, explicit state passing, and deterministic routing. Without these, you get agents that repeat work, lose context, or deadlock waiting for each other.
Core Coordination Patterns
Sequential Agent Chains
Sequential coordination is the simplest pattern: Agent A completes its task, passes results to Agent B, and so on. This works well when tasks have a natural order and each agent’s output feeds directly into the next agent’s input.
The key insight is that sequential chains are predictable. You know exactly which agent runs when, which makes debugging and cost tracking straightforward. The trade-off is latency: you can’t parallelise work, so if you have five sequential steps, you wait for all five to complete.
from typing import List

# Agent is assumed to expose a .name and an async .execute() method.
class SequentialAgentChain:
    def __init__(self, agents: List[Agent]):
        self.agents = agents
        self.state = {}

    async def run(self, initial_input: str) -> dict:
        current_input = initial_input
        for i, agent in enumerate(self.agents):
            print(f"Running agent {i}: {agent.name}")
            result = await agent.execute(
                input=current_input,
                context=self.state
            )
            # Store result and pass to next agent
            self.state[agent.name] = result
            current_input = result.get('output', '')
        return self.state
In practice, sequential chains work well for research workflows, content generation pipelines, and data processing. Each agent specialises in one step: gathering information, analysing it, drafting a response, and reviewing it. You control the flow explicitly, which makes production debugging much easier than trying to debug emergent behaviour from concurrent agents.
Hierarchical Orchestration
Hierarchical patterns use a coordinator agent that delegates work to specialist agents. The coordinator decides what needs doing, which agent should do it, and how to combine results.
This pattern scales better than pure sequential chains because the coordinator can make intelligent routing decisions. Instead of always running agents in the same order, it can skip steps, run agents in parallel, or retry failed work.
The challenge is that the coordinator becomes a bottleneck. If the coordinator is too simple, it makes poor routing decisions. If it’s too complex, it becomes expensive to run and hard to debug.
from typing import Dict

class HierarchicalOrchestrator:
    def __init__(self, coordinator: Agent, specialists: Dict[str, Agent]):
        self.coordinator = coordinator
        self.specialists = specialists
        self.execution_log = []

    async def orchestrate(self, task: str) -> dict:
        # Coordinator decides what to do
        plan = await self.coordinator.plan(
            task=task,
            available_agents=list(self.specialists.keys())
        )
        results = {}
        for step in plan.steps:
            agent_name = step.agent
            agent = self.specialists[agent_name]
            result = await agent.execute(
                input=step.input,
                context=results
            )
            results[agent_name] = result
            self.execution_log.append({
                'agent': agent_name,
                'step': step.description,
                'success': result.get('success', False)
            })
        return results
We’ve used hierarchical orchestration successfully with teams building AI-powered customer support systems, where a routing agent decides whether to escalate to a specialist, attempt a resolution, or gather more information. The coordinator’s decision-making directly affects cost and latency, so you need to monitor its performance closely.
Reactive Message Passing
Reactive patterns treat agents as autonomous entities that communicate via messages. Agent A publishes a message, Agent B subscribes and reacts, and so on. This is more like a traditional distributed system than a scripted workflow.
Reactive systems are flexible and can handle complex, non-linear workflows. They’re also harder to reason about. You lose the explicit control of sequential chains, and you gain the complexity of managing message queues, ensuring messages are processed exactly once, and debugging non-deterministic ordering.
import asyncio
from typing import List

class ReactiveAgentSystem:
    def __init__(self, agents: List[Agent]):
        # Index agents by name so subscribers can be looked up directly
        self.agents = {agent.name: agent for agent in agents}
        self.message_queue = asyncio.Queue()
        self.subscriptions = {}  # agent_name -> list of message types
        self.state = {}

    def subscribe(self, agent_name: str, message_types: List[str]):
        self.subscriptions[agent_name] = message_types

    async def publish(self, message: dict):
        await self.message_queue.put(message)

    async def run(self):
        # Runs until the surrounding task is cancelled
        while True:
            message = await self.message_queue.get()
            # Find agents subscribed to this message type
            for agent_name, message_types in self.subscriptions.items():
                if message['type'] in message_types:
                    agent = self.agents[agent_name]
                    # Agent processes message and may publish new messages
                    response = await agent.handle_message(
                        message=message,
                        state=self.state
                    )
                    if response.get('publish'):
                        await self.publish(response['publish'])
Reactive patterns shine in systems where agents need to collaborate on shared problems—like a team of agents analysing a support ticket, each contributing their expertise and building on each other’s findings. The downside is operational complexity: you need solid observability to understand what’s happening when multiple agents are reacting to messages concurrently.
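True exactly-once delivery is hard to guarantee, so a common substitute is at-least-once delivery combined with idempotent handling. The sketch below assumes every message carries a unique `id` field (an assumption, since the messages in the example above only show a `type`), and `IdempotentConsumer` is a hypothetical helper:

```python
import asyncio
from typing import Awaitable, Callable, Set


class IdempotentConsumer:
    """Skips messages whose 'id' has already been handled successfully."""

    def __init__(self, handler: Callable[[dict], Awaitable[None]]):
        self._handler = handler
        # In-memory for the sketch; a real system would use a durable store.
        self._seen: Set[str] = set()

    async def consume(self, message: dict) -> bool:
        """Return True if the message was processed, False if it was a duplicate."""
        message_id = message["id"]  # assumed: publishers attach a unique id
        if message_id in self._seen:
            return False
        await self._handler(message)
        # Record the id only after the handler succeeds, so failures can be retried.
        self._seen.add(message_id)
        return True
```

Because the id is recorded after the handler returns, a crash mid-handler leads to a redelivery rather than a lost message, which is why the handler itself should be safe to run twice.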
Architecture for Multi-Agent Claude Deployments
Reference Architecture
A production Claude agent coordination system typically has these layers:
Input Layer: Receives requests (API, webhook, queue message) and validates them. This is where you catch malformed input before it reaches your agents.
Orchestration Layer: Decides which agents run, in what order, and with what context. This is where you implement your coordination pattern (sequential, hierarchical, reactive).
Agent Layer: Individual Claude instances with specific roles, tools, and guardrails. Each agent has its own system prompt, tool definitions, and error handling.
Tool Layer: Functions your agents can call—databases, APIs, file systems, external services. Tools are how agents interact with the real world.
State Layer: Persistent storage for context, conversation history, and intermediate results. This is critical for handoff between agents and for recovery after failures.
Output Layer: Formats results, stores them, and returns them to the caller.
Observability Layer: Logs, traces, metrics, and error tracking across all layers. You can’t debug production agent systems without comprehensive observability.
Each layer should be independently testable and replaceable. If you can swap out the orchestration pattern without changing agent code, you’ve got good separation of concerns.
State Passing and Context Management
State is how agents remember what they’ve learned and what they need to do. Without proper state management, agents repeat work or lose critical context.
The key principle: make state explicit and immutable at handoff points.
When Agent A hands off to Agent B, Agent A should produce a clean, well-documented state object that Agent B can use. Don’t rely on Agent B inferring context from the conversation history—that’s how you get hallucinations and repeated work.
import json
from datetime import datetime
from typing import Any

class AgentState:
    def __init__(self):
        self.context = {}
        self.results = {}
        self.errors = []
        self.timestamp = datetime.now()

    def add_context(self, key: str, value: Any):
        """Add context that will be passed to the next agent."""
        self.context[key] = value

    def add_result(self, agent_name: str, result: dict):
        """Record an agent's output."""
        self.results[agent_name] = {
            'output': result,
            'timestamp': datetime.now()
        }

    def to_handoff(self) -> str:
        """Format state for passing to the next agent."""
        # default=str serialises the datetime values
        return json.dumps({
            'context': self.context,
            'previous_results': self.results,
            'errors': self.errors
        }, indent=2, default=str)
When you pass state between agents, include:
- What was asked: The original task or question.
- What was found: Concrete results from previous agents.
- What failed: Errors or dead ends, so the next agent doesn’t repeat them.
- What’s next: Explicit instructions about what this agent should focus on.
This sounds verbose, but it dramatically reduces hallucinations and repeated work. In production, the cost of passing an extra 500 tokens of context is far less than the cost of agents repeating work or missing critical information.
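The four items above map naturally onto a small, frozen record. This is a sketch only: `Handoff` and its field names are hypothetical, chosen to mirror the list rather than taken from any library.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Handoff:
    """Explicit payload passed from one agent to the next."""
    task: str                   # what was asked
    findings: Dict[str, str]    # what was found, keyed by agent name
    failures: Tuple[str, ...]   # what failed, so it is not retried blindly
    next_focus: str             # what the receiving agent should do

    def to_prompt(self) -> str:
        """Render the handoff as JSON for inclusion in the next agent's prompt."""
        return json.dumps(asdict(self), indent=2)
```

Freezing the dataclass means a receiving agent cannot reassign fields on the object it was given; any change has to produce a new `Handoff`.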
Tool Orchestration and Validation
Defining Tools for Coordinated Agents
Tools are how agents interact with the real world. In a coordinated system, tools need to be carefully designed so agents can use them reliably and their outputs can be validated.
When you define a tool, be explicit about:
- Input schema: Exactly what parameters the tool accepts. Use strict JSON schemas, not loose descriptions.
- Output schema: What the tool returns. Again, be strict. Use structured outputs where possible.
- Error handling: What happens when the tool fails. Does it retry? Does it return a specific error format?
- Rate limits: Can agents call this tool repeatedly, or should they use it sparingly?
- Cost: Some tools are expensive (API calls, database queries). Track and limit tool usage.
import jsonschema

class Tool:
    def __init__(self, name: str, description: str, input_schema: dict, output_schema: dict):
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.output_schema = output_schema
        self.call_count = 0  # counts successful calls only
        self.total_cost = 0.0

    async def execute(self, **kwargs) -> dict:
        # Validate input against schema
        try:
            jsonschema.validate(kwargs, self.input_schema)
        except jsonschema.ValidationError as e:
            return {'error': f'Invalid input: {e.message}'}
        # Execute the actual tool logic
        try:
            result = await self._run(**kwargs)
        except Exception as e:
            return {'error': f'Tool execution failed: {str(e)}'}
        # Validate output against schema
        try:
            jsonschema.validate(result, self.output_schema)
        except jsonschema.ValidationError as e:
            return {'error': f'Invalid output: {e.message}'}
        self.call_count += 1
        return result

    async def _run(self, **kwargs) -> dict:
        # Subclasses implement the tool's behaviour here
        raise NotImplementedError()
For agent coordination specifically, tools should have clear, deterministic outputs. Avoid tools that return different results for the same input (unless that’s intentional, like querying a live database). Deterministic outputs make it easier to reason about agent behaviour and debug failures.
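The rate-limit and cost points above can be enforced with a small per-workflow budget that every tool call has to pass through. A minimal sketch, assuming the caller can estimate the cost of a call up front (`ToolBudget` and `BudgetExceeded` are hypothetical names):

```python
class BudgetExceeded(Exception):
    """Raised when a tool call would exceed the workflow's budget."""


class ToolBudget:
    def __init__(self, max_calls: int, max_cost: float):
        self.max_calls = max_calls
        self.max_cost = max_cost
        self.calls = 0
        self.cost = 0.0

    def charge(self, tool_name: str, cost: float) -> None:
        """Record one call, or raise before the limit is crossed."""
        if self.calls + 1 > self.max_calls:
            raise BudgetExceeded(f"{tool_name}: call limit of {self.max_calls} reached")
        if self.cost + cost > self.max_cost:
            raise BudgetExceeded(f"{tool_name}: cost limit of {self.max_cost} reached")
        self.calls += 1
        self.cost += cost
```

Calling `charge` before the tool runs turns a runaway loop into an explicit error the coordination layer can handle, instead of a surprise on the invoice.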
Validation and Error Recovery
When agents use tools, things go wrong. Networks fail, databases are unavailable, APIs return unexpected data. Your coordination system needs to handle these gracefully.
The pattern we use in production:
- Validate early: Check inputs before passing them to tools.
- Validate outputs: Check that tool results match expected schemas.
- Retry intelligently: Retry transient failures, but give up on permanent ones.
- Degrade gracefully: If a tool is unavailable, can agents proceed with partial information?
- Log everything: You can’t debug failures without detailed logs.
import asyncio

class ToolExecutor:
    def __init__(self, max_retries: int = 3, timeout_seconds: int = 30):
        self.max_retries = max_retries
        self.timeout_seconds = timeout_seconds

    async def execute_with_retry(self, tool: Tool, **kwargs) -> dict:
        last_error = None
        for attempt in range(self.max_retries):
            try:
                result = await asyncio.wait_for(
                    tool.execute(**kwargs),
                    timeout=self.timeout_seconds
                )
                if 'error' not in result:
                    return result
                last_error = result['error']
                # Give up immediately on errors that a retry cannot fix
                if not self._is_retryable(last_error):
                    return result
            except asyncio.TimeoutError:
                last_error = f'Tool timeout after {self.timeout_seconds}s'
            except Exception as e:
                last_error = str(e)
            # Exponential backoff before the next attempt (skipped after the last one)
            if attempt < self.max_retries - 1:
                await asyncio.sleep(2 ** attempt)
        return {'error': f'Tool failed after {self.max_retries} attempts: {last_error}'}

    def _is_retryable(self, error: str) -> bool:
        # Keyword matching is a heuristic; structured error codes are more reliable
        retryable_keywords = ['timeout', 'connection', 'temporarily', 'unavailable']
        return any(keyword in error.lower() for keyword in retryable_keywords)
In coordinated agent systems, tool failures in one agent affect downstream agents. If Agent A can’t fetch data because a database is down, Agent B can’t proceed. Your coordination layer needs to detect these cascading failures and either retry the whole chain or escalate to a human.
Failure Scenarios and Recovery
This is where most teams underestimate complexity. In production, agents fail constantly. Your job is to make failures obvious and recoverable.
Common Failure Scenarios
Agent Hallucination: The agent generates plausible-sounding but false information. In coordinated systems, this cascades—Agent B bases its work on Agent A’s hallucination, and you end up with garbage.
Context Loss: An agent doesn’t have the information it needs to complete its task. This usually happens because state wasn’t passed correctly between agents.
Tool Unavailability: An agent tries to use a tool that’s temporarily down. The agent might retry indefinitely, burning tokens and increasing latency.
Deadlock: Two agents are waiting for each other. Agent A is waiting for Agent B’s result, Agent B is waiting for Agent A’s result. The system hangs.
Cost Explosion: An agent gets into a loop, calling expensive tools repeatedly. Your token budget evaporates.
Timeout: An agent takes too long to complete, exceeding your timeout threshold.
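Deadlocks of the kind described above can be caught before a workflow starts if you record which agent waits on which. The sketch below assumes you can express those waits as a mapping from agent name to the agents it depends on; `find_wait_cycle` is a hypothetical helper that runs a depth-first search for a cycle.

```python
from typing import Dict, List, Optional


def find_wait_cycle(waits_on: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle of agents waiting on each other, or None if there is none."""
    UNVISITED, IN_PROGRESS, DONE = 0, 1, 2
    status: Dict[str, int] = {}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        status[node] = IN_PROGRESS
        path.append(node)
        for dep in waits_on.get(node, []):
            dep_status = status.get(dep, UNVISITED)
            if dep_status == IN_PROGRESS:
                # dep is already on the current path, so we have closed a loop
                return path[path.index(dep):] + [dep]
            if dep_status == UNVISITED:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        status[node] = DONE
        return None

    for node in waits_on:
        if status.get(node, UNVISITED) == UNVISITED:
            cycle = visit(node)
            if cycle:
                return cycle
    return None
```

Running this check when a plan is built is cheaper than discovering the cycle through a timeout at runtime.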
Recovery Patterns
Explicit Validation Gates: After each agent completes, validate its output before passing it to the next agent. Use Claude itself to validate—ask it to check that the output is correct, complete, and makes sense given the context.
import json

class ValidationGate:
    def __init__(self, validator_agent: Agent):
        self.validator = validator_agent

    async def validate(self, agent_output: dict, context: dict) -> bool:
        validation_prompt = f"""
        An agent produced the following output:
        {json.dumps(agent_output, indent=2)}
        Given this context:
        {json.dumps(context, indent=2)}
        Is this output correct, complete, and consistent with the context?
        Respond with only 'VALID' or 'INVALID', followed by a brief explanation.
        """
        # Assumes validator.query() returns the reply as a string
        result = await self.validator.query(validation_prompt)
        # Tolerate leading whitespace and lower-case replies
        return result.strip().upper().startswith('VALID')
Fallback Agents: If one agent fails, have a fallback agent that can attempt the task differently. This might be a simpler agent, or one that uses different tools.
from typing import List

class FallbackChain:
    def __init__(self, primary_agent: Agent, fallback_agents: List[Agent]):
        self.primary = primary_agent
        self.fallbacks = fallback_agents

    async def execute(self, task: str, context: dict) -> dict:
        # Try the primary agent first, then each fallback in order
        agents = [self.primary] + self.fallbacks
        for agent in agents:
            try:
                result = await agent.execute(task=task, context=context)
                if result.get('success'):
                    return result
            except Exception as e:
                print(f"Agent {agent.name} failed: {e}")
                continue
        return {'success': False, 'error': 'All agents failed'}
Checkpointing: Save state at regular intervals so you can resume from the last checkpoint if something fails. This is especially important for long-running workflows.
import json
import os
from typing import List, Optional

class CheckpointedWorkflow:
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_dir = checkpoint_dir

    async def run(self, workflow_id: str, steps: List[Step]) -> dict:
        # Load previous checkpoint if it exists
        checkpoint = self._load_checkpoint(workflow_id)
        start_index = checkpoint.get('completed_steps', 0) if checkpoint else 0
        state = checkpoint.get('state', {}) if checkpoint else {}
        for i in range(start_index, len(steps)):
            step = steps[i]
            result = await step.execute(state=state)
            state[step.name] = result
            # Checkpoint after each step (results must be JSON-serialisable)
            self._save_checkpoint(workflow_id, {
                'completed_steps': i + 1,
                'state': state
            })
        return state

    def _save_checkpoint(self, workflow_id: str, data: dict):
        path = os.path.join(self.checkpoint_dir, f"{workflow_id}.json")
        with open(path, 'w') as f:
            json.dump(data, f)

    def _load_checkpoint(self, workflow_id: str) -> Optional[dict]:
        path = os.path.join(self.checkpoint_dir, f"{workflow_id}.json")
        if os.path.exists(path):
            with open(path, 'r') as f:
                return json.load(f)
        return None
Timeout and Circuit Breaker: Set explicit timeouts for agent execution and implement circuit breakers to stop calling failing agents temporarily.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0  # consecutive failures
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    async def call(self, agent: Agent, **kwargs) -> dict:
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout_seconds:
                self.state = 'HALF_OPEN'  # allow one trial call through
            else:
                return {'error': 'Circuit breaker is OPEN'}
        try:
            result = await agent.execute(**kwargs)
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial call, or too many failures in a row, opens the circuit
            if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            return {'error': str(e)}
        # Any success closes the circuit and resets the failure count
        self.state = 'CLOSED'
        self.failure_count = 0
        return result
These patterns work together. You validate outputs, use fallbacks when validation fails, checkpoint progress so you can resume, and use circuit breakers to stop hammering failing services. This combination makes your agent system resilient to the inevitable failures that happen in production.
Observability and Monitoring
You can’t operate agent systems you can’t see. Observability is not optional—it’s how you debug production failures, understand costs, and catch problems before they affect users.
Structured Logging
Log every significant event: agent startup, tool calls, state transitions, errors, timeouts. Include enough context that you can reconstruct what happened.
import logging
import json
from datetime import datetime

class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def log_agent_start(self, agent_name: str, task: str, context: dict):
        self.logger.info(json.dumps({
            'event': 'agent_start',
            'agent': agent_name,
            'task': task,
            'context_keys': list(context.keys()),
            'timestamp': datetime.now().isoformat()
        }))

    def log_tool_call(self, agent_name: str, tool_name: str, input_params: dict):
        self.logger.info(json.dumps({
            'event': 'tool_call',
            'agent': agent_name,
            'tool': tool_name,
            'input_keys': list(input_params.keys()),
            'timestamp': datetime.now().isoformat()
        }))

    def log_agent_complete(self, agent_name: str, result: dict, duration_seconds: float):
        self.logger.info(json.dumps({
            'event': 'agent_complete',
            'agent': agent_name,
            'success': result.get('success', False),
            'duration_seconds': duration_seconds,
            'output_keys': list(result.keys()),
            'timestamp': datetime.now().isoformat()
        }))

    def log_error(self, agent_name: str, error: str, context: dict):
        self.logger.error(json.dumps({
            'event': 'error',
            'agent': agent_name,
            'error': error,
            'context': context,
            'timestamp': datetime.now().isoformat()
        }))
Tracing Agent Workflows
Traces show you the full execution path of a workflow: which agents ran, in what order, what they called, what they returned. Use a tracing library such as OpenTelemetry (see also the LangGraph documentation) to track workflows end-to-end.
from typing import List

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Register a provider so spans are actually recorded; swap the console
# exporter for your tracing backend's exporter in production.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def execute_agent_workflow(agents: List[Agent], initial_input: str):
    with tracer.start_as_current_span('agent_workflow') as workflow_span:
        workflow_span.set_attribute('agent_count', len(agents))
        current_input = initial_input
        for agent in agents:
            # One child span per agent, nested under the workflow span
            with tracer.start_as_current_span(f'agent_{agent.name}') as agent_span:
                agent_span.set_attribute('agent_name', agent.name)
                result = await agent.execute(input=current_input)
                agent_span.set_attribute('success', result.get('success', False))
                agent_span.set_attribute('output_length', len(str(result)))
                current_input = result.get('output', '')
        return current_input
Metrics and Alerting
Track metrics that matter for production:
- Agent execution time: How long does each agent take? Are agents getting slower over time?
- Tool call count: How many times are agents calling tools? Is this increasing unexpectedly?
- Error rate: What fraction of agent runs fail?
- Token usage: How many tokens are agents consuming? Is this trending up?
- Cost: What’s the actual cost per workflow execution?
import time

from prometheus_client import Counter, Histogram

agent_execution_time = Histogram(
    'agent_execution_seconds',
    'Time taken for agent execution',
    ['agent_name']
)
agent_errors = Counter(
    'agent_errors_total',
    'Total agent errors',
    ['agent_name', 'error_type']
)
tool_calls = Counter(
    'tool_calls_total',
    'Total tool calls',
    ['agent_name', 'tool_name']
)
token_usage = Counter(
    'tokens_used_total',
    'Total tokens used',
    ['agent_name', 'token_type']  # input, output
)

async def execute_with_metrics(agent: Agent, **kwargs):
    start_time = time.time()
    try:
        result = await agent.execute(**kwargs)
        duration = time.time() - start_time
        agent_execution_time.labels(agent_name=agent.name).observe(duration)
        # Assumes the agent reports token counts in its result dict
        token_usage.labels(
            agent_name=agent.name,
            token_type='input'
        ).inc(result.get('input_tokens', 0))
        token_usage.labels(
            agent_name=agent.name,
            token_type='output'
        ).inc(result.get('output_tokens', 0))
        return result
    except Exception as e:
        agent_errors.labels(
            agent_name=agent.name,
            error_type=type(e).__name__
        ).inc()
        raise
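Token counters become a cost metric once you multiply them by your per-token prices. A minimal sketch follows; the prices are parameters you supply from your own pricing sheet, and `Usage` and `workflow_cost` are hypothetical names:

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Usage:
    """Token counts reported for a single agent call."""
    input_tokens: int
    output_tokens: int


def workflow_cost(
    usages: Iterable[Usage],
    input_price_per_mtok: float,   # your price per million input tokens
    output_price_per_mtok: float,  # your price per million output tokens
) -> float:
    """Sum token usage across agent calls and convert it to a currency amount."""
    usages = list(usages)
    total_in = sum(u.input_tokens for u in usages)
    total_out = sum(u.output_tokens for u in usages)
    return (total_in * input_price_per_mtok + total_out * output_price_per_mtok) / 1_000_000
```

Recording this figure per workflow execution, rather than per agent, makes it easier to spot a single workflow type that dominates spend.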
Production Implementation Guide
Step 1: Start with a Single Agent
Before you build a coordinated multi-agent system, make sure you can reliably run a single Claude agent in production. This means:
- Clear input validation
- Tool definitions with strict schemas
- Error handling and retries
- Logging and tracing
- Cost tracking
- Timeout and rate limiting
Test your single agent thoroughly. Run it against edge cases. Make sure it fails gracefully when tools are unavailable or inputs are malformed. Only once you have a solid single-agent foundation should you add coordination complexity.
Step 2: Define Your Coordination Pattern
Choose one of the patterns we discussed:
- Sequential: For linear workflows where each step feeds into the next.
- Hierarchical: For workflows where a coordinator makes routing decisions.
- Reactive: For complex, non-linear workflows where agents collaborate.
Start simple. Sequential chains are easier to debug and reason about than reactive systems. Once you’ve mastered sequential coordination, you can add complexity.
For teams building AI-powered platforms with Platform Design & Engineering, we typically start with hierarchical orchestration—it gives you enough flexibility to handle real-world complexity without the operational overhead of fully reactive systems.
Step 3: Implement State Management
Define your state schema explicitly. What information needs to pass between agents? What’s optional? What’s required?
class WorkflowState:
    def __init__(self):
        self.task = None   # The original task
        self.context = {}  # Information gathered so far
        self.results = {}  # Results from each agent
        self.errors = []   # Errors encountered

    def to_dict(self) -> dict:
        return {
            'task': self.task,
            'context': self.context,
            'results': self.results,
            'errors': self.errors
        }
Make state immutable at handoff points. When Agent A hands off to Agent B, create a new state object with Agent A’s results. Don’t mutate shared state—that’s how you get race conditions and unpredictable behaviour.
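One way to get that behaviour is to derive each new state from a copy of the previous one. This is a sketch under the assumption that state is a plain dict-like mapping with a `results` key; `with_result` is a hypothetical helper, not part of `WorkflowState`.

```python
import copy
from types import MappingProxyType
from typing import Any, Mapping


def with_result(state: Mapping[str, Any], agent_name: str, result: dict) -> Mapping[str, Any]:
    """Return a new read-only state that includes one more agent result."""
    # Deep-copy so the new state shares no mutable objects with the old one.
    new_state = copy.deepcopy(dict(state))
    new_state.setdefault("results", {})[agent_name] = copy.deepcopy(result)
    # The proxy blocks top-level assignment; nested dicts are still mutable.
    return MappingProxyType(new_state)
```

Because the previous state is never modified, you can keep it around for replay or for comparing what each agent contributed.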
Step 4: Add Observability
Before you deploy to production, wire up logging, tracing, and metrics. You’ll need these to debug failures.
- Log every agent start, tool call, and completion.
- Trace the full execution path of each workflow.
- Track execution time, error rate, and token usage.
- Set up alerts for anomalies (execution time increasing, error rate spiking, token usage exploding).
Step 5: Test Failure Scenarios
Don’t wait for production to find out how your system handles failures. Test these scenarios:
- Tool unavailability: Mock a tool to always fail. Does your system handle it gracefully?
- Timeout: Set a very short timeout. Does your system fail fast or hang?
- Agent hallucination: Inject false information into an agent’s context. Does the next agent catch it?
- Cost explosion: Limit token budget and see what happens when an agent runs out of tokens.
For each scenario, you should have a clear recovery path.
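The tool-unavailability scenario is the easiest one to automate. The sketch below uses a stand-in tool that always raises and a simplified retry helper (both hypothetical, and independent of the `ToolExecutor` shown earlier), then checks that the system gives up with a structured error:

```python
import asyncio


class AlwaysFailingTool:
    """Test double: every call raises, as if the backing service were down."""

    def __init__(self):
        self.calls = 0

    async def execute(self, **kwargs) -> dict:
        self.calls += 1
        raise ConnectionError("service unavailable")


async def call_with_retries(tool, max_retries: int = 3, **kwargs) -> dict:
    """Try the tool up to max_retries times, then return a structured failure."""
    last_error = ""
    for _ in range(max_retries):
        try:
            return await tool.execute(**kwargs)
        except Exception as exc:
            last_error = str(exc)
    return {"success": False, "error": f"gave up after {max_retries} attempts: {last_error}"}


def test_tool_unavailability():
    tool = AlwaysFailingTool()
    result = asyncio.run(call_with_retries(tool, max_retries=3, query="x"))
    assert result["success"] is False   # failure is reported, not raised
    assert tool.calls == 3              # retries are bounded
```

The two assertions capture the recovery path you want: a bounded number of attempts and an error the coordination layer can act on.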
Step 6: Deploy and Monitor
Deploy to production with comprehensive monitoring. Start with a small percentage of traffic (canary deployment). Watch metrics closely.
Be prepared to roll back quickly if something goes wrong. Have a runbook for common failure scenarios. Make sure your on-call team knows how to debug agent systems.
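A canary split can be as simple as hashing a stable request identifier into a bucket. The following is a sketch, assuming each request has a stable string id; `in_canary` is a hypothetical helper, and many teams do this at the load balancer or feature-flag layer instead.

```python
import hashlib


def in_canary(request_id: str, percent: int) -> bool:
    """Deterministically route roughly `percent` of request ids to the canary."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    # Map the first four bytes of the hash onto buckets 0..99.
    bucket = int.from_bytes(digest[:4], "big") % 100
    return bucket < percent
```

Hashing rather than sampling randomly means the same request id always lands on the same side, which keeps retries and follow-up messages on a consistent version.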
In our experience with teams at AI Advisory Services Sydney, the transition from prototype to production usually uncovers issues you didn’t anticipate in testing. That’s normal. The key is catching and fixing them quickly.
Real-World Patterns and Trade-offs
Cost vs. Latency
There’s always a trade-off between cost and latency in agent systems. More sophisticated coordination (hierarchical, reactive) can make better decisions about which agents to run and in what order, reducing wasted work and saving money. But this sophistication comes at a cost—the coordinator itself is expensive to run.
Sequential chains are cheaper because they’re deterministic, but they’re slower because they can’t parallelise work.
For financial services and regulated industries, we typically favour sequential chains or simple hierarchical patterns. The predictability is worth the latency cost. For consumer-facing applications where latency matters more than cost, we use more sophisticated orchestration.
Stateful vs. Stateless
Stateful agents remember context between calls. This reduces hallucinations and makes coordination easier, but it adds operational complexity—you need to manage state storage, ensure state is consistent, and handle state expiration.
Stateless agents don’t remember anything. Each call is independent. This is simpler operationally, but agents repeat work and lose context, making coordination harder.
In production, we use a hybrid approach: agents are stateless within a single workflow execution, but the workflow maintains explicit state that’s passed between agents. This gives you the simplicity of stateless agents with the context-awareness of stateful systems.
Determinism and Reproducibility
Production systems need to be reproducible. If something goes wrong, you need to be able to replay the exact same inputs and get the same outputs. This is hard with language models—they’re inherently non-deterministic.
To improve reproducibility:
- Use low temperature: Set temperature to 0 or very close to it. This makes Claude more deterministic.
- Use structured outputs: Instead of asking for free-form text, ask for structured JSON. This reduces hallucinations.
- Validate outputs: Check that outputs match expected schemas. Reject outputs that don’t.
- Log everything: Store inputs, outputs, and any randomness (like which fallback agent was chosen).
import json

import jsonschema

class DeterministicAgent:
    def __init__(self, name: str, system_prompt: str, client):
        self.name = name
        self.system_prompt = system_prompt
        self.client = client  # assumed: an anthropic.AsyncAnthropic instance

    async def query(self, prompt: str, output_schema: dict = None) -> dict:
        response = await self.client.messages.create(
            model='claude-3-5-sonnet-20241022',
            max_tokens=2048,
            temperature=0,  # Reduces, but does not eliminate, run-to-run variation
            system=self.system_prompt,
            messages=[{'role': 'user', 'content': prompt}]
        )
        output = response.content[0].text
        # Validate against schema if provided
        if output_schema:
            try:
                parsed = json.loads(output)
                jsonschema.validate(parsed, output_schema)
                return parsed
            except (json.JSONDecodeError, jsonschema.ValidationError) as e:
                raise ValueError(f'Output validation failed: {e}')
        return {'output': output}
When to Use External Orchestration Frameworks
Frameworks like LangGraph and AutoGen provide pre-built patterns for agent coordination. They’re useful if:
- You need to handle complex, non-linear workflows.
- You want built-in support for retries, timeouts, and error handling.
- You want to visualize your agent workflows.
- You’re building a system that other teams will use.
They add overhead and abstraction. For simple sequential chains, building your own orchestration is often simpler and more transparent. For complex systems, a framework saves time and reduces bugs.
We’ve had good success using LangGraph for complex workflows at scale. The key is understanding the framework well enough to debug it when things go wrong.
Advanced Patterns for Production Scale
Multi-Agent Research Systems
Anthropic’s engineering team published how they built their multi-agent research system, which provides excellent reference patterns for coordinating multiple Claude agents on complex research tasks. Their approach uses explicit planning, tool coordination, and result validation, all patterns we’ve discussed here.
The key insight from their work: structure the problem so agents can work in parallel where possible, but maintain explicit handoff points where agents share results. This gives you the parallelism benefits of reactive systems with the clarity of sequential chains.
Tool Orchestration at Scale
As your agent system grows, you’ll have many tools. Agents need to know which tools are available, when to use them, and how to use them correctly. Anthropic’s agents and tools documentation covers best practices for tool definition and usage.
At scale, you need:
- Tool discovery: Agents need to discover what tools are available.
- Tool versioning: Tools change over time. How do you handle new versions?
- Tool governance: Who can add new tools? How do you prevent agents from using tools incorrectly?
- Tool monitoring: Which tools are agents using? Are they using them correctly?
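Discovery and versioning can start as a small in-process registry. This is a sketch only: `ToolRegistry` is a hypothetical class, tools are plain callables, and versions are integers for simplicity.

```python
from typing import Callable, Dict, List, Optional, Tuple


class ToolRegistry:
    """Keeps every registered version of every tool, keyed by (name, version)."""

    def __init__(self):
        self._tools: Dict[Tuple[str, int], Callable[..., dict]] = {}

    def register(self, name: str, version: int, fn: Callable[..., dict]) -> None:
        key = (name, version)
        if key in self._tools:
            # Governance hook: published versions are never silently replaced.
            raise ValueError(f"{name} v{version} is already registered")
        self._tools[key] = fn

    def discover(self) -> List[str]:
        """List the tool names an agent may be offered."""
        return sorted({name for name, _ in self._tools})

    def get(self, name: str, version: Optional[int] = None) -> Callable[..., dict]:
        """Return a pinned version, or the latest one if no version is given."""
        versions = sorted(v for n, v in self._tools if n == name)
        if not versions:
            raise KeyError(name)
        chosen = versions[-1] if version is None else version
        return self._tools[(name, chosen)]
```

Letting agents pin a version while new workflows pick up the latest gives you a migration path when a tool's schema changes.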
Structured Outputs for Reliable Coordination
Structured outputs are critical for agent coordination. Instead of asking Claude to return free-form text, ask it to return structured JSON. This makes outputs predictable and easier to validate.
OpenAI’s structured outputs documentation covers this pattern in detail. With Claude, you can get a similar effect by describing the JSON schema in the prompt, or by supplying it as a tool’s input schema, and then validating the response against that schema.
output_schema = {
    'type': 'object',
    'properties': {
        'decision': {'type': 'string', 'enum': ['escalate', 'resolve', 'gather_more_info']},
        'reasoning': {'type': 'string'},
        'next_steps': {'type': 'array', 'items': {'type': 'string'}}
    },
    'required': ['decision', 'reasoning', 'next_steps']
}

response = await client.messages.create(
    model='claude-3-5-sonnet-20241022',
    max_tokens=1024,
    system="You are a routing agent. Decide whether to escalate, resolve, or gather more information.",
    messages=[...],
    # Note: Claude doesn't have native structured outputs like OpenAI,
    # but you can enforce this via validation
)
Since Claude doesn’t have native structured outputs, you enforce the schema on the client side—parse the output, validate it against the schema, and reject it if it doesn’t match. This gives you the same reliability benefits.
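Client-side enforcement for the routing schema above might look like the sketch below. It uses only the standard library instead of `jsonschema`, and `generate` is a hypothetical callable standing in for whatever function sends a prompt to the model and returns its text reply.

```python
import json
from typing import Callable

ALLOWED_DECISIONS = {"escalate", "resolve", "gather_more_info"}


def parse_routing_decision(raw: str) -> dict:
    """Parse a model reply and check it against the routing schema by hand."""
    data = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    if data.get("decision") not in ALLOWED_DECISIONS:
        raise ValueError("decision must be one of " + ", ".join(sorted(ALLOWED_DECISIONS)))
    if not isinstance(data.get("reasoning"), str):
        raise ValueError("reasoning must be a string")
    steps = data.get("next_steps")
    if not isinstance(steps, list) or not all(isinstance(s, str) for s in steps):
        raise ValueError("next_steps must be a list of strings")
    return data


def decide_with_retry(generate: Callable[[str], str], prompt: str, max_attempts: int = 2) -> dict:
    """Call the model, validate the reply, and retry with feedback on rejection."""
    feedback = ""
    for _ in range(max_attempts):
        raw = generate(prompt + feedback)
        try:
            return parse_routing_decision(raw)
        except ValueError as exc:
            feedback = f"\n\nYour previous reply was rejected: {exc}. Reply with JSON only."
    raise ValueError(f"no valid decision after {max_attempts} attempts")
```

Feeding the validation error back into the retry prompt gives the model a concrete reason for the rejection; capping attempts keeps a persistently malformed reply from turning into a cost loop.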
Retrieval-Augmented Generation for Agent Coordination
When agents need access to large amounts of information, retrieval-augmented generation (RAG) is the standard pattern. Instead of putting all the information in the prompt, you retrieve relevant information from a knowledge base and include only that.
For agent coordination, RAG is particularly useful for:
- Reducing context size: Each agent gets only the information it needs, reducing token usage.
- Improving accuracy: Agents can retrieve specific information rather than relying on training data.
- Enabling dynamic updates: You can update the knowledge base without retraining or updating agents.
import json

class RAGAgent:
    def __init__(self, name: str, retriever, client):
        self.name = name
        self.retriever = retriever  # e.g., vector database
        self.client = client  # assumed: an anthropic.AsyncAnthropic instance

    async def execute(self, task: str, context: dict) -> dict:
        # Retrieve relevant information
        query = f"{task} {' '.join(context.get('keywords', []))}"
        retrieved_docs = await self.retriever.retrieve(query, top_k=5)
        docs_block = "\n".join(f"- {doc}" for doc in retrieved_docs)
        # Build prompt with retrieved information
        prompt = f"""
        Task: {task}
        Relevant information:
        {docs_block}
        Context:
        {json.dumps(context, indent=2)}
        Complete the task using the provided information.
        """
        result = await self.client.messages.create(
            model='claude-3-5-sonnet-20241022',
            max_tokens=2048,
            messages=[{'role': 'user', 'content': prompt}]
        )
        return {'output': result.content[0].text}
RAG works well with coordinated agents. Each agent can retrieve the specific information it needs, reducing redundant work and keeping context focused.
Next Steps
Agent coordination is a deep topic, and this guide covers the essential patterns. Here’s how to move forward:
1. Start with a prototype. Pick one of the coordination patterns (sequential is easiest) and build a simple prototype with 2-3 agents. Get comfortable with state passing and tool calls.
2. Add observability. Wire up logging and tracing. You’ll need this to debug production issues.
3. Test failure scenarios. Don’t wait for production. Test what happens when tools fail, agents timeout, or inputs are malformed.
4. Deploy to production with monitoring. Start small (canary deployment) and watch metrics closely. Be prepared to roll back.
5. Iterate based on real-world usage. Production always reveals issues you didn’t anticipate in testing. That’s normal. The key is catching and fixing them quickly.
If you’re building agent systems for a regulated industry (financial services, insurance, healthcare), compliance and auditability become critical. This is where tools like AI Strategy & Readiness and Security Audit (SOC 2 / ISO 27001) matter. You need to be able to explain how your agents work, trace their decisions, and ensure they’re complying with regulations.
For teams in Australia, Fractional CTO & CTO Advisory in Sydney can help you navigate the technical and architectural decisions required for production agent systems. The team at PADISO has built coordinated agent systems for financial services, insurance, and SaaS companies. If you’re building something complex, it’s worth talking to people who’ve done it before.
Agent coordination is not easy, but it’s not magic either. It’s engineering. Follow the patterns, test thoroughly, observe closely, and you’ll build systems that work reliably in production.
The teams doing this well right now are shipping faster, automating more work, and building products that would be impossible without coordinated agents. If you’re not using agents yet, now’s the time to start. If you’re using agents but struggling with coordination, the patterns in this guide will help you move from prototype to production.
Good luck. Build fast. Observe everything. And when things break (they will), you’ll have the logs and traces to fix them quickly.