
Reactors

Five reactors ship: recovery, reconciler, outbox, timers, and compensation. Each is idempotent: the per-run lease, together with the fact that every server RPC is idempotent, makes a double run harmless. You can therefore scale reactors out freely.
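To see why a double run is harmless, here is a toy model that assumes a time-limited per-run lease and RPCs that are de-duplicated by a request key. ToyStore, reactor_pass, the request-key format, and the 30-second TTL are all hypothetical. The real lease and RPC semantics live on the Tape server.

```python
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple


@dataclass
class ToyStore:
    """Hypothetical in-memory stand-in for the Tape server (not its real API)."""
    leases: Dict[str, Tuple[str, float]] = field(default_factory=dict)  # run_id -> (holder, expires_at)
    applied: Set[str] = field(default_factory=set)  # request keys already applied
    side_effects: int = 0

    def try_lease(self, run_id: str, holder: str, ttl_s: float, now: float) -> bool:
        # Compare-and-set: take the lease if it is free, expired, or already ours.
        current = self.leases.get(run_id)
        if current is None or current[1] <= now or current[0] == holder:
            self.leases[run_id] = (holder, now + ttl_s)
            return True
        return False

    def idempotent_rpc(self, key: str) -> None:
        # Replaying a key is a no-op, so a duplicate call changes nothing.
        if key in self.applied:
            return
        self.applied.add(key)
        self.side_effects += 1


def reactor_pass(store: ToyStore, run_id: str, holder: str, now: float) -> bool:
    """One replica's attempt to process a run; returns whether it held the lease."""
    if not store.try_lease(run_id, holder, ttl_s=30.0, now=now):
        return False  # another replica owns this run right now
    store.idempotent_rpc(f"{run_id}:redrive")
    return True


store = ToyStore()
reactor_pass(store, "run-1", "replica-a", now=0.0)   # takes the lease, applies the effect
reactor_pass(store, "run-1", "replica-b", now=1.0)   # lease held by replica-a: skipped
reactor_pass(store, "run-1", "replica-b", now=31.0)  # lease expired: runs, but the RPC is a no-op
print(store.side_effects)  # 1
```

The lease keeps concurrent replicas from working the same run at the same time. Idempotent RPCs cover the case where a lease expires and a second replica repeats work that the first one already finished.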

tape-reactors --runner-from app.agent:build_runner --url tape://localhost:7878
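The --runner-from value follows the widespread module:attribute convention. A minimal sketch of resolving such a spec with importlib is shown below. load_factory is a hypothetical helper. Treating build_runner as a zero-argument factory is an assumption, not documented behavior.

```python
import importlib
from typing import Any, Callable


def load_factory(spec: str) -> Callable[..., Any]:
    """Resolve a 'package.module:attribute' string, e.g. 'app.agent:build_runner'."""
    module_name, sep, attr_path = spec.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attribute', got {spec!r}")
    obj: Any = importlib.import_module(module_name)
    # Allow dotted attributes after the colon (e.g. 'mod:Class.method').
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    if not callable(obj):
        raise TypeError(f"{spec!r} does not resolve to a callable")
    return obj
```

Under that assumption, `load_factory("app.agent:build_runner")()` would import app.agent and call build_runner to obtain the runner.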

tape.reactors.recover_once

recover_once(*, runner: Any = None, redrive_fn: Any = None, url: str = DEFAULT_URL, limit: int = 50) -> list[dict]

Re-drive every recoverable run. Returns a list of {run_id, invocation_id}.

tape.reactors.reconcile_once

reconcile_once(url: str = DEFAULT_URL, *, reconcile_pending_after_ms: int = 0, client: Optional[TapeClient] = None) -> list[dict]

Resolve UNKNOWN effects (and PENDING ones older than reconcile_pending_after_ms, if > 0) via the connector's observe() (the new, semantics-aware path) or the per-tool status_check (the v1 path). Returns a list of {key, resolved, ...} dicts.

The semantics-aware rules:

  • UNKNOWN + IDEMPOTENT: if the observation is absent, the effect is safe to re-issue. The server moves it back to PENDING with next_dispatch_at_ms=now, and the outbox loop retries it.
  • UNKNOWN + NON_IDEMPOTENT: if the observation is absent, the effect is not retried. It is marked FAILED, and a human or saga decides whether to issue a fresh attempt under a new key. (The server enforces this mapping in record_external_observation.)
  • Duplicate observed (either semantics): register a compensation obligation atomically with the observation, then mark the effect CONFIRMED.
  • Stuck (either semantics): mark the effect UNKNOWN; the operator triages.
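The four rules above form a small decision table. The sketch below models that mapping for an UNKNOWN effect. The enum and function names are hypothetical, and the server enforces the real mapping in record_external_observation.

```python
from enum import Enum
from typing import Tuple


class Semantics(Enum):
    IDEMPOTENT = "IDEMPOTENT"
    NON_IDEMPOTENT = "NON_IDEMPOTENT"


class Observation(Enum):
    ABSENT = "absent"
    DUPLICATE = "duplicate"
    STUCK = "stuck"


def reconcile_action(semantics: Semantics, observation: Observation) -> Tuple[str, bool]:
    """Return (next_status, register_compensation) for an UNKNOWN effect."""
    if observation is Observation.DUPLICATE:
        # The compensation obligation is recorded atomically with the observation.
        return "CONFIRMED", True
    if observation is Observation.STUCK:
        return "UNKNOWN", False  # left for an operator to triage
    # Observation is ABSENT: the outcome depends on the effect's semantics.
    if semantics is Semantics.IDEMPOTENT:
        return "PENDING", False  # re-queued with next_dispatch_at_ms=now
    return "FAILED", False  # never auto-retried; a human or saga decides
```

Only the absent case depends on semantics. Duplicate and stuck are handled the same way for both kinds of effect.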

tape.reactors.fire_due_timers_once

fire_due_timers_once(url: str = DEFAULT_URL, *, runner: Any = None, redrive_fn: Any = None, on_timer: Optional[Callable[[Any], None]] = None, client: Optional[TapeClient] = None) -> list[dict]

Claim and fire due timers. Returns a list of {run_id, timer_id, kind, action}. redrive_fn (e.g. one that calls the Agent Engine :streamQuery API) is used for redrive timers when there's no local runner.

tape.reactors.run_reactors

run_reactors(*, runner: Any = None, redrive_fn: Any = None, url: str = DEFAULT_URL, recover: bool = True, reconcile: bool = True, timers: bool = True, compensations: bool = True, interval_s: float = 2.0, reconcile_pending_after_s: float = 0.0, claimer: str = '', once: bool = False, on_tick: Optional[Callable[[dict], None]] = None) -> None

Pass runner= for a local ADK Runner, or redrive_fn= (e.g. one that calls the Vertex AI Agent Engine :streamQuery API) when the agent is deployed elsewhere — recovery still works, it just re-invokes through that callback.

compensations=True enables the obligations reactor on the same polling cadence as the others. For event-driven draining (lower latency, less polling pressure), call run_compensations_event_driven in its own process or thread instead. Both close obligations from the same queue.
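The polling cadence can be pictured as the loop below. Each tick runs the enabled passes, reports the results through on_tick, and then sleeps for interval_s. In run_reactors these passes would presumably be recover_once, reconcile_once, fire_due_timers_once, and the compensations pass. run_polling_loop and the shape of the per-pass dict handed to on_tick are assumptions for illustration, not the library's internals.

```python
import time
from typing import Callable, Dict, List, Optional


def run_polling_loop(passes: Dict[str, Callable[[], List[dict]]], *,
                     interval_s: float = 2.0, once: bool = False,
                     on_tick: Optional[Callable[[dict], None]] = None) -> None:
    """Run every enabled pass, report the tick, sleep, repeat (hypothetical model)."""
    while True:
        # Each pass returns its own list of outcome dicts, keyed here by pass name.
        tick = {name: run_pass() for name, run_pass in passes.items()}
        if on_tick is not None:
            on_tick(tick)
        if once:
            return
        time.sleep(interval_s)


ticks: List[dict] = []
run_polling_loop(
    {"recover": lambda: [{"run_id": "r1", "invocation_id": "i1"}], "timers": lambda: []},
    once=True,
    on_tick=ticks.append,
)
```

In this model, turning a reactor off (for example timers=False) corresponds to leaving its entry out of passes.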

tape.reactors.run_event_fanout

run_event_fanout(url: str = DEFAULT_URL, *, sink: Callable[[Any], None], from_ts_ms: int = 0, run_id: str = '', kind: str = '') -> None

Tail the cross-run journal and hand each EventEntry to sink. Wire sink to Pub/Sub, Kafka, or a webhook to publish the WAL. The call blocks. Delivery is at-least-once: the boundary entry may repeat, so a sink that cares should de-duplicate on (run_id, seq). The Bigtable backend has no cross-run tail, so on that backend the stream simply yields nothing. Use Bigtable change streams there instead.
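Because the boundary entry may repeat, a sink that must not double-publish can wrap its publisher in a de-duplicating layer keyed on (run_id, seq). The sketch below assumes each EventEntry exposes run_id and seq attributes. DedupSink and its window size are hypothetical.

```python
from collections import OrderedDict
from typing import Any, Callable, Tuple


class DedupSink:
    """Wrap a sink so repeated (run_id, seq) entries are delivered once.

    Keeps a bounded LRU window of recently seen keys to cap memory.
    """

    def __init__(self, inner: Callable[[Any], None], window: int = 10_000) -> None:
        self._inner = inner
        self._window = window
        self._seen: "OrderedDict[Tuple[str, int], None]" = OrderedDict()

    def __call__(self, entry: Any) -> None:
        key = (entry.run_id, entry.seq)
        if key in self._seen:
            self._seen.move_to_end(key)  # refresh recency, skip delivery
            return
        self._inner(entry)  # deliver first; a failed publish is not marked as seen
        self._seen[key] = None
        if len(self._seen) > self._window:
            self._seen.popitem(last=False)  # evict the oldest key
```

You would pass it as `run_event_fanout(url, sink=DedupSink(publish))`, where publish is your own Pub/Sub, Kafka, or webhook call. A key is recorded only after the inner sink returns, so an entry whose publish raised is still delivered on redelivery. The bounded window means a repeat older than `window` entries would slip through. That is acceptable when only the boundary entry repeats.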

tape.reactors.run_outbox_relay

run_outbox_relay(url: str, sink: Any, *, cursor_path: str = '', run_id: str = '', kind: str = '', subject_pattern: str = '', predicate_cel: str = '', interval_s: float = 1.0, once: bool = False) -> None

Run outbox_relay_tick in a loop forever, or a single time with once=True. The cursor is persisted in cursor_path (a local JSON file), so a restarted relay resumes where it stopped. To feed several sinks, run one relay per sink, each with its own cursor file.

subject_pattern (default /tape/**) and predicate_cel (default empty) are forwarded to SubscribeBySubject so the server does the filtering.
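A cursor file that survives crashes is usually written atomically: write a temporary file in the same directory, fsync it, and then rename it over the old one. The sketch below shows that pattern and assumes the cursor is a small JSON object. load_cursor, save_cursor, and the {"position": ...} shape are hypothetical and are not run_outbox_relay's actual file format.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_cursor(path: str) -> Dict[str, Any]:
    """Return the saved cursor, or an empty dict on first start."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def save_cursor(path: str, cursor: Dict[str, Any]) -> None:
    """Atomically replace the cursor file so a crash never leaves it half-written."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".cursor-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(cursor, f)
            f.flush()
            os.fsync(f.fileno())  # make the bytes durable before the rename
        os.replace(tmp, path)  # atomic replace within one filesystem
    except BaseException:
        os.unlink(tmp)
        raise
```

The cursor should be saved only after the sink has accepted everything up to that position. With that ordering, a crash replays a few entries rather than skipping them, which gives at-least-once delivery across restarts.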

Outbox reactor

The outbox reactor dispatches PENDING + OUTBOX effects through their registered connectors.

The loop:

list effects to dispatch (PENDING + OUTBOX + due)
for each:
    claim (atomic CAS lease)
    look up the connector
    dispatch through it
    record result:
      confirmed     → complete_effect(CONFIRMED) + register compensation
                      (if a compensate kind is registered for the tool)
      failed        → record_dispatch_attempt(next_at_ms=backoff)
                      (eventually exhaust → terminal FAILED via
                       record_external_observation(FAILED))
      unknown       → record_dispatch_attempt(next_at_ms=0)  → status UNKNOWN
                      (the reconciler resolves; do NOT blindly retry —
                       that is the entire safety claim for non-idempotent
                       upstreams)
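The "record result" step can be read as a pure state transition. The sketch below assumes that attempts counts the dispatches made so far, including the current one, and that a failed dispatch stays PENDING until it is due again. The backoff policy (capped exponential backoff with full jitter), base_ms, cap_ms, Transition, and next_state are all assumptions.

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class Transition:
    status: str
    next_at_ms: Optional[int] = None
    register_compensation: bool = False


def next_state(outcome: str, attempts: int, now_ms: int, *, max_attempts: int = 5,
               base_ms: int = 1_000, cap_ms: int = 60_000,
               has_compensate_kind: bool = False) -> Transition:
    """Map one dispatch outcome to the effect's next state (hypothetical model)."""
    if outcome == "confirmed":
        # Compensation is registered only if a compensate kind exists for the tool.
        return Transition("CONFIRMED", register_compensation=has_compensate_kind)
    if outcome == "unknown":
        # Never retry blindly: park it for the reconciler.
        return Transition("UNKNOWN", next_at_ms=0)
    if outcome == "failed":
        if attempts >= max_attempts:
            return Transition("FAILED")  # attempts exhausted: terminal
        # Capped exponential backoff with full jitter (an assumed policy).
        delay = min(cap_ms, base_ms * 2 ** (attempts - 1))
        return Transition("PENDING", next_at_ms=now_ms + random.randint(0, delay))
    raise ValueError(f"unknown outcome {outcome!r}")
```

Keeping the transition free of I/O makes the safety-critical "unknown means no retry" branch easy to unit-test on its own.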

Safety: the dispatcher refuses to re-dispatch a row whose semantics is NON_IDEMPOTENT and whose status is anything other than PENDING. The server-side CAS in claim_effect_dispatch enforces this, but we also assert on the result record so a bug in store config can't downgrade the guarantee silently.
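The two layers of this safety check can be modelled in a few lines. The store side performs an atomic check-and-set, and the client side performs an independent re-check. ToyEffectStore, EffectRow, and assert_dispatchable are hypothetical, and the real server-side check lives in claim_effect_dispatch. The client check raises explicitly instead of using an assert statement, because Python strips assert statements when run with -O.

```python
import dataclasses
import threading
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class EffectRow:
    key: str
    semantics: str  # "IDEMPOTENT" or "NON_IDEMPOTENT"
    status: str     # e.g. "PENDING", "UNKNOWN", "CONFIRMED"
    claimed_by: str = ""


class ToyEffectStore:
    """In-memory stand-in for the server-side claim (lease expiry omitted)."""

    def __init__(self, rows: Iterable[EffectRow]) -> None:
        self._rows: Dict[str, EffectRow] = {r.key: r for r in rows}
        self._lock = threading.Lock()

    def claim(self, key: str, claimer: str) -> Optional[EffectRow]:
        with self._lock:  # check and set happen as one atomic step
            row = self._rows[key]
            if row.claimed_by:
                return None  # someone else already holds it
            if row.semantics == "NON_IDEMPOTENT" and row.status != "PENDING":
                return None  # never re-dispatch a non-idempotent effect
            row.claimed_by = claimer
            return dataclasses.replace(row)  # snapshot returned to the caller


def assert_dispatchable(row: EffectRow) -> None:
    """Client-side re-check, so a misconfigured store cannot weaken the guarantee."""
    if row.semantics == "NON_IDEMPOTENT" and row.status != "PENDING":
        raise AssertionError(f"refusing to re-dispatch {row.key} in status {row.status}")
```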

dispatch_one

dispatch_one(eff, *, client: TapeClient, claimer: str, dispatch_max_attempts: int = 5) -> Dict[str, Any]

Run one effect through its connector. Returns a per-effect outcome dict.

outbox_dispatch_once

outbox_dispatch_once(url: str = DEFAULT_URL, *, connector: str = '', limit: int = 200, claimer: str = '', dispatch_max_attempts: int = 5, client: Optional[TapeClient] = None) -> List[Dict[str, Any]]

One pass of the outbox dispatcher. Returns a list of per-effect outcomes.

run_outbox_dispatcher

run_outbox_dispatcher(url: str = DEFAULT_URL, *, connector: str = '', interval_s: float = 1.0, claimer: str = '', dispatch_max_attempts: int = 5, once: bool = False, on_tick: Optional[Callable[[List[Dict[str, Any]]], None]] = None) -> None

Run the outbox dispatcher forever (or once). Connectors must be registered (via tape.connectors.register(...)) before this loop starts; a missing connector causes the effect to be left in PENDING for the next process that does have it.
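The missing-connector rule can be sketched with a plain dictionary registry. A process with no connector for an effect simply skips it, so the effect stays PENDING for a process that has one. register_connector, dispatch_or_skip, and the dict-shaped effect are hypothetical stand-ins, not tape.connectors.register itself.

```python
from typing import Any, Callable, Dict, Optional

Effect = Dict[str, Any]
Connector = Callable[[Effect], str]  # returns "confirmed", "failed", or "unknown"

# Hypothetical per-process registry, keyed by connector name.
_REGISTRY: Dict[str, Connector] = {}


def register_connector(name: str, connector: Connector) -> None:
    _REGISTRY[name] = connector


def dispatch_or_skip(effect: Effect) -> Optional[str]:
    """Dispatch through the registered connector, or return None to leave it PENDING."""
    connector = _REGISTRY.get(effect["connector"])
    if connector is None:
        return None  # not handled here; a process with this connector picks it up
    return connector(effect)


# Register before the loop starts so the first pass does not skip anything.
register_connector("email", lambda effect: "confirmed")
```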