Temporal Guide¶
In Development
The Temporal backend is under active development. The core compile and run paths work, but the API may change. This guide documents the current design and intended patterns. Not recommended for production use yet.
This guide covers Temporal-specific patterns, constraints, and examples for running adk-fluent agents with durable execution. For general backend selection, see Execution Backends.
Why Temporal?¶
Temporal provides durable execution — your agent pipeline survives process crashes, network failures, and deployments. If a 10-step pipeline crashes at step 7, Temporal replays steps 1–6 from cached results (zero LLM cost) and re-executes only step 7 onward.
This matters for:
- Long-running agent pipelines (research → write → review → publish)
- Expensive LLM calls that shouldn't be repeated on failure
- Human-in-the-loop workflows that pause for approval
- Distributed execution across multiple workers/machines
Setup¶
Install¶
```shell
pip install adk-fluent[temporal]
```
This adds temporalio as a dependency.
Start Temporal (local development)¶
```shell
# Install Temporal CLI
brew install temporal  # macOS
# or: curl -sSf https://temporal.download/cli.sh | sh

# Start local dev server
temporal server start-dev
```
The Temporal UI is available at http://localhost:8233.
Connect¶
```python
from temporalio.client import Client

client = await Client.connect("localhost:7233")
```
Basic Usage¶
Select the Temporal backend¶
```python
from adk_fluent import Agent

agent = (
    Agent("researcher", "gemini-2.5-flash")
    .instruct("Research the topic thoroughly.")
    .engine("temporal", client=client, task_queue="research")
)

# Execute — this starts a Temporal workflow
response = await agent.ask_async("quantum computing advances")
```
Pipelines¶
```python
from adk_fluent import Agent

pipeline = (
    Agent("researcher", "gemini-2.5-flash")
    .instruct("Research the topic.").writes("findings")
    >> Agent("writer", "gemini-2.5-flash")
    .instruct("Write a report about {findings}.").writes("draft")
    >> Agent("reviewer", "gemini-2.5-flash")
    .instruct("Review and score the draft: {draft}")
)

pipeline = pipeline.engine("temporal", client=client, task_queue="pipeline")
response = await pipeline.ask_async("AI in healthcare")
```
Each agent step becomes a Temporal activity (non-deterministic LLM call). The pipeline structure becomes the workflow (deterministic orchestration). If the process crashes after the writer completes, only the reviewer re-executes.
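The replay behavior can be illustrated with a toy model (illustrative only, not the backend's actual code): each activity result is appended to a history as it completes, and replay consumes recorded results in order before executing anything new.

```python
# Toy model of Temporal replay semantics: completed activity results live in
# history; on replay, recorded results are returned without re-executing.
class ReplayHistory:
    def __init__(self, results=None):
        self.results = list(results or [])  # results recorded before the crash
        self.cursor = 0

    def run_activity(self, fn, *args):
        if self.cursor < len(self.results):
            result = self.results[self.cursor]  # replay: cached, zero LLM cost
        else:
            result = fn(*args)                  # first run: execute and record
            self.results.append(result)
        self.cursor += 1
        return result
```

After a crash, the worker replays the workflow against the prior history: completed steps come from cache, and only the remaining steps actually call the LLM.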
Determinism Rules¶
Temporal replays workflow code to reconstruct state. This means workflow code must be deterministic — given the same inputs, it must produce the same outputs.
What is deterministic (safe in workflow code)¶
- State transforms: `S.pick()`, `S.merge()`, `S.rename()`, etc.
- Route decisions: `Route("key").eq("value", agent)`
- Conditional gates: `.proceed_if(lambda s: s.get("valid"))`
- Loop conditions: `until(lambda s: s.get("score") >= 0.8)`
- Pure functions: `>> merge_research` (no I/O)
- `tap(fn)` — observation only, no side effects
What is non-deterministic (becomes an activity)¶
- LLM calls: every `AgentNode` (the core of what agents do)
- Tool calls: external API calls, database queries, file I/O
- `datetime.now()`, `random()`, `uuid4()`
What to avoid in workflow code¶
```python
# BAD — non-deterministic in a transform
pipeline = agent >> (lambda s: {"time": datetime.now().isoformat()}) >> next_agent

# GOOD — use Temporal's workflow.now() in workflow code (deterministic on
# replay), or pass the timestamp in as part of the initial input
```
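The "good" pattern can be sketched in plain Python (hypothetical helper names, not part of the adk-fluent API): capture wall-clock time once on the client before the workflow starts, then keep every workflow-side transform a pure function of its input.

```python
from datetime import datetime, timezone

def stamp_input(prompt: str) -> dict:
    # Runs on the client before the workflow starts, so non-determinism is fine.
    return {"prompt": prompt, "started_at": datetime.now(timezone.utc).isoformat()}

def add_timing_note(state: dict) -> dict:
    # Pure transform: identical input always yields identical output on replay.
    return {**state, "note": f"run started at {state['started_at']}"}
```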
IR → Temporal Mapping¶
Understanding how IR nodes map to Temporal concepts helps you design effective pipelines.
| IR Node | Temporal Concept | Behavior |
|---|---|---|
| Agent step (`AgentNode`) | Activity | LLM call; result cached on replay |
| `>>` (sequence) | Workflow body | Sequential execution |
| `\|` (parallel) | Workflow | Parallel activity execution |
| `* until(...)` (loop) | Workflow | Each iteration checkpointed |
| State transform (`S.*`) | Inline code | Pure function, replayed from history |
| `tap(fn)` | Inline code | Observation, no I/O |
| `Route` | Inline code | Deterministic switch |
| `//` (fallback) | Workflow try/except | Try activity A, on failure try B |
| Human-in-the-loop step | Signal + wait | Pauses for external input |
| Background launch | Child workflow | Fire-and-forget background task |
| Background join | Await child handle | Wait for background task |
Key principle¶
Deterministic nodes = workflow code (replayed from history, zero cost). Non-deterministic nodes = activities (cached, re-executed only on first run or after crash).
Patterns¶
Crash-resilient pipeline¶
```python
from adk_fluent import Agent

# A 4-step pipeline where each LLM call is an activity
pipeline = (
    Agent("extractor", "gemini-2.5-flash")
    .instruct("Extract entities from the document.").writes("entities")
    >> Agent("enricher", "gemini-2.5-flash")
    .instruct("Enrich {entities} with additional context.").writes("enriched")
    >> Agent("analyzer", "gemini-2.5-flash")
    .instruct("Analyze {enriched} for risks.").writes("analysis")
    >> Agent("reporter", "gemini-2.5-flash")
    .instruct("Write a risk report from {analysis}.")
)

pipeline = pipeline.engine("temporal", client=client, task_queue="docs")
```
If the process crashes after step 2 (enricher), Temporal replays:
- extractor → cached result (0 LLM cost)
- enricher → cached result (0 LLM cost)
- analyzer → re-executes (LLM call)
- reporter → executes (LLM call)
Parallel research with durability¶
```python
from adk_fluent import Agent, S

research = (
    (
        Agent("web", "gemini-2.5-flash").instruct("Search the web.").writes("web_results")
        | Agent("papers", "gemini-2.5-flash").instruct("Search papers.").writes("paper_results")
    )
    >> S.merge("web_results", "paper_results", into="research")
    >> Agent("synthesizer", "gemini-2.5-flash")
    .instruct("Synthesize {research} into a report.")
)

research = research.engine("temporal", client=client, task_queue="research")
```
The parallel branches run as concurrent activities. If one fails, Temporal retries it without affecting the other.
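A rough model of these semantics in plain asyncio (illustrative only; in Temporal, retries are configured via the activity's retry policy rather than hand-rolled loops):

```python
import asyncio

# Toy sketch (not the backend's real code): each parallel branch runs as an
# independent activity with its own retries, so one branch failing and
# retrying does not affect the other.
async def run_branch(fn, retries=3):
    for attempt in range(retries):
        try:
            return await fn()
        except Exception:
            if attempt == retries - 1:
                raise

async def run_parallel(branches):
    # Awaited concurrently, like Temporal scheduling both activities at once.
    return await asyncio.gather(*(run_branch(fn) for fn in branches))
```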
Review loop with checkpointing¶
```python
from adk_fluent import Agent, until

loop = (
    Agent("writer", "gemini-2.5-flash")
    .instruct("Write or revise the draft.").writes("draft")
    >> Agent("critic", "gemini-2.5-flash")
    .instruct("Score the draft 0-10.").writes("score")
) * until(lambda s: int(s.get("score", 0)) >= 8, max=5)

loop = loop.engine("temporal", client=client, task_queue="quality")
```
Each iteration is checkpointed. If the process crashes mid-loop, Temporal replays completed iterations from cache and continues from the last checkpoint.
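The `* until(...)` loop can be read as the following plain-Python sketch (a hypothetical helper, not the library's implementation): run the body until the predicate holds or the iteration cap is reached, where each body run corresponds to checkpointed activities.

```python
# Hypothetical sketch of the loop semantics: the body runs repeatedly until
# the predicate on state is satisfied or max_iters is reached.
def run_until(body, pred, state, max_iters=5):
    for _ in range(max_iters):
        state = body(state)
        if pred(state):
            break
    return state
```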
Fallback across models¶
```python
from adk_fluent import Agent

answer = (
    Agent("fast", "gemini-2.0-flash").instruct("Quick answer.")
    // Agent("thorough", "gemini-2.5-pro").instruct("Detailed answer.")
)

answer = answer.engine("temporal", client=client, task_queue="qa")
```
If the fast model fails (timeout, error), Temporal catches the exception and tries the thorough model. Both attempts are activities — the fast model’s failure is recorded in history.
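Per the mapping table above, `//` corresponds to a workflow-level try/except, roughly like this sketch (illustrative only, not the backend's code):

```python
import asyncio

# Sketch of fallback semantics: run the primary attempt; if it raises,
# run the fallback. In Temporal both attempts are activities, so the
# primary's failure is recorded in workflow history.
async def with_fallback(primary, fallback, prompt):
    try:
        return await primary(prompt)
    except Exception:
        return await fallback(prompt)
```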
Constraints and Limitations¶
Streaming is not supported¶
Temporal workflows return results, not streams. .stream() falls back to collecting all events and yielding them at once:
```python
# This works but doesn't actually stream — waits for full completion
async for chunk in pipeline.stream("prompt"):
    print(chunk, end="")
```
For streaming UIs, use the ADK backend. For durable pipelines, use Temporal with .ask_async().
Sessions require external state¶
Temporal activities are stateless. Multi-turn .session() requires explicit state management:
```python
# Instead of .session(), use .ask_async() with explicit state passing
# State is managed by Temporal's workflow history
response = await pipeline.ask_async("first question")
# The pipeline's state is preserved in Temporal history
```
All builders work, but execution semantics differ¶
Every adk-fluent builder, operator, and namespace module works with Temporal. The definition is identical. What changes is the execution:
| Feature | ADK | Temporal |
|---|---|---|
| `.ask()` | Works | Not recommended (blocks) |
| `.ask_async()` | Works | Works (starts workflow) |
| `.stream()` | Real streaming | Falls back to batch |
| `.session()` | In-memory history | Requires external state |
| Batch execution | Concurrent tasks | Each prompt = workflow |
| State transforms | Inline | Deterministic replay |
| Middleware | ADK plugins | Runtime hooks |
Worker Setup¶
For the Temporal backend to execute workflows, you need a running worker:
```python
import asyncio
from temporalio.client import Client
from adk_fluent.backends.temporal_worker import create_worker

async def main():
    client = await Client.connect("localhost:7233")
    worker = await create_worker(client, task_queue="agents")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
The worker registers the generic adk_fluent_agent_workflow and its activities with Temporal. Multiple workers can run on different machines for horizontal scaling.
Comparison: When to Use What¶
| Scenario | Use ADK | Use Temporal |
|---|---|---|
| Quick prototype | Yes | No |
| Short-lived agent (< 1 min) | Yes | Overkill |
| Multi-step pipeline (minutes) | Maybe | Yes |
| Must survive crashes | No | Yes |
| Real-time streaming UI | Yes | No |
| Multi-turn chat session | Yes | Not ideal |
| Batch processing (1000 prompts) | Yes | Yes (each = workflow) |
| Human approval step | No | Yes (signals) |
| Distributed across machines | No | Yes |
See also
- Execution Backends — overview of all backends and selection guide
- IR & Backends — IR node types and the compilation pipeline
- Execution — `.ask()`, `.stream()`, `.session()` execution methods
- Patterns — higher-order patterns that compose well with Temporal