Table of Contents
- Why MCP Matters for Production AI Agents
- Core MCP Architecture Fundamentals
- Single-Agent MCP Server Patterns
- Multi-Agent Orchestration with MCP
- Transport Layer and Protocol Decisions
- Error Handling, Observability, and Resilience
- Resource Management and Cost Control
- Security and Audit-Ready MCP Deployments
- Real-World Operational Quirks and Gotchas
- Deployment and Scaling Strategies
- Next Steps and Vendor Evaluation
Why MCP Matters for Production AI Agents {#why-mcp-matters}
If you’re shipping AI agents into production, you’ve hit the same wall every team does: tool integration is a nightmare. Your agent needs to call your database, your payment API, your internal wiki, and your CRM, but each integration is bespoke, fragile, and breaks the moment your agent framework updates.
The Model Context Protocol solves that. MCP is a standardised, language-agnostic protocol for connecting AI models to tools, data sources, and external systems. It’s not a framework—it’s a contract. Your agent doesn’t care whether a tool is running in Python, Node.js, Rust, or Go. Your agent doesn’t care if the tool is local, remote, or running in a container. MCP handles the plumbing.
For production teams, this matters because:
Tool isolation. Each tool runs in its own process or container. A crash in your database tool doesn’t crash your agent. A timeout in your API tool doesn’t block your entire orchestration.
Versioning and rollback. You can version each tool independently. You can roll back a broken payment API integration without redeploying your agent.
Team ownership. Your database team owns the database tool. Your API team owns the API tool. Your agent team orchestrates them. Clear ownership, clear boundaries.
Vendor and framework agnostic. Whether you’re using Claude, Gemini, or an open-source model, whether you’re using LangChain, Anthropic’s SDK, or custom orchestration code, MCP tools work the same way. This is critical when you’re evaluating agentic AI architectures and want to avoid lock-in.
At PADISO, we’ve shipped MCP-based agents for financial services teams, logistics operators, and media platforms. The pattern is consistent: teams that adopt MCP early spend 60% less time on integration plumbing and get to production 4–6 weeks faster. Teams that skip it end up rewriting their agent orchestration when the first tool breaks.
Core MCP Architecture Fundamentals {#core-architecture}
Before you design a server, you need to understand what MCP actually is. It’s a client-server protocol where:
- Clients are LLM applications (your agent, your chatbot, your autonomous workflow).
- Servers expose tools, resources, and prompts to clients.
- Tools are functions the client can invoke (“fetch user data”, “execute trade”, “send email”).
- Resources are data the client can read (“current market prices”, “customer database”, “knowledge base”).
- Prompts are reusable prompt templates the client can use.
The protocol runs over JSON-RPC 2.0. Messages are bidirectional. A client sends a request, a server responds. A server can also send requests back to the client (for sampling, for context).
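To make the message shape concrete, here is a minimal sketch of a tool invocation as a pair of JSON-RPC 2.0 envelopes. The `tools/call` method and the `name`/`arguments` parameters follow the MCP specification; the helper names and the `fetch_user` tool are assumptions made for illustration only.

```python
import json
from itertools import count

_ids = count(1)  # JSON-RPC ids only need to be unique per connection


def make_tool_call_request(tool_name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 request asking the server to run a tool."""
    return {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }


def make_result_response(request: dict, text: str) -> dict:
    """Build the matching response; the id must echo the request id."""
    return {
        "jsonrpc": "2.0",
        "id": request["id"],
        "result": {"content": [{"type": "text", "text": text}]},
    }


# Hypothetical tool name and payload, for illustration.
request = make_tool_call_request("fetch_user", {"user_id": "u-123"})
response = make_result_response(request, json.dumps({"id": "u-123"}))
print(json.dumps(request))
print(json.dumps(response))
```

The client matches a response to its request by `id`, which is what lets several calls be in flight on one connection.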
Here’s the minimal architecture:
```
Agent/LLM Client
    ↓ (JSON-RPC over stdio/HTTP/WebSocket)
MCP Server Process
    ├── Tool Handler 1 (Database queries)
    ├── Tool Handler 2 (API calls)
    └── Tool Handler 3 (File operations)
```
Each tool is a function. When the agent calls a tool, the server handles the request, executes the tool, and returns the result. The agent gets the result back in the same message loop.
The Model Context Protocol GitHub has reference implementations in Python and Node.js. Start there. Don’t build from scratch unless you have a very specific transport requirement (like running MCP inside a Lambda function with no stdio access).
Transport Modes
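This guide covers three transport modes. The MCP specification defines stdio and an HTTP-based transport as its standard transports; WebSocket is not part of the standard set, but the protocol allows custom transports, so it can be added where you need it: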
stdio — The client spawns the server as a subprocess. JSON-RPC messages flow over stdin/stdout. This is the simplest mode for local development and single-machine deployments. No network overhead, but tight coupling to the client process.
HTTP — The server runs as an HTTP service. The client makes POST requests to invoke tools. This is the standard for remote servers, cloud deployments, and multi-client scenarios.
WebSocket — The server runs as a WebSocket service. This is useful for browser-based clients or real-time streaming scenarios where you need bidirectional communication without polling.
For production, you’ll almost always use HTTP or WebSocket. Stdio is fine for local development and testing.
Single-Agent MCP Server Patterns {#single-agent-patterns}
Pattern 1: The Monolithic Tool Server
Your first instinct will be to build one big MCP server that handles all your tools. Don’t. Well, not entirely.
A monolithic server makes sense when:
- All tools share the same authentication context (e.g., all tools use the same API key or database connection).
- All tools have the same latency requirements (e.g., all are sub-100ms).
- You have fewer than 20 tools.
- Your team is small and can own the server as a single unit.
Here’s what a monolithic server looks like in Python using the MCP Python SDK (the `mcp` package):

```python
import asyncio
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool

server = Server("my-tools")

# `db` and `api` stand in for your own database and API clients.


@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    if name == "fetch_user":
        user_id = arguments.get("user_id")
        # Query your database
        user = await db.fetch_user(user_id)
        return [TextContent(type="text", text=json.dumps(user))]
    elif name == "create_order":
        order_data = arguments.get("order_data")
        # Call your API
        order = await api.create_order(order_data)
        return [TextContent(type="text", text=json.dumps(order))]
    else:
        raise ValueError(f"Unknown tool: {name}")


@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="fetch_user",
            description="Fetch a user by ID",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "The user ID"}
                },
                "required": ["user_id"],
            },
        ),
        Tool(
            name="create_order",
            description="Create a new order",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_data": {"type": "object", "description": "Order details"}
                },
                "required": ["order_data"],
            },
        ),
    ]


async def main():
    # The low-level Server needs a transport; this one serves over stdio.
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )


if __name__ == "__main__":
    asyncio.run(main())
```
This pattern works. It’s easy to test, easy to deploy, easy to debug. But it has limits. As soon as you have 30+ tools, or tools with different SLAs, or tools owned by different teams, the monolithic server becomes a bottleneck.
Pattern 2: The Layered Tool Server
A better pattern is to organise tools into logical layers:
```
MCP Server
├── Authentication Layer
│   └── Token validation, rate limiting
├── Tool Handlers
│   ├── Database Tools (fetch_user, create_order, etc.)
│   ├── API Tools (call_external_api, webhook_trigger, etc.)
│   ├── File Tools (read_document, write_report, etc.)
│   └── Utility Tools (sleep, log, etc.)
├── Cache Layer
│   └── Redis cache for frequent queries
└── Observability Layer
    └── Logging, tracing, metrics
```
Each layer has a clear responsibility. The authentication layer validates that the agent has permission to call a tool. The cache layer reduces database load. The observability layer gives you visibility into what’s happening.
This pattern scales to 100+ tools because you can reason about each layer independently. You can swap out the cache layer without touching the tool handlers. You can add rate limiting without rewriting the authentication layer.
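One way to keep the layers swappable is to write each one as a wrapper around the tool handler. The sketch below is an assumption about how you might structure this, not part of any MCP SDK: `with_auth`, `with_cache`, and `with_timing` are hypothetical names, and an in-memory dict stands in for Redis.

```python
import asyncio
import json
import time
from typing import Awaitable, Callable

Handler = Callable[[str, dict], Awaitable[dict]]


def with_auth(handler: Handler, allowed_tools: set) -> Handler:
    """Authentication layer: reject tools the caller may not use."""
    async def wrapped(name: str, arguments: dict) -> dict:
        if name not in allowed_tools:
            return {"error": "forbidden", "tool": name}
        return await handler(name, arguments)
    return wrapped


def with_cache(handler: Handler, cache: dict) -> Handler:
    """Cache layer: reuse results for identical (tool, arguments) pairs."""
    async def wrapped(name: str, arguments: dict) -> dict:
        key = f"{name}:{json.dumps(arguments, sort_keys=True)}"
        if key not in cache:
            cache[key] = await handler(name, arguments)
        return cache[key]
    return wrapped


def with_timing(handler: Handler, timings: list) -> Handler:
    """Observability layer: record how long each call took."""
    async def wrapped(name: str, arguments: dict) -> dict:
        start = time.perf_counter()
        try:
            return await handler(name, arguments)
        finally:
            timings.append((name, time.perf_counter() - start))
    return wrapped


calls = []


async def tool_handler(name: str, arguments: dict) -> dict:
    # Stand-in for the real tool dispatch.
    calls.append(name)
    return {"tool": name, "echo": arguments}


timings = []
pipeline = with_timing(
    with_auth(with_cache(tool_handler, {}), {"fetch_user"}), timings
)

first = asyncio.run(pipeline("fetch_user", {"user_id": "u-1"}))
second = asyncio.run(pipeline("fetch_user", {"user_id": "u-1"}))  # served from cache
denied = asyncio.run(pipeline("drop_table", {}))
```

Because each layer only sees the `Handler` signature, you can reorder or replace one without editing the others.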
Pattern 3: The Stateful Tool Server
Some agents need state. They need to remember what they did in the previous step. MCP doesn’t provide built-in state management, so you need to handle it yourself.
The pattern is simple: store state in a database or cache, keyed by a session or conversation ID.
```python
@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    # The handler receives only the tool name and arguments, so this sketch
    # assumes the client passes a session_id argument with every call.
    session_id = arguments.get("session_id")
    if name == "push_to_state":
        key = arguments.get("key")
        value = arguments.get("value")
        await redis.hset(f"session:{session_id}", key, json.dumps(value))
        return [TextContent(type="text", text="OK")]
    elif name == "get_from_state":
        key = arguments.get("key")
        # Assumes a Redis client created with decode_responses=True
        value = await redis.hget(f"session:{session_id}", key)
        return [TextContent(type="text", text=value or "null")]
    raise ValueError(f"Unknown tool: {name}")
```
This lets your agent remember decisions across steps. It’s essential for multi-step workflows where the agent needs to build up context (e.g., “fetch user data, check their credit, then approve or deny the loan”).
Multi-Agent Orchestration with MCP {#multi-agent-orchestration}
When you have multiple agents working together, MCP becomes your coordination layer. Instead of agents calling each other directly (which creates tight coupling), agents call tools that invoke other agents.
Here’s the pattern:
```
Agent A
    ↓ (calls tool: invoke_agent_b)
MCP Server (routing layer)
    ├── Agent B (via tool handler)
    ├── Agent C (via tool handler)
    └── Agent D (via tool handler)
```
Each agent has its own MCP server. The agents don’t know about each other. They only know about the routing layer. This is the pattern described in Architecture Patterns for Agentic Applications.
For example, in a financial services context:
- Agent A (Compliance Agent) receives a trade request. It needs to check if the trade is compliant with regulations.
- Agent A calls the tool `check_compliance` on the routing layer.
- The routing layer invokes Agent B (Regulatory Agent) to do the actual compliance check.
- Agent B returns the result to Agent A.
- Agent A makes a decision based on the result.
The benefit is clear: agents are decoupled. You can replace Agent B with a new version without touching Agent A. You can scale Agent B independently. You can even route calls to Agent B to different instances based on load.
The IBM article on MCP architecture patterns goes deeper into multi-agent coordination, including consensus patterns and conflict resolution.
Building the Routing Layer
The routing layer is just an MCP server that forwards tool calls to other agents:
```python
@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    if name == "invoke_compliance_agent":
        trade_data = arguments.get("trade_data")
        # Call the compliance agent (invoke_agent is your own HTTP helper)
        result = await invoke_agent(
            agent_url="http://compliance-agent:8000",
            request={"tool": "check_compliance", "arguments": {"trade": trade_data}},
        )
        return [TextContent(type="text", text=json.dumps(result))]
    raise ValueError(f"Unknown tool: {name}")
```
This is simple HTTP-based routing. You can make it more sophisticated with retries, timeouts, circuit breakers, and load balancing.
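As a starting point for the retry part, here is a minimal sketch of exponential backoff with jitter around an async call. `retry_with_backoff` and `flaky_agent_call` are hypothetical names; in the routing layer you would wrap your own `invoke_agent` call, and the set of retryable exceptions depends on your HTTP client.

```python
import asyncio
import random


async def retry_with_backoff(
    operation,
    *,
    attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    retry_on: tuple = (ConnectionError, TimeoutError),
):
    """Run `operation()` and retry transient failures with growing delays."""
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt, capped, with jitter to avoid
            # many clients retrying in lockstep.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))


attempts_seen = []


async def flaky_agent_call():
    # Simulated downstream agent that fails twice, then succeeds.
    attempts_seen.append(1)
    if len(attempts_seen) < 3:
        raise ConnectionError("agent unavailable")
    return {"compliant": True}


result = asyncio.run(retry_with_backoff(flaky_agent_call, base_delay=0.01))
```

Only retry calls that are safe to repeat. A tool that executes a trade or sends a payment needs an idempotency key before it goes behind a retry loop.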
Transport Layer and Protocol Decisions {#transport-layer}
Your transport choice has massive implications for latency, scalability, and operational complexity. Get it wrong and you’ll spend months optimising the wrong thing.
HTTP/REST Transport
HTTP is the default for production. It’s battle-tested, well-understood, and works everywhere. Here’s a minimal HTTP MCP server in Python:
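```python
from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI()


class ToolRequest(BaseModel):
    jsonrpc: str = "2.0"
    method: str
    params: dict = Field(default_factory=dict)
    id: str | int  # JSON-RPC ids may be strings or integers


@app.post("/mcp")
async def handle_mcp_request(request: ToolRequest):
    # Simplified sketch: handles tools/call only, not the full MCP handshake.
    if request.method == "tools/call":
        tool_name = request.params.get("name")
        arguments = request.params.get("arguments", {})
        # handle_tool_call is the tool dispatcher from the earlier examples
        content = await handle_tool_call(tool_name, arguments)
        return {
            "jsonrpc": "2.0",
            "result": {"content": content},
            "id": request.id,
        }
    # Unknown methods get the JSON-RPC "method not found" error
    return {
        "jsonrpc": "2.0",
        "error": {"code": -32601, "message": f"Method not found: {request.method}"},
        "id": request.id,
    }
```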
HTTP works well when:
- Your agent and server are on different machines or in different containers.
- You need to scale the server independently of the agent.
- You need to support multiple clients calling the same server.
- You’re running in a cloud environment (AWS, GCP, Azure).
HTTP has downsides:
- Latency. Each tool call involves a network round-trip. For high-frequency tools (e.g., checking a cache), this adds up.
- Connection overhead. Each client needs to establish a connection to the server.
- Debugging. Network issues are harder to debug than local process issues.
For latency-sensitive applications (e.g., real-time trading, fraud detection), HTTP can be a bottleneck. We’ve seen teams reduce agent latency by 40% by switching from HTTP to stdio or WebSocket for local deployments.
WebSocket Transport
WebSocket is useful when you need bidirectional, low-latency communication. It’s the right choice for:
- Browser-based agents (e.g., AI-powered web applications).
- Real-time streaming scenarios where the server needs to push data to the client.
- Applications where latency matters (sub-100ms).
WebSocket is harder to operate than HTTP. You need to manage connection state, handle reconnections, and deal with network partitions. But if you need it, you need it.
Stdio Transport
Stdio is the simplest transport for local development and testing. The agent spawns the server as a subprocess and communicates via stdin/stdout. Zero network overhead, tight coupling.
Stdio is fine for:
- Development and testing.
- Single-machine deployments where the agent and server run on the same host.
- Embedded scenarios where the server is a library, not a separate process.
Don’t use stdio for production distributed systems. The moment you need to scale the server independently, or run it on a different machine, you’ll regret it.
Transport Decision Matrix
| Transport | Latency | Scalability | Complexity | Use Case |
|---|---|---|---|---|
| Stdio | Lowest | Lowest | Lowest | Dev, single-machine |
| HTTP | Medium | Highest | Medium | Production, distributed |
| WebSocket | Low | Medium | Highest | Real-time, browser-based |
Error Handling, Observability, and Resilience {#error-handling}
Production systems fail. Your job is to fail gracefully.
Tool-Level Error Handling
Every tool can fail. Your database can be down. Your API can timeout. Your credentials can be invalid. You need to handle all of these.
The pattern is simple:
```python
@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    # DatabaseError stands in for your database driver's exception type.
    try:
        if name == "fetch_user":
            user_id = arguments.get("user_id")
            user = await db.fetch_user(user_id)
            return [TextContent(type="text", text=json.dumps(user))]
    except DatabaseError as e:
        # Log the error, return a structured error response
        logger.error(f"Database error in fetch_user: {e}")
        return [TextContent(
            type="text",
            text=json.dumps({"error": "database_unavailable", "message": str(e)}),
        )]
    except TimeoutError as e:
        logger.error(f"Timeout in fetch_user: {e}")
        return [TextContent(
            type="text",
            text=json.dumps({"error": "timeout", "message": "Request took too long"}),
        )]
    except Exception as e:
        logger.error(f"Unexpected error in {name}: {e}")
        return [TextContent(
            type="text",
            text=json.dumps({"error": "internal_error", "message": "Something went wrong"}),
        )]
```
The key is to return structured errors. Don’t return stack traces. Return error codes and messages that the agent can understand and act on. The agent can retry, escalate, or take a different path based on the error.
Server-Level Resilience
Your MCP server itself can fail. You need to make it resilient:
Health checks. Expose a /health endpoint that returns the server’s status. Use this to detect when the server is down and route traffic to a healthy instance.
Graceful shutdown. When the server receives a shutdown signal, finish in-flight requests before terminating. Don’t just kill the process.
Retry logic. If a tool call fails, retry with exponential backoff. Most failures are transient (network hiccup, temporary database lock).
Circuit breakers. If a downstream service (database, API) is failing repeatedly, stop calling it for a while. Return a fast error instead of timing out.
```python
import httpx
from circuitbreaker import circuit


@circuit(failure_threshold=5, recovery_timeout=60)
async def call_external_api(endpoint: str, data: dict):
    async with httpx.AsyncClient() as client:
        response = await client.post(endpoint, json=data, timeout=5)
        response.raise_for_status()
        return response.json()
```
This circuit breaker will stop calling the external API if 5 calls fail in a row. It will try again after 60 seconds. This prevents cascading failures where your server keeps hammering a failing downstream service.
Observability and Logging
You can’t fix what you can’t see. Instrument your MCP server with logging, tracing, and metrics.
Logging. Log every tool call, every error, every timeout. Use structured logging (JSON) so you can search and aggregate logs.
```python
import json
import logging
import time
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    start_time = time.time()
    try:
        # execute_tool is your own dispatch function
        result = await execute_tool(name, arguments)
        duration = time.time() - start_time
        logger.info(json.dumps({
            "event": "tool_call_success",
            "tool": name,
            "duration_ms": duration * 1000,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))
        return [TextContent(type="text", text=json.dumps(result))]
    except Exception as e:
        duration = time.time() - start_time
        logger.error(json.dumps({
            "event": "tool_call_error",
            "tool": name,
            "error": str(e),
            "duration_ms": duration * 1000,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))
        raise
```
Tracing. Use distributed tracing (OpenTelemetry, Jaeger) to track requests as they flow through your system. This is essential for debugging multi-agent workflows where a single agent call triggers calls to other agents.
Metrics. Track latency, error rates, and throughput for each tool. Use Prometheus or CloudWatch. Alert when latency spikes or error rates exceed a threshold.
Resource Management and Cost Control {#resource-management}
AI agents are expensive. Every API call, every database query, every LLM inference costs money. You need to manage resources carefully.
Token and Cost Budgets
Set budgets for your agents. Each agent gets a token budget (e.g., 10,000 tokens per request). When the agent exceeds the budget, it stops. This prevents runaway costs from infinite loops or poorly-written agents.
```python
import json


class BudgetExceededError(RuntimeError):
    """Raised when an agent would exceed its token budget."""


class Agent:
    def __init__(self, token_budget=10000):
        self.token_budget = token_budget
        self.tokens_used = 0

    async def call_tool(self, tool_name: str, arguments: dict):
        # Estimate tokens for this tool call (roughly 4 characters per token)
        estimated_tokens = len(json.dumps(arguments)) // 4
        if self.tokens_used + estimated_tokens > self.token_budget:
            raise BudgetExceededError(
                f"Token budget exceeded. Used: {self.tokens_used}, Budget: {self.token_budget}"
            )
        # mcp_server is your MCP client or server wrapper
        result = await mcp_server.call_tool(tool_name, arguments)
        self.tokens_used += estimated_tokens
        return result
```
This is a simple approach. More sophisticated systems track actual token usage from the LLM and adjust the budget dynamically.
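A sketch of the usage-based variant: instead of estimating from argument length, record the input and output token counts your LLM provider reports after each call. The `TokenBudget` class and its field names are assumptions for illustration; where the counts come from depends on your provider's response format.

```python
from dataclasses import dataclass


class BudgetExceededError(RuntimeError):
    """Raised once recorded usage goes over the limit."""


@dataclass
class TokenBudget:
    limit: int
    used: int = 0

    @property
    def remaining(self) -> int:
        return max(0, self.limit - self.used)

    def record(self, input_tokens: int, output_tokens: int) -> None:
        """Add the usage reported for one model call, then enforce the limit."""
        self.used += input_tokens + output_tokens
        if self.used > self.limit:
            raise BudgetExceededError(
                f"Token budget exceeded. Used: {self.used}, Budget: {self.limit}"
            )


budget = TokenBudget(limit=1000)
budget.record(input_tokens=300, output_tokens=200)
print(budget.remaining)  # prints 500
```

Checking `remaining` before each model call lets the agent wind down gracefully, for example by summarising what it has so far, rather than failing mid-step.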
Caching
Cache tool results aggressively. If the agent asks the same question twice, return the cached answer instead of calling the tool again.
```python
import hashlib
import json
import time


class CachedMCPServer:
    def __init__(self, mcp_server, cache_ttl=300):  # 5 minutes
        self.mcp_server = mcp_server  # the server or client being wrapped
        self.cache = {}
        self.cache_ttl = cache_ttl

    async def call_tool(self, name: str, arguments: dict):
        # Create a cache key from the tool name and arguments
        cache_key = hashlib.md5(
            f"{name}:{json.dumps(arguments, sort_keys=True)}".encode()
        ).hexdigest()

        # Check cache
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached_result

        # Call tool
        result = await self.mcp_server.call_tool(name, arguments)

        # Cache result
        self.cache[cache_key] = (result, time.time())
        return result
```
Caching is especially effective for tools that:
- Return data that doesn’t change frequently (e.g., user profiles, product catalogs).
- Are called multiple times with the same arguments.
- Have high latency or cost.
We’ve seen teams reduce API costs by 40% with aggressive caching.
Rate Limiting
Rate limit each tool to prevent overwhelming downstream services. If your agent calls the payment API 1000 times per second, you’ll get blocked or charged heavily.
```python
from redis.asyncio import Redis  # aioredis was merged into redis-py


class RateLimitError(RuntimeError):
    """Raised when a tool exceeds its call quota."""


class RateLimitedMCPServer:
    def __init__(self, mcp_server, redis: Redis):
        self.mcp_server = mcp_server  # the server or client being wrapped
        self.redis = redis

    async def call_tool(self, name: str, arguments: dict):
        # Check rate limit
        key = f"rate_limit:{name}"
        count = await self.redis.incr(key)
        if count == 1:
            # Set expiry on first call
            await self.redis.expire(key, 60)  # 1-minute window
        if count > 100:  # Max 100 calls per minute
            raise RateLimitError(f"Rate limit exceeded for tool: {name}")

        # Call tool
        return await self.mcp_server.call_tool(name, arguments)
```
Rate limiting is essential for production systems. It protects downstream services and prevents cascading failures.
Security and Audit-Ready MCP Deployments {#security-compliance}
If you’re shipping agents to financial services, healthcare, or regulated industries, security and compliance matter. A lot.
Authentication and Authorization
Every tool call should be authenticated and authorised. You need to know who is calling the tool and whether they have permission to call it.
```python
from jose import JWTError, jwt


async def authorised_tool_call(name: str, arguments: dict, headers: dict):
    # Call this from your HTTP layer, where request headers are available;
    # the SDK's call_tool handler only receives the tool name and arguments.
    # Extract and validate JWT token
    auth_header = headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise PermissionError("Missing bearer token")
    token = auth_header.removeprefix("Bearer ")
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user_id = payload.get("sub")
    except JWTError:
        raise PermissionError("Invalid token")

    # Check if user has permission to call this tool
    permissions = await get_user_permissions(user_id)
    if name not in permissions:
        raise PermissionError(f"User {user_id} not authorised to call {name}")

    # Call tool
    return await execute_tool(name, arguments)
```
This is the foundation. Every tool call is authenticated and authorised. You can build more sophisticated patterns on top of this (role-based access control, attribute-based access control, etc.).
Encryption and Data Protection
Sensitive data (API keys, credentials, personal information) should be encrypted at rest and in transit.
In transit. Use HTTPS/TLS. Don’t send credentials over plain HTTP.
At rest. Encrypt sensitive data in your database and cache.
```python
from cryptography.fernet import Fernet


class EncryptedCache:
    def __init__(self, encryption_key: bytes):
        self.cipher = Fernet(encryption_key)

    async def set(self, key: str, value: str):
        encrypted_value = self.cipher.encrypt(value.encode())
        await redis.set(key, encrypted_value)

    async def get(self, key: str):
        encrypted_value = await redis.get(key)
        if not encrypted_value:
            return None
        return self.cipher.decrypt(encrypted_value).decode()
```
This is basic encryption. For production systems, use a proper key management service (AWS KMS, Google Cloud KMS, HashiCorp Vault).
Audit Logging
For compliance (SOC 2, ISO 27001, HIPAA), you need audit logs. Every action, every tool call, every error should be logged.
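```python
import hashlib
import json
from datetime import datetime, timezone


class AuditLogger:
    async def log_tool_call(self, user_id: str, tool_name: str, arguments: dict, result: dict, status: str):
        # sort_keys makes the hash stable regardless of key order.
        # audit_db, get_client_ip and get_user_agent are your own helpers.
        await audit_db.insert({
            "timestamp": datetime.now(timezone.utc),
            "user_id": user_id,
            "tool": tool_name,
            "arguments_hash": hashlib.sha256(json.dumps(arguments, sort_keys=True).encode()).hexdigest(),
            "result_hash": hashlib.sha256(json.dumps(result, sort_keys=True).encode()).hexdigest(),
            "status": status,
            "ip_address": get_client_ip(),
            "user_agent": get_user_agent(),
        })
```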
Audit logs should be immutable (append-only), tamper-proof, and retained for a long time (usually 7 years for regulated industries).
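One common way to make an append-only log tamper-evident is to chain entries by hash, so that editing any earlier record invalidates everything after it. The sketch below keeps the log in a Python list purely for illustration; `append_entry` and `verify_chain` are hypothetical names, and a real deployment would still need write-once storage and access controls around it.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder hash that precedes the first entry


def _entry_hash(prev_hash: str, record: dict) -> str:
    """Hash a record together with the hash of the entry before it."""
    payload = json.dumps(record, sort_keys=True) + prev_hash
    return hashlib.sha256(payload.encode()).hexdigest()


def append_entry(log: list, record: dict) -> None:
    """Append a record, linking it to the previous entry's hash."""
    prev_hash = log[-1]["hash"] if log else GENESIS
    log.append({
        "record": record,
        "prev_hash": prev_hash,
        "hash": _entry_hash(prev_hash, record),
    })


def verify_chain(log: list) -> bool:
    """Recompute every hash; any edited or reordered entry breaks the chain."""
    prev_hash = GENESIS
    for entry in log:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _entry_hash(prev_hash, entry["record"]):
            return False
        prev_hash = entry["hash"]
    return True


audit_log = []
append_entry(audit_log, {"user_id": "u-1", "tool": "fetch_user", "status": "ok"})
append_entry(audit_log, {"user_id": "u-1", "tool": "create_order", "status": "ok"})
intact = verify_chain(audit_log)

audit_log[0]["record"]["status"] = "error"  # simulate someone editing history
tamper_detected = not verify_chain(audit_log)
```

This detects modification but does not prevent it, and it cannot detect truncation of the newest entries unless the latest hash is also stored somewhere independent.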
For teams pursuing SOC 2 or ISO 27001 compliance, MCP servers are a critical control point. You need to demonstrate that you can control who accesses what, that you log all access, and that you can investigate incidents.
Secrets Management
Never hardcode API keys, database passwords, or credentials in your code. Use a secrets manager.
```python
from aws_secretsmanager_caching import SecretCache

cache = SecretCache()


async def get_database_password():
    secret = cache.get_secret_string("prod/database/password")
    return secret


async def get_api_key(service_name: str):
    secret = cache.get_secret_string(f"prod/api/{service_name}/key")
    return secret
```
Secrets managers handle rotation, encryption, and access control. They’re essential for production systems.
When you’re working with PADISO on AI strategy and architecture, security and compliance are foundational. We help teams build audit-ready systems from day one, not as an afterthought.
Real-World Operational Quirks and Gotchas {#operational-quirks}
Every production system has surprises. Here are the ones we see repeatedly with MCP-based agents.
Quirk 1: Tool Timeout Cascades
Your agent calls Tool A, which calls Tool B, which calls Tool C. If Tool C hangs until a 30-second timeout, Tool B and Tool A are blocked for that whole time, and if each layer then retries or falls back after its own timeout, the waits stack up to 90 seconds or more. The user sees a frozen interface.
The solution is aggressive timeout management:
```python
@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    # Each tool has a timeout budget (TOOL_TIMEOUTS maps tool name to seconds)
    timeout = TOOL_TIMEOUTS.get(name, 5)  # Default 5 seconds
    try:
        result = await asyncio.wait_for(
            execute_tool(name, arguments),
            timeout=timeout,
        )
        return [TextContent(type="text", text=json.dumps(result))]
    except asyncio.TimeoutError:
        logger.warning(f"Tool {name} timed out after {timeout}s")
        return [TextContent(
            type="text",
            text=json.dumps({"error": "timeout", "tool": name}),
        )]
```
Set per-tool timeouts based on SLA. Database queries: 2 seconds. API calls: 5 seconds. External webhooks: 10 seconds. If a tool times out consistently, investigate the downstream service or reduce the timeout.
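Per-tool timeouts cap each hop, but nested calls also need a shared budget so the whole chain cannot outlive the request. A minimal sketch of deadline propagation follows: each call gets the smaller of its own cap and whatever is left of the overall budget. The `Deadline` class and `call_with_deadline` helper are assumptions, not part of any MCP SDK.

```python
import asyncio
import time


class Deadline:
    """Overall time budget shared by every call in one request."""

    def __init__(self, budget_seconds: float):
        self._expires_at = time.monotonic() + budget_seconds

    def remaining(self) -> float:
        return max(0.0, self._expires_at - time.monotonic())


async def call_with_deadline(tool, deadline: Deadline, per_call_cap: float):
    """Run `tool()` with the smaller of its own cap and the remaining budget."""
    timeout = min(per_call_cap, deadline.remaining())
    if timeout <= 0:
        raise asyncio.TimeoutError("deadline already exhausted")
    return await asyncio.wait_for(tool(), timeout=timeout)


async def fast_tool():
    await asyncio.sleep(0.01)
    return "done"


async def slow_tool():
    await asyncio.sleep(1.0)
    return "done"


async def demo():
    deadline = Deadline(budget_seconds=0.3)
    first = await call_with_deadline(fast_tool, deadline, per_call_cap=0.5)
    try:
        # The 5-second cap is ignored: only ~0.29s of budget is left.
        second = await call_with_deadline(slow_tool, deadline, per_call_cap=5.0)
    except asyncio.TimeoutError:
        second = "timeout"
    return first, second, deadline.remaining()


first, second, left = asyncio.run(demo())
```

Pass the same `Deadline` down through Tool A, Tool B, and Tool C and the worst case becomes the request budget, not the sum of every layer's timeout.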
Quirk 2: Argument Serialisation Issues
Your agent passes a complex object to a tool. The object has nested dictionaries, lists, datetime objects, and custom types. MCP serialises it to JSON. JSON doesn’t support datetime objects or custom types. The tool receives a string instead of a datetime. Chaos ensues.
The solution is strict input validation:
```python
import json
from datetime import datetime
from typing import Optional

# Pydantic v2 API; v1 used `validator` instead of `field_validator`
from pydantic import BaseModel, ValidationError, field_validator


class FetchUserRequest(BaseModel):
    user_id: str
    created_after: Optional[datetime] = None

    @field_validator("user_id")
    @classmethod
    def user_id_not_empty(cls, v):
        if not v or not v.strip():
            raise ValueError("user_id cannot be empty")
        return v

    @field_validator("created_after", mode="before")
    @classmethod
    def parse_datetime(cls, v):
        if isinstance(v, str):
            return datetime.fromisoformat(v)
        return v


@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    if name == "fetch_user":
        try:
            request = FetchUserRequest(**arguments)
        except ValidationError as e:
            # e.json() is already JSON-safe, unlike the raw e.errors() list
            return [TextContent(
                type="text",
                text=json.dumps({"error": "invalid_arguments", "details": json.loads(e.json())}),
            )]
        # Now request.user_id is a string and request.created_after is a datetime
        user = await db.fetch_user(request.user_id, request.created_after)
        return [TextContent(type="text", text=json.dumps(user))]
```
Use Pydantic or a similar validation library. Validate every input. Return structured errors. Don’t let bad data propagate.
Quirk 3: Tool Interdependencies
Tool A depends on Tool B. Tool B depends on Tool C. If Tool C is slow, Tool A is slow. If Tool C is down, Tool A fails.
The solution is to make dependencies explicit and manage them:
```python
import asyncio
import json

TOOL_DEPENDENCIES = {
    "create_order": ["validate_user", "check_inventory"],
    "validate_user": ["fetch_user"],
    "check_inventory": ["fetch_product"],
}


async def get_tool_health(tool_names: list) -> dict:
    health = {}
    for tool_name in tool_names:
        try:
            # Quickly check if the tool is responsive. This assumes each tool
            # tolerates an empty probe call without side effects; give tools
            # that write data a dedicated health probe instead.
            await asyncio.wait_for(
                execute_tool(tool_name, {}),
                timeout=1,
            )
            health[tool_name] = "healthy"
        except Exception:
            health[tool_name] = "unhealthy"
    return health


@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    # Check only this tool's dependencies, including leaf tools such as
    # fetch_user that have no entry of their own in TOOL_DEPENDENCIES
    dependencies = TOOL_DEPENDENCIES.get(name, [])
    health = await get_tool_health(dependencies)
    for dep in dependencies:
        if health.get(dep) != "healthy":
            return [TextContent(
                type="text",
                text=json.dumps({"error": "dependency_unavailable", "dependency": dep}),
            )]

    # All dependencies are healthy, call the tool
    result = await execute_tool(name, arguments)
    return [TextContent(type="text", text=json.dumps(result))]
```
This prevents cascading failures. If a dependency is unhealthy, fail fast instead of waiting for a timeout.
Quirk 4: Memory Leaks in Tool Handlers
Your tool handler opens a database connection and forgets to close it. After 1000 calls, you’ve exhausted the connection pool. New calls hang.
The solution is to use context managers and cleanup handlers:
```python
import json
from contextlib import asynccontextmanager


@asynccontextmanager
async def get_db_connection():
    conn = await db.acquire()
    try:
        yield conn
    finally:
        # Runs even if the tool raises, so the connection returns to the pool
        await db.release(conn)


@server.call_tool()
async def handle_tool_call(name: str, arguments: dict):
    if name == "fetch_user":
        async with get_db_connection() as conn:
            user = await conn.fetch_user(arguments["user_id"])
            return [TextContent(type="text", text=json.dumps(user))]
```
Always use context managers. Always clean up resources. Use tools like memory_profiler to detect leaks in development.
Deployment and Scaling Strategies {#deployment-scaling}
You’ve built a solid MCP server. Now you need to deploy it and scale it.
Containerisation
Docker is the standard. Package your MCP server in a container:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
```
Test the container locally before deploying to production.
Kubernetes Deployment
For production, deploy on Kubernetes. Here’s a minimal deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          image: myregistry.azurecr.io/mcp-server:latest
          ports:
            - containerPort: 8000
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              cpu: 500m
              memory: 512Mi
            limits:
              cpu: 1000m
              memory: 1Gi
```
This deployment:
- Runs 3 replicas for high availability.
- Exposes a health endpoint for Kubernetes to monitor.
- Sets resource requests and limits to prevent resource exhaustion.
- Uses liveness and readiness probes to detect and recover from failures.
Load Balancing
Route traffic across multiple MCP server instances. Use a load balancer (Kubernetes Service, AWS ALB, Azure Load Balancer).
```yaml
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  selector:
    app: mcp-server
```
The load balancer distributes requests across the 3 replicas. If one instance fails, traffic is routed to the other two.
Autoscaling
Scale the number of replicas based on demand. Use a Horizontal Pod Autoscaler (HPA):
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```
This HPA will scale between 3 and 10 replicas based on CPU utilisation. It targets an average of 70% of each pod’s requested CPU: sustained usage above that target adds replicas, and sustained usage below it removes them, never going under 3.
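To reason about how many replicas you will actually get, it helps to work through the scaling rule from the Kubernetes documentation: desired replicas = ceil(current replicas × current metric / target metric), with no change when the ratio is within a tolerance (0.1 by default). The function below is a simplified sketch of that rule; the real controller also applies stabilisation windows and handles missing metrics, which are left out here.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float,
    min_replicas: int,
    max_replicas: int,
    tolerance: float = 0.1,
) -> int:
    """Simplified HPA rule: scale in proportion to metric / target."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # close enough to target: no change
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


# Values match the manifest above: target 70%, 3 to 10 replicas.
print(desired_replicas(3, 90, 70, 3, 10))   # 4: ceil(3 * 90 / 70)
print(desired_replicas(3, 72, 70, 3, 10))   # 3: within tolerance
print(desired_replicas(5, 200, 70, 3, 10))  # 10: capped at maxReplicas
```

Note the tolerance band: at 72% average utilisation nothing happens, so a 70% target does not mean the HPA reacts the instant usage crosses 70%.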
Monitoring and Alerting
Monitor your MCP servers in production. Track:
- Request rate. How many tool calls per second?
- Latency. How long does each tool call take? P50, P95, P99?
- Error rate. What percentage of calls fail?
- Resource usage. CPU, memory, network I/O?
Set up alerts:
- Alert if error rate exceeds 1%.
- Alert if P99 latency exceeds 5 seconds.
- Alert if CPU usage exceeds 80%.
- Alert if a tool is timing out more than 10% of the time.
Use Prometheus, Grafana, DataDog, or a similar monitoring tool.
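In practice these rules live in your monitoring tool's alert configuration, but a small sketch makes the thresholds explicit. The code below evaluates the four alerts above against one tool's metrics for a single window; `evaluate_alerts` and its parameters are hypothetical, and the percentile uses the simple nearest-rank method.

```python
import math


def percentile(samples: list, pct: float) -> float:
    """Nearest-rank percentile of a non-empty list of samples."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def evaluate_alerts(
    latencies_s: list,
    errors: int,
    timeouts: int,
    total_calls: int,
    cpu_percent: float,
) -> list:
    """Return the names of the alerts that should fire for this window."""
    alerts = []
    if total_calls and errors / total_calls > 0.01:
        alerts.append("error_rate")
    if latencies_s and percentile(latencies_s, 99) > 5.0:
        alerts.append("p99_latency")
    if cpu_percent > 80:
        alerts.append("cpu")
    if total_calls and timeouts / total_calls > 0.10:
        alerts.append("timeout_rate")
    return alerts


# 100 calls: 98 fast ones and two slow outliers, 2 errors, 2 timeouts.
latencies = [0.2] * 98 + [6.0, 7.0]
fired = evaluate_alerts(latencies, errors=2, timeouts=2, total_calls=100, cpu_percent=65)
print(fired)  # ['error_rate', 'p99_latency']
```

The example shows why P99 matters: the median here is 0.2 seconds, yet one call in fifty takes longer than 5 seconds.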
Next Steps and Vendor Evaluation {#next-steps}
You now have a solid understanding of MCP server design patterns. Here’s how to move forward.
Evaluate Your Current Architecture
If you’re already running AI agents in production, audit your current setup:
- How are you integrating tools? Are you building bespoke integrations for each tool? Are you using a framework-specific approach (LangChain tools, Anthropic SDK tools)? Are you already using MCP?
- What’s your pain point? Is it slow integration development? Is it tool isolation and reliability? Is it vendor lock-in? Is it scaling?
- What’s your team structure? Do you have one team owning all tools, or multiple teams owning different tools? MCP shines when you have multiple teams.
If you’re starting from scratch, MCP should be part of your architecture from day one. The Model Context Protocol by Anthropic provides excellent context on why MCP matters and how to think about it.
Build a Proof of Concept
Start small. Pick one tool (e.g., “fetch user data from your database”). Build an MCP server for it. Integrate it with your agent. Get it working end-to-end.
This teaches you:
- How to structure an MCP server.
- How to handle errors and timeouts.
- How to debug integration issues.
- How to monitor and observe the system.
Once you have one tool working, add more. Build incrementally.
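As a starting point for the “fetch user data” example, the sketch below shows only the tool logic you would later register with an MCP SDK: a database lookup wrapped with a timeout and structured error results. It assumes an in-memory SQLite table, and the names `make_demo_db`, `fetch_user`, and `call_tool` are hypothetical. The SDK registration step is intentionally left out.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def make_demo_db():
    """Create an in-memory database standing in for your real one."""
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'Ada', 'ada@example.com')")
    conn.commit()
    return conn


def fetch_user(conn, user_id):
    """The tool itself: look up one user, raising if the user is missing."""
    row = conn.execute(
        "SELECT id, name, email FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if row is None:
        raise LookupError(f"user {user_id} not found")
    return {"id": row[0], "name": row[1], "email": row[2]}


def call_tool(conn, user_id, timeout_s=2.0):
    """Run the tool with a timeout and return a structured result.

    Note: the timeout stops the caller from waiting, but it does not
    cancel the worker thread that is still running the query.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fetch_user, conn, user_id)
    try:
        return {"ok": True, "data": future.result(timeout=timeout_s)}
    except FutureTimeout:
        return {"ok": False, "error": "timeout"}
    except LookupError as exc:
        return {"ok": False, "error": str(exc)}
    finally:
        pool.shutdown(wait=False)


if __name__ == "__main__":
    db = make_demo_db()
    print(call_tool(db, 1))
    print(call_tool(db, 99))
```

Returning a structured error instead of raising lets the agent see why a call failed and decide whether to retry.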
Evaluate Frameworks and SDKs
Don’t build from scratch. Use an existing SDK. The Model Context Protocol GitHub organisation hosts official SDKs, including Python and TypeScript (Node.js). Start with one of those.
Evaluate based on:
- Language support. Do you use Python? Node.js? Go? Rust?
- Documentation. Is there good documentation and examples?
- Community. Is there an active community? Are there third-party tools and integrations?
- Maturity. Is the SDK production-ready? Has it been used at scale?
Consider Managed Services
If you don’t want to operate MCP servers yourself, consider managed services. Some cloud providers offer MCP-compatible tool hosting. Some agencies (like PADISO) can help you design, build, and operate MCP-based systems.
When evaluating a partner, ask:
- Do they have production experience? Have they shipped MCP-based systems at scale?
- Do they understand your domain? Financial services? Healthcare? Logistics? Different domains have different compliance and performance requirements.
- Can they help with architecture and design? Or just implementation?
- Can they help with security and compliance? If you need SOC 2 or ISO 27001, your partner should understand audit-readiness.
At PADISO, we’ve built MCP-based agents for teams across financial services, logistics, and media. We help with architecture and design, implementation, deployment, and operations. We also help with security audits and compliance (SOC 2, ISO 27001).
Plan for Evolution
Your first MCP deployment won’t be perfect. Plan for iteration:
- Version your tools. Make tool versioning easy. You should be able to deploy a new version of a tool without redeploying the agent.
- Monitor and measure. Track latency, error rates, and costs. Identify bottlenecks.
- Iterate. Based on monitoring data, optimise. Maybe add caching. Maybe split a monolithic tool server into multiple servers. Maybe change the transport layer.
- Involve your team. Get feedback from the teams using the agents. They’ll identify issues and opportunities you missed.
MCP is a protocol, not a product. It’s designed to evolve. Your implementation should too.
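One way to make tool versioning easy is to keep several versions of a handler registered side by side and let callers pin a major version. The sketch below is an illustrative in-process registry, not part of the MCP specification or any SDK. `ToolRegistry` and its methods are hypothetical names.

```python
class ToolRegistry:
    """Keeps multiple versions of each tool handler side by side."""

    def __init__(self):
        # Maps (tool name, (major, minor, patch)) to a handler callable.
        self._handlers = {}

    @staticmethod
    def _parse(version):
        return tuple(int(part) for part in version.split("."))

    def register(self, name, version, handler):
        self._handlers[(name, self._parse(version))] = handler

    def resolve(self, name, major=None):
        """Return (version, handler) for the newest matching version.

        If `major` is given, only versions with that major number match,
        so a caller pinned to v1 is unaffected when v2 is deployed.
        """
        candidates = [
            version
            for (tool, version) in self._handlers
            if tool == name and (major is None or version[0] == major)
        ]
        if not candidates:
            raise KeyError(f"no matching version of tool {name!r}")
        best = max(candidates)
        return best, self._handlers[(name, best)]


if __name__ == "__main__":
    registry = ToolRegistry()
    registry.register("fetch_user", "1.0.0", lambda uid: {"id": uid})
    registry.register("fetch_user", "1.2.0", lambda uid: {"id": uid, "v": 1})
    registry.register("fetch_user", "2.0.0", lambda uid: {"user_id": uid})

    version, handler = registry.resolve("fetch_user", major=1)
    print(version, handler(7))
```

Pinning by major version means a breaking change can ship as 2.0.0 while existing agents keep calling the newest 1.x release, and rolling back only requires removing the bad version.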
Explore Multi-Agent Orchestration
Once you have single-agent MCP working, explore multi-agent systems. The Building Production-Ready Multi-Agent AI Systems with MCP and Amazon Bedrock guide provides excellent patterns for multi-agent coordination.
Multi-agent systems are where MCP really shines. You can build complex workflows where agents specialise in different domains and coordinate through a shared MCP interface.
Invest in Observability
As your system grows, observability becomes critical. Invest in:
- Distributed tracing. Use OpenTelemetry to trace requests across agents and tools.
- Structured logging. Use JSON logging so you can search and aggregate logs.
- Metrics and dashboards. Track key metrics and visualise them in dashboards.
- Alerting. Set up alerts for anomalies and failures.
Observability is not a nice-to-have. It’s essential for production systems.
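For the structured logging item, a minimal sketch using only Python’s standard `logging` module is shown below. The `JsonFormatter` class, the `make_logger` helper, and the convention of passing extra fields under a `fields` key are assumptions made for illustration. A dedicated JSON logging library or your platform’s log agent may be a better fit in production.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Extra fields are passed via extra={"fields": {...}} (our convention).
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


def make_logger(stream):
    """Build a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("mcp.tools")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger


if __name__ == "__main__":
    buffer = io.StringIO()
    log = make_logger(buffer)
    log.info(
        "tool_call",
        extra={"fields": {"tool": "fetch_user", "latency_ms": 42, "trace_id": "abc123"}},
    )
    print(buffer.getvalue().strip())
```

Including a trace identifier in every entry is what lets you join these logs with distributed traces later.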
Summary
MCP is a game-changer for production AI agents. It solves the integration problem that every team hits. It enables tool isolation, versioning, and team ownership. It’s vendor and framework agnostic.
The patterns in this guide—monolithic servers, layered servers, stateful servers, multi-agent orchestration, error handling, caching, rate limiting, security, and deployment—are battle-tested. Use them as a starting point. Adapt them to your specific needs.
Start small. Build a proof of concept. Get one tool working end-to-end. Then iterate. Monitor. Optimise. Involve your team.
If you need help designing or building MCP-based systems, PADISO can help. We work with teams across Australia and globally. We specialise in AI strategy and architecture, custom platform development, and security and compliance. We’ve shipped MCP-based agents for financial services, logistics, and media companies.
Book a call with our team. We’ll help you assess your current architecture, design your MCP strategy, and ship production-ready agents faster.