Component System

Components are the building blocks of a pipeline. Each component has typed inputs and outputs, runs in its own thread, and communicates through channels.

Hierarchy

```
Component[I, O]                      Abstract base
├── PrimitiveComponent[I, O]         Single node (not a subgraph)
│   └── ThreadedComponent[I, O]      Has a daemon thread with run()
└── CompositeComponent               Wraps a subgraph
```

EmitOnStart[E] is a mixin that any component can implement to send values before threads start.

Lifecycle

STARTUP → SETUP → RUNNING → STOPPED
  • STARTUP — Constructed, not yet started
  • SETUP — Thread started, setup() running (heavy init like model loading)
  • RUNNING — run() executing, processing frames
  • STOPPED — Thread exited or stop() called

ThreadedComponent

The main building block. Define inputs/outputs as NamedTuples of Receivers/Senders, implement run():

```python
from typing import NamedTuple

# Receiver, Sender, ThreadedComponent, Tag, AudioFrame, TextFrame, MyConfig
# and process() are assumed to come from the framework or your own code.


class MyInputs(NamedTuple):
    audio: Receiver[AudioFrame]
    text: Receiver[TextFrame] | None = None  # optional input


class MyOutputs(NamedTuple):
    result: Sender[TextFrame]


class MyComponent(ThreadedComponent[MyInputs, MyOutputs]):
    description = "Does something useful"
    tags = Tag(io={"conduit"}, functionality={"audio"})

    def __init__(self, config: MyConfig) -> None:
        super().__init__()
        self.config = config

    def run(self, inputs: MyInputs, outputs: MyOutputs) -> None:
        for frame in inputs.audio:
            if frame is None:  # None means the component is shutting down
                break
            result = process(frame)
            outputs.result.send(TextFrame.new(text=result))
```

Key points:

  • inputs.audio is a Receiver — iterate it directly with for frame in receiver
  • None from the iterator means stop (component shutting down)
  • outputs.result.send(...) pushes to all connected downstream channels
  • Optional inputs default to None when no edge is connected
  • self.stop_event is a threading.Event — check it for graceful shutdown
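
The shutdown rules above can be illustrated without the framework. The sketch below is not the library's Receiver; it uses a plain `queue.Queue` as a stand-in channel and a hypothetical `run_loop` function to show the two exit paths: a `None` sentinel from upstream, and a `threading.Event` that is rechecked while the loop is idle.

```python
import queue
import threading


def run_loop(channel: queue.Queue, stop_event: threading.Event, out: list) -> None:
    """Consume items until a None sentinel arrives or stop_event is set."""
    while not stop_event.is_set():
        try:
            # Short timeout so stop_event is rechecked even when no data arrives.
            frame = channel.get(timeout=0.05)
        except queue.Empty:
            continue
        if frame is None:  # sentinel: upstream is shutting down
            break
        out.append(frame.upper())


channel: queue.Queue = queue.Queue()
stop_event = threading.Event()
results: list = []
worker = threading.Thread(
    target=run_loop, args=(channel, stop_event, results), daemon=True
)
worker.start()

channel.put("hello")
channel.put("world")
channel.put(None)  # ask the loop to stop
worker.join(timeout=2)
```

Polling with a timeout is one design choice among several; it trades a little latency for a loop that cannot block forever on an empty channel.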

EmitOnStart

For components that need to send values synchronously before any threads start:

```python
class MyToolOutputs(NamedTuple):
    tool_def: Sender[ToolDef]
    tool_result: Sender[ToolResult]


class MyTool(ThreadedComponent[MyInputs, MyToolOutputs], EmitOnStart[MyToolOutputs]):
    def emit(self, outputs: MyToolOutputs) -> None:
        # Runs synchronously in GraphManager.run(), before threads start
        outputs.tool_def.send(ToolDef.new(name="my_tool", ...))

    def run(self, inputs: MyInputs, outputs: MyToolOutputs) -> None:
        # Thread starts after all emits complete
        for call in inputs.tool_call:
            ...
```

GraphManager calls emit() on all EmitOnStart components after wiring receivers but before starting threads. This guarantees downstream components see the emitted values.
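
The ordering guarantee can be shown with a small two-phase start routine. This is a sketch under stated assumptions, not GraphManager's code: `Tool`, `start_graph`, and the shared `queue.Queue` are hypothetical stand-ins. Every `emit()` is called on the caller's thread first, and worker threads are started only afterwards, so emitted values are always queued ahead of anything produced by `run()`.

```python
import queue
import threading


class Tool:
    """Hypothetical component: emits a definition up front, then runs in a thread."""

    def emit(self, out: queue.Queue) -> None:
        out.put("tool_def")

    def run(self, out: queue.Queue) -> None:
        out.put("tool_result")


def start_graph(components: list, out: queue.Queue) -> list:
    # Phase 1: call every emit() synchronously, on the caller's thread.
    for component in components:
        if hasattr(component, "emit"):
            component.emit(out)
    # Phase 2: only now start the worker threads.
    threads = [
        threading.Thread(target=c.run, args=(out,), daemon=True) for c in components
    ]
    for thread in threads:
        thread.start()
    return threads


out: queue.Queue = queue.Queue()
threads = start_graph([Tool(), Tool()], out)
for thread in threads:
    thread.join(timeout=2)
received = [out.get_nowait() for _ in range(4)]
```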

CompositeComponent

Groups multiple nodes into a reusable subgraph. Created via the frontend’s “Group” action. Boundary ports (unconnected inputs/outputs of inner nodes) become the composite’s external interface.

Not directly instantiated — the _registerable = False flag excludes it from the component registry.

Tags

Components declare tags for categorization:

```python
tags = Tag(io={"source"}, functionality={"audio"})
```

  • io: "source" (produces data), "conduit" (transforms), "sink" (consumes)
  • functionality: "audio", "video", "llm", "movement", "image", "misc"

The frontend uses tags for sidebar grouping and badge colors.

Registration

All concrete subclasses of PrimitiveComponent are automatically discovered via PrimitiveComponent.registered_subclasses(). This walks the class hierarchy, skipping abstract classes and those with _registerable = False.
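
A hierarchy walk of this kind can be sketched with `__subclasses__()` and `inspect.isabstract`. The classes below are simplified stand-ins that reuse the names from this page; the real implementation may differ. One assumption is called out in the code: the `_registerable` flag is read from each class's own `__dict__`, so a class that opts out does not hide its subclasses.

```python
import inspect
from abc import ABC, abstractmethod


class PrimitiveComponent(ABC):
    _registerable = True

    @abstractmethod
    def run(self) -> None: ...

    @classmethod
    def registered_subclasses(cls) -> list[type]:
        """Collect concrete, registerable subclasses, depth-first."""
        found: list[type] = []
        for sub in cls.__subclasses__():
            # Assumption: the flag is per-class, not inherited.
            registerable = sub.__dict__.get("_registerable", True)
            if registerable and not inspect.isabstract(sub):
                found.append(sub)
            found.extend(sub.registered_subclasses())
        return found


class ThreadedComponent(PrimitiveComponent):  # still abstract: run() is missing
    pass


class Mic(ThreadedComponent):
    def run(self) -> None: ...


class Hidden(ThreadedComponent):
    _registerable = False  # explicitly opted out

    def run(self) -> None: ...


names = [c.__name__ for c in PrimitiveComponent.registered_subclasses()]
```

Here `ThreadedComponent` is skipped because it is abstract and `Hidden` because of its flag, which leaves only `Mic` in `names`.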

The component registry is exposed to the frontend via GET /component, which returns each component’s name, description, tags, and JSON schemas for init parameters and I/O types.

Reflection

Components provide type information at runtime:

  • get_init_types() — Constructor parameters (for config forms)
  • get_input_types() / get_output_types() — I/O slot names and types
  • get_ui_input_types() / get_ui_output_types() — UI channel slots
  • get_options(values) — Dynamic dropdown options for config fields

These are used by the frontend to render config forms, validate edges, and display type annotations.

setup() vs run()

ThreadedComponent has two phases in its daemon thread:

  • setup() — Optional. Runs before run() during Status.SETUP. For heavy init: loading models, allocating GPU memory. Default is no-op.
  • run(inputs, outputs) — Abstract. The main processing loop during Status.RUNNING. Iterate receivers, send results.

Splitting the thread body into two phases separates one-time initialization from per-frame processing. If setup() raises, run() never starts.
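
The two phases can be pictured as a thread body that calls `setup()` and then `run()`, updating the status as it goes. The `Worker` class below is a hypothetical stand-in, not ThreadedComponent itself, and the way it records errors is an assumption made for the example.

```python
import threading
from enum import Enum
from typing import Optional


class Status(Enum):
    STARTUP = "startup"
    SETUP = "setup"
    RUNNING = "running"
    STOPPED = "stopped"


class Worker:
    """Hypothetical stand-in for a threaded component's thread body."""

    def __init__(self) -> None:
        self.history = [Status.STARTUP]
        self.error: Optional[Exception] = None

    def setup(self) -> None:  # default: no-op
        pass

    def run(self) -> None:
        pass

    def _thread_main(self) -> None:
        try:
            self.history.append(Status.SETUP)
            self.setup()  # if this raises, run() is skipped
            self.history.append(Status.RUNNING)
            self.run()
        except Exception as exc:  # assumption: errors are recorded, not re-raised
            self.error = exc
        finally:
            self.history.append(Status.STOPPED)

    def start(self) -> threading.Thread:
        thread = threading.Thread(target=self._thread_main, daemon=True)
        thread.start()
        return thread


class BrokenSetup(Worker):
    def setup(self) -> None:
        raise RuntimeError("model file missing")


ok, bad = Worker(), BrokenSetup()
for worker in (ok, bad):
    worker.start().join(timeout=2)
```

After both threads finish, `ok.history` passes through all four states, while `bad.history` goes from SETUP straight to STOPPED.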

from_args()

The from_args(cls, init_args) class method deserializes config from a dict:

  • BaseModel params: dict → BaseModel(**raw) or model_validate()
  • Path params: string → Path(raw) (auto-detected from type hints, including unions like Path | None)
  • Missing required fields: raises ValueError listing what’s missing
  • Extra keys: silently ignored

get_options()

Components can override get_options(values) for dynamic dropdowns:

```python
@classmethod
def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
    return {
        "config": {
            "voice_id": [
                {"value": "Ashley", "label": "Ashley"},
                {"value": "David", "label": "David"},
            ]
        }
    }
```

The values dict contains current form selections, enabling dependent dropdowns (field B’s options depend on field A’s choice). The frontend calls POST /component/{name}/options after each form change.
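
A dependent dropdown might look like the sketch below, where the choices for `voice_id` depend on a `provider` field. The `Speaker` class, the `provider` field, the `VOICES` catalogue, and the assumption that `values` is nested the same way as the returned dict are all hypothetical; check the actual payload your frontend sends.

```python
from typing import Any

# Hypothetical catalogue: which voices each provider offers.
VOICES = {
    "acme": ["Ashley", "David"],
    "globex": ["Mira"],
}


class Speaker:
    @classmethod
    def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Offer voice_id choices that depend on the selected provider."""
        # Assumption: values mirrors the nesting of the returned options.
        provider = values.get("config", {}).get("provider")
        voices = VOICES.get(provider, [])
        return {
            "config": {
                "provider": [{"value": p, "label": p.title()} for p in VOICES],
                "voice_id": [{"value": v, "label": v} for v in voices],
            }
        }


options = Speaker.get_options({"config": {"provider": "globex"}})
empty = Speaker.get_options({})
```

With no provider selected, the sketch returns an empty `voice_id` list, which lets the form show the second dropdown as empty until the first one is set.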