Orchestrating LLM Agents with AWS Step Functions: A Production-Grade Document Analysis Pipeline
TL;DR — I built a serverless pipeline that uses AWS Step Functions to orchestrate Strands Agents (LLM-powered AI agents) for analyzing legal and compliance documents. The system chunks documents, runs parallel agent analysis, aggregates results, and generates structured reports. All infrastructure is defined as code with AWS CDK, and every cross-service boundary is type-safe via Pydantic v2.
The Problem
Enterprise documents—contracts, policies, compliance attestations—are high-stakes, high-volume, and notoriously tedious to review manually. A single MSA (Master Service Agreement) can be 50+ pages, and verifying it against SOC2, HIPAA, or GDPR controls requires both legal expertise and domain-specific compliance knowledge.
Large Language Models (LLMs) can help, but throwing a 50-page PDF at a single prompt is brittle:
- Context window limits force truncation
- Single-pass analysis misses nuanced clause interactions
- Unstructured output makes downstream processing unreliable
- No observability means you cannot audit what the model did
What we need is an orchestrated, multi-agent pipeline that:
- Breaks documents into manageable chunks
- Applies specialized AI agents in parallel
- Aggregates and deduplicates findings
- Produces structured, auditable reports
- Runs on serverless infrastructure with built-in error handling
The Solution: Step Functions + Strands Agents
I built exactly that. The architecture uses AWS Step Functions as the orchestration backbone and Strands Agents as the LLM execution framework.
Why Step Functions?
AWS Step Functions is genuinely powerful for AI pipelines:
- Visual state machines make complex workflows explicit and auditable—critical for compliance scenarios
- Map states provide controlled parallelism (with `max_concurrency` to avoid throttling LLM APIs)
- Built-in retries and error handling per task, with dead-letter queues for failed chunks
- Distributed tracing via X-Ray and CloudWatch Logs integration
- Event-driven triggers via EventBridge when documents land in S3
Why Strands Agents?
Strands Agents is a Python SDK for building structured, tool-using agents on top of Amazon Bedrock:
- Pydantic-native output: Agents return structured data that validates against your schemas
- Tool composability: Agents can call external validators, APIs, or other agents
- Bedrock integration: Direct use of the Converse API with Nova Pro / Claude 3
- Resilience: Built-in retry with exponential backoff for throttling
Architecture Deep Dive
System Architecture
The diagram below shows the full serverless pipeline from document upload to structured report delivery:
Figure 1: End-to-end architecture showing S3 trigger, Step Functions orchestration, parallel Lambda execution with Bedrock LLM calls, and final artifact storage.
The Pipeline (4 Stages)
S3 Upload → Step Functions: Chunk → Map(Parallel Agents) → Aggregate → Report → S3 + DynamoDB
| Stage | Service | Responsibility |
|---|---|---|
| 1. Chunk | Lambda (chunker) | Reads S3 document, splits into semantic chunks with overlap |
| 2. Analyze | Map → Lambda (agent_executor) | Runs Strands Agents in parallel on each chunk: contract analysis + compliance checking |
| 3. Aggregate | Lambda (aggregator) | Deduplicates entities, ranks clauses/flags by risk, generates executive summary |
| 4. Report | Lambda (reporter) | Writes JSON + Markdown reports to S3, indexes metadata in DynamoDB |
Pipeline Flow
The diagram below breaks down the 4-stage pipeline with data transformations and the parallel agent execution model:
Figure 2: Detailed data flow through the 4 stages—from raw document chunking through parallel agent analysis to final report generation, plus error handling and observability patterns.
Stage 1: Chunker
Documents are split into semantically meaningful chunks (at paragraph boundaries) with configurable overlap. This preserves context at chunk boundaries while keeping each unit small enough for reliable LLM processing.
```python
def _create_chunks(paragraphs, max_size, overlap):
    """Greedy paragraph packing into chunks with overlap."""
    chunks = []
    current_lines = []
    current_size = 0
    for para in paragraphs:
        para_size = len(para)
        if current_size + para_size > max_size and current_lines:
            chunk_text = "\n\n".join(current_lines)
            chunks.append(DocumentChunk(text=chunk_text, ...))
            # Overlap: carry the tail of the previous chunk forward
            overlap_text = chunk_text[-overlap:]
            current_lines = [overlap_text + "\n\n" + para]
            current_size = len(current_lines[0])
        else:
            current_lines.append(para)
            current_size += para_size
    if current_lines:
        chunks.append(DocumentChunk(text="\n\n".join(current_lines), ...))
    return chunks
```
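Calling it is straightforward. A short usage sketch with illustrative parameters (the ~200-character overlap matches the Lessons Learned section; the 4 KB chunk size is an assumption):

```python
# Illustrative parameters: ~4 KB chunks with a 200-character overlap
paragraphs = document_text.split("\n\n")
chunks = _create_chunks(paragraphs, max_size=4000, overlap=200)
```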
Stage 2: Parallel Agent Execution (Map State)
The Step Functions Map state fans out each chunk to a Lambda running two Strands Agents concurrently via `asyncio.gather`:
Multi-Agent Execution Pattern
Inside each agent_executor Lambda invocation, two specialized Strands Agents run in parallel against the same document chunk, each calling Amazon Bedrock independently. Their results are merged into a single AnalysisResult:
Figure 3: Two Strands Agents (Contract Analyzer + Compliance Checker) executing concurrently within a single Lambda via asyncio.gather, each calling Bedrock independently before merging into a single Pydantic-validated result.
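Inside the Lambda handler, that fan-out is a few lines of asyncio. A minimal sketch, where `run_compliance_check`, `create_compliance_checker`, and `merge_results` are assumed counterparts of the contract-analysis helpers shown later in this post:

```python
import asyncio

async def analyze_chunk(chunk, job_id):
    """Run both agents against the same chunk concurrently.

    Agent calls are blocking, so each runs in a worker thread and
    asyncio.gather awaits both before the results are merged.
    """
    contract, compliance = await asyncio.gather(
        asyncio.to_thread(run_contract_analysis,
                          create_contract_analyzer(), chunk, job_id),
        asyncio.to_thread(run_compliance_check,        # assumed counterpart
                          create_compliance_checker(), chunk, job_id),
    )
    return merge_results(contract, compliance)         # assumed merge helper
```

The Lambda handler then simply wraps this with `asyncio.run(analyze_chunk(chunk, job_id))`.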
Agent A: Contract Analyzer
Extracts named entities (people, orgs, dates, monetary amounts) and identifies contract clauses with risk ratings.
```python
from strands import Agent

SYSTEM_PROMPT = """You are a senior contract analyst...

RULES:
- Use exact text from the document; do not paraphrase clauses.
- Rate risk as: low, medium, high, critical.
- Output ONLY valid JSON conforming to the requested schema.
"""

def create_contract_analyzer():
    return Agent(system_prompt=SYSTEM_PROMPT)
```
Agent B: Compliance Checker
Framework-specific evaluation against SOC2, HIPAA, GDPR, or FISMA controls. The system prompt is dynamically generated based on the target framework:
```python
def _framework_prompt(framework):
    if framework == ComplianceFramework.HIPAA:
        return "Focus on §164.308 (Administrative Safeguards)..."
    if framework == ComplianceFramework.SOC2:
        return "Focus on Trust Services Criteria CC6.1, CC7.2..."
    # ...
```
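A factory can then splice the framework-specific guidance into a base system prompt. A sketch, where the factory name and base prompt wording are illustrative rather than the repo's actual text:

```python
def create_compliance_checker(framework):
    # Hypothetical factory mirroring create_contract_analyzer; the base
    # prompt wording here is illustrative, not the project's actual text
    base = "You are a compliance auditor. Flag control gaps with a severity."
    return Agent(system_prompt=f"{base}\n{_framework_prompt(framework)}")
```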
Both agents run in the same Lambda invocation to minimize cold starts and Step Functions state transitions.
Stage 3: Aggregator
After all chunks complete, the Aggregator Lambda:
- Deduplicates entities across chunks (e.g., "Acme Corp" mentioned in 3 chunks is listed once)
- Ranks clauses and flags by severity (CRITICAL → HIGH → MEDIUM → LOW)
- Generates an executive summary from chunk-level summaries
- Computes overall risk as the maximum severity found
```python
# RISK_ORDER ranks severities numerically so max() picks the most severe
def _compute_overall_risk(flags, clauses):
    all_levels = [f.severity for f in flags] + [c.risk_level for c in clauses]
    return max(all_levels, key=lambda r: RISK_ORDER[r])
```
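Deduplication is straightforward once everything is typed. A minimal sketch that keys on normalized entity text plus type (the repo's exact normalization may differ):

```python
def _dedup_entities(results):
    """Collapse entities repeated across chunks into one record each."""
    seen = {}
    for result in results:
        for entity in result.entities:
            key = (entity.text.strip().lower(), entity.type)
            seen.setdefault(key, entity)  # keep the first occurrence
    return list(seen.values())
```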
Stage 4: Reporter
Persists two artifacts to S3:
- `{job_id}.json` — structured `FinalReport` for downstream systems
- `{job_id}.md` — human-readable Markdown for legal/compliance teams
Also writes a metadata record to DynamoDB for querying by job ID, framework, or risk level.
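The persistence step is plain boto3. A minimal sketch, with the table name, the `render_markdown` helper, and the report's `framework`/`overall_risk` field names assumed:

```python
import boto3

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("document-analysis-jobs")  # name assumed

def persist_report(report, bucket):
    """Write JSON + Markdown artifacts to S3, then index metadata in DynamoDB."""
    s3.put_object(Bucket=bucket, Key=f"reports/{report.job_id}.json",
                  Body=report.model_dump_json(),
                  ContentType="application/json")
    s3.put_object(Bucket=bucket, Key=f"reports/{report.job_id}.md",
                  Body=render_markdown(report),  # assumed Markdown renderer
                  ContentType="text/markdown")
    table.put_item(Item={"job_id": report.job_id,
                         "framework": report.framework,
                         "overall_risk": report.overall_risk})
```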
Code Walkthrough: The Strands Agent
Here is the core of the contract analysis agent:
```python
import time

def run_contract_analysis(agent, chunk, job_id):
    prompt = f"""Analyze the following document chunk and return JSON.

CHUNK (lines {chunk.start_line}-{chunk.end_line}):
{chunk.text}

Return JSON with this structure:
{{"entities": [...], "clauses": [...], "summary": "..."}}
"""
    start = time.perf_counter()
    raw = agent.invoke(prompt)
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    data = _parse_json_safely(raw)  # Strips markdown fences, validates JSON
    return AnalysisResult(
        chunk_id=chunk.chunk_id,
        job_id=job_id,
        entities=[Entity(**e) for e in data["entities"]],
        clauses=[ContractClause(**c) for c in data["clauses"]],
        summary=data["summary"],
        processing_time_ms=elapsed_ms,
    )
```
Key patterns here:
- Structured prompts with explicit JSON schemas reduce hallucination
- `_parse_json_safely` handles models that wrap JSON in markdown fences (sketched below)
- Pydantic v2 models validate field types and constraints automatically
- `tenacity` retries with exponential backoff protect against Bedrock throttling
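The fence-stripping helper can be as small as this. A sketch of the idea, not necessarily the repo's exact implementation:

```python
import json
import re

def _parse_json_safely(raw: str) -> dict:
    """Parse model output, tolerating ```json ... ``` markdown fences."""
    text = raw.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    return json.loads(text)  # raises JSONDecodeError on malformed output
```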
Infrastructure as Code (CDK)
The entire stack is defined in Python CDK:
```python
# Map state: process chunks in parallel with concurrency control
map_state = sfn.Map(
    self, "ParallelAnalysis",
    items_path="$.chunks",
    max_concurrency=10,  # Prevents Bedrock throttling
)
map_state.iterator(agent_task)

# Chain: Chunk → Map → Aggregate → Report → Success
definition = (
    chunk_task
    .next(map_state)
    .next(prepare_aggregator)
    .next(aggregate_task)
    .next(report_task)
    .next(success_state)
)
```
Notable infrastructure choices:
- ARM64 Lambdas for better price-performance
- Lambda tracing (X-Ray) for distributed debugging
- CloudWatch alarms on failed Step Functions executions
- SNS email alerts for operational visibility
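The first two choices are a couple of properties on each function. A sketch of one worker Lambda in CDK v2, with the handler path and timeout illustrative:

```python
from aws_cdk import Duration, aws_lambda as lambda_

agent_fn = lambda_.Function(
    self, "AgentExecutor",
    runtime=lambda_.Runtime.PYTHON_3_12,
    architecture=lambda_.Architecture.ARM_64,  # better price-performance
    tracing=lambda_.Tracing.ACTIVE,            # X-Ray distributed tracing
    handler="agent_executor.handler",          # path assumed
    code=lambda_.Code.from_asset("lambdas/agent_executor"),
    timeout=Duration.minutes(5),
)
```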
Type Safety Across Service Boundaries
Every Lambda input and output uses Pydantic v2 models:
```python
from pydantic import BaseModel, Field

class AnalysisResult(BaseModel):
    chunk_id: str
    job_id: str
    entities: list[Entity]
    clauses: list[ContractClause]
    flags: list[ComplianceFlag]
    summary: str
    processing_time_ms: int = Field(..., ge=0)
```
This means:
- Invalid payloads fail fast at the Lambda entry point
- Auto-generated JSON schemas document the API contract
- IDE autocomplete works across the entire pipeline
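In practice, each handler validates at the boundary with Pydantic v2's `model_validate`. A minimal sketch of the aggregator's entry point, where the `results` event key and `aggregate()` helper are assumptions:

```python
def handler(event, context):
    # Fail fast at the boundary: pydantic.ValidationError on a malformed
    # payload fails the task before any aggregation work starts
    results = [AnalysisResult.model_validate(item) for item in event["results"]]
    return aggregate(results).model_dump()  # aggregate() assumed
```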
Testing Strategy
| Layer | Approach |
|---|---|
| Agent Unit Tests | Mock Agent.invoke() with synthetic JSON responses; verify prompt construction and result parsing |
| Lambda Unit Tests | moto mocks for S3 and DynamoDB; verify handler input/output contracts |
| Integration | Local Step Functions with LocalStack (manual) |
| E2E | Deploy to dev account, trigger with sample documents |
Example agent test:
```python
import json
from unittest.mock import MagicMock

def test_run_contract_analysis(sample_chunk):
    mock_agent = MagicMock()
    mock_agent.invoke.return_value = json.dumps({
        "entities": [{"text": "Acme Corp", "type": "organization"}],
        "clauses": [{"title": "Payment Terms", "risk_level": "low"}],
        "summary": "Low-risk payment terms.",
    })
    result = run_contract_analysis(mock_agent, sample_chunk, "job-001")
    assert result.entities[0].text == "Acme Corp"
    assert result.clauses[0].risk_level == RiskLevel.LOW
```
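At the Lambda layer, moto keeps S3 tests hermetic. A self-contained sketch of the pattern (bucket and key names illustrative, not taken from the repo):

```python
import json
import boto3
from moto import mock_aws

@mock_aws  # moto intercepts all boto3 calls inside the test
def test_report_round_trip():
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="reports-bucket")
    s3.put_object(Bucket="reports-bucket", Key="reports/job-001.json",
                  Body=json.dumps({"job_id": "job-001"}))
    body = s3.get_object(Bucket="reports-bucket", Key="reports/job-001.json")
    assert json.loads(body["Body"].read())["job_id"] == "job-001"
```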
Lessons Learned
1. Chunk Overlap Matters
Without overlap, clauses split across chunk boundaries get garbled. We carry the last ~200 characters forward to preserve context.
2. Concurrency Limits Save You
Bedrock has account-level TPS limits. Setting `max_concurrency=10` on the Map state keeps us well under typical limits while still processing large documents quickly.
3. Agents Need Strict Prompts
Vague prompts produce vague JSON. Explicit schemas in the prompt dramatically improve parse reliability:
```python
prompt += '\nReturn JSON with this structure:\n{"entities": [...], "clauses": [...]}'
```
4. Structured Logging is Non-Negotiable
With parallel execution, correlating logs across chunks is painful without structured logging. We use `structlog` with `job_id` and `chunk_id` bound to every log line.
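Binding happens once at handler entry. A minimal structlog sketch, with the event field names illustrative:

```python
import structlog

logger = structlog.get_logger()

def handler(event, context):
    # Every line emitted through `log` now carries both correlation IDs
    log = logger.bind(job_id=event["job_id"], chunk_id=event["chunk"]["chunk_id"])
    log.info("chunk_analysis_started")
```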
When to Use This Pattern
This architecture excels when you have:
- Long documents that exceed single-prompt context windows
- Multiple analysis dimensions (legal + compliance + financial)
- Audit requirements (every step is logged in Step Functions execution history)
- Variable volume (serverless scaling from 1 to 1000 concurrent documents)
It is overkill for simple classification tasks, but ideal for high-stakes document review where accuracy and traceability matter.
Repository
The full source code, CDK stack, and test suite are available on GitHub:
🔗 github.com/winston-brown/step-functions-strands-demo
About the Author
I am Winston Brown, a software engineer building AI-powered automation systems. I write about serverless architecture, LLM orchestration, and production engineering.
Published: March 2026