Primitives Showcase: E-Commerce Order Pipeline Using All Primitives

Tip

What you’ll learn: how to compose agents into a sequential pipeline.

Source: 43_primitives_showcase.py

from adk_fluent import Agent, Pipeline, S, C, tap, expect, gate
from adk_fluent._routing import Route
from adk_fluent.workflow import Loop

# Model identifier passed as the second argument to every Agent(...) call below.
MODEL = "gemini-2.5-flash"

# --- 1. tap: observe state for monitoring without mutating ---

# Sink for the tap callbacks below: each callback appends the value it reads
# from the "order_type" state key.
order_events = []
# Parsing agent builder. NOTE(review): .writes("order_type") presumably names the
# state key the agent's output is stored under -- confirm against adk_fluent docs.
order_parser = Agent("order_parser", MODEL).instruct("Parse the incoming order JSON.").writes("order_type")
# Composing an agent with a tap() step via >> produces a Pipeline (asserted below).
pipeline_with_tap = order_parser >> tap(lambda s: order_events.append(s.get("order_type")))
assert isinstance(pipeline_with_tap, Pipeline)

# tap via method syntax: .tap(fn) on the agent builder also yields a Pipeline
pipeline_method = order_parser.tap(lambda s: order_events.append(s.get("order_type")))
assert isinstance(pipeline_method, Pipeline)

# --- 2. expect: assert data contracts between processing stages ---

inventory_checker = Agent("inventory_checker", MODEL).instruct("Check stock availability.").writes("in_stock")
payment_processor = Agent("payment_processor", MODEL).instruct("Process payment.")

# The expect() step sits between the two agents. Its predicate checks that the
# "in_stock" state key equals "yes".
# NOTE(review): presumably the message is reported when the predicate is falsy --
# confirm against the adk_fluent documentation.
pipeline_with_expect = (
    inventory_checker
    >> expect(lambda s: s.get("in_stock") == "yes", "Item must be in stock before processing payment")
    >> payment_processor
)
assert isinstance(pipeline_with_expect, Pipeline)
# The expect() step counts as its own sub-agent in the built pipeline.
built = pipeline_with_expect.build()
assert len(built.sub_agents) == 3  # inventory_checker, expect, payment_processor

# --- 3. gate: human-in-the-loop review for orders flagged as high fraud risk ---

# The gate predicate is true when the "fraud_score" state key equals "high".
# NOTE(review): presumably the gate holds the pipeline for manual approval when
# the predicate is true, tracking that under gate_key -- confirm against the
# adk_fluent documentation.
fraud_gate = gate(
    lambda s: s.get("fraud_score") == "high",
    message="Potential fraud detected. Manual review required.",
    gate_key="fraud_review_gate",
)
fulfillment_agent = Agent("fulfillment", MODEL).instruct("Ship the order to the customer.")

# fraud_detector writes "fraud_score", which is the key the gate predicate reads.
pipeline_with_gate = (
    Agent("fraud_detector", MODEL).instruct("Score fraud risk.").writes("fraud_score")
    >> fraud_gate
    >> fulfillment_agent
)
assert isinstance(pipeline_with_gate, Pipeline)
built_gate = pipeline_with_gate.build()
# fraud_detector, gate, fulfillment
assert len(built_gate.sub_agents) == 3

# --- 4. Route: deterministic order routing by type ---

standard_handler = Agent("standard_handler", MODEL).instruct("Process standard delivery order.")
express_handler = Agent("express_handler", MODEL).instruct("Process express delivery order.")
pickup_handler = Agent("pickup_handler", MODEL).instruct("Process store pickup order.")

# Route keyed on "order_type": two .eq() branches plus an .otherwise() fallback.
order_route = (
    Route("order_type").eq("standard", standard_handler).eq("express", express_handler).otherwise(pickup_handler)
)

# The whole Route counts as a single step after order_parser.
routed_pipeline = order_parser >> order_route
assert isinstance(routed_pipeline, Pipeline)
built_route = routed_pipeline.build()
assert len(built_route.sub_agents) == 2  # order_parser + route_agent

# Route with .when() for complex multi-condition logic
# Route() is created without a key here; each .when() supplies its own predicate
# over state. NOTE(review): presumably rules are tried in the order they are
# added -- confirm against the adk_fluent documentation.
priority_route = (
    Route()
    .when(lambda s: s.get("total", 0) > 500 and s.get("membership") == "premium", express_handler)
    .when(lambda s: s.get("total", 0) > 200, standard_handler)
    .otherwise(pickup_handler)
)
# These checks read private attributes (_rules, _default) of Route: the two
# .when() calls are recorded as rules and .otherwise() sets the default handler.
assert len(priority_route._rules) == 2
assert priority_route._default is pickup_handler

