Channel System
Channels are the transport layer between components. They’re multi-producer, multi-consumer queues with cursor-based replay and garbage collection.
Core Types
Channel[T]
The shared buffer. Internally:
- _items: list[T] (frame buffer)
- _offset: int (logical index of the first item; advances as GC drops old items)
- _cursors: dict[sub_id, position] (per-subscriber read position, as a logical index)
- _condition: threading.Condition (coordinates blocking reads)
Key methods:
| Method | Behavior |
|---|---|
| _send(item) | Appends to buffer, notifies all waiting subscribers |
| _register(sub_id, latest) | Creates cursor. latest=True: at head (new items only). latest=False: at tail (replay from oldest) |
| _wait_and_get(sub_id, stop_event) | Blocks until an item is available (0.1s poll interval). Returns None on stop |
| _try_get(sub_id) | Non-blocking. Returns item or None |
| _fast_forward(sub_id) | Skips cursor to latest item (for newest=True mode) |
| _unregister(sub_id) | Removes cursor. Idempotent |
| _gc() | Drops items all subscribers have passed. No-op if there are no cursors |
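To make the cursor bookkeeping concrete, here is a minimal sketch of how the fields and methods above could fit together. MiniChannel is a hypothetical stand-in written for illustration, not the real Channel[T]; it omits _fast_forward and assumes GC runs inside _try_get.

```python
import threading
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class MiniChannel(Generic[T]):
    """Illustrative stand-in for Channel[T]; not the real implementation."""

    def __init__(self) -> None:
        self._items: list = []
        self._offset = 0                     # logical index of _items[0]
        self._cursors: dict = {}             # sub_id -> logical read position
        self._condition = threading.Condition()

    def _send(self, item: T) -> None:
        with self._condition:
            self._items.append(item)
            self._condition.notify_all()     # wake any blocked readers

    def _register(self, sub_id: int, latest: bool = True) -> None:
        with self._condition:
            head = self._offset + len(self._items)
            self._cursors[sub_id] = head if latest else self._offset

    def _try_get(self, sub_id: int) -> Optional[T]:
        with self._condition:
            pos = self._cursors[sub_id]
            if pos >= self._offset + len(self._items):
                return None                  # nothing unread
            item = self._items[pos - self._offset]
            self._cursors[sub_id] = pos + 1
            self._gc()
            return item

    def _wait_and_get(self, sub_id: int, stop_event: threading.Event) -> Optional[T]:
        with self._condition:
            while not stop_event.is_set():
                if self._cursors[sub_id] < self._offset + len(self._items):
                    return self._try_get(sub_id)
                self._condition.wait(timeout=0.1)   # poll interval from the table
            return None

    def _unregister(self, sub_id: int) -> None:
        with self._condition:
            self._cursors.pop(sub_id, None)  # idempotent

    def _gc(self) -> None:
        if not self._cursors:
            return                           # keep items until someone registers
        drop = min(self._cursors.values()) - self._offset
        if drop > 0:
            del self._items[:drop]
            self._offset += drop
```

In this sketch an item is only dropped once the slowest subscriber has read it, so two subscribers registered on an empty channel both see every item sent afterwards.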
Sender[T]
Write handle that broadcasts to one or more channels:
sender = Sender(channel_a, channel_b)
sender.send(frame) # delivered to both
sender.connect(channel_c) # dynamic fan-out

Tracks metrics: _msg_count, _byte_count (via sys.getsizeof), _last_send_time, buffer_depth.
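A rough sketch of the fan-out and metric bookkeeping described above. MiniSender and ListChannel are hypothetical names used only for illustration; buffer_depth is left out because the source does not say how it is computed. Note that sys.getsizeof reports the shallow size of an object, so a byte count gathered this way does not include objects the item merely references.

```python
import sys
import time


class ListChannel:
    """Hypothetical stand-in for a channel: records whatever it is sent."""

    def __init__(self):
        self.items = []

    def _send(self, item):
        self.items.append(item)


class MiniSender:
    """Illustrative fan-out sender; not the real Sender[T]."""

    def __init__(self, *channels):
        self._channels = list(channels)
        self._msg_count = 0
        self._byte_count = 0
        self._last_send_time = None

    def connect(self, channel):
        # Channels connected later only receive items sent from now on.
        self._channels.append(channel)

    def send(self, item):
        for channel in self._channels:
            channel._send(item)
        self._msg_count += 1
        self._byte_count += sys.getsizeof(item)   # shallow size only
        self._last_send_time = time.monotonic()


a, b, c = ListChannel(), ListChannel(), ListChannel()
sender = MiniSender(a, b)
sender.send(b"frame-1")      # delivered to a and b
sender.connect(c)
sender.send(b"frame-2")      # delivered to a, b and c
```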
Receiver[T]
Read handle. Call it to get an iterator:
# Blocking, sequential (default)
for frame in inputs.audio(self):
    if frame is None: break

# Non-blocking, latest only
for frame in inputs.video(self, newest=True, no_block=True):
    if frame is None: break

__call__ signature:
def __call__(
    self,
    subscriber: ThreadedComponent,
    newest: bool = False,    # skip to latest, drop intermediate
    no_block: bool = False,  # return None instead of blocking
    latest: bool = True,     # start at head (True) or tail (False)
) -> Iterator[T | None]: ...

Returns a ReceiverIterator. The subscriber provides the stop_event and sub_id (via id(subscriber)).
lag property — number of unread items buffered for this receiver.
ReceiverIterator
Created by Receiver.__call__(). Handles cursor lifecycle.
Eager registration: The cursor registers in __init__, not on first __next__. This means multiple iterators created in sequence all start at the same channel position — no off-by-one lag.
__next__ sequence:
- If done, raise StopIteration
- If stop_event is set, clean up and return None
- If newest, call _fast_forward() (skip to latest)
- If no_block, use _try_get(); otherwise use _wait_and_get()
- Update receiver metrics
- Return item or None
Cleanup: _finish() unregisters the cursor. Called by close(), __del__, or when stop is detected. Idempotent via _done flag.
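The lifecycle above can be sketched as follows. This is an illustrative approximation under stated assumptions: TinyChannel and MiniReceiverIterator are hypothetical names, only the non-blocking path is shown, metrics are omitted, and _fast_forward is assumed to leave exactly the newest item unread.

```python
import threading


class TinyChannel:
    """Hypothetical minimal channel exposing only what the iterator needs."""

    def __init__(self):
        self.items = []
        self.cursors = {}

    def _register(self, sub_id, latest=True):
        self.cursors[sub_id] = len(self.items) if latest else 0

    def _unregister(self, sub_id):
        self.cursors.pop(sub_id, None)

    def _fast_forward(self, sub_id):
        # Assumption: skip so that only the newest item (if any) stays unread.
        self.cursors[sub_id] = max(self.cursors[sub_id], len(self.items) - 1)

    def _try_get(self, sub_id):
        pos = self.cursors[sub_id]
        if pos >= len(self.items):
            return None
        self.cursors[sub_id] = pos + 1
        return self.items[pos]


class MiniReceiverIterator:
    def __init__(self, channel, sub_id, stop_event, newest=False, latest=True):
        self._channel = channel
        self._sub_id = sub_id
        self._stop_event = stop_event
        self._newest = newest
        self._done = False
        channel._register(sub_id, latest)    # eager: cursor exists before first next()

    def __iter__(self):
        return self

    def __next__(self):
        if self._done:
            raise StopIteration
        if self._stop_event.is_set():
            self._finish()
            return None
        if self._newest:
            self._channel._fast_forward(self._sub_id)
        return self._channel._try_get(self._sub_id)

    def _finish(self):
        if not self._done:                   # idempotent via the _done flag
            self._done = True
            self._channel._unregister(self._sub_id)

    close = _finish
```

Because the cursor is created in __init__, an item appended after construction but before the first next() call is still delivered.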
Garbage Collection
GC runs after every cursor operation. It computes the minimum cursor position across all subscribers and drops items behind it:
drop = min(cursors.values()) - offset
if drop > 0:
    del items[:drop]
    offset += drop

If there are no cursors, items are preserved so EmitOnStart values aren’t lost before threads register.
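A small worked example of one GC pass, written as a pure function so the arithmetic is easy to follow. The function name gc_pass and the sample values are illustrative assumptions; the empty-cursor guard reflects the rule that items are preserved when nobody has registered.

```python
def gc_pass(items, offset, cursors):
    """Return (items, offset) after dropping items every cursor has passed."""
    if not cursors:
        return items, offset              # no subscribers yet: keep everything
    drop = min(cursors.values()) - offset
    if drop > 0:
        items = items[drop:]
        offset += drop
    return items, offset


# Buffer holds logical indices 10..13; the slowest subscriber is at 12.
items, offset = gc_pass(["f10", "f11", "f12", "f13"], 10, {"a": 12, "b": 13})
print(items, offset)   # ['f12', 'f13'] 12
```

After the pass, a cursor at logical position 13 reads items[13 - offset], which is still "f13", so logical positions stay valid even though the list shrank.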
The latest Parameter
Controls where a new cursor starts:
- latest=True (default): Cursor at head. Only sees items sent after registration. For real-time streaming where old data is stale.
- latest=False: Cursor at tail. Reads from the oldest buffered item. For EmitOnStart values sent before threads started.
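As a quick illustration of the two starting positions, the helper below computes where a new cursor would land. initial_cursor is a hypothetical function, assuming that "head" means one past the newest buffered item and "tail" means the channel's current offset.

```python
def initial_cursor(offset: int, buffered: int, latest: bool = True) -> int:
    """Logical index at which a newly registered cursor starts."""
    head = offset + buffered      # index the next sent item will receive
    return head if latest else offset


# A channel whose buffer currently holds logical indices 5, 6 and 7:
print(initial_cursor(5, 3, latest=True))    # 8, sees only future items
print(initial_cursor(5, 3, latest=False))   # 5, replays the three buffered items
```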
Read Modes
| Mode | newest | no_block | Behavior |
|---|---|---|---|
| Sequential blocking | False | False | Every frame in order, blocks when empty |
| Sequential polling | False | True | Every frame in order, None when empty |
| Latest blocking | True | False | Skips to newest, blocks when empty |
| Latest polling | True | True | Skips to newest, None when empty |
UI Channels
Bidirectional communication between components and the frontend, bridged via WebSocket (/ui/ws).
Component sends to frontend:
class MyOutputs(NamedTuple):
    display: UITextSender  # text overlay on node
    video: UIVideoSender   # JPEG video stream on node

Component receives from frontend:
class MyInputs(NamedTuple):
    user_text: UITextReceiver       # text input box
    keystrokes: UIKeystrokeReceiver # individual key events

Inheritance:
Sender[T]
└── UISender[T] (component → frontend)
    ├── UITextSender UISender[TextFrame]
    └── UIVideoSender UISender[bytes]
Receiver[T]
└── UIReceiver[T] (frontend → component)
    ├── UITextReceiver UIReceiver[TextFrame]
    └── UIKeystrokeReceiver UIReceiver[TextFrame]

UI channels are created fresh on each GraphManager.run(). The WebSocket watcher monitors _ui_version and spawns async reader tasks for new channels.
drain() Utility
Multiplexes multiple non-blocking inputs, yielding frames in timestamp order:
from src.core.utils import drain

for speech, feedback, vision in drain(speech_it, feedback_it, vision_it):
    if speech is not None: handle_speech(speech)
    if feedback is not None: handle_feedback(feedback)
    if vision is not None: handle_vision(vision)

How it works:
- Collects all available frames from each iterator (stops at the first None per iterator)
- Sorts collected frames by pts (nanosecond timestamp)
- Yields one tuple per frame, with exactly one non-None value at the source iterator’s index
Accepts None iterators (unconnected inputs) — skips them.
Typed overloads for 1–6 iterators preserve per-position types.
Important: Iterators must be created upfront (before the loop) so cursors persist across drain calls. Creating new iterators each time would register fresh cursors at the head, missing frames sent between calls.
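The collect, sort and yield steps can be sketched like this. It is a simplified illustration, not the real drain: drain_once and Frame are hypothetical names, the typed overloads are omitted, and ties on pts are assumed to be broken by input position.

```python
from dataclasses import dataclass
from typing import Any, Iterator, Optional


@dataclass
class Frame:
    """Hypothetical frame type; only the pts field matters here."""
    pts: int          # nanosecond timestamp
    data: Any


def drain_once(*iterators: Optional[Iterator]) -> Iterator[tuple]:
    collected = []                        # (pts, source index, frame)
    for idx, it in enumerate(iterators):
        if it is None:                    # unconnected input: skip
            continue
        for frame in it:
            if frame is None:             # nothing more available right now
                break
            collected.append((frame.pts, idx, frame))
    collected.sort(key=lambda entry: entry[:2])   # by pts, then input position
    for _, idx, frame in collected:
        row = [None] * len(iterators)
        row[idx] = frame                  # exactly one non-None slot per tuple
        yield tuple(row)


# Plain iterators stand in for non-blocking receiver iterators.
speech_it = iter([Frame(30, "s1"), None])
vision_it = iter([Frame(10, "v1"), Frame(20, "v2"), None])
rows = list(drain_once(speech_it, None, vision_it))
```

Here rows holds three tuples ordered v1, v2, s1, each with a single populated slot at the index of the iterator that produced the frame.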