Graph Engine
The graph engine manages the pipeline topology, wires channels between components, and orchestrates execution.
Data Model
class Node(BaseModel):
id_: str # UUID
type: str # component class name (e.g. "LLM", "VAD")
init_args: dict # constructor parameters
x: float, y: float # canvas position
sub_graph: Graph | None # nested graph for composites
class Edge(BaseModel):
source_node: str
source_slot: str
target_node: str
target_slot: str
class Graph(BaseModel):
nodes: dict[str, Node]
edges: list[Edge]GraphManager
Owns the graph, components, and channel wiring.
CRUD
add_node(type, init_args)— Instantiate component, add to graphdelete_node(id)— Stop component, remove connected edges, reconcileadd_edge(edge)/delete_edge(edge)— Connect/disconnect, reconcileupdate_node_init_args(id, args)— Re-instantiate component, restart if running
Channel Reconciliation (_reconcile())
When edges change, the manager recomputes the optimal channel layout:
- Group receivers by their sender sets (edges with the same source share a channel)
- Reuse existing channels where the sender set hasn’t changed
- Create new channels for new sender sets
- Build Sender handles (one per output slot, broadcasting to all connected channels)
- Build Receiver handles (one per input slot)
This minimizes channel count — if two inputs read from the same output, they share one channel with two cursors (instead of two separate channels). The _group() method implements this by building a frozenset[SenderKey] → list[ReceiverKey] mapping.
Execution (run())
1. Stop all running components
2. Clear UI channels
3. For each node:
a. Build input handles (Receiver for each connected input, None for optional unconnected)
b. Build output handles (Sender for each output, no-op Sender if unconnected)
c. Wire UI channels (create Channel + Sender/Receiver pairs)
d. Wire all Receivers (register cursors)
4. Call emit() on all EmitOnStart components
5. Start all component threads
6. Notify frontend (bump _ui_version, fire _ui_changed event)The ordering matters:
- Step 3d: Receivers register cursors before any data is sent
- Step 4: EmitOnStart sends after cursors exist, so data is captured
- Step 5: Threads start after emits, so they see initial values
UI Channel Wiring
For each UIReceiver[T] input slot:
- Creates a
Channel[T] - Component gets a
UIReceiver(channel)for reading - Server stores a
Sender(channel)in_ui_senders[(node_id, slot)]for the WebSocket bridge to push into
For each UISender[T] output slot:
- Creates a
Channel[T] - Component gets a
UISender(channel)for writing - Server stores a
Receiver(channel)in_ui_receivers[(node_id, slot)]for the WebSocket bridge to read from
After all components start, _ui_version increments and _ui_changed fires, waking the WebSocket watcher to spawn async readers for the new channels.
Stop (stop())
Sets stop_event on all components. Threads exit when they see the flag or when iteration returns None. Idempotent — second call is a no-op.
Composite Components
When the user groups nodes into a subgraph:
- Selected nodes + internal edges are extracted into a sub-
Graph - A
CompositeComponentwraps it - Boundary ports (unconnected slots of inner nodes) become the composite’s external interface
- On
start(), the composite creates its own innerGraphManagerand runs it
Ungrouping reverses the process — inner nodes are re-exposed and the composite is deleted.