Code Review Pipeline – Expression Algebra in Practice

Demonstrates how composition operators (>>, |, @, //) combine naturally in a real-world code review system. A diff parser extracts changes, parallel reviewers check style, security, and logic independently, then findings are aggregated into a structured verdict.

Real-world use case: Automated code review pipeline that runs style, security, and logic reviewers in parallel, then merges findings. Used by engineering teams as a pre-merge quality gate.

In other frameworks: LangGraph models this as a fan-out subgraph with merge node (~45 lines). adk-fluent composes parallel reviewers with | and sequences with >> in a single expression.

Pipeline topology: diff_parser >> ( style_checker | security_scanner | logic_reviewer ) >> ( finding_aggregator @ ReviewVerdict // backup_aggregator @ ReviewVerdict )

Tip

What you’ll learn How to compose agents into a sequential pipeline.

Source: 34_full_algebra.py

from pydantic import BaseModel

from adk_fluent import Agent


class ReviewVerdict(BaseModel):
    """Structured output from the code review pipeline."""

    has_issues: bool
    critical_count: int
    summary: str


# The code review pipeline uses 4 operators:
#   >>  sequential flow (parse -> review -> aggregate)
#   |   parallel fan-out (style + security + logic run concurrently)
#   @   typed output (aggregator returns ReviewVerdict)
#   //  fallback (primary model -> backup model)

review_pipeline = (
    # Step 1: Parse the diff into reviewable chunks
    Agent("diff_parser")
    .model("gemini-2.5-flash")
    .instruct("Parse the git diff into individual file changes with context.")
    .writes("parsed_diff")
    # Step 2: Three reviewers run in parallel
    >> (
        Agent("style_checker")
        .model("gemini-2.5-flash")
        .instruct("Check code style: naming conventions, formatting, docstrings.")
        | Agent("security_scanner")
        .model("gemini-2.5-flash")
        .instruct("Scan for security issues: injection, auth bypass, secrets in code.")
        | Agent("logic_reviewer")
        .model("gemini-2.5-flash")
        .instruct("Review business logic: edge cases, error handling, race conditions.")
    )
    # Step 3: Aggregate findings with typed output and model fallback
    >> (
        Agent("finding_aggregator")
        .model("gemini-2.5-flash")
        .instruct("Aggregate all review findings into a final verdict.")
        @ ReviewVerdict
        // Agent("backup_aggregator")
        .model("gemini-2.5-pro")
        .instruct("Aggregate all review findings into a final verdict.")
        @ ReviewVerdict
    )
)
# A native ADK code review pipeline requires:
#   - 5 LlmAgent declarations
#   - 1 ParallelAgent for fan-out
#   - 1 SequentialAgent for the overall pipeline
#   - Manual output_schema wiring on the aggregator
# That's ~60 lines of boilerplate. The fluent algebra below
# expresses the same architecture in a single readable expression.
        graph TD
    n1[["diff_parser_then_style_checker_and_security_scanner_and_logic_reviewer_then_finding_aggregator_or_backup_aggregator (sequence)"]]
    n2["diff_parser"]
    n3{"style_checker_and_security_scanner_and_logic_reviewer (parallel)"}
    n4["style_checker"]
    n5["security_scanner"]
    n6["logic_reviewer"]
    n7[/"finding_aggregator_or_backup_aggregator (fallback)"\]
    n8["finding_aggregator"]
    n9["backup_aggregator"]
    n3 --> n4
    n3 --> n5
    n3 --> n6
    n7 --> n8
    n7 --> n9
    n2 --> n3
    n3 --> n7
    

Equivalence

from adk_fluent import Pipeline
from adk_fluent._primitive_builders import _FallbackBuilder

# Pipeline builds correctly
assert isinstance(review_pipeline, Pipeline)
built = review_pipeline.build()

# Has 3 top-level steps: diff_parser, fanout, fallback_aggregator
assert len(built.sub_agents) == 3

# First step is the diff parser
assert built.sub_agents[0].name == "diff_parser"
assert built.sub_agents[0].output_key == "parsed_diff"

# Second step is a parallel fan-out with 3 reviewers
fanout = built.sub_agents[1]
assert len(fanout.sub_agents) == 3

# Third step is the fallback aggregator
fallback = built.sub_agents[2]
assert len(fallback.sub_agents) == 2  # primary + backup