Async Agent Workflows
The most powerful agentic systems aren't the ones that respond in 2 seconds -- they're the ones that work for 2 hours while you sleep, checkpoint their progress, and have results waiting when you wake up.
Why Async Matters
Most agent demos are synchronous: user asks, agent responds. But the highest-value agent work is asynchronous:
- Enterprise: Background data processing, report generation, automated monitoring, compliance checks that run nightly
- Side ventures: Research agents that work overnight, content generation pipelines, market analysis that runs while you have a day job
- Long-running tasks: Code migrations, large document analysis, multi-step research that takes 30+ minutes
Synchronous agents hit a wall when tasks take more than ~30 seconds. Users won’t wait. HTTP connections time out. The real unlock is agents that work independently and report back when done.
Pattern 1: Fire-and-Forget
The simplest async pattern. Submit a task, get a task ID, poll or receive a webhook when done.
```
Client --> POST /tasks {prompt, config}
       <-- 202 Accepted {task_id: "abc123"}

           [Agent runs in background]

Client --> GET /tasks/abc123
       <-- {status: "completed", result: {...}}
```
Implementation
```python
import uuid

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app = FastAPI()
task_store = {}  # in-memory for illustration; use Redis in production


class TaskRequest(BaseModel):
    prompt: str


@app.post("/tasks", status_code=202)
async def create_task(request: TaskRequest, bg: BackgroundTasks):
    task_id = uuid.uuid4().hex
    task_store[task_id] = {"status": "running", "result": None}
    bg.add_task(run_agent, task_id, request)
    return {"task_id": task_id}


async def run_agent(task_id: str, request: TaskRequest):
    try:
        result = await agent.run(request.prompt)  # your agent runtime
        task_store[task_id] = {"status": "completed", "result": result}
    except Exception as e:
        task_store[task_id] = {"status": "failed", "error": str(e)}


@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    return task_store.get(task_id, {"status": "not_found"})
```
When to use: Simple background tasks, one-shot processing, report generation. Limitation: No intermediate progress, no recovery from worker crashes.
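On the client side, the fire-and-forget contract reduces to polling until the status leaves `running`. A minimal sketch, with illustrative names: `fetch_status` stands in for a real `GET /tasks/{id}` request (e.g. via `httpx`) so the loop itself is testable:

```python
import time


def poll_task(fetch_status, task_id, interval=2.0, timeout=600.0):
    """Poll the task endpoint until the task leaves the "running" state.

    `fetch_status` stands in for a real HTTP call, e.g.
    lambda tid: httpx.get(f"https://api.example/tasks/{tid}").json()
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        task = fetch_status(task_id)
        if task["status"] != "running":
            return task
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} still running after {timeout}s")


# Simulated server: two "running" responses, then completion.
responses = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "completed", "result": {"answer": 42}},
])
result = poll_task(lambda _tid: next(responses), "abc123", interval=0.0)
print(result["status"])  # completed
```

In practice you would back off the polling interval (or switch to a webhook) once tasks run longer than a few minutes.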
Pattern 2: Event-Driven Agent Pipeline
Agents communicate through events (messages on a queue). Each agent subscribes to specific event types, processes them, and emits new events.
```
[Trigger Event] --> Queue --> [Agent A: Research]
                                      |
                          [Event: research_complete]
                                      |
                        Queue --> [Agent B: Analyze]
                                      |
                          [Event: analysis_complete]
                                      |
                         Queue --> [Agent C: Report]
                                      |
                            [Event: report_ready]
                                      |
                                 Notify User
```
Implementation with Message Queues
```python
# Using Redis Streams (the same shape works with Kafka or Cloud Pub/Sub)
import json

import redis

r = redis.Redis()


# Agent A: Research
def research_agent():
    while True:
        # Note: "$" only delivers events that arrive after this call;
        # use consumer groups (XREADGROUP) for reliable delivery in production.
        event = r.xread({"research_requests": "$"}, block=0)
        task = parse_event(event)  # your event parsing
        result = run_research(task["query"])
        r.xadd("analysis_requests", {
            "task_id": task["task_id"],
            "research_data": json.dumps(result),
        })


# Agent B: Analysis
def analysis_agent():
    while True:
        event = r.xread({"analysis_requests": "$"}, block=0)
        task = parse_event(event)
        result = run_analysis(task["research_data"])
        r.xadd("report_requests", {
            "task_id": task["task_id"],
            "analysis": json.dumps(result),
        })
```
Strengths: Agents are decoupled, can scale independently, natural retry semantics (re-queue failed events), agents can be written in different languages.
When to use: Multi-stage processing pipelines, enterprise data workflows, when different stages have different scaling needs.
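The producer side of such a pipeline is just one more event. Stream brokers like Redis Streams want flat, string-valued field maps, so a small envelope helper is useful; `make_event` and its correlation fields are my naming, not a standard:

```python
import json
import time
import uuid


def make_event(payload: dict) -> dict:
    """Flatten a payload into the string-valued field map stream brokers
    expect; task_id and ts let downstream agents correlate and dedupe."""
    return {
        "task_id": payload.get("task_id") or uuid.uuid4().hex,
        "ts": str(time.time()),
        "data": json.dumps(payload),
    }


# Kick off the pipeline. In production: r.xadd("research_requests", event)
event = make_event({"query": "EU battery market size"})
print(json.loads(event["data"])["query"])  # EU battery market size
```

Keeping the nested payload JSON-encoded in a single `data` field sidesteps the flat-key restriction while leaving `task_id` queryable for deduplication.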
Pattern 3: Agent Task Queue with Workers
A pool of agent workers pulls tasks from a shared queue. This is the workhorse pattern for scaling agent workloads.
```
┌──────────┐     ┌───────────┐     ┌──────────────────┐
│ Producer │ --> │   Queue   │ --> │ Worker Pool (N)  │
│ (API/    │     │ (Redis/   │     │ ┌──────────────┐ │
│  Cron/   │     │  SQS/     │     │ │   Worker 1   │ │
│  Event)  │     │  Celery)  │     │ │   (Agent)    │ │
└──────────┘     └───────────┘     │ ├──────────────┤ │
                                   │ │   Worker 2   │ │
                                   │ │   (Agent)    │ │
                                   │ ├──────────────┤ │
                                   │ │   Worker N   │ │
                                   │ └──────────────┘ │
                                   └──────────────────┘
```
Celery Implementation
```python
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery("agents", broker="redis://localhost:6379/0")


@app.task(bind=True, max_retries=3, soft_time_limit=600)
def run_agent_task(self, task_config: dict):
    agent = create_agent(task_config["agent_type"])  # your agent factory
    try:
        result = agent.run(task_config["prompt"])
        save_result(task_config["task_id"], result)
    except SoftTimeLimitExceeded:
        # Out of time: persist progress, then retry in a minute
        checkpoint_state(task_config["task_id"], agent.state)
        self.retry(countdown=60)
    except Exception as e:
        self.retry(exc=e, countdown=30)
```
Key features: Automatic retries, time limits, concurrency control, priority queues.
For side ventures: This is the pattern. Queue up 50 research tasks before bed, workers process them overnight using batch API pricing, results are ready in the morning.
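The evening enqueue step can be sketched as a thin producer. `queue_overnight_batch` is my naming; in production the injected `enqueue` would wrap Celery's real `apply_async` call targeting a dedicated low-priority queue:

```python
def queue_overnight_batch(topics: list[str], enqueue) -> list[str]:
    """Queue one research task per topic before bed.

    `enqueue` is injected to keep this testable; in production pass e.g.
    lambda cfg: run_agent_task.apply_async(args=[cfg], queue="overnight")
    """
    task_ids = []
    for i, topic in enumerate(topics):
        task_id = f"research-{i:03d}"
        enqueue({
            "task_id": task_id,
            "agent_type": "research",
            "prompt": f"Research and summarize: {topic}",
        })
        task_ids.append(task_id)
    return task_ids


queued = []
ids = queue_overnight_batch(["EU battery market", "solid-state patents"], queued.append)
print(ids)  # ['research-000', 'research-001']
```

Routing overnight work to its own queue keeps it from competing with anything latency-sensitive, and the returned IDs are what you check during the morning review.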
Pattern 4: Webhook-Based Agent Communication
Agents communicate via HTTP webhooks. When Agent A finishes, it POSTs results to Agent B’s endpoint. Good for cross-service and cross-infrastructure communication.
```python
import httpx


# Agent A completes and notifies Agent B
async def on_research_complete(result):
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://agent-b.internal/webhook/research-complete",
            json={
                "task_id": result.task_id,
                "findings": result.findings,
                "callback_url": "https://agent-a.internal/webhook/feedback",
            },
        )
```
When to use: Agents running on different infrastructure (e.g., one on GCP, one on a personal server), integrating with external services (Slack, email), serverless agent functions.
Caveat: Webhooks need idempotency (same webhook delivered twice should not duplicate work), authentication, and retry logic. Use webhook signatures (HMAC) to verify authenticity.
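Signature verification needs nothing beyond the standard library. A sketch using `hmac`/`hashlib` (the function names are illustrative; the constant-time compare via `hmac.compare_digest` is the important part):

```python
import hashlib
import hmac


def sign(secret: bytes, body: bytes) -> str:
    """Sender side: HMAC-SHA256 over the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify(secret: bytes, body: bytes, signature: str) -> bool:
    """Receiver side: constant-time comparison to resist timing attacks."""
    expected = sign(secret, body)
    return hmac.compare_digest(expected, signature)


secret = b"shared-webhook-secret"
body = b'{"task_id": "abc123", "findings": "..."}'
sig = sign(secret, body)
print(verify(secret, body, sig))                    # True
print(verify(secret, b'{"task_id": "evil"}', sig))  # False
```

Sign the raw bytes of the body, not a re-serialized parse of it, or equivalent JSON with different key order will fail verification.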
Pattern 5: Long-Running Agent Tasks with Checkpointing
For tasks that take 30+ minutes, you must checkpoint progress. Agents crash, connections drop, rate limits hit. Without checkpointing, a crash at minute 29 means starting over.
```python
from datetime import datetime, timezone


class CheckpointedAgent:
    def __init__(self, task_id: str, checkpoint_store):
        self.task_id = task_id
        self.store = checkpoint_store

    async def run(self, steps: list[str]):
        checkpoint = self.store.load(self.task_id)
        completed = checkpoint.get("completed_steps", [])
        for step in steps:
            if step in completed:
                continue  # Skip already-completed steps on resume
            result = await self.execute_step(step)
            completed.append(step)
            self.store.save(self.task_id, {
                "completed_steps": completed,
                "latest_result": result,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
        return self.store.load(self.task_id)
```
LangGraph checkpointing provides this natively. Every node transition is checkpointed. On resume, the graph replays from the last checkpoint.
Rule of thumb: If a task takes >5 minutes, add checkpointing. If it takes >30 minutes, checkpointing is mandatory.
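For the simple end of the infrastructure table, the `checkpoint_store` above can be a single SQLite table. A minimal sketch (class and schema are my own; the `load`/`save` signatures match the agent code above):

```python
import json
import sqlite3


class SqliteCheckpointStore:
    """A minimal checkpoint_store compatible with the load/save calls above."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(task_id TEXT PRIMARY KEY, state TEXT)"
        )

    def save(self, task_id: str, state: dict) -> None:
        self.db.execute(
            "INSERT INTO checkpoints VALUES (?, ?) "
            "ON CONFLICT(task_id) DO UPDATE SET state = excluded.state",
            (task_id, json.dumps(state)),
        )
        self.db.commit()

    def load(self, task_id: str) -> dict:
        row = self.db.execute(
            "SELECT state FROM checkpoints WHERE task_id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else {}


store = SqliteCheckpointStore()
store.save("t1", {"completed_steps": ["fetch"]})
print(store.load("t1"))  # {'completed_steps': ['fetch']}
```

The `commit()` after every save is deliberate: a checkpoint that only lives in an uncommitted transaction is lost in exactly the crash it was meant to survive.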
Pattern 6: Human-in-the-Loop (Async Approval)
The agent works autonomously but pauses at critical decision points, waiting for human approval before proceeding.
```
Agent works --> [Decision Point] --> Pause & Notify Human
                                              |
                                        Human Reviews
                                              |
                                         ┌────┴────┐
                                         │         │
                                      Approve   Reject
                                         │         │
                                 Agent Resumes  Agent Stops
                                 (or revises)
```
Implementation
```python
class HumanApprovalAgent:
    async def run(self, task):
        plan = await self.create_plan(task)

        # Pause for human approval
        approval_id = await self.request_approval(
            plan=plan,
            notify_via=["slack", "email"],
            timeout_hours=24,
            auto_action="reject",  # What to do if the human doesn't respond
        )

        # Agent is suspended here. Resumes when the approval webhook fires.
        approval = await self.wait_for_approval(approval_id)

        if approval.status == "approved":
            return await self.execute_plan(plan, modifications=approval.notes)
        elif approval.status == "approved_with_changes":
            revised_plan = await self.revise_plan(plan, approval.feedback)
            return await self.execute_plan(revised_plan)
        else:
            return {"status": "rejected", "reason": approval.feedback}
```
Enterprise use cases: Approving refunds over a threshold, reviewing generated content before publication, approving infrastructure changes.
Side venture use cases: Agent drafts 10 blog posts overnight, you review and approve in the morning, agent publishes approved ones.
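One single-process way to implement the suspend/resume step is an `asyncio.Future` that the approval webhook resolves. A sketch under that assumption (function names are illustrative; a real deployment must persist the pending map so approvals survive restarts):

```python
import asyncio

pending: dict[str, asyncio.Future] = {}


async def wait_for_approval(approval_id: str, timeout_s: float, auto_action: str) -> dict:
    """Suspend until the approval webhook fires; fall back to auto_action on timeout."""
    fut = asyncio.get_running_loop().create_future()
    pending[approval_id] = fut
    try:
        return await asyncio.wait_for(fut, timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"status": auto_action, "feedback": "no response before timeout"}
    finally:
        pending.pop(approval_id, None)


def approval_webhook(approval_id: str, decision: dict) -> None:
    """Called by the HTTP handler when the human clicks approve/reject."""
    fut = pending.get(approval_id)
    if fut and not fut.done():
        fut.set_result(decision)


async def demo():
    waiter = asyncio.create_task(
        wait_for_approval("ap-1", timeout_s=5.0, auto_action="reject"))
    await asyncio.sleep(0.01)  # ...human reviews in Slack...
    approval_webhook("ap-1", {"status": "approved", "notes": "ship it"})
    return await waiter


print(asyncio.run(demo()))  # {'status': 'approved', 'notes': 'ship it'}
```

The timeout path is what makes this safe to run unattended: a 24-hour `timeout_hours` maps to `timeout_s=86400`, after which the agent proceeds with the conservative `auto_action` instead of hanging forever.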
Orchestrating Async Agents for Side Ventures
A practical architecture for async agent-driven ventures:
```
┌──────────────────────────────────────────────────┐
│                 You (Morning)                    │
│   Review results, approve, queue new tasks       │
└───────────────────────┬──────────────────────────┘
                        │
                   ┌────▼────┐
                   │  Queue  │  (Redis / SQS / Supabase)
                   └────┬────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
   ┌────▼────┐    ┌─────▼─────┐    ┌────▼────┐
   │Research │    │  Content  │    │ Outreach│
   │  Agent  │    │   Agent   │    │  Agent  │
   │(scrape, │    │(write,    │    │(email,  │
   │ analyze)│    │ edit)     │    │ social) │
   └────┬────┘    └─────┬─────┘    └────┬────┘
        │               │               │
        └───────────────┼───────────────┘
                        │
                   ┌────▼────┐
                   │ Results │  (DB + notification)
                   │  Store  │
                   └────┬────┘
                        │
┌───────────────────────▼──────────────────────────┐
│                 You (Evening)                    │
│   Review outputs, provide feedback, iterate      │
└──────────────────────────────────────────────────┘
```
Key principles:
- Queue everything. Never call agents directly. Always through a queue for retry and audit.
- Budget caps per agent. Research agent gets $2/day, content agent gets $1/day. Hard limits.
- Checkpoint every step. You’re paying for agent work. Don’t lose it to crashes.
- Batch API for cost. Overnight work is not latency-sensitive. Use 50% cheaper batch pricing.
- Morning review ritual. 15 minutes reviewing agent outputs, approving next steps, queuing new tasks.
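The budget-cap principle is a few lines of code. A sketch of a hard daily limit checked before every model call (`DailyBudget` is my naming; production would persist the counters rather than keep them in memory):

```python
import datetime


class DailyBudget:
    """Hard per-agent daily spend cap; check before each model call."""

    def __init__(self, limits: dict[str, float]):
        self.limits = limits            # e.g. {"research": 2.00, "content": 1.00}
        self.spent: dict[str, float] = {}
        self.day = datetime.date.today()

    def charge(self, agent: str, cost_usd: float) -> bool:
        if datetime.date.today() != self.day:   # reset at midnight
            self.spent, self.day = {}, datetime.date.today()
        new_total = self.spent.get(agent, 0.0) + cost_usd
        if new_total > self.limits.get(agent, 0.0):
            return False                        # refuse the call; park the task
        self.spent[agent] = new_total
        return True


budget = DailyBudget({"research": 2.00, "content": 1.00})
print(budget.charge("research", 1.50))  # True
print(budget.charge("research", 0.75))  # False -- would exceed $2/day
```

Refusing the call (rather than raising) lets the worker re-queue the task for tomorrow instead of crashing the whole batch.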
Infrastructure Choices
| Component | Simple (Side Venture) | Enterprise |
|---|---|---|
| Queue | Redis / Supabase Edge Functions | Cloud Pub/Sub / SQS / Kafka |
| Worker | Single server / Railway | Kubernetes / Cloud Run |
| Checkpoint | SQLite / Supabase | PostgreSQL / Cloud Storage |
| Notification | Telegram bot / Email | Slack / PagerDuty |
| Scheduling | Cron job | Cloud Scheduler / Airflow |
| Monitoring | Logs + daily email summary | Grafana / Datadog |
Error Handling in Async Systems
Async agents fail differently than sync ones. The user isn’t watching when things break.
- Dead letter queue. Failed tasks go to a DLQ after max retries. Review daily.
- Timeout with checkpoint. If a task exceeds its time limit, save state and stop. Don’t let agents run forever.
- Circuit breaker. If an agent fails 5 times in a row, stop sending it work. Alert a human.
- Idempotent operations. Every agent operation should be safe to retry. If “send email” is a tool, it needs deduplication.
- Graceful degradation. If the research agent is down, the content agent should work with cached research, not fail entirely.
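The circuit-breaker rule above fits in a small class. A sketch (names are illustrative; a production breaker would also add a cool-down period before half-opening):

```python
class CircuitBreaker:
    """Stop dispatching to an agent after `threshold` consecutive failures."""

    def __init__(self, threshold: int = 5, alert=print):
        self.threshold = threshold
        self.failures = 0
        self.open = False
        self.alert = alert  # e.g. a Slack/Telegram notifier

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0  # any success resets the streak
            return
        self.failures += 1
        if self.failures >= self.threshold and not self.open:
            self.open = True
            self.alert(f"circuit open after {self.failures} consecutive failures")

    def allow(self) -> bool:
        return not self.open


breaker = CircuitBreaker(threshold=5)
for _ in range(5):
    breaker.record(success=False)
print(breaker.allow())  # False -- stop sending work, a human was alerted
```

The dispatcher checks `allow()` before handing a task to the agent; tasks blocked by an open circuit go back on the queue or to the DLQ.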
Anti-Patterns
- No timeout on agent tasks. An agent stuck in a loop will burn tokens indefinitely. Always set `max_tokens`, `max_turns`, and wall-clock timeouts.
- Synchronous chains pretending to be async. If Agent B can't start until Agent A finishes, and there's nothing else to parallelize, you don't have an async pipeline -- you have a slow synchronous one with extra infrastructure.
- No notification on completion. Async agents that silently complete are useless. Always notify (Slack, email, Telegram) when results are ready.
- Skipping checkpoints for “short” tasks. Tasks you think take 2 minutes can take 20 when rate limits hit or the model is slow. Checkpoint anyway.
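The wall-clock timeout from the first anti-pattern is one `asyncio.wait_for` away (the `run_with_limits` wrapper is my naming; `max_tokens`/`max_turns` still belong inside the agent itself):

```python
import asyncio


async def run_with_limits(agent_coro, wall_clock_s: float) -> dict:
    """Wall-clock timeout around an agent run; wait_for cancels the
    coroutine on timeout, so the agent stops burning tokens."""
    try:
        return await asyncio.wait_for(agent_coro, timeout=wall_clock_s)
    except asyncio.TimeoutError:
        return {"status": "timeout"}


async def slow_agent():
    await asyncio.sleep(10)  # stand-in for a stuck agent loop
    return {"status": "completed"}


print(asyncio.run(run_with_limits(slow_agent(), wall_clock_s=0.01)))
# {'status': 'timeout'}
```

Pair this with the checkpointing pattern so the cancellation lands between steps and the completed work survives.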
References
- Celery – task queues for Python
- Temporal – durable execution engine for long-running workflows
- Inngest – event-driven serverless agent orchestration
- LangGraph – persistence and streaming documentation (checkpointing, resume)
- Anthropic – Batch API documentation (discounted batch pricing)
- BullMQ – Node.js task queue (for TypeScript agents)