One-Shot Execution¶
adk-fluent provides execution helpers that eliminate Runner and Session boilerplate. These methods let you go from builder to result in a single call.
Execution backend awareness
All execution methods below work with the default ADK backend. When using alternative backends, behavior varies:
| Method | ADK (default) | Temporal (in dev) | asyncio (in dev) |
|---|---|---|---|
| `.ask()` | Works | Not recommended (blocking) | Works |
| `.ask_async()` | Works | Works (starts workflow) | Works |
| `.stream()` | Real-time streaming | Falls back to batch | Works |
| `.session()` | In-memory history | Requires external state | Works |
| `.map()` / `.map_async()` | Concurrent tasks | Each prompt = workflow | Concurrent tasks |
See Execution Backends for details. The examples below use the default ADK backend.
.ask(prompt)¶
Send a prompt, get response text. No Runner or Session setup needed.
```python
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a helpful assistant.")

response = agent.ask("What is the capital of France?")
print(response)  # "The capital of France is Paris."
```
```python
# .ask() is sync and blocks — use .ask_async() with Temporal instead.
# Temporal requires an async event loop to start workflows.
```

```python
from adk_fluent import Agent

agent = (
    Agent("helper", "gemini-2.5-flash")
    .instruct("You are a helpful assistant.")
    .engine("asyncio")
)

response = agent.ask("What is the capital of France?")
print(response)
```
.ask() internally builds the agent, creates a Runner and Session, sends the prompt, and returns the final response text.
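One way to picture this is as a blocking facade over an async pipeline. The sketch below is illustrative only — `_ask_async` is a stand-in for adk-fluent's real internals, not its actual code:

```python
import asyncio

async def _ask_async(prompt: str) -> str:
    # Stand-in for the real async pipeline; adk-fluent would build the
    # agent, create a Runner and Session, and call the LLM here.
    await asyncio.sleep(0)
    return f"echo: {prompt}"

def ask(prompt: str) -> str:
    # A sync facade in the spirit of .ask(): drive the async pipeline to
    # completion on a fresh event loop, blocking until it finishes.
    return asyncio.run(_ask_async(prompt))

print(ask("hi"))  # echo: hi
```

This blocking behavior is also why `.ask()` is a poor fit for Temporal, which wants to be driven from a running event loop.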
.ask_async(prompt)¶
Async version of .ask(). Required for the Temporal backend.
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a helpful assistant.")

async def main():
    response = await agent.ask_async("What is the capital of France?")
    print(response)

asyncio.run(main())
```
```python
import asyncio
from temporalio.client import Client
from adk_fluent import Agent

async def main():
    client = await Client.connect("localhost:7233")
    agent = (
        Agent("helper", "gemini-2.5-flash")
        .instruct("You are a helpful assistant.")
        .engine("temporal", client=client, task_queue="qa")
    )
    # Starts a Temporal workflow — durable, crash-recoverable
    response = await agent.ask_async("What is the capital of France?")
    print(response)

asyncio.run(main())
```
```python
import asyncio
from adk_fluent import Agent

async def main():
    agent = (
        Agent("helper", "gemini-2.5-flash")
        .instruct("You are a helpful assistant.")
        .engine("asyncio")
    )
    response = await agent.ask_async("What is the capital of France?")
    print(response)

asyncio.run(main())
```
.stream(prompt)¶
Async generator that yields response text chunks as they arrive.
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a helpful assistant.")

async def main():
    async for chunk in agent.stream("Tell me a story."):
        print(chunk, end="", flush=True)

asyncio.run(main())
```
Real-time streaming — chunks arrive as the LLM generates them.
```python
# Temporal does NOT support real-time streaming.
# .stream() falls back to collecting all events, then yielding them at once.
# Use .ask_async() instead for Temporal workloads.

async for chunk in agent.stream("Tell me a story."):
    print(chunk, end="", flush=True)  # All chunks arrive at once
```
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("Tell stories.").engine("asyncio")

async def main():
    async for chunk in agent.stream("Tell me a story."):
        print(chunk, end="", flush=True)

asyncio.run(main())
```
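The difference between real-time streaming and the Temporal batch fallback can be sketched with two async generators. This is an illustrative model of the two delivery modes, not adk-fluent's implementation:

```python
import asyncio

async def generate_chunks():
    # Stand-in for model output arriving incrementally.
    for chunk in ["Once", " upon", " a time."]:
        await asyncio.sleep(0)
        yield chunk

async def stream_realtime():
    # Real-time mode: forward each chunk the moment it is produced.
    async for chunk in generate_chunks():
        yield chunk

async def stream_batched():
    # Temporal-style fallback: drain the generator first, then yield everything.
    collected = [chunk async for chunk in generate_chunks()]
    for chunk in collected:
        yield chunk

async def collect(gen):
    return [chunk async for chunk in gen]

realtime = asyncio.run(collect(stream_realtime()))
batched = asyncio.run(collect(stream_batched()))
print("".join(realtime))  # Once upon a time.
```

Both modes deliver the same text; only the timing differs, which is why the fallback is invisible except for latency.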
.events(prompt)¶
Async generator that yields raw ADK Event objects, giving full access to state deltas, function calls, and other event metadata:
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a helpful assistant.")

async def main():
    async for event in agent.events("What is 2+2?"):
        print(event)

asyncio.run(main())
```
.session()¶
Create an interactive multi-turn session using async with:
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a helpful assistant.")

async def main():
    async with agent.session() as chat:
        r1 = await chat.send("Hi, my name is Alice.")
        print(r1)
        r2 = await chat.send("What is my name?")
        print(r2)  # Should remember "Alice"

asyncio.run(main())
```
The session maintains conversation history across turns, enabling multi-turn interactions.
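The history-accumulation pattern behind a multi-turn session can be sketched in a few lines. This toy class is illustrative only, not adk-fluent's internals:

```python
import asyncio

class ToySession:
    # Illustrative only: shows history accumulation across turns,
    # not adk-fluent's real Session implementation.
    def __init__(self):
        self.history = []  # (role, text) pairs carried across turns

    async def send(self, message: str) -> str:
        self.history.append(("user", message))
        # A real session would pass the accumulated history to the model here.
        user_turns = sum(1 for role, _ in self.history if role == "user")
        reply = f"seen {user_turns} user turns"
        self.history.append(("assistant", reply))
        return reply

async def demo():
    chat = ToySession()
    await chat.send("Hi, my name is Alice.")
    return await chat.send("What is my name?")

print(asyncio.run(demo()))  # seen 2 user turns
```

Because every turn is appended to the same history, the model sees earlier turns (such as "my name is Alice") on later calls.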
.map(prompts, concurrency=5)¶
Batch execution against multiple prompts with bounded concurrency:
```python
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("Translate to French.")

prompts = ["Hello", "Goodbye", "Thank you", "Please"]
results = agent.map(prompts, concurrency=3)

for prompt, result in zip(prompts, results):
    print(f"{prompt} -> {result}")
```
.map_async(prompts, concurrency=5)¶
Async version of .map():
```python
import asyncio
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("Translate to French.")

async def main():
    prompts = ["Hello", "Goodbye", "Thank you"]
    results = await agent.map_async(prompts, concurrency=3)
    for prompt, result in zip(prompts, results):
        print(f"{prompt} -> {result}")

asyncio.run(main())
```
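The bounded-concurrency behavior behind a batch API like this is commonly built from a semaphore plus `asyncio.gather`. A minimal sketch (illustrative; `fake_translate` stands in for the real LLM call):

```python
import asyncio

async def bounded_map(worker, prompts, concurrency=5):
    # A semaphore caps how many prompts are in flight at once.
    sem = asyncio.Semaphore(concurrency)

    async def run_one(prompt):
        async with sem:
            return await worker(prompt)

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(run_one(p) for p in prompts))

async def fake_translate(prompt):
    # Stand-in for an LLM call.
    await asyncio.sleep(0)
    return prompt.upper()

results = asyncio.run(bounded_map(fake_translate, ["Hello", "Goodbye"], concurrency=3))
print(results)  # ['HELLO', 'GOODBYE']
```

The key property is that results come back in input order, so `zip(prompts, results)` pairs correctly even when later prompts finish first.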
.test(prompt, contains=, matches=, equals=)¶
Smoke test: calls .ask() and asserts the output matches the given condition:
```python
from adk_fluent import Agent

agent = Agent("helper", "gemini-2.5-flash").instruct("You are a math tutor.")

# Assert the response contains a specific string
agent.test("What is 2+2?", contains="4")

# Assert with regex
agent.test("What is 2+2?", matches=r"\b4\b")

# Assert exact match
agent.test("What is 2+2?", equals="4")
```
.test() returns self so it can be chained with other builder methods. It is useful for quick inline smoke tests during development.
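Plausible semantics for the three keyword checks can be sketched in plain Python — substring for `contains`, `re.search` for `matches`, and equality for `equals`. This is an illustration of the assertion behavior, not adk-fluent's source:

```python
import re

def check(response, contains=None, matches=None, equals=None):
    # Illustrative assertion semantics for contains/matches/equals.
    if contains is not None:
        assert contains in response, f"{contains!r} not in {response!r}"
    if matches is not None:
        assert re.search(matches, response), f"{matches!r} does not match {response!r}"
    if equals is not None:
        assert response == equals, f"{response!r} != {equals!r}"

check("2 + 2 = 4", contains="4")
check("2 + 2 = 4", matches=r"\b4\b")
check("4", equals="4")
```

Note that `matches` with `\b4\b` passes on "2 + 2 = 4" while `equals="4"` would not, so pick the loosest check that still catches regressions.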
.to_app(config=None)¶
Compile the builder through IR to a native ADK App object. This is the production-grade alternative to .build() that supports middleware, resumability, and event compaction:
```python
from adk_fluent import Agent, ExecutionConfig, CompactionConfig

app = (
    Agent("prod")
    .model("gemini-2.5-flash")
    .instruct("Production agent.")
    .to_app(config=ExecutionConfig(
        app_name="prod_service",
        resumable=True,
        compaction=CompactionConfig(interval=10, overlap=2),
    ))
)
```
Unlike .build() which returns a raw ADK agent, .to_app() returns a full App object with configuration applied.
Execution Method Summary¶
| Method | Description |
|---|---|
| `.ask()` | Send a prompt, get response text. No Runner/Session boilerplate |
| `.ask_async()` | Async version of `.ask()` |
| `.stream()` | Async generator yielding response text chunks |
| `.events()` | Async generator yielding raw ADK `Event` objects |
| `.map()` | Batch execution against multiple prompts |
| `.map_async()` | Async batch execution |
| `.session()` | Create an interactive multi-turn session |
| `.test()` | Smoke test: calls `.ask()` and asserts on the output |
| `.to_app()` | Compile through IR to a native ADK `App` |
Choosing an Execution Method¶
| Situation | Use | Why |
|---|---|---|
| Quick prototyping / REPL | `.ask()` | Synchronous, zero boilerplate |
| Async application | `.ask_async()` | Non-blocking, integrates with asyncio |
| Real-time UI | `.stream()` | Chunks arrive as they're generated |
| Low-level debugging | `.events()` | Full ADK Event access including state deltas |
| Multi-turn chatbot | `.session()` | Maintains conversation history |
| Batch processing | `.map()` / `.map_async()` | Bounded concurrency, processes multiple prompts |
| Inline testing | `.test()` | Assert output during development |
| Production deployment | `.to_app()` | Full App with middleware, resumability, compaction |
Interplay with Other Modules¶
Execution + Middleware¶
.ask(), .stream(), and .session() don’t support middleware – they create lightweight runners internally. For middleware support, use .to_app():
```python
from adk_fluent import Agent
from adk_fluent._middleware import M

# No middleware: fine for prototyping
response = Agent("helper").instruct("Help.").ask("Hi")

# With middleware: use .to_app()
pipeline = (Agent("a") >> Agent("b")).middleware(M.retry(3) | M.log())
app = pipeline.to_app()
```
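The `>>` sequencing used here is ordinary Python operator overloading. A toy model of the mechanics (illustrative; adk-fluent's builders carry far more state than a bare function):

```python
class Step:
    # Toy model of operator-based pipeline composition.
    def __init__(self, fn):
        self.fn = fn

    def __rshift__(self, other):
        # a >> b: run a, feed its output to b.
        return Step(lambda x: other.fn(self.fn(x)))

    def __call__(self, x):
        return self.fn(x)

analyze = Step(lambda text: text.strip().lower())
write = Step(lambda text: f"Report: {text}")
pipeline = analyze >> write
print(pipeline("  HELLO  "))  # Report: hello
```

Because `>>` returns a new composed object, pipelines can be built up incrementally and only compiled once at the end via `.to_app()`.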
Execution + Visibility¶
.stream() respects visibility settings. Hidden agents’ chunks don’t appear in the stream:
```python
pipeline = (
    Agent("analyzer").instruct("Analyze.").hide()
    >> Agent("writer").instruct("Write.")
)

async for chunk in pipeline.stream("Explain"):
    print(chunk, end="")  # Only writer's output
```
See Visibility.
Execution + Context Engineering¶
.session() maintains conversation history across turns. Context engineering (C.*) controls how much of that history each agent sees:
```python
from adk_fluent import Agent, C

agent = Agent("advisor").instruct("Advise.").context(C.window(n=5))

async with agent.session() as chat:
    r1 = await chat.send("Question 1")
    # ... many turns later ...
    r10 = await chat.send("Question 10")
    # Agent sees turns 6-10 (window of 5)
```
See Context Engineering.
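The windowing behavior itself is simple list trimming. A minimal sketch of the `C.window(n=...)` idea (illustrative, not adk-fluent's implementation):

```python
def window(history, n):
    # Keep only the last n turns of the accumulated history.
    return history[-n:]

turns = [f"turn {i}" for i in range(1, 11)]
print(window(turns, 5))  # ['turn 6', 'turn 7', 'turn 8', 'turn 9', 'turn 10']
```

The session still stores the full history; the window only limits what each agent sees per call.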
Execution + Testing¶
.test() is the bridge between execution and testing. It calls .ask() internally and asserts the output:
```python
from adk_fluent import Agent
from adk_fluent.testing import AgentHarness, mock_backend

# Development: inline smoke test
agent = Agent("math", "gemini-2.5-flash").instruct("You are a math tutor.")
agent.test("What is 2+2?", contains="4")

# CI: use mock_backend for deterministic tests
harness = AgentHarness(agent, backend=mock_backend({"math": "4"}))
```
See Testing.
Best Practices¶
- **Use `.ask()` for prototyping, `.to_app()` for production.** `.ask()` is convenient but doesn't support middleware, resumability, or compaction
- **Use `.stream()` for user-facing output.** Streaming provides better UX than waiting for the full response
- **Use `.map()` with bounded concurrency for batch jobs.** Don't fire 1000 concurrent requests — use `concurrency=` to control the limit
- **Use `.session()` for multi-turn interactions.** Don't manually manage conversation history
- **Use `.test()` during development, `AgentHarness` in CI.** `.test()` hits the real LLM; `AgentHarness` with `mock_backend` is deterministic
- **Use `.engine("temporal")` for crash-resilient pipelines.** When durability matters, switch to Temporal — your builder definitions stay the same (Temporal backend is in development)
Tip
Visual learner? Open the Execution Modes Interactive Reference{target="_blank"} for sync vs async flow diagrams, an environment compatibility matrix, and the RuntimeError trap.
See also
- Execution Backends — backend selection, capability matrix, the `.engine()` method
- Temporal Guide — durable execution, crash recovery, Temporal-specific patterns
- Testing — `.mock()`, `.test()`, `AgentHarness`, and `check_contracts()`
- Middleware — pipeline-wide retry, logging, and tracing via `.to_app()`
- Visibility — controlling which agents' output appears in streams
- Context Engineering — controlling history in `.session()` interactions