Go package reference

Generated

Generated from godoc by gomarkdoc. Edits here will be overwritten — to change the content, edit the SDK's godoc comments and run scripts/docs/gen_go.sh.

tape

import "github.com/vamsiramakrishnan/durable-agents/tape/sdk/go"

Package tape — the Go client over the `tape.v1` gRPC service.

URL schemes:

tape://host:port    plaintext gRPC (self-hosted, k8s, local)
tapes://host        TLS on :443 (Cloud Run / any HTTPS endpoint)

On `tapes://`, if the endpoint is IAM-protected (e.g. an internal Cloud Run service), Tape attaches a Google OIDC ID token automatically via the google.golang.org/api/idtoken package; the caller's service account only needs roles/run.invoker. To override this, set `Auth=false` or supply an explicit `Audience` in `Options`.
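The scheme rules above can be sketched with the standard library alone. This is an illustration, not the SDK's parser: `endpoint` and `parseTapeURL` are hypothetical names, and both the requirement that `tape://` carries an explicit port and the choice to honour an explicit port on `tapes://` are assumptions.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// endpoint is a hypothetical holder for what a dialer would need.
type endpoint struct {
	Target string // host:port handed to the gRPC dialer
	TLS    bool   // true for tapes://
}

// parseTapeURL maps a tape:// or tapes:// URL to a dial target.
// Illustrative only; the real Dial may behave differently.
func parseTapeURL(raw string) (endpoint, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return endpoint{}, err
	}
	if u.Hostname() == "" {
		return endpoint{}, fmt.Errorf("tape url %q has no host", raw)
	}
	switch u.Scheme {
	case "tape":
		// Assumption: plaintext endpoints must name their port.
		if u.Port() == "" {
			return endpoint{}, fmt.Errorf("tape:// needs an explicit port: %q", raw)
		}
		return endpoint{Target: u.Host, TLS: false}, nil
	case "tapes":
		port := u.Port()
		if port == "" {
			port = "443" // documented default for tapes://
		}
		return endpoint{Target: net.JoinHostPort(u.Hostname(), port), TLS: true}, nil
	default:
		return endpoint{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	for _, raw := range []string{"tape://localhost:7878", "tapes://tape.example.com"} {
		ep, err := parseTapeURL(raw)
		fmt.Println(ep.Target, ep.TLS, err)
	}
}
```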

Reactions — the event-bus user surface for Go.

This file is the Go-side companion to `design-principles/tape-event-bus.md` and a mirror of `tape/sdk/python/tape/reactions.py`. The shape:

  • `On(...)` and the `OnXxx(...)` convenience wrappers collect a process-global registry of `ReactionDef`s with their handler closures. The registration calls are DECLARATIONS only — they do not contact the server. Push them to the server with `RegisterAll(ctx, c, prefix)`.

  • `RunDispatcher(ctx, c, opts)` is the in-process reference dispatcher: for every TASK-kind registered reaction, claim a bounded batch from the server, dispatch handlers with backpressure (max_concurrency / rate_limit_per_s / debounce_ms), and complete/nack each task. The server enforces retry & DLQ — the dispatcher just calls `NackTask(permanent=…)` once `attempts >= dlq_after_n`. AGENT reactions are handled entirely on the server; PUBLISH reactions are pulled by the Pub/Sub bridge (`RunPubSubBridge`).

  • Subject path segments are URL-encoded except for the wildcards `*` and `**`. A `key=""` argument is treated as `**` (rest wildcard).
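The encoding rule in the last bullet can be sketched as follows. `buildSubject` is a hypothetical helper, not an SDK function; treating every empty segment as `**` generalises the documented `key=""` rule and is an assumption. The wildcards have to be special-cased because `url.PathEscape` would otherwise percent-encode `*`.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildSubject joins segments into a path-style subject, URL-encoding
// every segment except the wildcards "*" and "**".
func buildSubject(segments ...string) string {
	out := make([]string, 0, len(segments))
	for _, s := range segments {
		switch s {
		case "":
			// Assumption: an empty segment means "rest wildcard",
			// mirroring how key="" is treated.
			out = append(out, "**")
		case "*", "**":
			out = append(out, s) // wildcards pass through unescaped
		default:
			out = append(out, url.PathEscape(s))
		}
	}
	return "/" + strings.Join(out, "/")
}

func main() {
	fmt.Println(buildSubject("tape", "effect", "confirmed", "bank wire", ""))
	fmt.Println(buildSubject("tape", "effect", "failed", "*", "**"))
}
```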

This module is intentionally optional: nothing else in the SDK depends on it. You can use the Tape journal/values/effects perfectly well without ever importing the reactions surface.

Index

Constants

Status enums (re-exported for ergonomic use).

const (
    RunStatusRunnable     = int32(pb.RunStatus_RUN_STATUS_RUNNABLE)
    RunStatusRunning      = int32(pb.RunStatus_RUN_STATUS_RUNNING)
    RunStatusWaiting      = int32(pb.RunStatus_RUN_STATUS_WAITING)
    RunStatusTerminal     = int32(pb.RunStatus_RUN_STATUS_TERMINAL)
    RunStatusFailed       = int32(pb.RunStatus_RUN_STATUS_FAILED)
    RunStatusCompensating = int32(pb.RunStatus_RUN_STATUS_COMPENSATING)
    RunStatusStuck        = int32(pb.RunStatus_RUN_STATUS_STUCK)
    RunStatusCancelled    = int32(pb.RunStatus_RUN_STATUS_CANCELLED)

    EffectStatusPending   = int32(pb.EffectStatus_EFFECT_STATUS_PENDING)
    EffectStatusConfirmed = int32(pb.EffectStatus_EFFECT_STATUS_CONFIRMED)
    EffectStatusFailed    = int32(pb.EffectStatus_EFFECT_STATUS_FAILED)
    EffectStatusUnknown   = int32(pb.EffectStatus_EFFECT_STATUS_UNKNOWN)

    // Outbox / non-idempotent contract (see proto: EffectSemantics,
    // EffectDispatchMode, EffectResolution). The defaults preserve v1 behaviour
    // (idempotent + inline); opt into the outbox path by passing the
    // non-default values to BeginEffect.
    EffectSemanticsIdempotent    = int32(pb.EffectSemantics_EFFECT_SEMANTICS_IDEMPOTENT)
    EffectSemanticsNonIdempotent = int32(pb.EffectSemantics_EFFECT_SEMANTICS_NON_IDEMPOTENT)
    EffectSemanticsObserveOnly   = int32(pb.EffectSemantics_EFFECT_SEMANTICS_OBSERVE_ONLY)

    EffectDispatchInline = int32(pb.EffectDispatchMode_EFFECT_DISPATCH_MODE_INLINE)
    EffectDispatchOutbox = int32(pb.EffectDispatchMode_EFFECT_DISPATCH_MODE_OUTBOX)

    EffectResolutionConfirmed = int32(pb.EffectResolution_EFFECT_RESOLUTION_CONFIRMED)
    EffectResolutionFailed    = int32(pb.EffectResolution_EFFECT_RESOLUTION_FAILED)
    EffectResolutionAbsent    = int32(pb.EffectResolution_EFFECT_RESOLUTION_ABSENT)
    EffectResolutionDuplicate = int32(pb.EffectResolution_EFFECT_RESOLUTION_DUPLICATE)
    EffectResolutionStuck     = int32(pb.EffectResolution_EFFECT_RESOLUTION_STUCK)

    ObligationPending     = int32(pb.ObligationStatus_OBLIGATION_STATUS_PENDING)
    ObligationCommitted   = int32(pb.ObligationStatus_OBLIGATION_STATUS_COMMITTED)
    ObligationCompensated = int32(pb.ObligationStatus_OBLIGATION_STATUS_COMPENSATED)
    ObligationStuck       = int32(pb.ObligationStatus_OBLIGATION_STATUS_STUCK)
)

HandlerKind and TaskStatus re-exports for ergonomic call-sites.

const (
    HandlerKindAgent   = pb.HandlerKind_HANDLER_KIND_AGENT
    HandlerKindTask    = pb.HandlerKind_HANDLER_KIND_TASK
    HandlerKindPublish = pb.HandlerKind_HANDLER_KIND_PUBLISH

    TaskStatusPending = pb.TaskStatus_TASK_STATUS_PENDING
    TaskStatusClaimed = pb.TaskStatus_TASK_STATUS_CLAIMED
    TaskStatusDone    = pb.TaskStatus_TASK_STATUS_DONE
    TaskStatusFailed  = pb.TaskStatus_TASK_STATUS_FAILED
    TaskStatusDLQ     = pb.TaskStatus_TASK_STATUS_DLQ
)

Span-name constants — use these instead of stringly-typed magic.

const (
    SpanBeginRun        = "tape.begin_run"
    SpanResumeRun       = "tape.resume_run"
    SpanRecordDecision  = "tape.record_decision"
    SpanBeginEffect     = "tape.begin_effect"
    SpanCompleteEffect  = "tape.complete_effect"
    SpanReconcileEffect = "tape.reconcile_effect"
    SpanDispatchEffect  = "tape.dispatch_effect"
    SpanCompensate      = "tape.compensate"
    SpanRedrive         = "tape.redrive"
    SpanAwaitSignal     = "tape.await_signal"
    SpanSendSignal      = "tape.send_signal"
)

Variables

AllSpans — the canonical span-name list.

var AllSpans = []string{
    SpanBeginRun, SpanResumeRun, SpanRecordDecision,
    SpanBeginEffect, SpanCompleteEffect,
    SpanReconcileEffect, SpanDispatchEffect,
    SpanCompensate, SpanRedrive,
    SpanAwaitSignal, SpanSendSignal,
}

ErrPubSubNotBuilt is returned by RunPubSubBridge when the Pub/Sub backend is not compiled in. Build with `-tags pubsub` after adding `cloud.google.com/go/pubsub` to your module to enable it.

var ErrPubSubNotBuilt = errors.New("tape: RunPubSubBridge requires building with `-tags pubsub` and adding cloud.google.com/go/pubsub to your module")

StructuredFields — the canonical field order for structured log records. `LogJSON` emits keys in this order when present.

var StructuredFields = []string{
    "ts", "level", "msg",
    "tenant_id", "app_name", "run_id", "invocation_id", "session_id",
    "seq", "effect_key", "decision_index", "reactor", "lease_owner",
}

func ClearRegistry

func ClearRegistry()

ClearRegistry drops every registered reaction. Test-only helper.

func DefaultURL

func DefaultURL() string

DefaultURL — honours $TAPE_URL.

func FireDueTimersOnce

func FireDueTimersOnce(ctx context.Context, c *Client, redrive RedriveFn) (int, error)

func GetCompensator

func GetCompensator(kind string) func(ctx context.Context, payload []byte) error

func GetStatusCheck

func GetStatusCheck(tool string) func(ctx context.Context, key string) (StatusCheckResult, error)

func IsOutboxConfigError

func IsOutboxConfigError(err error) bool

IsOutboxConfigError — type-check helper.

func IsOutboxEnvelope

func IsOutboxEnvelope(v any) bool

IsOutboxEnvelope — given an arbitrary tool result, report whether it is an outbox envelope. Used by the Go ADK adapter (and by hand-written dispatchers) to recognise an intent vs a direct result.

func LogJSON

func LogJSON(msg string, fields map[string]any)

LogJSON — emit one JSON line to stderr, ordered by `StructuredFields`. Extra fields are appended after the canonical block.
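A minimal sketch of the ordering rule, assuming extra fields are appended in sorted order (the source only says they follow the canonical block). `orderedJSON` and `canonicalFields` are hypothetical names; unlike `LogJSON`, the sketch returns the line instead of writing it to stderr and does not add `ts` or `level` itself.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sort"
)

// canonicalFields copies the order listed in StructuredFields.
var canonicalFields = []string{
	"ts", "level", "msg",
	"tenant_id", "app_name", "run_id", "invocation_id", "session_id",
	"seq", "effect_key", "decision_index", "reactor", "lease_owner",
}

// orderedJSON renders fields as one JSON object: canonical keys first,
// in canonical order, then the remaining keys sorted alphabetically.
func orderedJSON(fields map[string]any) (string, error) {
	var buf bytes.Buffer
	buf.WriteByte('{')
	emit := func(k string) error {
		kb, err := json.Marshal(k)
		if err != nil {
			return err
		}
		vb, err := json.Marshal(fields[k])
		if err != nil {
			return err
		}
		if buf.Len() > 1 { // something was already written after '{'
			buf.WriteByte(',')
		}
		buf.Write(kb)
		buf.WriteByte(':')
		buf.Write(vb)
		return nil
	}
	canonical := make(map[string]bool, len(canonicalFields))
	for _, k := range canonicalFields {
		canonical[k] = true
		if _, ok := fields[k]; ok {
			if err := emit(k); err != nil {
				return "", err
			}
		}
	}
	var extras []string
	for k := range fields {
		if !canonical[k] {
			extras = append(extras, k)
		}
	}
	sort.Strings(extras) // assumption: deterministic order for extras
	for _, k := range extras {
		if err := emit(k); err != nil {
			return "", err
		}
	}
	buf.WriteByte('}')
	return buf.String(), nil
}

func main() {
	line, err := orderedJSON(map[string]any{"zeta": true, "seq": 3, "msg": "hello", "run_id": "r1"})
	fmt.Println(line, err)
}
```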

func LogLevel

func LogLevel(level, msg string, fields map[string]any)

LogLevel is a convenience wrapper around LogJSON that logs at a specific level.

func On

func On(def ReactionDef)

On appends `def` to the process-global reaction registry. The registration is declarative: the server is not contacted until RegisterAll. If `def.Agent` is set the reaction kind defaults to AGENT; if `def.Publish` is set it defaults to PUBLISH; otherwise it's TASK. It is an error to set both `def.Agent` and `def.Publish`.
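The kind-defaulting rule can be written down as a small pure function. This is a sketch under assumptions: `resolveKind`, the `kind` type, and `errBothTargets` are hypothetical, and since `On` has no return value, how the SDK actually reports the "both set" error is not shown in this reference.

```go
package main

import (
	"errors"
	"fmt"
)

// kind is a stand-in for the proto HandlerKind enum.
type kind string

const (
	kindAgent   kind = "AGENT"
	kindTask    kind = "TASK"
	kindPublish kind = "PUBLISH"
)

// errBothTargets is a hypothetical error for the invalid combination.
var errBothTargets = errors.New("reaction sets both Agent and Publish")

// resolveKind applies the documented defaults: Agent wins AGENT,
// Publish wins PUBLISH, neither means TASK, both is an error.
func resolveKind(agent, publish string) (kind, error) {
	switch {
	case agent != "" && publish != "":
		return "", errBothTargets
	case agent != "":
		return kindAgent, nil
	case publish != "":
		return kindPublish, nil
	default:
		return kindTask, nil
	}
}

func main() {
	k, err := resolveKind("", "")
	fmt.Println(k, err)
	k, err = resolveKind("triage-agent", "orders-topic")
	fmt.Println(k, err)
}
```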

func OnDecisionRecorded

func OnDecisionRecorded(h ReactionHandler, opts ReactionDef)

OnDecisionRecorded fires on every decision write.

func OnEffectConfirmed

func OnEffectConfirmed(tool string, h ReactionHandler, opts ReactionDef)

OnEffectConfirmed fires on `/tape/effect/confirmed/<tool>/**`. Pass tool="" or tool="*" to match any tool.

func OnEffectFailed

func OnEffectFailed(tool string, h ReactionHandler, opts ReactionDef)

OnEffectFailed fires on `/tape/effect/failed/<tool>/**`.

func OnEffectUnknown

func OnEffectUnknown(tool string, h ReactionHandler, opts ReactionDef)

OnEffectUnknown fires on `/tape/effect/unknown/<tool>/**`.

func OnGate

func OnGate(gate, verb string, h ReactionHandler, opts ReactionDef)

OnGate fires on a gate lifecycle event. `verb=""` defaults to `"released"`; pass `"waiting"` for the park event.

func OnRun

func OnRun(status string, h ReactionHandler, opts ReactionDef)

OnRun fires on run lifecycle events. `status=""` defaults to `"terminal"`.

func OnValueChange

func OnValueChange(namespace, key string, h ReactionHandler, opts ReactionDef)

OnValueChange fires when a value in `(namespace, key)` is written. `key=""` matches any key (rest wildcard `**`); `key="*"` matches one segment.

func OnValueDeleted

func OnValueDeleted(namespace, key string, h ReactionHandler, opts ReactionDef)

OnValueDeleted is the deletion counterpart of OnValueChange.

func ReconcileOnce

func ReconcileOnce(ctx context.Context, c *Client, reconcilePendingAfter time.Duration) (int, error)

func RecoverOnce

func RecoverOnce(ctx context.Context, c *Client, redrive RedriveFn) ([]string, error)

func RegisterAll

func RegisterAll(ctx context.Context, c *Client, prefix string) ([]*pb.Reaction, error)

RegisterAll pushes every registered reaction to the server via RegisterReaction. The returned slice is the persisted `Reaction` rows with the canonical `reaction_id` filled in. `prefix` is prepended to each reaction's `name` so concurrent test runs don't collide.

Idempotent on `ReactionID` — the server upserts by id. Reactions that declared `ReactionID=""` get a stable server-minted id which we cache on the ReactionDef so RunDispatcher can claim by it.

func RegisterCompensator

func RegisterCompensator(kind string, fn func(ctx context.Context, payload []byte) error)

func RegisterStatusCheck

func RegisterStatusCheck(tool string, fn func(ctx context.Context, key string) (StatusCheckResult, error))

func RunDispatcher

func RunDispatcher(ctx context.Context, c *Client, opts RunDispatcherOpts) error

RunDispatcher is the in-proc dispatcher loop. For every TASK-kind reaction in the registry, it:

  1. Calls ClaimTasks(reaction_id, shard=-1, ...) in a tight outer loop.
  2. Submits each claimed Task to the reaction's semaphore-bounded worker pool. Inside the worker: rate-limit via token bucket, drop a re-trigger of the same subject inside the debounce window (complete it as a no-op), run the handler, CompleteTask on success or NackTask(permanent=…) on failure.

AGENT and PUBLISH reactions are NOT driven here — the server creates matching runs for AGENT, and the Pub/Sub bridge pulls PUBLISH tasks.

`Once=true` returns after one pass over every reaction (handy for tests).
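Two of the worker-side controls described above, the debounce window and the semaphore-bounded pool, can be sketched in isolation. Everything here is illustrative: `debouncer` and `runBounded` are hypothetical names, the token-bucket rate limit is left out, and measuring the window from the last handled trigger (rather than the last seen one) is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// debouncer remembers when each subject was last handled.
type debouncer struct {
	mu       sync.Mutex
	windowMs int64
	last     map[string]int64
}

func newDebouncer(windowMs int64) *debouncer {
	return &debouncer{windowMs: windowMs, last: map[string]int64{}}
}

// shouldDrop reports whether a trigger for subject at nowMs falls
// inside the debounce window of the last handled trigger.
func (d *debouncer) shouldDrop(subject string, nowMs int64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if prev, ok := d.last[subject]; ok && d.windowMs > 0 && nowMs-prev < d.windowMs {
		return true
	}
	d.last[subject] = nowMs
	return false
}

// runBounded runs fn for every item with at most limit handlers in
// flight and returns the peak concurrency it observed.
func runBounded(items []string, limit int, fn func(string)) int64 {
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak int64
	for _, it := range items {
		sem <- struct{}{} // blocks while limit handlers are running
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			defer func() { <-sem }()
			n := atomic.AddInt64(&inFlight, 1)
			for { // record a new peak if we set one
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			fn(it)
			atomic.AddInt64(&inFlight, -1)
		}(it)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	d := newDebouncer(100)
	fmt.Println(d.shouldDrop("subject-a", 1000), d.shouldDrop("subject-a", 1050))
	peak := runBounded([]string{"a", "b", "c", "d"}, 2, func(string) {})
	fmt.Println("peak in-flight handlers:", peak)
}
```

In a real dispatcher a dropped trigger would still be acknowledged, since the text above says it is completed as a no-op rather than left to expire.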

func RunEventFanout

func RunEventFanout(ctx context.Context, c *Client, fromTsMs int64, runID, kind string, sink func(entry *pb.EventEntry) error) error

RunEventFanout is the legacy timestamp-cursored WAL fan-out. Preserved for back-compat — new code should use RunEventFanoutBySubject (global_seq cursored) or RunEventFanoutWith (the option-struct form supporting both cursors plus a subject pattern).

Deprecated: prefer RunEventFanoutBySubject for global_seq cursoring.

func RunEventFanoutBySubject

func RunEventFanoutBySubject(ctx context.Context, c *Client, subjectPattern, predicateCEL string, fromGlobalSeq int64, sink func(entry *pb.EventEntry) error) error

RunEventFanoutBySubject tails the journal via SubscribeBySubject, cursored on `global_seq`. The matching subject pattern follows the path-style grammar (`*` = one segment, `**` = trailing segments). `predicateCEL` is an optional server-side CEL filter on the envelope.
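For intuition, the pattern grammar can be approximated by a small client-side matcher. This is only a sketch: matching happens on the server, `matchSubject` is a hypothetical function, and two behaviours are assumptions, namely that `**` is only meaningful as the final segment and that it also matches zero trailing segments.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches the path-style pattern.
// "*" matches exactly one segment; "**" matches all trailing segments.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(strings.Trim(pattern, "/"), "/")
	s := strings.Split(strings.Trim(subject, "/"), "/")
	for i, seg := range p {
		if seg == "**" {
			// Assumption: "**" is only valid in last position.
			return i == len(p)-1
		}
		if i >= len(s) {
			return false // pattern is longer than the subject
		}
		if seg != "*" && seg != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("/tape/effect/confirmed/*/**", "/tape/effect/confirmed/bank.wire/r1/k1"))
	fmt.Println(matchSubject("/tape/effect/*", "/tape/effect/failed/bank.wire"))
}
```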

func RunEventFanoutWith

func RunEventFanoutWith(ctx context.Context, c *Client, o RunEventFanoutOpts, sink func(entry *pb.EventEntry) error) error

RunEventFanoutWith is the option-struct form of RunEventFanout. Honours both the legacy and the event-bus filter fields.

func RunOutboxDispatcher

func RunOutboxDispatcher(ctx context.Context, c *Client, opt RunOutboxOptions) error

RunOutboxDispatcher polls the outbox forever (or once). Cancel via ctx.

func RunPubSubBridge

func RunPubSubBridge(ctx context.Context, c *Client, opts RunPubSubBridgeOpts) error

RunPubSubBridge — stub when not built with `-tags pubsub`. Returns ErrPubSubNotBuilt so callers get a clear message instead of a silent no-op.

To enable: add `cloud.google.com/go/pubsub` to your module and build with `-tags pubsub`. See `reactions_pubsub.go` for the active implementation.

func RunReactors

func RunReactors(ctx context.Context, c *Client, opt RunReactorsOptions) error

func SetOTelHooks

func SetOTelHooks(h OTelHooks)

SetOTelHooks installs OTel propagation hooks for the dispatcher. Calling with an empty struct restores the no-op default.

func SetSpanHook

func SetSpanHook(h SpanHook)

SetSpanHook — install a span hook. Used by tracing adapters; safe to call multiple times (last wins).

func Span

func Span(name string, attrs map[string]any) func(err error)

Span — open a span via the installed hook (or a no-op). Returns the end function the caller defers.
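The hook-plus-end-function pattern can be sketched as below. The `spanHook` signature is an assumption, because the `SpanHook` type is not shown in this part of the reference; `setSpanHook` and `span` are lower-case stand-ins for the SDK functions, not their implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// spanHook is an assumed shape: it receives the span name and
// attributes and returns the function that ends the span.
type spanHook func(name string, attrs map[string]any) func(err error)

var (
	hookMu sync.RWMutex
	hook   spanHook
)

// setSpanHook installs h; the last call wins. nil restores the no-op.
func setSpanHook(h spanHook) {
	hookMu.Lock()
	hook = h
	hookMu.Unlock()
}

// span opens a span through the installed hook and always returns a
// callable end function, even when no hook is installed.
func span(name string, attrs map[string]any) func(err error) {
	hookMu.RLock()
	h := hook
	hookMu.RUnlock()
	if h != nil {
		if end := h(name, attrs); end != nil {
			return end
		}
	}
	return func(error) {} // no-op end
}

func main() {
	setSpanHook(func(name string, attrs map[string]any) func(error) {
		fmt.Println("start", name, attrs["run_id"])
		return func(err error) { fmt.Println("end", name, "err:", err) }
	})
	end := span("tape.begin_run", map[string]any{"run_id": "r1"})
	end(errors.New("lease lost"))
}
```

At a call site the end function would typically be deferred, for example `defer func() { end(err) }()`, so the span closes with whatever error the surrounding function returns.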

type BeginEffectOpts

type BeginEffectOpts struct {
    RunID         string
    DecisionIndex int64
    ToolName      string
    CallIndex     int32
    RequestJSON   string
    CustomKey     string
    // Outbox / non-idempotent contract. Zero values keep the v1 behaviour
    // (idempotent + inline); set Semantics + DispatchMode + BusinessKey +
    // Connector to opt into the outbox path. Server refuses
    // NON_IDEMPOTENT + INLINE — surface that error to the caller.
    Semantics    int32
    DispatchMode int32
    BusinessKey  string
    Connector    string
}

type BeginRunOpts

type BeginRunOpts struct {
    AppName, UserID, SessionID, InvocationID, LeaseOwner string
    LeaseTTLMs                                           int64
}

type Budget

Budget — the per-run budget. USDCap/TokenCap of 0 means "no cap".

type Budget struct {
    USDCap   float64
    TokenCap int64
}

type ClaimTasksOpts

ClaimTasksOpts is the option-struct form of the ClaimTasks RPC.

Defaults applied client-side: LeaseMs <=0 → 60_000, Max <=0 → 16. Shard <0 asks the server for tasks from any shard.

type ClaimTasksOpts struct {
    ReactionID string
    Shard      int32 // <0 = any
    Owner      string
    LeaseMs    int64 // <=0 = 60_000
    Max        int32 // <=0 = 16
    NowMs      int64 // 0 = server time
}
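The client-side defaulting described for this struct amounts to the following. `claimOpts` is a local copy of the fields so the sketch compiles on its own, and `withDefaults` is a hypothetical helper; Shard and NowMs are left alone because their sentinel values are interpreted by the server.

```go
package main

import "fmt"

// claimOpts mirrors the ClaimTasksOpts fields for illustration.
type claimOpts struct {
	ReactionID string
	Shard      int32
	Owner      string
	LeaseMs    int64
	Max        int32
	NowMs      int64
}

// withDefaults fills in the documented client-side defaults.
func withDefaults(o claimOpts) claimOpts {
	if o.LeaseMs <= 0 {
		o.LeaseMs = 60_000
	}
	if o.Max <= 0 {
		o.Max = 16
	}
	return o
}

func main() {
	o := withDefaults(claimOpts{ReactionID: "r-1", Shard: -1, Owner: "worker-a"})
	fmt.Printf("%+v\n", o)
}
```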

type Client

Client wraps a *grpc.ClientConn and the generated pb.TapeClient with friendlier helpers. Use Conn() / PB() if you want raw access.

type Client struct {
    // contains filtered or unexported fields
}

func Dial

func Dial(url string, opts ...Options) (*Client, error)

func (*Client) AdmitBudget

func (c *Client) AdmitBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.AdmitBudgetResponse, error)

func (*Client) AppendEvent

func (c *Client) AppendEvent(ctx context.Context, app, user, session string, ev *pb.EventRecord, stateDeltaJSON string) (*pb.AppendEventResponse, error)

func (*Client) AwaitSignal

func (c *Client) AwaitSignal(ctx context.Context, runID, gate, payloadJSON string) (*pb.AwaitSignalResponse, error)

func (*Client) BeginEffect

func (c *Client) BeginEffect(ctx context.Context, o BeginEffectOpts) (*pb.BeginEffectResponse, error)

func (*Client) BeginRun

func (c *Client) BeginRun(ctx context.Context, o BeginRunOpts) (*pb.BeginRunResponse, error)

func (*Client) CancelTimer

func (c *Client) CancelTimer(ctx context.Context, runID, timerID string) (*pb.CancelTimerResponse, error)

func (*Client) ChargeBudget

func (c *Client) ChargeBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.BudgetState, error)

func (*Client) ClaimEffectDispatch

func (c *Client) ClaimEffectDispatch(ctx context.Context, runID, key, claimer string, leaseTTLMs int64) (*pb.ClaimEffectDispatchResponse, error)

ClaimEffectDispatch is the atomic CAS lease on the dispatch slot. Returns acquired=false (with the current row) when another dispatcher already holds the lease — the loser must not call the upstream.

func (*Client) ClaimObligation

func (c *Client) ClaimObligation(ctx context.Context, runID string, obligationSeq int64, claimer string, leaseTtlMs int64) (*pb.ClaimObligationResponse, error)

func (*Client) ClaimTasks

func (c *Client) ClaimTasks(ctx context.Context, o ClaimTasksOpts) ([]*pb.Task, error)

ClaimTasks atomically leases up to `Max` pending tasks for the dispatcher.

func (*Client) Close

func (c *Client) Close() error

func (*Client) CompleteEffect

func (c *Client) CompleteEffect(ctx context.Context, runID, key string, status int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)

func (*Client) CompleteTask

func (c *Client) CompleteTask(ctx context.Context, taskID, owner string) (*pb.Task, error)

CompleteTask marks the task as DONE (success ack from the handler).

func (*Client) Conn

func (c *Client) Conn() *grpc.ClientConn

func (*Client) CreateSession

func (c *Client) CreateSession(ctx context.Context, app, user, session, stateJSON string) (*pb.Session, error)

func (*Client) DeregisterReaction

func (c *Client) DeregisterReaction(ctx context.Context, reactionID string) (bool, error)

DeregisterReaction marks the reaction `deleted=true`. Returns whether the server flipped the bit (false on unknown id).

func (*Client) EndRun

func (c *Client) EndRun(ctx context.Context, runID string, status int32, detailJSON string) (*pb.EndRunResponse, error)

func (*Client) GetDecision

func (c *Client) GetDecision(ctx context.Context, runID string, idx int64) (*pb.GetDecisionResponse, error)

func (*Client) GetEffect

func (c *Client) GetEffect(ctx context.Context, runID, key string) (*pb.GetEffectResponse, error)

func (*Client) GetRun

func (c *Client) GetRun(ctx context.Context, runID string) (*pb.RunState, error)

func (*Client) GetSession

func (c *Client) GetSession(ctx context.Context, app, user, session string, maxEvents int64) (*pb.GetSessionResponse, error)

func (*Client) ListDueTimers

func (c *Client) ListDueTimers(ctx context.Context, nowMs, limit int64, claim bool) (*pb.ListDueTimersResponse, error)

func (*Client) ListEffectsToDispatch

func (c *Client) ListEffectsToDispatch(ctx context.Context, connector string, limit int64) (*pb.ListEffectsToDispatchResponse, error)

ListEffectsToDispatch returns PENDING+OUTBOX effects whose next_dispatch_at_ms <= now and whose dispatch lease is empty or expired. `connector` scopes the result to one connector name; empty means any.

func (*Client) ListObligations

func (c *Client) ListObligations(ctx context.Context, runID string, onlyUnresolved bool, opts ...ListObligationsOpts) (*pb.ListObligationsResponse, error)

func (*Client) ListPendingEffects

func (c *Client) ListPendingEffects(ctx context.Context, olderThanMs int64, includePending, includeUnknown bool, limit int64) (*pb.ListPendingEffectsResponse, error)

func (*Client) ListReactions

func (c *Client) ListReactions(ctx context.Context, subjectPattern string) ([]*pb.Reaction, error)

ListReactions returns every active reaction. Pass an empty pattern to list all reactions; otherwise an exact-match on `subject_pattern` is applied.

func (*Client) ListRunsToRecover

func (c *Client) ListRunsToRecover(ctx context.Context, limit int64) (*pb.ListRunsToRecoverResponse, error)

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, reactionID string, status pb.TaskStatus, limit int32) ([]*pb.Task, error)

ListTasks returns tasks filtered by reaction and status (useful for observability and DLQ inspection).

func (*Client) ListUnresolvedObligations

func (c *Client) ListUnresolvedObligations(ctx context.Context, opts ListUnresolvedObligationsOpts) (*pb.ListUnresolvedObligationsResponse, error)

func (*Client) NackTask

func (c *Client) NackTask(ctx context.Context, taskID, owner, errMsg string, permanent bool) (*pb.Task, error)

NackTask reports a failed attempt. Set `permanent=true` to push it straight to DLQ; otherwise the server will re-lease until `dlq_after_n` is reached.

func (*Client) PB

func (c *Client) PB() pb.TapeClient

func (*Client) ReconcileEffect

func (c *Client) ReconcileEffect(ctx context.Context, runID, key string, resolved int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)

func (*Client) RecordDecision

func (c *Client) RecordDecision(ctx context.Context, runID string, idx int64, model, requestJSON, responseJSON, rationale, policyVersion string) (*pb.DecisionRecord, error)

func (*Client) RecordDispatchAttempt

func (c *Client) RecordDispatchAttempt(ctx context.Context, runID, key, errMsg string, nextDispatchAtMs int64) (*pb.EffectRecord, error)

RecordDispatchAttempt reports a *failed* dispatch. A positive `nextDispatchAtMs` schedules a retry at that time. `nextDispatchAtMs <= 0` asks the server to transition the effect to UNKNOWN; this is the safety exit for a lost ack, where the reconciler resolves the effect via observe() instead of retrying blindly.
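One way a caller might choose the value to pass is sketched below. This is a hypothetical policy, not the SDK's: `nextDispatchAtMs` as a function, the doubling backoff, the cap, and the decision to hand the effect to the reconciler once the attempt budget is spent are all assumptions made for illustration.

```go
package main

import "fmt"

// nextDispatchAtMs returns the timestamp for the next retry, or 0 to
// ask for the UNKNOWN transition once the attempt budget is used up.
// attempt counts failed dispatches so far, starting at 1.
func nextDispatchAtMs(nowMs int64, attempt, maxAttempts int, baseMs, capMs int64) int64 {
	if attempt >= maxAttempts {
		return 0 // give up on retries; let the reconciler observe
	}
	delay := baseMs
	for i := 1; i < attempt && delay < capMs; i++ {
		delay *= 2 // exponential backoff: base, 2*base, 4*base, ...
	}
	if delay > capMs {
		delay = capMs
	}
	return nowMs + delay
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, nextDispatchAtMs(1_000, attempt, 5, 500, 60_000))
	}
}
```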

func (*Client) RecordExternalObservation

func (c *Client) RecordExternalObservation(ctx context.Context, o RecordExternalObservationOpts) (*pb.EffectRecord, error)

func (*Client) RecordObligationAttempt

func (c *Client) RecordObligationAttempt(ctx context.Context, runID string, obligationSeq int64, errMsg string, nextAttemptAtMs int64) (*pb.ObligationRecord, error)

func (*Client) RegisterCompensation

func (c *Client) RegisterCompensation(ctx context.Context, runID, effectKey, kind, payloadJSON string, opts ...RegisterCompensationOpts) (*pb.ObligationRecord, error)

func (*Client) RegisterReaction

func (c *Client) RegisterReaction(ctx context.Context, o RegisterReactionOpts) (*pb.Reaction, error)

RegisterReaction creates (or upserts on `reaction_id`) a server-side reaction. The returned Reaction echoes the persisted row with its server-assigned `reaction_id` filled in.

func (*Client) ResolveObligation

func (c *Client) ResolveObligation(ctx context.Context, runID string, obligationSeq int64, status int32, resultJSON string) (*pb.ObligationRecord, error)

func (*Client) SendSignal

func (c *Client) SendSignal(ctx context.Context, o SendSignalOpts) (*pb.SendSignalResponse, error)

func (*Client) SetBudget

func (c *Client) SetBudget(ctx context.Context, runID string, usdCap float64, tokenCap int64) (*pb.BudgetState, error)

func (*Client) SetTimer

func (c *Client) SetTimer(ctx context.Context, o SetTimerOpts) (*pb.TimerRecord, error)

func (*Client) SubscribeBySubject

func (c *Client) SubscribeBySubject(ctx context.Context, subjectPattern, predicateCEL string, fromGlobalSeq int64) (grpc.ServerStreamingClient[pb.EventEntry], error)

SubscribeBySubject opens a streaming WAL tail filtered by subject pattern and an optional CEL predicate, cursored on `global_seq`. Iterate with .Recv() until io.EOF.

func (*Client) SubscribeEvents

func (c *Client) SubscribeEvents(ctx context.Context, fromTsMs int64, runID, kind string) (grpc.ServerStreamingClient[pb.EventEntry], error)

SubscribeEvents returns a streaming client; iterate with .Recv() until io.EOF.

Legacy entry point. New code should use SubscribeEventsWith (whose SubscribeEventsOpts supports `from_global_seq` and `subject_pattern`) or SubscribeBySubject.

func (*Client) SubscribeEventsWith

func (c *Client) SubscribeEventsWith(ctx context.Context, o SubscribeEventsOpts) (grpc.ServerStreamingClient[pb.EventEntry], error)

SubscribeEventsWith is the option-struct form of SubscribeEvents. Prefer this for new code; it supports the event-bus rebuild fields.

func (*Client) URL

func (c *Client) URL() string

type DurableApp

DurableApp — the wired bundle returned by `NewDurableApp()`.

type DurableApp struct {
    Cfg        DurableConfig
    URL        string
    Client     *Client
    LeaseOwner string
    LeaseTTLMs int64
    // contains filtered or unexported fields
}

func NewDurableApp

func NewDurableApp(ctx context.Context, cfg DurableConfig) (*DurableApp, error)

NewDurableApp — resolve config, dial the server, return the bundle. Honours `$TAPE_URL` for `TapeURL` and `$TAPE_LEASE_MS` for `LeaseTTLMs`.

Mirrors `tape.adk.durable_app(...)` from the Python SDK. The Go ADK adapter, when it lands, will accept a `*DurableApp` directly.

func (*DurableApp) Close

func (d *DurableApp) Close() error

Close — close the underlying client.

type DurableConfig

DurableConfig — the user-facing settings. Sensible defaults everywhere.

type DurableConfig struct {
    // Name — the ADK App name. Required.
    Name string

    // TapeURL — defaults to $TAPE_URL, then "tape://localhost:7878".
    TapeURL string

    // Budget — admit/charge thresholds. Zero values mean "no cap".
    Budget Budget

    // Resumable — enable ADK ResumabilityConfig(is_resumable=True). The Go
    // ADK adapter will read this; today it is recorded for completeness.
    Resumable bool

    // CheckCancellation — let the plugin poll RunStatus on every model
    // boundary. The Go ADK adapter will read this.
    CheckCancellation bool

    // LeaseOwner — overrides the default "<hostname>:<pid>" identity.
    LeaseOwner string

    // LeaseTTLMs — overrides the default lease (120_000 ms when 0).
    LeaseTTLMs int64

    // ClientOptions — passed verbatim to `tape.Dial(...)`.
    ClientOptions *Options
}
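The defaulting described in the field comments can be sketched as a resolution step. The precedence shown (explicit field, then environment variable, then built-in default) is an assumption drawn from the comments above; `resolved` and `resolve` are hypothetical names, and the environment lookup is injected so the sketch can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// resolved holds the two settings this sketch covers.
type resolved struct {
	TapeURL    string
	LeaseTTLMs int64
}

// resolve applies: explicit value, then env var, then default.
func resolve(tapeURL string, leaseTTLMs int64, getenv func(string) string) resolved {
	r := resolved{TapeURL: tapeURL, LeaseTTLMs: leaseTTLMs}
	if r.TapeURL == "" {
		r.TapeURL = getenv("TAPE_URL")
	}
	if r.TapeURL == "" {
		r.TapeURL = "tape://localhost:7878"
	}
	if r.LeaseTTLMs <= 0 {
		// Ignore an unset or malformed TAPE_LEASE_MS.
		if v, err := strconv.ParseInt(getenv("TAPE_LEASE_MS"), 10, 64); err == nil && v > 0 {
			r.LeaseTTLMs = v
		}
	}
	if r.LeaseTTLMs <= 0 {
		r.LeaseTTLMs = 120_000
	}
	return r
}

func main() {
	fmt.Printf("%+v\n", resolve("", 0, os.Getenv))
}
```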

type Envelope

Envelope is the per-task payload handed to the user handler. It mirrors the CEL envelope on the server: a handler can use the same field names that server-side predicates use.

type Envelope struct {
    Task    *pb.Task
    Payload map[string]any // parsed payload_json; nil if not JSON-decodable
}

type ListObligationsOpts

ListObligationsOpts: StatusFilter == 0 means "any status"; otherwise it's an exact ObligationStatus match. OnlyUnresolved is the shorthand "exclude terminal COMPENSATED/STUCK".

type ListObligationsOpts struct {
    StatusFilter int32
}

type ListUnresolvedObligationsOpts

ListUnresolvedObligationsOpts: the cross-run drainer feed. Defaults pick up ready-to-run PENDING plus COMMITTED rows whose lease has expired.

type ListUnresolvedObligationsOpts struct {
    Limit                   int32
    NowMs                   int64
    IncludePending          bool
    IncludeStuck            bool
    IncludeCommittedExpired bool
}

type OTelHooks

OTelHooks lets callers plug in real OTel context propagation without the SDK depending on `go.opentelemetry.io/otel`. If the start hook returns a non-nil func, the dispatcher calls it after the handler returns.

type OTelHooks struct {
    // Start opens a child span from the (trace_id, parent_span_id) on the
    // task and returns the span-bearing context plus an `end` func. The
    // dispatcher calls `end()` after the handler returns. A nil Start means
    // "no propagation".
    Start func(ctx context.Context, traceIDHex, parentSpanIDHex, name string) (context.Context, func())
}

type Options

Options for Dial.

type Options struct {
    Auth     bool   // default true on tapes://
    Audience string // override the OIDC audience derived from the URL
    IDToken  string // static ID token (overrides Auth)
    DialOpts []grpc.DialOption
}

type OutboxConfigError

OutboxConfigError — returned by NewOutboxTool when the configuration is unsafe (typically: non_idempotent without any recovery path).

type OutboxConfigError struct{ Reason string }

func (*OutboxConfigError) Error

func (e *OutboxConfigError) Error() string

type OutboxEnvelope

OutboxEnvelope — the JSON-serialisable intent the runner returns. The outbox reactor recognises envelopes by the `__outbox__: true` sentinel.

type OutboxEnvelope struct {
    Outbox            bool           `json:"__outbox__"`
    Connector         string         `json:"connector"`
    Tool              string         `json:"tool"`
    Semantics         string         `json:"semantics"`
    WaitForResult     bool           `json:"wait_for_result"`
    HumanGate         bool           `json:"human_gate"`
    DispatchTimeoutMs int64          `json:"dispatch_timeout_ms,omitempty"`
    BusinessKey       string         `json:"business_key,omitempty"`
    Payload           map[string]any `json:"payload"`
}
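Recognising an envelope by its `__outbox__: true` sentinel can be sketched as follows. `looksLikeOutbox` is a hypothetical function and not the implementation of `IsOutboxEnvelope`; which input types the SDK helper accepts is not stated here, so handling maps, raw JSON bytes, and JSON strings is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// looksLikeOutbox reports whether v carries the sentinel key
// "__outbox__" with the boolean value true.
func looksLikeOutbox(v any) bool {
	switch t := v.(type) {
	case map[string]any:
		flag, ok := t["__outbox__"].(bool)
		return ok && flag
	case []byte:
		var m map[string]any
		if err := json.Unmarshal(t, &m); err != nil {
			return false // not a JSON object
		}
		return looksLikeOutbox(m)
	case string:
		return looksLikeOutbox([]byte(t))
	default:
		return false
	}
}

func main() {
	intent := `{"__outbox__": true, "connector": "bank.wire", "payload": {}}`
	direct := map[string]any{"status": "ok"}
	fmt.Println(looksLikeOutbox(intent), looksLikeOutbox(direct))
}
```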

type OutboxOptions

OutboxOptions — configuration for one dispatch pass.

type OutboxOptions struct {
    Connector           string               // restrict to one connector name
    Registry            *connectors.Registry // defaults to connectors.Default
    Claimer             string               // identity for dispatch_claimed_by
    Limit               int64                // ListEffectsToDispatch limit (default 200)
    DispatchMaxAttempts int                  // give up after N (default 5)
}

type OutboxOutcome

OutboxOutcome — per-effect outcome of one dispatch attempt.

type OutboxOutcome struct {
    RunID          string `json:"run_id"`
    IdempotencyKey string `json:"idempotency_key"`
    Connector      string `json:"connector"`
    Tool           string `json:"tool"`
    Status         string `json:"status"` // confirmed | unknown | failed | retry-scheduled | skipped | error
    Reason         string `json:"reason,omitempty"`
    ExternalRef    string `json:"external_ref,omitempty"`
    Error          string `json:"error,omitempty"`
    NextAtMs       int64  `json:"next_at_ms,omitempty"`
    Attempts       int    `json:"attempts,omitempty"`
}

func DispatchOne

func DispatchOne(ctx context.Context, c *Client, eff *pb.EffectRecord, opt OutboxOptions) OutboxOutcome

DispatchOne runs a single effect through its connector and records the result. Returns the per-effect outcome.

func OutboxDispatchOnce

func OutboxDispatchOnce(ctx context.Context, c *Client, opt OutboxOptions) ([]OutboxOutcome, error)

OutboxDispatchOnce runs one pass of the outbox dispatcher.

type OutboxSemantics

OutboxSemantics — the semantics tag on an outbox tool.

type OutboxSemantics string

const (
    OutboxIdempotent    OutboxSemantics = "idempotent"
    OutboxAtLeastOnce   OutboxSemantics = "at_least_once"
    OutboxNonIdempotent OutboxSemantics = "non_idempotent"
)

type OutboxTool

OutboxTool — the value returned by `NewOutboxTool`. Use `Envelope(...)` from a tool body to produce the intent payload the runner will return.

type OutboxTool struct {
    Opts OutboxToolOpts
}

func MustOutboxTool

func MustOutboxTool(opts OutboxToolOpts) *OutboxTool

MustOutboxTool — like NewOutboxTool but panics on configuration errors. Use in init() blocks where a misconfiguration should crash early.

func NewOutboxTool

func NewOutboxTool(opts OutboxToolOpts) (*OutboxTool, error)

NewOutboxTool — build a validated outbox tool. Returns `*OutboxConfigError` for unsafe configurations.

func (*OutboxTool) Envelope

func (t *OutboxTool) Envelope(payload map[string]any) (map[string]any, error)

Envelope — build the JSON envelope the outbox reactor will dispatch. `payload` MUST be JSON-serialisable; it is wrapped, not consumed.

func (*OutboxTool) EnvelopeJSON

func (t *OutboxTool) EnvelopeJSON(payload map[string]any) ([]byte, error)

EnvelopeJSON — convenience wrapper that returns the encoded envelope.

type OutboxToolOpts

OutboxToolOpts — configuration for `NewOutboxTool`.

type OutboxToolOpts struct {
    // Name — the tool name as it appears in the journal / model API.
    Name string

    // Connector — the registered capability connector name (see
    // `tape/connectors`). E.g. "bank.wire".
    Connector string

    // Semantics — defaults to OutboxIdempotent.
    Semantics OutboxSemantics

    // BusinessKey — given the intent payload, derive a stable key the
    // counterparty can dedup on (typically a hash of the irreducible
    // identifying fields).
    BusinessKey func(payload map[string]any) (string, error)

    // StatusCheck — registered with `RegisterStatusCheck(Name, fn)`; the
    // reconciler invokes it to resolve UNKNOWN effects.
    StatusCheck func(ctx context.Context, idempotencyKey string) (StatusCheckResult, error)

    // Compensate — registered with `RegisterCompensator(<name>, fn)`; the
    // compensation reactor runs it for duplicate dispatches.
    Compensate func(ctx context.Context, payload []byte) error

    // WaitForResult — when true, the ADK runner parks the call until the
    // dispatch resolves (the Go adapter consumes this hint).
    WaitForResult bool

    // HumanGate — when true, the run parks on a gate before the outbox
    // dispatches. Required for non_idempotent tools that have no other
    // safety net.
    HumanGate bool

    // DispatchTimeoutMs — soft deadline the connector should honour (0 =
    // connector default).
    DispatchTimeoutMs int64

    // MaxAttempts — outbox-reactor retry budget (server default 5 when 0).
    MaxAttempts int
}
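
As an illustration of the `BusinessKey` field above, the following sketch hashes a few identifying fields into a stable key. Only the function signature comes from `OutboxToolOpts`; the helper name `wireBusinessKey` and the payload field names (`from_account`, `to_account`, `amount_cents`, `reference`) are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// wireBusinessKey is a hypothetical BusinessKey implementation. It matches the
// signature of OutboxToolOpts.BusinessKey and hashes the fields that identify
// a wire transfer. The field names are assumptions made for this example.
func wireBusinessKey(payload map[string]any) (string, error) {
	fields := []string{"from_account", "to_account", "amount_cents", "reference"}
	h := sha256.New()
	for _, name := range fields {
		v, ok := payload[name]
		if !ok {
			return "", fmt.Errorf("business key: missing field %q", name)
		}
		// Write name and value with separators so adjacent fields
		// cannot run together.
		fmt.Fprintf(h, "%s=%v\n", name, v)
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	key, err := wireBusinessKey(map[string]any{
		"from_account": "A-1",
		"to_account":   "B-2",
		"amount_cents": 12500,
		"reference":    "invoice-42",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
}
```

Iterating over a fixed field list, rather than over the map, keeps the key independent of Go's randomised map iteration order.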

type ReactionDef

ReactionDef is one `On(...)` declaration before it's registered on the server. Field semantics match the proto / Python types one-for-one. Leave fields zero-valued to inherit defaults; `On(...)` clamps values to a minimum of 1 (Max(1, …)) where appropriate.

type ReactionDef struct {
    SubjectPattern    string
    Predicate         string
    Agent             string // kind=AGENT; if set, kind defaults to AGENT
    Publish           string // kind=PUBLISH; if set, kind defaults to PUBLISH
    Name              string
    ReactionID        string
    MaxConcurrency    int
    RateLimitPerS     int
    DebounceMs        int
    RetryMax          int
    RetryBackoffMs    int
    DLQAfterN         int
    NumShards         int
    BootstrapFromHead bool
    Handler           ReactionHandler
    // contains filtered or unexported fields
}

func GetRegistry

func GetRegistry() []*ReactionDef

GetRegistry returns a snapshot of the registered reactions. Mostly useful for tests / introspection.

type ReactionHandler

ReactionHandler is the user callback the in-proc dispatcher runs for every claimed task. Returning a non-nil error triggers a NackTask (with permanent=true once attempts >= dlq_after_n).

type ReactionHandler func(ctx context.Context, env *Envelope) error

type RecordExternalObservationOpts

RecordExternalObservationOpts records what the counterparty said about an effect; it is the reconciler's write path. `Resolution` is one of the EffectResolution* constants. When the resolution is DUPLICATE and `CompensateOnDuplicateKind` is set, the server registers a compensation obligation atomically with the observation.

type RecordExternalObservationOpts struct {
    RunID                     string
    Key                       string
    Resolution                int32
    ExternalRef               string
    ResponseJSON              string
    ErrorJSON                 string
    CompensateOnDuplicateKind string
}

type RedriveFn

RedriveFn — how a reactor re-invokes a stalled run. On Vertex AI Agent Engine the implementation calls :streamQuery; on Cloud Run / GKE / locally it's typically a wrapper that hands control back to the agent process.

type RedriveFn func(ctx context.Context, run *pb.RunState) error

type RegisterCompensationOpts

RegisterCompensationOpts: the extra options grew over time; use named fields rather than a five-arg positional call. CompensatorRef ("module:attr") lets a generic drainer resolve the inverse without importing your agent. MaxAttempts of 0 falls back to the server default (5).

type RegisterCompensationOpts struct {
    CompensatorRef string
    MaxAttempts    int32
}

type RegisterReactionOpts

RegisterReactionOpts mirrors the `Reaction` proto fields the client supplies at registration time. Leave a field zero/empty for the server default.

type RegisterReactionOpts struct {
    ReactionID        string
    Name              string
    SubjectPattern    string
    PredicateCEL      string
    HandlerKind       pb.HandlerKind
    AgentApp          string
    PublishTarget     string
    MaxConcurrency    int32
    RateLimitPerS     int32
    DebounceMs        int32
    RetryMax          int32
    RetryBackoffMs    int32
    DLQAfterN         int32
    NumShards         int32
    BootstrapFromHead bool
}

type RunDispatcherOpts

RunDispatcherOpts is the configuration block for the in-proc dispatcher.

type RunDispatcherOpts struct {
    Owner        string
    PollInterval time.Duration
    Once         bool
    ClaimMax     int32
    LeaseMs      int64
}

type RunEventFanoutOpts

RunEventFanoutOpts mirrors SubscribeEventsOpts: pass `FromGlobalSeq` / `SubjectPattern` for the event-bus path or `FromTsMs` / `RunID` / `Kind` for the legacy filters.

type RunEventFanoutOpts struct {
    FromTsMs       int64
    RunID          string
    Kind           string
    FromGlobalSeq  int64
    SubjectPattern string
}

type RunOutboxOptions

RunOutboxOptions — long-lived dispatcher.

type RunOutboxOptions struct {
    OutboxOptions
    Interval time.Duration
    Once     bool
    OnTick   func(outcomes []OutboxOutcome)
}

type RunPubSubBridgeOpts

RunPubSubBridgeOpts is the configuration block for the Pub/Sub bridge.

type RunPubSubBridgeOpts struct {
    Project      string
    Topic        string
    ReactionID   string
    Owner        string
    Once         bool
    PollInterval time.Duration
    ClaimMax     int32
    LeaseMs      int64
}

type RunReactorsOptions

type RunReactorsOptions struct {
    Redrive               RedriveFn
    Recover               bool
    Reconcile             bool
    Timers                bool
    Interval              time.Duration
    ReconcilePendingAfter time.Duration
    Once                  bool
    OnTick                func(tick map[string]any)
}

type SendSignalOpts

type SendSignalOpts struct {
    RunID, AppName, UserID, SessionID, GateName, ResolutionJSON string
}

type SetTimerOpts

type SetTimerOpts struct {
    RunID, TimerID, Kind, PayloadJSON string
    FireAtMs                          int64
}

type SpanHook

SpanHook — set by the host to receive Tape span events. Default is nil (no-op). A future opentelemetry-go integration would set this from `init`.

type SpanHook func(name string, attrs map[string]any) (end func(err error))
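
A minimal sketch of a hook with the SpanHook signature, which records each finished span in memory. The names `spanRecord` and `newRecordingHook`, and the span name `tape.append`, are hypothetical; how the host installs the hook is not shown because it is not documented in this section.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// SpanHook is a local copy of the documented signature so that this example
// compiles on its own.
type SpanHook func(name string, attrs map[string]any) (end func(err error))

// spanRecord is a hypothetical record of one finished span.
type spanRecord struct {
	Name     string
	Attrs    map[string]any
	Duration time.Duration
	Err      error
}

// newRecordingHook returns a SpanHook that appends one record per finished
// span to *out. It is not safe for concurrent use; a real hook would need
// synchronisation.
func newRecordingHook(out *[]spanRecord) SpanHook {
	return func(name string, attrs map[string]any) func(err error) {
		start := time.Now()
		return func(err error) {
			*out = append(*out, spanRecord{
				Name: name, Attrs: attrs, Duration: time.Since(start), Err: err,
			})
		}
	}
}

func main() {
	var spans []spanRecord
	hook := newRecordingHook(&spans)

	end := hook("tape.append", map[string]any{"run_id": "run-1"})
	end(errors.New("boom"))

	for _, s := range spans {
		fmt.Printf("%s err=%v\n", s.Name, s.Err)
	}
}
```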

type StatusCheckResult

StatusCheckResult — the per-tool status_check returns one of these. Found = "the counterparty acknowledges this idempotency_key (the request did land)". ResponseJSON is what to record on the effect when Found.

type StatusCheckResult struct {
    Found        bool
    ResponseJSON string
}

type SubscribeEventsOpts

SubscribeEventsOpts is the rich form for the WAL tail RPC: a single struct covering both the legacy filters (FromTsMs / RunID / Kind) and the new event-bus filters (FromGlobalSeq / SubjectPattern). Mix-and-match freely; `from_ts_ms` is honoured only when `from_global_seq` is zero.

type SubscribeEventsOpts struct {
    FromTsMs       int64
    RunID          string
    Kind           string
    FromGlobalSeq  int64
    SubjectPattern string
}
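
The precedence between `FromGlobalSeq` and `FromTsMs` can be spelled out as follows. `startCursor` is a hypothetical helper, and the "beginning" result for two zero values is an assumption of this example rather than documented server behaviour.

```go
package main

import "fmt"

// SubscribeEventsOpts is a local copy of the documented struct.
type SubscribeEventsOpts struct {
	FromTsMs       int64
	RunID          string
	Kind           string
	FromGlobalSeq  int64
	SubjectPattern string
}

// startCursor is a hypothetical helper that spells out the documented
// precedence: FromTsMs is honoured only when FromGlobalSeq is zero.
// The "beginning" case for two zero values is an assumption.
func startCursor(o SubscribeEventsOpts) string {
	switch {
	case o.FromGlobalSeq != 0:
		return fmt.Sprintf("global_seq=%d", o.FromGlobalSeq)
	case o.FromTsMs != 0:
		return fmt.Sprintf("ts_ms=%d", o.FromTsMs)
	default:
		return "beginning"
	}
}

func main() {
	// Both set: the global sequence wins.
	fmt.Println(startCursor(SubscribeEventsOpts{FromGlobalSeq: 42, FromTsMs: 1000})) // global_seq=42
	// Only the timestamp set: it is honoured.
	fmt.Println(startCursor(SubscribeEventsOpts{FromTsMs: 1000})) // ts_ms=1000
}
```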

type TenancyConfig

TenancyConfig — declared in tape.yaml; consumed by the SDK for log tagging and by `tape doctor` for the loud warning when `hard_multi_tenant` is requested but the runtime can't enforce it.

type TenancyConfig struct {
    Mode     TenancyMode
    TenantID string
}

func TenancyFromEnv

func TenancyFromEnv() TenancyConfig

TenancyFromEnv — read from $TAPE_TENANCY / $TAPE_TENANT_ID with sane defaults.

func (TenancyConfig) IsHard

func (t TenancyConfig) IsHard() bool

IsHard — true iff the configured mode is hard_multi_tenant.

func (TenancyConfig) WarnIfHardButUnenforced

func (t TenancyConfig) WarnIfHardButUnenforced() []string

WarnIfHardButUnenforced returns loud warnings when hard_multi_tenant is requested, because the proto and stores do not currently carry tenant_id.

type TenancyMode

TenancyMode — the supported deployment modes.

type TenancyMode string

const (
    TenancySingle          TenancyMode = "single"
    TenancyTrustedMultiApp TenancyMode = "trusted_multi_app"
    TenancyHardMultiTenant TenancyMode = "hard_multi_tenant"
)
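
A sketch of how a TenancyConfig might be derived from `TAPE_TENANCY` and `TAPE_TENANT_ID`. `tenancyFrom` is a hypothetical, testable stand-in for `TenancyFromEnv`: it takes a lookup function instead of reading the process environment, and falling back to `single` for an empty or unrecognised mode is an assumption, since the actual defaults are not listed here.

```go
package main

import "fmt"

// Local copies of the documented types.
type TenancyMode string

const (
	TenancySingle          TenancyMode = "single"
	TenancyTrustedMultiApp TenancyMode = "trusted_multi_app"
	TenancyHardMultiTenant TenancyMode = "hard_multi_tenant"
)

type TenancyConfig struct {
	Mode     TenancyMode
	TenantID string
}

// IsHard mirrors the documented method.
func (t TenancyConfig) IsHard() bool { return t.Mode == TenancyHardMultiTenant }

// tenancyFrom is hypothetical. The fallback to "single" for an empty or
// unrecognised mode is an assumption of this example.
func tenancyFrom(getenv func(string) string) TenancyConfig {
	cfg := TenancyConfig{Mode: TenancySingle, TenantID: getenv("TAPE_TENANT_ID")}
	switch m := TenancyMode(getenv("TAPE_TENANCY")); m {
	case TenancySingle, TenancyTrustedMultiApp, TenancyHardMultiTenant:
		cfg.Mode = m
	}
	return cfg
}

func main() {
	env := map[string]string{"TAPE_TENANCY": "hard_multi_tenant", "TAPE_TENANT_ID": "acme"}
	cfg := tenancyFrom(func(k string) string { return env[k] })
	fmt.Println(cfg.Mode, cfg.TenantID, cfg.IsHard())
}
```

In a real process, `os.Getenv` could be passed as the lookup function.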

Generated by gomarkdoc

connectors

import "github.com/vamsiramakrishnan/durable-agents/tape/sdk/go/connectors"

Package connectors — capability connector registry + protocol.

A `Connector` is what the outbox reactor calls to actually perform a side effect. Three operations, all idempotent on (RunID, IdempotencyKey):

  • Dispatch — perform / enqueue the effect
  • Observe — ask the counterparty about an UNKNOWN
  • Compensate — reverse a duplicate (or any registered obligation)

Built-in connectors: LogConnector (tests / demos), HttpConnector, PubSubConnector, CloudTasksConnector. Register your own:

import "github.com/vamsiramakrishnan/durable-agents/tape/sdk/go/connectors"

connectors.Default.Register("bank.wire", connectors.NewHttpConnector(
    connectors.HttpOpts{URL: "https://bank.example/wires"},
))

Variables

Default — the process-global registry. Most projects register at init time and consume from here.

var Default = NewRegistry()

ErrAlreadyRegistered — registry.Register saw a name collision.

var ErrAlreadyRegistered = errors.New("connectors: already registered")

ErrCloudTasksNotBuilt — returned by the default build.

var ErrCloudTasksNotBuilt = errors.New("connectors/cloud_tasks: build with -tags cloudtasks")

ErrPubSubNotBuilt — returned by the default build of the Pub/Sub connector. Build with `-tags pubsub` to enable, and `go get cloud.google.com/go/pubsub`.

var ErrPubSubNotBuilt = errors.New("connectors/pubsub: build with -tags pubsub")

ErrUnknownConnector — registry.Get returned no match.

var ErrUnknownConnector = errors.New("connectors: unknown connector")

type CloudTasksConnector

CloudTasksConnector — Cloud Tasks-backed outbox connector.

type CloudTasksConnector struct {
    // contains filtered or unexported fields
}

func NewCloudTasksConnector

func NewCloudTasksConnector(opts CloudTasksOpts) *CloudTasksConnector

NewCloudTasksConnector — construct (stub in the default build).

func (*CloudTasksConnector) Compensate

func (c *CloudTasksConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)

func (*CloudTasksConnector) Dispatch

func (c *CloudTasksConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)

func (*CloudTasksConnector) Name

func (c *CloudTasksConnector) Name() string

func (*CloudTasksConnector) Observe

func (c *CloudTasksConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)

type CloudTasksOpts

CloudTasksOpts — configuration.

type CloudTasksOpts struct {
    Name           string
    Project        string
    Location       string
    Queue          string
    TargetURL      string
    ServiceAccount string
    ObserveURL     string
    CompensateURL  string
}

type CompensationOutcome

CompensationOutcome — what `Compensate()` resolved.

type CompensationOutcome string

const (
    CompensationCompensated CompensationOutcome = "compensated"
    CompensationPending     CompensationOutcome = "pending"
    CompensationStuck       CompensationOutcome = "stuck"
    CompensationFailed      CompensationOutcome = "failed"
)

type CompensationResult

type CompensationResult struct {
    Outcome  CompensationOutcome
    Response any
    Error    string
}

type Connector

Connector — the interface every capability connector implements. All three methods MUST be idempotent on (RunID, IdempotencyKey).

type Connector interface {
    Name() string
    Dispatch(ctx context.Context, effect Effect) (DispatchResult, error)
    Observe(ctx context.Context, effect Effect) (ObservationResult, error)
    Compensate(ctx context.Context, obligation Obligation) (CompensationResult, error)
}

type DispatchOutcome

DispatchOutcome — one of CONFIRMED / PENDING / UNKNOWN / FAILED.

type DispatchOutcome string

const (
    DispatchConfirmed DispatchOutcome = "confirmed"
    DispatchPending   DispatchOutcome = "pending"
    DispatchUnknown   DispatchOutcome = "unknown"
    DispatchFailed    DispatchOutcome = "failed"
)

type DispatchResult

DispatchResult / ObservationResult / CompensationResult — what each op returns. Idiomatic Go: `Outcome` is the discriminator; `Response` carries the structured detail.

type DispatchResult struct {
    Outcome      DispatchOutcome
    Response     any
    Error        string
    DispatchID   string
    RetryAfterMs int
}

type Effect

Effect — the intent the outbox reactor wants dispatched.

type Effect struct {
    RunID          string
    IdempotencyKey string
    ToolName       string
    Connector      string
    Payload        any
    BusinessKey    string
    Attempt        int
    Semantics      string
    TenantID       string
    AppName        string
    Metadata       map[string]any
}

type HttpConnector

HttpConnector — POST the intent payload to an HTTPS endpoint. Headers:

X-Tape-Idempotency-Key  the runner-derived dedup key
X-Tape-Business-Key     when supplied by the outbox tool
X-Tape-Run-Id           for traceability
X-Tape-Attempt          dispatch attempt #

Outcome mapping: 2xx => CONFIRMED, 4xx => FAILED, 5xx / network => UNKNOWN (the reactor will Observe()).

type HttpConnector struct {
    // contains filtered or unexported fields
}

func NewHttpConnector

func NewHttpConnector(opts HttpOpts) *HttpConnector

NewHttpConnector — construct with sane defaults.

func (*HttpConnector) Compensate

func (c *HttpConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)

func (*HttpConnector) Dispatch

func (c *HttpConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)

func (*HttpConnector) Name

func (c *HttpConnector) Name() string

func (*HttpConnector) Observe

func (c *HttpConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)

type HttpOpts

HttpOpts — configuration for `NewHttpConnector`.

type HttpOpts struct {
    Name          string        // defaults to "http"
    URL           string        // POST target for Dispatch
    ObserveURL    string        // POST target for Observe (status lookup)
    CompensateURL string        // POST target for Compensate
    Timeout       time.Duration // default 30s
    Headers       map[string]string
    HttpClient    *http.Client // optional override (e.g. mTLS)
}

type LogConnector

LogConnector — append each dispatch/observe/compensate as a JSON line. Useful for tests, demos, and the non-idempotent-bank example.

type LogConnector struct {
    // contains filtered or unexported fields
}

func NewLogConnector

func NewLogConnector(path string) *LogConnector

NewLogConnector — open / create the JSON-lines file at `path`.

func (*LogConnector) Compensate

func (c *LogConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)

func (*LogConnector) Dispatch

func (c *LogConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)

func (*LogConnector) Name

func (c *LogConnector) Name() string

func (*LogConnector) Observe

func (c *LogConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)

type Obligation

Obligation is the compensation obligation registered after a forward effect is confirmed.

type Obligation struct {
    RunID          string
    EffectKey      string
    Kind           string
    Payload        any
    Attempt        int
    CompensatorRef string
    TenantID       string
}

type ObservationOutcome

ObservationOutcome — what `Observe()` resolved.

type ObservationOutcome string

const (
    ObservationConfirmed ObservationOutcome = "confirmed"
    ObservationAbsent    ObservationOutcome = "absent"
    ObservationDuplicate ObservationOutcome = "duplicate"
    ObservationStuck     ObservationOutcome = "stuck"
    ObservationUnknown   ObservationOutcome = "unknown"
)

type ObservationResult

type ObservationResult struct {
    Outcome  ObservationOutcome
    Response any
    Error    string
    Count    int
}

type PubSubConnector

PubSubConnector — Pub/Sub-backed outbox connector.

type PubSubConnector struct {
    // contains filtered or unexported fields
}

func NewPubSubConnector

func NewPubSubConnector(opts PubSubOpts) *PubSubConnector

NewPubSubConnector — construct (Dispatch returns ErrPubSubNotBuilt in the default build).

func (*PubSubConnector) Compensate

func (c *PubSubConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)

func (*PubSubConnector) Dispatch

func (c *PubSubConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)

func (*PubSubConnector) Name

func (c *PubSubConnector) Name() string

func (*PubSubConnector) Observe

func (c *PubSubConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)

type PubSubOpts

PubSubOpts — configuration. With the default build, these values are preserved on the connector but `Dispatch` returns ErrPubSubNotBuilt.

type PubSubOpts struct {
    Name            string
    Project         string
    Topic           string
    CompensateTopic string
    TapeURL         string // for the Observe() path via tape.GetValue
}

type Registry

Registry — a process-local registry of connectors keyed by name.

type Registry struct {
    // contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry

NewRegistry — fresh, empty registry.

func (*Registry) Get

func (r *Registry) Get(name string) (Connector, error)

Get — fetch by name. Returns ErrUnknownConnector if absent.

func (*Registry) Names

func (r *Registry) Names() []string

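Names returns a snapshot of the registered names. The order is only approximately sorted, so sort the result yourself if you need a deterministic order.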

func (*Registry) Register

func (r *Registry) Register(name string, c Connector) error

Register — add a connector under `name`. Returns ErrAlreadyRegistered if `name` is taken.

func (*Registry) Replace

func (r *Registry) Replace(name string, c Connector)

Replace — install `c` under `name`, overwriting any prior value.


sinks

import "github.com/vamsiramakrishnan/durable-agents/tape/sdk/go/sinks"

Package sinks — WAL fan-out destinations.

A Sink implements `Publish(ctx, entry) error` and `Close() error` (`Close` may be a no-op). Combined with RunEventFanout it gives an at-least-once relay; pair it with consumer-side dedup on (run_id, seq) for effectively-exactly-once delivery.

Built-in sinks:

  • LogSink — writes one JSON line per entry; useful as a tap.
  • WebhookSink — POSTs each entry with X-Tape-Event-Id; retries with backoff.
  • PubSubSink — publishes to Cloud Pub/Sub with ordering_key=run_id. Lazy: builds without the pubsub Go client unless `-tags pubsub` is used (same pattern as connectors/pubsub).

Variables

ErrPubSubSinkNotBuilt — the default build ships a stub. Build with `-tags pubsub` (and `go get cloud.google.com/go/pubsub`) to enable.

var ErrPubSubSinkNotBuilt = errors.New("sinks/pubsub: build with -tags pubsub")

type Entry

Entry — the on-wire JSON representation of one journal entry. Stable; receivers dedupe on (run_id, seq).

type Entry struct {
    RunID       string `json:"run_id"`
    Seq         int64  `json:"seq"`
    Kind        string `json:"kind"`
    PayloadJSON string `json:"payload_json"`
    TsMs        int64  `json:"ts_ms"`
}
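
A sketch of receiver-side deduplication on (run_id, seq), using a local copy of the Entry struct. The `deduper` type is hypothetical and keeps every key in memory without bound; a production receiver would more likely persist the seen keys or track a high-water mark per run.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Entry is a local copy of the documented on-wire struct.
type Entry struct {
	RunID       string `json:"run_id"`
	Seq         int64  `json:"seq"`
	Kind        string `json:"kind"`
	PayloadJSON string `json:"payload_json"`
	TsMs        int64  `json:"ts_ms"`
}

type entryKey struct {
	runID string
	seq   int64
}

// deduper is hypothetical. It remembers every (run_id, seq) pair it has
// accepted so that redelivered entries can be skipped.
type deduper struct {
	seen map[entryKey]struct{}
}

func newDeduper() *deduper { return &deduper{seen: map[entryKey]struct{}{}} }

// Accept decodes one JSON-encoded entry and reports whether it is new.
func (d *deduper) Accept(raw []byte) (Entry, bool, error) {
	var e Entry
	if err := json.Unmarshal(raw, &e); err != nil {
		return Entry{}, false, err
	}
	k := entryKey{runID: e.RunID, seq: e.Seq}
	if _, dup := d.seen[k]; dup {
		return e, false, nil // redelivery: already processed
	}
	d.seen[k] = struct{}{}
	return e, true, nil
}

func main() {
	d := newDeduper()
	line := []byte(`{"run_id":"run-1","seq":3,"kind":"effect","payload_json":"{}","ts_ms":1}`)
	for i := 0; i < 2; i++ {
		e, fresh, err := d.Accept(line)
		fmt.Println(e.RunID, e.Seq, fresh, err)
	}
}
```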

type FnSink

FnSink — wraps a function as a Sink.

type FnSink func(ctx context.Context, entry *pb.EventEntry) error

func (FnSink) Close

func (FnSink) Close() error

func (FnSink) Publish

func (f FnSink) Publish(ctx context.Context, e *pb.EventEntry) error

type LogSink

LogSink appends one JSON line per entry. `path=""` or `":stderr"` writes to stderr; `":stdout"` to stdout.

type LogSink struct {
    // contains filtered or unexported fields
}

func NewLogSink

func NewLogSink(path string) (*LogSink, error)

func (*LogSink) Close

func (s *LogSink) Close() error

func (*LogSink) Publish

func (s *LogSink) Publish(_ context.Context, e *pb.EventEntry) error

type PubSubSink

PubSubSink — stub. Use `-tags pubsub` for the real one.

type PubSubSink struct {
    // contains filtered or unexported fields
}

func NewPubSubSink

func NewPubSubSink(opts PubSubSinkOpts) (*PubSubSink, error)

NewPubSubSink — returns a stub that errors on Publish.

func (*PubSubSink) Close

func (s *PubSubSink) Close() error

Close — no-op in the stub build.

func (*PubSubSink) Publish

func (s *PubSubSink) Publish(ctx context.Context, _ *pb.EventEntry) error

Publish — always errors in the stub build.

type PubSubSinkOpts

PubSubSinkOpts — configuration for the Pub/Sub sink.

type PubSubSinkOpts struct {
    Project string
    Topic   string
}

type Sink

Sink — what RunEventFanout adapters call for every journal entry.

type Sink interface {
    Publish(ctx context.Context, entry *pb.EventEntry) error
    Close() error
}

type WebhookSink

WebhookSink — POST each entry as JSON. Sets `X-Tape-Event-Id: run_id/seq`.

type WebhookSink struct {
    // contains filtered or unexported fields
}

func NewWebhookSink

func NewWebhookSink(opts WebhookSinkOpts) (*WebhookSink, error)

func (*WebhookSink) Close

func (s *WebhookSink) Close() error

func (*WebhookSink) Publish

func (s *WebhookSink) Publish(ctx context.Context, e *pb.EventEntry) error

type WebhookSinkOpts

WebhookSinkOpts is the configuration for `NewWebhookSink`.

type WebhookSinkOpts struct {
    URL            string
    Headers        map[string]string
    MaxRetries     int
    InitialBackoff time.Duration
    Timeout        time.Duration
    HTTPClient     *http.Client
}
