Connectors
A capability connector knows how to talk to one upstream. Three operations cover the three
places Tape touches the outside world: dispatch, observe, compensate.
    from tape import connectors
    from tape.connectors.http import HTTPConnector

    connectors.register(HTTPConnector(
        name="bank.wire",
        endpoint="https://bank.example/wires",
        observe_endpoint="https://bank.example/wires/lookup",
        compensate_endpoint="https://bank.example/wires/reverse",
    ))
The outbox reactor matches @tape.effect(connector=...) to a registered connector by name and
invokes it under a CAS lease. See Non-idempotent upstreams for the wider contract.
Protocol
tape.connectors.base.EffectConnector
Bases: Protocol
A connector knows how to talk to one upstream. The three methods cover the three places Tape touches the outside world for a non-idempotent effect:
* dispatch(effect): perform the call once (outbox reactor)
* observe(effect): ask "what happened?" (reconciler)
* compensate(obligation): run the inverse (obligations reactor)
Implementations should be stateless between calls; everything that must
survive a crash lives in Tape's tables. Synchronous and async impls are
both supported — the reactor awaits with _maybe_await (see below).
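As an illustration of the protocol shape, here is a minimal sketch of an in-memory connector. To stay runnable without Tape installed, it declares local stand-ins for the three result dataclasses (mirroring the signatures documented on this page) and a hypothetical FakeEffect carrying only idempotency_key and request_json. The maybe_await helper is an assumed approximation of what a helper like _maybe_await might do, not Tape's actual implementation.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Literal


# Local stand-ins mirroring the documented signatures; with Tape installed
# you would import the real classes from tape.connectors.base instead.
@dataclass
class DispatchResult:
    status: Literal["confirmed", "failed", "unknown"]
    external_ref: str = ""
    response: dict = field(default_factory=dict)
    error: dict = field(default_factory=dict)
    retry_at_ms: int = 0


@dataclass
class ObservationResult:
    status: Literal["confirmed", "absent", "duplicate", "failed", "stuck"]
    external_ref: str = ""
    response: dict = field(default_factory=dict)


@dataclass
class CompensationResult:
    status: Literal["compensated", "failed"]
    response: dict = field(default_factory=dict)
    error: dict = field(default_factory=dict)


@dataclass
class FakeEffect:  # hypothetical: only the fields this sketch needs
    idempotency_key: str
    request_json: dict


class InMemoryLedgerConnector:
    """Toy connector whose 'upstream' is a dict keyed by idempotency key."""

    name = "fake.ledger"

    def __init__(self) -> None:
        # Plays the role of the counterparty's records; it is not state
        # that Tape would rely on across a crash.
        self.upstream = {}

    def dispatch(self, effect: FakeEffect) -> DispatchResult:
        self.upstream[effect.idempotency_key] = effect.request_json
        return DispatchResult("confirmed", external_ref=f"ref-{effect.idempotency_key}")

    async def observe(self, effect: FakeEffect) -> ObservationResult:
        # Async on purpose, to show sync and async methods side by side.
        if effect.idempotency_key in self.upstream:
            return ObservationResult("confirmed", external_ref=f"ref-{effect.idempotency_key}")
        return ObservationResult("absent")

    def compensate(self, obligation: FakeEffect) -> CompensationResult:
        # Assumption: the obligation carries the key of the effect to undo.
        removed = self.upstream.pop(obligation.idempotency_key, None)
        return CompensationResult("compensated" if removed is not None else "failed")


async def maybe_await(value: Any) -> Any:
    """Await the value if it is awaitable, otherwise return it unchanged."""
    return await value if inspect.isawaitable(value) else value


async def demo():
    conn = InMemoryLedgerConnector()
    effect = FakeEffect("k-1", {"amount": 100})
    d = await maybe_await(conn.dispatch(effect))
    o = await maybe_await(conn.observe(effect))
    c = await maybe_await(conn.compensate(effect))
    o2 = await maybe_await(conn.observe(effect))
    return d.status, o.status, c.status, o2.status


statuses = asyncio.run(demo())
```

The caller treats every method the same way regardless of whether it is a plain function or a coroutine, which is the property the protocol description relies on.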
tape.connectors.base.DispatchResult
dataclass
DispatchResult(status: Literal['confirmed', 'failed', 'unknown'], external_ref: str = '', response: dict = dict(), error: dict = dict(), retry_at_ms: int = 0)
What happened when the connector called the counterparty.
* confirmed: the call landed; record the result and (optionally) the external_ref the counterparty returned.
* failed: the call definitively did not land. Either deterministic (a 4xx the connector can interpret) or, after retries are exhausted, a final "give up" terminal.
* unknown: we don't know. The ack was lost, or the timeout window closed before we heard back. For non-idempotent upstreams this is the only safe non-confirmed outcome the connector may return: the outbox will mark the effect UNKNOWN and stop dispatching; the reconciler then uses observe() to ask the counterparty what really happened.
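The three statuses can be sketched as a small classification step around a single upstream call. This is only an illustration: DispatchResult is a local stand-in mirroring the documented signature, Rejected is a hypothetical exception standing for "the counterparty definitively refused", and dispatch_once is not part of Tape.

```python
from dataclasses import dataclass, field
from typing import Callable, Literal


@dataclass
class DispatchResult:  # local stand-in mirroring the documented signature
    status: Literal["confirmed", "failed", "unknown"]
    external_ref: str = ""
    response: dict = field(default_factory=dict)
    error: dict = field(default_factory=dict)
    retry_at_ms: int = 0


class Rejected(Exception):
    """Hypothetical: the counterparty definitively refused the request."""


def dispatch_once(call: Callable[[], dict]) -> DispatchResult:
    """Run one upstream call and classify the outcome."""
    try:
        body = call()
    except Rejected as exc:
        # A definitive "no": the call did not land.
        return DispatchResult("failed", error={"reason": str(exc)})
    except (TimeoutError, ConnectionError) as exc:
        # The request may or may not have landed, so "failed" would be unsafe.
        return DispatchResult("unknown", error={"reason": repr(exc)})
    return DispatchResult("confirmed", external_ref=str(body.get("id", "")), response=body)


def ok() -> dict:
    return {"id": "w-42"}


def refused() -> dict:
    raise Rejected("insufficient funds")


def lost_ack() -> dict:
    raise TimeoutError("no response within the timeout window")


results = [dispatch_once(f).status for f in (ok, refused, lost_ack)]
```

The point of the sketch is the middle branch: a timeout is reported as unknown rather than failed, so that a wire which may already have been sent is never dispatched a second time.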
tape.connectors.base.ObservationResult
dataclass
ObservationResult(status: Literal['confirmed', 'absent', 'duplicate', 'failed', 'stuck'], external_ref: str = '', response: dict = dict())
What the connector saw at the counterparty for a given (idempotency_key
or business_key). Maps to EffectResolution on the wire.
tape.connectors.base.CompensationResult
dataclass
CompensationResult(status: Literal['compensated', 'failed'], response: dict = dict(), error: dict = dict())
What happened when the connector ran the inverse for a confirmed effect.
Registry
tape.connectors.base.register
Register a connector. The name defaults to connector.name, so most
callers just call tape.connectors.register(MyConnector(...)). Overwriting an
existing registration is allowed; late binding is fine for tests.
tape.connectors.base.clear
Test helper — clears the registry so a test can register fresh fakes.
Built-in connectors
tape.connectors.http.HTTPConnector
dataclass
HTTPConnector(name: str, endpoint: str, headers: Dict[str, str] = None, timeout_s: float = 10.0, observe_endpoint: str = '', compensate_endpoint: str = '', success_codes: tuple = (200, 201, 202, 204), duplicate_code: int = 409)
Generic HTTP outbox connector.
* name: match key for @tape.effect(connector=…).
* endpoint: full URL to POST to.
* headers: extra static headers (auth, content-type, …).
* timeout_s: request timeout, in seconds.
* observe_endpoint: optional URL for GET ?key=…; the response JSON determines the observation result (status field expected: confirmed | absent | duplicate | failed | stuck).
* compensate_endpoint: optional URL to POST to in order to run the inverse.
* success_codes: HTTP codes that count as confirmed (default: 200, 201, 202, 204).
* duplicate_code: HTTP code that maps to ObservationResult.duplicate (default 409). A 409 with a JSON body containing external_ref is the common idempotency-conflict pattern.
dispatch
POST effect.request_json to endpoint. Adds:
* Idempotency-Key: <effect.idempotency_key> — the standard header
most providers dedupe on
* X-Tape-Run-Id, X-Tape-Effect-Key, X-Tape-Business-Key
Returns confirmed on 2xx, unknown on timeout / network error (the
ack was lost — the reconciler must resolve), failed on a definitive
4xx/5xx that isn't a timeout.
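The header set and status mapping described above can be sketched as two small pure functions. These are illustrative helpers, not HTTPConnector internals: tape_headers and classify_dispatch are hypothetical names, the choice to let the Tape headers override static ones is an assumption, and so is the conservative fallback to unknown for status codes the text does not cover (for example a 3xx).

```python
from typing import Dict, Optional, Tuple

SUCCESS_CODES: Tuple[int, ...] = (200, 201, 202, 204)  # documented default


def tape_headers(idempotency_key: str, run_id: str, effect_key: str,
                 business_key: str,
                 extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Merge static headers with the per-effect headers listed above."""
    headers = dict(extra or {})
    headers.update({
        "Idempotency-Key": idempotency_key,
        "X-Tape-Run-Id": run_id,
        "X-Tape-Effect-Key": effect_key,
        "X-Tape-Business-Key": business_key,
    })
    return headers


def classify_dispatch(status_code: Optional[int],
                      success_codes: Tuple[int, ...] = SUCCESS_CODES) -> str:
    """Map an HTTP outcome to a dispatch status.

    status_code is None when no response arrived (timeout / network error).
    """
    if status_code is None:
        return "unknown"      # the ack was lost; the reconciler must resolve
    if status_code in success_codes:
        return "confirmed"
    if 400 <= status_code < 600:
        return "failed"       # a definitive 4xx/5xx
    return "unknown"          # assumption: be conservative for anything else


headers = tape_headers("k-1", "run-7", "eff-3", "wire:acct-9",
                       extra={"Authorization": "Bearer <token>"})
```

Keeping the classification separate from the transport makes the unknown branch easy to unit-test without a network.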
observe
Ask the upstream: "did the operation with this key happen?". The observation endpoint is expected to return JSON like:
{"status": "confirmed" | "absent" | "duplicate" | "failed" | "stuck",
"external_ref": "<optional>", "...": "..."}
If no observe_endpoint is set, returns absent (the reconciler will
treat that per-semantics — see record_external_observation).
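A sketch of the observation round trip, under stated assumptions: observe_url builds the GET ?key=… URL with the standard library, and parse_observation turns the JSON body into a (status, external_ref) pair. Both function names are hypothetical, and raising on an unrecognised status is this sketch's choice; how HTTPConnector itself handles that case is not specified here.

```python
import json
from typing import Optional, Tuple
from urllib.parse import urlencode

VALID_STATUSES = {"confirmed", "absent", "duplicate", "failed", "stuck"}


def observe_url(observe_endpoint: str, key: str) -> str:
    """Build the lookup URL for one idempotency or business key."""
    return f"{observe_endpoint}?{urlencode({'key': key})}"


def parse_observation(body: Optional[str]) -> Tuple[str, str]:
    """Return (status, external_ref) from an observation response body.

    body is None when no observe_endpoint is configured.
    """
    if body is None:
        return "absent", ""
    payload = json.loads(body)
    status = payload.get("status")
    if status not in VALID_STATUSES:
        # Assumption: surface malformed responses instead of guessing.
        raise ValueError(f"unexpected observation status: {status!r}")
    return status, payload.get("external_ref", "")


url = observe_url("https://bank.example/wires/lookup", "k-1")
seen = parse_observation('{"status": "confirmed", "external_ref": "w-42"}')
unset = parse_observation(None)
```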
tape.connectors.pubsub.PubSubConnector
dataclass
PubSubConnector(name: str, project: str, topic: str, ordering_key_from: Any = None, publisher: Any = None)
Publish effect intents to a Pub/Sub topic, one message per effect.
* name: match key for @tape.effect(connector=…).
* project, topic: Pub/Sub destination.
* ordering_key_from: callable (effect) -> str for the ordering key; defaults to effect.run_id so a run's effects publish in order.
* publisher: optional pubsub_v1.PublisherClient (test seam).
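To illustrate ordering_key_from, the sketch below shows a custom key function and the documented default side by side. FakeEffect is a hypothetical stand-in (the business_key attribute and its "account:wire" format are assumptions), and resolve_ordering_key is an illustrative helper rather than connector code.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeEffect:  # hypothetical: only the fields this sketch needs
    run_id: str
    business_key: str


def resolve_ordering_key(effect: FakeEffect,
                         ordering_key_from: Optional[Callable[[FakeEffect], str]] = None) -> str:
    """Use the custom callable when given, else fall back to effect.run_id."""
    if ordering_key_from is None:
        return effect.run_id
    return ordering_key_from(effect)


def by_account(effect: FakeEffect) -> str:
    """Order per account instead of per run: 'acct-7:wire-1' -> 'acct-7'."""
    return effect.business_key.split(":", 1)[0]


effect = FakeEffect(run_id="run-1", business_key="acct-7:wire-1")
default_key = resolve_ordering_key(effect)
account_key = resolve_ordering_key(effect, by_account)
```

A function like by_account would be passed as PubSubConnector(..., ordering_key_from=by_account) when messages for one account must stay in order across runs.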
dispatch() returns confirmed on publish ack (the message landed in
Pub/Sub, which is what we promised); the downstream subscriber is
responsible for the upstream business outcome. For the upstream answer,
pair this connector with a reconciler status_check that asks the
business system, or with a second connector whose observe() reads
a business-side ledger.
observe() here is a no-op (absent) — the connector knows about the
Pub/Sub side, not the business side. Override in a subclass when the
business system exposes a "did this happen?" endpoint.
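The subclass override mentioned above might look like the following sketch. To stay runnable without Tape or google-cloud-pubsub, it uses a stand-in base class whose observe() returns absent (as described for PubSubConnector) and a stand-in ObservationResult. The lookup callable, FakeEffect, and the "ref" field of the ledger record are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable, Literal, Optional


@dataclass
class ObservationResult:  # local stand-in mirroring the documented signature
    status: Literal["confirmed", "absent", "duplicate", "failed", "stuck"]
    external_ref: str = ""
    response: dict = field(default_factory=dict)


@dataclass
class FakeEffect:  # hypothetical: only the field this sketch needs
    idempotency_key: str


class PubSubConnectorStandIn:
    """Stand-in for PubSubConnector: observe() knows nothing about the business side."""

    def observe(self, effect: FakeEffect) -> ObservationResult:
        return ObservationResult("absent")


class LedgerAwarePubSub(PubSubConnectorStandIn):
    """Override observe() to consult a business-side 'did this happen?' lookup."""

    def __init__(self, lookup: Callable[[str], Optional[dict]]) -> None:
        self._lookup = lookup  # e.g. a thin client for the business system

    def observe(self, effect: FakeEffect) -> ObservationResult:
        record = self._lookup(effect.idempotency_key)
        if record is None:
            return ObservationResult("absent")
        return ObservationResult("confirmed",
                                 external_ref=record.get("ref", ""),
                                 response=record)


ledger = {"k-1": {"ref": "txn-9"}}          # toy business-side ledger
conn = LedgerAwarePubSub(ledger.get)
seen = conn.observe(FakeEffect("k-1"))
missing = conn.observe(FakeEffect("k-2"))
```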