Module: workflow

Builders in this module

Builder    Description
Loop       A shell agent that runs its sub-agents in a loop.
FanOut     A shell agent that runs its sub-agents in parallel in an isolated manner.
Pipeline   A shell agent that runs its sub-agents in sequence.

Loop

Fluent builder for google.adk.agents.loop_agent.LoopAgent

A shell agent that runs its sub-agents in a loop.

Quick start:

from adk_fluent import Loop

result = (
    Loop("name_value")
    .describe("...")
    .build()
)
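To make the looping behaviour concrete, here is a minimal plain-Python sketch of running a list of steps repeatedly. It does not use ADK. The names `run_loop`, `draft`, and `review`, and the convention that a step returns True to stop, are assumptions made for illustration; `max_iterations` mirrors the forwarded field of the same name.

```python
from typing import Callable, Optional


def run_loop(steps: list[Callable[[dict], bool]], state: dict,
             max_iterations: Optional[int] = None) -> int:
    """Run every step in order, repeatedly; return the number of passes started."""
    passes = 0
    while max_iterations is None or passes < max_iterations:
        passes += 1
        for step in steps:
            if step(state):  # assumed convention: True means "stop the loop"
                return passes
    return passes


def draft(state: dict) -> bool:
    state["drafts"] = state.get("drafts", 0) + 1
    return False


def review(state: dict) -> bool:
    return state["drafts"] >= 3  # stop once three drafts exist


state: dict = {}
passes = run_loop([draft, review], state, max_iterations=10)
```

Here the loop ends on the third pass because `review` signals a stop; without such a signal it would end when `max_iterations` is reached.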

Constructor

Loop(name: str)

Argument   Type
name       str

Core Configuration

.describe(value: str) -> Self Core Configuration

  • Maps to: description

  • Set agent description (metadata for transfer routing and topology display — NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.

Example:

loop = Loop("loop").describe("...")

.sub_agent(value: BaseAgent) -> Self Core Configuration

Append to sub_agents (lazy — built at .build() time).

Example:

loop = Loop("loop").sub_agent(my_agent)
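The phrase "lazy, built at .build() time" can be read as follows: children are stored unbuilt and only resolved when the parent is built. The sketch below is one way to picture that, assuming children may themselves be builders. `LazyParent` is a hypothetical stand-in, not the adk_fluent class, and the dict it returns only stands in for a built agent.

```python
class LazyParent:
    """Hypothetical builder that stores children unbuilt until build() runs."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._children = []

    def sub_agent(self, child) -> "LazyParent":
        self._children.append(child)  # stored as-is; nothing is built yet
        return self

    def build(self) -> dict:
        # Children that expose build() are resolved here; others pass through.
        built = [c.build() if hasattr(c, "build") else c for c in self._children]
        return {"name": self.name, "sub_agents": built}


inner = LazyParent("inner")
outer = LazyParent("outer").sub_agent(inner)
inner.sub_agent("late_child")  # added after attaching, still picked up
tree = outer.build()
```

Because nothing is resolved until `build()`, a change made to `inner` after it was attached still appears in the final tree.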

Configuration

.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration

Inline evaluation. Run a single eval case against this loop.

Example:

result = Loop("loop").eval("...")

.eval_suite() -> Any Configuration

Create an evaluation suite builder for this loop.

Example:

suite = Loop("loop").eval_suite()

.to_ir() -> Any Configuration

Convert this Loop builder to a LoopNode IR node.

Example:

ir_node = Loop("loop").to_ir()

Callbacks

.after_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to after_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

loop = Loop("loop").after_agent(my_callback_fn)

.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to after_agent_callback only if condition is True.

Example:

loop = Loop("loop").after_agent_if(condition, my_callback_fn)

.before_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to before_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

loop = Loop("loop").before_agent(my_callback_fn)

.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to before_agent_callback only if condition is True.

Example:

loop = Loop("loop").before_agent_if(condition, my_callback_fn)
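The callback methods above share two behaviours: repeated calls append rather than replace, and the `_if` variants register a callback only when the condition is true. The sketch below shows both with a stand-in class. `SketchBuilder`, `log_done`, and `record_metrics` are hypothetical names, and this is not adk_fluent's implementation.

```python
from typing import Callable


class SketchBuilder:
    """Hypothetical stand-in for a fluent builder; not the adk_fluent class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.callbacks: list[Callable] = []

    def after_agent(self, *fns: Callable) -> "SketchBuilder":
        # Extend the existing list instead of overwriting it.
        self.callbacks.extend(fns)
        return self

    def after_agent_if(self, condition: bool, fn: Callable) -> "SketchBuilder":
        # Register the callback only when the condition holds.
        if condition:
            self.callbacks.append(fn)
        return self


def log_done(ctx):
    return None


def record_metrics(ctx):
    return None


builder = SketchBuilder("loop").after_agent(log_done).after_agent(record_metrics)
```

After the two chained calls, `builder.callbacks` holds both functions in registration order.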

Control Flow & Execution

.ask(prompt: str) -> str Control Flow & Execution

One-shot SYNC execution (blocking). Builds loop, sends prompt, returns response text.

Example:

response = Loop("loop").ask("...")

.ask_async(prompt: str) -> str Control Flow & Execution

One-shot ASYNC execution (non-blocking, use with await).

Example:

response = await Loop("loop").ask_async("...")

.build() -> LoopAgent Control Flow & Execution

Resolve into a native ADK LoopAgent.

Example:

agent = Loop("loop").build()

.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution

Stream raw ADK Event objects. Yields every event including state deltas and function calls.

Example:

async for event in Loop("loop").events("..."):
    print(event)

.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch SYNC execution (blocking). Run loop against multiple prompts with bounded concurrency.

Example:

responses = Loop("loop").map(["...", "..."])

.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch ASYNC execution (non-blocking, use with await).

Example:

responses = await Loop("loop").map_async(["...", "..."])
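"Bounded concurrency" means at most `concurrency` prompts are in flight at once. A common way to get that in asyncio is a semaphore, sketched below. Whether adk_fluent uses this exact mechanism is an assumption; `bounded_map` and `fake_ask` are hypothetical names, with `fake_ask` standing in for a real agent call.

```python
import asyncio


async def bounded_map(worker, prompts: list[str], *, concurrency: int = 5) -> list[str]:
    """Run worker(prompt) for each prompt, at most `concurrency` at a time."""
    gate = asyncio.Semaphore(concurrency)

    async def run_one(prompt: str) -> str:
        async with gate:  # waits while `concurrency` calls are in flight
            return await worker(prompt)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(run_one(p) for p in prompts))


async def fake_ask(prompt: str) -> str:
    # Hypothetical stand-in for a real agent call.
    await asyncio.sleep(0)
    return prompt.upper()
```

Calling `asyncio.run(bounded_map(fake_ask, ["a", "b", "c"], concurrency=2))` returns the responses in input order regardless of which call finishes first.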

.session() -> Any Control Flow & Execution

Create an interactive multi-turn chat session. Returns an async context manager.

Example:

async with Loop("loop").session() as chat:
    ...

.step(value: BaseAgent) -> Self Control Flow & Execution

Append to sub_agents (lazy — built at .build() time).

Example:

loop = Loop("loop").step(my_agent)

.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution

ASYNC streaming execution. Yields response text chunks as they arrive.

Example:

async for chunk in Loop("loop").stream("..."):
    print(chunk, end="")
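An `AsyncIterator[str]` is consumed with `async for` inside a coroutine. The sketch below shows the consumption pattern against a stand-in generator so it runs without ADK. `fake_stream` and `collect` are hypothetical names, and the word-by-word chunking is only an illustration.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(prompt: str) -> AsyncIterator[str]:
    # Hypothetical stand-in that yields text chunks like a streaming call would.
    for word in prompt.split():
        await asyncio.sleep(0)
        yield word + " "


async def collect(chunks: AsyncIterator[str]) -> str:
    # Join chunks as they arrive; a UI would print each one instead.
    parts = []
    async for chunk in chunks:
        parts.append(chunk)
    return "".join(parts)
```

From synchronous code, the coroutine is driven with `asyncio.run(collect(fake_stream("a b c")))`.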

.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution

Run a smoke test. Calls .ask() internally, asserts output matches condition.

Example:

loop = Loop("loop").test("...")
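The three keyword conditions can be pictured as independent checks on the response text. The sketch below is an assumption about their meaning, not the library's code: in particular, treating `matches` as a regular expression searched anywhere in the output is a guess, and `check_output` is a hypothetical helper.

```python
import re
from typing import Optional


def check_output(output: str, *, contains: Optional[str] = None,
                 matches: Optional[str] = None,
                 equals: Optional[str] = None) -> None:
    """Raise AssertionError if output fails any condition that was supplied."""
    if contains is not None:
        assert contains in output, f"expected {contains!r} in output"
    if matches is not None:
        # Assumption: `matches` is a regex searched anywhere in the text.
        assert re.search(matches, output), f"output does not match {matches!r}"
    if equals is not None:
        assert output == equals, f"expected exactly {equals!r}"
```

Conditions left as `None` are skipped, so a call such as `check_output(text, contains="world")` checks only the substring.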

Forwarded Fields

These fields are available via __getattr__ forwarding.

Field                    Type
.sub_agents(value)       list[BaseAgent]
.max_iterations(value)   int
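Forwarding through `__getattr__` means the builder has no explicit method for these fields; the attribute lookup falls through to a generic setter. The sketch below shows the general Python technique. `ForwardingBuilder`, its `_FORWARDED` set, and its `_config` dict are hypothetical and do not describe adk_fluent's internals.

```python
class ForwardingBuilder:
    """Hypothetical sketch of __getattr__ forwarding; not the adk_fluent class."""

    _FORWARDED = {"sub_agents", "max_iterations"}

    def __init__(self, name: str) -> None:
        self._config = {"name": name}

    def __getattr__(self, field: str):
        # Called only when normal attribute lookup fails.
        if field not in self._FORWARDED:
            raise AttributeError(field)

        def setter(value):
            self._config[field] = value
            return self  # keep the chain fluent

        return setter


builder = ForwardingBuilder("loop").max_iterations(3)
```

Unknown names still raise `AttributeError`, so a misspelled field fails at the call site instead of being stored silently.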


FanOut

Fluent builder for google.adk.agents.parallel_agent.ParallelAgent

A shell agent that runs its sub-agents in parallel in an isolated manner.

Quick start:

from adk_fluent import FanOut

result = (
    FanOut("name_value")
    .describe("...")
    .build()
)
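As a rough mental model of parallel, isolated execution, the sketch below runs branches concurrently and gives each its own copy of the input state. This is only one possible reading of "isolated"; ADK's actual isolation mechanism may differ. `run_parallel`, `web_search`, and `paper_search` are hypothetical names.

```python
import asyncio
import copy


async def run_parallel(branches, state: dict) -> list[dict]:
    """Run each branch concurrently on its own deep copy of state."""

    async def run_one(branch):
        local = copy.deepcopy(state)  # isolation: branches never share a dict
        await branch(local)
        return local

    return await asyncio.gather(*(run_one(b) for b in branches))


async def web_search(state: dict) -> None:
    await asyncio.sleep(0)
    state["source"] = "web"


async def paper_search(state: dict) -> None:
    await asyncio.sleep(0)
    state["source"] = "papers"


shared = {"query": "fluent builders"}
results = asyncio.run(run_parallel([web_search, paper_search], shared))
```

Each entry in `results` carries its branch's own `source` value, and `shared` is left unmodified.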

Constructor

FanOut(name: str)

Argument   Type
name       str

Core Configuration

.describe(value: str) -> Self Core Configuration

  • Maps to: description

  • Set agent description (metadata for transfer routing and topology display — NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.

Example:

fanout = FanOut("fanout").describe("...")

.sub_agent(value: BaseAgent) -> Self Core Configuration

Append to sub_agents (lazy — built at .build() time).

Example:

fanout = FanOut("fanout").sub_agent(my_agent)

Configuration

.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration

Inline evaluation. Run a single eval case against this fan-out.

Example:

result = FanOut("fanout").eval("...")

.eval_suite() -> Any Configuration

Create an evaluation suite builder for this fan-out.

Example:

suite = FanOut("fanout").eval_suite()

.to_ir() -> Any Configuration

Convert this FanOut builder to a ParallelNode IR node.

Example:

ir_node = FanOut("fanout").to_ir()

Callbacks

.after_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to after_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

fanout = FanOut("fanout").after_agent(my_callback_fn)

.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to after_agent_callback only if condition is True.

Example:

fanout = FanOut("fanout").after_agent_if(condition, my_callback_fn)

.before_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to before_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

fanout = FanOut("fanout").before_agent(my_callback_fn)

.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to before_agent_callback only if condition is True.

Example:

fanout = FanOut("fanout").before_agent_if(condition, my_callback_fn)

Control Flow & Execution

.ask(prompt: str) -> str Control Flow & Execution

One-shot SYNC execution (blocking). Builds fan-out, sends prompt, returns response text.

Example:

response = FanOut("fanout").ask("...")

.ask_async(prompt: str) -> str Control Flow & Execution

One-shot ASYNC execution (non-blocking, use with await).

Example:

response = await FanOut("fanout").ask_async("...")

.branch(value: BaseAgent) -> Self Control Flow & Execution

Append to sub_agents (lazy — built at .build() time).

Example:

fanout = FanOut("fanout").branch(my_agent)

.build() -> ParallelAgent Control Flow & Execution

Resolve into a native ADK ParallelAgent.

Example:

agent = FanOut("fanout").build()

.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution

Stream raw ADK Event objects. Yields every event including state deltas and function calls.

Example:

async for event in FanOut("fanout").events("..."):
    print(event)

.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch SYNC execution (blocking). Run fan-out against multiple prompts with bounded concurrency.

Example:

responses = FanOut("fanout").map(["...", "..."])

.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch ASYNC execution (non-blocking, use with await).

Example:

responses = await FanOut("fanout").map_async(["...", "..."])

.session() -> Any Control Flow & Execution

Create an interactive multi-turn chat session. Returns an async context manager.

Example:

async with FanOut("fanout").session() as chat:
    ...

.step(value: BaseAgent) -> Self Control Flow & Execution

Alias for .branch() — consistent API across workflow builders.

Example:

fanout = FanOut("fanout").step(my_agent)
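An alias of this kind is usually just a second name bound to the same function. The sketch below shows the plain-Python idiom with a hypothetical `BranchBuilder`; it is not taken from adk_fluent's source, and strings stand in for agents.

```python
class BranchBuilder:
    """Hypothetical stand-in showing a method alias; not the adk_fluent class."""

    def __init__(self) -> None:
        self.children: list[str] = []

    def branch(self, value: str) -> "BranchBuilder":
        self.children.append(value)
        return self

    # Binding the same function under a second name makes step() identical to branch().
    step = branch


b = BranchBuilder().branch("a").step("b")
```

Both calls append to the same list, so code written against `.step()` for the other workflow builders works unchanged here.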

.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution

ASYNC streaming execution. Yields response text chunks as they arrive.

Example:

async for chunk in FanOut("fanout").stream("..."):
    print(chunk, end="")

.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution

Run a smoke test. Calls .ask() internally, asserts output matches condition.

Example:

fanout = FanOut("fanout").test("...")

Forwarded Fields

These fields are available via __getattr__ forwarding.

Field                Type
.sub_agents(value)   list[BaseAgent]


Pipeline

Fluent builder for google.adk.agents.sequential_agent.SequentialAgent

A shell agent that runs its sub-agents in sequence.

Quick start:

from adk_fluent import Pipeline

result = (
    Pipeline("name_value")
    .describe("...")
    .build()
)
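Running sub-agents in sequence can be pictured as a simple fold over an ordered list of steps. The sketch below is a simplification that hands each step's output directly to the next; how ADK actually shares data between sequential sub-agents is not covered here. `run_sequence`, `outline`, and `write` are hypothetical names.

```python
from typing import Callable


def run_sequence(steps: list[Callable[[str], str]], prompt: str) -> str:
    """Feed the prompt through each step in order; each output is the next input."""
    text = prompt
    for step in steps:
        text = step(text)
    return text


def outline(text: str) -> str:
    return f"outline({text})"


def write(text: str) -> str:
    return f"draft({text})"


result = run_sequence([outline, write], "topic")
```

The order of the list is the order of execution, which is why the steps are appended one at a time on the builder.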

Constructor

Pipeline(name: str)

Argument   Type
name       str

Core Configuration

.describe(value: str) -> Self Core Configuration

  • Maps to: description

  • Set agent description (metadata for transfer routing and topology display — NOT sent to the LLM as instruction). Always set this on sub-agents so the coordinator LLM can pick the right specialist.

Example:

pipeline = Pipeline("pipeline").describe("...")

.sub_agent(value: BaseAgent) -> Self Core Configuration

Append to sub_agents (lazy — built at .build() time).

Example:

pipeline = Pipeline("pipeline").sub_agent(my_agent)

Configuration

.eval(prompt: str, *, expect: str | None = None, criteria: Any | None = None) -> Any Configuration

Inline evaluation. Run a single eval case against this pipeline.

Example:

result = Pipeline("pipeline").eval("...")

.eval_suite() -> Any Configuration

Create an evaluation suite builder for this pipeline.

Example:

suite = Pipeline("pipeline").eval_suite()

.to_ir() -> Any Configuration

Convert this Pipeline builder to a SequenceNode IR node.

Example:

ir_node = Pipeline("pipeline").to_ir()

Callbacks

.after_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to after_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

pipeline = Pipeline("pipeline").after_agent(my_callback_fn)

.after_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to after_agent_callback only if condition is True.

Example:

pipeline = Pipeline("pipeline").after_agent_if(condition, my_callback_fn)

.before_agent(*fns: Callable) -> Self Callbacks

Append callback(s) to before_agent_callback.

Note

Multiple calls accumulate. Each invocation appends to the callback list rather than replacing previous callbacks.

Example:

pipeline = Pipeline("pipeline").before_agent(my_callback_fn)

.before_agent_if(condition: bool, fn: Callable) -> Self Callbacks

Append callback to before_agent_callback only if condition is True.

Example:

pipeline = Pipeline("pipeline").before_agent_if(condition, my_callback_fn)

Control Flow & Execution

.ask(prompt: str) -> str Control Flow & Execution

One-shot SYNC execution (blocking). Builds pipeline, sends prompt, returns response text.

Example:

response = Pipeline("pipeline").ask("...")

.ask_async(prompt: str) -> str Control Flow & Execution

One-shot ASYNC execution (non-blocking, use with await).

Example:

response = await Pipeline("pipeline").ask_async("...")

.build() -> SequentialAgent Control Flow & Execution

Resolve into a native ADK SequentialAgent.

Example:

agent = Pipeline("pipeline").build()

.events(prompt: str) -> AsyncIterator[Any] Control Flow & Execution

Stream raw ADK Event objects. Yields every event including state deltas and function calls.

Example:

async for event in Pipeline("pipeline").events("..."):
    print(event)

.map(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch SYNC execution (blocking). Run pipeline against multiple prompts with bounded concurrency.

Example:

responses = Pipeline("pipeline").map(["...", "..."])

.map_async(prompts: list[str], *, concurrency: int = 5) -> list[str] Control Flow & Execution

Batch ASYNC execution (non-blocking, use with await).

Example:

responses = await Pipeline("pipeline").map_async(["...", "..."])

.session() -> Any Control Flow & Execution

Create an interactive multi-turn chat session. Returns an async context manager.

Example:

async with Pipeline("pipeline").session() as chat:
    ...
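Since the session is described as an async context manager, it is entered with `async with` and cleaned up on exit. The sketch below shows that pattern with stand-ins only. `FakeChat`, its `send` method, and `open_session` are hypothetical; the methods on the real session object are not listed in this reference.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeChat:
    """Hypothetical chat handle that remembers earlier turns."""

    def __init__(self) -> None:
        self.history: list[str] = []
        self.closed = False

    async def send(self, message: str) -> str:
        self.history.append(message)
        return f"turn {len(self.history)}: {message}"


@asynccontextmanager
async def open_session():
    chat = FakeChat()
    try:
        yield chat
    finally:
        chat.closed = True  # cleanup runs even if the body raises


async def demo() -> tuple:
    async with open_session() as chat:
        first = await chat.send("hello")
        second = await chat.send("and again")
    return first, second, chat.closed
```

The handle keeps state between turns inside the `async with` block, and the `finally` clause releases it when the block exits.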

.step(value: BaseAgent) -> Self Control Flow & Execution

Append to sub_agents (lazy — built at .build() time).

Example:

pipeline = Pipeline("pipeline").step(my_agent)

.stream(prompt: str) -> AsyncIterator[str] Control Flow & Execution

ASYNC streaming execution. Yields response text chunks as they arrive.

Example:

async for chunk in Pipeline("pipeline").stream("..."):
    print(chunk, end="")

.test(prompt: str, *, contains: str | None = None, matches: str | None = None, equals: str | None = None) -> Self Control Flow & Execution

Run a smoke test. Calls .ask() internally, asserts output matches condition.

Example:

pipeline = Pipeline("pipeline").test("...")

Forwarded Fields

These fields are available via __getattr__ forwarding.

Field                Type
.sub_agents(value)   list[BaseAgent]