Table of Contents
- What Agentic Document Processing Actually Is
- Why Template-Based Extraction Breaks at Scale
- Core Architecture: Planning, Execution, and Validation
- Routing Logic and Decision Trees
- Memory, Context, and State Management
- Error Handling and Fallback Patterns
- Cost Control and Token Optimisation
- Observability and Production Monitoring
- Security, Compliance, and Data Governance
- Real Deployment Patterns and Lessons
- Building Your First Agentic Document Processor
What Agentic Document Processing Actually Is
Agentic document processing is not optical character recognition (OCR) plus template matching. It’s not rule-based extraction wrapped in a chatbot interface. It’s autonomous agents that read, reason about, and act on documents—planning multi-step workflows, validating outputs, and routing decisions without human intervention.
The distinction matters operationally. Intelligent document processing has existed for years: extract fields, classify document type, trigger workflows. That’s extraction-first. Agentic document processing flips the stack: the agent decides what to extract, when to ask for clarification, which path to take, and whether the document is even processable.
As “How AI agents and LLMs are evolving intelligent document processing” puts it, the real shift is from “extract these 12 fields” to “understand this document, extract what matters, flag anomalies, and route it correctly.” The agent becomes the orchestrator, not the extraction engine.
In production, this means:
- Planning: The agent reads a document, decides what information is critical, and builds a plan (“I need to extract invoice number, date, and vendor; validate totals; check against PO database; flag if over threshold”).
- Execution: The agent uses tools—OCR, database lookups, rule checks, LLM calls—to execute that plan.
- Validation: The agent checks its own work, spots inconsistencies, and either self-corrects or escalates.
- Routing: The agent decides the next step: approve, flag for review, request more data, or reject.
This is radically different from a pipeline that says “extract these fields, then run this rule, then move to this queue.” Agentic systems adapt to document variance, handle edge cases, and make decisions in context.
Why Template-Based Extraction Breaks at Scale
Most teams start with template-based extraction because it works for 80% of documents—the clean, well-formatted ones. Then reality hits.
The Brittleness Problem
A vendor invoice comes in with a different layout. The invoice number is in the header instead of the footer. The date format changes. A line item spans two lines instead of one. The template fails. The document gets stuck in a queue. A human reviews it. The queue grows.
Template-based systems require a new template for every document variant. In a real business, vendors don’t follow your schema. Customers use different formats. Scanned documents have skew, shadows, and poor quality. You end up maintaining dozens of templates, each fragile, each needing retraining when the source changes.
The Context Problem
Template extraction doesn’t understand intent. It extracts fields, but it doesn’t know if a field is anomalous, contradictory, or missing critical context. If an invoice total doesn’t match the sum of line items, a template-based system flags it as an error. An agentic system understands the discrepancy, checks for common causes (rounding, tax calculation, discount), and either resolves it or escalates with context.
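The reconciliation step described above can be sketched in a few lines. This is a minimal illustration, not a complete rule set: the function name `explain_total`, the candidate labels, and the one-cent tolerance are all assumptions made for the example.

```python
from decimal import Decimal

def explain_total(line_totals, stated_total, tax=Decimal("0"),
                  discount=Decimal("0"), tolerance=Decimal("0.01")):
    """Return a label describing how the stated total relates to the line sum."""
    line_sum = sum(line_totals, Decimal("0"))
    # Candidate explanations, checked in order; the tolerance absorbs rounding.
    candidates = [
        ("matches", line_sum),
        ("tax_added", line_sum + tax),
        ("discount_applied", line_sum - discount),
        ("tax_and_discount", line_sum + tax - discount),
    ]
    for label, expected in candidates:
        if abs(expected - stated_total) <= tolerance:
            return label
    return "unexplained"  # nothing fits: escalate with the numbers attached


lines = [Decimal("100.00"), Decimal("50.00")]
print(explain_total(lines, Decimal("165.00"), tax=Decimal("15.00")))  # tax_added
```

Only the "unexplained" case needs a human; the other labels can travel with the document as context for whoever approves it.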
The Routing Problem
Templates extract data. Then you need separate logic to route documents. Is this invoice for payment, dispute, or return? Is this contract a renewal or new business? Template systems separate extraction from decision-making, creating brittle handoff logic.
Agentic systems integrate extraction and routing. The agent extracts, validates, and routes in one coherent workflow.
The Cost Problem
Template systems often use OCR + regex + rule engines. That’s cheap per document until you need to handle variance. Then you add more templates, more rules, more exceptions. You hire people to maintain the system. Agentic systems use LLMs, which cost more per document but adapt to variance without code changes.
The trade-off: template systems scale horizontally (more rules, more templates); agentic systems scale vertically (better prompts, better tool design). For 100 document types across 500 vendors, agentic wins. For 1 document type across 1 vendor, templates win.
Core Architecture: Planning, Execution, and Validation
A production agentic document processor has three layers:
Layer 1: Planning (The Thinking Phase)
When a document arrives, the agent doesn’t immediately extract. It reads the document, classifies it, and builds a plan.
def plan_extraction(document: Document) -> ExtractionPlan:
    """
    Agent reads document, decides what to do.
    Returns a plan: what to extract, what tools to use, what to validate.
    """
    response = llm.call(
        model="gpt-4",
        messages=[
            {"role": "system", "content": PLANNING_PROMPT},
            {"role": "user", "content": f"Document:\n{document.text}"}
        ],
        tools=[
            {"name": "extract_fields", "description": "Extract specific fields from document"},
            {"name": "classify_document", "description": "Classify document type"},
            {"name": "flag_for_review", "description": "Flag anomalies or missing data"}
        ],
        tool_choice="auto"
    )
    return parse_plan(response)
The planning phase is where the agent reasons about the document. It decides:
- Document type: Is this an invoice, contract, claim form, or something else?
- Extraction scope: What fields are critical? What fields are optional?
- Validation rules: What checks matter for this document type?
- Routing: Where should this document go after processing?
Planning is expensive (one LLM call per document), but it prevents wasted extraction on documents that shouldn’t be processed. It also gives you a clear audit trail: the agent said “this is an invoice, extract amount and vendor, validate against PO database, flag if over $50k.”
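The plan itself is easiest to audit when it is a small, typed structure rather than free text. The sketch below assumes the model is asked to reply with JSON; the field names, the `KNOWN_TYPES` set, and `parse_plan_json` are hypothetical and only illustrate one way a plan parser could fail safe.

```python
import json
from dataclasses import dataclass, field

KNOWN_TYPES = {"invoice", "contract", "claim_form"}  # assumed document types

@dataclass
class ExtractionPlan:
    document_type: str = "unknown"
    required_fields: list = field(default_factory=list)
    optional_fields: list = field(default_factory=list)
    rules: list = field(default_factory=list)
    route_hint: str = "review_queue"  # safe default: a human looks at it

def _as_list(value):
    """Accept only real lists; anything else becomes an empty list."""
    return list(value) if isinstance(value, list) else []

def parse_plan_json(raw: str) -> ExtractionPlan:
    """Turn the model's JSON reply into a plan, or a safe default plan."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return ExtractionPlan()
    if not isinstance(data, dict):
        return ExtractionPlan()
    doc_type = data.get("document_type")
    if not isinstance(doc_type, str) or doc_type not in KNOWN_TYPES:
        return ExtractionPlan()
    route_hint = data.get("route_hint")
    return ExtractionPlan(
        document_type=doc_type,
        required_fields=_as_list(data.get("required_fields")),
        optional_fields=_as_list(data.get("optional_fields")),
        rules=_as_list(data.get("rules")),
        route_hint=route_hint if isinstance(route_hint, str) else "review_queue",
    )
```

Anything the parser cannot interpret becomes an "unknown" plan that routes to review, so a malformed model reply never turns into silent extraction.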
Layer 2: Execution (The Action Phase)
Once the plan is set, the agent executes tools in sequence. This is where the actual extraction happens, but it’s guided by the plan.
def execute_plan(document: Document, plan: ExtractionPlan) -> ExtractionResult:
    """
    Agent executes the plan: calls tools, validates outputs, adapts if needed.
    """
    # AgentState creates its own empty extracted_data dict
    state = AgentState(document=document, plan=plan)
    while not state.is_complete():
        # Agent decides next action based on plan and current state
        action = agent.decide_next_action(state)
        if action.type == "extract":
            result = extract_field(document, action.field, action.method)
            state.extracted_data[action.field] = result
        elif action.type == "validate":
            is_valid = validate_field(state.extracted_data, action.field, action.rule)
            if not is_valid:
                state.add_flag(f"Validation failed: {action.field}")
        elif action.type == "lookup":
            # Query external system (PO database, vendor registry, etc.)
            lookup_result = external_lookup(action.query)
            state.context[action.key] = lookup_result
        elif action.type == "escalate":
            state.escalate(action.reason)
            break
    return state.to_result()
Execution is where you integrate tools. Tools are the agent’s hands:
- OCR tools: Extract text from images.
- Parsing tools: Parse structured data (dates, amounts, addresses).
- Lookup tools: Query databases (PO systems, vendor registries, compliance databases).
- Validation tools: Check consistency (invoice total = sum of lines, date is valid, vendor exists).
- Routing tools: Send documents to queues or workflows.
Tools should be deterministic and fast. If a tool is slow or unreliable, the agent wastes time and tokens retrying.
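As an illustration of what "deterministic and fast" means for a parsing tool, here is a minimal amount parser. It is a sketch that assumes US-style separators (comma for thousands, dot for decimals); `parse_amount` is a hypothetical name, and other locales would need their own handling.

```python
import re
from decimal import Decimal, InvalidOperation
from typing import Optional

# Optional minus sign, digits with optional thousands commas, optional decimals
_AMOUNT_RE = re.compile(r"-?\d[\d,]*(?:\.\d+)?")

def parse_amount(raw: str) -> Optional[Decimal]:
    """Parse strings like '$1,234.50' or 'USD 99' into a Decimal, else None."""
    match = _AMOUNT_RE.search(raw)
    if match is None:
        return None
    try:
        return Decimal(match.group().replace(",", ""))
    except InvalidOperation:
        return None


print(parse_amount("$1,234.50"))  # 1234.50
```

The same input always gives the same output, there is no network call, and a failure is an explicit `None` the agent can act on instead of a guess.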
Layer 3: Validation (The Verification Phase)
After extraction, the agent validates its own work. This is critical because LLMs hallucinate.
def validate_extraction(document: Document, extracted: dict, plan: ExtractionPlan) -> ValidationResult:
    """
    Agent checks its own work: are extracted fields consistent? Do they match the document?
    Are there red flags?
    """
    validation_checks = [
        check_field_presence(extracted, plan.required_fields),
        check_data_consistency(extracted),  # e.g., invoice total = sum of lines
        check_against_document(document, extracted),  # re-read document to verify
        check_business_rules(extracted, plan.rules),  # e.g., amount < $50k
    ]
    flags = []
    for check in validation_checks:
        if not check.passed:
            flags.append(check.flag)
    if flags:
        return ValidationResult(status="flagged", flags=flags, action="review")
    else:
        return ValidationResult(status="valid", action="approve")
Validation is where you catch errors before they propagate. A document with a flagged field goes to a human reviewer, not into your business process.
Validation also gives you feedback to improve the agent. If the agent frequently flags a certain field, that’s a signal that the extraction logic needs tuning.
Routing Logic and Decision Trees
After a document is processed, it needs to go somewhere. Routing is where agentic systems shine because the agent has full context.
Static Routing (The Baseline)
Static routing is simple: anything that failed validation goes to review; an invoice under $1000 is approved automatically; everything else goes to an approval queue whose approver depends on the amount.
def route_document(extracted: dict, validation: ValidationResult) -> Route:
    if validation.status == "flagged":
        return Route(destination="review_queue", priority="high")
    doc_type = extracted.get("document_type")
    amount = extracted.get("amount")
    # A missing amount must never default to 0, or the document would auto-approve
    if amount is None:
        return Route(destination="review_queue", priority="high")
    if doc_type == "invoice" and amount < 1000:
        return Route(destination="approve", priority="auto")
    elif doc_type == "invoice" and amount < 10000:
        return Route(destination="approval_queue", priority="normal", approver="finance_manager")
    else:
        return Route(destination="approval_queue", priority="high", approver="cfo")
Static routing works until your business rules get complex. Then you need dynamic routing.
Dynamic Routing (The Agentic Approach)
Dynamic routing uses the agent to decide the next step based on context, history, and business state.
def route_with_agent(document: Document, extracted: dict, validation: ValidationResult) -> Route:
    """
    Agent decides routing based on full context: document content, extracted data,
    validation results, and external context (vendor history, user preferences, etc.).
    """
    context = {
        "document_type": extracted.get("document_type"),
        "vendor": extracted.get("vendor"),
        "amount": extracted.get("amount"),
        "validation_flags": validation.flags,
        "vendor_history": lookup_vendor_history(extracted.get("vendor")),
        "user_preferences": lookup_user_preferences(extracted.get("user_id")),
    }
    routing_decision = agent.decide(
        prompt=ROUTING_PROMPT,
        context=context,
        options=[
            "auto_approve",
            "approval_queue",
            "review_queue",
            "escalate_to_cfo",
            "request_more_info",
            "reject"
        ]
    )
    return Route(
        destination=routing_decision.option,
        priority=routing_decision.priority,
        reason=routing_decision.reasoning
    )
Dynamic routing is more expensive (another LLM call), but it adapts to context. If a vendor has a perfect payment history, an invoice from them might auto-approve even if it’s large. If a user has previously flagged a vendor, invoices from that vendor go to review.
Routing Patterns in Practice
Pattern 1: Threshold-Based: Route based on amount, complexity, or risk score. Cheap, predictable.
Pattern 2: Vendor-Based: Route based on vendor history, contract terms, or SLA. Requires vendor data.
Pattern 3: User-Based: Route based on user preferences, approval limits, or department. Requires user context.
Pattern 4: Time-Based: Route based on urgency, deadline, or business cycle. Requires temporal awareness.
Pattern 5: Hybrid: Combine multiple signals. Use static rules for 80% of documents, agent-based routing for edge cases.
Most production systems use Pattern 5. Static rules handle high-volume, low-variance documents. Agent-based routing handles exceptions and complex cases.
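A hybrid router can be as simple as "try the cheap rules first, call the agent only when no rule applies". The sketch below uses plain dicts and strings instead of the `Route` objects shown earlier, and `agent_route` stands in for whatever LLM-backed router you have; both are assumptions made to keep the example self-contained.

```python
from typing import Callable, Optional, Tuple

def static_route(extracted: dict, flags: list) -> Optional[str]:
    """Return a destination when a cheap rule clearly applies, else None."""
    if flags:
        return "review_queue"
    amount = extracted.get("amount")
    if (extracted.get("document_type") == "invoice"
            and amount is not None and amount < 1000):
        return "auto_approve"
    return None  # no rule matched: leave the decision to the agent

def hybrid_route(extracted: dict, flags: list,
                 agent_route: Callable[[dict, list], str]) -> Tuple[str, str]:
    """Return (destination, decided_by) so the audit trail shows who decided."""
    destination = static_route(extracted, flags)
    if destination is not None:
        return destination, "static"
    return agent_route(extracted, flags), "agent"
```

Recording which path made the decision also gives you a direct measure of how much traffic still needs the expensive route.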
Memory, Context, and State Management
Agentic systems are stateful. They need to remember what they’ve done, what they’ve learned, and what context matters.
Short-Term Memory (Within a Document)
As the agent processes a single document, it accumulates state: extracted fields, validation results, flags, and context lookups.
from typing import Any

class AgentState:
    def __init__(self, document: Document, plan: ExtractionPlan):
        self.document = document
        self.plan = plan
        self.extracted_data = {}  # Fields extracted so far
        self.validation_flags = []  # Issues found
        self.context = {}  # External lookups (vendor info, PO data, etc.)
        self.actions_taken = []  # Audit trail
        self.tokens_used = 0  # Cost tracking

    def add_extracted_field(self, field: str, value: Any, confidence: float):
        self.extracted_data[field] = {"value": value, "confidence": confidence}

    def add_flag(self, flag: str, severity: str = "warning"):
        self.validation_flags.append({"flag": flag, "severity": severity})

    def add_context(self, key: str, value: Any):
        self.context[key] = value

    def to_result(self) -> ExtractionResult:
        return ExtractionResult(
            extracted_data=self.extracted_data,
            flags=self.validation_flags,
            context=self.context,
            actions=self.actions_taken,
            tokens_used=self.tokens_used
        )
Short-term memory is straightforward: it’s the state object that persists for the duration of a single document’s processing.
Long-Term Memory (Across Documents)
Long-term memory is harder. It’s the agent’s ability to learn from past documents and improve over time.
class AgentMemory:
    def __init__(self, storage: VectorDB):
        self.storage = storage  # e.g., Pinecone, Weaviate, Postgres with pgvector

    def remember_extraction(self, document: Document, result: ExtractionResult):
        """
        Store a successful extraction in memory.
        Future documents can reference similar past extractions.
        """
        embedding = embed(document.text)
        self.storage.upsert(
            id=document.id,
            vector=embedding,
            metadata={
                "document_type": result.extracted_data.get("document_type"),
                "vendor": result.extracted_data.get("vendor"),
                "extraction_result": result.to_dict(),
                "timestamp": datetime.now()
            }
        )

    def recall_similar_documents(self, document: Document, k: int = 5) -> list:
        """
        Find similar documents from the past.
        Use these as examples for in-context learning.
        """
        embedding = embed(document.text)
        results = self.storage.query(vector=embedding, top_k=k)
        return results

    def learn_from_feedback(self, document_id: str, feedback: dict):
        """
        When a human corrects the agent, update memory.
        """
        self.storage.update(
            id=document_id,
            metadata={"human_feedback": feedback, "feedback_timestamp": datetime.now()}
        )
Long-term memory is powerful but expensive. You need vector embeddings (cost), a vector database (infrastructure), and logic to use past examples in prompts (token cost). Use it selectively:
- High-variance document types: Invoices from 100+ vendors benefit from memory.
- Low-volume document types: A single contract type doesn’t need memory; rules suffice.
- Feedback loops: If you’re correcting the agent frequently, memory helps it learn.
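Recalled documents only help if they reach the prompt in a bounded form. The sketch below shows one way to turn recalled examples into a few-shot prompt; the record shape (`text` and `extraction` keys) and the function name are hypothetical, since the real shape depends on your vector store.

```python
import json

def build_few_shot_prompt(document_text: str, similar: list,
                          max_examples: int = 3, max_chars: int = 500) -> str:
    """Assemble a prompt from past extractions, capped to control token cost."""
    parts = ["Extract the fields shown in the examples. Return JSON."]
    for example in similar[:max_examples]:
        # Truncate each past document so examples cannot crowd out the new one
        parts.append("Example document:\n" + example["text"][:max_chars])
        parts.append("Example output:\n"
                     + json.dumps(example["extraction"], sort_keys=True))
    parts.append("Document:\n" + document_text)
    return "\n\n".join(parts)
```

Capping both the number of examples and their length keeps the token cost of memory predictable, which matters more than squeezing in every similar document.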
Context Management
Context is the agent’s access to external information. Manage it carefully.
class ContextManager:
    def __init__(self):
        self.sources = {
            "vendor_db": VendorDatabase(),
            "po_system": POSystem(),
            "user_preferences": UserPreferenceCache(),
            "compliance_rules": ComplianceRuleEngine(),
        }

    def get_context(self, document: Document, needed_keys: list) -> dict:
        """
        Fetch only the context needed for this document.
        Don't load everything; it wastes tokens.
        """
        context = {}
        if "vendor" in needed_keys:
            vendor = document.extracted_data.get("vendor")
            if vendor:
                context["vendor_info"] = self.sources["vendor_db"].get(vendor)
        if "po_reference" in needed_keys:
            po_ref = document.extracted_data.get("po_reference")
            if po_ref:
                context["po_info"] = self.sources["po_system"].get(po_ref)
        # ... etc for other context sources
        return context
Context is expensive. Each piece of context you include in the prompt increases token usage. Fetch only what the agent needs.
Error Handling and Fallback Patterns
Production agentic systems fail. Network timeouts, OCR errors, LLM hallucinations, database unavailability—all happen. Design for graceful degradation.
Error Categories
Transient errors: Network timeout, rate limit, temporary service unavailability. Retry with exponential backoff.
Permanent errors: Invalid document format, corrupted file, unsupported language. Escalate to human review.
Semantic errors: LLM hallucination, contradictory extraction, logic error. Validate and flag.
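The first two categories can be made explicit as exception types, so that calling code can tell "retry" from "escalate" at a glance. This is a minimal sketch: the status-code set and the `to_pipeline_error` helper are assumptions, and semantic errors are deliberately left out because they surface through validation rather than exceptions.

```python
from typing import Optional

class TransientError(Exception):
    """Worth retrying: timeouts, rate limits, brief outages."""

class PermanentError(Exception):
    """Retrying will not help: corrupt file, unsupported format."""

# HTTP statuses commonly treated as retryable (an assumption; tune per provider)
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}

def to_pipeline_error(exc: Exception, status_code: Optional[int] = None) -> Exception:
    """Map a low-level failure onto one of the two pipeline error types."""
    if status_code in RETRYABLE_STATUS:
        return TransientError(str(exc))
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return TransientError(str(exc))
    return PermanentError(str(exc))
```

A tool wrapper would catch its own library's exceptions and re-raise the mapped type with `raise to_pipeline_error(exc) from exc`, which keeps the original traceback for debugging.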
Retry Logic
import time

def process_with_retry(document: Document, max_retries: int = 3) -> ExtractionResult:
    for attempt in range(max_retries):
        try:
            # Try to process
            plan = plan_extraction(document)
            result = execute_plan(document, plan)
            validation = validate_extraction(document, result.extracted_data, plan)
            # Carry validation flags on the result so they are not silently dropped
            if validation.status == "flagged":
                result.flags.extend(validation.flags)
            return result
        except TransientError as e:
            # Retry with backoff
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
                time.sleep(wait_time)
                continue
            else:
                # Give up, escalate
                return ExtractionResult(
                    status="error",
                    error=f"Transient error after {max_retries} attempts: {e}",
                    action="escalate"
                )
        except PermanentError as e:
            # Don't retry, escalate immediately
            return ExtractionResult(
                status="error",
                error=f"Permanent error: {e}",
                action="escalate"
            )
Fallback Patterns
Fallback 1: Simpler Model: If GPT-4 fails or is too expensive, fall back to GPT-3.5-Turbo.
Fallback 2: Template-Based: If the agent can’t extract, fall back to template matching.
Fallback 3: Human Review: If all else fails, send to a human.
def process_with_fallback(document: Document) -> ExtractionResult:
    try:
        # Try agent-based extraction
        return process_with_agent(document)
    except Exception as agent_err:
        logger.warning(f"Agent extraction failed: {agent_err}. Falling back to template.")
        try:
            # Try template-based extraction
            return process_with_template(document)
        except Exception as template_err:
            logger.error(f"Template extraction also failed: {template_err}. Escalating to human.")
            # Report both failures; reusing one variable name would hide the agent error
            return ExtractionResult(
                status="error",
                error=f"Agent failed: {agent_err}; template failed: {template_err}",
                action="escalate",
                priority="high"
            )
Graceful Degradation
If a tool is unavailable, the agent should adapt.
def execute_plan_with_degradation(document: Document, plan: ExtractionPlan) -> ExtractionResult:
    state = AgentState(document=document, plan=plan)
    available_tools = check_tool_availability()  # Which tools are up?
    while not state.is_complete():
        action = agent.decide_next_action(state, available_tools=available_tools)
        if action.type == "lookup" and "vendor_db" not in available_tools:
            # Vendor DB is down. Skip this lookup, continue with other extractions.
            logger.warning("Vendor DB unavailable. Skipping vendor lookup.")
            state.add_flag("Vendor lookup skipped due to service unavailability")
            continue
        # ... normal execution
    return state.to_result()
Graceful degradation means the system keeps working, even if some features are unavailable. A document might not be fully validated, but it still gets extracted and routed.
Cost Control and Token Optimisation
LLM calls are expensive. A large document processed by GPT-4 can cost $0.50–$2.00 per document. At scale, this adds up.
Token Budgeting
Before you call an LLM, know your token budget.
class TokenBudget:
    def __init__(self, max_tokens_per_document: int = 8000, cost_per_1k_tokens: float = 0.03):
        self.max_tokens = max_tokens_per_document
        self.cost_per_1k = cost_per_1k_tokens

    def estimate_cost(self, document: Document) -> float:
        # Rough estimate: 1 token ≈ 4 characters
        estimated_tokens = len(document.text) / 4
        return (estimated_tokens / 1000) * self.cost_per_1k

    def is_within_budget(self, document: Document) -> bool:
        estimated_tokens = len(document.text) / 4
        return estimated_tokens < self.max_tokens
If a document is too large (e.g., a 100-page contract), don’t process it with a single LLM call. Chunk it.
Document Chunking
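def chunk_document(document: Document, chunk_size: int = 4000) -> list:
    """
    Split large documents into chunks.
    Process each chunk separately, then aggregate results.
    Note: chunk_size is measured in characters, not tokens.
    """
    chunks = []
    text = document.text
    while len(text) > chunk_size:
        # Split just after the last sentence end inside the window. Including the
        # period in the chunk guarantees progress; splitting before it can leave a
        # leading "." that is found again at index 0 and loops forever.
        split_point = text.rfind(".", 0, chunk_size) + 1
        if split_point <= 0:
            # No period in the window: fall back to a hard split
            split_point = chunk_size
        chunks.append(text[:split_point])
        text = text[split_point:]
    if text:
        chunks.append(text)
    return chunks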
def process_large_document(document: Document) -> ExtractionResult:
    chunks = chunk_document(document)
    results = []
    for chunk in chunks:
        chunk_doc = Document(text=chunk, metadata=document.metadata)
        result = process_with_agent(chunk_doc)
        results.append(result)
    # Aggregate results from all chunks
    aggregated = aggregate_chunk_results(results)
    return aggregated
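The aggregation step is where chunking usually goes wrong, because the same field can appear in more than one chunk with different values. The sketch below works on plain field dicts rather than `ExtractionResult` objects, and its "first non-empty value wins, conflicts are reported" policy is one assumption among several reasonable ones.

```python
from typing import List, Tuple

def aggregate_chunk_fields(chunk_fields: List[dict]) -> Tuple[dict, list]:
    """Merge per-chunk field dicts; return (merged_fields, conflict_messages)."""
    merged: dict = {}
    conflicts: list = []
    for index, fields in enumerate(chunk_fields):
        for name, value in fields.items():
            if value is None or value == "":
                continue  # an empty value from one chunk says nothing
            if name not in merged:
                merged[name] = value
            elif merged[name] != value:
                # Keep the earlier value but surface the disagreement for review
                conflicts.append(
                    f"{name}: chunk {index} has {value!r}, kept {merged[name]!r}"
                )
    return merged, conflicts
```

Any conflict message is a natural validation flag: two chunks disagreeing about an invoice number is exactly the kind of inconsistency a reviewer should see.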
Prompt Optimisation
Shorten your prompts. Every token costs money.
Bad prompt (300 tokens):
You are a highly skilled document processing expert with 20 years of experience
in financial document analysis. Your task is to extract information from the
following invoice document. Please be very careful and thorough. Extract the
following fields: invoice number, invoice date, vendor name, vendor address,
billing address, line items (description, quantity, unit price, total), subtotal,
tax, and total amount due. Be sure to validate that the subtotal plus tax equals
the total. If there are any discrepancies, flag them for review. Also, check if
the vendor exists in our database and flag if it doesn't. Return the results in
JSON format.
Good prompt (100 tokens):
Extract from invoice:
- Invoice number, date
- Vendor name, address
- Line items (description, qty, price, total)
- Subtotal, tax, total
Validate: subtotal + tax = total
Flag: vendor not in database
Return JSON.
The good prompt is 1/3 the size and conveys the same information.
Model Selection
Use the cheapest model that works.
- GPT-4: $0.03/1K input, $0.06/1K output. Use for complex reasoning, edge cases.
- GPT-3.5-Turbo: $0.0005/1K input, $0.0015/1K output. Use for straightforward extraction.
- Claude 3 Haiku: $0.00025/1K input, $0.00125/1K output. Use for simple tasks.
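To see what these prices mean per call, the arithmetic is input tokens times the input price plus output tokens times the output price. The sketch below hard-codes the figures from the list above; the token counts in the usage line are made-up illustrative numbers, and prices should be checked against current provider pricing.

```python
# (input, output) price in USD per 1K tokens, copied from the list above
PRICES_PER_1K = {
    "gpt-4": (0.03, 0.06),
    "gpt-3.5-turbo": (0.0005, 0.0015),
    "claude-3-haiku": (0.00025, 0.00125),
}

def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD of one LLM call at the listed per-1K-token prices."""
    input_price, output_price = PRICES_PER_1K[model]
    return (input_tokens / 1000) * input_price + (output_tokens / 1000) * output_price


# A 6,000-token document with a 500-token JSON reply:
print(round(call_cost("gpt-4", 6000, 500), 4))          # 0.21
print(round(call_cost("gpt-3.5-turbo", 6000, 500), 5))  # 0.00375
```

At these prices the same call is roughly 56 times cheaper on GPT-3.5-Turbo than on GPT-4, which is why routing by complexity pays for itself quickly.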
Route documents based on complexity:
def select_model(document: Document, plan: ExtractionPlan) -> str:
    complexity = estimate_complexity(document, plan)
    if complexity == "simple":
        return "gpt-3.5-turbo"  # Cheap
    elif complexity == "moderate":
        return "gpt-4"  # Balanced
    elif complexity == "complex":
        return "gpt-4"  # Worth the cost
    else:
        return "claude-3-haiku"  # Fallback, cheapest
Caching and Deduplication
If you process the same document twice, use cached results.
import hashlib
from typing import Optional

class ExtractionCache:
    def __init__(self, storage: Redis):
        self.storage = storage

    def get_cached_result(self, document: Document) -> Optional[ExtractionResult]:
        # Hash the document content
        doc_hash = hashlib.sha256(document.text.encode()).hexdigest()
        cached = self.storage.get(f"extraction:{doc_hash}")
        if cached:
            return ExtractionResult.from_json(cached)
        return None

    def cache_result(self, document: Document, result: ExtractionResult):
        doc_hash = hashlib.sha256(document.text.encode()).hexdigest()
        self.storage.set(
            f"extraction:{doc_hash}",
            result.to_json(),
            ex=86400  # Expire after 24 hours
        )
Observability and Production Monitoring
You can’t fix what you can’t see. Agentic systems are black boxes; you need observability.
Metrics to Track
Extraction accuracy: What % of fields are extracted correctly? Track per document type, per vendor.
Validation flags: How many documents are flagged? What are the most common flags?
Routing accuracy: How often does the agent route documents to the correct destination?
Cost per document: How many tokens does each document consume? What’s the cost trend?
Latency: How long does each document take to process? Are there bottlenecks?
Error rate: How often do errors occur? Are they transient or permanent?
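Extraction accuracy is only measurable against a labelled set. A minimal sketch of per-field accuracy follows; it assumes you hold human-verified values keyed by document id and counts an exact match as correct, which is stricter than you may want for free-text fields.

```python
from collections import defaultdict

def field_accuracy(predictions: dict, ground_truth: dict) -> dict:
    """Per-field share of documents where the prediction equals the label."""
    correct = defaultdict(int)
    total = defaultdict(int)
    for doc_id, expected in ground_truth.items():
        # A document the agent never processed counts as wrong on every field
        predicted = predictions.get(doc_id, {})
        for name, expected_value in expected.items():
            total[name] += 1
            if predicted.get(name) == expected_value:
                correct[name] += 1
    return {name: correct[name] / total[name] for name in total}
```

Running this per document type and per vendor, as suggested above, is a matter of filtering `ground_truth` before the call.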
Instrumentation
import time

from opentelemetry import metrics, trace

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
# Counters
extraction_counter = meter.create_counter("extraction_total")
flag_counter = meter.create_counter("validation_flags_total")
error_counter = meter.create_counter("errors_total")
# Histograms. Tokens and cost are per-document values, so they are recorded as
# distributions. Observable gauges are read through callbacks and have no set()
# method, so they cannot be updated inline like this.
token_histogram = meter.create_histogram("tokens_used")
cost_histogram = meter.create_histogram("cost_per_document")
latency_histogram = meter.create_histogram("processing_latency_seconds")

def process_document_instrumented(document: Document) -> ExtractionResult:
    with tracer.start_as_current_span("process_document") as span:
        span.set_attribute("document.type", document.type)
        span.set_attribute("document.size_bytes", len(document.text))
        attributes = {"document_type": document.type}
        start_time = time.time()
        try:
            result = process_with_agent(document)
            # Record metrics
            extraction_counter.add(1, {"document_type": document.type, "status": "success"})
            flag_counter.add(len(result.flags), attributes)
            token_histogram.record(result.tokens_used, attributes)
            cost_histogram.record(result.tokens_used * 0.03 / 1000, attributes)  # Rough cost
            latency = time.time() - start_time
            latency_histogram.record(latency, attributes)
            return result
        except Exception as e:
            error_counter.add(1, {"document_type": document.type, "error_type": type(e).__name__})
            raise
Dashboards
Build dashboards to visualize metrics:
- Extraction accuracy: Line chart over time. Alert if accuracy drops below 95%.
- Flag distribution: Bar chart of most common flags. Use to prioritise improvements.
- Cost trend: Line chart of cost per document. Alert if cost spikes.
- Latency percentiles: P50, P95, P99 latency. Spot slow documents.
- Error rate: Line chart of error % over time. Alert if error rate increases.
Logging
Log everything, but be selective about what you store long-term.
import logging
logger = logging.getLogger(__name__)
def process_document_logged(document: Document) -> ExtractionResult:
    logger.info(f"Processing document {document.id}", extra={
        "document_type": document.type,
        "document_size": len(document.text),
    })
    try:
        plan = plan_extraction(document)
        logger.debug(f"Extraction plan: {plan}", extra={"document_id": document.id})
        result = execute_plan(document, plan)
        logger.info(f"Extraction complete for {document.id}", extra={
            "extracted_fields": len(result.extracted_data),
            "flags": len(result.flags),
            "tokens_used": result.tokens_used,
        })
        return result
    except Exception as e:
        logger.error(f"Processing failed for {document.id}", exc_info=True, extra={
            "document_type": document.type,
            "error_type": type(e).__name__,
        })
        raise
Log at INFO level for normal operations, DEBUG for detailed troubleshooting, ERROR for failures. This keeps logs manageable.
Security, Compliance, and Data Governance
Agentic document processing often handles sensitive data: invoices, contracts, personal information. Security matters.
Data Classification
Not all documents are equal. Classify them by sensitivity.
class DataClassification:
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

def classify_document_sensitivity(document: Document) -> str:
    """
    Determine sensitivity level based on content.
    """
    text = document.text.lower()
    # Check for sensitive patterns
    if any(pattern in text for pattern in ["ssn", "tax id", "credit card", "bank account"]):
        return DataClassification.RESTRICTED
    if any(pattern in text for pattern in ["confidential", "proprietary", "trade secret"]):
        return DataClassification.CONFIDENTIAL
    if any(pattern in text for pattern in ["internal use only"]):
        return DataClassification.INTERNAL
    return DataClassification.PUBLIC
Data Retention
Don’t keep data longer than needed.
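from datetime import datetime, timedelta

class DataRetentionPolicy:
    # Retention periods in days
    PUBLIC = 365  # 1 year
    INTERNAL = 90  # 3 months
    CONFIDENTIAL = 30  # 1 month
    RESTRICTED = 7  # 1 week

def apply_retention_policy(document: Document, result: ExtractionResult):
    sensitivity = classify_document_sensitivity(document)
    # A plain class is not subscriptable, and the classifier returns lowercase
    # values ("restricted"), so look up the matching uppercase attribute
    retention_days = getattr(DataRetentionPolicy, sensitivity.upper())
    # Schedule deletion
    deletion_date = datetime.now() + timedelta(days=retention_days)
    schedule_deletion(document.id, deletion_date)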
Audit Trails
Track who accessed what, when, and why.
from typing import Optional

class AuditLog:
    def __init__(self, storage):
        # Any append-only store with an insert() method
        self.storage = storage

    def log_access(self, document_id: str, user_id: str, action: str, reason: Optional[str] = None):
        log_entry = {
            "document_id": document_id,
            "user_id": user_id,
            "action": action,  # "view", "extract", "approve", "reject"
            "reason": reason,
            "timestamp": datetime.now(),
            "ip_address": get_user_ip(),
        }
        self.storage.insert(log_entry)

    def log_extraction(self, document_id: str, extracted_fields: list):
        log_entry = {
            "document_id": document_id,
            "action": "extraction",
            "extracted_fields": extracted_fields,
            "timestamp": datetime.now(),
        }
        self.storage.insert(log_entry)
Compliance Frameworks
If you’re processing documents for regulated industries, you need compliance.
SOC 2 Type II: Relevant if you’re processing documents on behalf of customers. Requires controls over access, data integrity, and confidentiality. PADISO offers SOC 2 compliance support to help teams achieve audit-readiness.
ISO 27001: Information security management. Covers access controls, encryption, incident response.
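HIPAA: Health Insurance Portability and Accountability Act. Applies when you handle protected health information (PHI) as a US covered entity or business associate. Calls for safeguards such as encryption, audit logs, and access controls.
GDPR: General Data Protection Regulation. Applies when processing personal data of people in the EU. Requires a lawful basis for processing (consent is one of several), data minimisation, and support for the right to erasure.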
For agentic document processing, compliance means:
- Data minimisation: Extract only what you need.
- Encryption: Encrypt data at rest and in transit.
- Access controls: Limit who can view documents and extracted data.
- Audit logs: Track all access and modifications.
- Data retention: Delete data when no longer needed.
- Incident response: Have a plan for data breaches.
LLM-Specific Security
When you send documents to an LLM (OpenAI, Anthropic, etc.), you’re sharing data with a third party. Be careful.
def should_use_external_llm(document: Document) -> bool:
    """
    Determine if it's safe to send this document to an external LLM.
    """
    sensitivity = classify_document_sensitivity(document)
    # Don't send restricted or confidential data to external LLMs
    if sensitivity in [DataClassification.RESTRICTED, DataClassification.CONFIDENTIAL]:
        return False
    return True

def process_with_appropriate_llm(document: Document) -> ExtractionResult:
    if should_use_external_llm(document):
        # Use OpenAI, Anthropic, etc.
        return process_with_external_llm(document)
    else:
        # Use a local model (Llama, Mistral, etc.)
        return process_with_local_llm(document)
For sensitive documents, use on-premise or self-hosted LLMs. They’re slower and more expensive to run, but they keep data private.
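For documents that do go to an external provider, data minimisation can also be applied to the text itself by masking obvious identifiers before the call. The sketch below is a complementary idea rather than part of the routing logic above, and the two patterns (US-style SSNs and email addresses) are illustrative assumptions; real PII detection needs a much broader set and proper testing.

```python
import re

# Illustrative patterns only: (placeholder, compiled regex)
PATTERNS = [
    ("[SSN]", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    ("[EMAIL]", re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")),
]

def redact(text: str) -> str:
    """Replace each matched identifier with its placeholder."""
    for placeholder, pattern in PATTERNS:
        text = pattern.sub(placeholder, text)
    return text


print(redact("SSN 123-45-6789, contact jane@example.com today"))
# SSN [SSN], contact [EMAIL] today
```

Redaction reduces exposure but does not change a document's classification, so restricted documents should still stay on the self-hosted path.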
Real Deployment Patterns and Lessons
Theory is clean. Production is messy. Here’s what actually happens.
Pattern 1: Synchronous Processing (Simple, Slow)
Document arrives → agent processes → result returned immediately.
Works for: Low volume, low latency requirements.
Fails for: High volume, large documents, external lookups.
@app.post("/extract")
def extract_document(file: UploadFile) -> dict:
    document = parse_document(file)
    result = process_with_agent(document)  # Blocks until complete
    return result.to_dict()
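Problem: If processing takes 30 seconds, the client, load balancer, or proxy may time out before the response arrives, and every in-flight document holds a worker for the whole duration.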
Pattern 2: Asynchronous Processing (Complex, Scalable)
Document arrives → queued → agent processes in background → result stored → client polls or gets webhook.
Works for: High volume, large documents, external dependencies.
Fails for: Real-time requirements, simple cases (over-engineered).
@app.post("/extract")
def extract_document(file: UploadFile) -> dict:
    document = parse_document(file)
    job_id = str(uuid.uuid4())
    # Queue the job
    task_queue.enqueue(process_extraction_task, document, job_id)
    # Return immediately
    return {"job_id": job_id, "status": "queued"}

@app.get("/extract/{job_id}")
def get_extraction_result(job_id: str) -> dict:
    result = result_store.get(job_id)
    if result:
        return {"status": "complete", "result": result.to_dict()}
    else:
        return {"status": "processing"}

# Background task
def process_extraction_task(document: Document, job_id: str):
    result = process_with_agent(document)
    result_store.set(job_id, result)
This is what production looks like. You need a job queue (Celery, Bull, RabbitMQ), a result store (Redis, PostgreSQL), and polling or webhooks.
Pattern 3: Batch Processing (Efficient, Delayed)
Documents accumulate → batch processed → results stored.
Works for: Very high volume, cost-sensitive, batch deadlines (daily, hourly).
Fails for: Real-time requirements.
import time

def batch_process_documents(batch_size: int = 100):
    # Assumes `db` is a SQLAlchemy Session and `Document` is a mapped model
    while True:
        # Fetch unprocessed documents
        documents = (
            db.query(Document)
            .filter(Document.status == "pending")
            .limit(batch_size)
            .all()
        )
        if not documents:
            time.sleep(60)  # Wait for more documents
            continue
        # Process batch
        for doc in documents:
            try:
                doc.result = process_with_agent(doc)
                doc.status = "complete"
            except Exception:
                # Mark the failure so one bad document does not stall the loop
                doc.status = "failed"
        # Store results (attribute changes are flushed on commit)
        db.commit()
Batch processing is efficient because you can parallelise, reuse context, and optimise for throughput. But it introduces latency.
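The loop above handles documents one at a time. Because agent calls are mostly I/O-bound (waiting on an LLM API or a database), one way to parallelise a batch is a thread pool. This is a minimal sketch under that assumption; `process_batch` and the placeholder `process_with_agent` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def process_with_agent(doc: str) -> dict:
    """Hypothetical placeholder for the real agent call (I/O-bound in practice)."""
    return {"doc": doc, "chars": len(doc)}


def process_batch(documents: list[str], max_workers: int = 8) -> list[dict]:
    """Process a batch concurrently; results keep the input order."""

    def safe(doc: str) -> dict:
        try:
            return {"status": "complete", "result": process_with_agent(doc)}
        except Exception as exc:  # one bad document should not sink the batch
            return {"status": "failed", "error": str(exc)}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(safe, documents))
```

Keep `max_workers` below whatever rate limit your LLM provider enforces, otherwise the extra concurrency just turns into retries.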
Pattern 4: Hybrid (Best of All Worlds)
Use synchronous for small, fast documents. Async for large, slow documents. Batch for very high volume.
@app.post("/extract")
def extract_document(file: UploadFile) -> dict:
    document = parse_document(file)
    # Estimate processing time
    estimated_time = estimate_processing_time(document)
    if estimated_time < 5:  # Fast
        # Process synchronously
        result = process_with_agent(document)
        return result.to_dict()
    else:  # Slow
        # Process asynchronously
        job_id = str(uuid.uuid4())
        task_queue.enqueue(process_extraction_task, document, job_id)
        return {"job_id": job_id, "status": "queued"}
This gives you the best of both worlds: fast responses for simple documents, scalability for complex ones.
Lesson 1: Start Simple, Scale Later
Don’t build a distributed system on day one. Start with synchronous processing. When you hit latency or throughput limits, move to async. When you hit cost limits, move to batch.
Lesson 2: Monitor From Day One
Instrumentation is not optional. Add logging, metrics, and tracing from the start. You’ll thank yourself when debugging production issues.
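A decorator is one low-effort way to get timing and error counts around every agent call. The sketch below uses only the standard library; the in-memory `metrics` dictionary is a stand-in for a real metrics backend, and `instrumented` and the placeholder `process_with_agent` are illustrative names.

```python
import functools
import logging
import time

logger = logging.getLogger("doc_agent")
metrics = {"calls": 0, "errors": 0, "total_seconds": 0.0}  # in-memory stand-in


def instrumented(func):
    """Log duration and outcome of each call and update simple counters."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        metrics["calls"] += 1
        try:
            return func(*args, **kwargs)
        except Exception:
            metrics["errors"] += 1
            logger.exception("%s failed", func.__name__)
            raise
        finally:
            elapsed = time.perf_counter() - start
            metrics["total_seconds"] += elapsed
            logger.info("%s took %.3fs", func.__name__, elapsed)

    return wrapper


@instrumented
def process_with_agent(document: str) -> dict:
    """Hypothetical placeholder for the real agent call."""
    if not document:
        raise ValueError("empty document")
    return {"chars": len(document)}
```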
Lesson 3: Test With Real Documents
Your test documents are clean. Your production documents are messy. Test with real documents from day one. Build a test dataset of 50–100 real documents covering edge cases.
Lesson 4: Humans in the Loop
No agentic system is 100% accurate. Plan for human review from the start. Flag documents for review, track human feedback, and use it to improve the agent.
Lesson 5: Cost Will Surprise You
LLM costs are easy to underestimate. A $0.01 per-document cost becomes $10k/month at 1M documents per month. Track costs obsessively. Optimise prompts, use cheaper models, cache results.
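The arithmetic is simple enough to keep as a helper next to your usage metrics. In this sketch the token counts and per-million-token prices are made-up inputs chosen to land on the $0.01 per-document figure; substitute your provider's actual pricing.

```python
def monthly_cost(
    docs_per_month: int,
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Estimate monthly LLM spend in dollars from per-document token counts."""
    # Price-weighted tokens for a single document
    weighted = input_tokens * input_price_per_mtok + output_tokens * output_price_per_mtok
    return docs_per_month * weighted / 1_000_000


# Hypothetical example: 3,000 input and 500 output tokens per document,
# at $2 and $8 per million tokens, for 1M documents a month
estimate = monthly_cost(1_000_000, 3000, 500, 2.0, 8.0)
print(f"${estimate:,.0f} per month")
```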
Building Your First Agentic Document Processor
Ready to build? Here’s a step-by-step guide.
Step 1: Define Your Use Case
Choose a specific document type and workflow. Don’t try to handle all documents at once.
Good first use case: Invoices from 5–10 vendors. Well-structured, high volume, clear extraction goals.
Bad first use case: Mixed contracts, forms, and emails. Unstructured, low volume, unclear extraction goals.
Step 2: Collect Sample Documents
Gather 50–100 real documents. Include edge cases: different vendors, formats, errors.
Step 3: Define Extraction Goals
What fields do you need? What validations matter? What’s the routing logic?
Example: Extract invoice number, date, vendor, amount. Validate that vendor exists in database. Route to approval if amount < $1000, else flag for review.
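Written as code, the example rules might look like the sketch below. The `Invoice` dataclass and `route` function are illustrative names, and sending unknown vendors to review is an assumption, since the example only says the vendor must be validated.

```python
from dataclasses import dataclass
from decimal import Decimal

APPROVAL_LIMIT = Decimal("1000")  # threshold from the example above


@dataclass
class Invoice:
    invoice_number: str
    vendor: str
    amount: Decimal


def route(invoice: Invoice, known_vendors: set[str]) -> str:
    """Return 'approve' or 'review' using the example rules."""
    if invoice.vendor not in known_vendors:
        return "review"  # assumption: unknown vendors go to a human
    if invoice.amount < APPROVAL_LIMIT:
        return "approve"
    return "review"
```

Keeping deterministic rules like these in plain code, outside the LLM, makes routing decisions auditable and cheap to test.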
Step 4: Build the Planning Prompt
Write a prompt that instructs the LLM to plan the extraction.
You are an invoice processing agent. Your task is to extract information from invoices.
When you receive an invoice, do the following:
1. Classify the document (is it really an invoice?)
2. Decide what fields to extract (invoice number, date, vendor, amount, etc.)
3. Decide what validations to perform (does vendor exist? is amount reasonable?)
4. Decide where to route the document (approve, review, reject)
Return a JSON plan with your decisions.
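The prompt asks for JSON, but the model's reply is still untrusted text, so it is worth validating before acting on it. The sketch below assumes a plan shape with the keys `is_invoice`, `fields`, `validations`, and `route`; that schema is hypothetical and should mirror whatever your prompt actually specifies.

```python
import json

ALLOWED_ROUTES = {"approve", "review", "reject"}
REQUIRED_KEYS = {"is_invoice", "fields", "validations", "route"}  # hypothetical schema


def parse_plan(raw: str) -> dict:
    """Parse the model's reply and reject plans that do not match the schema."""
    plan = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on malformed output
    if not isinstance(plan, dict):
        raise ValueError("plan must be a JSON object")
    missing = REQUIRED_KEYS - plan.keys()
    if missing:
        raise ValueError(f"plan is missing keys: {sorted(missing)}")
    if plan["route"] not in ALLOWED_ROUTES:
        raise ValueError(f"unknown route: {plan['route']!r}")
    return plan
```

A `ValueError` here is a natural trigger for a retry or an escalation to human review.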
Step 5: Build the Extraction Tools
Create tools the agent can use:
tools = [
    {
        "name": "extract_field",
        "description": "Extract a specific field from the document",
        "parameters": {
            "field_name": "string",
            "field_type": "string (date, number, text, etc.)"
        }
    },
    {
        "name": "lookup_vendor",
        "description": "Check if a vendor exists in the database",
        "parameters": {
            "vendor_name": "string"
        }
    },
    {
        "name": "validate_amount",
        "description": "Check if an amount is reasonable",
        "parameters": {
            "amount": "number",
            "vendor": "string"
        }
    }
]
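Tool definitions only describe what the agent may call; something still has to execute the call the model returns. A minimal dispatcher could look like the sketch below. The tool bodies, the vendor names, the amount ceiling, and the `{"name": ..., "arguments": ...}` call shape are all assumptions for illustration, and real provider SDKs each have their own tool-call format.

```python
def lookup_vendor(vendor_name: str) -> dict:
    """Hypothetical stand-in for a database lookup."""
    known = {"acme pty ltd", "globex"}
    return {"exists": vendor_name.strip().lower() in known}


def validate_amount(amount: float, vendor: str) -> dict:
    """Hypothetical sanity check: positive and below an arbitrary ceiling."""
    return {"reasonable": 0 < amount < 100_000}


TOOL_REGISTRY = {"lookup_vendor": lookup_vendor, "validate_amount": validate_amount}


def dispatch(tool_call: dict) -> dict:
    """Run the tool named in a tool call, returning errors as data."""
    func = TOOL_REGISTRY.get(tool_call.get("name"))
    if func is None:
        return {"error": f"unknown tool: {tool_call.get('name')}"}
    try:
        return func(**tool_call.get("arguments", {}))
    except TypeError as exc:  # wrong or missing arguments from the model
        return {"error": str(exc)}
```

Returning errors as data rather than raising lets the agent see what went wrong and correct its next call.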
Step 6: Test on Sample Documents
Run your agent on your 50–100 sample documents. Track accuracy, flags, and errors.
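Per-field accuracy is usually more informative than a single score, because it shows which fields the agent struggles with. The helper below is one simple way to compute it, assuming you have hand-labelled the expected values for each sample document; `field_accuracy` is an illustrative name and exact string matching is a deliberate simplification.

```python
from collections import defaultdict


def field_accuracy(predictions: list[dict], labels: list[dict]) -> dict[str, float]:
    """Fraction of documents where each labelled field was extracted exactly."""
    correct: dict[str, int] = defaultdict(int)
    total: dict[str, int] = defaultdict(int)
    for predicted, expected in zip(predictions, labels):
        for field, value in expected.items():
            total[field] += 1
            if predicted.get(field) == value:
                correct[field] += 1
    return {field: correct[field] / total[field] for field in total}
```

In practice you would normalise values (dates, currency formatting, whitespace) before comparing, so formatting differences are not counted as extraction errors.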
Step 7: Iterate
Where does it fail? Refine the prompt, add tools, adjust validation logic.
Step 8: Deploy Carefully
Start with a small subset of production documents. Monitor closely. Expand gradually.
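One way to pick that subset is a deterministic percentage rollout: hash each document ID into a bucket and send only the lowest buckets through the agent. This sketch assumes every document has a stable ID; `in_rollout` is a hypothetical helper name.

```python
import hashlib


def in_rollout(document_id: str, percent: int) -> bool:
    """Deterministically assign a document to the agent path for `percent`% of traffic."""
    digest = hashlib.sha256(document_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # stable bucket in 0..99
    return bucket < percent
```

Because the bucket depends only on the ID, a document that was in the 5% rollout stays in when you raise the percentage, which keeps before-and-after comparisons clean.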
Step 9: Collect Feedback
When humans correct the agent, capture that feedback. Use it to improve.
Step 10: Scale
Once you’re confident, scale to all documents of that type. Then move to the next document type.
Conclusion and Next Steps
Agentic document processing is not magic. It’s a combination of planning, tool design, validation, and error handling. Done well, it adapts to document variance, handles edge cases, and scales to millions of documents. Done poorly, it hallucinates, breaks on edge cases, and becomes a liability.
The key principles:
- Plan before extracting: Have the agent decide what to do before it acts.
- Validate ruthlessly: Check the agent’s work. Don’t trust it blindly.
- Route intelligently: Use context to decide where documents go.
- Monitor obsessively: Track accuracy, cost, latency, and errors.
- Fail gracefully: Have fallbacks when the agent can’t decide.
- Secure by default: Treat documents as sensitive until proven otherwise.
- Start simple, scale later: Synchronous → async → batch as you grow.
- Keep humans in the loop: Agentic systems augment humans; they don’t replace them.
If you’re building agentic document processing systems, you’re likely managing complex workflows, scaling across multiple document types, or integrating with existing business systems. That’s where PADISO’s platform engineering expertise becomes valuable. We’ve built production AI systems for teams at Series A through Series C, handling AI strategy, architecture, and delivery from Sydney and across the US. If you’re tackling this at scale—whether it’s document automation, agentic AI orchestration, or platform re-platforming—our AI Quickstart Audit gives you a 2-week diagnostic of where you actually are, what to ship first, and what 90 days could unlock. Book a 30-minute call to discuss your specific use case.
For teams modernising with agentic AI and workflow automation, platform development services ensure your architecture scales, your costs stay predictable, and your team ships reliably. Whether you need fractional CTO leadership, venture studio co-build support, or security audit readiness via Vanta, we ship outcomes, not decks.