PADISO.ai: AI Agent Orchestration Platform - Launching May 2026

Building Idempotent Tools for Long-Running Agents

Master idempotent tool design for production AI agents. Learn retry patterns, deduplication, side-effect logging, and safe agent restarts.

The PADISO Team · 2026-05-15


When you deploy an AI agent to production, you’re not deploying something that runs once and stops. You’re deploying something that runs for hours, days, or weeks—handling requests, making decisions, calling APIs, and moving money or data around. And when that agent crashes, hangs, or restarts mid-task, you need absolute certainty that it won’t double-charge a customer, duplicate an order, or leave your database in an inconsistent state.

That’s where idempotency comes in.

At PADISO, we’ve shipped dozens of production agents across financial services, supply chain, customer service, and healthcare. Every single tool we build follows the same non-negotiable pattern: idempotency first. Not as an afterthought. Not as a nice-to-have. As the foundation.

This guide walks you through the exact design patterns, code patterns, and operational practices we use to make agent restarts safe, retries bulletproof, and side effects traceable. Whether you’re building with LangChain, CrewAI, or a custom framework, these patterns apply.

Why Idempotency Matters for Agents

A tool is idempotent if calling it multiple times with the same inputs produces the same result as calling it once. In plain English: if your agent retries a payment transfer three times, the customer’s account should only be debited once—not three times.

This is non-trivial because agents are non-deterministic. They make decisions based on prompts, model outputs, and external state. They call tools in sequences they weren’t explicitly programmed to follow. And when something fails—a timeout, a network glitch, a rate limit—the agent restarts and may retry the same tool call.

Without idempotency, that retry becomes a liability. You get duplicate charges, duplicate orders, duplicate database records, and the operational chaos that follows.

Consider a real scenario: you’re building an agentic AI system for an e-commerce platform that processes refunds. The agent receives a refund request, calculates the amount, calls the payment processor to reverse the charge, updates the order status, and sends a confirmation email. If the agent crashes after the payment reversal but before the email, and then restarts and retries the entire sequence, you’ve now issued the refund twice.

Idempotency prevents this. With the right patterns in place, that second refund attempt becomes a no-op—it returns the same result (“refund already processed”) without actually charging the customer again.

For startups and enterprises alike, this isn’t theoretical. We’ve seen teams lose tens of thousands of pounds to duplicate charges, and spend weeks cleaning up corrupted data because their agent tools weren’t idempotent. The cost of getting it right upfront is a fraction of the cost of fixing it in production.


Core Idempotency Patterns

The Idempotency Key Pattern

The foundation of idempotent tools is the idempotency key: a unique identifier that represents a specific action or request. When a tool is called with the same idempotency key, it returns the same result without repeating the side effect.

This pattern is used by every major payment processor (Stripe, PayPal, Square) and is the industry standard for good reason.

Here’s how it works:

  1. Generate or receive an idempotency key when the action is first requested. This key should be deterministic and unique to the action.
  2. Store the key with a pending status before executing the side effect. This is critical: you write to storage first, then execute; the result is filled in later.
  3. Check for the key on every call. If it exists, return the stored result immediately without re-executing.
  4. Execute the side effect only if the key is new.
  5. Update the result once the side effect completes.

The key insight: you’re trading a small amount of storage (a key-value entry) for absolute certainty that side effects don’t repeat.
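The five steps above fit in a few lines. This is a minimal sketch, assuming a simple key-value `store` (a dict here; Redis or a database table in production) and a hypothetical `do_side_effect` callable standing in for the real operation:

```python
def run_idempotent(store, key, do_side_effect):
    # Step 3: check for the key on every call
    record = store.get(key)
    if record is not None and record["status"] == "SUCCESS":
        # Key seen before: return the stored result, skip the side effect
        return record["result"]

    # Step 2: write a pending record to storage BEFORE executing
    store[key] = {"status": "PENDING", "result": None}

    # Step 4: execute the side effect only because the key was new
    result = do_side_effect()

    # Step 5: update the record once the side effect completes
    store[key] = {"status": "SUCCESS", "result": result}
    return result
```

Calling `run_idempotent` twice with the same key executes the side effect once and serves the second call from storage.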

The State Machine Pattern

For longer-running operations, idempotency becomes more complex. A single tool call might involve multiple steps, and the agent might restart at any point. The solution is to model your tool as a state machine.

Each state represents a checkpoint in the operation. When the agent retries, it checks the current state and resumes from there—not from the beginning.

For example, a “process invoice” tool might have states like:

  • PENDING: Initial state, idempotency key stored but no side effects yet
  • VALIDATED: Invoice data validated, not yet submitted
  • SUBMITTED: Submitted to accounting system, awaiting confirmation
  • CONFIRMED: Confirmed and recorded
  • FAILED: Permanent failure, no retry

Each state transition is idempotent. If the agent restarts while the invoice is in the SUBMITTED state, it checks the accounting system for confirmation—it doesn’t re-submit.
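As a sketch, resuming from persisted state looks like this. The `load_state`/`save_state` persistence helpers and the `steps` callables are hypothetical stand-ins for your own storage and tool logic:

```python
def process_invoice(invoice_id, load_state, save_state, steps):
    # Resume from wherever the last attempt left off
    state = load_state(invoice_id) or "PENDING"

    if state == "PENDING":
        steps["validate"](invoice_id)
        state = "VALIDATED"
        save_state(invoice_id, state)

    if state == "VALIDATED":
        steps["submit"](invoice_id)
        state = "SUBMITTED"
        save_state(invoice_id, state)

    if state == "SUBMITTED":
        # Don't re-submit: ask the accounting system whether it arrived
        if steps["check_confirmed"](invoice_id):
            state = "CONFIRMED"
            save_state(invoice_id, state)

    return state
```

Re-running the function after a crash re-enters at the persisted state, so earlier transitions (and their side effects) are never repeated.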

The Write-Ahead Log Pattern

For critical operations, write everything to a log before executing. This gives you a complete audit trail and allows recovery if something goes wrong.

The pattern:

  1. Log the intent: “Agent is about to transfer £5,000 from account A to account B with idempotency key XYZ”
  2. Execute the transfer
  3. Log the result: “Transfer completed successfully” or “Transfer failed with error ABC”

If the agent crashes between steps 1 and 2, the log shows the intent but no result. On restart, the agent can check the log, see the incomplete operation, and either retry or roll back depending on the situation.

This pattern is essential for financial operations, data migrations, and any tool with high-cost side effects.
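A minimal sketch of the pattern, assuming an append-only `log` (a list here; an append-only table in production) and a hypothetical `execute_transfer` callable:

```python
def transfer_with_wal(log, key, execute_transfer, src, dst, amount):
    # 1. Log the intent before touching any external system
    log.append({"key": key, "event": "INTENT",
                "detail": f"transfer {amount} from {src} to {dst}"})
    try:
        # 2. Execute the transfer
        execute_transfer(src, dst, amount)
        # 3. Log the result
        log.append({"key": key, "event": "RESULT", "detail": "success"})
    except Exception as exc:
        log.append({"key": key, "event": "RESULT", "detail": f"failed: {exc}"})
        raise

def incomplete_operations(log):
    """Intents with no matching result: candidates for retry or rollback."""
    intents = {entry["key"] for entry in log if entry["event"] == "INTENT"}
    results = {entry["key"] for entry in log if entry["event"] == "RESULT"}
    return intents - results
```

On restart, `incomplete_operations` surfaces exactly the crashed-mid-flight operations described above: intent logged, no result.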


Deduplication Keys and Request IDs

Deduplication keys are the practical implementation of idempotency. They’re the mechanism that prevents duplicate work.

Generating Deduplication Keys

A good deduplication key should be:

  • Unique to the action: Different actions get different keys
  • Deterministic: The same action always generates the same key
  • Traceable: You can look up the key in logs and understand what it represents
  • Collision-resistant: Extremely low probability of two different actions generating the same key

Common approaches:

Approach 1: Hash-based keys

Hash the action parameters (agent ID, user ID, action type, relevant data) to generate a key. Note that a timestamp must not go into the hash, or retries would generate different keys:

import hashlib
import json

def generate_idempotency_key(agent_id, user_id, action_type, action_params):
    key_data = json.dumps({
        "agent_id": agent_id,
        "user_id": user_id,
        "action_type": action_type,
        "params": action_params,
    }, sort_keys=True)
    return hashlib.sha256(key_data.encode()).hexdigest()

This ensures the same agent, user, action type, and parameters always generate the same key. Retries will hit the same key and get cached results.

Approach 2: UUID-based keys

Generate a UUID at the start of the operation and pass it through the entire agent execution:

import uuid

def initiate_refund(order_id, amount):
    # Generate the key once at the entry point and persist it with the
    # request; regenerating a UUID per attempt defeats idempotency
    idempotency_key = str(uuid.uuid4())
    # Pass this key through the entire refund flow
    return process_refund(order_id, amount, idempotency_key)

This works well when the key originates from external systems (webhooks, API calls) that already carry unique request IDs, or when the generated key is stored alongside the request so every retry presents the same key.

Approach 3: Composite keys

Combine multiple identifiers to create a key that’s both deterministic and globally unique:

def generate_composite_key(user_id, action_type, resource_id, timestamp_bucket):
    # Timestamp bucket (e.g., per hour) prevents key collisions
    # while keeping keys deterministic within a time window
    return f"{user_id}:{action_type}:{resource_id}:{timestamp_bucket}"

We prefer hash-based keys for most production tools because they’re fully deterministic and don’t require external coordination.

Storing and Retrieving Deduplication Data

Your deduplication store needs to be fast, reliable, and accessible from anywhere the tool might run. Redis, DynamoDB, or a relational database with a deduplication table all work.

Here’s a minimal schema:

CREATE TABLE idempotency_log (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    agent_id VARCHAR(255) NOT NULL,
    action_type VARCHAR(100) NOT NULL,
    status ENUM('PENDING', 'SUCCESS', 'FAILED') NOT NULL,
    result_data JSON,
    error_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_agent_action (agent_id, action_type, created_at)
);

When a tool is called:

  1. Check the key: SELECT * FROM idempotency_log WHERE idempotency_key = ?
  2. If found and successful: Return the stored result immediately
  3. If found and pending: Wait or return a “still processing” response
  4. If found and failed: Decide whether to retry or return the error
  5. If not found: Insert a pending record, execute the tool, and update the result

Side-Effect Logging and Audit Trails

Idempotency and auditability go hand-in-hand. You need to log every side effect so you can answer critical questions: “Did this payment go through?” “How many times did this tool run?” “What was the state of the system when this agent crashed?”

Structured Logging for Side Effects

Every tool call should log:

  • What happened (action type, parameters)
  • When it happened (timestamp, with millisecond precision)
  • Why it happened (which agent, which user request, which conversation turn)
  • What the result was (success or failure, return value, side effects)
  • Trace context (request ID, agent ID, conversation ID for correlation)

Here’s a logging pattern we use across all production tools:

import logging
import json
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

def log_side_effect(action_type, agent_id, idempotency_key, params, result, error=None):
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action_type": action_type,
        "agent_id": agent_id,
        "idempotency_key": idempotency_key,
        "params": params,
        "result": result,
        "error": error,
        "status": "success" if error is None else "failed",
    }
    logger.info(json.dumps(log_entry))
    return log_entry

def transfer_funds(from_account, to_account, amount, agent_id, idempotency_key):
    try:
        # Check for existing result
        existing = check_idempotency_log(idempotency_key)
        if existing and existing["status"] == "success":
            log_side_effect("transfer_funds", agent_id, idempotency_key,
                          {"from": from_account, "to": to_account, "amount": amount},
                          existing["result"], error=None)
            return existing["result"]
        
        # Execute the transfer
        result = payment_processor.transfer(from_account, to_account, amount)
        
        # Log the success
        log_side_effect("transfer_funds", agent_id, idempotency_key,
                       {"from": from_account, "to": to_account, "amount": amount},
                       result, error=None)
        
        return result
    except Exception as e:
        # Log the failure
        log_side_effect("transfer_funds", agent_id, idempotency_key,
                       {"from": from_account, "to": to_account, "amount": amount},
                       None, error=str(e))
        raise

Audit Trail Retention and Compliance

For regulated industries (financial services, healthcare, insurance), audit trails aren’t optional—they’re required. We’ve helped teams implement audit-ready systems that satisfy SOC 2 and ISO 27001 requirements.

Key practices:

  1. Immutable logging: Once written, logs can’t be modified. Use append-only databases or write-once storage.
  2. Retention policies: Keep logs for the legally required period (typically 7 years for financial data).
  3. Access controls: Restrict who can read logs and log all access attempts.
  4. Encryption: Encrypt logs in transit and at rest.
  5. Timestamping: Use synchronized clocks (NTP) so timestamps are reliable.

We typically recommend a three-tier logging strategy:

  • Tier 1 (Hot): Recent logs (last 30 days) in a fast, queryable database for operational monitoring
  • Tier 2 (Warm): Medium-term logs (30 days to 1 year) in cloud storage with indexing
  • Tier 3 (Cold): Long-term archive (1+ years) in immutable storage for compliance

This approach balances performance, cost, and compliance requirements.


Handling Partial Failures and Retries

Production agents fail. Networks time out. APIs return 429 (rate limited). Databases go down. The question isn’t whether failures will happen—it’s how you’ll handle them.

Idempotent Retry Strategies

Not all failures are retryable. Before you retry, ask:

  1. Is the failure transient? (network timeout, 503 service unavailable) → Retry
  2. Is the failure permanent? (404 not found, 401 unauthorized) → Don’t retry
  3. Is the failure idempotent? (same parameters, same idempotency key) → Safe to retry

Here’s a retry pattern that respects these distinctions:

import time
from typing import Callable, Any

def idempotent_retry(
    func: Callable,
    idempotency_key: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_multiplier: float = 2.0,
):
    """Retry a function with exponential backoff, respecting idempotency."""
    
    # Check if we've already succeeded
    existing = check_idempotency_log(idempotency_key)
    if existing and existing["status"] == "success":
        return existing["result"]
    
    # Don't retry if we've already failed permanently
    if existing and existing["status"] == "failed":
        if existing.get("error_type") == "permanent":
            raise Exception(f"Permanent failure: {existing['error']}")
    
    delay = base_delay
    last_error = None
    
    for attempt in range(max_retries):
        try:
            result = func()
            record_idempotency_success(idempotency_key, result)
            return result
        except Exception as e:
            last_error = e
            error_type = classify_error(e)
            
            # Don't retry permanent errors
            if error_type == "permanent":
                record_idempotency_failure(idempotency_key, str(e), error_type)
                raise
            
            # Don't retry if we've exhausted attempts
            if attempt == max_retries - 1:
                record_idempotency_failure(idempotency_key, str(e), error_type)
                raise
            
            # Wait before retrying (exponential backoff)
            wait_time = min(delay, max_delay)
            time.sleep(wait_time)
            delay *= backoff_multiplier
    
    raise last_error

def classify_error(error: Exception) -> str:
    """Classify an error as transient or permanent."""
    if isinstance(error, TimeoutError):
        return "transient"
    if isinstance(error, ConnectionError):
        return "transient"
    if "429" in str(error):  # Rate limited
        return "transient"
    if "503" in str(error):  # Service unavailable
        return "transient"
    if "404" in str(error):  # Not found
        return "permanent"
    if "401" in str(error):  # Unauthorized
        return "permanent"
    if "403" in str(error):  # Forbidden
        return "permanent"
    return "transient"  # Default to transient (safer)

Handling Partial State and Rollback

Sometimes a tool succeeds partially. For example, an agent updates a customer record but fails to send a confirmation email. The customer data is changed, but the email never went out.

For these scenarios, you have two options:

Option 1: Rollback on Partial Failure

If any step fails, undo all previous steps and return to the initial state. This is the safest approach but can be expensive.

def process_order_with_rollback(order_id, agent_id, idempotency_key):
    # Check idempotency first
    existing = check_idempotency_log(idempotency_key)
    if existing:
        return existing["result"]
    
    try:
        # Step 1: Validate order
        validate_order(order_id)
        
        # Step 2: Process payment
        payment_result = process_payment(order_id)
        
        # Step 3: Update inventory
        inventory_result = update_inventory(order_id)
        
        # Step 4: Send confirmation
        email_result = send_confirmation_email(order_id)
        
        result = {"payment": payment_result, "inventory": inventory_result, "email": email_result}
        record_idempotency_success(idempotency_key, result)
        return result
    
    except Exception as e:
        # Rollback: reverse all changes
        try:
            reverse_payment(order_id)
            restore_inventory(order_id)
        except Exception as rollback_error:
            # Log but don't raise—we're already in error state
            logger.error("Rollback failed: %s", rollback_error)
        
        record_idempotency_failure(idempotency_key, str(e), "transient")
        raise

Option 2: Accept Partial State and Make It Idempotent

Instead of rolling back, make each step idempotent and track which steps have completed. On retry, resume from the last completed step.

def process_order_with_resume(order_id, agent_id, idempotency_key):
    # Check idempotency first
    existing = check_idempotency_log(idempotency_key)
    if existing:
        return existing["result"]
    
    # Load partial progress from a previous attempt (the read-side counterpart
    # of record_idempotency_partial below); fall back to a fresh state
    state = load_step_state(idempotency_key) or \
        {"validation": False, "payment": False, "inventory": False, "email": False}
    
    try:
        # Step 1: Validate
        if not state["validation"]:
            validate_order(order_id)
            state["validation"] = True
        
        # Step 2: Payment (idempotent)
        if not state["payment"]:
            payment_result = process_payment_idempotent(order_id, idempotency_key)
            state["payment"] = True
        
        # Step 3: Inventory (idempotent)
        if not state["inventory"]:
            inventory_result = update_inventory_idempotent(order_id, idempotency_key)
            state["inventory"] = True
        
        # Step 4: Email (idempotent)
        if not state["email"]:
            email_result = send_confirmation_idempotent(order_id, idempotency_key)
            state["email"] = True
        
        result = state
        record_idempotency_success(idempotency_key, result)
        return result
    
    except Exception as e:
        # Record partial progress
        record_idempotency_partial(idempotency_key, state, str(e))
        raise

We prefer option 2 for long-running operations because it’s faster and aligns with how agents naturally operate. But it requires every downstream tool to be idempotent—which brings us to the next section.


Implementing Idempotency in Practice

Now let’s look at concrete implementations across different agent frameworks and scenarios. When you’re running agent automation in production, these patterns need to be baked into your tools from day one.

LangChain Tool Integration

LangChain’s tool system makes it straightforward to wrap tools with idempotency: because tools are ordinary Python callables, a decorator can check for a cached result before the tool body runs.

Here’s how we structure idempotent tools in LangChain:

from langchain.tools import tool
from functools import wraps
import hashlib
import json

def idempotent_tool(func):
    """Decorator to make a LangChain tool idempotent."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Generate idempotency key from function name and arguments
        # (serialise args/kwargs directly so keyword order can't change the key)
        key_data = json.dumps({
            "func": func.__name__,
            "args": args,
            "kwargs": kwargs,
        }, sort_keys=True, default=str)
        idempotency_key = hashlib.sha256(key_data.encode()).hexdigest()
        
        # Check for existing result
        existing = get_from_cache(idempotency_key)
        if existing is not None:
            return existing
        
        # Execute and cache
        result = func(*args, **kwargs)
        set_in_cache(idempotency_key, result)
        return result
    
    return wrapper

@tool
@idempotent_tool
def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Transfer funds between accounts. Idempotent: safe to retry."""
    # Implementation here
    return f"Transferred {amount} from {from_account} to {to_account}"

@tool
@idempotent_tool
def create_invoice(customer_id: str, amount: float, description: str) -> str:
    """Create an invoice. Idempotent: same parameters always produce same invoice."""
    # Implementation here
    return f"Invoice created for customer {customer_id}"

For more sophisticated patterns, Anthropic’s engineering guide on effective harnesses for long-running agents provides detailed guidance on managing context and tool usage across extended agent sessions.

CrewAI and Multi-Agent Idempotency

When you’re running multiple agents that coordinate with each other, idempotency becomes even more critical: any agent in the crew may retry a step another agent already completed.

In a multi-agent system, you need idempotency not just within each tool, but across agent boundaries:

from crewai import Agent, Task, Crew
from crewai.tools import BaseTool

class IdempotentBaseTool(BaseTool):
    """Base class for idempotent tools in CrewAI."""
    
    # BaseTool is a Pydantic model, so declare the store as a field
    # rather than assigning it in __init__. In production, use Redis/DB.
    idempotency_store: dict = {}
    
    def _run(self, *args, **kwargs):
        # Generate key from args and kwargs
        key = self._generate_key(*args, **kwargs)
        
        # Check cache
        if key in self.idempotency_store:
            return self.idempotency_store[key]
        
        # Execute
        result = self._execute(*args, **kwargs)
        
        # Cache result
        self.idempotency_store[key] = result
        return result
    
    def _generate_key(self, *args, **kwargs):
        """Generate idempotency key from arguments."""
        raise NotImplementedError
    
    def _execute(self, *args, **kwargs):
        """Execute the actual tool logic."""
        raise NotImplementedError

class ProcessRefundTool(IdempotentBaseTool):
    name: str = "process_refund"
    description: str = "Process a refund for an order"
    
    def _generate_key(self, order_id, refund_amount):
        return f"refund:{order_id}:{refund_amount}"
    
    def _execute(self, order_id, refund_amount):
        # Call payment processor
        return payment_api.issue_refund(order_id, refund_amount)

# Use in agents
refund_agent = Agent(
    role="Refund Processor",
    goal="Process customer refunds accurately",
    tools=[ProcessRefundTool()],
)

Database-Backed Idempotency for APIs

When your tools call external APIs (Stripe, Salesforce, HubSpot), you need to coordinate idempotency with the external system. Most modern APIs support idempotency keys natively.

Here’s how we structure this:

import stripe
import uuid

def create_stripe_charge_idempotent(customer_id: str, amount: int, description: str):
    """Create a Stripe charge with idempotency."""
    
    # Deterministic key: the same customer, amount, and description always
    # map to the same charge. Add a per-request ID if legitimate repeats
    # (the same customer buying the same thing twice) are possible.
    idempotency_key = f"charge:{customer_id}:{amount}:{description}"
    
    # Check our local idempotency log
    existing = db.query(
        "SELECT charge_id, status FROM idempotency_log WHERE key = %s",
        (idempotency_key,)
    ).fetchone()
    
    if existing:
        if existing["status"] == "success":
            return {"charge_id": existing["charge_id"], "status": "success", "cached": True}
        elif existing["status"] == "failed":
            raise Exception(f"Previously failed: {existing['error']}")
    
    try:
        # Pass idempotency key to Stripe
        charge = stripe.Charge.create(
            amount=amount,
            currency="gbp",
            customer=customer_id,
            description=description,
            idempotency_key=idempotency_key,  # Stripe respects this
        )
        
        # Log success
        db.execute(
            "INSERT INTO idempotency_log (key, charge_id, status) VALUES (%s, %s, %s)",
            (idempotency_key, charge.id, "success")
        )
        
        return {"charge_id": charge.id, "status": "success", "cached": False}
    
    except Exception as e:
        # Log failure
        db.execute(
            "INSERT INTO idempotency_log (key, status, error) VALUES (%s, %s, %s)",
            (idempotency_key, "failed", str(e))
        )
        raise

This approach gives you a safety net: even if Stripe’s idempotency fails for some reason, your local log prevents duplicate charges.


Testing Idempotent Tools

Idempotency is too important to test manually. You need automated tests that verify idempotent behaviour under realistic failure scenarios.

Unit Tests for Idempotency

import pytest
from unittest.mock import patch, MagicMock

class TestIdempotentTools:
    
    def test_repeated_calls_return_same_result(self):
        """Calling the same tool twice should return the same result."""
        result1 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        result2 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        
        assert result1 == result2
    
    def test_different_keys_execute_independently(self):
        """Different idempotency keys should execute independently."""
        result1 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
        result2 = transfer_funds("account_a", "account_b", 100, "agent_1", "key_456")
        
        # Results should be different objects (different executions)
        assert result1 is not result2
    
    def test_side_effect_happens_once(self):
        """Side effect should only occur once, even with multiple calls."""
        with patch('payment_processor.transfer') as mock_transfer:
            mock_transfer.return_value = {"status": "success"}
            
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            
            # Side effect should only happen once
            assert mock_transfer.call_count == 1
    
    def test_failure_is_recorded_and_not_retried_for_permanent_errors(self):
        """Permanent errors should not be retried."""
        with patch('payment_processor.transfer') as mock_transfer:
            mock_transfer.side_effect = Exception("Invalid account (404)")
            
            with pytest.raises(Exception):
                transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            
            # Second call should immediately raise without retrying
            with pytest.raises(Exception):
                transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            
            # Side effect should only be attempted once
            assert mock_transfer.call_count == 1
    
    def test_transient_errors_are_retried(self):
        """Transient errors should be retried."""
        with patch('payment_processor.transfer') as mock_transfer:
            # Fail twice, succeed on third
            mock_transfer.side_effect = [
                TimeoutError("Network timeout"),
                TimeoutError("Network timeout"),
                {"status": "success"},
            ]
            
            result = transfer_funds("account_a", "account_b", 100, "agent_1", "key_123")
            
            assert result["status"] == "success"
            assert mock_transfer.call_count == 3

Chaos Testing for Agent Restarts

The real test is what happens when your agent crashes and restarts. Simulate this:

def test_agent_restart_during_tool_execution():
    """Simulate agent restart mid-tool-execution."""
    
    # Simulate: Agent starts executing transfer
    agent = Agent(tools=[transfer_funds_tool])
    
    # Simulate: Agent crashes after tool starts but before completion
    with patch('payment_processor.transfer') as mock_transfer:
        # First call: starts but takes time
        mock_transfer.side_effect = [
            TimeoutError("Network timeout"),  # Agent crashes here
            {"status": "success"},  # Agent restarts and retries
        ]
        
        result = agent.execute_tool("transfer_funds", {
            "from": "account_a",
            "to": "account_b",
            "amount": 100,
            "idempotency_key": "key_123"
        })
        
        assert result["status"] == "success"
        # The processor was called twice (one failed attempt, one success),
        # but the transfer side effect only went through once
        assert mock_transfer.call_count == 2

Integration Tests with Real External Services

For critical tools, test against real external services in a sandbox environment:

@pytest.mark.integration
def test_stripe_refund_idempotency():
    """Test refund idempotency against real Stripe sandbox."""
    
    # Create a test charge
    charge = stripe.Charge.create(
        amount=1000,
        currency="gbp",
        source="tok_visa",
    )
    
    # Issue refund with idempotency key
    refund1 = stripe.Refund.create(
        charge=charge.id,
        idempotency_key="refund:test:1000"
    )
    
    # Issue same refund again
    refund2 = stripe.Refund.create(
        charge=charge.id,
        idempotency_key="refund:test:1000"
    )
    
    # Should be the same refund
    assert refund1.id == refund2.id
    
    # Verify charge only refunded once
    charge_updated = stripe.Charge.retrieve(charge.id)
    assert charge_updated.amount_refunded == 1000

Monitoring and Observability

You can’t fix what you can’t see. Monitoring idempotent tools means tracking:

  1. Cache hit rate: What percentage of tool calls are served from cache?
  2. Retry rate: How often do tools fail and get retried?
  3. Idempotency key collisions: Are different actions accidentally getting the same key?
  4. Partial failures: How often do tools fail partway through?

Here’s a monitoring dashboard we typically set up:

from prometheus_client import Counter, Histogram

# Metrics
idempotent_cache_hits = Counter(
    'idempotent_cache_hits_total',
    'Total idempotent cache hits',
    ['tool_name', 'agent_id']
)

idempotent_cache_misses = Counter(
    'idempotent_cache_misses_total',
    'Total idempotent cache misses',
    ['tool_name', 'agent_id']
)

tool_execution_time = Histogram(
    'tool_execution_seconds',
    'Tool execution time in seconds',
    ['tool_name', 'agent_id']
)

retry_attempts = Counter(
    'tool_retry_attempts_total',
    'Total retry attempts',
    ['tool_name', 'agent_id', 'error_type']
)

idempotency_key_collisions = Counter(
    'idempotency_key_collisions_total',
    'Idempotency key collisions (different actions, same key)',
    ['tool_name']
)

# Usage in tool
def monitored_idempotent_tool(tool_name, agent_id):
    def decorator(func):
        def wrapper(*args, **kwargs):
            key = generate_key(*args, **kwargs)
            existing = check_cache(key)
            
            if existing:
                idempotent_cache_hits.labels(tool_name, agent_id).inc()
                return existing
            
            idempotent_cache_misses.labels(tool_name, agent_id).inc()
            
            with tool_execution_time.labels(tool_name, agent_id).time():
                try:
                    result = func(*args, **kwargs)
                    cache_result(key, result)
                    return result
                except Exception as e:
                    error_type = classify_error(e)
                    retry_attempts.labels(tool_name, agent_id, error_type).inc()
                    raise
        
        return wrapper
    return decorator

Key alerts to set up:

  • Cache hit rate drops below 80%: Possible issue with key generation
  • Retry rate spikes: Possible upstream service degradation
  • Execution time > 10x normal: Possible database or network issue
  • Idempotency key collisions detected: Bug in key generation logic
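If you're scraping these metrics with Prometheus, the first and last alerts could be expressed roughly as follows. The metric names match the snippet above; the 80% threshold and the evaluation windows are illustrative, so tune them to your traffic:

```yaml
groups:
  - name: idempotency
    rules:
      - alert: IdempotencyCacheHitRateLow
        expr: |
          sum(rate(idempotent_cache_hits_total[15m]))
            /
          (sum(rate(idempotent_cache_hits_total[15m]))
            + sum(rate(idempotent_cache_misses_total[15m]))) < 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Idempotent cache hit rate below 80% - check key generation"
      - alert: IdempotencyKeyCollision
        expr: increase(idempotency_key_collisions_total[5m]) > 0
        labels:
          severity: critical
        annotations:
          summary: "Idempotency key collision detected - bug in key generation"
```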

Common Pitfalls and How to Avoid Them

Pitfall 1: Idempotency Keys Based on Timestamps

Wrong:

def generate_key():
    return str(time.time())  # WRONG: Different every time

Right:

def generate_key(user_id, action_type, resource_id):
    return f"{user_id}:{action_type}:{resource_id}"

Idempotency keys must be deterministic. If you generate a new key every time, retries will create duplicates.
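When the action carries free-form parameters, hashing a canonical serialisation keeps the key deterministic and bounded in length. A minimal sketch (the helper name and field layout are ours):

```python
import hashlib
import json

def generate_idempotency_key(user_id, action_type, resource_id, params=None):
    """Deterministic key: the same logical action always hashes to the same key."""
    payload = json.dumps(
        {
            "user": user_id,
            "action": action_type,
            "resource": resource_id,
            "params": params or {},
        },
        sort_keys=True,  # canonical ordering, so dict insertion order can't change the hash
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the serialisation is canonical, two calls that differ only in parameter ordering produce the same key, while any change to the actual action produces a different one.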

Pitfall 2: Checking Idempotency After Side Effects

Wrong:

def transfer_funds(from_account, to_account, amount, key):
    # Execute first
    payment_processor.transfer(from_account, to_account, amount)
    
    # Check idempotency after (too late!)
    if check_cache(key):
        return get_from_cache(key)
    
    cache_result(key, "success")
    return "success"

Right:

def transfer_funds(from_account, to_account, amount, key):
    # Check idempotency first
    cached = check_cache(key)
    if cached:
        return cached
    
    # Then execute
    result = payment_processor.transfer(from_account, to_account, amount)
    
    # Then cache
    cache_result(key, result)
    return result

Always check before executing. This is the critical ordering.
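One way to make this ordering impossible to skip is to factor it into a decorator. A sketch with an in-memory dict standing in for Redis or a database table (all names are illustrative):

```python
import functools

_idempotency_cache = {}  # stand-in for Redis or a database table

def idempotent(key_fn):
    """Wrap a tool so the check -> execute -> cache ordering can't be skipped."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = key_fn(*args, **kwargs)
            if key in _idempotency_cache:        # 1. check first
                return _idempotency_cache[key]
            result = func(*args, **kwargs)       # 2. then execute the side effect
            _idempotency_cache[key] = result     # 3. then cache
            return result
        return wrapper
    return decorator

executions = {"count": 0}

@idempotent(key_fn=lambda order_id: f"ship:{order_id}")
def ship_order(order_id):
    executions["count"] += 1                     # the side effect
    return {"order_id": order_id, "status": "shipped"}
```

Calling `ship_order("ord-42")` twice returns the same result both times, but the side effect runs only once.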

Pitfall 3: Forgetting to Handle Concurrent Requests

If two requests with the same idempotency key arrive simultaneously, both might execute. Use database constraints or locks:

def transfer_funds(from_account, to_account, amount, key):
    # Use database constraint to prevent race conditions
    try:
        db.execute(
            "INSERT INTO idempotency_log (key, status) VALUES (%s, %s)",
            (key, "pending")
        )
    except IntegrityError:
        # Key already exists, another request is handling it
        # Wait for it to complete
        return wait_for_completion(key)
    
    try:
        result = payment_processor.transfer(from_account, to_account, amount)
        db.execute(
            "UPDATE idempotency_log SET status = %s, result = %s WHERE key = %s",
            ("success", result, key)
        )
        return result
    except Exception as e:
        db.execute(
            "UPDATE idempotency_log SET status = %s, error = %s WHERE key = %s",
            ("failed", str(e), key)
        )
        raise
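`wait_for_completion` is left undefined above; one way to sketch it is a bounded poll against the same idempotency log. Here the `fetch_status` callable is an assumption standing in for the SELECT that reads the row's status:

```python
import time

def wait_for_completion(key, fetch_status, timeout=30.0, interval=0.5):
    """Bounded poll until the concurrent request holding `key` resolves.

    fetch_status(key) -> (status, payload), read from the idempotency log,
    where status is 'pending', 'success' or 'failed'.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status, payload = fetch_status(key)
        if status == "success":
            return payload                       # return the other request's result
        if status == "failed":
            raise RuntimeError(f"concurrent attempt for {key} failed: {payload}")
        time.sleep(interval)                     # still pending: back off and re-check
    raise TimeoutError(f"timed out waiting on idempotency key {key}")
```

The timeout matters: if the request holding the key crashes before updating its row, pollers must eventually give up rather than spin forever.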

Pitfall 4: Not Logging Side Effects

Wrong: Tool executes but logs nothing

Right: Every tool execution logs:

  • When it happened
  • What parameters it received
  • What the result was
  • Whether it was cached or executed
  • How long it took

This is non-negotiable for production systems, especially if you’re pursuing SOC 2 or ISO 27001 compliance. We’ve helped teams implement Security Audit (SOC 2 / ISO 27001) compliance via Vanta, and comprehensive logging is always a requirement.
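The checklist above maps naturally onto a structured (JSON) log record. A minimal sketch; the field names are ours, not a compliance standard:

```python
import json
import logging
import time
import uuid

logger = logging.getLogger("tool_audit")

def log_tool_execution(tool_name, params, result, served_from_cache, duration_s):
    """Emit one structured audit record per tool call (cached or executed)."""
    record = {
        "event": "tool_execution",
        "execution_id": str(uuid.uuid4()),   # unique per call, even for cache hits
        "timestamp": time.time(),            # when it happened
        "tool": tool_name,
        "params": params,                    # what it received
        "result": result,                    # what it returned
        "served_from_cache": served_from_cache,
        "duration_s": round(duration_s, 4),  # how long it took
    }
    logger.info(json.dumps(record, sort_keys=True, default=str))
    return record
```

One JSON line per call makes the audit trail trivially queryable, which is exactly what an auditor (or an incident responder) will ask for.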

Pitfall 5: Treating All Errors the Same

Wrong:

for i in range(3):
    try:
        return execute_tool()
    except:
        time.sleep(2 ** i)  # Retry everything

Right:

last_error = None
for i in range(3):
    try:
        return execute_tool()
    except PermanentError:
        raise  # Don't retry
    except TransientError as e:
        last_error = e
        time.sleep(2 ** i)  # Back off, then retry
raise last_error  # Retries exhausted: surface the failure, don't return None

Classify errors correctly. Retrying a 404 or a 401 is wasted effort: the call will fail the same way every time.
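For HTTP-backed tools, the classification can be as simple as a status-code lookup. A sketch; the groupings below are a common convention, not exhaustive:

```python
class TransientError(Exception):
    """Worth retrying: the same call may succeed later."""

class PermanentError(Exception):
    """Not worth retrying: the same call will fail the same way."""

# Timeouts, rate limits, and server-side failures are typically retryable.
TRANSIENT_STATUSES = {408, 429, 500, 502, 503, 504}

def classify_http_status(status_code):
    """Map an HTTP status to a retry decision; unknown codes are treated as permanent."""
    if status_code in TRANSIENT_STATUSES:
        return "transient"
    return "permanent"  # conservative default: don't blindly retry side effects
```

Defaulting unknown codes to permanent is deliberate: with side-effecting tools, an unnecessary failure is cheaper than an unintended duplicate.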

Pitfall 6: Idempotency Without Consistency Checks

Even with idempotent tools, things can go wrong. Add consistency checks:

def transfer_funds_with_verification(from_account, to_account, amount, key):
    # Check idempotency
    cached = check_cache(key)
    if cached:
        # Verify the cached result reflects reality before trusting it
        if not verify_transfer(from_account, to_account, amount):
            raise RuntimeError(f"cached result for {key} failed verification")
        return cached
    
    # Execute
    result = payment_processor.transfer(from_account, to_account, amount)
    
    # Verify before caching; never cache an unverified success
    if not verify_transfer(from_account, to_account, amount):
        raise RuntimeError("transfer executed but failed verification")
    
    cache_result(key, result)
    return result

Summary and Next Steps

Idempotent tools are the foundation of reliable production agents. They’re not optional—they’re essential. Every tool you ship should follow these patterns:

  1. Generate deterministic idempotency keys from action parameters
  2. Check for existing results before executing side effects
  3. Log every execution with full context and timestamps
  4. Classify errors correctly and only retry transient failures
  5. Test idempotency thoroughly including agent restart scenarios
  6. Monitor cache hits, retries, and key collisions in production
  7. Handle concurrent requests with database constraints or locks

When you get this right, your agents become bulletproof. Retries work. Restarts are safe. Audits pass. And you sleep at night.

If you’re building agentic AI systems at scale, this is where the real engineering happens. It’s not sexy, but it’s what separates production systems from prototypes.

At PADISO, we’ve built these patterns into every AI & Agents Automation project we ship. Whether you’re a startup building your first agent or an enterprise modernising your platform engineering stack, we’ve seen the patterns that work and the ones that don’t.

If you’re ready to ship production-grade agents with the confidence that retries are safe, side effects are tracked, and your data stays consistent—let’s talk. We work with founders and operators across AI automation for customer service, supply chain automation, financial services, healthcare, and beyond.

Visit PADISO to learn more about our CTO as a Service and AI Strategy & Readiness offerings, or reach out to discuss your specific use case. We’ve helped teams ship agents that handle millions of transactions safely—and we can help you do the same.