# --- 5. S.*: state transforms for order data management ---
# Every S.* factory below is expected to return a callable step; the steps are
# created in order and then verified together at the end of the section.

# S.pick -- extract only the fields needed for the shipping label
pick_step = S.pick("customer_name", "shipping_address")

# S.drop -- remove internal processing fields before customer notification
drop_step = S.drop("_internal_score", "_processing_log")

# S.rename -- normalize field names between systems
rename_step = S.rename(customer_email="email_address")

# S.default -- set fallback values for optional order fields
default_step = S.default(currency="USD", shipping_method="standard")

# S.merge -- combine the "subtotal" and "tax" keys into "order_financials"
merge_step = S.merge("subtotal", "tax", into="order_financials")

# S.transform -- normalize product SKUs to uppercase
transform_step = S.transform("sku", str.upper)

# S.guard -- assert critical invariants before charging the card
guard_step = S.guard(lambda s: "payment_method" in s, "Payment method required before checkout")

# S.log -- debug-print key order fields during development
log_step = S.log("order_id", "total")

# S.compute -- derive shipping cost from order weight and per-kg rate
compute_step = S.compute(
    shipping_cost=lambda s: s.get("weight_kg", 0) * s.get("rate_per_kg", 5.0),
)

# Single combined check replacing one assert per step.
assert all(
    map(
        callable,
        (
            pick_step,
            drop_step,
            rename_step,
            default_step,
            merge_step,
            transform_step,
            guard_step,
            log_step,
            compute_step,
        ),
    )
)

# --- 6. Full e-commerce pipeline: all primitives in one expression ---

# Method calls (.eq/.otherwise/.instruct/.writes) bind tighter than >>, so each
# fluent chain below is a single operand of the >> composition.
ecommerce_pipeline = (
    # Set default values for the order
    S.default(currency="USD", shipping_method="standard")
    # Classify the incoming order; the result is written to "order_type"
    >> Agent("order_classifier", MODEL).instruct("Classify the order type.").writes("order_type")
    # Route on "order_type" to the appropriate fulfillment handler; every branch
    # writes "delivery_status"
    >> Route("order_type")
    .eq("digital", Agent("digital_delivery", MODEL).instruct("Deliver digital product.").writes("delivery_status"))
    .eq("physical", Agent("warehouse_pick", MODEL).instruct("Pick and pack from warehouse.").writes("delivery_status"))
    .otherwise(Agent("custom_handler", MODEL).instruct("Handle custom order.").writes("delivery_status"))
    # Observe the routing result for analytics
    >> tap(lambda s: None)  # no-op observation point
    # Assert contract: routing must produce a delivery status
    >> expect(lambda s: "delivery_status" in s, "Fulfillment must set delivery_status")
    # Compute estimated delivery time in days: 1 for digital orders, 5 otherwise
    >> S.compute(eta_days=lambda s: 1 if s.get("order_type") == "digital" else 5)
    # Send confirmation to customer
    >> Agent("notification_sender", MODEL).instruct("Send order confirmation email.").writes("confirmation_id")
)

assert isinstance(ecommerce_pipeline, Pipeline)
built_full = ecommerce_pipeline.build()
# S.default, order_classifier, route_agent, tap, expect, compute, notification_sender = 7 steps
assert len(built_full.sub_agents) == 7

# --- 7. sub_agent() -- order coordinator with specialized workers ---

# Assemble the coordinator one fluent call at a time, rebinding the returned
# builder after each call; the calls run in the same order as a single chain.
order_coordinator = Agent("order_coordinator", MODEL)
order_coordinator = order_coordinator.instruct("Coordinate order processing across fulfillment centers.")
order_coordinator = order_coordinator.sub_agent(
    Agent("east_warehouse", MODEL).instruct("Handle East Coast fulfillment.")
)
order_coordinator = order_coordinator.sub_agent(
    Agent("west_warehouse", MODEL).instruct("Handle West Coast fulfillment.")
)

# Both warehouse workers must appear as sub-agents of the built coordinator.
built_coord = order_coordinator.build()
assert len(built_coord.sub_agents) == 2

# --- 8. context() -- stateless payment processor ---

# Configure the payment gateway step by step, rebinding the returned builder
# after each fluent call.
stateless_processor = Agent("payment_gateway", MODEL)
stateless_processor = stateless_processor.context(C.none())
stateless_processor = stateless_processor.instruct("Process payment using only state data.")

# After .context(...), the builder's private config must hold a context spec.
assert stateless_processor._config["_context_spec"] is not None
graph TD
    n1[["order_parser_then_tap_5 (sequence)"]]
    n2["order_parser"]
    n3>"tap_5 tap"]
    n2 --> n3