Table of Contents
- Why Streaming Tool Use Matters in Production
- Understanding Tool Use in AI Agents
- Streaming Architectures: The Real Patterns
- Code Patterns That Scale
- Handling Partial Tool Outputs
- Error Handling and Resilience
- Observability and Debugging
- Performance Tuning at Scale
- Security Considerations
- Migration and Rollout Strategy
Why Streaming Tool Use Matters in Production
Streaming tool use outputs isn’t a nice-to-have feature in production AI agents—it’s the difference between a system that feels responsive and one that disappears into a black box for 30 seconds while your users stare at a loading spinner.
When you deploy agentic AI in production, you’re not just running a single inference. You’re orchestrating a chain of tool calls, each with latency: database queries, API calls, file operations, external service integrations. Without streaming, users wait for the entire agent loop to complete before seeing any progress. With streaming, they see tool calls as they happen, intermediate results as they arrive, and final outputs as they’re generated.
At PADISO, we’ve shipped agentic AI systems across 3PL operations, healthcare prior authorisation workflows, and financial services integrations. The teams that got streaming right saw 40–60% improvements in perceived latency and significantly better error recovery. The teams that didn’t? They hit production horror stories—timeouts, user frustration, and abandoned integrations.
This guide covers the real engineering patterns we use to stream tool use outputs reliably at scale.
Understanding Tool Use in AI Agents
What Tool Use Actually Is
Tool use is the mechanism by which an AI agent decides it needs to call an external function, API, or system, and then uses the result to inform its next decision. Unlike traditional function calling, tool use in modern agents is iterative and context-aware: the agent can reason about which tool to call, with what parameters, and what to do with the result before deciding on the next step.
When you deploy an agent with tools, you’re giving it:
- Tool definitions: JSON schemas describing what each tool does, what inputs it accepts, and what it returns
- Tool implementations: The actual code that executes when the agent requests a tool call
- Tool results: The data returned by the tool, which the agent uses to decide its next action
The OpenAI Agents Documentation and Anthropic Claude Agent Building Guide both cover this well, but the production reality is messier than the docs suggest.
Why Streaming Changes Everything
Without streaming, your agent system looks like this:
- User sends request
- Agent thinks about which tool to call (latency: 0.5–2s)
- Agent calls tool (latency: 0.5–10s+ depending on tool)
- Agent receives result
- Agent thinks about next step (latency: 0.5–2s)
- … repeat until done
- Return final result to user
Total latency: 5–30+ seconds, with no feedback until the end.
With streaming, you can emit events as they happen:
- User sends request
- Stream “thinking” event (user sees agent is working)
- Agent decides on tool call
- Stream “tool_call” event with tool name and parameters
- Tool executes in background
- Stream “tool_result” event with intermediate data
- Agent continues
- … repeat
- Stream “final_response” event
Total latency from user perspective: ~500ms to first event, then progressive updates. Total wall-clock time is the same, but perceived latency drops dramatically.
Streaming Architectures: The Real Patterns
Pattern 1: Server-Sent Events (SSE) for Web Clients
For web applications, Server-Sent Events is the standard pattern. The client opens a persistent HTTP connection, and the server pushes events down the stream as they occur.
Why SSE works:
- Simple HTTP, no WebSocket complexity
- Built-in reconnection and event ID tracking
- Works through proxies and load balancers
- Native browser support via EventSource API
The architecture:
Client → HTTP POST /agent/stream → Server
← SSE stream (Content-Type: text/event-stream) ←
The server holds the connection open and sends events like:
data: {"type": "thinking", "message": "Analyzing request..."}
data: {"type": "tool_call", "tool": "query_database", "params": {...}}
data: {"type": "tool_result", "data": {...}}
data: {"type": "final_response", "text": "..."}
This pattern works beautifully for most web use cases. We use it for AI-powered dashboard queries where users need to see query execution in real-time.
Pattern 2: WebSocket for Bidirectional Communication
When you need the client to send new instructions mid-stream (e.g., “cancel this tool call” or “use this alternative approach”), WebSocket is more appropriate. It’s more complex but gives you true bidirectional communication.
When to use WebSocket:
- Interactive agent loops where users can interrupt or redirect
- Real-time collaborative workflows
- Systems where the agent needs to ask the user for clarification mid-execution
The trade-off: WebSocket adds complexity (connection state management, reconnection logic, message ordering) but enables richer interactions.
Pattern 3: Message Queue + Polling for Async Workflows
For long-running agent tasks (processing 1000 documents, running overnight batch jobs), neither SSE nor WebSocket is appropriate. Instead, use a message queue (SQS, RabbitMQ, Kafka) with the client polling for results.
Architecture:
Client → POST /agent/task → Server (returns task_id)
Server → Enqueue task to message queue
Worker ← Dequeue task
Worker → Process with streaming
Worker → Write progress to database
Client ← Poll /agent/task/{task_id} status
This is what we use for 3PL operations automation where agents process inbound bookings across multiple WMS instances.
Pattern 4: gRPC Streaming for Service-to-Service
When your agent is called by another service (not a web client), gRPC streaming is more efficient than HTTP. gRPC uses HTTP/2 multiplexing and protobuf serialisation, reducing overhead.
When to use:
- Agent as a microservice called by other services
- High-volume tool use with strict latency budgets
- Systems already using gRPC elsewhere
We’ve used gRPC streaming in financial services AI systems where agents query risk engines and compliance databases with sub-100ms latency requirements.
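As a sketch, a server-streaming service definition might look like this; the package, service, and message names are illustrative, not from any real system:

```protobuf
syntax = "proto3";

package agent.v1;  // illustrative package name

service AgentService {
  // Server-streaming RPC: one request in, a stream of agent events back
  rpc StreamAgent(AgentRequest) returns (stream AgentEvent);
}

message AgentRequest {
  string message = 1;
}

message AgentEvent {
  string type = 1;          // "thinking" | "tool_call" | "tool_result" | "final_response"
  string payload_json = 2;  // Event body, JSON-encoded here for simplicity
}
```

A production schema would typically replace the JSON payload with typed per-event messages in a `oneof`, trading flexibility for protobuf’s size and validation benefits.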
Code Patterns That Scale
Pattern: Streaming with Claude and Python
Here’s a production-grade pattern for streaming Claude agent responses with tool use:
import anthropic
import json
from typing import Generator

class StreamingClaudeAgent:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.model = "claude-opus-4-1-20250805"
        self.tools = [
            {
                "name": "query_database",
                "description": "Query the operational database",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "limit": {"type": "integer", "default": 10}
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "call_external_api",
                "description": "Call an external API endpoint",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "endpoint": {"type": "string"},
                        "method": {"type": "string", "enum": ["GET", "POST"]},
                        "payload": {"type": "object"}
                    },
                    "required": ["endpoint", "method"]
                }
            }
        ]

    def _execute_tool(self, tool_name: str, tool_input: dict) -> str:
        """Execute a tool and return result as JSON string."""
        if tool_name == "query_database":
            # In production, this queries your actual database
            return json.dumps({
                "status": "success",
                "rows": [{"id": 1, "value": "example"}]
            })
        elif tool_name == "call_external_api":
            # In production, this makes actual HTTP calls
            return json.dumps({
                "status": 200,
                "body": {"result": "api response"}
            })
        else:
            return json.dumps({"error": f"Unknown tool: {tool_name}"})

    def stream_agent_response(self, user_message: str) -> Generator[dict, None, None]:
        """Stream agent response with tool use."""
        messages = [{"role": "user", "content": user_message}]

        while True:
            # Call Claude with streaming
            with self.client.messages.stream(
                model=self.model,
                max_tokens=4096,
                tools=self.tools,
                messages=messages
            ) as stream:
                for event in stream:
                    if hasattr(event, 'type'):
                        if event.type == 'content_block_start':
                            if hasattr(event, 'content_block') and event.content_block.type == 'tool_use':
                                # Emit tool call event. Note: at block start the
                                # input is usually still empty; it streams in via
                                # input_json_delta events and is only complete on
                                # the final message collected below.
                                yield {
                                    "type": "tool_call",
                                    "tool_name": event.content_block.name,
                                    "tool_id": event.content_block.id,
                                    "input": event.content_block.input
                                }
                        elif event.type == 'content_block_delta':
                            if hasattr(event, 'delta') and hasattr(event.delta, 'text'):
                                # Emit text delta
                                yield {
                                    "type": "text_delta",
                                    "text": event.delta.text
                                }

                full_response = stream.get_final_message()

            # Check if we need to handle tool use
            has_tool_use = any(
                block.type == "tool_use"
                for block in full_response.content
            )

            if not has_tool_use:
                # No more tool calls, we're done
                yield {"type": "final_response", "stop_reason": full_response.stop_reason}
                break

            # Process tool calls
            messages.append({"role": "assistant", "content": full_response.content})

            # Execute tools and collect results
            tool_results = []
            for block in full_response.content:
                if block.type == "tool_use":
                    tool_result = self._execute_tool(block.name, block.input)
                    yield {
                        "type": "tool_result",
                        "tool_id": block.id,
                        "tool_name": block.name,
                        "result": tool_result
                    }
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": tool_result
                    })

            # Add tool results back to messages
            messages.append({"role": "user", "content": tool_results})
This pattern:
- Uses Anthropic’s streaming API to emit events as they occur
- Handles the agentic loop (tool call → tool result → next step)
- Yields events that can be sent to clients via SSE or WebSocket
- Properly manages message history for multi-turn interactions
Pattern: FastAPI Streaming Endpoint
Here’s how to expose this as a web endpoint:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import json

app = FastAPI()
agent = StreamingClaudeAgent()

@app.post("/api/agent/stream")
async def stream_agent(request: dict):
    """Stream agent response to client."""
    user_message = request.get("message")
    if not user_message:
        raise HTTPException(status_code=400, detail="Missing message")

    async def event_generator():
        try:
            for event in agent.stream_agent_response(user_message):
                # Format as SSE
                yield f"data: {json.dumps(event)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
The key headers:
- Content-Type: text/event-stream tells the client this is an SSE stream
- Cache-Control: no-cache prevents caching of streaming responses
- X-Accel-Buffering: no disables nginx buffering (critical for real-time streaming)
Pattern: Client-Side Event Handling
On the frontend, consume the stream like this. Note that the browser’s EventSource API only supports GET requests, so for a POST body you read the SSE stream through fetch instead:
async function streamAgent(userInput) {
  const response = await fetch('/api/agent/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({message: userInput})
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, {stream: true});

    // SSE frames are separated by a blank line
    const frames = buffer.split('\n\n');
    buffer = frames.pop();  // keep any incomplete frame for the next chunk

    for (const frame of frames) {
      if (!frame.startsWith('data: ')) continue;
      const data = JSON.parse(frame.slice(6));
      switch (data.type) {
        case 'tool_call':
          updateUI(`Calling ${data.tool_name}...`);
          break;
        case 'tool_result':
          updateUI(`Got result from ${data.tool_name}`);
          break;
        case 'text_delta':
          appendToResponse(data.text);
          break;
        case 'final_response':
          showComplete();
          return;
        case 'error':
          showError(data.error);
          return;
      }
    }
  }
}
Handling Partial Tool Outputs
In production, tools don’t always complete instantly or return perfect data. You need to handle:
Timeouts
If a tool takes longer than expected, you have options:
- Hard timeout: Kill the tool call after N seconds, return a timeout error, let the agent decide what to do next
- Soft timeout: Return partial results, let the agent work with what it has
- Async timeout: Start the tool asynchronously, return a “pending” status, let the client poll for completion
For most production systems, we use soft timeouts (return partial results) combined with logging. This lets the agent continue rather than fail entirely.
import asyncio

async def execute_tool_with_timeout(tool_name: str, tool_input: dict, timeout_seconds: int = 10):
    """Execute tool with timeout, returning partial results if needed."""
    try:
        result = await asyncio.wait_for(
            _execute_tool_async(tool_name, tool_input),
            timeout=timeout_seconds
        )
        return {"status": "success", "data": result}
    except asyncio.TimeoutError:
        return {
            "status": "timeout",
            "message": f"Tool {tool_name} timed out after {timeout_seconds}s",
            "partial_data": None  # Or return partial results if available
        }
Partial Results
Some tools naturally return partial results (streaming APIs, paginated endpoints). Emit each chunk as it arrives:
def stream_tool_results(tool_name: str, tool_input: dict):
    """Stream results from a tool, emitting chunks as they arrive."""
    if tool_name == "search_documents":
        # Simulate streaming search results
        for i in range(5):
            yield {
                "type": "tool_result_chunk",
                "tool_id": tool_input.get("id"),
                "chunk_index": i,
                "total_chunks": 5,
                "data": {"document_id": i, "relevance": 0.95 - (i * 0.1)}
            }
Retryable Failures
If a tool fails transiently (network error, service temporarily down), emit a retry event and let the agent decide whether to retry:
import time

def execute_tool_with_retry(tool_name: str, tool_input: dict, max_retries: int = 3):
    """Execute tool with exponential backoff retry, yielding status events.

    Note: a generator can't also return a value to its caller, so the final
    success or failure is yielded as an event like everything else.
    """
    for attempt in range(max_retries):
        try:
            yield {"status": "success", "data": _execute_tool(tool_name, tool_input)}
            return
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s...
                yield {
                    "type": "tool_retry",
                    "tool_name": tool_name,
                    "attempt": attempt + 1,
                    "max_retries": max_retries,
                    "wait_seconds": wait_time,
                    "error": str(e)
                }
                time.sleep(wait_time)
            else:
                yield {"status": "failed", "error": str(e)}
Error Handling and Resilience
Production agents fail in ways the documentation doesn’t prepare you for. Here are the real patterns:
Pattern: Circuit Breaker for Flaky Tools
If a tool fails repeatedly, don’t keep calling it. Use a circuit breaker:
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Tool is failing, reject calls
    HALF_OPEN = "half_open"  # Testing if tool has recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    def call(self, tool_func, *args, **kwargs):
        """Call tool through circuit breaker."""
        if self.state == CircuitState.OPEN:
            # Check if we should try half-open
            if datetime.now() > self.last_failure_time + timedelta(seconds=self.timeout_seconds):
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception(f"Circuit breaker OPEN for {self.timeout_seconds}s")

        try:
            result = tool_func(*args, **kwargs)
            # Success: close the circuit and reset the count, so failures
            # don't accumulate across long stretches of healthy operation
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise
Pattern: Fallback Tools
When a tool fails, offer the agent a fallback:
def execute_tool_with_fallback(tool_name: str, tool_input: dict):
    """Execute tool with fallback strategy."""
    fallbacks = {
        "query_database": "query_cache",
        "call_external_api": "use_cached_response",
        "generate_report": "return_template"
    }
    try:
        return _execute_tool(tool_name, tool_input)
    except Exception as e:
        fallback = fallbacks.get(tool_name)
        if fallback:
            return {
                "status": "fallback",
                "primary_tool": tool_name,
                "fallback_tool": fallback,
                "reason": str(e),
                "data": _execute_tool(fallback, tool_input)
            }
        else:
            raise
The agent sees that a tool failed and a fallback was used, so it can adjust its reasoning accordingly.
Pattern: Deadletter Queue for Failed Executions
When all else fails, capture the failed execution for later analysis:
import traceback
import uuid
from datetime import datetime

def capture_failed_execution(agent_id: str, user_message: str, tool_name: str, tool_input: dict, error: Exception):
    """Capture failed execution for debugging."""
    execution_id = str(uuid.uuid4())
    deadletter = {
        "execution_id": execution_id,
        "timestamp": datetime.now().isoformat(),
        "agent_id": agent_id,
        "user_message": user_message,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "error": str(error),
        "error_type": type(error).__name__,
        "traceback": traceback.format_exc()
    }
    # Write to deadletter queue (SQS, Kafka, database)
    deadletter_queue.put(deadletter)

    # Emit event to client
    return {
        "type": "execution_failed",
        "execution_id": execution_id,
        "message": "This execution has been logged for debugging. Support can investigate using execution ID."
    }
Observability and Debugging
Streaming makes debugging harder: there is no single response object to inspect after the fact, because execution is scattered across dozens of events. You need proper observability.
Structured Logging
Every event should be logged with full context:
import json
import logging
from datetime import datetime
from uuid import uuid4

logger = logging.getLogger(__name__)

class StreamingAgentLogger:
    def __init__(self, agent_id: str, user_id: str):
        self.agent_id = agent_id
        self.user_id = user_id
        self.session_id = str(uuid4())
        self.start_time = datetime.now()

    def log_event(self, event_type: str, **kwargs):
        """Log an agent event with full context."""
        log_entry = {
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "user_id": self.user_id,
            "event_type": event_type,
            "timestamp": datetime.now().isoformat(),
            "elapsed_ms": (datetime.now() - self.start_time).total_seconds() * 1000,
            **kwargs
        }
        logger.info(json.dumps(log_entry))
        return log_entry
This structured logging lets you reconstruct the entire execution flow from logs.
Distributed Tracing
For complex systems with multiple services, use distributed tracing (OpenTelemetry, Datadog, Jaeger):
import json

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def stream_agent_with_tracing(user_message: str):
    """Stream agent response with distributed tracing."""
    with tracer.start_as_current_span("agent_execution") as span:
        span.set_attribute("user_message", user_message)
        for event in agent.stream_agent_response(user_message):
            if event["type"] == "tool_call":
                with tracer.start_as_current_span(f"tool_{event['tool_name']}") as tool_span:
                    tool_span.set_attribute("tool_name", event["tool_name"])
                    tool_span.set_attribute("tool_input", json.dumps(event["input"]))
            yield event
This gives you a complete trace of tool calls, latencies, and failures across your entire system.
Performance Tuning at Scale
When you’re streaming tool outputs for thousands of concurrent agents, performance becomes critical.
Buffering Strategy
Don’t emit every single event immediately. Buffer small events and flush periodically:
from datetime import datetime

class EventBuffer:
    def __init__(self, max_size: int = 100, flush_interval_ms: int = 100):
        self.max_size = max_size
        self.flush_interval_ms = flush_interval_ms
        self.buffer = []
        self.last_flush = datetime.now()

    def add(self, event: dict):
        """Add event to buffer, flush if needed."""
        self.buffer.append(event)
        should_flush = (
            len(self.buffer) >= self.max_size or
            (datetime.now() - self.last_flush).total_seconds() * 1000 > self.flush_interval_ms
        )
        if should_flush:
            return self.flush()
        return None

    def flush(self):
        """Return buffered events and reset."""
        events = self.buffer
        self.buffer = []
        self.last_flush = datetime.now()
        return events
Connection Pooling for Tools
If your tools make database or API calls, use connection pooling to avoid exhausting resources:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Create engine with connection pooling
engine = create_engine(
    "postgresql://user:password@localhost/db",
    poolclass=QueuePool,
    pool_size=20,        # Number of connections to keep in pool
    max_overflow=10,     # Additional connections when pool is exhausted
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_pre_ping=True   # Test connections before using
)
Rate Limiting
Prevent tool calls from overwhelming external services:
from ratelimit import limits, sleep_and_retry

class RateLimitedTool:
    # Note: the decorator arguments are fixed at class definition time, so
    # this approach can't vary the rate per instance
    @sleep_and_retry
    @limits(calls=10, period=1)  # 10 calls per second
    def execute(self, tool_name: str, tool_input: dict):
        return _execute_tool(tool_name, tool_input)
Security Considerations
Streaming introduces security surface area you need to defend.
Input Validation
Validate tool inputs before execution:
from pydantic import BaseModel, validator

class QueryDatabaseInput(BaseModel):
    query: str
    limit: int = 10

    @validator('limit')
    def limit_must_be_reasonable(cls, v):
        if v > 1000:
            raise ValueError('limit cannot exceed 1000')
        return v

    @validator('query')
    def query_must_not_contain_dangerous_keywords(cls, v):
        # A keyword blocklist is only a coarse guard, not real injection
        # protection; use parameterised queries and least-privilege roles too
        dangerous = ['DROP', 'DELETE', 'TRUNCATE']
        if any(keyword in v.upper() for keyword in dangerous):
            raise ValueError('Query contains dangerous keywords')
        return v
Tool Sandboxing
Run tools in isolated environments when possible:
import json
import subprocess

def execute_tool_sandboxed(tool_name: str, tool_input: dict, timeout_seconds: int = 10):
    """Execute tool in subprocess sandbox."""
    try:
        result = subprocess.run(
            ["python", f"tools/{tool_name}.py"],
            input=json.dumps(tool_input),
            capture_output=True,
            timeout=timeout_seconds,
            text=True
        )
        if result.returncode != 0:
            return {"error": result.stderr}
        return json.loads(result.stdout)
    except subprocess.TimeoutExpired:
        return {"error": "Tool execution timeout"}
Audit Logging
For regulated industries (financial services, healthcare), audit every tool call:
import json
from datetime import datetime

def audit_log_tool_call(user_id: str, tool_name: str, tool_input: dict, result: dict):
    """Log tool call for compliance audit."""
    audit_entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "result_status": result.get("status"),
        "result_size_bytes": len(json.dumps(result)),
    }
    # Write to immutable audit log (append-only database or file)
    audit_database.insert(audit_entry)
For teams building AI systems in regulated environments, this is essential. We cover this in detail for financial services and healthcare workflows.
Migration and Rollout Strategy
If you’re moving from non-streaming to streaming agents, do it carefully.
Phase 1: Shadow Streaming
Run streaming in parallel with your existing system, but don’t send streaming events to users yet:
def stream_agent_response_shadow(user_message: str):
    """Stream agent response, but don't expose to user yet."""
    events = []
    for event in agent.stream_agent_response(user_message):
        events.append(event)
        # Log for analysis but don't yield yet
        logger.info(f"Shadow event: {event['type']}")

    # Return final response as before (non-streaming)
    final_response = next(
        (e for e in reversed(events) if e['type'] == 'final_response'),
        None
    )
    return final_response
Run this for a week, monitor for issues, ensure the streaming events match your non-streaming baseline.
Phase 2: Beta Rollout
Enable streaming for 10% of users, monitor for issues:
import hashlib

def should_use_streaming(user_id: str, beta_percentage: int = 10):
    """Determine if user should get streaming responses."""
    # Use a stable hash: Python's built-in hash() is randomised per process
    # (PYTHONHASHSEED), so it would not give the same user the same treatment
    # across servers or restarts
    user_hash = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return user_hash < beta_percentage

@app.post("/api/agent/stream")
async def stream_agent(request: dict):
    user_id = request.get("user_id")
    if should_use_streaming(user_id):
        return StreamingResponse(...)
    else:
        # Fall back to non-streaming for non-beta users
        return {"response": agent.get_response(request["message"])}
Phase 3: Full Rollout
Once you’re confident, enable for all users. Keep the fallback logic in place for at least 30 days.
Real-World Lessons from Production
Here are the patterns we’ve learned shipping agentic AI systems at scale:
Lesson 1: Streaming Latency Matters More Than Total Latency
Users perceive latency from when they submit a request to when they see the first event. A system that takes 30 seconds total but shows the first event in 200ms feels faster than a system that takes 5 seconds total but shows nothing for 4 seconds.
Optimise for time-to-first-event, not total execution time.
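One cheap way to make time-to-first-event visible is to wrap the event generator and log it, a sketch:

```python
import logging
import time

logger = logging.getLogger(__name__)

def measure_time_to_first_event(event_stream):
    """Wrap an event generator and log time-to-first-event (TTFE)."""
    start = time.monotonic()
    first = True
    for event in event_stream:
        if first:
            ttfe_ms = (time.monotonic() - start) * 1000
            logger.info(f"time_to_first_event_ms={ttfe_ms:.0f}")
            first = False
        yield event  # Pass every event through unchanged
```

Because it wraps any generator, you can drop it around the agent stream without touching the agent itself, then alert on the logged metric.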
Lesson 2: Tool Call Ordering Matters
Don’t let the agent call tools in arbitrary order. If tool B depends on the output of tool A, enforce that dependency in your tool definitions. Use tool schemas to make dependencies explicit.
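One lightweight way to enforce this is a dependency map checked before each call. A sketch, with hypothetical tool names:

```python
# Hypothetical dependency map: a tool may only run once its prerequisites have results
TOOL_DEPENDENCIES = {
    "generate_report": {"query_database"},
    "call_external_api": set(),
}

def check_tool_dependencies(tool_name: str, completed_tools: set) -> None:
    """Raise if a tool is called before the tools it depends on have produced results."""
    missing = TOOL_DEPENDENCIES.get(tool_name, set()) - completed_tools
    if missing:
        raise ValueError(
            f"Tool {tool_name} requires results from: {', '.join(sorted(missing))}"
        )
```

Surfacing the resulting error back to the agent as a tool result lets it reorder its calls rather than fail outright.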
Lesson 3: Streaming Breaks Caching
With non-streaming responses, you can cache the entire response. With streaming, you can’t—each client gets a unique stream. Plan your caching strategy accordingly (cache tool results, not streams).
Lesson 4: Monitor Tool Latency Separately
When streaming breaks, it’s usually because one tool is slow. Instrument every tool call with timing:
import time

def execute_tool_with_timing(tool_name: str, tool_input: dict):
    start = time.time()
    result = _execute_tool(tool_name, tool_input)
    duration_ms = (time.time() - start) * 1000

    logger.info(f"Tool {tool_name} took {duration_ms:.0f}ms")
    if duration_ms > 5000:  # Alert if tool is slow
        alert(f"Slow tool: {tool_name} took {duration_ms:.0f}ms")
    return result
Next Steps
Streaming tool use outputs is non-trivial engineering. Here’s how to get started:
- Start with SSE: If you’re building a web application, Server-Sent Events is the simplest path. Implement the FastAPI pattern above.
- Instrument observability first: Before you ship streaming to production, add structured logging and distributed tracing. You’ll need it.
- Test with real latencies: Don’t test with instant tools. Simulate realistic tool latencies (database queries, API calls) so you understand how your system behaves under real conditions.
- Implement circuit breakers: Before you ship to production, add circuit breaker patterns to handle failing tools gracefully.
- Monitor tool latencies: Set up dashboards that show you which tools are slow, which are failing, and which are being called most frequently.
- Plan your rollout carefully: Use shadow streaming and beta rollouts to validate your implementation before full deployment.
For teams building AI agents in production, this is foundational work. We’ve learned these patterns by shipping agentic AI systems across 3PL operations, healthcare, financial services, and more. If you’re building something similar and want to move faster, we can help—we offer fractional CTO support and custom software development for teams shipping AI products.
The teams that get streaming right see 40–60% improvements in perceived latency and significantly better error recovery. It’s worth the engineering effort.
Key Takeaways
- Streaming is about perceived latency: Users don’t care about total execution time—they care about time-to-first-event. Stream aggressively.
- Use SSE for web, WebSocket for interactive, queues for async: Choose the right transport for your use case.
- Instrument everything: Structured logging and distributed tracing are non-negotiable at scale.
- Build for failure: Circuit breakers, fallbacks, and deadletter queues let your agents degrade gracefully.
- Monitor tool latencies: Your agent is only as fast as your slowest tool. Make that visible.
- Roll out carefully: Shadow streaming, beta rollouts, and fallback logic reduce risk.
Streaming tool use outputs is hard engineering. But it’s the difference between agents that feel responsive and agents that feel broken. Get it right, and your users will feel the difference.