Go package reference⌗
Generated
Generated from godoc by gomarkdoc. Edits here will be overwritten — to change the
content, edit the SDK's godoc comments and run scripts/docs/gen_go.sh.
tape⌗
Package tape — the Go client over the `tape.v1` gRPC service.
URL schemes:
tape://host:port plaintext gRPC (self-hosted, k8s, local)
tapes://host TLS on :443 (Cloud Run / any HTTPS endpoint)
On `tapes://`, if the endpoint is IAM-protected (e.g. an internal Cloud Run service), Tape attaches a Google OIDC ID token automatically via the google.golang.org/api/idtoken package — the caller's service account just needs roles/run.invoker. Set Auth=false / Audience="" to override.
Reactions — the event-bus user surface for Go.
This file is the Go-side companion to `design-principles/tape-event-bus.md` and a mirror of `tape/sdk/python/tape/reactions.py`. The shape:
-
`On(...)` and the `OnXxx(...)` convenience wrappers collect a process-global registry of `ReactionDef`s with their handler closures. The registration calls are DECLARATIONS only — they do not contact the server. Push them to the server with `RegisterAll(ctx, c, prefix)`.
-
`RunDispatcher(ctx, c, opts)` is the in-process reference dispatcher: for every TASK-kind registered reaction, claim a bounded batch from the server, dispatch handlers with backpressure (max_concurrency / rate_limit_per_s / debounce_ms), and complete/nack each task. The server enforces retry & DLQ — the dispatcher just calls `NackTask(permanent=…)` once `attempts >= dlq_after_n`. AGENT reactions are handled entirely on the server; PUBLISH reactions are pulled by the Pub/Sub bridge (`RunPubSubBridge`).
-
Subject path segments are URL-encoded except for the wildcards `*` and `**`. A `key=""` argument is treated as `**` (rest wildcard).
This module is intentionally optional: nothing else in the SDK depends on it. You can use the Tape journal/values/effects perfectly well without ever importing the reactions surface.
Index⌗
- Constants
- Variables
- func ClearRegistry()
- func DefaultURL() string
- func FireDueTimersOnce(ctx context.Context, c *Client, redrive RedriveFn) (int, error)
- func GetCompensator(kind string) func(ctx context.Context, payload []byte) error
- func GetStatusCheck(tool string) func(ctx context.Context, key string) (StatusCheckResult, error)
- func IsOutboxConfigError(err error) bool
- func IsOutboxEnvelope(v any) bool
- func LogJSON(msg string, fields map[string]any)
- func LogLevel(level, msg string, fields map[string]any)
- func On(def ReactionDef)
- func OnDecisionRecorded(h ReactionHandler, opts ReactionDef)
- func OnEffectConfirmed(tool string, h ReactionHandler, opts ReactionDef)
- func OnEffectFailed(tool string, h ReactionHandler, opts ReactionDef)
- func OnEffectUnknown(tool string, h ReactionHandler, opts ReactionDef)
- func OnGate(gate, verb string, h ReactionHandler, opts ReactionDef)
- func OnRun(status string, h ReactionHandler, opts ReactionDef)
- func OnValueChange(namespace, key string, h ReactionHandler, opts ReactionDef)
- func OnValueDeleted(namespace, key string, h ReactionHandler, opts ReactionDef)
- func ReconcileOnce(ctx context.Context, c *Client, reconcilePendingAfter time.Duration) (int, error)
- func RecoverOnce(ctx context.Context, c *Client, redrive RedriveFn) ([]string, error)
- func RegisterAll(ctx context.Context, c *Client, prefix string) ([]*pb.Reaction, error)
- func RegisterCompensator(kind string, fn func(ctx context.Context, payload []byte) error)
- func RegisterStatusCheck(tool string, fn func(ctx context.Context, key string) (StatusCheckResult, error))
- func RunDispatcher(ctx context.Context, c *Client, opts RunDispatcherOpts) error
- func RunEventFanout(ctx context.Context, c *Client, fromTsMs int64, runID, kind string, sink func(entry *pb.EventEntry) error) error
- func RunEventFanoutBySubject(ctx context.Context, c *Client, subjectPattern, predicateCEL string, fromGlobalSeq int64, sink func(entry *pb.EventEntry) error) error
- func RunEventFanoutWith(ctx context.Context, c *Client, o RunEventFanoutOpts, sink func(entry *pb.EventEntry) error) error
- func RunOutboxDispatcher(ctx context.Context, c *Client, opt RunOutboxOptions) error
- func RunPubSubBridge(ctx context.Context, c *Client, opts RunPubSubBridgeOpts) error
- func RunReactors(ctx context.Context, c *Client, opt RunReactorsOptions) error
- func SetOTelHooks(h OTelHooks)
- func SetSpanHook(h SpanHook)
- func Span(name string, attrs map[string]any) func(err error)
- type BeginEffectOpts
- type BeginRunOpts
- type Budget
- type ClaimTasksOpts
- type Client
- func Dial(url string, opts ...Options) (*Client, error)
- func (c *Client) AdmitBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.AdmitBudgetResponse, error)
- func (c *Client) AppendEvent(ctx context.Context, app, user, session string, ev *pb.EventRecord, stateDeltaJSON string) (*pb.AppendEventResponse, error)
- func (c *Client) AwaitSignal(ctx context.Context, runID, gate, payloadJSON string) (*pb.AwaitSignalResponse, error)
- func (c *Client) BeginEffect(ctx context.Context, o BeginEffectOpts) (*pb.BeginEffectResponse, error)
- func (c *Client) BeginRun(ctx context.Context, o BeginRunOpts) (*pb.BeginRunResponse, error)
- func (c *Client) CancelTimer(ctx context.Context, runID, timerID string) (*pb.CancelTimerResponse, error)
- func (c *Client) ChargeBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.BudgetState, error)
- func (c *Client) ClaimEffectDispatch(ctx context.Context, runID, key, claimer string, leaseTTLMs int64) (*pb.ClaimEffectDispatchResponse, error)
- func (c *Client) ClaimObligation(ctx context.Context, runID string, obligationSeq int64, claimer string, leaseTtlMs int64) (*pb.ClaimObligationResponse, error)
- func (c *Client) ClaimTasks(ctx context.Context, o ClaimTasksOpts) ([]*pb.Task, error)
- func (c *Client) Close() error
- func (c *Client) CompleteEffect(ctx context.Context, runID, key string, status int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)
- func (c *Client) CompleteTask(ctx context.Context, taskID, owner string) (*pb.Task, error)
- func (c *Client) Conn() *grpc.ClientConn
- func (c *Client) CreateSession(ctx context.Context, app, user, session, stateJSON string) (*pb.Session, error)
- func (c *Client) DeregisterReaction(ctx context.Context, reactionID string) (bool, error)
- func (c *Client) EndRun(ctx context.Context, runID string, status int32, detailJSON string) (*pb.EndRunResponse, error)
- func (c *Client) GetDecision(ctx context.Context, runID string, idx int64) (*pb.GetDecisionResponse, error)
- func (c *Client) GetEffect(ctx context.Context, runID, key string) (*pb.GetEffectResponse, error)
- func (c *Client) GetRun(ctx context.Context, runID string) (*pb.RunState, error)
- func (c *Client) GetSession(ctx context.Context, app, user, session string, maxEvents int64) (*pb.GetSessionResponse, error)
- func (c *Client) ListDueTimers(ctx context.Context, nowMs, limit int64, claim bool) (*pb.ListDueTimersResponse, error)
- func (c *Client) ListEffectsToDispatch(ctx context.Context, connector string, limit int64) (*pb.ListEffectsToDispatchResponse, error)
- func (c *Client) ListObligations(ctx context.Context, runID string, onlyUnresolved bool, opts ...ListObligationsOpts) (*pb.ListObligationsResponse, error)
- func (c *Client) ListPendingEffects(ctx context.Context, olderThanMs int64, includePending, includeUnknown bool, limit int64) (*pb.ListPendingEffectsResponse, error)
- func (c *Client) ListReactions(ctx context.Context, subjectPattern string) ([]*pb.Reaction, error)
- func (c *Client) ListRunsToRecover(ctx context.Context, limit int64) (*pb.ListRunsToRecoverResponse, error)
- func (c *Client) ListTasks(ctx context.Context, reactionID string, status pb.TaskStatus, limit int32) ([]*pb.Task, error)
- func (c *Client) ListUnresolvedObligations(ctx context.Context, opts ListUnresolvedObligationsOpts) (*pb.ListUnresolvedObligationsResponse, error)
- func (c *Client) NackTask(ctx context.Context, taskID, owner, errMsg string, permanent bool) (*pb.Task, error)
- func (c *Client) PB() pb.TapeClient
- func (c *Client) ReconcileEffect(ctx context.Context, runID, key string, resolved int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)
- func (c *Client) RecordDecision(ctx context.Context, runID string, idx int64, model, requestJSON, responseJSON, rationale, policyVersion string) (*pb.DecisionRecord, error)
- func (c *Client) RecordDispatchAttempt(ctx context.Context, runID, key, errMsg string, nextDispatchAtMs int64) (*pb.EffectRecord, error)
- func (c *Client) RecordExternalObservation(ctx context.Context, o RecordExternalObservationOpts) (*pb.EffectRecord, error)
- func (c *Client) RecordObligationAttempt(ctx context.Context, runID string, obligationSeq int64, errMsg string, nextAttemptAtMs int64) (*pb.ObligationRecord, error)
- func (c *Client) RegisterCompensation(ctx context.Context, runID, effectKey, kind, payloadJSON string, opts ...RegisterCompensationOpts) (*pb.ObligationRecord, error)
- func (c *Client) RegisterReaction(ctx context.Context, o RegisterReactionOpts) (*pb.Reaction, error)
- func (c *Client) ResolveObligation(ctx context.Context, runID string, obligationSeq int64, status int32, resultJSON string) (*pb.ObligationRecord, error)
- func (c *Client) SendSignal(ctx context.Context, o SendSignalOpts) (*pb.SendSignalResponse, error)
- func (c *Client) SetBudget(ctx context.Context, runID string, usdCap float64, tokenCap int64) (*pb.BudgetState, error)
- func (c *Client) SetTimer(ctx context.Context, o SetTimerOpts) (*pb.TimerRecord, error)
- func (c *Client) SubscribeBySubject(ctx context.Context, subjectPattern, predicateCEL string, fromGlobalSeq int64) (grpc.ServerStreamingClient[pb.EventEntry], error)
- func (c *Client) SubscribeEvents(ctx context.Context, fromTsMs int64, runID, kind string) (grpc.ServerStreamingClient[pb.EventEntry], error)
- func (c *Client) SubscribeEventsWith(ctx context.Context, o SubscribeEventsOpts) (grpc.ServerStreamingClient[pb.EventEntry], error)
- func (c *Client) URL() string
- type DurableApp
- func NewDurableApp(ctx context.Context, cfg DurableConfig) (*DurableApp, error)
- func (d *DurableApp) Close() error
- type DurableConfig
- type Envelope
- type ListObligationsOpts
- type ListUnresolvedObligationsOpts
- type OTelHooks
- type Options
- type OutboxConfigError
- func (e *OutboxConfigError) Error() string
- type OutboxEnvelope
- type OutboxOptions
- type OutboxOutcome
- func DispatchOne(ctx context.Context, c *Client, eff *pb.EffectRecord, opt OutboxOptions) OutboxOutcome
- func OutboxDispatchOnce(ctx context.Context, c *Client, opt OutboxOptions) ([]OutboxOutcome, error)
- type OutboxSemantics
- type OutboxTool
- func MustOutboxTool(opts OutboxToolOpts) *OutboxTool
- func NewOutboxTool(opts OutboxToolOpts) (*OutboxTool, error)
- func (t *OutboxTool) Envelope(payload map[string]any) (map[string]any, error)
- func (t *OutboxTool) EnvelopeJSON(payload map[string]any) ([]byte, error)
- type OutboxToolOpts
- type ReactionDef
- func GetRegistry() []*ReactionDef
- type ReactionHandler
- type RecordExternalObservationOpts
- type RedriveFn
- type RegisterCompensationOpts
- type RegisterReactionOpts
- type RunDispatcherOpts
- type RunEventFanoutOpts
- type RunOutboxOptions
- type RunPubSubBridgeOpts
- type RunReactorsOptions
- type SendSignalOpts
- type SetTimerOpts
- type SpanHook
- type StatusCheckResult
- type SubscribeEventsOpts
- type TenancyConfig
- func TenancyFromEnv() TenancyConfig
- func (t TenancyConfig) IsHard() bool
- func (t TenancyConfig) WarnIfHardButUnenforced() []string
- type TenancyMode
Constants⌗
──── status enums (re-exported for ergonomic use) ──────────────────────────
const (
RunStatusRunnable = int32(pb.RunStatus_RUN_STATUS_RUNNABLE)
RunStatusRunning = int32(pb.RunStatus_RUN_STATUS_RUNNING)
RunStatusWaiting = int32(pb.RunStatus_RUN_STATUS_WAITING)
RunStatusTerminal = int32(pb.RunStatus_RUN_STATUS_TERMINAL)
RunStatusFailed = int32(pb.RunStatus_RUN_STATUS_FAILED)
RunStatusCompensating = int32(pb.RunStatus_RUN_STATUS_COMPENSATING)
RunStatusStuck = int32(pb.RunStatus_RUN_STATUS_STUCK)
RunStatusCancelled = int32(pb.RunStatus_RUN_STATUS_CANCELLED)
EffectStatusPending = int32(pb.EffectStatus_EFFECT_STATUS_PENDING)
EffectStatusConfirmed = int32(pb.EffectStatus_EFFECT_STATUS_CONFIRMED)
EffectStatusFailed = int32(pb.EffectStatus_EFFECT_STATUS_FAILED)
EffectStatusUnknown = int32(pb.EffectStatus_EFFECT_STATUS_UNKNOWN)
// Outbox / non-idempotent contract (see proto: EffectSemantics,
// EffectDispatchMode, EffectResolution). The defaults preserve v1 behaviour
// (idempotent + inline); opt into the outbox path by passing the
// non-default values to BeginEffect.
EffectSemanticsIdempotent = int32(pb.EffectSemantics_EFFECT_SEMANTICS_IDEMPOTENT)
EffectSemanticsNonIdempotent = int32(pb.EffectSemantics_EFFECT_SEMANTICS_NON_IDEMPOTENT)
EffectSemanticsObserveOnly = int32(pb.EffectSemantics_EFFECT_SEMANTICS_OBSERVE_ONLY)
EffectDispatchInline = int32(pb.EffectDispatchMode_EFFECT_DISPATCH_MODE_INLINE)
EffectDispatchOutbox = int32(pb.EffectDispatchMode_EFFECT_DISPATCH_MODE_OUTBOX)
EffectResolutionConfirmed = int32(pb.EffectResolution_EFFECT_RESOLUTION_CONFIRMED)
EffectResolutionFailed = int32(pb.EffectResolution_EFFECT_RESOLUTION_FAILED)
EffectResolutionAbsent = int32(pb.EffectResolution_EFFECT_RESOLUTION_ABSENT)
EffectResolutionDuplicate = int32(pb.EffectResolution_EFFECT_RESOLUTION_DUPLICATE)
EffectResolutionStuck = int32(pb.EffectResolution_EFFECT_RESOLUTION_STUCK)
ObligationPending = int32(pb.ObligationStatus_OBLIGATION_STATUS_PENDING)
ObligationCommitted = int32(pb.ObligationStatus_OBLIGATION_STATUS_COMMITTED)
ObligationCompensated = int32(pb.ObligationStatus_OBLIGATION_STATUS_COMPENSATED)
ObligationStuck = int32(pb.ObligationStatus_OBLIGATION_STATUS_STUCK)
)
HandlerKind and TaskStatus re-exports for ergonomic call-sites.
const (
HandlerKindAgent = pb.HandlerKind_HANDLER_KIND_AGENT
HandlerKindTask = pb.HandlerKind_HANDLER_KIND_TASK
HandlerKindPublish = pb.HandlerKind_HANDLER_KIND_PUBLISH
TaskStatusPending = pb.TaskStatus_TASK_STATUS_PENDING
TaskStatusClaimed = pb.TaskStatus_TASK_STATUS_CLAIMED
TaskStatusDone = pb.TaskStatus_TASK_STATUS_DONE
TaskStatusFailed = pb.TaskStatus_TASK_STATUS_FAILED
TaskStatusDLQ = pb.TaskStatus_TASK_STATUS_DLQ
)
Span-name constants — use these instead of stringly-typed magic.
const (
SpanBeginRun = "tape.begin_run"
SpanResumeRun = "tape.resume_run"
SpanRecordDecision = "tape.record_decision"
SpanBeginEffect = "tape.begin_effect"
SpanCompleteEffect = "tape.complete_effect"
SpanReconcileEffect = "tape.reconcile_effect"
SpanDispatchEffect = "tape.dispatch_effect"
SpanCompensate = "tape.compensate"
SpanRedrive = "tape.redrive"
SpanAwaitSignal = "tape.await_signal"
SpanSendSignal = "tape.send_signal"
)
Variables⌗
AllSpans — the canonical span-name list.
var AllSpans = []string{
SpanBeginRun, SpanResumeRun, SpanRecordDecision,
SpanBeginEffect, SpanCompleteEffect,
SpanReconcileEffect, SpanDispatchEffect,
SpanCompensate, SpanRedrive,
SpanAwaitSignal, SpanSendSignal,
}
ErrPubSubNotBuilt is returned by RunPubSubBridge when the Pub/Sub backend is not compiled in. Build with `-tags pubsub` after adding `cloud.google.com/go/pubsub` to your module to enable it.
var ErrPubSubNotBuilt = errors.New("tape: RunPubSubBridge requires building with `-tags pubsub` and adding cloud.google.com/go/pubsub to your module")
StructuredFields — the canonical field order for structured log records. `LogJSON` emits keys in this order when present.
var StructuredFields = []string{
"ts", "level", "msg",
"tenant_id", "app_name", "run_id", "invocation_id", "session_id",
"seq", "effect_key", "decision_index", "reactor", "lease_owner",
}
func ClearRegistry⌗
ClearRegistry drops every registered reaction. Test-only helper.
func DefaultURL⌗
DefaultURL — honours $TAPE_URL.
func FireDueTimersOnce⌗
func GetCompensator⌗
func GetStatusCheck⌗
func IsOutboxConfigError⌗
IsOutboxConfigError — type-check helper.
func IsOutboxEnvelope⌗
IsOutboxEnvelope — given an arbitrary tool result, report whether it is an outbox envelope. Used by the Go ADK adapter (and by hand-written dispatchers) to recognise an intent vs a direct result.
func LogJSON⌗
LogJSON — emit one JSON line to stderr, ordered by `StructuredFields`. Extra fields are appended after the canonical block.
func LogLevel⌗
LogLevel — convenience for log_json at a specific level.
func On⌗
On appends `def` to the process-global reaction registry. The decoration is declarative — the server is not contacted until RegisterAll. If `def.Agent` is set the reaction kind defaults to AGENT; if `def.Publish` is set it defaults to PUBLISH; otherwise it's TASK. It is an error to set both.
func OnDecisionRecorded⌗
OnDecisionRecorded fires on every decision write.
func OnEffectConfirmed⌗
OnEffectConfirmed fires on `/tape/effect/confirmed/\<tool>/**`. Pass tool="" or tool="*" to match any tool.
func OnEffectFailed⌗
OnEffectFailed fires on `/tape/effect/failed/\<tool>/**`.
func OnEffectUnknown⌗
OnEffectUnknown fires on `/tape/effect/unknown/\<tool>/**`.
func OnGate⌗
OnGate fires on a gate lifecycle event. `verb=""` defaults to `"released"`; pass `"waiting"` for the park event.
func OnRun⌗
OnRun fires on run lifecycle events. `status=""` defaults to `"terminal"`.
func OnValueChange⌗
OnValueChange fires when a value in `(namespace, key)` is written. `key=""` matches any key (rest wildcard `**`); `key="*"` matches one segment.
func OnValueDeleted⌗
OnValueDeleted is the deletion counterpart of OnValueChange.
func ReconcileOnce⌗
func ReconcileOnce(ctx context.Context, c *Client, reconcilePendingAfter time.Duration) (int, error)
func RecoverOnce⌗
func RegisterAll⌗
RegisterAll pushes every registered reaction to the server via RegisterReaction. The returned slice is the persisted `Reaction` rows with the canonical `reaction_id` filled in. `prefix` is prepended to each reaction's `name` so concurrent test runs don't collide.
Idempotent on `ReactionID` — the server upserts by id. Reactions that declared `ReactionID=""` get a stable server-minted id which we cache on the ReactionDef so RunDispatcher can claim by it.
func RegisterCompensator⌗
func RegisterStatusCheck⌗
func RegisterStatusCheck(tool string, fn func(ctx context.Context, key string) (StatusCheckResult, error))
func RunDispatcher⌗
RunDispatcher is the in-proc dispatcher loop. For every TASK-kind reaction in the registry, it:
- Calls ClaimTasks(reaction_id, shard=-1, ...) in a tight outer loop.
- Submits each claimed Task to the reaction's semaphore-bounded worker pool. Inside the worker: rate-limit via token bucket, drop a re-trigger of the same subject inside the debounce window (complete it as a no-op), run the handler, CompleteTask on success or NackTask(permanent=…) on failure.
AGENT and PUBLISH reactions are NOT driven here — the server creates matching runs for AGENT, and the Pub/Sub bridge pulls PUBLISH tasks.
`Once=true` returns after one pass over every reaction (handy for tests).
func RunEventFanout⌗
func RunEventFanout(ctx context.Context, c *Client, fromTsMs int64, runID, kind string, sink func(entry *pb.EventEntry) error) error
RunEventFanout is the legacy timestamp-cursored WAL fan-out. Preserved for back-compat — new code should use RunEventFanoutBySubject (global_seq cursored) or RunEventFanoutWith (the option-struct form supporting both cursors plus a subject pattern).
Deprecated: prefer RunEventFanoutBySubject for global_seq cursoring.
func RunEventFanoutBySubject⌗
func RunEventFanoutBySubject(ctx context.Context, c *Client, subjectPattern, predicateCEL string, fromGlobalSeq int64, sink func(entry *pb.EventEntry) error) error
RunEventFanoutBySubject tails the journal via SubscribeBySubject, cursored on `global_seq`. The matching subject pattern follows the path-style grammar (`*` = one segment, `**` = trailing segments). `predicateCEL` is an optional server-side CEL filter on the envelope.
func RunEventFanoutWith⌗
func RunEventFanoutWith(ctx context.Context, c *Client, o RunEventFanoutOpts, sink func(entry *pb.EventEntry) error) error
RunEventFanoutWith is the option-struct form of RunEventFanout. Honours both the legacy and the event-bus filter fields.
func RunOutboxDispatcher⌗
RunOutboxDispatcher polls the outbox forever (or once). Cancel via ctx.
func RunPubSubBridge⌗
RunPubSubBridge — stub when not built with `-tags pubsub`. Returns ErrPubSubNotBuilt so callers get a clear message instead of a silent no-op.
To enable: add `cloud.google.com/go/pubsub` to your module and build with `-tags pubsub`. See `reactions_pubsub.go` for the active implementation.
func RunReactors⌗
func SetOTelHooks⌗
SetOTelHooks installs OTel propagation hooks for the dispatcher. Calling with an empty struct restores the no-op default.
func SetSpanHook⌗
SetSpanHook — install a span hook. Used by tracing adapters; safe to call multiple times (last wins).
func Span⌗
Span — open a span via the installed hook (or a no-op). Returns the end function the caller defers.
type BeginEffectOpts⌗
type BeginEffectOpts struct {
RunID string
DecisionIndex int64
ToolName string
CallIndex int32
RequestJSON string
CustomKey string
// Outbox / non-idempotent contract. Zero values keep the v1 behaviour
// (idempotent + inline); set Semantics + DispatchMode + BusinessKey +
// Connector to opt into the outbox path. Server refuses
// NON_IDEMPOTENT + INLINE — surface that error to the caller.
Semantics int32
DispatchMode int32
BusinessKey string
Connector string
}
type BeginRunOpts⌗
type BeginRunOpts struct {
AppName, UserID, SessionID, InvocationID, LeaseOwner string
LeaseTTLMs int64
}
type Budget⌗
Budget — the per-run budget. USDCap/TokenCap of 0 means "no cap".
type ClaimTasksOpts⌗
ClaimTasksOpts is the option-struct form of the ClaimTasks RPC.
Defaults applied client-side: LeaseMs \<=0 → 60_000, Max \<=0 → 16. Shard \<0 asks the server for tasks from any shard.
type ClaimTasksOpts struct {
ReactionID string
Shard int32 // <0 = any
Owner string
LeaseMs int64 // <=0 = 60_000
Max int32 // <=0 = 16
NowMs int64 // 0 = server time
}
type Client⌗
Client wraps a *grpc.ClientConn and the generated pb.TapeClient with friendlier helpers. Use Conn() / PB() if you want raw access.
func Dial⌗
func (*Client) AdmitBudget⌗
func (c *Client) AdmitBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.AdmitBudgetResponse, error)
func (*Client) AppendEvent⌗
func (c *Client) AppendEvent(ctx context.Context, app, user, session string, ev *pb.EventRecord, stateDeltaJSON string) (*pb.AppendEventResponse, error)
func (*Client) AwaitSignal⌗
func (c *Client) AwaitSignal(ctx context.Context, runID, gate, payloadJSON string) (*pb.AwaitSignalResponse, error)
func (*Client) BeginEffect⌗
func (c *Client) BeginEffect(ctx context.Context, o BeginEffectOpts) (*pb.BeginEffectResponse, error)
func (*Client) BeginRun⌗
func (*Client) CancelTimer⌗
func (c *Client) CancelTimer(ctx context.Context, runID, timerID string) (*pb.CancelTimerResponse, error)
func (*Client) ChargeBudget⌗
func (c *Client) ChargeBudget(ctx context.Context, runID string, usd float64, tokens int64) (*pb.BudgetState, error)
func (*Client) ClaimEffectDispatch⌗
func (c *Client) ClaimEffectDispatch(ctx context.Context, runID, key, claimer string, leaseTTLMs int64) (*pb.ClaimEffectDispatchResponse, error)
ClaimEffectDispatch is the atomic CAS lease on the dispatch slot. Returns acquired=false (with the current row) when another dispatcher already holds the lease — the loser must not call the upstream.
func (*Client) ClaimObligation⌗
func (c *Client) ClaimObligation(ctx context.Context, runID string, obligationSeq int64, claimer string, leaseTtlMs int64) (*pb.ClaimObligationResponse, error)
func (*Client) ClaimTasks⌗
ClaimTasks atomically leases up to `Max` pending tasks for the dispatcher.
func (*Client) Close⌗
func (*Client) CompleteEffect⌗
func (c *Client) CompleteEffect(ctx context.Context, runID, key string, status int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)
func (*Client) CompleteTask⌗
CompleteTask marks the task as DONE (success ack from the handler).
func (*Client) Conn⌗
func (*Client) CreateSession⌗
func (c *Client) CreateSession(ctx context.Context, app, user, session, stateJSON string) (*pb.Session, error)
func (*Client) DeregisterReaction⌗
DeregisterReaction marks the reaction `deleted=true`. Returns whether the server flipped the bit (false on unknown id).
func (*Client) EndRun⌗
func (c *Client) EndRun(ctx context.Context, runID string, status int32, detailJSON string) (*pb.EndRunResponse, error)
func (*Client) GetDecision⌗
func (c *Client) GetDecision(ctx context.Context, runID string, idx int64) (*pb.GetDecisionResponse, error)
func (*Client) GetEffect⌗
func (*Client) GetRun⌗
func (*Client) GetSession⌗
func (c *Client) GetSession(ctx context.Context, app, user, session string, maxEvents int64) (*pb.GetSessionResponse, error)
func (*Client) ListDueTimers⌗
func (c *Client) ListDueTimers(ctx context.Context, nowMs, limit int64, claim bool) (*pb.ListDueTimersResponse, error)
func (*Client) ListEffectsToDispatch⌗
func (c *Client) ListEffectsToDispatch(ctx context.Context, connector string, limit int64) (*pb.ListEffectsToDispatchResponse, error)
ListEffectsToDispatch returns PENDING+OUTBOX effects whose next_dispatch_at_ms \<= now and whose dispatch lease is empty or expired. `connector` scopes the result to one connector name; empty means any.
func (*Client) ListObligations⌗
func (c *Client) ListObligations(ctx context.Context, runID string, onlyUnresolved bool, opts ...ListObligationsOpts) (*pb.ListObligationsResponse, error)
func (*Client) ListPendingEffects⌗
func (c *Client) ListPendingEffects(ctx context.Context, olderThanMs int64, includePending, includeUnknown bool, limit int64) (*pb.ListPendingEffectsResponse, error)
func (*Client) ListReactions⌗
ListReactions returns every active reaction. Pass an empty pattern to list all reactions; otherwise an exact-match on `subject_pattern` is applied.
func (*Client) ListRunsToRecover⌗
func (c *Client) ListRunsToRecover(ctx context.Context, limit int64) (*pb.ListRunsToRecoverResponse, error)
func (*Client) ListTasks⌗
func (c *Client) ListTasks(ctx context.Context, reactionID string, status pb.TaskStatus, limit int32) ([]*pb.Task, error)
ListTasks returns tasks filtered by reaction and status (useful for observability and DLQ inspection).
func (*Client) ListUnresolvedObligations⌗
func (c *Client) ListUnresolvedObligations(ctx context.Context, opts ListUnresolvedObligationsOpts) (*pb.ListUnresolvedObligationsResponse, error)
func (*Client) NackTask⌗
func (c *Client) NackTask(ctx context.Context, taskID, owner, errMsg string, permanent bool) (*pb.Task, error)
NackTask reports a failed attempt. Set `permanent=true` to push it straight to DLQ; otherwise the server will re-lease until `dlq_after_n` is reached.
func (*Client) PB⌗
func (*Client) ReconcileEffect⌗
func (c *Client) ReconcileEffect(ctx context.Context, runID, key string, resolved int32, responseJSON, errorJSON string) (*pb.EffectRecord, error)
func (*Client) RecordDecision⌗
func (c *Client) RecordDecision(ctx context.Context, runID string, idx int64, model, requestJSON, responseJSON, rationale, policyVersion string) (*pb.DecisionRecord, error)
func (*Client) RecordDispatchAttempt⌗
func (c *Client) RecordDispatchAttempt(ctx context.Context, runID, key, errMsg string, nextDispatchAtMs int64) (*pb.EffectRecord, error)
RecordDispatchAttempt reports a *failed* dispatch. `nextDispatchAtMs \<= 0` asks the server to transition the effect to UNKNOWN (the safety exit for a lost ack — the reconciler resolves via observe(), no blind retry); a positive value schedules a retry.
func (*Client) RecordExternalObservation⌗
func (c *Client) RecordExternalObservation(ctx context.Context, o RecordExternalObservationOpts) (*pb.EffectRecord, error)
func (*Client) RecordObligationAttempt⌗
func (c *Client) RecordObligationAttempt(ctx context.Context, runID string, obligationSeq int64, errMsg string, nextAttemptAtMs int64) (*pb.ObligationRecord, error)
func (*Client) RegisterCompensation⌗
func (c *Client) RegisterCompensation(ctx context.Context, runID, effectKey, kind, payloadJSON string, opts ...RegisterCompensationOpts) (*pb.ObligationRecord, error)
func (*Client) RegisterReaction⌗
func (c *Client) RegisterReaction(ctx context.Context, o RegisterReactionOpts) (*pb.Reaction, error)
RegisterReaction creates (or upserts on `reaction_id`) a server-side reaction. The returned Reaction echoes the persisted row with its server-assigned `reaction_id` filled in.
func (*Client) ResolveObligation⌗
func (c *Client) ResolveObligation(ctx context.Context, runID string, obligationSeq int64, status int32, resultJSON string) (*pb.ObligationRecord, error)
func (*Client) SendSignal⌗
func (*Client) SetBudget⌗
func (c *Client) SetBudget(ctx context.Context, runID string, usdCap float64, tokenCap int64) (*pb.BudgetState, error)
func (*Client) SetTimer⌗
func (*Client) SubscribeBySubject⌗
func (c *Client) SubscribeBySubject(ctx context.Context, subjectPattern, predicateCEL string, fromGlobalSeq int64) (grpc.ServerStreamingClient[pb.EventEntry], error)
SubscribeBySubject opens a streaming WAL tail filtered by subject pattern and an optional CEL predicate, cursored on `global_seq`. Iterate with .Recv() until io.EOF.
func (*Client) SubscribeEvents⌗
func (c *Client) SubscribeEvents(ctx context.Context, fromTsMs int64, runID, kind string) (grpc.ServerStreamingClient[pb.EventEntry], error)
SubscribeEvents returns a streaming client; iterate with .Recv() until io.EOF.
Legacy entry point. New code should use SubscribeEventsOpts (which supports `from_global_seq` and `subject_pattern`) or SubscribeBySubject.
func (*Client) SubscribeEventsWith⌗
func (c *Client) SubscribeEventsWith(ctx context.Context, o SubscribeEventsOpts) (grpc.ServerStreamingClient[pb.EventEntry], error)
SubscribeEventsWith is the option-struct form of SubscribeEvents. Prefer this for new code; it supports the event-bus rebuild fields.
func (*Client) URL⌗
type DurableApp⌗
DurableApp — the wired bundle returned by `NewDurableApp()`.
type DurableApp struct {
Cfg DurableConfig
URL string
Client *Client
LeaseOwner string
LeaseTTLMs int64
// contains filtered or unexported fields
}
func NewDurableApp⌗
NewDurableApp — resolve config, dial the server, return the bundle. Honours `$TAPE_URL` for `TapeURL` and `$TAPE_LEASE_MS` for `LeaseTTLMs`.
Mirrors `tape.adk.durable_app(...)` from the Python SDK. The Go ADK adapter, when it lands, will accept a `*DurableApp` directly.
func (*DurableApp) Close⌗
Close — close the underlying client.
type DurableConfig⌗
DurableConfig — the user-facing settings. Sensible defaults everywhere.
type DurableConfig struct {
// Name — the ADK App name. Required.
Name string
// TapeURL — defaults to $TAPE_URL, then "tape://localhost:7878".
TapeURL string
// Budget — admit/charge thresholds. Zero values mean "no cap".
Budget Budget
// Resumable — enable ADK ResumabilityConfig(is_resumable=True). The Go
// ADK adapter will read this; today it is recorded for completeness.
Resumable bool
// CheckCancellation — let the plugin poll RunStatus on every model
// boundary. The Go ADK adapter will read this.
CheckCancellation bool
// LeaseOwner — overrides the default "<hostname>:<pid>" identity.
LeaseOwner string
// LeaseTTLMs — overrides the default lease (120_000 ms when 0).
LeaseTTLMs int64
// ClientOptions — passed verbatim to `tape.Dial(...)`.
ClientOptions *Options
}
type Envelope⌗
Envelope is the per-task payload handed to the user handler. It mirrors the CEL envelope on the server: a handler can use the same field names that server-side predicates use.
type Envelope struct {
Task *pb.Task
Payload map[string]any // parsed payload_json; nil if not JSON-decodable
}
type ListObligationsOpts⌗
ListObligationsOpts: StatusFilter == 0 means "any status"; otherwise it's an exact ObligationStatus match. OnlyUnresolved is the shorthand "exclude terminal COMPENSATED/STUCK".
type ListUnresolvedObligationsOpts⌗
ListUnresolvedObligationsOpts: the cross-run drainer feed. Defaults pick up ready-to-run PENDING plus COMMITTED rows whose lease has expired.
type ListUnresolvedObligationsOpts struct {
Limit int32
NowMs int64
IncludePending bool
IncludeStuck bool
IncludeCommittedExpired bool
}
type OTelHooks⌗
OTelHooks lets callers plug in real OTel context propagation without the SDK depending on `go.opentelemetry.io/otel`. If the start hook returns a non-nil func, the dispatcher calls it after the handler returns.
type OTelHooks struct {
// Start opens a child span from the (trace_id, parent_span_id) on the
// task and returns the span-bearing context plus an `end` func. The
// dispatcher calls `end()` after the handler returns. A nil Start means
// "no propagation".
Start func(ctx context.Context, traceIDHex, parentSpanIDHex, name string) (context.Context, func())
}
type Options⌗
Options for Dial.
type Options struct {
Auth bool // default true on tapes://
Audience string // override the OIDC audience derived from the URL
IDToken string // static ID token (overrides Auth)
DialOpts []grpc.DialOption
}
type OutboxConfigError⌗
OutboxConfigError — returned by NewOutboxTool when the configuration is unsafe (typically: non_idempotent without any recovery path).
func (*OutboxConfigError) Error⌗
type OutboxEnvelope⌗
OutboxEnvelope — the JSON-serialisable intent the runner returns. The outbox reactor recognises envelopes by the `__outbox__: true` sentinel.
type OutboxEnvelope struct {
Outbox bool `json:"__outbox__"`
Connector string `json:"connector"`
Tool string `json:"tool"`
Semantics string `json:"semantics"`
WaitForResult bool `json:"wait_for_result"`
HumanGate bool `json:"human_gate"`
DispatchTimeoutMs int64 `json:"dispatch_timeout_ms,omitempty"`
BusinessKey string `json:"business_key,omitempty"`
Payload map[string]any `json:"payload"`
}
type OutboxOptions⌗
OutboxOptions — configuration for one dispatch pass.
type OutboxOptions struct {
Connector string // restrict to one connector name
Registry *connectors.Registry // defaults to connectors.Default
Claimer string // identity for dispatch_claimed_by
Limit int64 // ListEffectsToDispatch limit (default 200)
DispatchMaxAttempts int // give up after N (default 5)
}
type OutboxOutcome⌗
OutboxOutcome — per-effect outcome of one dispatch attempt.
type OutboxOutcome struct {
RunID string `json:"run_id"`
IdempotencyKey string `json:"idempotency_key"`
Connector string `json:"connector"`
Tool string `json:"tool"`
Status string `json:"status"` // confirmed | unknown | failed | retry-scheduled | skipped | error
Reason string `json:"reason,omitempty"`
ExternalRef string `json:"external_ref,omitempty"`
Error string `json:"error,omitempty"`
NextAtMs int64 `json:"next_at_ms,omitempty"`
Attempts int `json:"attempts,omitempty"`
}
func DispatchOne⌗
func DispatchOne(ctx context.Context, c *Client, eff *pb.EffectRecord, opt OutboxOptions) OutboxOutcome
DispatchOne runs a single effect through its connector and records the result. Returns the per-effect outcome.
func OutboxDispatchOnce⌗
OutboxDispatchOnce runs one pass of the outbox dispatcher.
type OutboxSemantics⌗
OutboxSemantics — the semantics tag on an outbox tool.
const (
OutboxIdempotent OutboxSemantics = "idempotent"
OutboxAtLeastOnce OutboxSemantics = "at_least_once"
OutboxNonIdempotent OutboxSemantics = "non_idempotent"
)
type OutboxTool⌗
OutboxTool — the value returned by `NewOutboxTool`. Use `Envelope(...)` from a tool body to produce the intent payload the runner will return.
func MustOutboxTool⌗
MustOutboxTool — like NewOutboxTool but panics on configuration errors. Use in init() blocks where a misconfiguration should crash early.
func NewOutboxTool⌗
NewOutboxTool — build a validated outbox tool. Returns `*OutboxConfigError` for unsafe configurations.
func (*OutboxTool) Envelope⌗
Envelope — build the JSON envelope the outbox reactor will dispatch. `payload` MUST be JSON-serialisable; it is wrapped, not consumed.
func (*OutboxTool) EnvelopeJSON⌗
EnvelopeJSON — convenience wrapper that returns the encoded envelope.
type OutboxToolOpts⌗
OutboxToolOpts — configuration for `NewOutboxTool`.
type OutboxToolOpts struct {
// Name — the tool name as it appears in the journal / model API.
Name string
// Connector — the registered capability connector name (see
// `tape/connectors`). E.g. "bank.wire".
Connector string
// Semantics — defaults to OutboxIdempotent.
Semantics OutboxSemantics
// BusinessKey — given the intent payload, derive a stable key the
// counterparty can dedup on (typically a hash of the irreducible
// identifying fields).
BusinessKey func(payload map[string]any) (string, error)
// StatusCheck — registered with `RegisterStatusCheck(Name, fn)`; the
// reconciler invokes it to resolve UNKNOWN effects.
StatusCheck func(ctx context.Context, idempotencyKey string) (StatusCheckResult, error)
// Compensate — registered with `RegisterCompensator(<name>, fn)`; the
// compensation reactor runs it for duplicate dispatches.
Compensate func(ctx context.Context, payload []byte) error
// WaitForResult — when true, the ADK runner parks the call until the
// dispatch resolves (the Go adapter consumes this hint).
WaitForResult bool
// HumanGate — when true, the run parks on a gate before the outbox
// dispatches. Required for non_idempotent tools that have no other
// safety net.
HumanGate bool
// DispatchTimeoutMs — soft deadline the connector should honour (0 =
// connector default).
DispatchTimeoutMs int64
// MaxAttempts — outbox-reactor retry budget (server default 5 when 0).
MaxAttempts int
}
type ReactionDef⌗
ReactionDef is one `On(...)` declaration before it's registered on the server. Field semantics match the proto / Python types one-for-one. Leave fields zero-valued to inherit defaults; `On(...)` normalises Min(1, …) where appropriate.
type ReactionDef struct {
SubjectPattern string
Predicate string
Agent string // kind=AGENT; if set, kind defaults to AGENT
Publish string // kind=PUBLISH; if set, kind defaults to PUBLISH
Name string
ReactionID string
MaxConcurrency int
RateLimitPerS int
DebounceMs int
RetryMax int
RetryBackoffMs int
DLQAfterN int
NumShards int
BootstrapFromHead bool
Handler ReactionHandler
// contains filtered or unexported fields
}
func GetRegistry⌗
GetRegistry returns a snapshot of the registered reactions. Mostly useful for tests / introspection.
type ReactionHandler⌗
ReactionHandler is the user callback the in-proc dispatcher runs for every claimed task. Returning a non-nil error triggers a NackTask (with permanent=true once attempts >= dlq_after_n).
type RecordExternalObservationOpts⌗
RecordExternalObservationOpts records what the counterparty said about an effect — the reconciler's write path. `Resolution` is one of EffectResolution*. When DUPLICATE + `CompensateOnDuplicateKind` is set, the server registers a compensation obligation atomically with the observation.
type RecordExternalObservationOpts struct {
RunID string
Key string
Resolution int32
ExternalRef string
ResponseJSON string
ErrorJSON string
CompensateOnDuplicateKind string
}
type RedriveFn⌗
RedriveFn — how a reactor re-invokes a stalled run. On Vertex AI Agent Engine the implementation calls :streamQuery; on Cloud Run / GKE / locally it's typically a wrapper that hands control back to the agent process.
type RegisterCompensationOpts⌗
RegisterCompensationOpts: the extra options grew over time; use named fields rather than a five-arg positional call. CompensatorRef ("module:attr") lets a generic drainer resolve the inverse without importing your agent. MaxAttempts of 0 falls back to the server default (5).
type RegisterReactionOpts⌗
RegisterReactionOpts mirrors the `Reaction` proto fields the client supplies at registration time. Leave a field zero/empty for the server default.
type RegisterReactionOpts struct {
ReactionID string
Name string
SubjectPattern string
PredicateCEL string
HandlerKind pb.HandlerKind
AgentApp string
PublishTarget string
MaxConcurrency int32
RateLimitPerS int32
DebounceMs int32
RetryMax int32
RetryBackoffMs int32
DLQAfterN int32
NumShards int32
BootstrapFromHead bool
}
type RunDispatcherOpts⌗
RunDispatcherOpts is the configuration block for the in-proc dispatcher.
type RunDispatcherOpts struct {
Owner string
PollInterval time.Duration
Once bool
ClaimMax int32
LeaseMs int64
}
type RunEventFanoutOpts⌗
RunEventFanoutOpts mirrors SubscribeEventsOpts: pass `FromGlobalSeq` / `SubjectPattern` for the event-bus path or `FromTsMs` / `RunID` / `Kind` for the legacy filters.
type RunEventFanoutOpts struct {
FromTsMs int64
RunID string
Kind string
FromGlobalSeq int64
SubjectPattern string
}
type RunOutboxOptions⌗
RunOutboxOptions — long-lived dispatcher.
type RunOutboxOptions struct {
OutboxOptions
Interval time.Duration
Once bool
OnTick func(outcomes []OutboxOutcome)
}
type RunPubSubBridgeOpts⌗
RunPubSubBridgeOpts is the configuration block for the Pub/Sub bridge.
type RunPubSubBridgeOpts struct {
Project string
Topic string
ReactionID string
Owner string
Once bool
PollInterval time.Duration
ClaimMax int32
LeaseMs int64
}
type RunReactorsOptions⌗
type RunReactorsOptions struct {
Redrive RedriveFn
Recover bool
Reconcile bool
Timers bool
Interval time.Duration
ReconcilePendingAfter time.Duration
Once bool
OnTick func(tick map[string]any)
}
type SendSignalOpts⌗
type SetTimerOpts⌗
type SpanHook⌗
SpanHook — set by the host to receive Tape span events. Default is nil (no-op). A future opentelemetry-go integration would set this from `init`.
type StatusCheckResult⌗
StatusCheckResult — the per-tool status_check returns one of these. Found = "the counterparty acknowledges this idempotency_key (the request did land)". ResponseJSON is what to record on the effect when Found.
type SubscribeEventsOpts⌗
SubscribeEventsOpts is the rich form for the WAL tail RPC: a single struct covering both the legacy filters (FromTsMs / RunID / Kind) and the new event-bus filters (FromGlobalSeq / SubjectPattern). Mix-and-match freely; `from_ts_ms` is honoured only when `from_global_seq` is zero.
type SubscribeEventsOpts struct {
FromTsMs int64
RunID string
Kind string
FromGlobalSeq int64
SubjectPattern string
}
type TenancyConfig⌗
TenancyConfig — declared in tape.yaml; consumed by the SDK for log tagging and by `tape doctor` for the loud warning when `hard_multi_tenant` is requested but the runtime can't enforce it.
func TenancyFromEnv⌗
TenancyFromEnv — read from $TAPE_TENANCY / $TAPE_TENANT_ID with sane defaults.
func (TenancyConfig) IsHard⌗
IsHard — true iff the configured mode is hard_multi_tenant.
func (TenancyConfig) WarnIfHardButUnenforced⌗
WarnIfHardButUnenforced — return loud warnings when hard_multi_tenant is requested today (the proto and stores do not yet carry tenant_id).
type TenancyMode⌗
TenancyMode — the supported deployment modes.
const (
TenancySingle TenancyMode = "single"
TenancyTrustedMultiApp TenancyMode = "trusted_multi_app"
TenancyHardMultiTenant TenancyMode = "hard_multi_tenant"
)
Generated by gomarkdoc
connectors⌗
Package connectors — capability connector registry + protocol.
A `Connector` is what the outbox reactor calls to actually perform a side effect. Three operations, all idempotent on (RunID, IdempotencyKey):
- Dispatch — perform / enqueue the effect
- Observe — ask the counterparty about an UNKNOWN
- Compensate — reverse a duplicate (or any registered obligation)
Built-in connectors: LogConnector (tests / demos), HttpConnector, PubSubConnector, CloudTasksConnector. Register your own:
import "github.com/vamsiramakrishnan/durable-agents/tape/sdk/go/connectors"
connectors.Default.Register("bank.wire", connectors.NewHttpConnector(
connectors.HttpOpts{URL: "https://bank.example/wires"},
))
Index⌗
- Variables
- type CloudTasksConnector
- func NewCloudTasksConnector(opts CloudTasksOpts) *CloudTasksConnector
- func (c *CloudTasksConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)
- func (c *CloudTasksConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)
- func (c *CloudTasksConnector) Name() string
- func (c *CloudTasksConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)
- type CloudTasksOpts
- type CompensationOutcome
- type CompensationResult
- type Connector
- type DispatchOutcome
- type DispatchResult
- type Effect
- type HttpConnector
- func NewHttpConnector(opts HttpOpts) *HttpConnector
- func (c *HttpConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)
- func (c *HttpConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)
- func (c *HttpConnector) Name() string
- func (c *HttpConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)
- type HttpOpts
- type LogConnector
- func NewLogConnector(path string) *LogConnector
- func (c *LogConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)
- func (c *LogConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)
- func (c *LogConnector) Name() string
- func (c *LogConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)
- type Obligation
- type ObservationOutcome
- type ObservationResult
- type PubSubConnector
- func NewPubSubConnector(opts PubSubOpts) *PubSubConnector
- func (c *PubSubConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)
- func (c *PubSubConnector) Dispatch(ctx context.Context, e Effect) (DispatchResult, error)
- func (c *PubSubConnector) Name() string
- func (c *PubSubConnector) Observe(ctx context.Context, e Effect) (ObservationResult, error)
- type PubSubOpts
- type Registry
- func NewRegistry() *Registry
- func (r *Registry) Get(name string) (Connector, error)
- func (r *Registry) Names() []string
- func (r *Registry) Register(name string, c Connector) error
- func (r *Registry) Replace(name string, c Connector)
Variables⌗
Default — the process-global registry. Most projects register at init time and consume from here.
ErrAlreadyRegistered — registry.Register saw a name collision.
ErrCloudTasksNotBuilt — returned by the default build.
ErrPubSubNotBuilt — returned by the default build of the Pub/Sub connector. Build with `-tags pubsub` to enable, and `go get cloud.google.com/go/pubsub`.
ErrUnknownConnector — registry.Get returned no match.
type CloudTasksConnector⌗
CloudTasksConnector — Cloud Tasks-backed outbox connector.
func NewCloudTasksConnector⌗
NewCloudTasksConnector — construct (stub in the default build).
func (*CloudTasksConnector) Compensate⌗
func (c *CloudTasksConnector) Compensate(ctx context.Context, o Obligation) (CompensationResult, error)
func (*CloudTasksConnector) Dispatch⌗
func (*CloudTasksConnector) Name⌗
func (*CloudTasksConnector) Observe⌗
type CloudTasksOpts⌗
CloudTasksOpts — configuration.
type CloudTasksOpts struct {
Name string
Project string
Location string
Queue string
TargetURL string
ServiceAccount string
ObserveURL string
CompensateURL string
}
type CompensationOutcome⌗
CompensationOutcome — what `Compensate()` resolved.
const (
CompensationCompensated CompensationOutcome = "compensated"
CompensationPending CompensationOutcome = "pending"
CompensationStuck CompensationOutcome = "stuck"
CompensationFailed CompensationOutcome = "failed"
)
type CompensationResult⌗
type Connector⌗
Connector — the interface every capability connector implements. All three methods MUST be idempotent on (RunID, IdempotencyKey).
type Connector interface {
Name() string
Dispatch(ctx context.Context, effect Effect) (DispatchResult, error)
Observe(ctx context.Context, effect Effect) (ObservationResult, error)
Compensate(ctx context.Context, obligation Obligation) (CompensationResult, error)
}
type DispatchOutcome⌗
DispatchOutcome — one of CONFIRMED / PENDING / UNKNOWN / FAILED.
const (
DispatchConfirmed DispatchOutcome = "confirmed"
DispatchPending DispatchOutcome = "pending"
DispatchUnknown DispatchOutcome = "unknown"
DispatchFailed DispatchOutcome = "failed"
)
type DispatchResult⌗
DispatchResult / ObservationResult / CompensationResult — what each op returns. Idiomatic Go: `Outcome` is the discriminator; `Response` carries the structured detail.
type DispatchResult struct {
Outcome DispatchOutcome
Response any
Error string
DispatchID string
RetryAfterMs int
}
type Effect⌗
Effect — the intent the outbox reactor wants dispatched.
type Effect struct {
RunID string
IdempotencyKey string
ToolName string
Connector string
Payload any
BusinessKey string
Attempt int
Semantics string
TenantID string
AppName string
Metadata map[string]any
}
type HttpConnector⌗
HttpConnector — POST the intent payload to an HTTPS endpoint. Headers:
X-Tape-Idempotency-Key the runner-derived dedup key
X-Tape-Business-Key when supplied by the outbox tool
X-Tape-Run-Id for traceability
X-Tape-Attempt dispatch attempt #
Outcome mapping: 2xx => CONFIRMED, 4xx => FAILED, 5xx / network => UNKNOWN (the reactor will Observe()).
func NewHttpConnector⌗
NewHttpConnector — construct with sane defaults.
func (*HttpConnector) Compensate⌗
func (*HttpConnector) Dispatch⌗
func (*HttpConnector) Name⌗
func (*HttpConnector) Observe⌗
type HttpOpts⌗
HttpOpts — configuration for `NewHttpConnector`.
type HttpOpts struct {
Name string // defaults to "http"
URL string // POST target for Dispatch
ObserveURL string // POST target for Observe (status lookup)
CompensateURL string // POST target for Compensate
Timeout time.Duration // default 30s
Headers map[string]string
HttpClient *http.Client // optional override (e.g. mTLS)
}
type LogConnector⌗
LogConnector — append each dispatch/observe/compensate as a JSON line. Useful for tests, demos, and the non-idempotent-bank example.
func NewLogConnector⌗
NewLogConnector — open / create the JSON-lines file at `path`.
func (*LogConnector) Compensate⌗
func (*LogConnector) Dispatch⌗
func (*LogConnector) Name⌗
func (*LogConnector) Observe⌗
type Obligation⌗
Obligation — the compensation obligation registered after a forward effect confirmed.
type Obligation struct {
RunID string
EffectKey string
Kind string
Payload any
Attempt int
CompensatorRef string
TenantID string
}
type ObservationOutcome⌗
ObservationOutcome — what `Observe()` resolved.
const (
ObservationConfirmed ObservationOutcome = "confirmed"
ObservationAbsent ObservationOutcome = "absent"
ObservationDuplicate ObservationOutcome = "duplicate"
ObservationStuck ObservationOutcome = "stuck"
ObservationUnknown ObservationOutcome = "unknown"
)
type ObservationResult⌗
type PubSubConnector⌗
PubSubConnector — Pub/Sub-backed outbox connector.
func NewPubSubConnector⌗
NewPubSubConnector — construct (Dispatch returns ErrPubSubNotBuilt in the default build).
func (*PubSubConnector) Compensate⌗
func (*PubSubConnector) Dispatch⌗
func (*PubSubConnector) Name⌗
func (*PubSubConnector) Observe⌗
type PubSubOpts⌗
PubSubOpts — configuration. With the default build, these values are preserved on the connector but `Dispatch` returns ErrPubSubNotBuilt.
type PubSubOpts struct {
Name string
Project string
Topic string
CompensateTopic string
TapeURL string // for the Observe() path via tape.GetValue
}
type Registry⌗
Registry — a process-local registry of connectors keyed by name.
func NewRegistry⌗
NewRegistry — fresh, empty registry.
func (*Registry) Get⌗
Get — fetch by name. Returns ErrUnknownConnector if absent.
func (*Registry) Names⌗
Names — sorted-ish snapshot of registered names.
func (*Registry) Register⌗
Register — add a connector under `name`. Returns ErrAlreadyRegistered if `name` is taken.
func (*Registry) Replace⌗
Replace — install `c` under `name`, overwriting any prior value.
Generated by gomarkdoc
sinks⌗
Package sinks — WAL fan-out destinations.
A Sink is `Publish(ctx, entry) error` (+ optional `Close()`). Combined with RunEventFanout it gives an at-least-once relay; pair with consumer-side dedup on (run_id, seq) for exactly-once-effective delivery.
Built-in sinks:
- LogSink — writes one JSON line per entry; useful as a tap.
- WebhookSink — POSTs each entry with X-Tape-Event-Id; retries with backoff.
- PubSubSink — publishes to Cloud Pub/Sub with ordering_key=run_id. Lazy: builds without the pubsub Go client unless `-tags pubsub` is used (same pattern as connectors/pubsub).
Index⌗
- Variables
- type Entry
- type FnSink
- func (FnSink) Close() error
- func (f FnSink) Publish(ctx context.Context, e *pb.EventEntry) error
- type LogSink
- func NewLogSink(path string) (*LogSink, error)
- func (s *LogSink) Close() error
- func (s *LogSink) Publish(_ context.Context, e *pb.EventEntry) error
- type PubSubSink
- func NewPubSubSink(opts PubSubSinkOpts) (*PubSubSink, error)
- func (s *PubSubSink) Close() error
- func (s *PubSubSink) Publish(ctx context.Context, _ *pb.EventEntry) error
- type PubSubSinkOpts
- type Sink
- type WebhookSink
- func NewWebhookSink(opts WebhookSinkOpts) (*WebhookSink, error)
- func (s *WebhookSink) Close() error
- func (s *WebhookSink) Publish(ctx context.Context, e *pb.EventEntry) error
- type WebhookSinkOpts
Variables⌗
ErrPubSubSinkNotBuilt — the default build ships a stub. Build with `-tags pubsub` (and `go get cloud.google.com/go/pubsub`) to enable.
type Entry⌗
Entry — the on-wire JSON representation of one journal entry. Stable; receivers dedupe on (run_id, seq).
type Entry struct {
RunID string `json:"run_id"`
Seq int64 `json:"seq"`
Kind string `json:"kind"`
PayloadJSON string `json:"payload_json"`
TsMs int64 `json:"ts_ms"`
}
type FnSink⌗
FnSink — wraps a function as a Sink.
func (FnSink) Close⌗
func (FnSink) Publish⌗
type LogSink⌗
LogSink appends one JSON line per entry. `path=""` or `":stderr"` writes to stderr; `":stdout"` to stdout.
func NewLogSink⌗
func (*LogSink) Close⌗
func (*LogSink) Publish⌗
type PubSubSink⌗
PubSubSink — stub. Use `-tags pubsub` for the real one.
func NewPubSubSink⌗
NewPubSubSink — returns a stub that errors on Publish.
func (*PubSubSink) Close⌗
Close — no-op in the stub build.
func (*PubSubSink) Publish⌗
Publish — always errors in the stub build.
type PubSubSinkOpts⌗
PubSubSinkOpts — configuration for the Pub/Sub sink.
type Sink⌗
Sink — what RunEventFanout adapters call for every journal entry.
type WebhookSink⌗
WebhookSink — POST each entry as JSON. Sets `X-Tape-Event-Id: run_id/seq`.
func NewWebhookSink⌗
func (*WebhookSink) Close⌗
func (*WebhookSink) Publish⌗
type WebhookSinkOpts⌗
WebhookSinkOpts — POST each entry as JSON.
type WebhookSinkOpts struct {
URL string
Headers map[string]string
MaxRetries int
InitialBackoff time.Duration
Timeout time.Duration
HTTPClient *http.Client
}
Generated by gomarkdoc