Composition Patterns¶
Higher-order constructors from adk_fluent.patterns that compose agents into common architectures. Each pattern is a function that returns a ready-to-use builder.
from adk_fluent.patterns import review_loop, cascade, fan_out_merge, chain, conditional, supervised, map_reduce
Pattern Quick Reference:

| Pattern | Shape |
|---|---|
| `review_loop` | `( worker >> reviewer ) * until(score >= target)` |
| `cascade` | `agent_a // agent_b // agent_c` |
| `fan_out_merge` | `( a \| b \| c ) >> S.merge(into="combined")` |
| `chain` | `a >> b >> c` (with `.writes()`/`.reads()` wiring) |
| `conditional` | `pred?` routes to the true branch or the false branch |
| `supervised` | `( worker >> supervisor ) * until(approved)` |
| `map_reduce` | `mapper(item_i)` fanned out over each item, outputs fed to `reducer` |
review_loop — Refinement Loop¶
A worker produces output, a reviewer scores it, and the loop repeats until quality meets the target.
pipeline = review_loop(
worker=Agent("writer").instruct("Write a blog post about {topic}."),
reviewer=Agent("reviewer").instruct("Score the draft 0-1 for quality."),
quality_key="review_score",
target=0.8,
max_rounds=3,
)
┌──────────────────────────────────────────────┐
│ ┌──────────┐ ┌──────────┐ │
│ ──────────►│ worker │───►│ reviewer │──┐ │
│ │ └──────────┘ └──────────┘ │ │
│ │ score >= 0.8? │ │
│ └── no ──────────────────────────────────┘ │
│ │ yes │
└───────────────────────────────┼──────────────┘
▼ done
Data flow:

1. Worker runs, stores output in `{quality_key}_draft`
2. Reviewer reads the draft, stores score in `{quality_key}`
3. Loop checks if score >= target
4. If not, worker runs again with reviewer feedback
cascade — Fallback Chain¶
Tries each agent in order. First successful response wins.
pipeline = cascade(
Agent("fast").model("gemini-2.0-flash"),
Agent("smart").model("gemini-2.5-pro"),
Agent("fallback").model("gemini-2.0-flash").instruct("Provide a safe default."),
)
fast ──► success? ─── yes ──► done
│ no
smart ──► success? ─── yes ──► done
│ no
fallback ──────────────────── done
Data flow: Each agent receives the same input. The first agent that succeeds provides the response.
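The fallback semantics amount to an ordered try/except chain. A minimal sketch, with plain callables standing in for agents (a failing agent is modeled as raising an exception; the real library's failure criteria may differ):

```python
# Hypothetical simulation of cascade: try each agent in order,
# and the first one that does not raise provides the response.
def simulate_cascade(*agents):
    def run(prompt):
        error = None
        for agent in agents:
            try:
                return agent(prompt)
            except Exception as exc:  # a failed agent falls through to the next
                error = exc
        raise error                   # every agent failed
    return run

def fast(prompt):
    raise TimeoutError("model overloaded")

def smart(prompt):
    return f"answer to: {prompt}"

pipeline = simulate_cascade(fast, smart, lambda p: "safe default")
# pipeline("hi") -> "answer to: hi"  (fast failed, smart succeeded)
```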
fan_out_merge — Parallel Research + Merge¶
Run multiple agents in parallel, then merge their outputs.
pipeline = fan_out_merge(
Agent("web_search").writes("web"),
Agent("doc_search").writes("docs"),
Agent("expert").writes("expert"),
merge_key="combined",
merge_fn=lambda results: "\n\n".join(results.values()),
)
┌─ web_search ──► state["web"] ─┐
├─ doc_search ──► state["docs"] ─┼──► merge_fn ──► state["combined"]
└─ expert ──────► state["expert"]─┘
Data flow:

1. All agents run in parallel (FanOut)
2. Each writes to its own state key
3. Merge function combines results into `state[merge_key]`
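The fan-out-then-merge shape can be sketched with `asyncio.gather`. This is a hypothetical simulation of the data flow, not the library's engine; agents are modeled as keyed coroutines.

```python
import asyncio

# Hypothetical simulation of fan_out_merge: run keyed branches
# concurrently, then fold their outputs into one merged state key.
async def simulate_fan_out_merge(agents, merge_key, merge_fn):
    keys = list(agents)
    outputs = await asyncio.gather(*(agents[k]() for k in keys))
    state = dict(zip(keys, outputs))   # each branch writes its own key
    state[merge_key] = merge_fn(state) # merge step reads all branch keys
    return state

async def main():
    agents = {
        "web": lambda: asyncio.sleep(0, result="web hit"),
        "docs": lambda: asyncio.sleep(0, result="doc hit"),
    }
    return await simulate_fan_out_merge(
        agents, "combined", lambda r: "\n\n".join(r.values())
    )

state = asyncio.run(main())
# state["combined"] joins both branch outputs with blank lines.
```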
chain — Sequential Composition¶
Compose a list of steps into a Pipeline.
pipeline = chain(
Agent("researcher").writes("findings"),
Agent("writer").reads("findings").writes("draft"),
Agent("editor").reads("draft").writes("final"),
)
Data flow: Each agent runs in sequence. State propagates between steps via .writes() and .reads().
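Sequential composition reduces to threading a state dict through a list of functions. A minimal sketch of that flow (plain callables standing in for agents):

```python
# Hypothetical simulation of chain: each step is a state -> state
# function; the output state of one step feeds the next.
def simulate_chain(*steps):
    def run(state):
        for step in steps:
            state = step(state)
        return state
    return run

researcher = lambda s: {**s, "findings": "fact A; fact B"}
writer = lambda s: {**s, "draft": f"Draft from {s['findings']}"}
editor = lambda s: {**s, "final": s["draft"].upper()}

result = simulate_chain(researcher, writer, editor)({})
# result carries "findings", "draft", and "final" accumulated across steps.
```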
conditional — If/Else Branching¶
Route to different agents based on a predicate.
pipeline = conditional(
predicate=lambda state: state.get("category") == "technical",
if_true=Agent("tech_support").instruct("Handle technical issue."),
if_false=Agent("general_support").instruct("Handle general inquiry."),
)
┌─ yes ──► tech_support
state ──► pred? ─┤
└─ no ──► general_support
Data flow: The predicate reads from state. Only one branch executes.
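The branching logic is a single predicate check. A hypothetical sketch with plain callables in place of agents:

```python
# Hypothetical simulation of conditional: the predicate inspects
# state and exactly one branch executes.
def simulate_conditional(predicate, if_true, if_false):
    def run(state):
        branch = if_true if predicate(state) else if_false
        return branch(state)
    return run

route = simulate_conditional(
    predicate=lambda s: s.get("category") == "technical",
    if_true=lambda s: "tech_support",
    if_false=lambda s: "general_support",
)
# route({"category": "technical"}) -> "tech_support"
# route({"category": "billing"})   -> "general_support"
```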
supervised — Approval Workflow¶
A worker produces output, a supervisor approves or requests revisions.
pipeline = supervised(
worker=Agent("drafter").instruct("Draft the contract."),
supervisor=Agent("lawyer").instruct("Review for legal compliance."),
approval_key="approved",
max_revisions=2,
)
Data flow: Similar to review_loop but with approval semantics. The supervisor marks state[approval_key] as approved or requests changes.
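A hypothetical sketch of the approval loop, assuming the supervisor writes a boolean into `state[approval_key]` and that `max_revisions` bounds the revision rounds after the initial draft (the library's exact counting may differ):

```python
# Hypothetical simulation of supervised: the worker drafts, the
# supervisor approves or requests another revision.
def simulate_supervised(worker, supervisor, approval_key, max_revisions):
    state = {}
    for _ in range(max_revisions + 1):  # initial draft + revisions
        state["draft"] = worker(state)
        state[approval_key] = supervisor(state)
        if state[approval_key]:
            break
    return state

versions = iter(["draft v1", "draft v2"])
worker = lambda s: next(versions)
supervisor = lambda s: "v2" in s["draft"]  # approves only the revision

state = simulate_supervised(worker, supervisor, "approved", max_revisions=2)
# The first draft is rejected; the revision is approved.
```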
map_reduce — Fan-Out Over Items¶
Apply a mapper agent to each item, then reduce results.
pipeline = map_reduce(
mapper=Agent("analyzer").instruct("Analyze this item: {item}"),
reducer=Agent("synthesizer").instruct("Synthesize all analyses."),
items_key="items",
)
state["items"] ──┬─ mapper("item_0") ─┐
├─ mapper("item_1") ─┼──► reducer ──► output
└─ mapper("item_n") ─┘
Data flow:

1. Reads `state[items_key]` (a list)
2. Runs mapper on each item in parallel
3. Reducer combines all mapper outputs
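The steps above can be sketched with `asyncio.gather` over the items. A hypothetical simulation, with an async function standing in for the mapper agent and a plain callable for the reducer:

```python
import asyncio

# Hypothetical simulation of map_reduce: fan the mapper out over
# state[items_key], then hand all mapper outputs to the reducer.
async def simulate_map_reduce(mapper, reducer, state, items_key):
    mapped = await asyncio.gather(*(mapper(item) for item in state[items_key]))
    return reducer(mapped)

async def mapper(item):
    return f"analysis of {item}"

summary = asyncio.run(simulate_map_reduce(
    mapper,
    reducer=lambda results: " | ".join(results),
    state={"items": ["a", "b"]},
    items_key="items",
))
# summary combines one analysis per item, in item order.
```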
Durable Execution¶
All patterns above work with any execution backend. The definition is identical — only the engine selection changes:
pipeline = review_loop(
worker=Agent("writer").instruct("Write."),
reviewer=Agent("reviewer").instruct("Review."),
quality_key="score",
target=0.8,
)
response = pipeline.ask("Write about AI safety")
With the Temporal backend, the same definition runs durably:
from temporalio.client import Client
client = await Client.connect("localhost:7233")
# Same pattern — each review iteration is checkpointed
pipeline = review_loop(
worker=Agent("writer").instruct("Write."),
reviewer=Agent("reviewer").instruct("Review."),
quality_key="score",
target=0.8,
).engine("temporal", client=client, task_queue="quality")
# If crash occurs mid-loop, completed iterations replay from cache
response = await pipeline.ask_async("Write about AI safety")
With the in-process asyncio engine:
pipeline = review_loop(
worker=Agent("writer").instruct("Write."),
reviewer=Agent("reviewer").instruct("Review."),
quality_key="score",
target=0.8,
).engine("asyncio")
response = await pipeline.ask_async("Write about AI safety")
Patterns with natural checkpoint boundaries (each step in chain, each iteration in review_loop) are especially well-suited for durable execution. See Temporal Guide for details.
See also
Execution Backends — backend selection and capability matrix
Temporal Guide — durable execution patterns and constraints
Expression Language — the operator equivalents of these patterns