import boto3, json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
tools = [
{
"toolSpec": {
"name": "query_database",
"description": "Run a SQL query and return results",
"inputSchema": {
"json": {
"type": "object",
"properties": {"sql": {"type": "string"}},
"required": ["sql"]
}
}
}
}
]
messages = [{"role": "user", "content": [{"text": "How many orders came in last week?"}]}]
while True:
response = bedrock.converse(
modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
messages=messages,
toolConfig={"tools": tools}
)
output = response["output"]["message"]
messages.append(output)
stop_reason = response["stopReason"]
if stop_reason == "end_turn":
print(output["content"][0]["text"])
break
elif stop_reason == "tool_use":
# handle tool call, append result, continue loop
...import boto3, json
stepfunctions = boto3.client("stepfunctions", region_name="us-east-1")
# Kick off a multi-agent pipeline defined in Step Functions
response = stepfunctions.start_execution(
stateMachineArn="arn:aws:states:us-east-1:123456789:stateMachine:MultiAgentPipeline",
input=json.dumps({
"goal": "Write and review a market analysis for Q2",
"context": {"vertical": "SaaS", "region": "EMEA"}
})
)import boto3, json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
def compress_agent_output(raw_output: str, task_description: str) -> dict:
"""Ask Claude to distill a subagent's verbose output into a tight summary."""
prompt = f"""
You are a compression step in a multi-agent pipeline.
Task the subagent was solving: {task_description}
Raw subagent output:
{raw_output}
Return ONLY a JSON object with:
- "summary": 2-3 sentence synthesis of the key findings
- "key_facts": list of up to 5 critical facts the orchestrator needs
- "confidence": high | medium | low
- "flags": any issues or blockers the orchestrator should know about
"""
response = bedrock.converse(
modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
messages=[{"role": "user", "content": [{"text": prompt}]}]
)
return json.loads(response["output"]["message"]["content"][0]["text"])
import boto3, json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
def orchestrator_cycle(state: dict, subagent_results: list[dict]) -> dict:
"""
Orchestrator step: given current state and new subagent results,
update state and decide next actions.
"""
system_prompt = """You are an orchestrator in a multi-agent pipeline.
You will receive your current state (including the original goal) and
new results from subagents. Update the state and output your next actions.
ALWAYS re-read the original_goal before deciding anything.
Respond only with valid JSON."""
prompt = f"""
Current state:
{json.dumps(state, indent=2)}
New subagent results:
{json.dumps(subagent_results, indent=2)}
Respond with a JSON object:
{{
"updated_state": {{ ...full updated state including original_goal... }},
"next_actions": [
{{"agent": "research|coder|reviewer", "task": "...", "context": {{...}} }}
],
"done": false,
"final_answer": null
}}
"""
response = bedrock.converse(
modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
system=[{"text": system_prompt}],
messages=[{"role": "user", "content": [{"text": prompt}]}]
)
raw = response["output"]["message"]["content"][0]["text"]
return json.loads(raw)state["original_goal"] is always present in the payload — the orchestrator can never drift away from it because it's structurally required in every cycle's input.import boto3, json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
sqs = boto3.client("sqs", region_name="us-east-1")
DOWNSTREAM_QUEUE = "https://sqs.us-east-1.amazonaws.com/123456789/coder-agent-queue"
DLQ = "https://sqs.us-east-1.amazonaws.com/123456789/failed-outputs-dlq"
def validate_research_output(event, context):
for record in event["Records"]:
payload = json.loads(record["body"])
research_output = payload["output"]
original_task = payload["task"]
validation_prompt = f"""
You are a validation gate in a multi-agent pipeline.
A research agent produced the following output for this task: "{original_task}"
Output to validate:
{research_output}
Check for:
1. Does the output actually address the task?
2. Are there any obvious factual contradictions?
3. Are claims that need sources missing citations?
Respond with JSON:
{{"valid": true/false, "reason": "...", "corrected_output": null or corrected string}}
"""
response = bedrock.converse(
modelId="anthropic.claude-3-haiku-20240307-v1:0", # cheap model for gates
messages=[{"role": "user", "content": [{"text": validation_prompt}]}]
)
result = json.loads(response["output"]["message"]["content"][0]["text"])
target_queue = DOWNSTREAM_QUEUE if result["valid"] else DLQ
payload["output"] = result.get("corrected_output") or research_output
payload["validation"] = result
sqs.send_message(QueueUrl=target_queue, MessageBody=json.dumps(payload)){
"Comment": "Multi-agent pipeline with guardrails",
"StartAt": "OrchestratorPlan",
"States": {
"OrchestratorPlan": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:orchestrator",
"TimeoutSeconds": 60,
"Retry": [
{
"ErrorEquals": ["States.Timeout", "Lambda.ServiceException"],
"IntervalSeconds": 5,
"MaxAttempts": 2,
"BackoffRate": 2
}
],
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "HandleFailure" }],
"Next": "ParallelSubagents"
},
"ParallelSubagents": {
"Type": "Parallel",
"TimeoutSeconds": 120,
"Branches": [
{ "StartAt": "ResearchAgent", "States": { "ResearchAgent": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789:function:research-agent", "End": true } } },
{ "StartAt": "CoderAgent", "States": { "CoderAgent": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789:function:coder-agent", "End": true } } }
],
"Next": "OrchestratorSynthesize",
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "HandleFailure" }]
},
"OrchestratorSynthesize": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:orchestrator-synthesize",
"TimeoutSeconds": 60,
"End": true
},
"HandleFailure": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789:function:handle-failure",
"End": true
}
}
}
Start with the simplest thing that could work. Add agents when you've hit a real ceiling, not because the pattern looks elegant.