
AI Agents in Production: Streaming Tool Use Outputs

Real patterns for streaming tool use outputs in production AI agents. Architecture, code, operational quirks, and the patterns that scale.

The PADISO Team · 2026-06-03

Table of Contents

  1. Why Streaming Tool Use Matters in Production
  2. Understanding Tool Use in AI Agents
  3. Streaming Architectures: The Real Patterns
  4. Code Patterns That Scale
  5. Handling Partial Tool Outputs
  6. Error Handling and Resilience
  7. Observability and Debugging
  8. Performance Tuning at Scale
  9. Security Considerations
  10. Migration and Rollout Strategy
  11. Real-World Lessons from Production

Why Streaming Tool Use Matters in Production

Streaming tool use outputs isn’t a nice-to-have feature in production AI agents—it’s the difference between a system that feels responsive and one that disappears into a black box for 30 seconds while your users stare at a loading spinner.

When you deploy agentic AI in production, you’re not just running a single inference. You’re orchestrating a chain of tool calls, each with latency: database queries, API calls, file operations, external service integrations. Without streaming, users wait for the entire agent loop to complete before seeing any progress. With streaming, they see tool calls as they happen, intermediate results as they arrive, and final outputs as they’re generated.

At PADISO, we’ve shipped agentic AI systems across 3PL operations, healthcare prior authorisation workflows, and financial services integrations. The teams that got streaming right saw 40–60% improvements in perceived latency and significantly better error recovery. The teams that didn’t? They hit production horror stories—timeouts, user frustration, and abandoned integrations.

This guide covers the real engineering patterns we use to stream tool use outputs reliably at scale.


Understanding Tool Use in AI Agents

What Tool Use Actually Is

Tool use is the mechanism by which an AI agent decides it needs to call an external function, API, or system, and then uses the result to inform its next decision. Unlike traditional function calling, tool use in modern agents is iterative and context-aware: the agent can reason about which tool to call, with what parameters, and what to do with the result before deciding on the next step.

When you deploy an agent with tools, you’re giving it:

  • Tool definitions: JSON schemas describing what each tool does, what inputs it accepts, and what it returns
  • Tool implementations: The actual code that executes when the agent requests a tool call
  • Tool results: The data returned by the tool, which the agent uses to decide its next action

The OpenAI Agents Documentation and Anthropic Claude Agent Building Guide both cover this well, but the production reality is messier than the docs suggest.

Why Streaming Changes Everything

Without streaming, your agent system looks like this:

  1. User sends request
  2. Agent thinks about which tool to call (latency: 0.5–2s)
  3. Agent calls tool (latency: 0.5–10s+ depending on tool)
  4. Agent receives result
  5. Agent thinks about next step (latency: 0.5–2s)
  6. … repeat until done
  7. Return final result to user

Total latency: 5–30+ seconds, with no feedback until the end.

With streaming, you can emit events as they happen:

  1. User sends request
  2. Stream “thinking” event (user sees agent is working)
  3. Agent decides on tool call
  4. Stream “tool_call” event with tool name and parameters
  5. Tool executes in background
  6. Stream “tool_result” event with intermediate data
  7. Agent continues
  8. … repeat
  9. Stream “final_response” event

Total latency from user perspective: ~500ms to first event, then progressive updates. Total wall-clock time is the same, but perceived latency drops dramatically.


Streaming Architectures: The Real Patterns

Pattern 1: Server-Sent Events (SSE) for Web Clients

For web applications, Server-Sent Events is the standard pattern. The client opens a persistent HTTP connection, and the server pushes events down the stream as they occur.

Why SSE works:

  • Simple HTTP, no WebSocket complexity
  • Built-in reconnection and event ID tracking
  • Works through proxies and load balancers
  • Native browser support via EventSource API

The architecture:

Client → HTTP POST /agent/stream → Server
         ← SSE stream (Content-Type: text/event-stream) ←

The server holds the connection open and sends events like this (each SSE event is terminated by a blank line):

data: {"type": "thinking", "message": "Analyzing request..."}

data: {"type": "tool_call", "tool": "query_database", "params": {...}}

data: {"type": "tool_result", "data": {...}}

data: {"type": "final_response", "text": "..."}

This pattern works beautifully for most web use cases. We use it for AI-powered dashboard queries where users need to see query execution in real-time.

Pattern 2: WebSocket for Bidirectional Communication

When you need the client to send new instructions mid-stream (e.g., “cancel this tool call” or “use this alternative approach”), WebSocket is more appropriate. It’s more complex but gives you true bidirectional communication.

When to use WebSocket:

  • Interactive agent loops where users can interrupt or redirect
  • Real-time collaborative workflows
  • Systems where the agent needs to ask the user for clarification mid-execution

The trade-off: WebSocket adds complexity (connection state management, reconnection logic, message ordering) but enables richer interactions.
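
As a sketch of what this looks like with FastAPI, the endpoint below streams agent events over a WebSocket while watching for a client "cancel" message. The message shapes are illustrative, and `StreamingClaudeAgent` is the agent built in the code section later in this guide:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json

app = FastAPI()
agent = StreamingClaudeAgent()  # the streaming agent built later in this guide

@app.websocket("/agent/ws")
async def agent_ws(websocket: WebSocket):
    await websocket.accept()
    request = json.loads(await websocket.receive_text())
    cancelled = asyncio.Event()

    async def watch_for_cancel():
        # Bidirectional: the client can send messages mid-stream
        try:
            while True:
                msg = json.loads(await websocket.receive_text())
                if msg.get("type") == "cancel":  # illustrative message shape
                    cancelled.set()
                    return
        except WebSocketDisconnect:
            cancelled.set()

    watcher = asyncio.create_task(watch_for_cancel())
    try:
        for event in agent.stream_agent_response(request["message"]):
            if cancelled.is_set():
                await websocket.send_json({"type": "cancelled"})
                break
            await websocket.send_json(event)
    finally:
        watcher.cancel()
        await websocket.close()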

Pattern 3: Message Queue + Polling for Async Workflows

For long-running agent tasks (processing 1000 documents, running overnight batch jobs), neither SSE nor WebSocket is appropriate. Instead, use a message queue (SQS, RabbitMQ, Kafka) with the client polling for results.

Architecture:

Client → POST /agent/task → Server (returns task_id)
Server → Enqueue task to message queue
Worker ← Dequeue task
Worker → Process with streaming
Worker → Write progress to database
Client ← Poll /agent/task/{task_id} status

This is what we use for 3PL operations automation where agents process inbound bookings across multiple WMS instances.
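
A minimal sketch of the submit-and-poll surface, using an in-memory dict where production would use a database, and a hypothetical `queue_client` standing in for the broker:

from fastapi import FastAPI
import uuid

app = FastAPI()
task_store = {}  # In production: a database the worker writes progress to

@app.post("/agent/task")
async def create_task(request: dict):
    task_id = str(uuid.uuid4())
    task_store[task_id] = {"status": "queued", "events": []}
    # Hand off to a worker via the message queue (SQS, RabbitMQ, Kafka)
    queue_client.enqueue({"task_id": task_id, "message": request["message"]})  # hypothetical client
    return {"task_id": task_id}

@app.get("/agent/task/{task_id}")
async def get_task(task_id: str):
    # The worker appends events as it streams through the agent loop,
    # so each poll shows the client any new progress
    return task_store.get(task_id, {"status": "not_found"})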

Pattern 4: gRPC Streaming for Service-to-Service

When your agent is called by another service (not a web client), gRPC streaming is more efficient than HTTP. gRPC uses HTTP/2 multiplexing and protobuf serialisation, reducing overhead.

When to use:

  • Agent as a microservice called by other services
  • High-volume tool use with strict latency budgets
  • Systems already using gRPC elsewhere

We’ve used gRPC streaming in financial services AI systems where agents query risk engines and compliance databases with sub-100ms latency requirements.
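
As a sketch, the server side is a single server-streaming RPC. The `agent_pb2` / `agent_pb2_grpc` modules below stand in for stubs generated from a proto with `rpc StreamAgent(AgentRequest) returns (stream AgentEvent)`; the message fields are assumptions:

from concurrent import futures
import json
import grpc

# Hypothetical stubs generated from a proto defining:
#   rpc StreamAgent(AgentRequest) returns (stream AgentEvent)
import agent_pb2
import agent_pb2_grpc

class AgentService(agent_pb2_grpc.AgentServicer):
    def StreamAgent(self, request, context):
        # Each agent event becomes one message on the server stream
        for event in agent.stream_agent_response(request.message):
            yield agent_pb2.AgentEvent(type=event["type"], payload=json.dumps(event))

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
agent_pb2_grpc.add_AgentServicer_to_server(AgentService(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()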


Code Patterns That Scale

Pattern: Streaming with Claude and Python

Here’s a production-grade pattern for streaming Claude agent responses with tool use:

import anthropic
import json
from typing import Generator

class StreamingClaudeAgent:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.model = "claude-opus-4-1-20250805"
        self.tools = [
            {
                "name": "query_database",
                "description": "Query the operational database",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "limit": {"type": "integer", "default": 10}
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "call_external_api",
                "description": "Call an external API endpoint",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "endpoint": {"type": "string"},
                        "method": {"type": "string", "enum": ["GET", "POST"]},
                        "payload": {"type": "object"}
                    },
                    "required": ["endpoint", "method"]
                }
            }
        ]
    
    def _execute_tool(self, tool_name: str, tool_input: dict) -> str:
        """Execute a tool and return result as JSON string."""
        if tool_name == "query_database":
            # In production, this queries your actual database
            return json.dumps({
                "status": "success",
                "rows": [{"id": 1, "value": "example"}]
            })
        elif tool_name == "call_external_api":
            # In production, this makes actual HTTP calls
            return json.dumps({
                "status": 200,
                "body": {"result": "api response"}
            })
        else:
            return json.dumps({"error": f"Unknown tool: {tool_name}"})
    
    def stream_agent_response(self, user_message: str) -> Generator[dict, None, None]:
        """Stream agent response with tool use."""
        messages = [{"role": "user", "content": user_message}]
        
        while True:
            # Call Claude with streaming
            with self.client.messages.stream(
                model=self.model,
                max_tokens=4096,
                tools=self.tools,
                messages=messages
            ) as stream:
                # Collect the full response
                full_response = None
                for event in stream:
                    if hasattr(event, 'type'):
                        if event.type == 'content_block_start':
                            if hasattr(event, 'content_block') and event.content_block.type == 'tool_use':
                                # Emit tool call event. Note: the input is not
                                # populated yet at block start; it arrives via
                                # input_json_delta events and is complete in
                                # the final message below.
                                yield {
                                    "type": "tool_call",
                                    "tool_name": event.content_block.name,
                                    "tool_id": event.content_block.id
                                }
                        elif event.type == 'content_block_delta':
                            if hasattr(event, 'delta'):
                                if hasattr(event.delta, 'text'):
                                    # Emit text delta
                                    yield {
                                        "type": "text_delta",
                                        "text": event.delta.text
                                    }
                
                full_response = stream.get_final_message()
            
            # Check if we need to handle tool use
            has_tool_use = any(
                block.type == "tool_use" 
                for block in full_response.content
            )
            
            if not has_tool_use:
                # No more tool calls, we're done
                yield {"type": "final_response", "stop_reason": full_response.stop_reason}
                break
            
            # Process tool calls
            messages.append({"role": "assistant", "content": full_response.content})
            
            # Execute tools and collect results
            tool_results = []
            for block in full_response.content:
                if block.type == "tool_use":
                    tool_result = self._execute_tool(block.name, block.input)
                    yield {
                        "type": "tool_result",
                        "tool_id": block.id,
                        "tool_name": block.name,
                        "result": tool_result
                    }
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": tool_result
                    })
            
            # Add tool results back to messages
            messages.append({"role": "user", "content": tool_results})

This pattern:

  • Uses Anthropic’s streaming API to emit events as they occur
  • Handles the agentic loop (tool call → tool result → next step)
  • Yields events that can be sent to clients via SSE or WebSocket
  • Properly manages message history for multi-turn interactions

Pattern: FastAPI Streaming Endpoint

Here’s how to expose this as a web endpoint:

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import json

app = FastAPI()
agent = StreamingClaudeAgent()

@app.post("/api/agent/stream")
async def stream_agent(request: dict):
    """Stream agent response to client."""
    user_message = request.get("message")
    if not user_message:
        raise HTTPException(status_code=400, detail="Missing message")
    
    async def event_generator():
        try:
            # Note: stream_agent_response is a sync generator, so iterating it
            # here blocks the event loop. In production, run it in a worker
            # thread (e.g. starlette's iterate_in_threadpool) to stay async.
            for event in agent.stream_agent_response(user_message):
                # Format as SSE
                yield f"data: {json.dumps(event)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )

The key headers:

  • Content-Type: text/event-stream tells the client this is an SSE stream
  • Cache-Control: no-cache prevents caching of streaming responses
  • X-Accel-Buffering: no disables nginx buffering (critical for real-time streaming)

Pattern: Client-Side Event Handling

On the frontend, consume the stream like this:

// EventSource only supports GET requests, so for a POST body we read
// the SSE stream manually with fetch and a stream reader.
async function streamAgent(userInput) {
    const response = await fetch('/api/agent/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({message: userInput})
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, {stream: true});

        // SSE events are separated by a blank line
        const chunks = buffer.split('\n\n');
        buffer = chunks.pop();  // keep any incomplete trailing event

        for (const chunk of chunks) {
            if (!chunk.startsWith('data: ')) continue;
            const data = JSON.parse(chunk.slice(6));

            switch (data.type) {
                case 'tool_call':
                    updateUI(`Calling ${data.tool_name}...`);
                    break;
                case 'tool_result':
                    updateUI(`Got result from ${data.tool_name}`);
                    break;
                case 'text_delta':
                    appendToResponse(data.text);
                    break;
                case 'final_response':
                    showComplete();
                    break;
                case 'error':
                    showError(data.error);
                    break;
            }
        }
    }
}

Handling Partial Tool Outputs

In production, tools don’t always complete instantly or return perfect data. You need to handle:

Timeouts

If a tool takes longer than expected, you have options:

  1. Hard timeout: Kill the tool call after N seconds, return a timeout error, let the agent decide what to do next
  2. Soft timeout: Return partial results, let the agent work with what it has
  3. Async timeout: Start the tool asynchronously, return a “pending” status, let the client poll for completion

For most production systems, we use soft timeouts (return partial results) combined with logging. This lets the agent continue rather than fail entirely.

import asyncio

async def execute_tool_with_timeout(tool_name: str, tool_input: dict, timeout_seconds: int = 10):
    """Execute tool with timeout, returning partial results if needed."""
    try:
        result = await asyncio.wait_for(
            _execute_tool_async(tool_name, tool_input),
            timeout=timeout_seconds
        )
        return {"status": "success", "data": result}
    except asyncio.TimeoutError:
        return {
            "status": "timeout",
            "message": f"Tool {tool_name} timed out after {timeout_seconds}s",
            "partial_data": None  # Or return partial results if available
        }

Partial Results

Some tools naturally return partial results (streaming APIs, paginated endpoints). Emit each chunk as it arrives:

def stream_tool_results(tool_name: str, tool_input: dict):
    """Stream results from a tool, emitting chunks as they arrive."""
    if tool_name == "search_documents":
        # Simulate streaming search results
        for i in range(5):
            yield {
                "type": "tool_result_chunk",
                "tool_id": tool_input.get("id"),
                "chunk_index": i,
                "total_chunks": 5,
                "data": {"document_id": i, "relevance": 0.95 - (i * 0.1)}
            }

Retryable Failures

If a tool fails transiently (network error, service temporarily down), emit a retry event and let the agent decide whether to retry:

def execute_tool_with_retry(tool_name: str, tool_input: dict, max_retries: int = 3):
    """Execute tool with exponential backoff, yielding events as it goes.

    This is a generator, so success and failure are yielded as events rather
    than returned; mixing `return value` with `yield` would hide the result
    from the caller.
    """
    import time
    
    for attempt in range(max_retries):
        try:
            result = _execute_tool(tool_name, tool_input)
            yield {"type": "tool_success", "tool_name": tool_name, "data": result}
            return
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s...
                yield {
                    "type": "tool_retry",
                    "tool_name": tool_name,
                    "attempt": attempt + 1,
                    "max_retries": max_retries,
                    "wait_seconds": wait_time,
                    "error": str(e)
                }
                time.sleep(wait_time)
            else:
                yield {"type": "tool_failed", "tool_name": tool_name, "error": str(e)}

Error Handling and Resilience

Production agents fail in ways the documentation doesn’t prepare you for. Here are the real patterns:

Pattern: Circuit Breaker for Flaky Tools

If a tool fails repeatedly, don’t keep calling it. Use a circuit breaker:

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Tool is failing, reject calls
    HALF_OPEN = "half_open"  # Testing if tool has recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
    
    def call(self, tool_func, *args, **kwargs):
        """Call tool through circuit breaker."""
        if self.state == CircuitState.OPEN:
            # Check if we should try half-open
            if datetime.now() > self.last_failure_time + timedelta(seconds=self.timeout_seconds):
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception(f"Circuit breaker OPEN for {self.timeout_seconds}s")
        
        try:
            result = tool_func(*args, **kwargs)
            # Success, reset
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
            self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise e
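
In use, you would typically keep one breaker per tool so a flaky API doesn't block calls to healthy tools; a small sketch:

# One breaker per tool, created on first use
breakers: dict = {}

def execute_tool_guarded(tool_name: str, tool_input: dict):
    breaker = breakers.setdefault(tool_name, CircuitBreaker())
    return breaker.call(_execute_tool, tool_name, tool_input)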

Pattern: Fallback Tools

When a tool fails, offer the agent a fallback:

def execute_tool_with_fallback(tool_name: str, tool_input: dict):
    """Execute tool with fallback strategy."""
    fallbacks = {
        "query_database": "query_cache",
        "call_external_api": "use_cached_response",
        "generate_report": "return_template"
    }
    
    try:
        return _execute_tool(tool_name, tool_input)
    except Exception as e:
        fallback = fallbacks.get(tool_name)
        if fallback:
            return {
                "status": "fallback",
                "primary_tool": tool_name,
                "fallback_tool": fallback,
                "reason": str(e),
                "data": _execute_tool(fallback, tool_input)
            }
        else:
            raise e

The agent sees that a tool failed and a fallback was used, so it can adjust its reasoning accordingly.

Pattern: Deadletter Queue for Failed Executions

When all else fails, capture the failed execution for later analysis:

import traceback
import uuid
from datetime import datetime

def capture_failed_execution(agent_id: str, user_message: str, tool_name: str, tool_input: dict, error: Exception):
    """Capture failed execution for debugging."""
    execution_id = str(uuid.uuid4())
    
    deadletter = {
        "execution_id": execution_id,
        "timestamp": datetime.now().isoformat(),
        "agent_id": agent_id,
        "user_message": user_message,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "error": str(error),
        "error_type": type(error).__name__,
        "traceback": traceback.format_exc()
    }
    
    # Write to deadletter queue (SQS, Kafka, database)
    deadletter_queue.put(deadletter)
    
    # Emit event to client
    return {
        "type": "execution_failed",
        "execution_id": execution_id,
        "message": "This execution has been logged for debugging. Support can investigate using execution ID."
    }

Observability and Debugging

Streaming makes debugging harder because you lose the ability to see the full execution after the fact. You need proper observability.

Structured Logging

Every event should be logged with full context:

import json
import logging
from datetime import datetime
from uuid import uuid4

logger = logging.getLogger(__name__)

class StreamingAgentLogger:
    def __init__(self, agent_id: str, user_id: str):
        self.agent_id = agent_id
        self.user_id = user_id
        self.session_id = str(uuid4())
        self.start_time = datetime.now()
    
    def log_event(self, event_type: str, **kwargs):
        """Log an agent event with full context."""
        log_entry = {
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "user_id": self.user_id,
            "event_type": event_type,
            "timestamp": datetime.now().isoformat(),
            "elapsed_ms": (datetime.now() - self.start_time).total_seconds() * 1000,
            **kwargs
        }
        logger.info(json.dumps(log_entry))
        return log_entry

This structured logging lets you reconstruct the entire execution flow from logs.
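
Wiring it into the streaming loop is then one call per event; a sketch, where `send_to_client` stands in for whatever transport you use (e.g. the SSE generator above):

agent_logger = StreamingAgentLogger(agent_id="ops-agent", user_id="user-123")

for event in agent.stream_agent_response(user_message):
    # Log with full session context before the event reaches the client
    agent_logger.log_event(event["type"], **{k: v for k, v in event.items() if k != "type"})
    send_to_client(event)  # hypothetical transport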

Distributed Tracing

For complex systems with multiple services, use distributed tracing (OpenTelemetry, Datadog, Jaeger):

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def stream_agent_with_tracing(user_message: str):
    """Stream agent response with distributed tracing."""
    open_spans = {}  # tool_id -> span, so each span covers call to result

    with tracer.start_as_current_span("agent_execution") as span:
        span.set_attribute("user_message", user_message)
        
        for event in agent.stream_agent_response(user_message):
            if event["type"] == "tool_call":
                # Start a span when the tool is called...
                tool_span = tracer.start_span(f"tool_{event['tool_name']}")
                tool_span.set_attribute("tool_name", event["tool_name"])
                open_spans[event["tool_id"]] = tool_span
            elif event["type"] == "tool_result":
                # ...and end it when the result arrives, so the span's
                # duration reflects actual tool latency
                tool_span = open_spans.pop(event["tool_id"], None)
                if tool_span is not None:
                    tool_span.end()
            
            yield event

This gives you a complete trace of tool calls, latencies, and failures across your entire system.


Performance Tuning at Scale

When you’re streaming tool outputs for thousands of concurrent agents, performance becomes critical.

Buffering Strategy

Don’t emit every single event immediately. Buffer small events and flush periodically:

from datetime import datetime

class EventBuffer:
    def __init__(self, max_size: int = 100, flush_interval_ms: int = 100):
        self.max_size = max_size
        self.flush_interval_ms = flush_interval_ms
        self.buffer = []
        self.last_flush = datetime.now()
    
    def add(self, event: dict):
        """Add event to buffer, flush if needed."""
        self.buffer.append(event)
        
        should_flush = (
            len(self.buffer) >= self.max_size or
            (datetime.now() - self.last_flush).total_seconds() * 1000 > self.flush_interval_ms
        )
        
        if should_flush:
            return self.flush()
        return None
    
    def flush(self):
        """Return buffered events and reset."""
        events = self.buffer
        self.buffer = []
        self.last_flush = datetime.now()
        return events
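
Wired into the SSE endpoint from earlier, the buffer turns many small deltas into one write per batch; a sketch:

import json

async def buffered_event_generator(user_message: str):
    buffer = EventBuffer(max_size=20, flush_interval_ms=100)

    for event in agent.stream_agent_response(user_message):
        flushed = buffer.add(event)
        if flushed:
            # One SSE write per batch instead of one per event
            yield f"data: {json.dumps(flushed)}\n\n"

    # Flush anything still buffered when the stream ends
    remaining = buffer.flush()
    if remaining:
        yield f"data: {json.dumps(remaining)}\n\n"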

Connection Pooling for Tools

If your tools make database or API calls, use connection pooling to avoid exhausting resources:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Create engine with connection pooling
engine = create_engine(
    "postgresql://user:password@localhost/db",
    poolclass=QueuePool,
    pool_size=20,           # Number of connections to keep in pool
    max_overflow=10,        # Additional connections when pool is exhausted
    pool_recycle=3600,      # Recycle connections after 1 hour
    pool_pre_ping=True      # Test connections before using
)

Rate Limiting

Prevent tool calls from overwhelming external services:

from ratelimit import limits, sleep_and_retry

class RateLimitedTool:
    # Note: the @limits parameters are fixed at class definition time,
    # so the rate is configured here rather than per instance
    @sleep_and_retry
    @limits(calls=10, period=1)  # At most 10 calls per second; excess calls sleep
    def execute(self, tool_name: str, tool_input: dict):
        return _execute_tool(tool_name, tool_input)

Security Considerations

Streaming introduces security surface area you need to defend.

Input Validation

Validate tool inputs before execution:

from pydantic import BaseModel, validator

class QueryDatabaseInput(BaseModel):
    query: str
    limit: int = 10
    
    @validator('limit')
    def limit_must_be_reasonable(cls, v):
        if v > 1000:
            raise ValueError('limit cannot exceed 1000')
        return v
    
    @validator('query')
    def query_must_not_contain_dangerous_keywords(cls, v):
        # A naive blocklist for illustration only; real protection comes
        # from read-only database roles and parameterised queries
        dangerous = ['DROP', 'DELETE', 'TRUNCATE']
        if any(keyword in v.upper() for keyword in dangerous):
            raise ValueError('Query contains dangerous keywords')
        return v
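
Validation then belongs in the tool dispatcher, before anything executes; a sketch using the model above:

from pydantic import ValidationError

def execute_validated(tool_name: str, tool_input: dict):
    if tool_name == "query_database":
        try:
            validated = QueryDatabaseInput(**tool_input)
        except ValidationError as e:
            # Return the validation errors as the tool result so the
            # agent can correct its parameters and try again
            return {"status": "invalid_input", "errors": e.errors()}
        return _execute_tool(tool_name, validated.dict())
    return _execute_tool(tool_name, tool_input)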

Tool Sandboxing

Run tools in isolated environments when possible:

import subprocess
import json

def execute_tool_sandboxed(tool_name: str, tool_input: dict, timeout_seconds: int = 10):
    """Execute tool in subprocess sandbox."""
    try:
        result = subprocess.run(
            ["python", f"tools/{tool_name}.py"],
            input=json.dumps(tool_input),
            capture_output=True,
            timeout=timeout_seconds,
            text=True
        )
        
        if result.returncode != 0:
            return {"error": result.stderr}
        
        return json.loads(result.stdout)
    except subprocess.TimeoutExpired:
        return {"error": "Tool execution timeout"}

Audit Logging

For regulated industries (financial services, healthcare), audit every tool call:

import json
from datetime import datetime

def audit_log_tool_call(user_id: str, tool_name: str, tool_input: dict, result: dict):
    """Log tool call for compliance audit."""
    audit_entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "tool_name": tool_name,
        "tool_input": tool_input,
        "result_status": result.get("status"),
        "result_size_bytes": len(json.dumps(result)),
    }
    
    # Write to immutable audit log (append-only database or file)
    audit_database.insert(audit_entry)

For teams building AI systems in regulated environments, this is essential. We cover this in detail for financial services and healthcare workflows.


Migration and Rollout Strategy

If you’re moving from non-streaming to streaming agents, do it carefully.

Phase 1: Shadow Streaming

Run streaming in parallel with your existing system, but don’t send streaming events to users yet:

def stream_agent_response_shadow(user_message: str):
    """Stream agent response, but don't expose to user yet."""
    events = []
    
    for event in agent.stream_agent_response(user_message):
        events.append(event)
        # Log for analysis but don't yield yet
        logger.info(f"Shadow event: {event['type']}")
    
    # Return final response as before (non-streaming)
    final_response = next(
        (e for e in reversed(events) if e['type'] == 'final_response'),
        None
    )
    return final_response

Run this for a week, monitor for issues, ensure the streaming events match your non-streaming baseline.

Phase 2: Beta Rollout

Enable streaming for 10% of users, monitor for issues:

import hashlib

def should_use_streaming(user_id: str, beta_percentage: int = 10):
    """Determine if user should get streaming responses."""
    # Use a stable hash so the same user always gets the same treatment;
    # Python's built-in hash() is randomised per process for strings
    user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    return user_hash < beta_percentage

@app.post("/api/agent/stream")
async def stream_agent(request: dict):
    user_id = request.get("user_id")
    
    if should_use_streaming(user_id):
        return StreamingResponse(...)
    else:
        # Fall back to non-streaming for non-beta users
        return {"response": agent.get_response(request["message"])}

Phase 3: Full Rollout

Once you’re confident, enable for all users. Keep the fallback logic in place for at least 30 days.


Real-World Lessons from Production

Here are the patterns we’ve learned shipping agentic AI systems at scale:

Lesson 1: Streaming Latency Matters More Than Total Latency

Users perceive latency from when they submit a request to when they see the first event. A system that takes 30 seconds total but shows the first event in 200ms feels faster than a system that takes 5 seconds total but shows nothing for 4 seconds.

Optimise for time-to-first-event, not total execution time.

Lesson 2: Tool Call Ordering Matters

Don’t let the agent call tools in arbitrary order. If tool B depends on the output of tool A, enforce that dependency in your tool definitions. Use tool schemas to make dependencies explicit.
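
One way to make a dependency explicit is to require an output of tool A as an input of tool B, so the agent cannot meaningfully call B first. A sketch with illustrative tool names:

# print_label requires a shipment_id that only create_shipment returns,
# so the agent must call create_shipment first
tools = [
    {
        "name": "create_shipment",
        "description": "Create a shipment and return its shipment_id",
        "input_schema": {
            "type": "object",
            "properties": {"order_id": {"type": "string"}},
            "required": ["order_id"]
        }
    },
    {
        "name": "print_label",
        "description": "Print a label for an existing shipment. Requires a "
                       "shipment_id returned by create_shipment.",
        "input_schema": {
            "type": "object",
            "properties": {"shipment_id": {"type": "string"}},
            "required": ["shipment_id"]
        }
    }
]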

Lesson 3: Streaming Breaks Caching

With non-streaming responses, you can cache the entire response. With streaming, you can’t—each client gets a unique stream. Plan your caching strategy accordingly (cache tool results, not streams).
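
Caching the tool layer instead of the stream might look like this; the key scheme and TTL are illustrative:

import hashlib
import json
import time

tool_cache = {}  # In production: Redis or similar, with a real TTL

def execute_tool_cached(tool_name: str, tool_input: dict, ttl_seconds: int = 300):
    # Key on tool name plus canonicalised input so identical calls hit the cache
    key = hashlib.sha256(
        f"{tool_name}:{json.dumps(tool_input, sort_keys=True)}".encode()
    ).hexdigest()

    cached = tool_cache.get(key)
    if cached and time.time() - cached["at"] < ttl_seconds:
        return cached["result"]

    result = _execute_tool(tool_name, tool_input)
    tool_cache[key] = {"result": result, "at": time.time()}
    return result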

Lesson 4: Monitor Tool Latency Separately

When streaming breaks, it’s usually because one tool is slow. Instrument every tool call with timing:

import time

def execute_tool_with_timing(tool_name: str, tool_input: dict):
    start = time.time()
    result = _execute_tool(tool_name, tool_input)
    duration_ms = (time.time() - start) * 1000
    
    logger.info(f"Tool {tool_name} took {duration_ms:.0f}ms")
    
    if duration_ms > 5000:  # Alert if tool is slow
        alert(f"Slow tool: {tool_name} took {duration_ms:.0f}ms")
    
    return result

Next Steps

Streaming tool use outputs is non-trivial engineering. Here’s how to get started:

  1. Start with SSE: If you’re building a web application, Server-Sent Events is the simplest path. Implement the FastAPI pattern above.

  2. Instrument observability first: Before you ship streaming to production, add structured logging and distributed tracing. You’ll need it.

  3. Test with real latencies: Don’t test with instant tools. Simulate realistic tool latencies (database queries, API calls) so you understand how your system behaves under real conditions.

  4. Implement circuit breakers: Before you ship to production, add circuit breaker patterns to handle failing tools gracefully.

  5. Monitor tool latencies: Set up dashboards that show you which tools are slow, which are failing, and which are being called most frequently.

  6. Plan your rollout carefully: Use shadow streaming and beta rollouts to validate your implementation before full deployment.

For teams building AI agents in production, this is foundational work. We’ve learned these patterns by shipping agentic AI systems across 3PL operations, healthcare, financial services, and more. If you’re building something similar and want to move faster, we can help—we offer fractional CTO support and custom software development for teams shipping AI products.

The teams that get streaming right see 40–60% improvements in perceived latency and significantly better error recovery. It’s worth the engineering effort.


Key Takeaways

  • Streaming is about perceived latency: Users don’t care about total execution time—they care about time-to-first-event. Stream aggressively.
  • Use SSE for web, WebSocket for interactive, queues for async: Choose the right transport for your use case.
  • Instrument everything: Structured logging and distributed tracing are non-negotiable at scale.
  • Build for failure: Circuit breakers, fallbacks, and deadletter queues let your agents degrade gracefully.
  • Monitor tool latencies: Your agent is only as fast as your slowest tool. Make that visible.
  • Roll out carefully: Shadow streaming, beta rollouts, and fallback logic reduce risk.

Streaming tool use outputs is hard engineering. But it’s the difference between agents that feel responsive and agents that feel broken. Get it right, and your users will feel the difference.

Want to talk through your situation?

Book a 30-minute call with Kevin (Founder/CEO). No pitch — direct advice on what to do next.
