
Python SDK Architecture

The Python SDK (duragraph-python) is one package that plays three different roles depending on how you import it. This page explains the internal architecture — what each module is for, how the worker connects to the control plane, and what guarantees hold across crashes and network hiccups.

For installation and the public API surface, see the Python SDK reference. This page is about how it works internally.


flowchart LR
    A["Role 1 — REST client (DuraGraphClient)"]
    B["Role 2 — Graph DSL (@Graph decorator)"]
    C["Role 3 — Worker daemon (Worker / .serve())"]
| Role | When you use it | What it talks to |
|---|---|---|
| Client | Backend code or notebook creating assistants/threads/runs | Control plane REST API |
| Graph DSL | Author defines a workflow in Python with @Graph and @node | Compiled to JSON IR for the control plane |
| Worker | Long-running Python process that executes dispatched runs | Control plane HTTP /workers/* endpoints + optional NATS JetStream |

A typical user installs duragraph once and uses all three: the @Graph decorator to define a workflow, Worker.serve() to host it, and DuraGraphClient to trigger runs.


src/duragraph/
├── __init__.py → Re-exports public API
├── client.py → DuraGraphClient + AsyncDuraGraphClient (httpx)
├── graph.py → @Graph decorator, GraphDefinition, GraphInstance
├── nodes.py → @entrypoint, @node, @llm_node, @tool_node, @router_node, @human_node
├── edges.py → @edge decorator
├── executor.py → execute_node() — calls user methods with state
├── tools.py → Tool registry (mirrors Go-side built-ins)
├── types.py → State, Message, StreamMode, RunResult, Event TypedDicts
├── worker/
│ └── worker.py → Worker class — register/poll/heartbeat/drain
├── llm/ → Optional: OpenAI, Anthropic clients
├── vectorstores/ → Optional: Qdrant, pgvector, Weaviate, Chroma, Pinecone
├── embeddings/ → Optional: OpenAI, Cohere, Anthropic, Ollama
└── document_loaders/ → Optional: text splitters, file/web loaders

The optional packages are imported lazily — installing duragraph without extras gives you the client + graph DSL + worker only. The optional dependencies are pulled in by extras (pip install duragraph[llm,vectorstores]).
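A common way to get this lazy behavior is to import an optional dependency only at the point of first use and, if it is missing, raise an error that names the extra to install. The sketch below shows that pattern; require_extra is a hypothetical helper written for this page, not part of duragraph's public API, and the message format is an assumption.

```python
import importlib
from types import ModuleType


def require_extra(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency on first use.

    Raises an ImportError that names the pip extra to install when the
    dependency is missing. Hypothetical helper, not part of duragraph's API.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
        raise ImportError(
            f"{module_name!r} is not installed; "
            f"install it with: pip install duragraph[{extra}]"
        ) from exc
```

An optional integration would call require_extra("openai", "llm") inside its constructor rather than at module top level, so a plain import duragraph never touches the dependency. Packages often pair this with a module-level __getattr__ (PEP 562) so that even the submodule import is deferred.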


client.py is a thin httpx-based wrapper over the control plane’s REST API. Both sync (DuraGraphClient) and async (AsyncDuraGraphClient) variants exist; both implement context-manager protocols.

flowchart LR
    User["Application code"]
    Client["DuraGraphClient (httpx)"]
    CP["Control plane REST API"]
    User --> Client
    Client -->|"POST /api/v1/threads/.../runs"| CP
    Client -->|"GET /api/v1/runs/{id}"| CP
    Client -->|"POST /api/v1/runs/wait"| CP

Method-to-endpoint mapping (selected):

| SDK method | Control plane endpoint |
|---|---|
| create_assistant / get / list / update / delete | POST/GET/PATCH/DELETE /api/v1/assistants[...] |
| create_thread / get_thread_state / update_thread_state | /api/v1/threads/... |
| create_run | POST /api/v1/threads/:tid/runs (returns 201 immediately) |
| wait_for_run | POST /api/v1/runs/wait (blocks until terminal) |
| cancel_run | POST /api/v1/threads/:tid/runs/:rid/cancel |
| put_store_item / get_store_item / search_store | /api/v1/store/* (LangGraph long-term memory) |
| create_cron / delete_cron / search_crons | /api/v1/runs/crons[...] |

When an API key is configured, the client sends it in the X-Api-Key header; the control plane's optional auth middleware accepts either that header or a JWT.

Errors are raised as httpx.HTTPStatusError. The control plane sends a structured {error, message, code} body which the application can parse.
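Turning that body into a typed value keeps error handling in application code readable. The sketch below is one way to do it; ControlPlaneError and parse_control_plane_error are hypothetical names, and the fallback for non-JSON bodies (for example, an HTML page from a proxy) is an assumption about what can sit in front of the control plane.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass
class ControlPlaneError:
    status_code: int
    error: str
    message: str
    code: str | None


def parse_control_plane_error(status_code: int, body: str) -> ControlPlaneError:
    """Parse a structured {error, message, code} error body.

    Falls back to the raw text when the body is not a JSON object.
    """
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        return ControlPlaneError(status_code, "unknown", body, None)
    code = data.get("code")
    return ControlPlaneError(
        status_code=status_code,
        error=str(data.get("error", "unknown")),
        message=str(data.get("message", "")),
        code=None if code is None else str(code),
    )


# Typical use (httpx.HTTPStatusError exposes the failed response as .response):
#
#   try:
#       ...  # any DuraGraphClient call
#   except httpx.HTTPStatusError as exc:
#       err = parse_control_plane_error(exc.response.status_code, exc.response.text)
```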


graph.py, nodes.py, and edges.py together form a small DSL. The author writes a normal Python class with decorated methods; the decorators collect metadata into a GraphDefinition object that knows how to serialize itself to the JSON IR the control plane stores.

```python
from duragraph import Graph, entrypoint, llm_node, tool_node, edge


@Graph(id="chatbot", version="1.0.0")
class Chatbot:
    @entrypoint
    @llm_node(model="claude-sonnet-4-6")
    def think(self, state): ...

    @tool_node(tools=["http_get"])
    def use_tool(self, state): ...

    # Conditional edges: go to the tool when the model asked for one, otherwise end.
    @edge("think", "use_tool", when=lambda s: s.get("tool_call"))
    @edge("think", "__end__", when=lambda s: not s.get("tool_call"))
    def routes(self): ...
```
flowchart LR
    Class["@Graph class"]
    Def["GraphDefinition"]
    IR["JSON IR (graph_id, nodes, edges, entrypoint)"]
    CP["Control plane"]
    Class -->|"decorators collect"| Def
    Def -->|"to_ir()"| IR
    IR -->|"sent at worker register"| CP
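```python
from __future__ import annotations  # NodeMetadata and Edge are SDK types not shown in this excerpt


class GraphDefinition:
    graph_id: str
    nodes: dict[str, NodeMetadata]  # name → {type, config}
    edges: list[Edge]  # source, target (str, or dict for conditional edges)
    entrypoint: str | None

    def to_ir(self) -> dict:
        # Serialize to the JSON IR the control plane stores.
        return {
            "version": "1.0",
            "graph": {
                "id": self.graph_id,
                "entrypoint": self.entrypoint,
                "nodes": [...],  # elided in this excerpt
                "edges": [...],  # elided in this excerpt
            },
        }
```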
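To make "the decorators collect metadata" concrete, here is a toy version of the same idea: a method decorator tags functions with node metadata, and a class decorator scans the class body for tagged methods and attaches a to_ir(). Everything here (mini_node, mini_graph, the per-node IR shape, and passing edges to the class decorator instead of using @edge methods) is a simplified, hypothetical illustration, not duragraph's implementation.

```python
from __future__ import annotations

from typing import Any, Callable


def mini_node(node_type: str, **config: Any) -> Callable:
    """Method decorator: record node metadata on the function object itself."""
    def mark(fn: Callable) -> Callable:
        fn._node_meta = {"type": node_type, "config": config}
        return fn
    return mark


def mini_graph(graph_id: str, entrypoint: str, edges: list[tuple[str, str]]) -> Callable:
    """Class decorator: collect tagged methods and attach a to_ir() method."""
    def collect(cls: type) -> type:
        nodes = {
            name: attr._node_meta
            for name, attr in vars(cls).items()
            if hasattr(attr, "_node_meta")
        }

        def to_ir(self) -> dict:
            return {
                "version": "1.0",
                "graph": {
                    "id": graph_id,
                    "entrypoint": entrypoint,
                    "nodes": [{"id": n, **meta} for n, meta in nodes.items()],
                    "edges": [{"source": s, "target": t} for s, t in edges],
                },
            }

        cls.to_ir = to_ir
        return cls
    return collect


@mini_graph("echo", entrypoint="think", edges=[("think", "__end__")])
class Echo:
    @mini_node("llm", model="some-model")
    def think(self, state: dict) -> dict:
        return {"reply": state.get("input", "")}
```

The key design point is that decoration runs at class-definition time, so the full graph shape is known before any run starts and can be shipped to the control plane at registration.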

The IR is uploaded once during worker registration, in the graph_definitions field of the registration payload. Once stored, the control plane uses it to route dispatch — WorkerService.FindForGraph(graphID) only routes a run to a worker that previously declared support for that graph.
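The routing rule can be pictured as a lookup from graph id to the workers that declared it. FindForGraph lives in the Go control plane; the Python class below is only an in-memory analogue for illustration, and its random choice among candidates is a placeholder assumption (a real dispatcher would also weigh worker status and load).

```python
from __future__ import annotations

import random


class WorkerRegistry:
    """Toy analogue of 'route a run only to workers that declared its graph'."""

    def __init__(self) -> None:
        self._graphs_by_worker: dict[str, set[str]] = {}

    def register(self, worker_id: str, graph_ids: list[str]) -> None:
        # Registration is where a worker declares which graphs it can execute.
        self._graphs_by_worker[worker_id] = set(graph_ids)

    def deregister(self, worker_id: str) -> None:
        self._graphs_by_worker.pop(worker_id, None)

    def find_for_graph(self, graph_id: str) -> str | None:
        candidates = [w for w, graphs in self._graphs_by_worker.items() if graph_id in graphs]
        # Placeholder selection policy; None means no worker can take this run.
        return random.choice(candidates) if candidates else None
```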

GraphInstance.run() also supports local in-process execution, with no control plane involved. This is useful for unit testing node logic. The same execution path (executor.execute_node) drives both local and worker-served runs, so behavior is identical.


worker/worker.py is a 600+ line asyncio daemon. The .serve() method that @Graph adds to your class wraps it for one-line use:

```python
chatbot = Chatbot()
chatbot.serve(
    control_plane_url="http://localhost:8081",
    worker_name="chatbot-worker-1",
    nats_url="nats://localhost:4222",  # optional; instant task delivery
)
```

The worker is a finite state machine. State transitions are driven by external events (control plane responses, signals) and internal conditions (active run count).

flowchart LR
    S["STARTING"]
    R["READY"]
    B["BUSY"]
    D["DRAINING"]
    X["STOPPED"]

    S -->|"register succeeded"| R
    R -->|"task claimed"| B
    B -->|"all runs finished"| R
    R -->|"SIGTERM / SIGINT"| D
    B -->|"SIGTERM / SIGINT"| D
    D -->|"drained or timeout"| X
| State | Behavior | Heartbeat sends |
|---|---|---|
| STARTING | Connecting to control plane and NATS | |
| READY | No active runs; accepting new tasks | status: ready |
| BUSY | At least one run in flight | status: busy |
| DRAINING | Refusing new tasks; finishing what's running | status: draining |
| STOPPED | Final; worker exits | |
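The diagram and table above translate naturally into a transition table plus a guard. This is a minimal sketch of that state machine, not the code in worker/worker.py; the method names are hypothetical, and it reads the empty heartbeat cells for STARTING and STOPPED as "no heartbeat is sent".

```python
from __future__ import annotations

from enum import Enum


class WorkerState(Enum):
    STARTING = "starting"
    READY = "ready"
    BUSY = "busy"
    DRAINING = "draining"
    STOPPED = "stopped"


# Allowed transitions, taken from the state diagram above.
TRANSITIONS = {
    WorkerState.STARTING: {WorkerState.READY},
    WorkerState.READY: {WorkerState.BUSY, WorkerState.DRAINING},
    WorkerState.BUSY: {WorkerState.READY, WorkerState.DRAINING},
    WorkerState.DRAINING: {WorkerState.STOPPED},
    WorkerState.STOPPED: set(),
}


class WorkerFSM:
    def __init__(self) -> None:
        self.state = WorkerState.STARTING
        self.active_runs = 0

    def _move(self, target: WorkerState) -> None:
        if target not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state.name} -> {target.name}")
        self.state = target

    def registered(self) -> None:
        self._move(WorkerState.READY)

    def run_claimed(self) -> None:
        # DRAINING (or any other state) refuses new work.
        if self.state not in (WorkerState.READY, WorkerState.BUSY):
            raise ValueError(f"cannot claim work while {self.state.name}")
        self.active_runs += 1
        if self.state is WorkerState.READY:
            self._move(WorkerState.BUSY)

    def run_finished(self) -> None:
        self.active_runs -= 1
        if self.active_runs == 0 and self.state is WorkerState.BUSY:
            self._move(WorkerState.READY)

    def signal_received(self) -> None:  # SIGTERM / SIGINT
        self._move(WorkerState.DRAINING)

    def drained(self) -> None:
        self._move(WorkerState.STOPPED)

    def heartbeat_status(self) -> str | None:
        if self.state in (WorkerState.READY, WorkerState.BUSY, WorkerState.DRAINING):
            return self.state.value
        return None  # STARTING and STOPPED send no heartbeat
```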
sequenceDiagram
    participant W as Worker
    participant CP as Control plane
    participant NATS

    W->>CP: POST /api/v1/workers/register<br/>{capabilities, graph_definitions}
    Note over W,CP: 5 retries with exponential backoff
    CP-->>W: 200 {worker_id}
    opt nats_url provided
        W->>NATS: connect + JetStream
        loop for each graph_id
            W->>NATS: subscribe duragraph.tasks.assign.{graph_id}<br/>durable=worker-{name}-{graph_id}
        end
    end
    par
        W->>W: heartbeat loop (every 30s)
    and
        W->>W: poll loop (every 1s, or 30s if NATS active)
    end
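The "5 retries with exponential backoff" step is a standard pattern. The sketch below assumes "5 retries" means five attempts after the first one, and the base delay, cap, and full jitter are assumptions chosen for illustration rather than the SDK's actual values. The sleep function is injectable so the behavior can be tested without waiting.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    attempt: Callable[[], T],
    retries: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call attempt(); on a retryable failure, retry up to `retries` more times.

    The delay before retry n is drawn uniformly from
    [0, min(max_delay, base_delay * 2**n)] ("full jitter"), so many workers
    restarting together do not hit the control plane in lockstep.
    """
    for n in range(retries + 1):
        try:
            return attempt()
        except retry_on:
            if n == retries:
                raise  # out of retries: surface the last error
            sleep(random.uniform(0, min(max_delay, base_delay * 2**n)))
    raise ValueError("retries must be >= 0")
```

With an httpx-based registration call, retry_on would include httpx.ConnectError rather than the built-in ConnectionError.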

The worker supports NATS JetStream subscription (instant) and HTTP polling (always available), and uses both at once when NATS is configured.

flowchart LR
    CP["Control plane DispatchRun"]
    PG[("task_assignments<br/>(source of truth)")]
    JS["NATS JetStream<br/>(notification only)"]
    W["Worker"]
    Poll["POST /workers/{id}/poll"]

    CP --> PG
    CP --> JS
    JS -->|"NATS msg"| W
    W -->|"claim via HTTP"| Poll
    Poll --> PG
    PG -->|"FOR UPDATE SKIP LOCKED"| Poll

Critical detail: even when a NATS notification arrives, the worker still calls /poll to claim the row in Postgres. Postgres is the source of truth; NATS is just a wakeup. This means duplicate NATS deliveries are harmless — the second /poll finds nothing and returns. We get NATS’s millisecond latency without sacrificing the correctness guarantees of a database transaction.
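The sketch below models why duplicate deliveries are harmless: the NATS handler never trusts the message as a claim and always goes through poll(), which hands each row to exactly one caller. The in-memory TaskTable is a hypothetical stand-in for task_assignments; its single lock approximates the row locking of FOR UPDATE SKIP LOCKED but, unlike SKIP LOCKED, serializes all callers.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass, field


@dataclass
class TaskTable:
    """In-memory stand-in for task_assignments, the source of truth."""

    pending: list[str] = field(default_factory=list)
    claimed: dict[str, str] = field(default_factory=dict)  # run_id -> worker_id
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def poll(self, worker_id: str) -> str | None:
        # Plays the role of POST /workers/{id}/poll: each pending row is
        # handed to exactly one caller; an empty table returns None.
        with self._lock:
            if not self.pending:
                return None
            run_id = self.pending.pop(0)
            self.claimed[run_id] = worker_id
            return run_id


def on_nats_message(table: TaskTable, worker_id: str) -> str | None:
    # The NATS message is only a wakeup; the claim always goes through poll().
    return table.poll(worker_id)
```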

When nats_url is not provided, the worker polls /workers/{id}/poll every second by default. With NATS active, polling drops to every 30 seconds as a safety net.
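One way to combine the timer and the NATS wakeup in asyncio is to wait on an event with a timeout: a notification ends the wait early, and otherwise the timer fires and the worker polls anyway. This is a sketch under that assumption, not the SDK's loop; poll_once, wakeup, and stop are hypothetical names.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


async def poll_loop(
    poll_once: Callable[[], Awaitable[None]],
    wakeup: asyncio.Event,
    stop: asyncio.Event,
    nats_active: bool,
) -> None:
    """Poll on a timer, but poll immediately when a NATS message sets `wakeup`.

    A caller that sets `stop` should also set `wakeup` so the loop exits promptly.
    """
    interval = 30.0 if nats_active else 1.0  # the defaults described above
    while not stop.is_set():
        await poll_once()
        if stop.is_set():
            break
        try:
            # Sleep until the interval elapses or a notification arrives.
            await asyncio.wait_for(wakeup.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # timer fired: poll anyway as a safety net
        wakeup.clear()
```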

When a worker claims a task, it walks the graph using its registered instance methods:

sequenceDiagram
    participant W as Worker
    participant CP as Control plane
    participant U as User node method

    W->>CP: POST /workers/{id}/events<br/>{event_type: run_started}
    loop until next_node is None
        W->>CP: POST /workers/{id}/events<br/>{event_type: node_started}
        W->>U: execute_node(name, meta, method, state)
        U-->>W: result
        W->>W: state.update(result)
        W->>CP: POST /workers/{id}/events<br/>{event_type: node_completed}
        W->>W: resolve next_node from edges
    end
    W->>CP: POST /workers/{id}/events<br/>{event_type: run_completed, output: state}
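The loop in this diagram can be sketched as a plain function that executes a node, merges its result into state, and follows the first outgoing edge whose condition holds. This is an illustration, not executor.execute_node: the (source, target, condition) edge tuples, the emit callback, and ending the run when no edge matches are all assumptions made for the sketch.

```python
from __future__ import annotations

from typing import Any, Callable

State = dict[str, Any]
Edge = tuple[str, str, Callable[[State], bool]]  # (source, target, condition)


def run_graph(
    nodes: dict[str, Callable[[State], State | None]],
    edges: list[Edge],
    entrypoint: str,
    state: State,
    emit: Callable[[str, dict], None],
) -> State:
    """Walk the graph from `entrypoint`, reporting progress through `emit`."""
    emit("run_started", {})
    current: str | None = entrypoint
    while current is not None:
        emit("node_started", {"node": current})
        result = nodes[current](state)
        if result:
            state.update(result)  # merge the node's partial update into run state
        emit("node_completed", {"node": current})
        # First edge out of this node whose condition holds; "__end__" if none match.
        nxt = next((t for s, t, cond in edges if s == current and cond(state)), "__end__")
        current = None if nxt == "__end__" else nxt
    emit("run_completed", {"output": state})
    return state
```

In a worker, emit would POST each event to /workers/{id}/events; in a local unit test it can simply append to a list.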

The control plane translates each /events call into a Run aggregate transition:

| Event from worker | Aggregate method called |
|---|---|
| run_started | Run.Start() |
| node_started | (emits execution event; no aggregate state change) |
| node_completed | (emits execution event) |
| run_completed | Run.Complete(output) |
| run_failed | Run.Fail(error) |
| run_requires_action | Run.RequiresAction(...) (suspends; awaits human) |

The worker never writes to the database directly. All state changes go through the aggregate, which records domain events that the outbox ships to NATS.
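The translation table and the aggregate's guard can be sketched together. The real Run aggregate is Go code in the control plane; this Python version only illustrates the idea. The status names other than queued and requires_action, the domain event names other than run.completed, and the Conflict exception are assumptions.

```python
from __future__ import annotations

from typing import Any


class Conflict(Exception):
    """Illegal transition; the HTTP layer would map this to 409 Conflict."""


class Run:
    # Statuses each transition may start from (assumed status names).
    _ALLOWED = {
        "start": {"queued"},
        "complete": {"in_progress"},
        "fail": {"queued", "in_progress"},
        "requires_action": {"in_progress"},
    }

    def __init__(self) -> None:
        self.status = "queued"
        self.output: Any = None
        self.events: list[str] = []  # domain events the outbox would ship

    def _transition(self, action: str, new_status: str) -> None:
        if self.status not in self._ALLOWED[action]:
            raise Conflict(f"cannot {action} a run that is {self.status}")
        self.status = new_status
        self.events.append(f"run.{new_status}")

    def start(self) -> None:
        self._transition("start", "in_progress")

    def complete(self, output: Any) -> None:
        self._transition("complete", "completed")
        self.output = output

    def fail(self, error: str) -> None:
        self._transition("fail", "failed")
        self.output = {"error": error}

    def requires_action(self) -> None:
        self._transition("requires_action", "requires_action")


def apply_worker_event(run: Run, event_type: str, payload: dict) -> None:
    """Route one POST /workers/{id}/events call to the matching aggregate method."""
    if event_type == "run_started":
        run.start()
    elif event_type in ("node_started", "node_completed"):
        run.events.append(f"execution.{event_type}")  # recorded; no status change
    elif event_type == "run_completed":
        run.complete(payload.get("output"))
    elif event_type == "run_failed":
        run.fail(payload.get("error", "unknown"))
    elif event_type == "run_requires_action":
        run.requires_action()
    else:
        raise ValueError(f"unknown event_type {event_type!r}")
```

Because every worker call funnels through the guarded methods, an out-of-order or duplicate event is rejected instead of corrupting state.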


The worker process holds a 2-minute lease on each task it claims, and its heartbeats (every 30 seconds) extend that lease. If the worker crashes or is cut off by a network partition, the heartbeats stop, the lease expires, and the control plane reclaims the task.

sequenceDiagram
    participant W as Worker
    participant CP as Control plane
    participant LM as Lease monitor (control plane)

    Note over CP,LM: pg_try_advisory_lock(42)<br/>elects single leader
    W->>CP: POST /workers/{id}/heartbeat
    CP->>CP: extend lease_expires_at
    Note over W: ⚠️ worker crashes
    LM->>CP: SELECT runs WHERE lease_expires_at < NOW()
    LM->>CP: increment lease_epoch, clear worker_id
    Note over LM: run is re-dispatched
    Note over W: ⚠️ worker comes back, tries to POST run_completed
    W->>CP: POST events with stale lease_epoch
    CP-->>W: 409 Conflict (stale epoch)

lease_epoch is a fencing token. Every time a lease is reclaimed, the control plane increments it. A zombie worker that comes back online and tries to finalize a run with the old epoch is rejected — the row’s actual epoch has advanced.
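The whole lease lifecycle (claim, heartbeat extension, reclaim with an epoch bump, and a fenced final write) fits in a small in-memory model. This is a Python sketch of the mechanism, not the control plane's Go or SQL code; LeaseTable, StaleEpoch, and the injectable clock are hypothetical, and the leader election via pg_try_advisory_lock is left out.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

LEASE_SECONDS = 120.0  # the 2-minute lease described above


class StaleEpoch(Exception):
    """Stands in for the 409 Conflict a zombie worker receives."""


@dataclass
class Lease:
    worker_id: str | None = None
    epoch: int = 0
    expires_at: float = 0.0


class LeaseTable:
    def __init__(self, clock: Callable[[], float]) -> None:
        self.clock = clock  # injectable so tests can move time forward
        self.leases: dict[str, Lease] = {}

    def claim(self, run_id: str, worker_id: str) -> int:
        lease = self.leases.setdefault(run_id, Lease())
        if lease.worker_id is not None:
            raise ValueError(f"run {run_id} is already leased")
        lease.worker_id = worker_id
        lease.expires_at = self.clock() + LEASE_SECONDS
        return lease.epoch  # the worker sends this back with every write

    def heartbeat(self, worker_id: str) -> None:
        # One worker heartbeat extends every lease that worker holds.
        for lease in self.leases.values():
            if lease.worker_id == worker_id:
                lease.expires_at = self.clock() + LEASE_SECONDS

    def reclaim_expired(self) -> list[str]:
        # One sweep of the single, elected lease monitor.
        now, reclaimed = self.clock(), []
        for run_id, lease in self.leases.items():
            if lease.worker_id is not None and lease.expires_at < now:
                lease.epoch += 1  # fencing: the old holder's token is now stale
                lease.worker_id = None  # the run can be dispatched again
                reclaimed.append(run_id)
        return reclaimed

    def finalize(self, run_id: str, epoch: int) -> None:
        lease = self.leases.get(run_id)
        if lease is None or lease.epoch != epoch:
            raise StaleEpoch(f"run {run_id}: epoch {epoch} is no longer current")
        del self.leases[run_id]
```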

This is the fencing-token pattern Martin Kleppmann describes in "How to do distributed locking", with Postgres acting as the lock authority.

For the control-plane side, see Architecture Overview → Lease Epoch Fencing.


On SIGTERM or SIGINT, the worker enters DRAINING:

flowchart LR
    Signal["SIGTERM / SIGINT"]
    Drain["Set status = DRAINING"]
    HB["Send draining heartbeat"]
    Nack["NACK new NATS messages"]
    Wait["Wait shutdown_timeout (60s)<br/>for active runs to finish"]
    Cancel["Cancel remaining tasks"]
    Stop["Set status = STOPPED, exit"]

    Signal --> Drain --> HB --> Nack --> Wait
    Wait -->|"all done"| Stop
    Wait -->|"timeout"| Cancel --> Stop

The control plane sees the draining heartbeat and stops dispatching new tasks to this worker — but it does not reclaim runs already in flight. Existing runs continue to completion as long as they finish within the shutdown window.
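The wait-then-cancel step maps directly onto asyncio.wait with a timeout. The sketch below assumes in-flight runs are tracked as a set of asyncio tasks and that the caller has already switched to DRAINING; drain is a hypothetical name, and the 60-second default mirrors the shutdown_timeout above.

```python
from __future__ import annotations

import asyncio


async def drain(active: set[asyncio.Task], shutdown_timeout: float = 60.0) -> tuple[int, int]:
    """Give in-flight runs `shutdown_timeout` seconds to finish, then cancel the rest.

    Returns (finished, cancelled).
    """
    if not active:
        return 0, 0
    done, pending = await asyncio.wait(active, timeout=shutdown_timeout)
    for task in pending:
        task.cancel()
    # Let cancelled tasks handle CancelledError (cleanup, final events) before exit.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)
```

In a daemon, the trigger would typically be a callback registered with loop.add_signal_handler(signal.SIGTERM, ...), which is available on Unix event loops only.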

Hard kills (SIGKILL or a process crash) skip the graceful path. The lease monitor takes over instead, reclaiming the worker's runs once their leases expire.


End-to-end: from client.create_run to a Python worker executing it

sequenceDiagram
    participant App as Application code
    participant Client as DuraGraphClient
    participant CP as Control plane
    participant NATS
    participant Worker as Python worker

    App->>Client: client.create_run(thread_id, assistant_id, input)
    Client->>CP: POST /api/v1/threads/.../runs
    CP->>CP: Run.NewRun() → events + outbox
    CP->>NATS: duragraph.tasks.assign.{graph_id}
    CP-->>Client: 201 {run_id, status: queued}
    Client-->>App: returns run_id

    NATS-->>Worker: task notification
    Worker->>CP: POST /workers/{id}/poll
    CP-->>Worker: claimed task
    Worker->>CP: POST /workers/{id}/events run_started
    CP->>CP: Run.Start() → events + outbox
    loop graph nodes
        Worker->>CP: node_started
        Worker->>Worker: execute @node method
        Worker->>CP: node_completed
    end
    Worker->>CP: POST /workers/{id}/events run_completed
    CP->>CP: Run.Complete(output) → events + outbox

    Note over CP: Outbox relay publishes to NATS
    Note over CP: SSE handlers stream to subscribed clients

If the application also called client.wait_for_run(...), that call is blocked on POST /api/v1/runs/wait, which the control plane resolves through a NATS subscription to the run's events. When run.completed arrives, the call returns the final run, including its output.
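The server side of a blocking wait can be pictured as a map from run id to pending futures that a subscription callback resolves on a terminal event. The control plane implements this in Go; the Python sketch below only shows the shape of the idea, and the terminal event names other than run.completed are assumptions.

```python
from __future__ import annotations

import asyncio
from typing import Any

TERMINAL = {"run.completed", "run.failed", "run.cancelled"}  # assumed event names


class RunWaiters:
    """Resolve blocking 'wait for run' calls from an event stream."""

    def __init__(self) -> None:
        self._futures: dict[str, list[asyncio.Future]] = {}

    async def wait(self, run_id: str, timeout: float) -> dict[str, Any]:
        fut = asyncio.get_running_loop().create_future()
        self._futures.setdefault(run_id, []).append(fut)
        try:
            return await asyncio.wait_for(fut, timeout)
        finally:
            # Clean up on success, timeout, or cancellation.
            waiters = self._futures.get(run_id)
            if waiters and fut in waiters:
                waiters.remove(fut)
                if not waiters:
                    del self._futures[run_id]

    def on_event(self, event_type: str, run_id: str, payload: dict[str, Any]) -> None:
        # Called for every event from the subscription; only terminal ones wake waiters.
        if event_type not in TERMINAL:
            return
        for fut in self._futures.pop(run_id, []):
            if not fut.done():
                fut.set_result({"run_id": run_id, "status": event_type, **payload})
```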


A common point of confusion: when you use a worker, the LLM calls, tool execution, and your @node methods all run in the Python process — not on the control plane. The Go control plane never imports openai or anthropic for worker-served runs. The LLM clients in the Go codebase exist only for the local fallback engine, used when no worker is available.

This is a deliberate split:

| Concern | Runs in |
|---|---|
| Run aggregate state, transitions, persistence | Go control plane |
| Lease management, dispatch, fencing | Go control plane |
| Outbox relay, NATS publishing, SSE fan-out | Go control plane |
| @node method bodies | Python worker |
| LLM API calls | Python worker (via llm/ modules) |
| Tool calls | Python worker (via tools.py and user-registered tools) |
| Vector store queries | Python worker (via vectorstores/) |

The control plane owns durability; the worker owns user-defined behavior. The boundary is the /workers/{id}/events endpoint.


The Python SDK does not implement DDD itself — it is a client and a worker for the control plane that does. But it interacts with DDD-shaped boundaries:

  • The worker’s POST /events calls translate one-to-one to Run aggregate methods. The control plane validates state transitions before persisting.
  • The IR uploaded at registration becomes the Workflow aggregate’s graph data.
  • The worker cannot bypass invariants: if it tries to send run_completed for a run in the requires_action state, the aggregate's state machine rejects the transition and the call returns 409.

That is the value of strict aggregate boundaries: they hold even when the actor is in a different language, on a different host.