Component System
Components are the building blocks of a pipeline. Each component has typed inputs and outputs, runs in its own thread, and communicates through channels.
Hierarchy
    Component[I, O]                      Abstract base
    ├── PrimitiveComponent[I, O]         Single node (not a subgraph)
    │   └── ThreadedComponent[I, O]      Has a daemon thread with run()
    └── CompositeComponent               Wraps a subgraph

EmitOnStart[E] is a mixin that any component can implement to send values before threads start.
Lifecycle
    STARTUP → SETUP → RUNNING → STOPPED

- STARTUP: Constructed, not yet started
- SETUP: Thread started, setup() running (heavy init like model loading)
- RUNNING: run() executing, processing frames
- STOPPED: Thread exited or stop() called
ThreadedComponent
The main building block. Define inputs and outputs as NamedTuples of Receivers and Senders, then implement run():

    from typing import NamedTuple

    # Receiver, Sender, Tag, ThreadedComponent, and the frame types are provided
    # by the framework; MyConfig and process() stand for your own code.

    class MyInputs(NamedTuple):
        audio: Receiver[AudioFrame]
        text: Receiver[TextFrame] | None = None  # optional input

    class MyOutputs(NamedTuple):
        result: Sender[TextFrame]

    class MyComponent(ThreadedComponent[MyInputs, MyOutputs]):
        description = "Does something useful"
        tags = Tag(io={"conduit"}, functionality={"audio"})

        def __init__(self, config: MyConfig) -> None:
            super().__init__()
            self.config = config

        def run(self, inputs: MyInputs, outputs: MyOutputs) -> None:
            for frame in inputs.audio:
                if frame is None:  # the receiver signals shutdown with None
                    break
                result = process(frame)
                outputs.result.send(TextFrame.new(text=result))

Key points:
- inputs.audio is a Receiver; iterate it directly with for frame in receiver
- None from the iterator means stop (component shutting down)
- outputs.result.send(...) pushes to all connected downstream channels
- Optional inputs default to None when no edge is connected
- self.stop_event is a threading.Event; check it for graceful shutdown
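
The iteration contract in the key points can be tried out with a toy stand-in. The sketch below is not the framework's Receiver: ToyReceiver and run_loop are hypothetical names, built on queue.Queue, and they only illustrate how a loop might combine the None-means-stop convention with a stop_event check.

```python
import queue
import threading


class ToyReceiver:
    """Hypothetical stand-in for Receiver: iterating yields queued items."""

    def __init__(self) -> None:
        self._q: queue.Queue = queue.Queue()

    def push(self, item) -> None:
        self._q.put(item)

    def __iter__(self):
        while True:
            item = self._q.get()
            yield item  # None is yielded too, so the consumer can react to it
            if item is None:
                return


def run_loop(receiver: ToyReceiver, stop_event: threading.Event) -> list[str]:
    """Process frames until None arrives or stop_event is set."""
    seen = []
    for frame in receiver:
        if frame is None or stop_event.is_set():
            break
        seen.append(frame.upper())
    return seen


rx = ToyReceiver()
for item in ("a", "b", None, "never processed"):
    rx.push(item)
processed = run_loop(rx, threading.Event())
```

Checking stop_event inside the loop lets the component exit even if the upstream side never sends None.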
EmitOnStart
For components that need to send values synchronously before any threads start:

    class MyToolOutputs(NamedTuple):
        tool_def: Sender[ToolDef]
        tool_result: Sender[ToolResult]

    class MyTool(ThreadedComponent[MyInputs, MyToolOutputs], EmitOnStart[MyToolOutputs]):
        def emit(self, outputs: MyToolOutputs) -> None:
            # Runs synchronously in GraphManager.run(), before threads start
            outputs.tool_def.send(ToolDef.new(name="my_tool", ...))

        def run(self, inputs: MyInputs, outputs: MyToolOutputs) -> None:
            # Thread starts after all emits complete.
            # MyInputs is assumed here to declare a tool_call receiver.
            for call in inputs.tool_call:
                ...

GraphManager calls emit() on all EmitOnStart components after wiring receivers but before starting threads. This guarantees downstream components see the emitted values.
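
The ordering described above (wire, emit, then start threads) can be sketched with a toy manager. Everything here is hypothetical: ToyComponent, ToyEmitter, and toy_graph_run are illustration names, not the GraphManager API, and the wiring step is only marked by a comment.

```python
import threading

events: list[str] = []  # records the order in which things happen
events_lock = threading.Lock()


def log(msg: str) -> None:
    with events_lock:
        events.append(msg)


class ToyComponent:
    def __init__(self, name: str) -> None:
        self.name = name

    def run(self) -> None:
        log(f"run:{self.name}")


class ToyEmitter(ToyComponent):
    def emit(self) -> None:
        log(f"emit:{self.name}")


def toy_graph_run(components: list[ToyComponent]) -> None:
    # 1. Wiring of receivers would happen here.
    # 2. Call emit() synchronously on every component that provides it.
    for c in components:
        if isinstance(c, ToyEmitter):
            c.emit()
    # 3. Only now start the threads.
    threads = [threading.Thread(target=c.run, daemon=True) for c in components]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


toy_graph_run([ToyComponent("consumer"), ToyEmitter("tool")])
```

Because step 2 finishes before any thread exists, the emit entry is always first in events, regardless of how the threads are scheduled afterwards.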
CompositeComponent
Groups multiple nodes into a reusable subgraph. Created via the frontend’s “Group” action. Boundary ports (unconnected inputs/outputs of inner nodes) become the composite’s external interface.
CompositeComponent is not instantiated directly; the _registerable = False flag excludes it from the component registry.
Tags
Components declare tags for categorization:

    tags = Tag(io={"source"}, functionality={"audio"})

- io: "source" (produces data), "conduit" (transforms), "sink" (consumes)
- functionality: "audio", "video", "llm", "movement", "image", "misc"

The frontend uses tags for sidebar grouping and badge colors.
Registration
All concrete subclasses of PrimitiveComponent are automatically discovered via PrimitiveComponent.registered_subclasses(). This walks the class hierarchy, skipping abstract classes and those with _registerable = False.
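
A discovery walk of this kind might look like the sketch below. It is an assumption about the general technique, not the project's code: Base, Threaded, Mic, and Hidden are hypothetical classes, and the choice to honor _registerable only when it is set directly on a class is specific to this example.

```python
import inspect
from abc import ABC, abstractmethod


class Base(ABC):
    _registerable = True

    @abstractmethod
    def run(self) -> None: ...

    @classmethod
    def registered_subclasses(cls) -> list[type]:
        found = []
        for sub in cls.__subclasses__():
            # Only a flag set directly on the class counts, so children of an
            # excluded class stay eligible (an assumption of this sketch).
            opted_out = sub.__dict__.get("_registerable", True) is False
            if not inspect.isabstract(sub) and not opted_out:
                found.append(sub)
            found.extend(sub.registered_subclasses())  # recurse into children
        return found


class Threaded(Base):  # still abstract: run() is not implemented
    pass


class Mic(Threaded):
    def run(self) -> None: ...


class Hidden(Threaded):
    _registerable = False

    def run(self) -> None: ...


names = [c.__name__ for c in Base.registered_subclasses()]
```

Threaded is skipped because it is abstract and Hidden because of its flag, leaving only Mic in names.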
The component registry is exposed to the frontend via GET /component, which returns each component’s name, description, tags, and JSON schemas for init parameters and I/O types.
Reflection
Components provide type information at runtime:
- get_init_types(): Constructor parameters (for config forms)
- get_input_types() / get_output_types(): I/O slot names and types
- get_ui_input_types() / get_ui_output_types(): UI channel slots
- get_options(values): Dynamic dropdown options for config fields
These are used by the frontend to render config forms, validate edges, and display type annotations.
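
As a rough illustration of how slot names and types can be recovered from a NamedTuple at runtime, the sketch below uses typing.get_type_hints. The Receiver, AudioFrame, and TextFrame classes are empty placeholders, and slot_types is a hypothetical helper; the real get_input_types() may work differently and return a different shape.

```python
from typing import Generic, NamedTuple, TypeVar, get_args, get_type_hints

T = TypeVar("T")


class Receiver(Generic[T]):  # placeholder, not the framework class
    pass


class AudioFrame: ...


class TextFrame: ...


class MyInputs(NamedTuple):
    audio: Receiver[AudioFrame]
    text: Receiver[TextFrame]


def slot_types(io_tuple: type) -> dict[str, str]:
    """Map each slot name to the name of the frame type it carries."""
    result = {}
    for name, hint in get_type_hints(io_tuple).items():
        (frame_type,) = get_args(hint)  # Receiver[X] -> (X,)
        result[name] = frame_type.__name__
    return result


slots = slot_types(MyInputs)
```

A frontend could compare such a mapping for a sender and a receiver to decide whether an edge between them is type-compatible.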
setup() vs run()
ThreadedComponent has two phases in its daemon thread:
- setup(): Optional. Runs before run() during Status.SETUP. For heavy init: loading models, allocating GPU memory. Default is no-op.
- run(inputs, outputs): Abstract. The main processing loop during Status.RUNNING. Iterate receivers, send results.
If setup() raises, run() never starts. This separates one-time initialization from per-frame processing.
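
The two phases and the lifecycle states can be sketched together as follows. This is a minimal model under stated assumptions: ToyThreaded is a hypothetical class, the enum values are invented, and swallowing the exception is a simplification (a real implementation would presumably log or report it).

```python
import enum
import threading


class Status(enum.Enum):
    STARTUP = "startup"  # string values are assumptions of this sketch
    SETUP = "setup"
    RUNNING = "running"
    STOPPED = "stopped"


class ToyThreaded:
    def __init__(self) -> None:
        self.status = Status.STARTUP
        self.history = [self.status]
        self._thread = threading.Thread(target=self._main, daemon=True)

    def setup(self) -> None:  # default is a no-op
        pass

    def run(self) -> None:
        pass

    def _set(self, status: Status) -> None:
        self.status = status
        self.history.append(status)

    def _main(self) -> None:
        try:
            self._set(Status.SETUP)
            self.setup()  # if this raises, run() is skipped
            self._set(Status.RUNNING)
            self.run()
        except Exception:
            pass  # simplified: errors are dropped in this sketch
        finally:
            self._set(Status.STOPPED)

    def start_and_wait(self) -> None:
        self._thread.start()
        self._thread.join()


class BrokenSetup(ToyThreaded):
    def setup(self) -> None:
        raise RuntimeError("model failed to load")


ok, broken = ToyThreaded(), BrokenSetup()
ok.start_and_wait()
broken.start_and_wait()
```

Here ok.history passes through all four states, while broken.history goes from SETUP straight to STOPPED without ever reaching RUNNING.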
from_args()
The from_args(cls, init_args) class method deserializes config from a dict:
- BaseModel params: dict → BaseModel(**raw) or model_validate()
- Path params: string → Path(raw) (auto-detected from type hints, including unions like Path | None)
- Missing required fields: raises ValueError listing what’s missing
- Extra keys: silently ignored
get_options()
Components can override get_options(values) for dynamic dropdowns:

    @classmethod
    def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
        return {
            "config": {
                "voice_id": [
                    {"value": "Ashley", "label": "Ashley"},
                    {"value": "David", "label": "David"},
                ]
            }
        }

The values dict contains current form selections, enabling dependent dropdowns (field B’s options depend on field A’s choice). The frontend calls POST /component/{name}/options after each form change.
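
A dependent dropdown could then be sketched like this. The example assumes that values mirrors the shape of the return value (a "config" dict keyed by field name), which the text above does not confirm; ToySpeaker, the language field, and the voice table are all hypothetical.

```python
from typing import Any

VOICES_BY_LANGUAGE = {  # hypothetical data for the example
    "en": ["Ashley", "David"],
    "fr": ["Camille"],
}


class ToySpeaker:
    @classmethod
    def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
        # Assumed input shape: {"config": {"language": "..."}}
        language = values.get("config", {}).get("language", "en")
        voices = VOICES_BY_LANGUAGE.get(language, [])
        return {
            "config": {
                "language": [{"value": k, "label": k} for k in VOICES_BY_LANGUAGE],
                # Field B (voice_id) depends on field A (language).
                "voice_id": [{"value": v, "label": v} for v in voices],
            }
        }


french = ToySpeaker.get_options({"config": {"language": "fr"}})
```

Each call recomputes the voice list from the current language, so re-requesting options after a form change keeps the two fields consistent.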