Module: workflow¶
Builders in this module¶
Loop¶
Fluent builder for
google.adk.agents.loop_agent.LoopAgent
A shell agent that runs its sub-agents in a loop.
Quick start:
from adk_fluent import Loop
result = (
Loop("name_value")
.describe("...")
.build()
)
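To make "runs its sub-agents in a loop" concrete, here is a minimal pure-Python sketch. It does not import adk_fluent or ADK. The names `run_loop`, `draft`, and `review`, and the convention that a step returns True to stop, are assumptions for illustration only.

```python
from typing import Callable

# Hypothetical stand-in for a sub-agent: mutates shared state and
# returns True when the loop should stop.
Step = Callable[[dict], bool]

def run_loop(steps: list[Step], state: dict, max_iterations: int = 3) -> int:
    """Run all steps in order, repeating until a step asks to stop or
    max_iterations passes have run. Returns the number of passes started."""
    for iteration in range(1, max_iterations + 1):
        for step in steps:
            if step(state):
                return iteration
    return max_iterations

def draft(state: dict) -> bool:
    state["drafts"] = state.get("drafts", 0) + 1
    return False  # drafting never stops the loop by itself

def review(state: dict) -> bool:
    return state["drafts"] >= 2  # accept after the second draft

state: dict = {}
passes = run_loop([draft, review], state, max_iterations=5)
```

The iteration cap keeps the loop bounded even if no step ever signals completion.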
Constructor¶
Loop(name: str)
| Argument | Type |
|---|---|
| `name` | `str` |
Core Configuration¶
.describe(value: str) -> Self Core Configuration¶
Maps to: description
Set agent description (metadata for transfer routing and topology display; NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.
Example:
loop = Loop("loop").describe("...")
.sub_agent(value: BaseAgent) -> Self Core Configuration¶
Append to sub_agents (lazy — built at .build() time).
Example:
loop = Loop("loop").sub_agent(my_agent)
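The "lazy" behaviour described above can be pictured with a small stand-in builder. This is a sketch of the general pattern, not adk_fluent's implementation: `LazyBuilder`, `make`, and the factory-based design are hypothetical.

```python
from typing import Callable

class LazyBuilder:
    """Hypothetical stand-in: records sub-agent factories and only
    resolves them when build() is called."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._pending: list[Callable[[], str]] = []

    def sub_agent(self, factory: Callable[[], str]) -> "LazyBuilder":
        self._pending.append(factory)  # nothing is constructed yet
        return self

    def build(self) -> dict:
        return {"name": self.name, "sub_agents": [f() for f in self._pending]}

created: list[str] = []

def make(label: str) -> Callable[[], str]:
    def factory() -> str:
        created.append(label)  # records when construction really happens
        return label
    return factory

builder = LazyBuilder("loop").sub_agent(make("writer")).sub_agent(make("critic"))
created_before_build = list(created)
agent = builder.build()
```

Deferring construction lets a chain be assembled and modified freely before any sub-agent object exists.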
Configuration¶
.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration¶
Inline evaluation. Run a single eval case against this loop.
Example:
result = Loop("loop").eval("...")
.eval_suite() -> Any Configuration¶
Create an evaluation suite builder for this loop.
Example:
suite = Loop("loop").eval_suite()
.to_ir() -> Any Configuration¶
Convert this Loop builder to a LoopNode IR node.
Example:
node = Loop("loop").to_ir()
Callbacks¶
.after_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to after_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
loop = Loop("loop").after_agent(my_callback_fn)
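The accumulation rule in the note above can be illustrated with a toy builder. `CallbackBuilder` and the three callback functions are hypothetical stand-ins. The sketch only shows the append-not-replace behaviour, not how adk_fluent stores callbacks internally.

```python
from typing import Callable

class CallbackBuilder:
    """Hypothetical stand-in that appends callbacks instead of replacing them."""

    def __init__(self) -> None:
        self.after_callbacks: list[Callable[[], str]] = []

    def after_agent(self, *fns: Callable[[], str]) -> "CallbackBuilder":
        self.after_callbacks.extend(fns)  # earlier registrations are kept
        return self

def log() -> str:
    return "log"

def audit() -> str:
    return "audit"

def notify() -> str:
    return "notify"

builder = CallbackBuilder().after_agent(log).after_agent(audit, notify)
names = [fn() for fn in builder.after_callbacks]
```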
.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to after_agent_callback only if condition is True.
Example:
loop = Loop("loop").after_agent_if(condition, my_callback_fn)
.before_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to before_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
loop = Loop("loop").before_agent(my_callback_fn)
.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to before_agent_callback only if condition is True.
Example:
loop = Loop("loop").before_agent_if(condition, my_callback_fn)
Control Flow & Execution¶
.ask(prompt: str) -> str Control Flow & Execution¶
One-shot SYNC execution (blocking). Builds loop, sends prompt, returns response text.
Example:
response = Loop("loop").ask("...")
.ask_async(prompt: str) -> str Control Flow & Execution¶
One-shot ASYNC execution (non-blocking, use with await).
Example:
response = await Loop("loop").ask_async("...")
.build() -> LoopAgent Control Flow & Execution¶
Resolve into a native ADK LoopAgent.
Example:
agent = Loop("loop").build()
.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution¶
Stream raw ADK Event objects. Yields every event including state deltas and function calls.
Example:
async for event in Loop("loop").events("..."):
    print(event)
.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch SYNC execution (blocking). Run loop against multiple prompts with bounded concurrency.
Example:
responses = Loop("loop").map(["...", "..."])
.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch ASYNC execution (non-blocking, use with await).
Example:
responses = await Loop("loop").map_async(["...", "..."])
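"Bounded concurrency" in the batch methods above usually means that at most `concurrency` prompts are in flight at once. One common way to get that with asyncio is a semaphore, sketched below. `map_bounded` and `fake_worker` are hypothetical. Whether adk_fluent uses a semaphore internally is an assumption.

```python
import asyncio
from typing import Awaitable, Callable

async def map_bounded(
    prompts: list[str],
    worker: Callable[[str], Awaitable[str]],
    concurrency: int = 5,
) -> list[str]:
    """Run worker over every prompt, with at most `concurrency` running at once."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(prompt: str) -> str:
        async with semaphore:  # waits while `concurrency` workers are busy
            return await worker(prompt)

    # gather returns results in the same order as the inputs
    return list(await asyncio.gather(*(run_one(p) for p in prompts)))

in_flight = 0
peak = 0

async def fake_worker(prompt: str) -> str:
    """Hypothetical worker that tracks how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return prompt.upper()

results = asyncio.run(map_bounded(["a", "b", "c", "d", "e"], fake_worker, concurrency=2))
```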
.session() -> Any Control Flow & Execution¶
Create an interactive multi-turn chat session. Returns an async context manager.
Example:
async with Loop("loop").session() as chat:
    ...
.step(value: BaseAgent) -> Self Control Flow & Execution¶
Append to sub_agents (lazy — built at .build() time).
Example:
loop = Loop("loop").step(my_agent)
.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution¶
ASYNC streaming execution. Yields response text chunks as they arrive.
Example:
async for chunk in Loop("loop").stream("..."):
    print(chunk, end="")
.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution¶
Run a smoke test. Calls .ask() internally, asserts output matches condition.
Example:
loop = Loop("loop").test("...")
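The three keyword conditions accepted by `.test()` can be read as independent checks on the response text. The helper below is a sketch of that reading only. `check_output` is hypothetical, and treating `matches` as a regular-expression search (rather than a full match) is an assumption.

```python
import re
from typing import Optional

def check_output(
    output: str,
    *,
    contains: Optional[str] = None,
    matches: Optional[str] = None,
    equals: Optional[str] = None,
) -> str:
    """Raise AssertionError if any supplied condition fails; return output otherwise."""
    if contains is not None and contains not in output:
        raise AssertionError(f"expected {contains!r} in {output!r}")
    if matches is not None and re.search(matches, output) is None:
        raise AssertionError(f"expected {output!r} to match {matches!r}")
    if equals is not None and output != equals:
        raise AssertionError(f"expected {equals!r}, got {output!r}")
    return output

checked = check_output("The answer is 42.", contains="answer", matches=r"\d+")
```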
Forwarded Fields¶
These fields are available via __getattr__ forwarding.
Field |
Type |
|---|---|
|
|
|
`int |
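Forwarding through `__getattr__` can be pictured roughly as follows. This is a sketch of the general Python pattern, not adk_fluent's code: `ForwardingBuilder`, its `_FIELDS` set, and the setter-returning design are hypothetical, and `max_iterations` is used only as a placeholder field name.

```python
class ForwardingBuilder:
    """Hypothetical stand-in: unknown attribute names that match a known
    field become chainable setters."""

    _FIELDS = {"max_iterations"}  # placeholder field set for this sketch

    def __init__(self, name: str) -> None:
        self._config = {"name": name}

    def __getattr__(self, field: str):
        # Python calls __getattr__ only when normal attribute lookup fails.
        if field.startswith("_") or field not in self._FIELDS:
            raise AttributeError(field)

        def setter(value):
            self._config[field] = value
            return self  # keep the fluent chain going

        return setter

builder = ForwardingBuilder("loop").max_iterations(3)
config = dict(builder._config)
```

Rejecting unknown names with AttributeError keeps typos from silently creating new configuration keys.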
FanOut¶
Fluent builder for
google.adk.agents.parallel_agent.ParallelAgent
A shell agent that runs its sub-agents in parallel in an isolated manner.
Quick start:
from adk_fluent import FanOut
result = (
FanOut("name_value")
.describe("...")
.build()
)
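As a rough picture of "runs its sub-agents in parallel in an isolated manner", the sketch below starts every branch concurrently and gives each one its own copy of the input state. It uses only asyncio. `run_branches`, `summarize`, and `translate` are hypothetical, and modelling isolation as a per-branch dict copy is an assumption.

```python
import asyncio
from typing import Awaitable, Callable

Branch = Callable[[dict], Awaitable[str]]

async def run_branches(branches: list[Branch], state: dict) -> list[str]:
    """Run all branches concurrently; each receives a private copy of state."""
    return list(await asyncio.gather(*(branch(dict(state)) for branch in branches)))

async def summarize(state: dict) -> str:
    state["scratch"] = "summary notes"  # only this branch sees the change
    await asyncio.sleep(0.02)
    return f"summary:{state['doc']}"

async def translate(state: dict) -> str:
    await asyncio.sleep(0.01)
    return f"translation:{state['doc']}"

shared = {"doc": "report"}
outputs = asyncio.run(run_branches([summarize, translate], shared))
```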
Constructor¶
FanOut(name: str)
| Argument | Type |
|---|---|
| `name` | `str` |
Core Configuration¶
.describe(value: str) -> Self Core Configuration¶
Maps to: description
Set agent description (metadata for transfer routing and topology display; NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.
Example:
fanout = FanOut("fanout").describe("...")
.sub_agent(value: BaseAgent) -> Self Core Configuration¶
Append to sub_agents (lazy — built at .build() time).
Example:
fanout = FanOut("fanout").sub_agent(my_agent)
Configuration¶
.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration¶
Inline evaluation. Run a single eval case against this fan-out.
Example:
result = FanOut("fanout").eval("...")
.eval_suite() -> Any Configuration¶
Create an evaluation suite builder for this fan-out.
Example:
suite = FanOut("fanout").eval_suite()
.to_ir() -> Any Configuration¶
Convert this FanOut builder to a ParallelNode IR node.
Example:
node = FanOut("fanout").to_ir()
Callbacks¶
.after_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to after_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
fanout = FanOut("fanout").after_agent(my_callback_fn)
.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to after_agent_callback only if condition is True.
Example:
fanout = FanOut("fanout").after_agent_if(condition, my_callback_fn)
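The point of the `_if` variants is that a callback can be toggled without breaking the fluent chain. A minimal sketch of that idea, using a hypothetical `ToggleBuilder` and a hypothetical `DEBUG` flag:

```python
from typing import Callable

class ToggleBuilder:
    """Hypothetical stand-in showing a conditional append that still returns self."""

    def __init__(self) -> None:
        self.after_callbacks: list[Callable[[], None]] = []

    def after_agent(self, *fns: Callable[[], None]) -> "ToggleBuilder":
        self.after_callbacks.extend(fns)
        return self

    def after_agent_if(self, condition: bool, fn: Callable[[], None]) -> "ToggleBuilder":
        # Append only when the condition holds; the chain continues either way.
        return self.after_agent(fn) if condition else self

def trace() -> None:
    pass

def cleanup() -> None:
    pass

DEBUG = False  # hypothetical feature flag
builder = ToggleBuilder().after_agent_if(DEBUG, trace).after_agent(cleanup)
registered = [fn.__name__ for fn in builder.after_callbacks]
```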
.before_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to before_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
fanout = FanOut("fanout").before_agent(my_callback_fn)
.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to before_agent_callback only if condition is True.
Example:
fanout = FanOut("fanout").before_agent_if(condition, my_callback_fn)
Control Flow & Execution¶
.ask(prompt: str) -> str Control Flow & Execution¶
One-shot SYNC execution (blocking). Builds fan-out, sends prompt, returns response text.
Example:
response = FanOut("fanout").ask("...")
.ask_async(prompt: str) -> str Control Flow & Execution¶
One-shot ASYNC execution (non-blocking, use with await).
Example:
response = await FanOut("fanout").ask_async("...")
.branch(value: BaseAgent) -> Self Control Flow & Execution¶
Append to sub_agents (lazy — built at .build() time).
Example:
fanout = FanOut("fanout").branch(my_agent)
.build() -> ParallelAgent Control Flow & Execution¶
Resolve into a native ADK ParallelAgent.
Example:
agent = FanOut("fanout").build()
.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution¶
Stream raw ADK Event objects. Yields every event including state deltas and function calls.
Example:
async for event in FanOut("fanout").events("..."):
    print(event)
.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch SYNC execution (blocking). Run fan-out against multiple prompts with bounded concurrency.
Example:
responses = FanOut("fanout").map(["...", "..."])
.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch ASYNC execution (non-blocking, use with await).
Example:
responses = await FanOut("fanout").map_async(["...", "..."])
.session() -> Any Control Flow & Execution¶
Create an interactive multi-turn chat session. Returns an async context manager.
Example:
async with FanOut("fanout").session() as chat:
    ...
.step(value: BaseAgent) -> Self Control Flow & Execution¶
Alias for .branch() — consistent API across workflow builders.
Example:
fanout = FanOut("fanout").step(my_agent)
.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution¶
ASYNC streaming execution. Yields response text chunks as they arrive.
Example:
async for chunk in FanOut("fanout").stream("..."):
    print(chunk, end="")
.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution¶
Run a smoke test. Calls .ask() internally, asserts output matches condition.
Example:
fanout = FanOut("fanout").test("...")
Forwarded Fields¶
These fields are available via __getattr__ forwarding.
Field |
Type |
|---|---|
|
|
Pipeline¶
Fluent builder for
google.adk.agents.sequential_agent.SequentialAgent
A shell agent that runs its sub-agents in sequence.
Quick start:
from adk_fluent import Pipeline
result = (
Pipeline("name_value")
.describe("...")
.build()
)
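For contrast with the loop and fan-out builders, "runs its sub-agents in sequence" can be sketched as steps that execute once each, in order, over shared state. The code is plain Python with hypothetical names (`run_pipeline`, `fetch`, `count_words`). Passing data between steps through a shared dict is an assumption made for the example.

```python
from typing import Callable

# Hypothetical stand-in for a sub-agent: reads and writes shared state.
Step = Callable[[dict], None]

def run_pipeline(steps: list[Step], state: dict) -> dict:
    """Run each step exactly once, in the order given."""
    for step in steps:
        step(state)
    return state

def fetch(state: dict) -> None:
    state["text"] = "hello fluent world"

def count_words(state: dict) -> None:
    # Depends on the key written by the previous step.
    state["words"] = len(state["text"].split())

final = run_pipeline([fetch, count_words], {})
```

Because later steps rely on earlier results, reordering the list would raise a KeyError here, which is the practical difference from running the same steps in parallel.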
Constructor¶
Pipeline(name: str)
| Argument | Type |
|---|---|
| `name` | `str` |
Core Configuration¶
.describe(value: str) -> Self Core Configuration¶
Maps to: description
Set agent description (metadata for transfer routing and topology display; NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.
Example:
pipeline = Pipeline("pipeline").describe("...")
.sub_agent(value: BaseAgent) -> Self Core Configuration¶
Append to sub_agents (lazy — built at .build() time).
Example:
pipeline = Pipeline("pipeline").sub_agent(my_agent)
Configuration¶
.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration¶
Inline evaluation. Run a single eval case against this pipeline.
Example:
result = Pipeline("pipeline").eval("...")
.eval_suite() -> Any Configuration¶
Create an evaluation suite builder for this pipeline.
Example:
suite = Pipeline("pipeline").eval_suite()
.to_ir() -> Any Configuration¶
Convert this Pipeline builder to a SequenceNode IR node.
Example:
node = Pipeline("pipeline").to_ir()
Callbacks¶
.after_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to after_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
pipeline = Pipeline("pipeline").after_agent(my_callback_fn)
.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to after_agent_callback only if condition is True.
Example:
pipeline = Pipeline("pipeline").after_agent_if(condition, my_callback_fn)
.before_agent(*fns: Callable) -> Self Callbacks¶
Append callback(s) to before_agent_callback.
Note
Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.
Example:
pipeline = Pipeline("pipeline").before_agent(my_callback_fn)
.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks¶
Append callback to before_agent_callback only if condition is True.
Example:
pipeline = Pipeline("pipeline").before_agent_if(condition, my_callback_fn)
Control Flow & Execution¶
.ask(prompt: str) -> str Control Flow & Execution¶
One-shot SYNC execution (blocking). Builds pipeline, sends prompt, returns response text.
Example:
response = Pipeline("pipeline").ask("...")
.ask_async(prompt: str) -> str Control Flow & Execution¶
One-shot ASYNC execution (non-blocking, use with await).
Example:
response = await Pipeline("pipeline").ask_async("...")
.build() -> SequentialAgent Control Flow & Execution¶
Resolve into a native ADK SequentialAgent.
Example:
agent = Pipeline("pipeline").build()
.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution¶
Stream raw ADK Event objects. Yields every event including state deltas and function calls.
Example:
async for event in Pipeline("pipeline").events("..."):
    print(event)
.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch SYNC execution (blocking). Run pipeline against multiple prompts with bounded concurrency.
Example:
responses = Pipeline("pipeline").map(["...", "..."])
.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution¶
Batch ASYNC execution (non-blocking, use with await).
Example:
responses = await Pipeline("pipeline").map_async(["...", "..."])
.session() -> Any Control Flow & Execution¶
Create an interactive multi-turn chat session. Returns an async context manager.
Example:
async with Pipeline("pipeline").session() as chat:
    ...
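Since `.session()` is documented as returning an async context manager, usage would normally go through `async with`. The sketch below shows that shape with a toy session. `ChatSession`, its `send` method, and `open_session` are hypothetical and do not describe the object adk_fluent actually returns.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

class ChatSession:
    """Hypothetical multi-turn session that remembers earlier messages."""

    def __init__(self) -> None:
        self.history: list[str] = []

    async def send(self, message: str) -> str:
        self.history.append(message)
        return f"turn {len(self.history)}: {message}"

@asynccontextmanager
async def open_session() -> AsyncIterator[ChatSession]:
    session = ChatSession()
    try:
        yield session
    finally:
        session.history.clear()  # stand-in for releasing session resources

async def main() -> list[str]:
    async with open_session() as chat:
        first = await chat.send("hi")
        second = await chat.send("and again")
    return [first, second]

replies = asyncio.run(main())
```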
.step(value: BaseAgent) -> Self Control Flow & Execution¶
Append to sub_agents (lazy — built at .build() time).
Example:
pipeline = Pipeline("pipeline").step(my_agent)
.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution¶
ASYNC streaming execution. Yields response text chunks as they arrive.
Example:
async for chunk in Pipeline("pipeline").stream("..."):
    print(chunk, end="")
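An `AsyncIterator[str]` like the one `.stream()` is documented to return is consumed with `async for` inside a coroutine. The sketch below uses a hypothetical `fake_stream` generator in place of a real builder, so it runs without adk_fluent installed.

```python
import asyncio
from typing import AsyncIterator

async def fake_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in that yields the prompt back one word at a time."""
    for word in prompt.split():
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield word + " "

async def collect(prompt: str) -> str:
    chunks: list[str] = []
    async for chunk in fake_stream(prompt):
        chunks.append(chunk)  # a UI would render each chunk as it arrives
    return "".join(chunks).strip()

text = asyncio.run(collect("streamed one chunk at a time"))
```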
.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution¶
Run a smoke test. Calls .ask() internally, asserts output matches condition.
Example:
pipeline = Pipeline("pipeline").test("...")
Forwarded Fields¶
These fields are available via __getattr__ forwarding.
Field |
Type |
|---|---|
|
|