Sinks⌗
A sink is the consumer side of the WAL fan-out. The relay reads journal
entries via SubscribeEvents (or SubscribeBySubject) and hands each one to
sink.publish(entry). Combined with a durable cursor and a consumer-side
dedup on (run_id, seq), that gives exactly-once-effective delivery.
from tape.sinks import LogSink, WebhookSink, PubSubSink
import tape.reactors
tape.reactors.run_outbox_relay(
"tape://localhost:7878",
sink=WebhookSink(
url="https://partner.example.com/tape/events",
headers={"Authorization": "Bearer …"},
max_retries=3,
),
cursor_path="/var/lib/tape/cursor.json",
)
See how-to: sinks for the cross-language story and the Webhook / Pub/Sub wire contracts.
tape.sinks.Sink
⌗
Bases: Protocol
tape.sinks.LogSink
⌗
Appends one JSON line per entry. path=":stderr" writes to stderr.
tape.sinks.WebhookSink
dataclass
⌗
WebhookSink(url: str, headers: dict = dict(), max_retries: int = 3, initial_backoff_s: float = 0.5, timeout_s: float = 10.0)
POST each journal entry to url as JSON. Sets X-Tape-Event-Id:
<run_id>/<seq> so the receiver can de-dup. At-least-once: a successful POST
may still be retried (the relay sees the response after the fact).
tape.sinks.PubSubSink
⌗
Publish to Google Cloud Pub/Sub. ordering_key = run_id (so per-run order
is preserved at the subscriber if it enables ordered delivery). The Pub/Sub
message_id is assigned by Pub/Sub; the attribute tape-event-id =
run_id/seq is what consumers should dedup on. Lazy-imports
google-cloud-pubsub — if it's not installed, the constructor raises.
tape.sinks.FnSink
⌗
Wrap any def publish(entry) callable as a Sink.