Building Idempotent Tools for Long-Running Agents
When you deploy an AI agent to production, you’re not deploying something that runs once and stops. You’re deploying something that runs for hours, days, or weeks—handling requests, making decisions, calling APIs, and moving money or data around. And when that agent crashes, hangs, or restarts mid-task, you need absolute certainty that it won’t double-charge a customer, duplicate an order, or leave your database in an inconsistent state.
That’s where idempotency comes in.
At PADISO, we’ve shipped dozens of production agents across financial services, supply chain, customer service, and healthcare. Every single tool we build follows the same non-negotiable pattern: idempotency first. Not as an afterthought. Not as a nice-to-have. As the foundation.
This guide walks you through the exact design patterns, code patterns, and operational practices we use to make agent restarts safe, retries bulletproof, and side effects traceable. Whether you’re building with LangChain, CrewAI, or a custom framework, these patterns apply.
Table of Contents
- Why Idempotency Matters for Agents
- Core Idempotency Patterns
- Deduplication Keys and Request IDs
- Side-Effect Logging and Audit Trails
- Handling Partial Failures and Retries
- Implementing Idempotency in Practice
- Testing Idempotent Tools
- Monitoring and Observability
- Common Pitfalls and How to Avoid Them
Why Idempotency Matters for Agents
A tool is idempotent if calling it multiple times with the same inputs produces the same result as calling it once. In plain English: if your agent retries a payment transfer three times, the customer’s account should only be debited once—not three times.
This is non-trivial because agents are non-deterministic. They make decisions based on prompts, model outputs, and external state. They call tools in sequences they weren’t explicitly programmed to follow. And when something fails—a timeout, a network glitch, a rate limit—the agent restarts and may retry the same tool call.
Without idempotency, that retry becomes a liability. You get duplicate charges, duplicate orders, duplicate database records, and the operational chaos that follows.
Consider a real scenario: you’re building an agentic AI system for an e-commerce platform that processes refunds. The agent receives a refund request, calculates the amount, calls the payment processor to reverse the charge, updates the order status, and sends a confirmation email. If the agent crashes after the payment reversal but before the email, and then restarts and retries the entire sequence, you’ve now issued the refund twice.
Idempotency prevents this. With the right patterns in place, that second refund attempt becomes a no-op—it returns the same result (“refund already processed”) without actually charging the customer again.
For startups and enterprises alike, this isn’t theoretical. We’ve seen teams lose tens of thousands of pounds to duplicate charges, and spend weeks cleaning up corrupted data because their agent tools weren’t idempotent. The cost of getting it right upfront is a fraction of the cost of fixing it in production.
Core Idempotency Patterns
The Idempotency Key Pattern
The foundation of idempotent tools is the idempotency key: a unique identifier that represents a specific action or request. When a tool is called with the same idempotency key, it returns the same result without repeating the side effect.
This pattern is used by every major payment processor (Stripe, PayPal, Square) and is the industry standard for good reason.
Here’s how it works:
- Generate or receive an idempotency key when the action is first requested. This key should be deterministic and unique to the action.
- Store the key and result before executing the side effect. This is critical: you write to storage first, then execute.
- Check for the key on every call. If it exists, return the stored result immediately without re-executing.
- Execute the side effect only if the key is new.
- Update the result once the side effect completes.
The key insight: you’re trading a small amount of storage (a key-value entry) for absolute certainty that side effects don’t repeat.
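The five steps above can be sketched with an in-memory store. This is an illustrative sketch, not code from our production tools: in production the store would be Redis or a database, and `run_idempotent` and the entry layout are assumed names.

```python
# Illustrative sketch of the idempotency key pattern with an in-memory store.
_store = {}  # idempotency_key -> {"status": ..., "result": ...}

def run_idempotent(key, side_effect):
    """Execute side_effect at most once per idempotency key."""
    # Step 3: check for the key on every call
    entry = _store.get(key)
    if entry is not None and entry["status"] == "success":
        return entry["result"]

    # Step 2: store the key *before* executing the side effect
    _store[key] = {"status": "pending", "result": None}

    # Step 4: execute only because the key was new (or incomplete)
    result = side_effect()

    # Step 5: update the stored result once the side effect completes
    _store[key] = {"status": "success", "result": result}
    return result
```

A second call with the same key returns the cached result without invoking the side effect again.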
The State Machine Pattern
For longer-running operations, idempotency becomes more complex. A single tool call might involve multiple steps, and the agent might restart at any point. The solution is to model your tool as a state machine.
Each state represents a checkpoint in the operation. When the agent retries, it checks the current state and resumes from there—not from the beginning.
For example, a “process invoice” tool might have states like:
- PENDING: Initial state, idempotency key stored but no side effects yet
- VALIDATED: Invoice data validated, not yet submitted
- SUBMITTED: Submitted to accounting system, awaiting confirmation
- CONFIRMED: Confirmed and recorded
- FAILED: Permanent failure, no retry
Each state transition is idempotent. If the agent restarts while the invoice is in the SUBMITTED state, it checks the accounting system for confirmation—it doesn’t re-submit.
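A minimal sketch of that state machine, assuming hypothetical `submit_invoice` and `check_confirmation` callables standing in for the accounting-system calls:

```python
# Illustrative state-machine sketch; the state store and the accounting-system
# callables (submit_invoice, check_confirmation) are assumed stand-ins.
states = {}  # idempotency_key -> current state

def process_invoice(key, invoice, submit_invoice, check_confirmation):
    state = states.get(key, "PENDING")

    if state == "PENDING":
        if not invoice.get("amount"):
            states[key] = "FAILED"          # permanent failure, no retry
            raise ValueError("invalid invoice")
        states[key] = state = "VALIDATED"

    if state == "VALIDATED":
        submit_invoice(invoice)             # the side effect happens once
        states[key] = state = "SUBMITTED"

    if state == "SUBMITTED":
        # On restart we land here: check for confirmation, don't re-submit
        if check_confirmation(invoice):
            states[key] = state = "CONFIRMED"

    return states[key]
```

If the agent restarts while an invoice sits in SUBMITTED, the next call skips straight to the confirmation check; the submission side effect is never repeated.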
The Write-Ahead Log Pattern
For critical operations, write everything to a log before executing. This gives you a complete audit trail and allows recovery if something goes wrong.
The pattern:
- Log the intent: “Agent is about to transfer £5,000 from account A to account B with idempotency key XYZ”
- Execute the transfer
- Log the result: “Transfer completed successfully” or “Transfer failed with error ABC”
If the agent crashes between steps 1 and 2, the log shows the intent but no result. On restart, the agent can check the log, see the incomplete operation, and either retry or roll back depending on the situation.
This pattern is essential for financial operations, data migrations, and any tool with high-cost side effects.
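A minimal sketch of the pattern, using an append-only list as a stand-in for durable log storage; `transfer_with_wal` and `incomplete_operations` are illustrative names, not part of any framework:

```python
# Illustrative write-ahead log sketch; in production the log would be
# durable, append-only storage rather than a Python list.
log = []  # append-only: each entry is (idempotency_key, event, detail)

def transfer_with_wal(key, execute):
    log.append((key, "intent", "about to execute transfer"))  # step 1: log intent
    result = execute()                                        # step 2: execute
    log.append((key, "result", result))                       # step 3: log result
    return result

def incomplete_operations():
    """Keys with a logged intent but no logged result (crashed mid-operation)."""
    intents = {k for k, event, _ in log if event == "intent"}
    results = {k for k, event, _ in log if event == "result"}
    return intents - results
```

On restart, the agent calls `incomplete_operations()` to find transfers that were started but never recorded a result, then decides per operation whether to retry or roll back.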
Deduplication Keys and Request IDs
Deduplication keys are the practical implementation of idempotency. They’re the mechanism that prevents duplicate work.
Generating Deduplication Keys
A good deduplication key should be:
- Unique to the action: Different actions get different keys
- Deterministic: The same action always generates the same key
- Traceable: You can look up the key in logs and understand what it represents
- Collision-resistant: Extremely low probability of two different actions generating the same key
Common approaches:
Approach 1: Hash-based keys
Hash the action parameters (agent ID, user ID, action type, timestamp, relevant data) to generate a key:
```python
import hashlib
import json

def generate_idempotency_key(agent_id, user_id, action_type, action_params):
    key_data = json.dumps({
        "agent_id": agent_id,
        "user_id": user_id,
        "action_type": action_type,
        "params": action_params,
    }, sort_keys=True)
    return hashlib.sha256(key_data.encode()).hexdigest()
```
This ensures the same agent, user, action type, and parameters always generate the same key. Retries will hit the same key and get cached results.
Approach 2: UUID-based keys
Generate a UUID at the start of the operation and pass it through the entire agent execution:
```python
import uuid

def initiate_refund(order_id, amount):
    idempotency_key = str(uuid.uuid4())
    # Pass this key through the entire refund flow
    return process_refund(order_id, amount, idempotency_key)
```

Because the key is generated once at the start and passed through the entire flow, retries within that flow reuse it. This approach also works well when the key originates from external systems (webhooks, API calls) that already carry unique request IDs.
Approach 3: Composite keys
Combine multiple identifiers to create a key that’s both deterministic and globally unique:
```python
def generate_composite_key(user_id, action_type, resource_id, timestamp_bucket):
    # Timestamp bucket (e.g., per hour) prevents key collisions
    # while keeping keys deterministic within a time window
    return f"{user_id}:{action_type}:{resource_id}:{timestamp_bucket}"
```
We prefer hash-based keys for most production tools because they’re fully deterministic and don’t require external coordination.
Storing and Retrieving Deduplication Data
Your deduplication store needs to be fast, reliable, and accessible from anywhere the tool might run. Redis, DynamoDB, or a relational database with a deduplication table all work.
Here’s a minimal schema:
```sql
CREATE TABLE idempotency_log (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    agent_id VARCHAR(255) NOT NULL,
    action_type VARCHAR(100) NOT NULL,
    status ENUM('PENDING', 'SUCCESS', 'FAILED') NOT NULL,
    result_data JSON,
    error_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_agent_action (agent_id, action_type, created_at)
);
```
When a tool is called:
- Check the key: `SELECT * FROM idempotency_log WHERE idempotency_key = ?`
- If found and successful: Return the stored result immediately
- If found and pending: Wait or return a “still processing” response
- If found and failed: Decide whether to retry or return the error
- If not found: Insert a pending record, execute the tool, and update the result
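That lookup flow can be sketched against a simplified SQLite version of the table (the full schema above is MySQL-flavoured, and the waiting and failed-retry branches are reduced to placeholders here; `call_tool` is an illustrative name):

```python
# Illustrative sketch of the check / insert-pending / execute / update flow,
# using a simplified in-memory SQLite table.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE idempotency_log (
    idempotency_key TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    result_data TEXT)""")

def call_tool(key, execute_tool):
    row = conn.execute(
        "SELECT status, result_data FROM idempotency_log WHERE idempotency_key = ?",
        (key,)).fetchone()
    if row:
        status, result = row
        if status == "SUCCESS":
            return result                 # found and successful: return stored result
        if status == "PENDING":
            return "still processing"     # another call is in flight
        raise RuntimeError("previous attempt failed")
    # Not found: insert a pending record, execute, then update the result
    conn.execute("INSERT INTO idempotency_log VALUES (?, 'PENDING', NULL)", (key,))
    result = execute_tool()
    conn.execute("UPDATE idempotency_log SET status = 'SUCCESS', result_data = ? "
                 "WHERE idempotency_key = ?", (result, key))
    return result
```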
Side-Effect Logging and Audit Trails
Idempotency and auditability go hand-in-hand. You need to log every side effect so you can answer critical questions: “Did this payment go through?” “How many times did this tool run?” “What was the state of the system when this agent crashed?”
Structured Logging for Side Effects
Every tool call should log:
- What happened (action type, parameters)
- When it happened (timestamp, with millisecond precision)
- Why it happened (which agent, which user request, which conversation turn)
- What the result was (success or failure, return value, side effects)
- Trace context (request ID, agent ID, conversation ID for correlation)
Here’s a logging pattern we use across all production tools:
```python
import logging
import json
from datetime import datetime

logger = logging.getLogger(__name__)

def log_side_effect(action_type, agent_id, idempotency_key, params, result, error=None):
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "action_type": action_type,
        "agent_id": agent_id,
        "idempotency_key": idempotency_key,
        "params": params,
        "result": result,
        "error": error,
        "status": "success" if error is None else "failed",
    }
    logger.info(json.dumps(log_entry))
    return log_entry

def transfer_funds(from_account, to_account, amount, agent_id, idempotency_key):
    try:
        # Check for existing result
        existing = check_idempotency_log(idempotency_key)
        if existing and existing["status"] == "success":
            log_side_effect("transfer_funds", agent_id, idempotency_key,
                            {"from": from_account, "to": to_account, "amount": amount},
                            existing["result"], error=None)
            return existing["result"]
        # Execute the transfer
        result = payment_processor.transfer(from_account, to_account, amount)
        # Log the success
        log_side_effect("transfer_funds", agent_id, idempotency_key,
                        {"from": from_account, "to": to_account, "amount": amount},
                        result, error=None)
        return result
    except Exception as e:
        # Log the failure
        log_side_effect("transfer_funds", agent_id, idempotency_key,
                        {"from": from_account, "to": to_account, "amount": amount},
                        None, error=str(e))
        raise
```
Audit Trail Retention and Compliance
For regulated industries (financial services, healthcare, insurance), audit trails aren’t optional—they’re required. We’ve helped teams implement audit-ready systems that satisfy SOC 2 and ISO 27001 requirements.
Key practices:
- Immutable logging: Once written, logs can’t be modified. Use append-only databases or write-once storage.
- Retention policies: Keep logs for the legally required period (typically 7 years for financial data).
- Access controls: Restrict who can read logs and log all access attempts.
- Encryption: Encrypt logs in transit and at rest.
- Timestamping: Use synchronized clocks (NTP) so timestamps are reliable.
We typically recommend a three-tier logging strategy:
- Tier 1 (Hot): Recent logs (last 30 days) in a fast, queryable database for operational monitoring
- Tier 2 (Warm): Medium-term logs (30 days to 1 year) in cloud storage with indexing
- Tier 3 (Cold): Long-term archive (1+ years) in immutable storage for compliance
This approach balances performance, cost, and compliance requirements.
Handling Partial Failures and Retries
Production agents fail. Networks time out. APIs return 429 (rate limited). Databases go down. The question isn’t whether failures will happen; it’s how you’ll handle them.
Idempotent Retry Strategies
Not all failures are retryable. Before you retry, ask:
- Is the failure transient? (network timeout, 503 service unavailable) → Retry
- Is the failure permanent? (404 not found, 401 unauthorized) → Don’t retry
- Is the failure idempotent? (same parameters, same idempotency key) → Safe to retry
Here’s a retry pattern that respects these distinctions:
```python
import time
from typing import Callable, Any

def idempotent_retry(
    func: Callable,
    idempotency_key: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_multiplier: float = 2.0,
):
    """Retry a function with exponential backoff, respecting idempotency."""
    # Check if we've already succeeded
    existing = check_idempotency_log(idempotency_key)
    if existing and existing["status"] == "success":
        return existing["result"]
    # Don't retry if we've already failed permanently
    if existing and existing["status"] == "failed":
        if existing.get("error_type") == "permanent":
            raise Exception(f"Permanent failure: {existing['error']}")
    delay = base_delay
    last_error = None
    for attempt in range(max_retries):
        try:
            result = func()
            record_idempotency_success(idempotency_key, result)
            return result
        except Exception as e:
            last_error = e
            error_type = classify_error(e)
            # Don't retry permanent errors
            if error_type == "permanent":
                record_idempotency_failure(idempotency_key, str(e), error_type)
                raise
            # Don't retry if we've exhausted attempts
            if attempt == max_retries - 1:
                record_idempotency_failure(idempotency_key, str(e), error_type)
                raise
            # Wait before retrying (exponential backoff)
            wait_time = min(delay, max_delay)
            time.sleep(wait_time)
            delay *= backoff_multiplier
    raise last_error

def classify_error(error: Exception) -> str:
    """Classify an error as transient or permanent."""
    if isinstance(error, TimeoutError):
        return "transient"
    if isinstance(error, ConnectionError):
        return "transient"
    if "429" in str(error):  # Rate limited
        return "transient"
    if "503" in str(error):  # Service unavailable
        return "transient"
    if "404" in str(error):  # Not found
        return "permanent"
    if "401" in str(error):  # Unauthorized
        return "permanent"
    if "403" in str(error):  # Forbidden
        return "permanent"
    return "transient"  # Default to transient (safer)
```
Handling Partial State and Rollback
Sometimes a tool succeeds partially. For example, an agent updates a customer record but fails to send a confirmation email. The customer data is changed, but the email never went out.
For these scenarios, you have two options:
Option 1: Rollback on Partial Failure
If any step fails, undo all previous steps and return to the initial state. This is the safest approach but can be expensive.
```python
def process_order_with_rollback(order_id, agent_id, idempotency_key):
    # Check idempotency first
    existing = check_idempotency_log(idempotency_key)
    if existing:
        return existing["result"]
    try:
        # Step 1: Validate order
        validate_order(order_id)
        # Step 2: Process payment
        payment_result = process_payment(order_id)
        # Step 3: Update inventory
        inventory_result = update_inventory(order_id)
        # Step 4: Send confirmation
        email_result = send_confirmation_email(order_id)
        result = {"payment": payment_result, "inventory": inventory_result, "email": email_result}
        record_idempotency_success(idempotency_key, result)
        return result
    except Exception as e:
        # Rollback: reverse all changes
        try:
            reverse_payment(order_id)
            restore_inventory(order_id)
        except Exception:
            pass  # Log but don't raise—we're already in an error state
        record_idempotency_failure(idempotency_key, str(e), "transient")
        raise
```
Option 2: Accept Partial State and Make It Idempotent
Instead of rolling back, make each step idempotent and track which steps have completed. On retry, resume from the last completed step.
```python
def process_order_with_resume(order_id, agent_id, idempotency_key):
    # Check idempotency first
    existing = check_idempotency_log(idempotency_key)
    if existing and existing["status"] == "success":
        return existing["result"]
    # Load persisted step state from any previous partial run, so a restarted
    # agent resumes from the last completed step rather than starting over
    state = (existing or {}).get("state") or \
        {"validation": False, "payment": False, "inventory": False, "email": False}
    try:
        # Step 1: Validate
        if not state["validation"]:
            validate_order(order_id)
            state["validation"] = True
        # Step 2: Payment (idempotent)
        if not state["payment"]:
            payment_result = process_payment_idempotent(order_id, idempotency_key)
            state["payment"] = True
        # Step 3: Inventory (idempotent)
        if not state["inventory"]:
            inventory_result = update_inventory_idempotent(order_id, idempotency_key)
            state["inventory"] = True
        # Step 4: Email (idempotent)
        if not state["email"]:
            email_result = send_confirmation_idempotent(order_id, idempotency_key)
            state["email"] = True
        result = state
        record_idempotency_success(idempotency_key, result)
        return result
    except Exception as e:
        # Record partial progress so the next attempt can resume
        record_idempotency_partial(idempotency_key, state, str(e))
        raise
```
We prefer option 2 for long-running operations because it’s faster and aligns with how agents naturally operate. But it requires every downstream tool to be idempotent—which brings us to the next section.
Implementing Idempotency in Practice
Now let’s look at concrete implementations across different agent frameworks and scenarios. When you’re working with AI & Agents Automation in production, these patterns need to be baked into your tools from day one.
LangChain Tool Integration
LangChain’s tool system makes it straightforward to wrap tools with idempotency: tools are ordinary Python callables, so a decorator can intercept repeated calls and return cached results safely.
Here’s how we structure idempotent tools in LangChain:
```python
from langchain.tools import tool
from functools import wraps
import hashlib
import json

def idempotent_tool(func):
    """Decorator to make a LangChain tool idempotent."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Generate idempotency key from function name and arguments
        key_data = json.dumps({
            "func": func.__name__,
            "args": str(args),
            "kwargs": str(kwargs),
        }, sort_keys=True)
        idempotency_key = hashlib.sha256(key_data.encode()).hexdigest()
        # Check for existing result
        existing = get_from_cache(idempotency_key)
        if existing is not None:
            return existing
        # Execute and cache
        result = func(*args, **kwargs)
        set_in_cache(idempotency_key, result)
        return result
    return wrapper

@tool
@idempotent_tool
def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Transfer funds between accounts. Idempotent: safe to retry."""
    # Implementation here
    return f"Transferred {amount} from {from_account} to {to_account}"

@tool
@idempotent_tool
def create_invoice(customer_id: str, amount: float, description: str) -> str:
    """Create an invoice. Idempotent: same parameters always produce same invoice."""
    # Implementation here
    return f"Invoice created for customer {customer_id}"
```
For more sophisticated patterns, Anthropic’s engineering guide on effective harnesses for long-running agents provides detailed guidance on managing context and tool usage across extended agent sessions.
CrewAI and Multi-Agent Idempotency
When you’re running multiple agents that coordinate with each other, idempotency becomes even more critical: any agent in the crew may retry a tool that another agent already ran.
In a multi-agent system, you need idempotency not just within each tool, but across agent boundaries:
```python
from crewai import Agent, Task, Crew
from crewai.tools import BaseTool

class IdempotentBaseTool(BaseTool):
    """Base class for idempotent tools in CrewAI."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.idempotency_store = {}  # In production, use Redis/DB

    def _run(self, *args, **kwargs):
        # Generate key from args and kwargs
        key = self._generate_key(*args, **kwargs)
        # Check cache
        if key in self.idempotency_store:
            return self.idempotency_store[key]
        # Execute
        result = self._execute(*args, **kwargs)
        # Cache result
        self.idempotency_store[key] = result
        return result

    def _generate_key(self, *args, **kwargs):
        """Generate idempotency key from arguments."""
        raise NotImplementedError

    def _execute(self, *args, **kwargs):
        """Execute the actual tool logic."""
        raise NotImplementedError

class ProcessRefundTool(IdempotentBaseTool):
    name = "process_refund"
    description = "Process a refund for an order"

    def _generate_key(self, order_id, refund_amount):
        return f"refund:{order_id}:{refund_amount}"

    def _execute(self, order_id, refund_amount):
        # Call payment processor
        return payment_api.issue_refund(order_id, refund_amount)

# Use in agents
refund_agent = Agent(
    role="Refund Processor",
    goal="Process customer refunds accurately",
    tools=[ProcessRefundTool()],
)
```
Database-Backed Idempotency for APIs
When your tools call external APIs (Stripe, Salesforce, HubSpot), you need to coordinate idempotency with the external system. Most modern APIs support idempotency keys natively.
Here’s how we structure this:
```python
import stripe

def create_stripe_charge_idempotent(customer_id: str, amount: int, description: str):
    """Create a Stripe charge with idempotency."""
    # Generate a deterministic idempotency key
    idempotency_key = f"charge:{customer_id}:{amount}:{description}"
    # Check our local idempotency log
    existing = db.query(
        "SELECT charge_id, status, error FROM idempotency_log WHERE key = %s",
        (idempotency_key,)
    ).fetchone()
    if existing:
        if existing["status"] == "success":
            return {"charge_id": existing["charge_id"], "status": "success", "cached": True}
        elif existing["status"] == "failed":
            raise Exception(f"Previously failed: {existing['error']}")
    try:
        # Pass idempotency key to Stripe
        charge = stripe.Charge.create(
            amount=amount,
            currency="gbp",
            customer=customer_id,
            description=description,
            idempotency_key=idempotency_key,  # Stripe respects this
        )
        # Log success
        db.execute(
            "INSERT INTO idempotency_log (key, charge_id, status) VALUES (%s, %s, %s)",
            (idempotency_key, charge.id, "success")
        )
        return {"charge_id": charge.id, "status": "success", "cached": False}
    except Exception as e:
        # Log failure
        db.execute(
            "INSERT INTO idempotency_log (key, status, error) VALUES (%s, %s, %s)",
            (idempotency_key, "failed", str(e))
        )
        raise
```
This approach gives you a safety net: even if Stripe’s idempotency fails for some reason, your local log prevents duplicate charges.
Testing Idempotent Tools
Idempotency is too important to test manually. You need automated tests that verify idempotent behaviour under realistic failure scenarios.
Unit Tests for Idempotency
```python
import pytest
from unittest.mock import patch, MagicMock

class TestIdempotentTools:
    def test_repeated_calls_return_same_result(self):
        """Calling the same tool twice should return the same result."""
        result1 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        result2 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        assert result1 == result2

    def test_different_keys_execute_independently(self):
        """Different idempotency keys should execute independently."""
        result1 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        result2 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_456")
        # Results should be different objects (different executions)
        assert result1 is not result2

    def test_side_effect_happens_once(self):
        """Side effect should only occur once, even with multiple calls."""
        with patch('payment_processor.transfer') as mock_transfer:
            mock_transfer.return_value = {"status": "success"}
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            # Side effect should only happen once
            assert mock_transfer.call_count == 1

    def test_failure_is_recorded_and_not_retried_for_permanent_errors(self):
        """Permanent errors should not be retried."""
        with patch('payment_processor.transfer') as mock_transfer:
            mock_transfer.side_effect = Exception("Invalid account (404)")
            with pytest.raises(Exception):
                transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            # Second call should immediately raise without retrying
            with pytest.raises(Exception):
                transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            # Side effect should only be attempted once
            assert mock_transfer.call_count == 1

    def test_transient_errors_are_retried(self):
        """Transient errors should be retried."""
        with patch('payment_processor.transfer') as mock_transfer:
            # Fail twice, succeed on third
            mock_transfer.side_effect = [
                TimeoutError("Network timeout"),
                TimeoutError("Network timeout"),
                {"status": "success"},
            ]
            result = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            assert result["status"] == "success"
            assert mock_transfer.call_count == 3
```
Chaos Testing for Agent Restarts
The real test is what happens when your agent crashes and restarts. Simulate this:
```python
def test_agent_restart_during_tool_execution():
    """Simulate agent restart mid-tool-execution."""
    # Simulate: Agent starts executing transfer
    agent = Agent(tools=[transfer_funds_tool])
    # Simulate: Agent crashes after tool starts but before completion
    with patch('payment_processor.transfer') as mock_transfer:
        mock_transfer.side_effect = [
            TimeoutError("Network timeout"),  # Agent crashes here
            {"status": "success"},            # Agent restarts and retries
        ]
        result = agent.execute_tool("transfer_funds", {
            "from": "account_a",
            "to": "account_b",
            "amount": 100,
            "idempotency_key": "key_123"
        })
        assert result["status"] == "success"
        # Verify two attempts were made (one failure, one success)
        # but the transfer only completed once
        assert mock_transfer.call_count == 2
```
Integration Tests with Real External Services
For critical tools, test against real external services in a sandbox environment:
```python
@pytest.mark.integration
def test_stripe_refund_idempotency():
    """Test refund idempotency against real Stripe sandbox."""
    # Create a test charge
    charge = stripe.Charge.create(
        amount=1000,
        currency="gbp",
        source="tok_visa",
    )
    # Issue refund with idempotency key
    refund1 = stripe.Refund.create(
        charge=charge.id,
        idempotency_key="refund:test:1000"
    )
    # Issue same refund again
    refund2 = stripe.Refund.create(
        charge=charge.id,
        idempotency_key="refund:test:1000"
    )
    # Should be the same refund
    assert refund1.id == refund2.id
    # Verify charge only refunded once
    charge_updated = stripe.Charge.retrieve(charge.id)
    assert charge_updated.amount_refunded == 1000
```
Monitoring and Observability
You can’t fix what you can’t see. Monitoring idempotent tools means tracking:
- Cache hit rate: What percentage of tool calls are served from cache?
- Retry rate: How often do tools fail and get retried?
- Idempotency key collisions: Are different actions accidentally getting the same key?
- Partial failures: How often do tools fail partway through?
Here’s a monitoring dashboard we typically set up:
```python
from prometheus_client import Counter, Histogram, Gauge

# Metrics
idempotent_cache_hits = Counter(
    'idempotent_cache_hits_total',
    'Total idempotent cache hits',
    ['tool_name', 'agent_id']
)
idempotent_cache_misses = Counter(
    'idempotent_cache_misses_total',
    'Total idempotent cache misses',
    ['tool_name', 'agent_id']
)
tool_execution_time = Histogram(
    'tool_execution_seconds',
    'Tool execution time in seconds',
    ['tool_name', 'agent_id']
)
retry_attempts = Counter(
    'tool_retry_attempts_total',
    'Total retry attempts',
    ['tool_name', 'agent_id', 'error_type']
)
idempotency_key_collisions = Counter(
    'idempotency_key_collisions_total',
    'Idempotency key collisions (different actions, same key)',
    ['tool_name']
)

# Usage in tool
def monitored_idempotent_tool(tool_name, agent_id):
    def decorator(func):
        def wrapper(*args, **kwargs):
            key = generate_key(*args, **kwargs)
            existing = check_cache(key)
            if existing:
                idempotent_cache_hits.labels(tool_name, agent_id).inc()
                return existing
            idempotent_cache_misses.labels(tool_name, agent_id).inc()
            with tool_execution_time.labels(tool_name, agent_id).time():
                try:
                    result = func(*args, **kwargs)
                    cache_result(key, result)
                    return result
                except Exception as e:
                    error_type = classify_error(e)
                    retry_attempts.labels(tool_name, agent_id, error_type).inc()
                    raise
        return wrapper
    return decorator
```
Key alerts to set up:
- Cache hit rate drops below 80%: Possible issue with key generation
- Retry rate spikes: Possible upstream service degradation
- Execution time > 10x normal: Possible database or network issue
- Idempotency key collisions detected: Bug in key generation logic
Common Pitfalls and How to Avoid Them
Pitfall 1: Idempotency Keys Based on Timestamps
Wrong:
```python
def generate_key():
    return str(time.time())  # WRONG: Different every time
```
Right:
```python
def generate_key(user_id, action_type, resource_id):
    return f"{user_id}:{action_type}:{resource_id}"
```
Idempotency keys must be deterministic. If you generate a new key every time, retries will create duplicates.
Pitfall 2: Checking Idempotency After Side Effects
Wrong:
```python
def transfer_funds(from_account, to_account, amount, key):
    # Execute first
    payment_processor.transfer(from_account, to_account, amount)
    # Check idempotency after (too late!)
    if check_cache(key):
        return get_from_cache(key)
    cache_result(key, "success")
    return "success"
```
Right:
```python
def transfer_funds(from_account, to_account, amount, key):
    # Check idempotency first
    cached = check_cache(key)
    if cached:
        return cached
    # Then execute
    result = payment_processor.transfer(from_account, to_account, amount)
    # Then cache
    cache_result(key, result)
    return result
```
Always check before executing. This is the critical ordering.
Pitfall 3: Forgetting to Handle Concurrent Requests
If two requests with the same idempotency key arrive simultaneously, both might execute. Use database constraints or locks:
```python
def transfer_funds(from_account, to_account, amount, key):
    # Use database constraint to prevent race conditions
    try:
        db.execute(
            "INSERT INTO idempotency_log (key, status) VALUES (%s, %s)",
            (key, "pending")
        )
    except IntegrityError:
        # Key already exists, another request is handling it
        # Wait for it to complete
        return wait_for_completion(key)
    try:
        result = payment_processor.transfer(from_account, to_account, amount)
        db.execute(
            "UPDATE idempotency_log SET status = %s, result = %s WHERE key = %s",
            ("success", result, key)
        )
        return result
    except Exception as e:
        db.execute(
            "UPDATE idempotency_log SET status = %s, error = %s WHERE key = %s",
            ("failed", str(e), key)
        )
        raise
```
Pitfall 4: Not Logging Side Effects
Wrong: Tool executes but logs nothing
Right: Every tool execution logs:
- When it happened
- What parameters it received
- What the result was
- Whether it was cached or executed
- How long it took
This is non-negotiable for production systems, especially if you’re pursuing SOC 2 or ISO 27001 compliance. We’ve helped teams implement Security Audit (SOC 2 / ISO 27001) via Vanta, and comprehensive logging is always a requirement.
Pitfall 5: Treating All Errors the Same
Wrong:
```python
for i in range(3):
    try:
        return execute_tool()
    except:
        time.sleep(2 ** i)  # Retry everything
```
Right:
```python
for i in range(3):
    try:
        return execute_tool()
    except PermanentError:
        raise  # Don't retry
    except TransientError:
        time.sleep(2 ** i)  # Retry
```
Classify errors correctly. Retrying a 404 or 401 is wasted effort.
Pitfall 6: Idempotency Without Consistency Checks
Even with idempotent tools, things can go wrong. Add consistency checks:
```python
def transfer_funds_with_verification(from_account, to_account, amount, key):
    # Check idempotency
    cached = check_cache(key)
    if cached:
        # Verify the transfer actually completed
        verify_transfer(from_account, to_account, amount)
        return cached
    # Execute
    result = payment_processor.transfer(from_account, to_account, amount)
    # Verify
    assert verify_transfer(from_account, to_account, amount)
    cache_result(key, result)
    return result
```
Summary and Next Steps
Idempotent tools are the foundation of reliable production agents. They’re not optional—they’re essential. Every tool you ship should follow these patterns:
- Generate deterministic idempotency keys from action parameters
- Check for existing results before executing side effects
- Log every execution with full context and timestamps
- Classify errors correctly and only retry transient failures
- Test idempotency thoroughly including agent restart scenarios
- Monitor cache hits, retries, and key collisions in production
- Handle concurrent requests with database constraints or locks
When you get this right, your agents become bulletproof. Retries work. Restarts are safe. Audits pass. And you sleep at night.
If you’re building agentic AI systems at scale, this is where the real engineering happens. It’s not sexy, but it’s what separates production systems from prototypes.
At PADISO, we’ve built these patterns into every AI & Agents Automation project we ship. Whether you’re a startup building your first agent or an enterprise modernising your platform engineering stack, we’ve seen the patterns that work and the ones that don’t.
If you’re ready to ship production-grade agents with the confidence that retries are safe, side effects are tracked, and your data stays consistent—let’s talk. We work with founders and operators across AI automation for customer service, supply chain automation, financial services, healthcare, and beyond.
Visit PADISO to learn more about our CTO as a Service and AI Strategy & Readiness offerings, or reach out to discuss your specific use case. We’ve helped teams ship agents that handle millions of transactions safely—and we can help you do the same.