Async Agent Workflows

The most powerful agentic systems aren't the ones that respond in 2 seconds -- they're the ones that work for 2 hours while you sleep, checkpoint their progress, and have results waiting when you wake up.


Why Async Matters

Most agent demos are synchronous: user asks, agent responds. But the highest-value agent work is asynchronous:

  • Enterprise: Background data processing, report generation, automated monitoring, compliance checks that run nightly
  • Side ventures: Research agents that work overnight, content generation pipelines, market analysis that runs while you have a day job
  • Long-running tasks: Code migrations, large document analysis, multi-step research that takes 30+ minutes

Synchronous agents hit a wall when tasks take more than ~30 seconds. Users won’t wait. HTTP connections time out. The real unlock is agents that work independently and report back when done.


Pattern 1: Fire-and-Forget

The simplest async pattern. Submit a task, get a task ID, poll or receive a webhook when done.

Client --> POST /tasks {prompt, config}
       <-- 202 Accepted {task_id: "abc123"}

[Agent runs in background]

Client --> GET /tasks/abc123
       <-- {status: "completed", result: {...}}

Implementation

import uuid

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()
task_store = {}  # In-memory for illustration; use Redis in production

class TaskRequest(BaseModel):
    prompt: str

def generate_id() -> str:
    return uuid.uuid4().hex

@app.post("/tasks", status_code=202)
async def create_task(request: TaskRequest, bg: BackgroundTasks):
    task_id = generate_id()
    task_store[task_id] = {"status": "running", "result": None}
    bg.add_task(run_agent, task_id, request)
    return {"task_id": task_id}

async def run_agent(task_id: str, request: TaskRequest):
    try:
        result = await agent.run(request.prompt)  # your agent framework's entry point
        task_store[task_id] = {"status": "completed", "result": result}
    except Exception as e:
        task_store[task_id] = {"status": "failed", "error": str(e)}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    return task_store.get(task_id, {"status": "not_found"})

When to use: Simple background tasks, one-shot processing, report generation. Limitation: No intermediate progress, no recovery from worker crashes.
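On the client side, fire-and-forget pairs with a polling loop that backs off between checks. A minimal sketch, assuming the status payloads returned by the endpoints above; `fetch_status` is any callable that hits `GET /tasks/{task_id}` and returns the JSON:

```python
import time

def wait_for_task(fetch_status, timeout_s: float = 300, interval_s: float = 0.5) -> dict:
    """Poll fetch_status() with exponential backoff until a terminal state.

    fetch_status is e.g. lambda: httpx.get(f"{BASE_URL}/tasks/{task_id}").json()
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        data = fetch_status()
        if data.get("status") in ("completed", "failed", "not_found"):
            return data
        time.sleep(interval_s)
        interval_s = min(interval_s * 2, 30)  # exponential backoff, capped at 30s
    raise TimeoutError(f"task did not finish within {timeout_s}s")
```

Backoff keeps long tasks from hammering the API while still returning quickly for short ones.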


Pattern 2: Event-Driven Agent Pipeline

Agents communicate through events (messages on a queue). Each agent subscribes to specific event types, processes them, and emits new events.

[Trigger Event] --> Queue --> [Agent A: Research]
                                  |
                            [Event: research_complete]
                                  |
                              Queue --> [Agent B: Analyze]
                                            |
                                      [Event: analysis_complete]
                                            |
                                        Queue --> [Agent C: Report]
                                                      |
                                                [Event: report_ready]
                                                      |
                                                  Notify User

Implementation with Message Queues

# Using Redis Streams (the same shape works with Kafka or Cloud Pub/Sub)
import json
import redis

r = redis.Redis()

# Agent A: Research
def research_agent():
    last_id = "$"  # only new events; use "0" (or a consumer group) to replay history
    while True:
        events = r.xread({"research_requests": last_id}, block=0)
        for _, entries in events:
            for entry_id, fields in entries:
                last_id = entry_id  # advance past processed events
                task = parse_event(fields)
                result = run_research(task["query"])
                r.xadd("analysis_requests", {
                    "task_id": task["task_id"],
                    "research_data": json.dumps(result)
                })

# Agent B: Analysis (same loop shape, next stream in the pipeline)
def analysis_agent():
    last_id = "$"
    while True:
        events = r.xread({"analysis_requests": last_id}, block=0)
        for _, entries in events:
            for entry_id, fields in entries:
                last_id = entry_id
                task = parse_event(fields)
                result = run_analysis(task["research_data"])
                r.xadd("report_requests", {
                    "task_id": task["task_id"],
                    "analysis": json.dumps(result)
                })

Strengths: Agents are decoupled, can scale independently, natural retry semantics (re-queue failed events), agents can be written in different languages.

When to use: Multi-stage processing pipelines, enterprise data workflows, when different stages have different scaling needs.


Pattern 3: Agent Task Queue with Workers

A pool of agent workers pulls tasks from a shared queue. This is the workhorse pattern for scaling agent workloads.

┌──────────┐     ┌───────────┐     ┌──────────────────┐
│ Producer │ --> │   Queue   │ --> │ Worker Pool (N)  │
│ (API/    │     │ (Redis/   │     │ ┌──────────────┐ │
│  Cron/   │     │  SQS/     │     │ │ Worker 1     │ │
│  Event)  │     │  Celery)  │     │ │ (Agent)      │ │
└──────────┘     └───────────┘     │ ├──────────────┤ │
                                   │ │ Worker 2     │ │
                                   │ │ (Agent)      │ │
                                   │ ├──────────────┤ │
                                   │ │ Worker N     │ │
                                   │ └──────────────┘ │
                                   └──────────────────┘

Celery Implementation

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery("agents", broker="redis://localhost:6379/0")

@app.task(bind=True, max_retries=3, soft_time_limit=600)
def run_agent_task(self, task_config: dict):
    agent = create_agent(task_config["agent_type"])
    try:
        result = agent.run(task_config["prompt"])
        save_result(task_config["task_id"], result)
    except SoftTimeLimitExceeded:
        checkpoint_state(task_config["task_id"], agent.state)
        self.retry(countdown=60)
    except Exception as e:
        self.retry(exc=e, countdown=30)

Key features: Automatic retries, time limits, concurrency control, priority queues.

For side ventures: This is the pattern. Queue up 50 research tasks before bed, workers process them overnight using batch API pricing, results are ready in the morning.
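The "queue up 50 tasks before bed" step is just a short producer script. A sketch, assuming the `run_agent_task` Celery task above; the helper name and config shape are illustrative:

```python
import uuid

def build_task_configs(prompts, agent_type="research"):
    """Turn a list of prompts into task configs shaped for run_agent_task."""
    return [
        {"task_id": uuid.uuid4().hex, "agent_type": agent_type, "prompt": p}
        for p in prompts
    ]

# Enqueue for the overnight workers:
# for cfg in build_task_configs(prompts):
#     run_agent_task.apply_async(args=[cfg], queue="overnight")
```

Generating the `task_id` in the producer (not the worker) means retries of the same config stay traceable to one task.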


Pattern 4: Webhook-Based Agent Communication

Agents communicate via HTTP webhooks. When Agent A finishes, it POSTs results to Agent B’s endpoint. Good for cross-service and cross-infrastructure communication.

import httpx

# Agent A completes and notifies Agent B
async def on_research_complete(result):
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://agent-b.internal/webhook/research-complete",
            json={
                "task_id": result.task_id,
                "findings": result.findings,
                "callback_url": "https://agent-a.internal/webhook/feedback"
            }
        )

When to use: Agents running on different infrastructure (e.g., one on GCP, one on a personal server), integrating with external services (Slack, email), serverless agent functions.

Caveat: Webhooks need idempotency (same webhook delivered twice should not duplicate work), authentication, and retry logic. Use webhook signatures (HMAC) to verify authenticity.
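Signature verification is a few lines with the standard library. A minimal sketch of the HMAC-SHA256 scheme mentioned above, assuming a shared secret and a signature passed as a hex header:

```python
import hashlib
import hmac

def sign_payload(secret: bytes, body: bytes) -> str:
    """Hex HMAC-SHA256 signature the sender attaches as a header."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()

def verify_signature(secret: bytes, body: bytes, received_sig: str) -> bool:
    """Constant-time comparison; reject the webhook if this returns False."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, received_sig)
```

Sign the raw request body bytes (before JSON parsing), and always compare with `hmac.compare_digest` to avoid timing attacks.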


Pattern 5: Long-Running Agent Tasks with Checkpointing

For tasks that take 30+ minutes, you must checkpoint progress. Agents crash, connections drop, rate limits hit. Without checkpointing, a crash at minute 29 means starting over.

from datetime import datetime, timezone

class CheckpointedAgent:
    def __init__(self, task_id: str, checkpoint_store):
        self.task_id = task_id
        self.store = checkpoint_store

    async def run(self, steps: list[str]):
        checkpoint = self.store.load(self.task_id) or {}
        completed = checkpoint.get("completed_steps", [])

        for step in steps:
            if step in completed:
                continue  # Skip already-completed steps on resume

            result = await self.execute_step(step)
            completed.append(step)

            self.store.save(self.task_id, {
                "completed_steps": completed,
                "latest_result": result,
                "timestamp": datetime.now(timezone.utc).isoformat()
            })

        return self.store.load(self.task_id)

LangGraph checkpointing provides this natively. Every node transition is checkpointed. On resume, the graph replays from the last checkpoint.

Rule of thumb: If a task takes >5 minutes, add checkpointing. If it takes >30 minutes, checkpointing is mandatory.
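A `checkpoint_store` for the class above doesn't need much: one JSON blob per task. A minimal sketch backed by SQLite; the single-table schema is an assumption, not a prescribed format:

```python
import json
import sqlite3

class SqliteCheckpointStore:
    """Stores one JSON checkpoint blob per task_id."""

    def __init__(self, path: str = "checkpoints.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(task_id TEXT PRIMARY KEY, state TEXT)"
        )

    def save(self, task_id: str, state: dict) -> None:
        self.conn.execute(
            "INSERT INTO checkpoints (task_id, state) VALUES (?, ?) "
            "ON CONFLICT(task_id) DO UPDATE SET state = excluded.state",
            (task_id, json.dumps(state)),
        )
        self.conn.commit()

    def load(self, task_id: str) -> dict:
        row = self.conn.execute(
            "SELECT state FROM checkpoints WHERE task_id = ?", (task_id,)
        ).fetchone()
        return json.loads(row[0]) if row else {}
```

SQLite is plenty for a single worker on a side venture; swap in PostgreSQL or object storage when multiple workers need the same checkpoints.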


Pattern 6: Human-in-the-Loop (Async Approval)

The agent works autonomously but pauses at critical decision points, waiting for human approval before proceeding.

Agent works --> [Decision Point] --> Pause & Notify Human
                                         |
                                    Human Reviews
                                         |
                                    ┌────┴────┐
                                    │         │
                                 Approve    Reject
                                    │         │
                              Agent Resumes  Agent Stops
                                              (or revises)

Implementation

class HumanApprovalAgent:
    async def run(self, task):
        plan = await self.create_plan(task)

        # Pause for human approval
        approval_id = await self.request_approval(
            plan=plan,
            notify_via=["slack", "email"],
            timeout_hours=24,
            auto_action="reject"  # What to do if the human doesn't respond
        )

        # Agent is suspended here. Resumes when the approval webhook fires.
        approval = await self.wait_for_approval(approval_id)

        if approval.status == "approved":
            return await self.execute_plan(plan)
        elif approval.status == "approved_with_changes":
            revised_plan = await self.revise_plan(plan, approval.feedback)
            return await self.execute_plan(revised_plan)
        else:
            return {"status": "rejected", "reason": approval.feedback}

Enterprise use cases: Approving refunds over a threshold, reviewing generated content before publication, approving infrastructure changes.

Side venture use cases: Agent drafts 10 blog posts overnight, you review and approve in the morning, agent publishes approved ones.
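One way to implement the suspend/resume point (`wait_for_approval` above) is to park the coroutine on a future that the approval webhook resolves. A sketch; `ApprovalBroker` and its method names are illustrative, not from any framework:

```python
import asyncio

class ApprovalBroker:
    """Maps approval_id -> future; the webhook handler resolves the future."""

    def __init__(self):
        self._pending: dict[str, asyncio.Future] = {}

    def register(self, approval_id: str) -> asyncio.Future:
        """Called by the agent before suspending; await the returned future."""
        fut = asyncio.get_running_loop().create_future()
        self._pending[approval_id] = fut
        return fut

    def resolve(self, approval_id: str, decision: dict) -> None:
        """Called by the approval webhook endpoint when the human decides."""
        fut = self._pending.pop(approval_id, None)
        if fut and not fut.done():
            fut.set_result(decision)
```

This works within one process; across restarts you'd persist pending approvals and re-register them on startup, which is the durability problem checkpointing (Pattern 5) solves.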


Orchestrating Async Agents for Side Ventures

A practical architecture for async agent-driven ventures:

┌──────────────────────────────────────────────────┐
│                  You (Morning)                   │
│  Review results, approve, queue new tasks        │
└───────────────────────┬──────────────────────────┘
                        │
                   ┌────▼────┐
                   │  Queue  │  (Redis / SQS / Supabase)
                   └────┬────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
   ┌────▼────┐    ┌─────▼─────┐   ┌────▼────┐
   │Research │    │  Content  │   │ Outreach│
   │ Agent   │    │  Agent    │   │  Agent  │
   │(scrape, │    │(write,    │   │(email,  │
   │ analyze)│    │ edit)     │   │ social) │
   └────┬────┘    └─────┬─────┘   └────┬────┘
        │               │              │
        └───────────────┼──────────────┘
                        │
                   ┌────▼────┐
                   │ Results │  (DB + notification)
                   │  Store  │
                   └─────────┘
                        │
┌───────────────────────▼──────────────────────────┐
│                  You (Evening)                   │
│  Review outputs, provide feedback, iterate       │
└──────────────────────────────────────────────────┘

Key principles:

  1. Queue everything. Never call agents directly. Always through a queue for retry and audit.
  2. Budget caps per agent. Research agent gets $2/day, content agent gets $1/day. Hard limits.
  3. Checkpoint every step. You’re paying for agent work. Don’t lose it to crashes.
  4. Batch API for cost. Overnight work is not latency-sensitive. Use 50% cheaper batch pricing.
  5. Morning review ritual. 15 minutes reviewing agent outputs, approving next steps, queuing new tasks.
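Principle 2, the hard budget cap, can be enforced with a small guard each worker consults before spending. A sketch; the in-memory daily reset and cost amounts are illustrative, and a shared store would be needed across workers:

```python
from datetime import date

class BudgetGuard:
    """Hard daily spend cap for one agent; raises before the limit is crossed."""

    def __init__(self, daily_limit_usd: float):
        self.limit = daily_limit_usd
        self.day = date.today()
        self.spent = 0.0

    def charge(self, cost_usd: float) -> None:
        today = date.today()
        if today != self.day:  # new day: reset the counter
            self.day, self.spent = today, 0.0
        if self.spent + cost_usd > self.limit:
            raise RuntimeError(f"daily budget ${self.limit:.2f} exceeded")
        self.spent += cost_usd
```

Call `guard.charge(estimated_cost)` before each model call; a raised error stops the agent rather than the bill.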

Infrastructure Choices

Component      Simple (Side Venture)               Enterprise
Queue          Redis / Supabase Edge Functions     Cloud Pub/Sub / SQS / Kafka
Worker         Single server / Railway             Kubernetes / Cloud Run
Checkpoint     SQLite / Supabase                   PostgreSQL / Cloud Storage
Notification   Telegram bot / Email                Slack / PagerDuty
Scheduling     Cron job                            Cloud Scheduler / Airflow
Monitoring     Logs + daily email summary          Grafana / Datadog

Error Handling in Async Systems

Async agents fail differently than sync ones. The user isn’t watching when things break.

  1. Dead letter queue. Failed tasks go to a DLQ after max retries. Review daily.
  2. Timeout with checkpoint. If a task exceeds its time limit, save state and stop. Don’t let agents run forever.
  3. Circuit breaker. If an agent fails 5 times in a row, stop sending it work. Alert a human.
  4. Idempotent operations. Every agent operation should be safe to retry. If “send email” is a tool, it needs deduplication.
  5. Graceful degradation. If the research agent is down, the content agent should work with cached research, not fail entirely.
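The circuit breaker (point 3) is a handful of lines. A minimal sketch of the consecutive-failure variant described above; the class and threshold are illustrative:

```python
class CircuitBreaker:
    """Stop dispatching to an agent after N consecutive failures."""

    def __init__(self, max_failures: int = 5):
        self.max_failures = max_failures
        self.failures = 0

    @property
    def open(self) -> bool:
        return self.failures >= self.max_failures

    def call(self, fn, *args, **kwargs):
        if self.open:
            raise RuntimeError("circuit open: agent disabled, alert a human")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1  # another consecutive failure
            raise
        self.failures = 0  # success resets the streak
        return result
```

Production breakers usually add a cooldown after which they half-open and let one probe request through; this sketch stays open until a human intervenes, matching the "alert a human" rule above.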

Anti-Patterns

  • No timeout on agent tasks. An agent stuck in a loop will burn tokens indefinitely. Always set max_tokens, max_turns, and wall-clock timeouts.
  • Synchronous chains pretending to be async. If Agent B can’t start until Agent A finishes, and there’s nothing else to parallelize, you don’t have an async pipeline – you have a slow synchronous one with extra infrastructure.
  • No notification on completion. Async agents that silently complete are useless. Always notify (Slack, email, Telegram) when results are ready.
  • Skipping checkpoints for “short” tasks. Tasks you think take 2 minutes can take 20 when rate limits hit or the model is slow. Checkpoint anyway.

This post is licensed under CC BY 4.0 by the author.