ETL Pipeline: Plain Functions as Data Cleaning Steps (>> fn)

Tip

What you’ll learn: How to compose agents and plain Python functions into a sequential pipeline.

Source: 29_function_steps.py

from adk_fluent import Agent, Pipeline


# A plain function step: it receives the state dict and returns a dict of
# state updates, so no BaseAgent subclass is needed for simple transforms.
def normalize_currency(state):
    """Drop "$" and "," from ``raw_amounts`` and trim surrounding whitespace.

    Returns ``{"normalized_amounts": <cleaned string>}``; a missing
    ``raw_amounts`` key yields an empty string.
    """
    text = state.get("raw_amounts", "")
    for token in ("$", ","):
        text = text.replace(token, "")
    return {"normalized_amounts": text.strip()}


# >> fn: a plain function becomes a workflow node that makes no LLM call.
# Resulting chain: extractor agent -> normalize_currency -> loader agent.
etl_pipeline = (
    (
        Agent("financial_extractor")
        .model("gemini-2.5-flash")
        .instruct("Extract financial line items from the uploaded invoice PDF.")
    )
    >> normalize_currency
    >> (
        Agent("report_loader")
        .model("gemini-2.5-flash")
        .instruct("Format the normalized data into a financial summary report.")
    )
)


# A named function's __name__ is reused as the generated step's agent name.
def truncate_to_limit(state):
    """Cap the ``text`` state value at 500 characters for the executive summary.

    Returns ``{"exec_summary": <clipped string>}``; a missing ``text`` key
    yields an empty string.
    """
    max_chars = 500
    source = state.get("text", "")
    return {"exec_summary": source[:max_chars]}


# LLM summarizer followed by a deterministic truncation step.
trimmed = (
    Agent("summarizer").model("gemini-2.5-flash")
    >> truncate_to_limit
)

# A lambda has no usable name of its own ("<lambda>"), so it is given an
# auto-generated one (fn_step_N) — convenient for quick one-off transforms.
pipeline_with_lambda = (
    Agent("ingester")
    .model("gemini-2.5-flash")
    >> (lambda state: {"upper_title": state.get("title", "").upper()})
    >> Agent("publisher")
    .model("gemini-2.5-flash")
)


# fn >> agent also works (via __rrshift__) — preprocessing before LLM
def sanitize_pii(s):
    """Redact common PII patterns from ``raw_input`` before LLM processing.

    Best-effort only: e-mail addresses, US social security numbers
    (``123-45-6789``) and North-American-style phone numbers are replaced
    with the placeholders ``[EMAIL]``, ``[SSN]`` and ``[PHONE]``, then the
    result is stripped of surrounding whitespace. This is not an exhaustive
    PII scrubber; extend the patterns for your data.

    Returns ``{"cleaned_text": <redacted string>}``; a missing or ``None``
    ``raw_input`` yields an empty string.
    """
    import re  # local import keeps this example step self-contained

    text = s.get("raw_input") or ""
    text = re.sub(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+", "[EMAIL]", text)
    # The specific SSN shape is handled before the broader phone pattern.
    text = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", text)
    text = re.sub(
        r"(?<!\w)(?:\+?\d{1,2}[\s.-]?)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b",
        "[PHONE]",
        text,
    )
    return {"cleaned_text": text.strip()}


# The plain function is the left operand, so the agent's __rrshift__ builds
# the Pipeline: sanitize_pii runs first, then the analyzer agent.
preprocess_pipeline = (
    sanitize_pii
    >> Agent("analyzer").model("gemini-2.5-flash")
)
# Native ADK requires subclassing BaseAgent for any custom logic node.
# In an ETL pipeline, every data cleaning step becomes a full class:
from google.adk.agents.base_agent import BaseAgent as NativeBaseAgent
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents.sequential_agent import SequentialAgent


class NormalizeCurrency(NativeBaseAgent):
    """Custom agent that strips "$" and "," from the ``raw_amounts`` state value.

    Reads ``raw_amounts`` from session state (default ``""``) and writes the
    cleaned, whitespace-trimmed string to ``normalized_amounts``. It emits no
    events of its own.
    """

    async def _run_async_impl(self, ctx):
        raw = ctx.session.state.get("raw_amounts", "")
        # Remove the dollar sign and thousands separators, then trim.
        cleaned = raw.replace("$", "").replace(",", "").strip()
        # NOTE(review): this writes session state directly instead of yielding
        # an Event carrying a state_delta; confirm that is acceptable for the
        # session service in use.
        ctx.session.state["normalized_amounts"] = cleaned
        # BaseAgent consumes _run_async_impl as an async generator of events.
        # Without any `yield` this method would be a plain coroutine and could
        # not be iterated; the unreachable `yield` makes it an async generator
        # that produces no events.
        return
        yield


extractor = LlmAgent(
    name="extractor",
    model="gemini-2.5-flash",
    instruction="Extract financial data.",
)
normalizer = NormalizeCurrency(name="normalize_currency")
loader = LlmAgent(
    name="loader",
    model="gemini-2.5-flash",
    instruction="Load into report.",
)

# Sub-agents are listed in execution order: extract, normalize, load.
pipeline_native = SequentialAgent(
    name="etl_pipeline",
    sub_agents=[extractor, normalizer, loader],
)

Equivalence

# Chaining agents with a plain function via >> yields a Pipeline whose built
# form has three children: extractor agent, function step, loader agent.
assert isinstance(etl_pipeline, Pipeline)
built = etl_pipeline.build()
assert len(built.sub_agents) == 3

# A def-based step is registered under the function's own __name__.
built_trimmed = trimmed.build()
assert built_trimmed.sub_agents[1].name == truncate_to_limit.__name__

# A lambda step gets a generated name that is a valid Python identifier,
# unlike the raw "<lambda>".
built_lambda = pipeline_with_lambda.build()
name = built_lambda.sub_agents[1].name
assert name.isidentifier()

# Placing the function on the left of >> also produces a Pipeline,
# here with two children: the function step and the analyzer agent.
assert isinstance(preprocess_pipeline, Pipeline)
built_rev = preprocess_pipeline.build()
assert len(built_rev.sub_agents) == 2