
Async Event Architecture

DuraGraph’s runtime is event-driven. Every cross-process boundary — control plane to workers, workers back to the control plane, platform onboarding workflows, audit logging, real-time client streaming — is an event published to NATS JetStream.

This page describes the live event taxonomy. The REST API tells you what clients can call; the events on this page tell you how the system actually breathes between processes.


The async surface is partitioned into six domains. Each has its own subject prefix, a dedicated JetStream stream, and a retention policy chosen for its workload.

```mermaid
flowchart TB
    subgraph TenantAccount["Tenant Account — per approved tenant"]
        direction TB
        runs["run.* — RUNS"]
        exec["execution.* — EXECUTION"]
        intr["interrupt.* — INTERRUPTS"]
        cmds["worker.> — WORKER_COMMANDS"]
    end

    subgraph SystemAccount["System Account — singleton"]
        direction TB
        users["user.* — PLATFORM_USERS"]
        tenants["tenant.* — PLATFORM_TENANTS"]
    end

    SystemAccount -. optional imports .-> TenantAccount
```

| Domain | Subjects | Stream | Retention | Reason |
|---|---|---|---|---|
| Run lifecycle | run.* | RUNS | 7d / 1 GB | Debuggable for a week; SSE late-joiners can replay |
| Node execution | execution.* | EXECUTION | 24h / 500 MB | Very high volume; live streaming + same-day forensics |
| Human-in-the-loop | interrupt.* | INTERRUPTS | 7d / 100 MB | Pending interrupts may wait days for a human |
| Worker commands | worker.> | WORKER_COMMANDS | WorkQueue / 1h | Each command consumed by exactly one worker |
| Platform users | user.* | PLATFORM_USERS | 30d / 256 MB | Sparse but compliance-critical |
| Platform tenants | tenant.* | PLATFORM_TENANTS | 30d / 256 MB | Provisioning workflow must survive restarts |
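
To make the partitioning concrete, here is a minimal sketch that routes a subject to the stream whose filter captures it. It reimplements the NATS wildcard rules (* matches exactly one token, > matches one or more trailing tokens) in plain Python; STREAM_SUBJECTS, subject_matches, and stream_for are hypothetical names for illustration, not DuraGraph code.

```python
from typing import Optional

# Stream subject filters, transcribed from the domain table above.
STREAM_SUBJECTS = {
    "RUNS": "run.*",
    "EXECUTION": "execution.*",
    "INTERRUPTS": "interrupt.*",
    "WORKER_COMMANDS": "worker.>",
    "PLATFORM_USERS": "user.*",
    "PLATFORM_TENANTS": "tenant.*",
}


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS wildcard rules: '*' matches exactly one token, '>' one or more trailing tokens."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and must match at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', the subject must have exactly as many tokens as the pattern.
    return len(p_tokens) == len(s_tokens)


def stream_for(subject: str) -> Optional[str]:
    """Return the stream whose filter captures `subject`, or None if none does."""
    for stream, pattern in STREAM_SUBJECTS.items():
        if subject_matches(pattern, subject):
            return stream
    return None
```

One consequence worth noticing: run.* would not capture a three-token subject such as run.created.v2 (a made-up name), while worker.> captures every multi-token worker command such as worker.graph.execute.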

Retention policies in one line:

  • limits — keep until age or size cap; fan-out to multiple consumers.
  • workqueue — delete on first ack; exactly one consumer.
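
The two policies can be compared with a toy in-memory simulation. This is not how JetStream stores messages: max_msgs stands in for the real age and size caps, and acknowledgement tracking is reduced to a set. LimitsStream and WorkQueueStream are hypothetical classes.

```python
class LimitsStream:
    """Toy 'limits' stream: messages are kept until a cap evicts them, and every
    consumer reads the full history independently (fan-out)."""

    def __init__(self, max_msgs):
        self.max_msgs = max_msgs  # stands in for the real max_age / max_bytes caps
        self.messages = []        # (sequence, message) pairs, oldest first
        self.next_seq = 1
        self.cursors = {}         # consumer name -> last sequence delivered

    def publish(self, msg):
        self.messages.append((self.next_seq, msg))
        self.next_seq += 1
        while len(self.messages) > self.max_msgs:
            self.messages.pop(0)  # evict the oldest message once over the cap

    def fetch(self, consumer):
        last = self.cursors.get(consumer, 0)
        new = [(seq, msg) for seq, msg in self.messages if seq > last]
        if new:
            self.cursors[consumer] = new[-1][0]
        return [msg for _, msg in new]


class WorkQueueStream:
    """Toy 'workqueue' stream: each message goes to one consumer and is deleted on ack."""

    def __init__(self):
        self.messages = {}    # sequence -> message, retained until acked
        self.next_seq = 1
        self.claimed = set()  # sequences handed out but not yet acked

    def publish(self, msg):
        self.messages[self.next_seq] = msg
        self.next_seq += 1

    def fetch(self):
        """Hand the oldest unclaimed message to exactly one caller, or return None."""
        for seq in sorted(self.messages):
            if seq not in self.claimed:
                self.claimed.add(seq)
                return seq, self.messages[seq]
        return None

    def ack(self, seq):
        self.messages.pop(seq, None)  # the first ack removes the message from the stream
        self.claimed.discard(seq)
```

With LimitsStream, an SSE bridge and an audit writer each read the same history independently; with WorkQueueStream, a second fetch() while the first message is in flight returns None, and the ack deletes the message for good.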

Tenant isolation through NATS Accounts is the single most important architectural decision in the spec.

The problem. In a SaaS deployment, tenants share a NATS cluster. If tenantA could subscribe to tenantB’s run.completed, that’s a data leak. Subject-prefix conventions like tenant_{id}.run.created are not a security boundary — anyone with access to the NATS connection can SUB >.

The solution. Use NATS Accounts in operator-JWT mode. Each approved tenant gets its own Account, minted dynamically at approval time (no NATS server restart). Within an Account, subjects stay short and unprefixed (run.created, execution.node_started). The Account boundary is the security boundary.
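
A toy model of why the Account boundary holds where prefixes do not: every subscription and publish is scoped to one account, so a catch-all > subscription only ever sees traffic from its own account. ToyOperator is hypothetical and ignores authentication, imports, and exports entirely; in a real deployment the enforcement happens inside nats-server, based on the signed user and Account JWTs.

```python
from collections import defaultdict


def subject_matches(pattern, subject):
    """NATS wildcards: '*' = exactly one token, '>' = one or more trailing tokens."""
    p, s = pattern.split("."), subject.split(".")
    for i, tok in enumerate(p):
        if tok == ">":
            return i == len(p) - 1 and len(s) > i
        if i >= len(s) or (tok != "*" and tok != s[i]):
            return False
    return len(p) == len(s)


class ToyOperator:
    """Toy broker in which every subscription and publish belongs to one Account."""

    def __init__(self):
        self.subs = defaultdict(list)  # account name -> [(pattern, inbox)]

    def subscribe(self, account, pattern):
        inbox = []
        self.subs[account].append((pattern, inbox))
        return inbox

    def publish(self, account, subject, payload):
        # Only subscriptions registered in the publisher's own Account are considered,
        # so subject names never need a tenant prefix.
        for pattern, inbox in self.subs[account]:
            if subject_matches(pattern, subject):
                inbox.append((subject, payload))
```

Publishing run.completed in acme-corp reaches an acme-corp run.* subscriber but not a > subscriber registered in globex-inc. If both tenants instead shared one account and relied on prefixed subjects such as tenant_acme.run.created, that same > subscriber would receive everything.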

In the example below, all three accounts (two tenants plus the System Account) live under one NATS Operator. Subjects are unprefixed inside each account; cross-account access requires explicit imports.

```mermaid
flowchart LR
    subgraph TA1["Account: acme-corp"]
        direction TB
        CP1["control-plane user"]
        WK1["worker user"]
        TA1S[("Streams: RUNS, EXECUTION, INTERRUPTS, WORKER_COMMANDS")]
        CP1 --- TA1S
        WK1 --- TA1S
    end

    subgraph TA2["Account: globex-inc"]
        direction TB
        CP2["control-plane user"]
        WK2["worker user"]
        TA2S[("Streams: RUNS, EXECUTION, INTERRUPTS, WORKER_COMMANDS")]
        CP2 --- TA2S
        WK2 --- TA2S
    end

    subgraph SA["System Account"]
        direction TB
        PRV["platform-provisioner"]
        AUD["audit-writer"]
        ADM["admin observability"]
        SAS[("Streams: PLATFORM_USERS, PLATFORM_TENANTS")]
        PRV --- SAS
        AUD --- SAS
        ADM --- SAS
    end

    SA -. optional imports .-> TA1S
    SA -. optional imports .-> TA2S
```

Operator-JWT mode is what lets Accounts be minted without restarting the server: provisioning a new tenant signs a new Account JWT and pushes it to NATS. Config-file Accounts, by contrast, would require a redeploy for every signup.

Every payload still carries tenant_id, even though the Account already scopes it. The reasons are defense in depth, plus practical needs:

  1. The System Account imports per-tenant streams for cross-tenant admin queries — it must attribute imported events.
  2. Postgres projections that span tenants need the foreign key.
  3. The audit log must attribute every event regardless of NATS context.

The Account boundary prevents leaks; tenant_id lets authorized observers correctly slice the data they’re allowed to see.
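
Once events from several tenant Accounts are imported into one place, the Account no longer tells you where an event came from; only the payload does. A minimal sketch of an event envelope and an authorized observer slicing by tenant. Event, slice_for, events_per_tenant, and the envelope layout beyond tenant_id and occurred_at are assumptions for illustration.

```python
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    subject: str
    tenant_id: str  # carried in the payload even though the Account already scopes it
    payload: dict = field(default_factory=dict)
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def slice_for(events, allowed_tenants):
    """Keep only the events an observer is authorized to see, using tenant_id alone."""
    return [e for e in events if e.tenant_id in allowed_tenants]


def events_per_tenant(events):
    """A cross-tenant admin rollup; impossible without tenant_id once streams are imported."""
    return Counter(e.tenant_id for e in events)
```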


Lifecycle events for the Run aggregate. Published by the control plane once the corresponding domain method has been called and the aggregate saved; events are written to the outbox and relayed to NATS from there.

| Channel | Emitted when | Required payload |
|---|---|---|
| run.created | New run accepted | tenant_id, run_id, thread_id, assistant_id, occurred_at |
| run.started | Run.Start() (queued → running) | tenant_id, run_id, occurred_at |
| run.completed | Run.Complete(output) | tenant_id, run_id, output, occurred_at |
| run.failed | Run.Fail(err) | tenant_id, run_id, error, occurred_at |
| run.cancelled | Run.Cancel(reason) | tenant_id, run_id, reason, occurred_at |
| run.requires_action | Run.RequiresAction(...) | tenant_id, run_id, interrupt_id, reason, tool_calls, occurred_at |
| run.resumed | Run.Resume(...) after human action | tenant_id, run_id, interrupt_id, tool_outputs, occurred_at |

reason on run.requires_action is one of: tool_call, approval_required, input_needed.

These map 1:1 to the Run aggregate’s domain events. The mapping is what the outbox relay implements.
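
The table is effectively a schema the outbox relay has to honor. A minimal sketch of checking a payload against it before publishing: RUN_EVENTS and REQUIRES_ACTION_REASONS are transcribed from the table and the reason list above, while validate_run_event is a hypothetical helper (the real payload types live in duragraph/internal/domain/run/events.go).

```python
# Required payload fields per channel, transcribed from the run.* table.
RUN_EVENTS = {
    "run.created": {"tenant_id", "run_id", "thread_id", "assistant_id", "occurred_at"},
    "run.started": {"tenant_id", "run_id", "occurred_at"},
    "run.completed": {"tenant_id", "run_id", "output", "occurred_at"},
    "run.failed": {"tenant_id", "run_id", "error", "occurred_at"},
    "run.cancelled": {"tenant_id", "run_id", "reason", "occurred_at"},
    "run.requires_action": {"tenant_id", "run_id", "interrupt_id", "reason",
                            "tool_calls", "occurred_at"},
    "run.resumed": {"tenant_id", "run_id", "interrupt_id", "tool_outputs", "occurred_at"},
}
REQUIRES_ACTION_REASONS = {"tool_call", "approval_required", "input_needed"}


def validate_run_event(subject, payload):
    """Return a list of problems; an empty list means the payload satisfies the table."""
    if subject not in RUN_EVENTS:
        return [f"unknown subject {subject!r}"]
    problems = [f"missing {name}" for name in sorted(RUN_EVENTS[subject] - set(payload))]
    if subject == "run.requires_action":
        reason = payload.get("reason")
        if reason not in REQUIRES_ACTION_REASONS:
            problems.append(f"invalid reason {reason!r}")
    return problems
```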


Per-node events. Emitted by whoever is executing the graph: the local engine if no worker is available, otherwise a worker process posting back via /workers/{id}/events, which the control plane re-emits onto the bus.

| Channel | Emitted when | Required payload |
|---|---|---|
| execution.node_started | Before each node executes | tenant_id, run_id, node_id, node_type, occurred_at |
| execution.node_completed | After each node returns | tenant_id, run_id, node_id, node_type, duration_ms, output, occurred_at |
| execution.node_failed | When a node throws | tenant_id, run_id, node_id, node_type, error, occurred_at |

node_type is one of: start, end, llm, tool, conditional.

The 24h retention reflects this stream’s volume — node events fire many times per run. They drive live SSE streams and same-day debugging, not long-term audit.
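
Whoever executes the graph, the per-node emission pattern is the same: a start event, then exactly one of completed or failed. A minimal sketch, assuming a generic emit(subject, payload) callback (hypothetical; it is not the API of the duragraph-python worker) and measuring duration_ms with a monotonic clock.

```python
import time
from datetime import datetime, timezone

NODE_TYPES = {"start", "end", "llm", "tool", "conditional"}


def run_node(emit, tenant_id, run_id, node_id, node_type, fn, state):
    """Run one node and wrap it with execution.* events via the hypothetical `emit`."""
    if node_type not in NODE_TYPES:
        raise ValueError(f"unknown node_type {node_type!r}")
    base = {"tenant_id": tenant_id, "run_id": run_id,
            "node_id": node_id, "node_type": node_type}

    def now():
        return datetime.now(timezone.utc).isoformat()

    emit("execution.node_started", {**base, "occurred_at": now()})
    t0 = time.monotonic()  # monotonic: unaffected by wall-clock adjustments
    try:
        output = fn(state)
    except Exception as exc:
        emit("execution.node_failed", {**base, "error": str(exc), "occurred_at": now()})
        raise  # the failure still propagates to whoever drives the graph
    duration_ms = int((time.monotonic() - t0) * 1000)
    emit("execution.node_completed",
         {**base, "duration_ms": duration_ms, "output": output, "occurred_at": now()})
    return output
```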


Human-in-the-loop events. Used by dashboards waiting on approvals and notification systems alerting humans.

| Channel | Emitted when | Required payload |
|---|---|---|
| interrupt.created | humanloop.NewInterrupt saved | tenant_id, interrupt_id, run_id, node_id, reason, state, tool_calls, occurred_at |
| interrupt.resolved | Interrupt.Resolve(toolOutputs) | tenant_id, interrupt_id, run_id, tool_outputs, occurred_at |
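
Because INTERRUPTS keeps seven days of history, a dashboard that starts late can rebuild the set of open approvals by replaying the stream from the beginning (for example through a consumer created with deliver_policy: all). A minimal sketch of that fold; pending_interrupts is a hypothetical helper and events are modeled as (subject, payload) pairs.

```python
def pending_interrupts(events):
    """Replay interrupt.* events in stream order; return interrupts still awaiting a human."""
    pending = {}
    for subject, payload in events:
        if subject == "interrupt.created":
            pending[payload["interrupt_id"]] = payload
        elif subject == "interrupt.resolved":
            # At-least-once delivery may repeat a resolve; pop() with a default tolerates it.
            pending.pop(payload["interrupt_id"], None)
    return pending
```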

The command-pattern half of the bus. Unlike the past-tense events above, these are imperative commands consumed from a WorkQueue stream, so exactly one worker handles each.

| Channel | Consumer | ack_wait | Purpose |
|---|---|---|---|
| worker.graph.execute | graph-executor | 5 min | Execute a whole graph end-to-end |
| worker.llm.invoke | llm-worker | 2 min | Single LLM call (fine-grained dispatch) |
| worker.tool.execute | tool-worker | 1 min | Single tool execution |

ack_wait is sized to the longest legitimate processing time. All three consumers use ack_policy: explicit and max_deliver: 3.
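
Redelivery under these settings can be reasoned about as a small state machine. This simulation is for intuition only: simulate_delivery is hypothetical, "nak" stands for an explicit negative acknowledgement, and the real timers and delivery counters live in the JetStream consumer.

```python
def simulate_delivery(outcomes, max_deliver=3):
    """Follow one message through a consumer with ack_policy: explicit.

    outcomes[i] is what the worker does on delivery attempt i + 1:
      "ack"     - processing succeeded; on a WorkQueue stream the message is removed
      "nak"     - explicit negative ack; the server redelivers the message
      "timeout" - no ack within ack_wait; the server redelivers the message
    Returns (number_of_deliveries, final_state).
    """
    attempts = 0
    for outcome in outcomes:
        attempts += 1
        if outcome == "ack":
            return attempts, "acked"
        if attempts >= max_deliver:
            return attempts, "max_deliver reached"  # no further redelivery
    return attempts, "awaiting redelivery"
```

The arithmetic follows directly: with graph-executor's 5 min ack_wait, a command whose worker keeps dying without acking is delivered three times over roughly 15 minutes before redelivery stops, well inside the 1h max age of WORKER_COMMANDS.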


The SaaS onboarding flow. Lives in the System Account, not in any tenant Account. Admin consumers and the audit-log writer subscribe here.

| Channel | Emitted when |
|---|---|
| user.signed_up | OAuth callback succeeds for the first time |
| user.promoted_to_admin | User’s role set to admin (bootstrap or manual) |
| user.approved | Admin approves a pending user |
| user.rejected | Admin rejects a pending user |
| user.suspended | Admin suspends a previously-approved user |

Schema quirk: user.promoted_to_admin.promoted_by_user_id is field-absent (not JSON null) in the bootstrap case — there is no human actor when the first user auto-elevates. Consumers must treat absence as the bootstrap signal.
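
In Python this distinction is easy to lose, because payload.get(...) returns None both when the key is missing and when it is JSON null. A minimal sketch that keeps the two apart with a sentinel. promoter_of and the user_id field are hypothetical, and treating an explicit null as malformed is this sketch's own choice, not something the spec mandates.

```python
import json

_MISSING = object()  # sentinel distinct from None (which is what JSON null decodes to)


def promoter_of(raw):
    """Classify a user.promoted_to_admin payload given as a JSON string.

    Field absent -> "bootstrap"; field present -> the promoting user's id.
    An explicit null is rejected rather than guessed at.
    """
    payload = json.loads(raw)
    value = payload.get("promoted_by_user_id", _MISSING)
    if value is _MISSING:
        return "bootstrap"
    if value is None:
        raise ValueError("promoted_by_user_id is null; the schema expects it to be absent")
    return value
```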

| Channel | Emitted when |
|---|---|
| tenant.pending | Tenant row created at signup time, awaiting approval |
| tenant.provisioning | Admin clicks Approve; kicks off the async workflow |
| tenant.approved | Provisioning completed successfully (terminal success) |
| tenant.provisioning_failed | Any provisioning step failed; admin can retry |
| tenant.suspended | Admin suspends a tenant; API access gated off |

```mermaid
sequenceDiagram
    participant U as User
    participant Auth as Auth Handler
    participant Admin as Admin UI
    participant Cmd as ApproveTenant<br/>Command
    participant Prov as platform-<br/>provisioner
    participant DB as Postgres
    participant NATS as NATS

    U->>Auth: OAuth callback
    Auth->>DB: INSERT user (pending)
    Auth->>DB: INSERT tenant (pending)
    Auth->>NATS: user.signed_up
    Auth->>NATS: tenant.pending
    Note over Admin: Admin reviews
    Admin->>Cmd: Approve
    Cmd->>DB: UPDATE tenant SET status='provisioning'
    Cmd->>NATS: tenant.provisioning
    NATS->>Prov: deliver (filter: tenant.provisioning)
    Prov->>DB: CREATE DATABASE tenant_<32hex>
    Prov->>DB: golang-migrate Up
    Prov->>NATS: mint Account JWT
    Prov->>DB: UPDATE tenant SET status='approved'
    Prov->>NATS: tenant.approved
    Note over Prov: on failure: tenant.provisioning_failed<br/>(admin can retry)
```

Subtle but important: tenant.approved is published by the provisioner consumer, not by the ApproveTenant command handler. The command handler only emits tenant.provisioning; the success event is the terminal event of the async workflow. This separation makes retries safe — the admin re-publishing tenant.provisioning will not double-create resources because only the consumer ever emits tenant.approved.
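
The retry-safety argument relies on each provisioning step being safe to skip or re-run. A minimal sketch of a provisioner handler built that way: it records completed steps, skips them on redelivery or admin retry, and emits tenant.approved only after the last step succeeds. The step names, the done set (which a real system would persist, e.g. in Postgres), the failure payload fields, and deriving the tenant_<32hex> database name from the tenant UUID are all hypothetical.

```python
import uuid


def tenant_db_name(tenant_id):
    # Hypothetical: assumes the 32 hex characters are the tenant UUID without dashes.
    return "tenant_" + uuid.UUID(tenant_id).hex


def handle_provisioning(tenant_id, done, steps, emit):
    """Handle one tenant.provisioning delivery (hypothetical handler).

    done:  names of steps already completed; a real system would persist this.
    steps: ordered mapping of step name -> callable(tenant_id).
    emit:  callable(subject, payload) that publishes an event.
    """
    for name, step in steps.items():
        if name in done:
            continue  # redelivery or admin retry: skip work that already succeeded
        try:
            step(tenant_id)
        except Exception as exc:
            emit("tenant.provisioning_failed",
                 {"tenant_id": tenant_id, "step": name, "error": str(exc)})
            return False
        done.add(name)
    # Terminal success event: emitted here and nowhere else.
    emit("tenant.approved", {"tenant_id": tenant_id})
    return True
```

A redelivery after full success would emit tenant.approved again; under at-least-once delivery that is tolerable, because downstream consumers already have to be idempotent.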


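JetStream streams:

| Stream | Subjects | Retention | Limits (max age / max bytes) | Storage |
|---|---|---|---|---|
| RUNS | run.* | limits | 7d / 1 GB | file |
| EXECUTION | execution.* | limits | 24h / 500 MB | file |
| INTERRUPTS | interrupt.* | limits | 7d / 100 MB | file |
| WORKER_COMMANDS | worker.> | workqueue | 1h | file |
| PLATFORM_USERS | user.* | limits | 30d / 256 MB | file |
| PLATFORM_TENANTS | tenant.* | limits | 30d / 256 MB | file |

Consumers:

| Consumer | Stream | Filter | ack_wait | max_deliver |
|---|---|---|---|---|
| run-processor | RUNS | | 30 s | 3 |
| execution-tracker | EXECUTION | | | |
| graph-executor | WORKER_COMMANDS | worker.graph.execute | 5 min | 3 |
| llm-worker | WORKER_COMMANDS | worker.llm.invoke | 2 min | 3 |
| tool-worker | WORKER_COMMANDS | worker.tool.execute | 1 min | 3 |
| platform-audit-users | PLATFORM_USERS | | 30 s | 3 |
| platform-audit-tenants | PLATFORM_TENANTS | | 30 s | 3 |
| platform-provisioner | PLATFORM_TENANTS | tenant.provisioning | 5 min | 3 |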

All consumers use ack_policy: explicit and deliver_policy: all.
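
The two tables translate directly into JetStream configuration objects. A sketch under stated assumptions: the field names (name, subjects, retention, max_age, max_bytes, storage, durable_name, filter_subject, ack_policy, deliver_policy, ack_wait, max_deliver) follow the JetStream JSON API, where durations are nanoseconds and -1 means no limit; reading "GB" and "MB" as binary units, and the helper names stream and consumer, are this sketch's assumptions.

```python
NS_PER_SEC = 1_000_000_000
H = 3600 * NS_PER_SEC   # one hour in nanoseconds
GIB = 1024 ** 3         # assumption: "1 GB" in the table means 1 GiB
MIB = 1024 ** 2


def stream(name, subject, retention, max_age_ns, max_bytes=-1):
    """Stream config dict; -1 for max_bytes means no size limit."""
    return {"name": name, "subjects": [subject], "retention": retention,
            "max_age": max_age_ns, "max_bytes": max_bytes, "storage": "file"}


def consumer(durable, ack_wait_s=None, max_deliver=None, filter_subject=None):
    """Durable consumer config; fields the tables leave blank are omitted."""
    cfg = {"durable_name": durable, "ack_policy": "explicit", "deliver_policy": "all"}
    if ack_wait_s is not None:
        cfg["ack_wait"] = ack_wait_s * NS_PER_SEC
    if max_deliver is not None:
        cfg["max_deliver"] = max_deliver
    if filter_subject is not None:
        cfg["filter_subject"] = filter_subject
    return cfg


STREAMS = [
    stream("RUNS", "run.*", "limits", 7 * 24 * H, 1 * GIB),
    stream("EXECUTION", "execution.*", "limits", 24 * H, 500 * MIB),
    stream("INTERRUPTS", "interrupt.*", "limits", 7 * 24 * H, 100 * MIB),
    stream("WORKER_COMMANDS", "worker.>", "workqueue", 1 * H),
    stream("PLATFORM_USERS", "user.*", "limits", 30 * 24 * H, 256 * MIB),
    stream("PLATFORM_TENANTS", "tenant.*", "limits", 30 * 24 * H, 256 * MIB),
]

CONSUMERS = {
    "RUNS": [consumer("run-processor", 30, 3)],
    # No ack_wait/max_deliver is given for this consumer, so server defaults apply.
    "EXECUTION": [consumer("execution-tracker")],
    "WORKER_COMMANDS": [
        consumer("graph-executor", 300, 3, "worker.graph.execute"),
        consumer("llm-worker", 120, 3, "worker.llm.invoke"),
        consumer("tool-worker", 60, 3, "worker.tool.execute"),
    ],
    "PLATFORM_USERS": [consumer("platform-audit-users", 30, 3)],
    "PLATFORM_TENANTS": [
        consumer("platform-audit-tenants", 30, 3),
        consumer("platform-provisioner", 300, 3, "tenant.provisioning"),
    ],
}
```

Each dict serializes as-is with json.dumps. Note that the three WORKER_COMMANDS filters do not overlap, which JetStream requires of consumers on a workqueue stream.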


  • At-least-once delivery on every stream. Consumers must be idempotent.
  • Per-subject ordering is preserved. Cross-subject ordering is not guaranteed.
  • Explicit ack — messages stay in the stream until ACKed; redelivery after ack_wait.
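  • max_deliver: 3. After three failed deliveries the consumer stops redelivering the message, which stays parked in the stream. JetStream has no built-in dead-letter queue; in production, use the max-deliveries advisory that JetStream publishes to copy the message into a dedicated DLQ stream.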

For worker.graph.execute specifically, idempotency is enforced at the database layer: the claim on task_assignments.claimed_by is taken with FOR UPDATE SKIP LOCKED, so a redelivered task whose row is already claimed becomes a no-op claim.


AsyncAPI 3.0 separates channels (named conduits) from operations (who does what to them). The spec defines explicit operations for every publish and subscribe:

| Operation | Action | Channel | Actor |
|---|---|---|---|
| publishRunCreated | send | run.created | API server |
| publishNodeCompleted | send | execution.node_completed | Worker |
| executeGraph | send | worker.graph.execute | Control plane |
| publishUserSignedUp | send | user.signed_up | Auth handler |
| publishTenantProvisioning | send | tenant.provisioning | ApproveTenant command |
| publishTenantApproved | send | tenant.approved | Provisioner (terminal) |
| subscribePlatformAuditUsers | receive | user.signed_up | Audit writer |
| subscribePlatformAuditTenants | receive | tenant.approved | Audit writer |
| subscribePlatformProvisioner | receive | tenant.provisioning | Provisioner |

The contract is in the spec; the wiring is in the Go and Python implementations.

| Spec concept | Code location |
|---|---|
| run.* event payloads | duragraph/internal/domain/run/events.go |
| Outbox → NATS bridge | duragraph/internal/infrastructure/messaging/outbox_relay.go |
| Worker command publishing | duragraph/internal/infrastructure/messaging/nats/task_queue.go |
| execution.* emission (worker) | duragraph-python/src/duragraph/worker/worker.py |
| Stream-to-SSE bridge | duragraph/internal/infrastructure/streaming/bridge.go |