Skip to content

Sinks

A sink is the consumer side of the WAL fan-out. The relay reads journal entries via SubscribeEvents (or SubscribeBySubject) and hands each one to sink.publish(entry). Combined with a durable cursor and a consumer-side dedup on (run_id, seq), that gives exactly-once-effective delivery.

from tape.sinks import LogSink, WebhookSink, PubSubSink
import tape.reactors

tape.reactors.run_outbox_relay(
    "tape://localhost:7878",
    sink=WebhookSink(
        url="https://partner.example.com/tape/events",
        headers={"Authorization": "Bearer …"},
        max_retries=3,
    ),
    cursor_path="/var/lib/tape/cursor.json",
)

See how-to: sinks for the cross-language story and the Webhook / Pub/Sub wire contracts.

tape.sinks.Sink

Bases: Protocol

tape.sinks.LogSink

LogSink(path: str = ':stderr')

Appends one JSON line per entry. path=":stderr" writes to stderr.

tape.sinks.WebhookSink dataclass

WebhookSink(url: str, headers: dict = dict(), max_retries: int = 3, initial_backoff_s: float = 0.5, timeout_s: float = 10.0)

POST each journal entry to url as JSON. Sets X-Tape-Event-Id: <run_id>/<seq> so the receiver can de-dup. At-least-once: a successful POST may still be retried (the relay sees the response after the fact).

tape.sinks.PubSubSink

PubSubSink(project: str, topic: str)

Publish to Google Cloud Pub/Sub. ordering_key = run_id (so per-run order is preserved at the subscriber if it enables ordered delivery). The Pub/Sub message_id is assigned by Pub/Sub; the attribute tape-event-id = run_id/seq is what consumers should dedup on. Lazy-imports google-cloud-pubsub — if it's not installed, the constructor raises.

tape.sinks.FnSink

FnSink(fn: Callable[[Any], None])

Wrap any def publish(entry) callable as a Sink.