Tape: the event bus rebuild
Companion to tape.md. This document is the spec for the rebuild
of Tape's reactive surface from a 250 ms polling loop into a proper event bus:
subject-routed, server-pushed, globally ordered, back-pressured, and with a
clean split between runs (durable, multi-step agent work) and tasks
(cheap, ephemeral handler work). The motivation is the critique that opens this
PR; this doc is the answer.
1. The diagnosis
The Tape J/K shape we shipped earlier conflated three things that a senior team would keep separate:
- The journal is the source of truth — append-only, per-run-ordered. That stays.
- The bus is the transport over the journal — server-pushed change feed, subject routing, predicates, back-pressure. This is what we're rebuilding.
- The dispatcher is where reactions actually run — in-process for dev, Cloud Pub/Sub for cloud-native, NATS/Kafka for self-hosted. Tape ships the contract and a reference in-proc dispatcher; the broker does the brokering.
The mistakes we're fixing, in order of consequence:
| Mistake | Fix |
|---|---|
| SubscribeEvents is a tokio::sleep(250 ms) poll loop | Postgres LISTEN/NOTIFY for wake-ups; SQLite/Bigtable retain a short poll as the fallback; the streaming RPC stays the same shape on the wire. |
| Cursor is (ts_ms, run_id, seq), which is fragile under clock skew and concurrent writers | global_seq BIGINT allocated by a Postgres sequence (BIGSERIAL) / SQLite INTEGER PRIMARY KEY AUTOINCREMENT / Bigtable change-stream commit timestamp. One integer per cursor. |
| "Reactions are runs": 10 k/s state changes ⇒ 10 k runs/s | Two kinds of work: agent ⇒ creates a tape_run (heavy, durable, multi-step); task and publish ⇒ create a tape_task (cheap, ephemeral, dispatcher-retried). |
| No subjects: every consumer scans the WAL | Every journal entry gets a subject (path-style: /tape/effect/confirmed/execute_sweep/<run_id>). Reactions subscribe to subject patterns (/tape/effect/confirmed/**). |
| No back-pressure | Per-reaction max_concurrency, rate_limit_per_s, debounce_ms, retry_policy, dlq_after_n — all stored on the reaction row, enforced by the dispatcher. |
| Filter predicates were unspecified | CEL (Google's Common Expression Language) evaluated server-side, against the envelope {run_id, subject, kind, payload}. |
| No OTel propagation | trace_id, span_id, parent_span_id on every journal entry; reaction-triggered work is a child span of the triggering entry. |
| No payload schema versioning | schema_version SMALLINT on every journal row, defaulting to 1, set by the producer. |
| Single-threaded per-reaction consumption | num_shards on each reaction; cursor is per-(reaction_id, shard); tasks hashed by hash(run_id) % num_shards. |
2. The surfaces
2.1 The journal: augmented, not replaced
```
tape_journal:
  global_seq      BIGINT PRIMARY KEY           -- Postgres BIGSERIAL / SQLite INTEGER PK AUTOINCREMENT
  run_id          TEXT
  seq             BIGINT                       -- the per-run sequence (kept for back-compat / replay)
  kind            TEXT                         -- decision | effect | obligation | gate | value | run | event
  subject         TEXT                         -- /tape/<kind>/<verb>/<dim1>/<dim2>...
  payload_json    TEXT
  schema_version  SMALLINT NOT NULL DEFAULT 1
  trace_id        TEXT NOT NULL DEFAULT ''
  span_id         TEXT NOT NULL DEFAULT ''
  parent_span_id  TEXT NOT NULL DEFAULT ''
  ts_ms           BIGINT
  UNIQUE (run_id, seq)
```
global_seq is the durable cursor for every cross-run consumer (outbox relay,
reactors, fan-out). Within a database it is unique and increases in allocation
order; values can have gaps (rolled-back inserts), and on Postgres concurrent
transactions can commit out of allocation order. Across multiple Tape replicas
pointed at the same Postgres / AlloyDB, every replica draws from the same
sequence, so collisions are impossible. On Bigtable, change-stream
commit timestamps play the same role (see §6).
The (run_id, seq) UNIQUE constraint is preserved so per-run replay
(SubscribeRun, recovery) still works.
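You can try the cursor contract end to end with Python's built-in sqlite3 module. This is a minimal sketch, not Tape's server code. It keeps only four of the tape_journal columns, and append_entry / read_after are hypothetical helper names. It shows that a single integer is the whole cursor, and that the (run_id, seq) UNIQUE constraint still rejects a duplicate per-run write.

```python
import sqlite3

def open_journal() -> sqlite3.Connection:
    # Trimmed tape_journal: global_seq is the cross-run cursor,
    # UNIQUE (run_id, seq) keeps per-run replay well-defined.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE tape_journal (
               global_seq INTEGER PRIMARY KEY AUTOINCREMENT,
               run_id     TEXT NOT NULL,
               seq        INTEGER NOT NULL,
               subject    TEXT NOT NULL,
               UNIQUE (run_id, seq)
           )"""
    )
    return conn

def append_entry(conn: sqlite3.Connection, run_id: str, seq: int, subject: str) -> int:
    # SQLite allocates the next global_seq; lastrowid reports it.
    cur = conn.execute(
        "INSERT INTO tape_journal (run_id, seq, subject) VALUES (?, ?, ?)",
        (run_id, seq, subject),
    )
    return cur.lastrowid

def read_after(conn: sqlite3.Connection, cursor: int, limit: int = 100) -> list:
    # Everything strictly after the cursor, in global order.
    return conn.execute(
        "SELECT global_seq, run_id, seq, subject FROM tape_journal "
        "WHERE global_seq > ? ORDER BY global_seq LIMIT ?",
        (cursor, limit),
    ).fetchall()

conn = open_journal()
append_entry(conn, "run-a", 0, "/tape/run/running/app/u/s/run-a")
append_entry(conn, "run-b", 0, "/tape/run/running/app/u/s/run-b")
append_entry(conn, "run-a", 1, "/tape/decision/recorded/run-a/0")

try:
    append_entry(conn, "run-a", 1, "/tape/decision/recorded/run-a/0")
except sqlite3.IntegrityError:
    print("duplicate (run_id, seq) rejected")

for global_seq, run_id, seq, subject in read_after(conn, cursor=1):
    print(global_seq, run_id, seq, subject)
```

A consumer only has to persist the last global_seq it processed and pass it back as the cursor after a restart.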
2.2 Subjects
Path-style, /-delimited, lower-case. The grammar:
Wildcards in subscription patterns:
- * matches exactly one path segment
- ** matches zero or more trailing segments (only at the end of a pattern)
Canonical subjects emitted by Tape:
| Journal event | Subject |
|---|---|
| Run started / lifecycle change | /tape/run/<status_lower>/<app>/<user>/<session>/<run_id> |
| Decision recorded | /tape/decision/recorded/<run_id>/<decision_index> |
| Effect pending | /tape/effect/pending/<tool>/<run_id> |
| Effect confirmed | /tape/effect/confirmed/<tool>/<run_id> |
| Effect failed | /tape/effect/failed/<tool>/<run_id> |
| Effect unknown | /tape/effect/unknown/<tool>/<run_id> |
| Effect reconciled | /tape/effect/reconciled/<tool>/<run_id> |
| Obligation registered | /tape/obligation/registered/<kind>/<run_id> |
| Obligation resolved | /tape/obligation/<status>/<kind>/<run_id> |
| Gate waiting | /tape/gate/waiting/<gate>/<run_id> |
| Gate released | /tape/gate/released/<gate>/<run_id> |
| Value changed | /tape/value/changed/<namespace>/<key> |
| Value deleted | /tape/value/deleted/<namespace>/<key> |
| Session event appended | /tape/event/appended/<app>/<user>/<session> |
Path segments are URL-segment-encoded (slashes, spaces, *, ** get
percent-encoded so user-chosen keys can't break the grammar).
Examples of subscription patterns:
```
/tape/value/changed/treasury/**          any value in the "treasury" namespace
/tape/effect/confirmed/execute_sweep/*   every confirmed sweep effect, any run
/tape/effect/**                          every effect event of any kind
/tape/decision/recorded/*/0              the first decision (decision_index = 0) of every run
```
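The wildcard rules are small enough to pin down in a few lines. The sketch below is a Python illustration, not the server's Rust matcher. subject_matches and encode_segment are hypothetical names. Using urllib.parse.quote for segment encoding is an assumption about what "URL-segment-encoded" means in practice.

```python
from urllib.parse import quote

def encode_segment(raw: str) -> str:
    # safe="" makes quote() encode "/" as well; " " and "*" are always encoded.
    return quote(raw, safe="")

def subject_matches(pattern: str, subject: str) -> bool:
    pat = pattern.strip("/").split("/")
    sub = subject.strip("/").split("/")
    if "**" in pat[:-1]:
        raise ValueError("** is only allowed as the last segment of a pattern")

    def same(p: str, s: str) -> bool:
        return p == "*" or p == s  # "*" matches exactly one segment

    if pat[-1] == "**":
        prefix = pat[:-1]
        # "**" matches zero or more trailing segments.
        return len(sub) >= len(prefix) and all(map(same, prefix, sub))
    return len(sub) == len(pat) and all(map(same, pat, sub))
```

Because user-chosen keys are encoded before they become segments, a key such as "eur/usd" stays one segment (eur%2Fusd) and can never be read as a separator or a wildcard.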
2.3 Reactions
A reaction is a server-side registration: "for journal entries matching this subject pattern (and optionally this CEL predicate), produce work." The work is one of three kinds:
- agent: re-invoke an agent (creates a tape_run; journaled, durable);
- task: enqueue an ephemeral task into tape_tasks for the dispatcher to claim and hand to a local Python function (no run row, no run lease, no recovery rescan);
- publish: enqueue an ephemeral task whose dispatcher publishes it to an external broker (Pub/Sub topic, NATS subject, …).
```
tape_reactions:
  reaction_id       TEXT PRIMARY KEY              -- stable; SDK-provided or server-generated
  name              TEXT                          -- human label
  subject_pattern   TEXT NOT NULL                 -- /tape/.../<seg>/*/<...>/**
  predicate_cel     TEXT NOT NULL DEFAULT ''      -- empty = always-true
  handler_kind      SMALLINT NOT NULL             -- 1=agent, 2=task, 3=publish
  agent_app         TEXT NOT NULL DEFAULT ''      -- kind=agent: which app to re-invoke
  publish_target    TEXT NOT NULL DEFAULT ''      -- kind=publish: e.g. pubsub://proj/topic
  max_concurrency   INTEGER NOT NULL DEFAULT 1
  rate_limit_per_s  INTEGER NOT NULL DEFAULT 0     -- 0 = unbounded
  debounce_ms       INTEGER NOT NULL DEFAULT 0     -- coalesce repeated keys
  retry_max         INTEGER NOT NULL DEFAULT 5
  retry_backoff_ms  INTEGER NOT NULL DEFAULT 1000  -- starting backoff; doubles
  dlq_after_n       INTEGER NOT NULL DEFAULT 5     -- after this many attempts, move to DLQ
  num_shards        INTEGER NOT NULL DEFAULT 1
  created_at_ms     BIGINT NOT NULL
  deleted           INTEGER NOT NULL DEFAULT 0

tape_reaction_cursors:
  reaction_id           TEXT
  shard                 INTEGER
  last_global_seq       BIGINT NOT NULL DEFAULT 0
  last_processed_at_ms  BIGINT NOT NULL DEFAULT 0
  PRIMARY KEY (reaction_id, shard)
```
The matcher (a server-side tokio task) tails tape_journal ordered by
global_seq, evaluates each entry against every active reaction, and:
- kind=agent → idempotently calls BeginRun for the target app, threading the source (run_id, global_seq) as invocation_id so a duplicate match doesn't re-launch;
- kind=task / publish → inserts a row into tape_tasks keyed by (reaction_id, shard, source_global_seq) so duplicate matches collapse.
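For task and publish reactions, the dedup key and the shard assignment are both easy to get subtly wrong. Below is a minimal sqlite3 sketch. It assumes a trimmed tape_tasks table and uses hypothetical helpers shard_for / enqueue_task. Duplicate matches collapse because of the UNIQUE key plus INSERT OR IGNORE, which is SQLite syntax; Postgres would use ON CONFLICT DO NOTHING. The shard comes from a stable hash. Python's built-in hash() is salted per process, so it would route the same run_id to different shards on different matcher replicas. CRC32 here is only an illustrative stand-in for whatever hash the server uses.

```python
import sqlite3
import zlib

def open_tasks() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE tape_tasks (
               reaction_id       TEXT NOT NULL,
               shard             INTEGER NOT NULL,
               source_run_id     TEXT NOT NULL,
               source_global_seq INTEGER NOT NULL,
               subject           TEXT NOT NULL,
               UNIQUE (reaction_id, shard, source_global_seq)
           )"""
    )
    return conn

def shard_for(run_id: str, num_shards: int) -> int:
    # Stable across processes, so every matcher replica agrees on the shard.
    return zlib.crc32(run_id.encode("utf-8")) % num_shards

def enqueue_task(conn: sqlite3.Connection, reaction_id: str, num_shards: int,
                 run_id: str, global_seq: int, subject: str) -> bool:
    """Insert a task; return False if the same match was already enqueued."""
    before = conn.total_changes
    conn.execute(
        "INSERT OR IGNORE INTO tape_tasks "
        "(reaction_id, shard, source_run_id, source_global_seq, subject) "
        "VALUES (?, ?, ?, ?, ?)",
        (reaction_id, shard_for(run_id, num_shards), run_id, global_seq, subject),
    )
    # An ignored insert changes no rows, so total_changes stays put.
    return conn.total_changes > before
```

The UNIQUE key includes shard. As a result, changing num_shards while matches are still being produced could let the same source entry land in two different shards.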
2.4 Tasks
```
tape_tasks:
  task_id              TEXT PRIMARY KEY
  reaction_id          TEXT NOT NULL
  shard                INTEGER NOT NULL
  source_run_id        TEXT NOT NULL
  source_global_seq    BIGINT NOT NULL
  subject              TEXT NOT NULL
  payload_json         TEXT NOT NULL DEFAULT ''   -- the triggering journal entry payload
  status               SMALLINT NOT NULL          -- 1=pending 2=claimed 3=done 4=failed 5=dlq
  attempts             INTEGER NOT NULL DEFAULT 0
  next_attempt_at_ms   BIGINT NOT NULL DEFAULT 0
  lease_owner          TEXT NOT NULL DEFAULT ''
  lease_expires_at_ms  BIGINT NOT NULL DEFAULT 0
  last_error           TEXT NOT NULL DEFAULT ''
  created_at_ms        BIGINT NOT NULL
  completed_at_ms      BIGINT NOT NULL DEFAULT 0
  trace_id             TEXT NOT NULL DEFAULT ''
  parent_span_id       TEXT NOT NULL DEFAULT ''   -- span_id of the triggering journal entry
  UNIQUE (reaction_id, shard, source_global_seq)
```
A task is the unit of work for the dispatcher:
```
loop:
    tasks = ClaimTasks(reaction_id=..., shard=..., owner=..., lease_ms=..., max=N)
    for t in tasks:
        fire_handler(t); CompleteTask(...) or NackTask(...)
```
NackTask with permanent=true after attempts >= dlq_after_n flips status
to DLQ and stops re-leasing.
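The reaction columns imply a retry schedule (retry_backoff_ms doubling, then dlq_after_n), which can be sketched as a pure function. This is an illustration under two assumptions. First, attempts counts failed deliveries and is bumped on each nack. Second, the dispatcher's permanent decision and the server's status change are folded into one function. TaskState and nack are hypothetical names; the status constants mirror the tape_tasks.status comment.

```python
from dataclasses import dataclass

# Mirrors the tape_tasks.status codes.
PENDING, CLAIMED, DONE, FAILED, DLQ = 1, 2, 3, 4, 5

@dataclass
class TaskState:
    status: int = CLAIMED
    attempts: int = 0            # failed deliveries so far (assumption)
    next_attempt_at_ms: int = 0
    last_error: str = ""

def nack(task: TaskState, error: str, now_ms: int,
         retry_backoff_ms: int = 1000, dlq_after_n: int = 5) -> TaskState:
    """Record a failed delivery and decide between retry and DLQ."""
    task.attempts += 1
    task.last_error = error
    if task.attempts >= dlq_after_n:
        # Permanent: a DLQ row is no longer eligible for claiming.
        task.status = DLQ
        return task
    # Transient: wait 1x, 2x, 4x, ... the starting retry_backoff_ms.
    task.status = PENDING
    task.next_attempt_at_ms = now_ms + retry_backoff_ms * 2 ** (task.attempts - 1)
    return task
```

With the defaults, a task that keeps failing is retried after 1 s, 2 s, 4 s and 8 s. The fifth failure then parks it in the DLQ.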
2.5 CEL predicates
CEL expressions are evaluated server-side against an Envelope map:
```
envelope = {
    "global_seq":     int,
    "run_id":         str,
    "seq":            int,
    "kind":           str,
    "subject":        str,
    "ts_ms":          int,
    "schema_version": int,
    "payload":        parsed payload_json,
    "trace_id":       str,
}
```
Example:
```
RegisterReaction(
    subject_pattern = "/tape/value/changed/treasury/**",
    predicate_cel   = "double(payload.value.value_json) > 100.0",
    handler_kind    = "agent",
    agent_app       = "treasury-pricing",
    max_concurrency = 32,
    debounce_ms     = 500,
)
```
CEL keeps the predicate language sandboxed, serializable (a predicate is just a
string), and cheap to run: it is not Turing-complete and has no side effects, so
every evaluation terminates. We use cel-interpreter on the server and celpy in
the Python SDK so identical expressions work locally.
If predicate_cel is the empty string, the predicate is treated as true
(it matches everything that hit the subject pattern). If a predicate throws an
evaluation error, the entry is skipped for that reaction and counted in a
reaction-error metric, so errors don't pile up in the queue.
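The gating rules above do not depend on the CEL engine: an empty predicate means true, and an evaluation error skips the entry. Here is a minimal Python sketch with the evaluator injected. evaluate stands in for a real CEL evaluator such as celpy and is assumed to return a Python bool. The metric name reaction_predicate_errors is hypothetical.

```python
import logging
from typing import Any, Callable, Dict, Mapping

log = logging.getLogger("tape.reactions")

# (expression, envelope) -> bool; a stand-in for a compiled CEL program.
Evaluator = Callable[[str, Mapping[str, Any]], bool]

def predicate_passes(predicate_cel: str, envelope: Mapping[str, Any],
                     evaluate: Evaluator, metrics: Dict[str, int]) -> bool:
    if predicate_cel == "":
        return True  # empty predicate: the subject pattern alone decides
    try:
        return bool(evaluate(predicate_cel, envelope))
    except Exception as exc:
        # Skip this entry for this reaction; count it instead of enqueueing it.
        metrics["reaction_predicate_errors"] = metrics.get("reaction_predicate_errors", 0) + 1
        log.warning("predicate error at global_seq=%s: %s", envelope.get("global_seq"), exc)
        return False
```

In practice the expression would typically be compiled once per reaction, not once per journal entry. Only the evaluation step runs in the hot path.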
2.6 OTel propagation
trace_id / span_id / parent_span_id are W3C Trace Context IDs, stored as
lowercase hex strings on every journal entry (32 hex characters for the 16-byte
trace ID, 16 for the 8-byte span IDs). The SDK reads them off the current
OTel context when calling the journaling RPC; if there's no current span, they
are empty. The matcher copies (trace_id, span_id) from the source entry
into the task's (trace_id, parent_span_id), so a reaction-triggered run /
handler is a child span of the journal entry that triggered it.
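On the consuming side, a handler needs the task's (trace_id, parent_span_id) back in W3C traceparent form so that an OTel propagator can turn it into a parent context. This is a minimal sketch. traceparent_for_task is a hypothetical helper. The journal stores no trace-flags byte, so marking the context as sampled is an assumption.

```python
import re
from typing import Optional

_TRACE_ID = re.compile(r"[0-9a-f]{32}")
_SPAN_ID = re.compile(r"[0-9a-f]{16}")

def traceparent_for_task(trace_id: str, parent_span_id: str,
                         sampled: bool = True) -> Optional[str]:
    """Build a W3C traceparent header from a task's stored trace columns."""
    if not trace_id or not parent_span_id:
        return None  # producer had no active span: the handler starts a new trace
    if not (_TRACE_ID.fullmatch(trace_id) and _SPAN_ID.fullmatch(parent_span_id)):
        raise ValueError("expected 32 / 16 lowercase hex characters")
    if trace_id == "0" * 32 or parent_span_id == "0" * 16:
        raise ValueError("all-zero IDs are invalid in W3C Trace Context")
    flags = "01" if sampled else "00"  # sampled flag is not stored: assumption
    return f"00-{trace_id}-{parent_span_id}-{flags}"
```

With the OpenTelemetry Python SDK, the header can then go through TraceContextTextMapPropagator().extract({"traceparent": header}). The resulting context is passed to tracer.start_as_current_span(..., context=ctx), so the handler's span becomes a child of the triggering entry.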
2.7 Schema versioning
schema_version SMALLINT NOT NULL DEFAULT 1. Producers set it when they
write a payload; consumers branch on it when they decode. Bumping the version
of a payload type does not require a schema migration: the column exists from
day 1, and a consumer that understands the new version keeps its decode path for
the old one. The journal is forever; the wire shape is not.
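Branching on schema_version at decode time might look like the sketch below. Both payload shapes are hypothetical because the doc does not define them. The v2 nesting simply mirrors the payload.value.value_json path used in the CEL examples. Unknown versions raise an error instead of being decoded as a guess.

```python
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class ValueChanged:
    namespace: str
    key: str
    value_json: str

def decode_value_changed(payload_json: str, schema_version: int) -> ValueChanged:
    raw = json.loads(payload_json)
    if schema_version == 1:
        # Hypothetical v1: flat fields.
        return ValueChanged(raw["namespace"], raw["key"], raw["value_json"])
    if schema_version == 2:
        # Hypothetical v2: the same fields nested under "value".
        v = raw["value"]
        return ValueChanged(v["namespace"], v["key"], v["value_json"])
    # Fail loudly rather than guess at a shape this consumer has never seen.
    raise ValueError(f"unsupported schema_version {schema_version}")
```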
3. RPC contract (proto, additions only)
```protobuf
service Tape {
  // ... all existing RPCs preserved ...

  // ── reactions ──
  rpc RegisterReaction (Reaction) returns (Reaction);
  rpc DeregisterReaction (DeregisterReactionRequest) returns (DeregisterReactionResponse);
  rpc ListReactions (ListReactionsRequest) returns (ListReactionsResponse);

  // ── tasks ──
  rpc ClaimTasks (ClaimTasksRequest) returns (ClaimTasksResponse);
  rpc CompleteTask (CompleteTaskRequest) returns (CompleteTaskResponse);
  rpc NackTask (NackTaskRequest) returns (NackTaskResponse);
  rpc ListTasks (ListTasksRequest) returns (ListTasksResponse); // observability / DLQ inspection

  // ── subject-filtered, global-seq-cursored WAL stream ──
  rpc SubscribeBySubject (SubscribeBySubjectRequest) returns (stream EventEntry);

  // SubscribeEvents is preserved for back-compat; it now also accepts
  // `from_global_seq` and `subject_pattern` fields (the old `from_ts_ms`
  // still works).
}

message Reaction {
  string reaction_id = 1;
  string name = 2;
  string subject_pattern = 3;
  string predicate_cel = 4;
  HandlerKind handler_kind = 5;
  string agent_app = 6;
  string publish_target = 7;
  int32 max_concurrency = 8;
  int32 rate_limit_per_s = 9;
  int32 debounce_ms = 10;
  int32 retry_max = 11;
  int32 retry_backoff_ms = 12;
  int32 dlq_after_n = 13;
  int32 num_shards = 14;
  int64 created_at_ms = 15;
  bool deleted = 16;
}

enum HandlerKind {
  HANDLER_KIND_UNSPECIFIED = 0;
  HANDLER_KIND_AGENT = 1;
  HANDLER_KIND_TASK = 2;
  HANDLER_KIND_PUBLISH = 3;
}

message Task {
  string task_id = 1;
  string reaction_id = 2;
  int32 shard = 3;
  string source_run_id = 4;
  int64 source_global_seq = 5;
  string subject = 6;
  string payload_json = 7;
  TaskStatus status = 8;
  int32 attempts = 9;
  int64 next_attempt_at_ms = 10;
  string lease_owner = 11;
  int64 lease_expires_at_ms = 12;
  string last_error = 13;
  int64 created_at_ms = 14;
  string trace_id = 15;
  string parent_span_id = 16;
}

enum TaskStatus {
  TASK_STATUS_UNSPECIFIED = 0;
  TASK_STATUS_PENDING = 1;
  TASK_STATUS_CLAIMED = 2;
  TASK_STATUS_DONE = 3;
  TASK_STATUS_FAILED = 4;
  TASK_STATUS_DLQ = 5;
}

message ClaimTasksRequest {
  string reaction_id = 1;
  int32 shard = 2;
  string owner = 3;
  int64 lease_ms = 4;
  int32 max = 5;
}
message ClaimTasksResponse { repeated Task tasks = 1; }
message CompleteTaskRequest { string task_id = 1; string owner = 2; }
message CompleteTaskResponse { Task task = 1; }
message NackTaskRequest { string task_id = 1; string owner = 2; string error = 3; bool permanent = 4; }
message NackTaskResponse { Task task = 1; }
message ListTasksRequest { string reaction_id = 1; TaskStatus status = 2; int32 limit = 3; }
message ListTasksResponse { repeated Task tasks = 1; }

message SubscribeBySubjectRequest {
  string subject_pattern = 1;   // /tape/effect/confirmed/**
  string predicate_cel = 2;     // optional, may be empty
  int64 from_global_seq = 3;    // 0 => from earliest available
}

// EventEntry / JournalEntry gain global_seq, subject, schema_version, OTel cols.
message EventEntry {
  string run_id = 1;
  int64 seq = 2;
  string kind = 3;
  string payload_json = 4;
  int64 ts_ms = 5;
  int64 global_seq = 6;
  string subject = 7;
  int32 schema_version = 8;
  string trace_id = 9;
  string span_id = 10;
  string parent_span_id = 11;
}

message DeregisterReactionRequest { string reaction_id = 1; }
message DeregisterReactionResponse { bool deregistered = 1; }
message ListReactionsRequest { string subject_pattern = 1; }
message ListReactionsResponse { repeated Reaction reactions = 1; }
```
The existing SubscribeEventsRequest gains two fields (subject_pattern,
from_global_seq) — old callers that pass only from_ts_ms keep working.
4. Python SDK shape
```python
import tape

# ── declare reactions at startup ──
@tape.on_value_change("treasury", key="fx_rate",
                      predicate="double(payload.value.value_json) > 1.10",
                      max_concurrency=8, debounce_ms=500)
def reprice_book(envelope):
    # ephemeral task; this Python fn runs in the dispatcher process.
    ...

@tape.on_effect_confirmed("execute_sweep",
                          agent="treasury-followup",   # ⇒ HandlerKind.AGENT
                          max_concurrency=4)
def _():  # body is unused for agent handlers; the decorator registers the reaction
    ...

@tape.on("/tape/effect/failed/**",
         publish="pubsub://my-proj/incident-stream")   # ⇒ HandlerKind.PUBLISH
def _():
    ...

# ── run the dispatcher (in-proc reference impl) ──
tape.reactions.run_dispatcher(url="tape://localhost:7878")

# ── or, run the Pub/Sub bridge: pull tasks → publish to Pub/Sub ──
tape.reactions.run_pubsub_bridge(url="tape://localhost:7878",
                                 project="my-proj", topic="tape-tasks")
```
The decorator builds a Reaction proto and calls RegisterReaction on
import. run_dispatcher loops ClaimTasks/CompleteTask, enforcing
max_concurrency with a local semaphore, rate_limit_per_s with a token
bucket, debounce_ms by coalescing (reaction_id, subject) within the
window, and the retry/DLQ policy by setting NackTask(permanent=...) once
attempts >= dlq_after_n.
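Two of these mechanisms are worth seeing concretely. This is a minimal sketch, not the SDK's code. TokenBucket and Debouncer are hypothetical names, and the clock is injectable so the behaviour can be tested. The debounce policy is an assumption about what "coalescing within the window" means: keep only the newest task per (reaction_id, subject), and release it once debounce_ms has passed since the first one arrived. max_concurrency would sit alongside these as a threading.BoundedSemaphore or asyncio.Semaphore around the handler call.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

Clock = Callable[[], float]

class TokenBucket:
    """rate_limit_per_s as a token bucket; a rate of 0 means unbounded."""

    def __init__(self, rate_per_s: int, clock: Clock = time.monotonic):
        self.rate = rate_per_s
        self.capacity = float(max(rate_per_s, 1))  # burst of up to one second's worth
        self.tokens = self.capacity
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        if self.rate == 0:
            return True
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

class Debouncer:
    """Keeps the newest task per (reaction_id, subject) until debounce_ms has
    passed since the first task for that key arrived."""

    def __init__(self, debounce_ms: int, clock: Clock = time.monotonic):
        self.window_s = debounce_ms / 1000.0
        self.clock = clock
        self.pending: Dict[Tuple[str, str], Tuple[float, Any]] = {}

    def offer(self, reaction_id: str, subject: str, task: Any) -> Optional[Any]:
        """Buffer `task`; return the older task it superseded, if any."""
        key = (reaction_id, subject)
        first_seen, older = self.pending.get(key, (self.clock(), None))
        self.pending[key] = (first_seen, task)
        return older

    def due(self) -> List[Any]:
        """Pop and return every buffered task whose window has elapsed."""
        now = self.clock()
        ready = [k for k, (t0, _) in self.pending.items() if now - t0 >= self.window_s]
        return [self.pending.pop(k)[1] for k in ready]
```

A task returned by offer has been superseded, but it still has to be acknowledged with CompleteTask. Otherwise its lease expires and the server hands it out again.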
OTel: the dispatcher opens a span as a child of (trace_id, parent_span_id)
on every task; the handler runs inside that span.
4.1 Outbox relay, upgraded
run_outbox_relay now reads journal entries via SubscribeBySubject
(with subject_pattern /tape/** by default) using from_global_seq instead
of from_ts_ms, so its cursor file holds a single global_seq instead of the
old {from_ts_ms, last_run_id, last_seq} triple. A one-line migration helper
converts an old cursor transparently.
5. The dispatcher contract (so other brokers can implement it)
A dispatcher is anything that:
1. calls ClaimTasks(reaction_id, shard, owner, lease_ms, max),
2. runs the handler (or publishes to its broker),
3. calls CompleteTask on success or NackTask(permanent=…) on failure,
4. extends the lease via re-claim (idempotent on task_id) for long-running handlers.
That's it. The reference dispatcher in tape.reactions is in-process Python.
The Cloud Pub/Sub bridge (run_pubsub_bridge) is a dispatcher that does
step 2 = "publish to Pub/Sub topic". A NATS / Kafka bridge would be a few
dozen lines following the same pattern.
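The contract is small enough to express as a Python Protocol plus one claim/handle/ack pass. This sketch rests on several assumptions. TaskClient, its snake_case method names, and dispatch_once are hypothetical; a real client would wrap the generated gRPC stubs. Task.attempts is taken to count earlier failures. Lease extension for long-running handlers is left out.

```python
from dataclasses import dataclass
from typing import Callable, List, Protocol

@dataclass
class Task:
    task_id: str
    attempts: int       # failed attempts before this delivery (assumption)
    payload_json: str

class TaskClient(Protocol):
    def claim_tasks(self, reaction_id: str, shard: int, owner: str,
                    lease_ms: int, max_tasks: int) -> List[Task]: ...  # max_tasks -> ClaimTasksRequest.max
    def complete_task(self, task_id: str, owner: str) -> None: ...
    def nack_task(self, task_id: str, owner: str, error: str, permanent: bool) -> None: ...

def dispatch_once(client: TaskClient, handler: Callable[[Task], None], *,
                  reaction_id: str, shard: int, owner: str,
                  lease_ms: int = 30_000, max_tasks: int = 16,
                  dlq_after_n: int = 5) -> int:
    """One pass of the dispatcher loop; returns how many tasks were claimed."""
    tasks = client.claim_tasks(reaction_id, shard, owner, lease_ms, max_tasks)
    for task in tasks:
        try:
            handler(task)  # run the handler (or publish to a broker)
        except Exception as exc:
            # This failure is attempt number attempts + 1; past the budget, go to DLQ.
            permanent = task.attempts + 1 >= dlq_after_n
            client.nack_task(task.task_id, owner, repr(exc), permanent)
        else:
            client.complete_task(task.task_id, owner)
    return len(tasks)
```

Any object with these three methods satisfies the Protocol. That includes an in-memory fake for tests, or a bridge whose handler publishes to a broker instead of running Python code.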
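6. Backend specifics
6.1 Postgres / AlloyDB
- tape_journal.global_seq is a BIGSERIAL.
- A pg_notify('tape_journal', subject) trigger fires on every insert.
- The server-side SubscribeBySubject / SubscribeEvents handler LISTENs on the tape_journal channel, wakes on NOTIFY, and drains WHERE global_seq > $cursor AND subject LIKE $pattern_to_sql_like (the matcher translates /tape/effect/confirmed/** → /tape/effect/confirmed/%). LIKE's % also matches /, so a single-segment * cannot be expressed exactly; the LIKE clause is a prefilter, and matching rows must still be checked against the subject pattern.
- The tape_tasks claim uses FOR UPDATE SKIP LOCKED, so concurrent dispatchers skip rows another dispatcher is already claiming instead of blocking on them.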
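Translating a subject pattern into a LIKE clause has two traps. First, % in LIKE also matches /, so * can only be approximated. Second, percent-encoded subjects contain literal % characters, which LIKE treats as wildcards unless they are escaped. The sketch below shows a conservative translation, meant to be used with ESCAPE '\' and followed by an exact subject-pattern check on each row. pattern_to_sql_like and prefilter_matches are hypothetical helpers. Unlike the example in the text, this version maps a trailing /** to a bare %, so that ** can also match zero trailing segments.

```python
import sqlite3

def _escape_like(segment: str) -> str:
    # Make LIKE's own metacharacters literal (backslash first, then % and _).
    return segment.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

def pattern_to_sql_like(pattern: str) -> str:
    """Return a LIKE pattern matching a superset of the subjects `pattern` matches."""
    trailing_any = pattern.endswith("/**")
    body = pattern[: -len("/**")] if trailing_any else pattern
    # "*" becomes "%", which can also span several segments: hence "superset".
    parts = ["%" if seg == "*" else _escape_like(seg) for seg in body.split("/")]
    like = "/".join(parts)
    return like + "%" if trailing_any else like

def prefilter_matches(subject: str, pattern: str) -> bool:
    # Demo only: evaluate the translated pattern with SQLite's LIKE ... ESCAPE.
    # (SQLite's LIKE is case-insensitive for ASCII; Postgres's LIKE is case-sensitive.)
    row = sqlite3.connect(":memory:").execute(
        "SELECT ? LIKE ? ESCAPE '\\'", (subject, pattern_to_sql_like(pattern))
    ).fetchone()
    return row[0] == 1
```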
6.2 SQLite
- tape_journal.global_seq is INTEGER PRIMARY KEY AUTOINCREMENT.
- No LISTEN/NOTIFY. The stream handler polls every 200 ms; an in-process tokio::sync::Notify (bus_tick) is pulsed on every journal write so same-process subscribers wake immediately. SQLite is the dev/single-node story; the polling fallback is fine there.
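The wake-up scheme is a classic "notify or time out" wait. Below is a minimal Python analog built on threading.Condition. The server uses tokio::sync::Notify; BusTick and its methods are hypothetical names. Writers pulse after each journal write. Subscribers wait until either a newer write has been signalled or the 200 ms poll interval elapses, and then drain with the global_seq cursor. The generation counter keeps a write that lands between a subscriber's drain and its next wait from being missed.

```python
import threading
from typing import Callable

class BusTick:
    """Python analog of pulsing a tokio::sync::Notify on every journal write."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._generation = 0  # bumped once per write

    def pulse(self) -> None:
        # Writer side: call after each journal insert commits.
        with self._cond:
            self._generation += 1
            self._cond.notify_all()

    def wait(self, seen: int, timeout_s: float = 0.2) -> int:
        """Block until a write newer than `seen` is signalled or timeout_s
        elapses (the polling fallback); return the current generation."""
        with self._cond:
            self._cond.wait_for(lambda: self._generation != seen, timeout=timeout_s)
            return self._generation

def subscriber_step(tick: BusTick, seen: int, drain: Callable[[], None]) -> int:
    # One iteration: wait (or time out), then drain WHERE global_seq > cursor.
    seen = tick.wait(seen)
    drain()
    return seen
```

The timeout path is what still picks up writes from other processes, which never pulse this in-process tick.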
6.3 Bigtable
- global_seq is allocated via ReadModifyWrite on a meta#global_seq counter row; ordering is preserved by writing it into the journal row key (j#<global_seq:020>) as well as keeping the per-run j#<run_id>#<seq> key for back-compat.
- The matcher / SubscribeBySubject is stubbed on Bigtable for v1; the documented path is a Bigtable change-stream sidecar (see §12 of tape.md). The SQL backends are the production path until that lands.
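Bigtable sorts row keys as raw bytes, and the :020 in the row key is what makes a row-range scan return entries in global_seq order. A small sketch follows; global_row_key and resume_start_key are hypothetical helpers. Twenty digits are enough for any non-negative signed 64-bit value, whose maximum has 19 digits.

```python
def global_row_key(global_seq: int) -> bytes:
    # Zero-padding makes byte order agree with numeric order.
    if not 0 <= global_seq < 2**63:
        raise ValueError("global_seq must fit in a non-negative signed 64-bit integer")
    return f"j#{global_seq:020d}".encode("ascii")

def resume_start_key(last_global_seq: int) -> bytes:
    # Inclusive start key for scanning everything after a stored cursor.
    return global_row_key(last_global_seq + 1)
```

Without the padding, b"j#10" would sort before b"j#9", and a scan would interleave entries out of order.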
7. What's in this PR
- schema migrations 0002_event_bus.{sqlite,postgres}.sql: tape_journal extensions (global_seq, subject, schema_version, OTel cols); tape_reactions, tape_reaction_cursors, tape_tasks tables;
- the new proto messages and RPCs;
- Rust server: subject derivation in every journal-write path, CEL evaluator (cel-interpreter), in-server matcher, LISTEN/NOTIFY on Postgres, Notify-based same-process wake-ups on SQLite, the new RPC handlers;
- Python SDK: @tape.on* decorators, run_dispatcher, run_pubsub_bridge, upgraded run_outbox_relay, OTel propagation;
- end-to-end tests covering subject derivation, matching, predicate evaluation, dispatcher back-pressure, retry/DLQ, and the existing kill-and-resume scenario (still passing).
What's deferred (called out so reviewers don't ask):
- Bigtable change-stream wiring (the SQL backends are the production path until then; the proto is forward-compatible).
- Decorator parity in the TS / Go / Java SDKs (they re-generate from the updated proto and can call the new RPCs raw; ergonomic decorators are a follow-up).
- NATS and Kafka bridges (the dispatcher contract is fixed; implementations are a few dozen lines each).
- Server-side debounce queue (debounce is enforced in the dispatcher for v1 — the simpler implementation; moving it server-side is a follow-up only worthwhile if cross-dispatcher coalescing matters at scale).