# Sequential composition: `>>` chains a, b and c into a single pipeline.
# Equivalent builder style:
#   Pipeline("name").step(a).step(b).step(c).build()
pipeline = a >> b >> c
def merge(state):
    """Example plain-function step placed between two agents in a pipeline.

    Receives ``state`` and returns a dict holding a single ``"combined"`` key.
    The value is elided with ``...`` in this example; a real step would
    compute it from ``state``.
    """
    return {"combined": ...}


# A plain function can sit directly in a `>>` chain alongside agents.
# NOTE(review): `next` is a placeholder for the following step; unless it is
# defined elsewhere it resolves to the builtin `next` — confirm or rename.
pipeline = agent >> merge >> next
# Parallel fan-out: all branches run concurrently.
# Each branch writes to its own state key via .save_as().
fanout = a | b | c
# Fixed-count loop: (a >> b) is the loop body and runs exactly 3 times.
loop = (a >> b) * 3
# Conditional loop: repeat `body` until the predicate holds or max is reached.
# The predicate is called with `s` and indexes it by key ("score").
loop = body * until( lambda s: s["score"] >= 0.8, max=5 )
class Report(BaseModel):
    """Output schema used to type an agent's result via ``agent @ Report``.

    NOTE(review): ``BaseModel`` is presumably pydantic's — confirm against the
    file's imports.
    """

    # Report heading.
    title: str
    # Report text.
    body: str


# `@` attaches the schema to the agent: output validated against schema.
agent @ Report
# Fallback chain: alternatives are tried left to right; first success wins.
# Cost optimization: order them cheap → expensive.
answer = fast // smart // fallback
# Explicit routing: one .eq() branch per value ("booking" → booker, "info" → info).
# NOTE(review): presumably the branch is chosen by the "intent" state value — confirm.
Route("intent") .eq("booking", booker) .eq("info", info)
# Dict shorthand:
#   classifier >> {"booking": booker, "info": info}
# Inline state transforms: S.* steps sit between agents in a `>>` chain.
# NOTE(review): the per-step effects noted below are read off the call names —
# confirm against the S API.
pipeline = (
agent
>> S.pick("a", "b")  # presumably keeps only keys "a" and "b"
>> S.rename(a="input")  # presumably renames key "a" to "input"
>> S.default(score=0.0)  # presumably sets "score" to 0.0 when it is absent
>> next_agent
)
# Compose transforms with `>>`: S.pick() >> S.rename()
# Combine transforms with `+`: S.default() + S.rename()
| Expression | Produces | Type |
|---|---|---|
| Agent >> Agent | Pipeline | SequentialAgent |
| Agent >> fn | Pipeline with FnAgent | SequentialAgent |
| Agent \| Agent | FanOut | ParallelAgent |
| expr * int | Loop (fixed) | LoopAgent |
| expr * until(fn) | Loop (conditional) | LoopAgent |
| Agent @ Schema | Typed agent | LlmAgent |
| Agent // Agent | Fallback chain | FallbackAgent |
| Agent >> {k: Agent} | Route | SequentialAgent + Router |
| Agent >> S.pick() | Pipeline with transform | SequentialAgent + FnAgent |
# Fan out to web and scholar, merge the "web" and "scholar" keys into
# "research", then pass the result to writer.
(web | scholar) >> S.merge("web", "scholar", into="research") >> writer
# Classify, then route: "book" goes to a fast // smart fallback chain,
# "info" goes to helper.
classifier >> Route("intent") .eq("book", fast // smart) .eq("info", helper)
# Loop a schema-typed writer followed by a critic (saved as "score") until the
# predicate s["score"] >= 0.8 holds.
(writer @ Report >> critic.save_as("score") ) * until(lambda s: s["score"] >= 0.8)
# Full composition: fan-out -> merge -> typed writer with fallback -> review loop.
# Precedence: `@`, `//` and `*` bind tighter than `>>`, which binds tighter
# than `|`; hence the parentheses around the fan-out and the loop body.
(
    (web | scholar)
    # Name the source keys explicitly, as in the simpler merge example.
    >> S.merge("web", "scholar", into="research")
    >> writer @ Report // writer_b
    # until() takes a predicate over state, not an eagerly evaluated comparison.
    # NOTE(review): assumes a "confidence" key is present in state — confirm
    # which step writes it.
    >> (critic >> reviser) * until(lambda s: s["confidence"] >= 0.85)
)
# Reuse: the same `review` sub-pipeline is extended twice, and the two
# results are independent of each other.
# NOTE(review): presumably `>>` returns a new object rather than mutating
# `review` — confirm.
review = a >> b
pipeline_1 = review >> c # Independent
pipeline_2 = review >> d # Independent