adk-fluent Operator Algebra

Nine operators. Any topology. All immutable.
>> Sequence · >> fn Function · | Parallel · * n Loop · * until Cond Loop · @ Schema · // Fallback · Route Branch · S.* State

Topology Operators

core
>>

Pipeline (Sequential)

SequentialAgent
a ──► b ──► c
pipeline = a >> b >> c

# Equivalent builder style:
pipeline = (
    Pipeline("name")
    .step(a).step(b).step(c)
    .build()
)
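The chaining above can be modelled in plain Python with `__rshift__`. This is a toy sketch of the idea, not adk-fluent's implementation: `Step`, `Seq`, and `run` are hypothetical names, and state is assumed to be a plain dict that each step extends.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class Step:
    """Toy stand-in for an agent: a named function from state to updates."""
    name: str
    fn: Callable[[dict], dict]

    def __rshift__(self, other):
        # Step >> x starts a new sequence and delegates to Seq.__rshift__.
        return Seq((self,)) >> other

    def run(self, state: dict) -> dict:
        return {**state, **self.fn(state)}


@dataclass(frozen=True)
class Seq:
    """Ordered, immutable collection of steps (hypothetical)."""
    steps: Tuple[Step, ...]

    def __rshift__(self, other):
        extra = other.steps if isinstance(other, Seq) else (other,)
        return Seq(self.steps + extra)  # always a new object

    def run(self, state: dict) -> dict:
        for step in self.steps:
            state = step.run(state)
        return state


a = Step("a", lambda s: {"x": 1})
b = Step("b", lambda s: {"y": s["x"] + 1})
c = Step("c", lambda s: {"z": s["y"] * 10})

pipeline = a >> b >> c
result = pipeline.run({})
```

Because `>>` is left-associative, `a >> b >> c` builds `(a >> b) >> c`, and flattening in `Seq.__rshift__` keeps the result a single three-step sequence.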
>> fn

Function Step

Zero-cost transform (no LLM call)
agent ──► fn() ──► agent
def merge(state):
    return {"combined": ...}

pipeline = agent >> merge >> next_agent  # next_agent: avoids shadowing the built-in next()
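To see how a plain function can sit on the right-hand side of `>>`, here is a small self-contained sketch. It assumes a function step receives the state dict and returns a dict of updates that is merged back in; `Node`, `fake_agent`, and `summarize` are hypothetical stand-ins, and no LLM is involved.

```python
class Node:
    """Toy chain element that accepts other Nodes or bare callables."""

    def __init__(self, *fns):
        self.fns = fns

    def __rshift__(self, other):
        # A bare function is accepted as-is; a Node contributes its functions.
        more = other.fns if isinstance(other, Node) else (other,)
        return Node(*self.fns, *more)

    def run(self, state):
        for fn in self.fns:
            state = {**state, **fn(state)}  # merge each step's updates
        return state


def fake_agent(state):
    # Stands in for an LLM-backed agent that writes two state keys.
    return {"web": "w", "scholar": "s"}


def merge(state):
    return {"combined": state["web"] + "\n" + state["scholar"]}


def summarize(state):
    return {"summary": state["combined"].upper()}


pipeline = Node(fake_agent) >> merge >> Node(summarize)
result = pipeline.run({})
```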
|

Parallel (Fan-Out)

ParallelAgent
┬─ a ─┐
├─ b ─┼──► all
└─ c ─┘
fanout = a | b | c

# All run concurrently
# Each writes to its own
# state key via .save_as()
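The fan-out contract (run branches concurrently, each result under its own key) can be illustrated with the standard library. This is only a sketch under assumptions: `fan_out` and the branch lambdas are hypothetical, and threads stand in for whatever concurrency the underlying ParallelAgent uses.

```python
from concurrent.futures import ThreadPoolExecutor


def fan_out(state, branches):
    """Run every branch on the same snapshot; store each result under its key."""
    snapshot = dict(state)  # every branch sees identical input
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {key: pool.submit(fn, snapshot) for key, fn in branches.items()}
        results = {key: future.result() for key, future in futures.items()}
    return {**state, **results}


branches = {
    "web": lambda s: f"web notes on {s['topic']}",
    "scholar": lambda s: f"papers on {s['topic']}",
}

merged = fan_out({"topic": "llamas"}, branches)
```

Giving each branch a distinct key is what makes the concurrent writes safe to combine afterwards.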
* n

Loop (Fixed Count)

LoopAgent
┌──► a ──► b ──┐
└── n times ───┘
loop = (a >> b) * 3

# Body runs exactly 3 times
# (a >> b) is the loop body
* until

Loop (Conditional)

LoopAgent + checkpoint
body ──► pred? ─ yes ──► done (no ──► back to body)
loop = body * until(
    lambda s: s["score"] >= 0.8,
    max=5
)

# Loop until predicate or max
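Both loop forms can be modelled with a single `__mul__` that dispatches on its right operand. The sketch below is a toy, not the library's code: `Body`, `Loop`, and this `until` are hypothetical, the default cap of 10 is invented for illustration, and it assumes the predicate is checked after each pass of the body.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Until:
    pred: Callable[[dict], bool]
    max: int = 10  # hypothetical default cap


def until(pred, max=10):
    return Until(pred, max)


@dataclass(frozen=True)
class Loop:
    body: "Body"
    spec: Until

    def run(self, state):
        for _ in range(self.spec.max):
            state = {**state, **self.body.fn(state)}
            if self.spec.pred(state):  # assumed: checked after each pass
                break
        return state


@dataclass(frozen=True)
class Body:
    fn: Callable[[dict], dict]

    def __mul__(self, spec):
        if isinstance(spec, int):
            # Fixed count: a predicate that never fires, capped at n passes.
            return Loop(self, Until(lambda s: False, spec))
        return Loop(self, spec)


improve = Body(lambda s: {
    "score": s.get("score", 0.0) + 0.25,
    "rounds": s.get("rounds", 0) + 1,
})

fixed = (improve * 3).run({})
conditional = (improve * until(lambda s: s["score"] >= 0.8, max=5)).run({})
```

Here the conditional loop stops on the fourth pass, when the score first reaches 0.8 or more, without using the fifth allowed pass.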

Modifier Operators

shape
@

Typed Output (Schema)

output_schema on LlmAgent
agent ──► Schema{ title, body, ... }
class Report(BaseModel):
    title: str
    body: str

agent @ Report
# Output validated against schema
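What "validated against schema" means can be shown without any dependencies. The sketch below swaps the pydantic `BaseModel` for a stdlib dataclass so it runs anywhere; `parse_as` is a hypothetical helper, and with pydantic v2 the equivalent step would be `Report.model_validate_json(raw)`.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class Report:
    title: str
    body: str


def parse_as(schema, raw: str):
    """Parse a JSON string and check it against the dataclass fields."""
    data = json.loads(raw)
    for f in fields(schema):
        if f.name not in data:
            raise ValueError(f"missing field: {f.name}")
        if not isinstance(data[f.name], f.type):
            raise TypeError(f"{f.name} should be {f.type.__name__}")
    # Extra keys in the raw output are dropped in this sketch.
    return schema(**{f.name: data[f.name] for f in fields(schema)})


raw_output = '{"title": "Q3 summary", "body": "Revenue grew."}'
report = parse_as(Report, raw_output)
```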
//

Fallback Chain

First-success chain
fast (success?) ─ fail ──► smart (success?) ─ fail ──► fallback (always)
answer = fast // smart // fallback

# First success wins
# Cost optimization: cheap → expensive
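A first-success chain reduces to "try each candidate in order, return the first result". The sketch below assumes that failure means a raised exception, which the source does not specify; `first_success` and the three toy functions are hypothetical.

```python
def first_success(*attempts):
    """Return a runner that tries each attempt in order until one succeeds."""
    if not attempts:
        raise ValueError("need at least one attempt")

    def run(state):
        last_error = None
        for attempt in attempts:
            try:
                return attempt(state)
            except Exception as exc:  # assumed failure signal
                last_error = exc
        raise RuntimeError("every attempt failed") from last_error

    return run


calls = []


def fast(state):
    calls.append("fast")
    raise TimeoutError("fast model gave up")


def smart(state):
    calls.append("smart")
    return {"answer": "42"}


def fallback(state):
    calls.append("fallback")
    return {"answer": "sorry"}


answer = first_success(fast, smart, fallback)({})
```

Ordering candidates from cheap to expensive means the costlier ones only run when the cheaper ones fail; here `fallback` is never called.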

Routing Operators

branch
Route

Deterministic Routing

Zero LLM calls for routing
state[key] ┬─ "booking" ──► booker
           ├─ "info" ──► info
           └─ "other" ──► default
route = (
    Route("intent")
    .eq("booking", booker)
    .eq("info", info)
)

# Dict shorthand:
classifier >> {
    "booking": booker,
    "info": info,
}
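Deterministic routing is a lookup on one state key followed by a dispatch, with no model call in between. The sketch below models that with a toy class; `ToyRoute` is not the library's `Route`, and the `otherwise` method name is a hypothetical choice for the default branch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass(frozen=True)
class ToyRoute:
    key: str
    cases: Tuple[Tuple[Any, Callable], ...] = ()
    fallback: Optional[Callable] = None

    def eq(self, value, target):
        # Each call returns a new route with one more case.
        return ToyRoute(self.key, self.cases + ((value, target),), self.fallback)

    def otherwise(self, target):  # hypothetical name for the default branch
        return ToyRoute(self.key, self.cases, target)

    def run(self, state):
        for value, target in self.cases:
            if state.get(self.key) == value:
                return target(state)
        if self.fallback is None:
            raise KeyError(f"no route for {self.key}={state.get(self.key)!r}")
        return self.fallback(state)


route = (
    ToyRoute("intent")
    .eq("booking", lambda s: "booker handled it")
    .eq("info", lambda s: "info handled it")
    .otherwise(lambda s: "default handled it")
)

booked = route.run({"intent": "booking"})
unknown = route.run({"intent": "weather"})
```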

State Operators

state
S.*

State Transforms

FnAgent (zero-cost, no LLM)
state {a,b,c} ──► .pick() ──► .rename() ──► state' {x,y}
Zero-cost: no LLM call, no events.
pick · drop · rename · default · merge · transform · compute · guard · log
pipeline = (
    agent
    >> S.pick("a", "b")
    >> S.rename(a="input")
    >> S.default(score=0.0)
    >> next_agent
)

# Compose: S.pick() >> S.rename()
# Combine: S.default() + S.rename()
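The three transforms used above can be read as pure functions from one state dict to another. The sketch below spells out one plausible set of semantics, stated as assumptions: `pick` keeps only the named keys, `rename` maps old names to new ones, and `default` fills in missing keys. The helper names, including `chain`, are hypothetical and are not the `S` API.

```python
def pick(*keys):
    """Keep only the named keys (assumed semantics)."""
    return lambda s: {k: s[k] for k in keys if k in s}


def rename(**mapping):
    """Rename old keys to new names, e.g. rename(a="input")."""
    return lambda s: {mapping.get(k, k): v for k, v in s.items()}


def default(**values):
    """Fill in keys that are missing; existing values win."""
    return lambda s: {**values, **s}


def chain(*transforms):
    """Apply transforms left to right, like >> between S.* steps."""
    def run(state):
        for transform in transforms:
            state = transform(state)
        return state
    return run


state = {"a": 1, "b": 2, "c": 3}
shape = chain(pick("a", "b"), rename(a="input"), default(score=0.0))
shaped = shape(state)
```

Each transform returns a new dict, so the input state is left untouched.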

Composition Rules

algebra
Expression            Produces                  Type
Agent >> Agent        Pipeline                  SequentialAgent
Agent >> fn           Pipeline with FnAgent     SequentialAgent
Agent | Agent         FanOut                    ParallelAgent
expr * int            Loop (fixed)              LoopAgent
expr * until(fn)      Loop (conditional)        LoopAgent
Agent @ Schema        Typed agent               LlmAgent
Agent // Agent        Fallback chain            FallbackAgent
Agent >> {k: Agent}   Route                     SequentialAgent + Router
Agent >> S.pick()     Pipeline with transform   SequentialAgent + FnAgent
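Because these are ordinary Python operators, Python's precedence decides how an unparenthesized expression groups: `*`, `@`, and `//` bind tighter than `>>`, which binds tighter than `|`. The sketch below makes the grouping visible with a toy class that records its own expression tree; `E` is hypothetical and exists only for this demonstration.

```python
class E:
    """Toy expression that records how Python grouped the operators."""

    def __init__(self, text):
        self.text = text

    def _bin(self, op, other):
        rhs = other.text if isinstance(other, E) else repr(other)
        return E(f"({self.text} {op} {rhs})")

    def __rshift__(self, other):
        return self._bin(">>", other)

    def __or__(self, other):
        return self._bin("|", other)

    def __mul__(self, other):
        return self._bin("*", other)

    def __matmul__(self, other):
        return self._bin("@", other)

    def __floordiv__(self, other):
        return self._bin("//", other)


a, b, c = E("a"), E("b"), E("c")

unparenthesized = (a | b >> c).text   # >> groups before |
parenthesized = ((a | b) >> c).text   # explicit fan-out, then sequence
loop_tail = (a >> b * 3).text         # * groups before >>
typed_fallback = (a @ b // c).text    # same precedence, left to right
```

This is why fan-outs and loop bodies are parenthesized throughout this sheet, as in `(web | scholar) >> ...` and `(a >> b) * 3`.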

Parallel Research + Merge

pipeline = (
    (web | scholar)
    >> S.merge("web", "scholar",
               into="research")
    >> writer
)
┬─ web ────┐
└─ scholar ┘
     │
  S.merge ──► writer

Classify + Route + Fallback

pipeline = (
    classifier
    >> Route("intent")
       .eq("book", fast // smart)
       .eq("info", helper)
)
classifier
    │
  Route ┬─ "book" ──► fast // smart
        └─ "info" ──► helper

Review Loop with Schema

(writer @ Report
  >> critic.save_as("score")
) * until(lambda s: s["score"] >= 0.8)
┌──► writer @ Report ──► critic ──┐
└──────── until(score >= 0.8) ────┘

Full Pipeline

pipeline = (
    (web | scholar)
    >> S.merge("web", "scholar", into="research")
    >> writer @ Report // writer_b
    >> (critic >> reviser)
       * until(lambda s: s["confidence"] >= 0.85)
)
┬─ web ────┐
└─ scholar ┘ ──► S.merge
                   │
      writer @ Report // writer_b
                   │
┌──► critic ──► reviser ──┐
└── until(conf >= 0.85) ──┘
Immutability Guarantee
All operators return new expression objects. Sub-expressions can be safely reused across different pipelines.
review = a >> b
pipeline_1 = review >> c   # Independent
pipeline_2 = review >> d   # Independent
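The guarantee can be checked on a toy model: if `>>` always builds a new object and never mutates its operands, a shared prefix stays intact however many pipelines extend it. `Expr` below is a hypothetical frozen dataclass used only to illustrate the property, not the library's expression type.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Expr:
    steps: tuple = ()

    def __rshift__(self, other):
        # Build a fresh Expr; neither operand is modified.
        return Expr(self.steps + other.steps)


a, b, c, d = (Expr((name,)) for name in "abcd")

review = a >> b
pipeline_1 = review >> c
pipeline_2 = review >> d

try:
    review.steps = ()  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```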