Reactors
Five reactors ship: recovery, reconciler, outbox, timers, compensation. Each is
idempotent: the per-run lease, combined with the fact that every server RPC is
idempotent, makes a double run harmless. Scale freely.
tape.reactors.recover_once
recover_once(*, runner: Any = None, redrive_fn: Any = None, url: str = DEFAULT_URL, limit: int = 50) -> list[dict]
Re-drive every recoverable run. Returns a list of {run_id, invocation_id}.
tape.reactors.reconcile_once
reconcile_once(url: str = DEFAULT_URL, *, reconcile_pending_after_ms: int = 0, client: Optional[TapeClient] = None) -> list[dict]
Resolve UNKNOWN effects (and PENDING ones older than
reconcile_pending_after_ms, if > 0) via the connector's observe() (the
new, semantics-aware path) or the per-tool status_check (the v1 path).
Returns a list of {key, resolved, ...} dicts.
The semantics-aware rules:
- UNKNOWN + IDEMPOTENT: observation absent is safe to re-issue: the server moves the effect back to PENDING with next_dispatch_at_ms=now and the outbox loop retries.
- UNKNOWN + NON_IDEMPOTENT: observation absent is not retried. The effect is marked FAILED; a human or saga decides whether to issue a fresh, fresh-keyed attempt. (The server enforces this mapping in record_external_observation.)
- duplicate for either: register a compensation obligation atomically with the observation, then mark the effect CONFIRMED.
- stuck for either: mark the effect UNKNOWN; the operator triages.
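The rules above can be read as a small decision table. The following is a minimal sketch of that table only, not the server's implementation: `Resolution` and `resolve_unknown` are hypothetical names, and observation values other than the three listed are deliberately rejected because this page does not describe them.

```python
from typing import NamedTuple


class Resolution(NamedTuple):
    status: str                  # status the effect ends up in
    register_compensation: bool  # True when a compensation obligation is recorded


def resolve_unknown(semantics: str, observation: str) -> Resolution:
    """Apply the documented rules to an effect that is currently UNKNOWN."""
    if semantics not in ("IDEMPOTENT", "NON_IDEMPOTENT"):
        raise ValueError(f"unexpected semantics: {semantics!r}")
    if observation == "absent":
        # Only idempotent effects may go back to PENDING for a retry.
        if semantics == "IDEMPOTENT":
            return Resolution("PENDING", False)
        return Resolution("FAILED", False)
    if observation == "duplicate":
        # Same outcome for both semantics: compensate, then confirm.
        return Resolution("CONFIRMED", True)
    if observation == "stuck":
        # Left for the operator to triage.
        return Resolution("UNKNOWN", False)
    raise ValueError(f"observation not covered by this sketch: {observation!r}")
```

The only branch where semantics changes the result is absent, which is where a blind retry would be unsafe for a non-idempotent upstream.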
tape.reactors.fire_due_timers_once
fire_due_timers_once(url: str = DEFAULT_URL, *, runner: Any = None, redrive_fn: Any = None, on_timer: Optional[Callable[[Any], None]] = None, client: Optional[TapeClient] = None) -> list[dict]
Claim and fire due timers. Returns a list of {run_id, timer_id, kind, action}.
redrive_fn (e.g. one that calls the Agent Engine :streamQuery API) is used
for redrive timers when there's no local runner.
tape.reactors.run_reactors
run_reactors(*, runner: Any = None, redrive_fn: Any = None, url: str = DEFAULT_URL, recover: bool = True, reconcile: bool = True, timers: bool = True, compensations: bool = True, interval_s: float = 2.0, reconcile_pending_after_s: float = 0.0, claimer: str = '', once: bool = False, on_tick: Optional[Callable[[dict], None]] = None) -> None
Pass runner= for a local ADK Runner, or redrive_fn= (e.g. one that calls
the Vertex AI Agent Engine :streamQuery API) when the agent is deployed
elsewhere — recovery still works, it just re-invokes through that callback.
compensations=True enables the obligations reactor on the same polling
cadence as the others. For event-driven draining (lower latency, less
polling pressure), call run_compensations_event_driven in its own
process / thread instead — both close the same queue.
tape.reactors.run_event_fanout
run_event_fanout(url: str = DEFAULT_URL, *, sink: Callable[[Any], None], from_ts_ms: int = 0, run_id: str = '', kind: str = '') -> None
Tail the cross-run journal and hand each EventEntry to sink — wire sink
to Pub/Sub / Kafka / a webhook to publish the WAL. Blocks; at-least-once
(the boundary entry may repeat — sink should de-dup on (run_id, seq) if it
cares). On the Bigtable backend a cross-run tail isn't available — use Bigtable
change streams instead (the stream simply yields nothing there).
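Because delivery is at-least-once, a sink that must not see repeats can be wrapped in a de-duplicating layer. This is a minimal sketch, assuming each entry exposes `run_id` and `seq` as attributes (the page names the key but not the attribute access); `dedup_sink` is a hypothetical helper, and `SimpleNamespace` stands in for a real EventEntry.

```python
from types import SimpleNamespace
from typing import Any, Callable, Set, Tuple


def dedup_sink(inner: Callable[[Any], None]) -> Callable[[Any], None]:
    """Wrap a sink so each (run_id, seq) pair reaches `inner` at most once."""
    seen: Set[Tuple[str, int]] = set()

    def sink(entry: Any) -> None:
        key = (entry.run_id, entry.seq)  # assumed attribute names
        if key in seen:
            return  # repeated entry, drop it
        seen.add(key)
        inner(entry)

    return sink


# Stand-in entries: the second one repeats the first.
received = []
sink = dedup_sink(received.append)
for entry in [
    SimpleNamespace(run_id="r1", seq=1),
    SimpleNamespace(run_id="r1", seq=1),
    SimpleNamespace(run_id="r1", seq=2),
]:
    sink(entry)
```

The `seen` set grows without bound and is lost on restart, so a long-running fanout would likely want a bounded or persistent store instead.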
tape.reactors.run_outbox_relay
run_outbox_relay(url: str, sink: Any, *, cursor_path: str = '', run_id: str = '', kind: str = '', subject_pattern: str = '', predicate_cel: str = '', interval_s: float = 1.0, once: bool = False) -> None
Loop outbox_relay_tick forever (or once). The cursor is durable in
cursor_path (a local JSON file), so a relay restart resumes from where it
stopped. To feed multiple sinks, run one relay per sink, each with its own cursor file.
subject_pattern (default /tape/**) and predicate_cel (default empty)
are forwarded to SubscribeBySubject so the server does the filtering.
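To illustrate what a durable cursor in a local JSON file can look like, here is a minimal sketch. The file layout (`{"cursor": N}`) and the helper names `load_cursor` and `save_cursor` are assumptions for illustration; the format run_outbox_relay actually writes is not documented on this page.

```python
import json
import os
import tempfile


def load_cursor(path: str) -> int:
    """Return the saved cursor, or 0 when no cursor file exists yet."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(json.load(f).get("cursor", 0))
    except FileNotFoundError:
        return 0


def save_cursor(path: str, cursor: int) -> None:
    """Write the cursor to a temp file, then atomically swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"cursor": cursor}, f)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before the rename
    os.replace(tmp, path)  # readers see either the old file or the new one


# Round trip in a scratch directory.
with tempfile.TemporaryDirectory() as scratch:
    cursor_file = os.path.join(scratch, "relay.cursor.json")
    before = load_cursor(cursor_file)
    save_cursor(cursor_file, 42)
    after = load_cursor(cursor_file)
```

Saving the cursor only after the sink has accepted an entry gives at-least-once delivery across restarts; saving it before would risk skipping entries.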
Outbox reactor
The outbox reactor dispatches PENDING + OUTBOX effects through their registered connectors.
The loop:
list effects to dispatch (PENDING + OUTBOX + due)
for each:
    claim (atomic CAS lease)
    look up the connector
    dispatch through it
    record result:
        confirmed → complete_effect(CONFIRMED) + register compensation
                    (if a compensate kind is registered for the tool)
        failed    → record_dispatch_attempt(next_at_ms=backoff)
                    (eventually exhaust → terminal FAILED via
                    record_external_observation(FAILED))
        unknown   → record_dispatch_attempt(next_at_ms=0) → status UNKNOWN
                    (the reconciler resolves; do NOT blindly retry;
                    that is the entire safety claim for non-idempotent
                    upstreams)
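The "record result" step of the loop can be sketched as a pure function that decides which call to make next. This is an illustration under assumptions, not the dispatcher's code: `plan_after_dispatch` is a hypothetical name, the exponential backoff (1 s base, 60 s cap) is invented for the example since the page only says "backoff", and treating `attempts >= max_attempts` as exhaustion is a guess at how dispatch_max_attempts is applied.

```python
from typing import Any, Dict


def plan_after_dispatch(
    outcome: str,
    attempts: int,          # attempts made so far, including the one just finished
    now_ms: int,
    max_attempts: int = 5,  # mirrors the dispatch_max_attempts default
    base_ms: int = 1_000,   # assumed backoff base
    cap_ms: int = 60_000,   # assumed backoff ceiling
) -> Dict[str, Any]:
    """Return a description of the server call that follows a dispatch outcome."""
    if outcome == "confirmed":
        return {"call": "complete_effect", "status": "CONFIRMED"}
    if outcome == "unknown":
        # No retry is scheduled: the reconciler resolves UNKNOWN effects.
        return {"call": "record_dispatch_attempt", "next_at_ms": 0, "status": "UNKNOWN"}
    if outcome == "failed":
        if attempts >= max_attempts:
            # Retries exhausted: terminal failure.
            return {"call": "record_external_observation", "status": "FAILED"}
        delay_ms = min(cap_ms, base_ms * 2 ** (attempts - 1))
        return {"call": "record_dispatch_attempt", "next_at_ms": now_ms + delay_ms}
    raise ValueError(f"unexpected outcome: {outcome!r}")
```

Note that only the failed branch ever schedules a future dispatch time; an unknown outcome always hands the effect to the reconciler.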
Safety: the dispatcher refuses to re-dispatch a row whose semantics is NON_IDEMPOTENT and whose status is anything other than PENDING. The server-side CAS in claim_effect_dispatch enforces this, but we also assert on the result record so a bug in store config can't downgrade the guarantee silently.
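A client-side guard of this kind could look like the sketch below. The names `may_dispatch` and `assert_dispatchable` are hypothetical, and the guard covers only the rule stated above; it says nothing about which statuses are valid for idempotent effects.

```python
def may_dispatch(semantics: str, status: str) -> bool:
    """False only for a NON_IDEMPOTENT effect that is not in PENDING."""
    if semantics == "NON_IDEMPOTENT" and status != "PENDING":
        return False
    return True


def assert_dispatchable(semantics: str, status: str) -> None:
    """Fail loudly instead of silently re-dispatching an unsafe effect."""
    if not may_dispatch(semantics, status):
        raise RuntimeError(
            f"refusing to dispatch NON_IDEMPOTENT effect in status {status}"
        )
```

Checking on the client as well as in the server-side CAS means a misconfigured store surfaces as an error rather than as a duplicate side effect.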
dispatch_one
dispatch_one(eff, *, client: TapeClient, claimer: str, dispatch_max_attempts: int = 5) -> Dict[str, Any]
Run one effect through its connector. Returns a per-effect outcome dict.
outbox_dispatch_once
outbox_dispatch_once(url: str = DEFAULT_URL, *, connector: str = '', limit: int = 200, claimer: str = '', dispatch_max_attempts: int = 5, client: Optional[TapeClient] = None) -> List[Dict[str, Any]]
One pass of the outbox dispatcher. Returns a list of per-effect outcomes.
run_outbox_dispatcher
run_outbox_dispatcher(url: str = DEFAULT_URL, *, connector: str = '', interval_s: float = 1.0, claimer: str = '', dispatch_max_attempts: int = 5, once: bool = False, on_tick: Optional[Callable[[List[Dict[str, Any]]], None]] = None) -> None
Run the outbox dispatcher forever (or once). Connectors must be
registered (via tape.connectors.register(...)) before this loop starts;
a missing connector causes the effect to be left in PENDING for the next
process that does have it.