Connectors

A capability connector knows how to talk to one upstream. Three operations cover the three places Tape touches the outside world: dispatch, observe, compensate.

from tape import connectors
from tape.connectors.http import HTTPConnector

connectors.register(HTTPConnector(
    name="bank.wire",
    endpoint="https://bank.example/wires",
    observe_endpoint="https://bank.example/wires/lookup",
    compensate_endpoint="https://bank.example/wires/reverse",
))

The outbox reactor matches @tape.effect(connector=...) to a registered connector by name and invokes it under a CAS lease. See Non-idempotent upstreams for the wider contract.

Protocol

tape.connectors.base.EffectConnector

Bases: Protocol

A connector knows how to talk to one upstream. The three methods cover the three places Tape touches the outside world for a non-idempotent effect:

  • dispatch(effect) — perform the call once (outbox reactor)
  • observe(effect) — ask "what happened?" (reconciler)
  • compensate(obligation) — run the inverse (obligations reactor)

Implementations should be stateless between calls; everything that must survive a crash lives in Tape's tables. Synchronous and async implementations are both supported: the reactor awaits each result through _maybe_await.
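The sync/async support can be pictured with a helper along these lines. This is a minimal sketch, not Tape's actual _maybe_await: the name maybe_await, the two demo connector classes, and the plain-dict results are hypothetical stand-ins used only to show the idea.

```python
import asyncio
import inspect


async def maybe_await(value):
    """Return value, awaiting it first if it is awaitable (hypothetical helper)."""
    if inspect.isawaitable(value):
        return await value
    return value


class SyncConnector:
    """Hypothetical connector with a plain synchronous dispatch."""
    name = "demo.sync"

    def dispatch(self, effect):
        return {"status": "confirmed", "via": "sync"}


class AsyncConnector:
    """Hypothetical connector with a coroutine dispatch."""
    name = "demo.async"

    async def dispatch(self, effect):
        await asyncio.sleep(0)  # stand-in for real async I/O
        return {"status": "confirmed", "via": "async"}


async def run_dispatch(connector, effect):
    # One call site handles both styles: a sync method returns a value,
    # an async method returns a coroutine that maybe_await awaits.
    return await maybe_await(connector.dispatch(effect))


sync_result = asyncio.run(run_dispatch(SyncConnector(), effect={}))
async_result = asyncio.run(run_dispatch(AsyncConnector(), effect={}))
```

With a helper of this shape, a connector author can choose either style without the caller needing to know which one was used.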

tape.connectors.base.DispatchResult dataclass

DispatchResult(status: Literal['confirmed', 'failed', 'unknown'], external_ref: str = '', response: dict = dict(), error: dict = dict(), retry_at_ms: int = 0)

What happened when the connector called the counterparty.

  • confirmed — the call landed; record the result and (optionally) the external_ref the counterparty returned.
  • failed — the call definitively did not land: either a deterministic rejection (a 4xx the connector can interpret) or a terminal give-up after retries are exhausted.
  • unknown — we don't know. The ack was lost, or the timeout window closed before we heard back. For non-idempotent upstreams this is the only safe non-confirmed outcome the connector may return — the outbox will mark the effect UNKNOWN and stop dispatching; the reconciler then uses observe() to ask the counterparty what really happened.

tape.connectors.base.ObservationResult dataclass

ObservationResult(status: Literal['confirmed', 'absent', 'duplicate', 'failed', 'stuck'], external_ref: str = '', response: dict = dict())

What the connector saw at the counterparty for a given key (idempotency_key or business_key). Maps to EffectResolution on the wire.

tape.connectors.base.CompensationResult dataclass

CompensationResult(status: Literal['compensated', 'failed'], response: dict = dict(), error: dict = dict())

What happened when the connector ran the inverse for a confirmed effect.

Registry

tape.connectors.base.register

register(connector: EffectConnector, *, name: str = '') -> EffectConnector

Register a connector. The name defaults to connector.name, so most callers just write tape.connectors.register(MyConnector(...)). Overwriting an existing registration is allowed; late binding is fine for tests.

tape.connectors.base.get

get(name: str) -> Optional[EffectConnector]

tape.connectors.base.all_registered

all_registered() -> Dict[str, EffectConnector]

tape.connectors.base.clear

clear() -> None

Test helper — clears the registry so a test can register fresh fakes.
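The registry semantics described in this section (name defaulting to connector.name, overwrite allowed, clear for tests) can be pictured as a plain dictionary. This is a minimal sketch under that assumption; the real module may store connectors differently, FakeConnector is a hypothetical test double, and returning a copy from all_registered is a guess rather than documented behaviour.

```python
from typing import Dict, Optional

# Module-level store, keyed by connector name (assumed layout).
_REGISTRY: Dict[str, object] = {}


def register(connector, *, name: str = ""):
    """Store the connector under name, falling back to connector.name."""
    key = name or connector.name
    _REGISTRY[key] = connector  # overwriting an existing entry is allowed
    return connector


def get(name: str) -> Optional[object]:
    """Return the registered connector, or None if the name is unknown."""
    return _REGISTRY.get(name)


def all_registered() -> Dict[str, object]:
    """Return a copy so callers cannot mutate the registry by accident."""
    return dict(_REGISTRY)


def clear() -> None:
    """Empty the registry, for example between tests."""
    _REGISTRY.clear()


class FakeConnector:
    """Hypothetical test double; only the name attribute matters here."""

    def __init__(self, name: str):
        self.name = name


first = register(FakeConnector("bank.wire"))
second = register(FakeConnector("bank.wire"))        # replaces first
aliased = register(FakeConnector("ignored"), name="bank.test")
```

A test would typically call clear() in its setup, register fakes, and look them up by the same name used in @tape.effect(connector=...).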

Built-in connectors

tape.connectors.http.HTTPConnector dataclass

HTTPConnector(name: str, endpoint: str, headers: Dict[str, str] = None, timeout_s: float = 10.0, observe_endpoint: str = '', compensate_endpoint: str = '', success_codes: tuple = (200, 201, 202, 204), duplicate_code: int = 409)

Generic HTTP outbox connector.

  • name — match key for @tape.effect(connector=…).
  • endpoint — full URL to POST to.
  • headers — extra static headers (auth, content-type, …).
  • timeout_s — request timeout.
  • observe_endpoint — optional URL for GET ?key=…; the response JSON guides the observation result (status field expected: confirmed | absent | duplicate | failed | stuck).
  • compensate_endpoint — optional URL for POST to run the inverse.
  • success_codes — HTTP codes that count as confirmed (default: 200, 201, 202, 204).
  • duplicate_code — HTTP code that maps to ObservationResult.duplicate (default 409). 409 + a JSON body with external_ref is the common idempotency-conflict pattern.

dispatch

dispatch(effect) -> DispatchResult

POST effect.request_json to endpoint. Adds:

  • Idempotency-Key: <effect.idempotency_key> (the standard header most providers dedupe on)
  • X-Tape-Run-Id, X-Tape-Effect-Key, X-Tape-Business-Key

Returns confirmed on 2xx, unknown on timeout / network error (the ack was lost, so the reconciler must resolve it), failed on a definitive 4xx/5xx that isn't a timeout.
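The outcome mapping for dispatch can be sketched as a small pure function. This is an illustration, not HTTPConnector's code: classify_dispatch is a hypothetical name, a status_code of None stands in for a timeout or network error, and mapping any other unlisted code (such as a 3xx) to unknown is a conservative assumption the documentation does not spell out. The sketch also leaves duplicate_code out of scope.

```python
from typing import Optional, Tuple

# Defaults copied from the HTTPConnector signature.
SUCCESS_CODES: Tuple[int, ...] = (200, 201, 202, 204)


def classify_dispatch(
    status_code: Optional[int],
    success_codes: Tuple[int, ...] = SUCCESS_CODES,
) -> str:
    """Map an HTTP outcome to a DispatchResult status string.

    status_code is None when no response arrived (timeout / network error).
    """
    if status_code is None:
        # The ack was lost: we cannot tell whether the call landed.
        return "unknown"
    if status_code in success_codes:
        return "confirmed"
    if 400 <= status_code <= 599:
        # A definitive error response from the upstream.
        return "failed"
    # Assumption: anything else is treated as "we don't know".
    return "unknown"
```

Keeping the classification separate from the I/O makes the three-way split easy to unit-test without a live endpoint.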

observe

observe(effect) -> ObservationResult

Ask the upstream: "did the operation with this key happen?". The observation endpoint is expected to return JSON like:

{"status": "confirmed" | "absent" | "duplicate" | "failed" | "stuck",
 "external_ref": "<optional>", "...": "..."}

If no observe_endpoint is set, returns absent (the reconciler treats that per semantics; see record_external_observation).
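Turning the observation endpoint's JSON into a result might look roughly like this. It is a sketch only: ObservationSketch is a local stand-in that mirrors the documented ObservationResult fields, parse_observation is a hypothetical helper, and raising ValueError on an unrecognised status is an assumption about error handling, not documented behaviour.

```python
import json
from dataclasses import dataclass, field

# The five status values the observation endpoint is expected to return.
ALLOWED_STATUSES = {"confirmed", "absent", "duplicate", "failed", "stuck"}


@dataclass
class ObservationSketch:
    """Local stand-in mirroring the documented ObservationResult fields."""
    status: str
    external_ref: str = ""
    response: dict = field(default_factory=dict)


def parse_observation(body: str) -> ObservationSketch:
    """Parse the endpoint's JSON body into an ObservationSketch."""
    payload = json.loads(body)
    status = payload.get("status")
    if status not in ALLOWED_STATUSES:
        # Assumption: an unexpected status is surfaced as an error.
        raise ValueError(f"unexpected observation status: {status!r}")
    return ObservationSketch(
        status=status,
        external_ref=payload.get("external_ref", ""),  # optional field
        response=payload,  # keep the full body for later inspection
    )


seen = parse_observation('{"status": "confirmed", "external_ref": "W-1"}')
missing = parse_observation('{"status": "absent"}')
```

Here seen carries the counterparty's reference, while missing falls back to an empty external_ref because the field is optional.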

tape.connectors.pubsub.PubSubConnector dataclass

PubSubConnector(name: str, project: str, topic: str, ordering_key_from: Any = None, publisher: Any = None)

Publish effect intents to a Pub/Sub topic, one message per effect.

  • name — match key for @tape.effect(connector=…).
  • project, topic — Pub/Sub destination.
  • ordering_key_from — callable (effect) -> str for the ordering key; defaults to effect.run_id so a run's effects publish in order.
  • publisher — optional pubsub_v1.PublisherClient (test seam).
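An ordering_key_from callable is just a function from an effect to a string. The sketch below shows the documented default next to a custom alternative. It assumes effects expose run_id (stated above) and a business_key attribute (an assumption inferred from the X-Tape-Business-Key header); SimpleNamespace objects stand in for real effects.

```python
from types import SimpleNamespace


def default_ordering_key(effect) -> str:
    """Documented default: order by run so a run's effects publish in order."""
    return effect.run_id


def by_business_key(effect) -> str:
    """Hypothetical alternative: order by business key, else fall back to run."""
    return effect.business_key or effect.run_id


# Stand-in effects; real effect objects come from Tape.
with_key = SimpleNamespace(run_id="run-1", business_key="invoice-42")
without_key = SimpleNamespace(run_id="run-2", business_key="")

keys = [
    default_ordering_key(with_key),
    by_business_key(with_key),
    by_business_key(without_key),
]
```

A callable like by_business_key would be passed as ordering_key_from when constructing the connector; whether that grouping suits a given subscriber depends on what ordering the downstream actually needs.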

dispatch() returns confirmed on publish ack (the message landed in Pub/Sub, which is what we promised); the downstream subscriber is responsible for the upstream business outcome. For the upstream answer, pair this connector with a reconciler status_check that asks the business system, or with a second connector whose observe() reads a business-side ledger.

observe() here is a no-op (absent) — the connector knows about the Pub/Sub side, not the business side. Override in a subclass when the business system exposes a "did this happen?" endpoint.