Skip to Content
Developer GuideChannel System

Channel System

Channels are the transport layer between components. They’re multi-producer, multi-consumer queues with cursor-based replay and garbage collection.

Core Types

Channel[T]

The shared buffer. Internally:

  • _items: list[T] — frame buffer
  • _offset: int — logical index of first item (advances as GC drops old items)
  • _cursors: dict[sub_id, position] — per-subscriber read position (logical index)
  • _condition: threading.Condition — coordinates blocking reads

Key methods:

MethodBehavior
_send(item)Appends to buffer, notifies all waiting subscribers
_register(sub_id, latest)Creates cursor. latest=True: at head (new items only). latest=False: at tail (replay from oldest)
_wait_and_get(sub_id, stop_event)Blocks until item available (0.1s poll interval). Returns None on stop
_try_get(sub_id)Non-blocking. Returns item or None
_fast_forward(sub_id)Skips cursor to latest item (for newest=True mode)
_unregister(sub_id)Removes cursor. Idempotent
_gc()Drops items all subscribers have passed. Empty if no cursors

Sender[T]

Write handle that broadcasts to one or more channels:

sender = Sender(channel_a, channel_b) sender.send(frame) # delivered to both sender.connect(channel_c) # dynamic fan-out

Tracks metrics: _msg_count, _byte_count (via sys.getsizeof), _last_send_time, buffer_depth.

Receiver[T]

Read handle. Call it to get an iterator:

# Blocking, sequential (default) for frame in inputs.audio(self): if frame is None: break # Non-blocking, latest only for frame in inputs.video(self, newest=True, no_block=True): if frame is None: break

__call__ signature:

def __call__( self, subscriber: ThreadedComponent, newest: bool = False, # skip to latest, drop intermediate no_block: bool = False, # return None instead of blocking latest: bool = True, # start at head (True) or tail (False) ) -> Iterator[T | None]

Returns a ReceiverIterator. The subscriber provides the stop_event and sub_id (via id(subscriber)).

lag property — number of unread items buffered for this receiver.

ReceiverIterator

Created by Receiver.__call__(). Handles cursor lifecycle.

Eager registration: The cursor registers in __init__, not on first __next__. This means multiple iterators created in sequence all start at the same channel position — no off-by-one lag.

__next__ sequence:

  1. If done, raise StopIteration
  2. If stop_event set, cleanup and return None
  3. If newest, call _fast_forward() (skip to latest)
  4. If no_block, use _try_get() — else _wait_and_get()
  5. Update receiver metrics
  6. Return item or None

Cleanup: _finish() unregisters the cursor. Called by close(), __del__, or when stop is detected. Idempotent via _done flag.

Garbage Collection

GC runs after every cursor operation. It computes the minimum cursor position across all subscribers and drops items behind it:

drop = min(cursors.values()) - offset if drop > 0: del items[:drop] offset += drop

If there are no cursors, items are preserved so EmitOnStart values aren’t lost before threads register.

The latest Parameter

Controls where a new cursor starts:

  • latest=True (default) — Cursor at head. Only sees items sent after registration. For real-time streaming where old data is stale.
  • latest=False — Cursor at tail. Reads from the oldest buffered item. For EmitOnStart values sent before threads started.

Read Modes

Modenewestno_blockBehavior
Sequential blockingFalseFalseEvery frame in order, blocks when empty
Sequential pollingFalseTrueEvery frame in order, None when empty
Latest blockingTrueFalseSkips to newest, blocks when empty
Latest pollingTrueTrueSkips to newest, None when empty

UI Channels

Bidirectional communication between components and the frontend, bridged via WebSocket (/ui/ws).

Component sends to frontend:

class MyOutputs(NamedTuple): display: UITextSender # text overlay on node video: UIVideoSender # JPEG video stream on node

Component receives from frontend:

class MyInputs(NamedTuple): user_text: UITextReceiver # text input box keystrokes: UIKeystrokeReceiver # individual key events

Inheritance:

Sender[T] └── UISender[T] (component → frontend) ├── UITextSender UISender[TextFrame] └── UIVideoSender UISender[bytes] Receiver[T] └── UIReceiver[T] (frontend → component) ├── UITextReceiver UIReceiver[TextFrame] └── UIKeystrokeReceiver UIReceiver[TextFrame]

UI channels are created fresh on each GraphManager.run(). The WebSocket watcher monitors _ui_version and spawns async reader tasks for new channels.

drain() Utility

Multiplexes multiple non-blocking inputs, yielding frames in timestamp order:

from src.core.utils import drain for speech, feedback, vision in drain(speech_it, feedback_it, vision_it): if speech is not None: handle_speech(speech) if feedback is not None: handle_feedback(feedback) if vision is not None: handle_vision(vision)

How it works:

  1. Collects all available frames from each iterator (stops at first None per iterator)
  2. Sorts collected frames by pts (nanosecond timestamp)
  3. Yields one tuple per frame — exactly one non-None value at the source iterator’s index

Accepts None iterators (unconnected inputs) — skips them.

Typed overloads for 1–6 iterators preserve per-position types.

Important: Iterators must be created upfront (before the loop) so cursors persist across drain calls. Creating new iterators each time would register fresh cursors at the head, missing frames sent between calls.