Research Data Pipeline: State Transforms with S Factories

Pipeline topology: data_extractor >> S.pick("clinical_findings", "lab_results") >> S.rename(clinical_findings="analysis_input") >> S.default(confidence_interval=0.95) >> statistical_analyzer

Research pipeline:
    ( literature_agent | trial_agent )
        >> S.merge(into="combined_evidence")
        >> S.default(...)
        >> report_writer
        >> S.compute(word_count=...)

Tip

What you'll learn: how to compose agents and state transforms into a sequential pipeline.

Source: 33_state_transforms.py

from adk_fluent import Agent, S, Pipeline
from adk_fluent._transforms import StateDelta, StateReplacement

# S.pick — keep only the research-relevant fields, drop the noise.
# Produces a StateReplacement: the session-scoped state is replaced wholesale.
field_selector = S.pick("clinical_findings", "lab_results")
picked_state = field_selector(
    {"clinical_findings": "positive", "lab_results": [1.2, 3.4], "patient_notes": "misc"}
)
assert isinstance(picked_state, StateReplacement)
assert picked_state.new_state == {"clinical_findings": "positive", "lab_results": [1.2, 3.4]}

# S.drop — strip internal/debug fields before results are published.
field_dropper = S.drop("_raw_sensor_data", "_debug_log")
dropped_state = field_dropper({"conclusion": "significant", "_raw_sensor_data": "..."})
assert isinstance(dropped_state, StateReplacement)
assert dropped_state.new_state == {"conclusion": "significant"}

# S.rename — normalize field names for the downstream report template.
field_renamer = S.rename(clinical_findings="analysis", raw_score="p_value")
renamed_state = field_renamer({"clinical_findings": "data", "sample_size": 100})
assert isinstance(renamed_state, StateReplacement)
assert renamed_state.new_state == {"analysis": "data", "sample_size": 100}

# S.default — backfill missing analysis parameters with safe defaults.
# Produces a StateDelta: an additive merge, so only absent keys are added.
fill_defaults = S.default(confidence_interval=0.95, significance_level=0.05)
delta = fill_defaults({"confidence_interval": 0.99})
assert isinstance(delta, StateDelta)
assert delta.updates == {"significance_level": 0.05}  # existing key not overwritten

# S.merge — combine results from parallel research streams into one field.
combine_evidence = S.merge("literature_review", "experimental_data", into="combined_research")
delta = combine_evidence(
    {"literature_review": "Prior work shows...", "experimental_data": "Trial results..."}
)
assert isinstance(delta, StateDelta)
assert delta.updates == {"combined_research": "Prior work shows...\nTrial results..."}

# S.merge with a custom fn — aggregate numeric scores instead of joining text.
combine_mean = S.merge(
    "trial_a_score", "trial_b_score", into="mean_score", fn=lambda x, y: (x + y) / 2
)
delta = combine_mean({"trial_a_score": 0.82, "trial_b_score": 0.88})
assert isinstance(delta, StateDelta)
assert delta.updates == {"mean_score": 0.85}

# S.transform — run a function over a single field's value.
upper_abstract = S.transform("abstract", str.upper)
delta = upper_abstract({"abstract": "results indicate"})
assert isinstance(delta, StateDelta)
assert delta.updates == {"abstract": "RESULTS INDICATE"}

# S.compute — derive new fields from the full state (e.g. manuscript metrics).
manuscript_metrics = S.compute(
    word_count=lambda state: len(state.get("manuscript", "").split()),
    preview=lambda state: state.get("manuscript", "")[:80],
)
delta = manuscript_metrics({"manuscript": "This study demonstrates significant results"})
assert isinstance(delta, StateDelta)
assert delta.updates == {"word_count": 5, "preview": "This study demonstrates significant results"}

# S.guard — assert a state invariant before the pipeline proceeds.
id_guard = S.guard(lambda state: "patient_id" in state, msg="Missing required patient_id")

# Compose with >>: agents and S transforms chain left to right into a pipeline.
extractor = (
    Agent("data_extractor")
    .model("gemini-2.5-flash")
    .instruct("Extract structured clinical data from the patient records.")
)
analyzer = (
    Agent("statistical_analyzer")
    .model("gemini-2.5-flash")
    .instruct("Perform statistical analysis on the clinical data.")
)
pipeline = (
    extractor
    >> S.pick("clinical_findings", "lab_results")
    >> S.rename(clinical_findings="analysis_input")
    >> S.default(confidence_interval=0.95)
    >> analyzer
)

# Full research pipeline: parallel evidence collection (|) feeding S transforms.
literature = (
    Agent("literature_agent")
    .model("gemini-2.5-flash")
    .instruct("Search medical literature databases for relevant studies.")
)
trials = (
    Agent("trial_agent")
    .model("gemini-2.5-flash")
    .instruct("Query clinical trial registries for comparable trials.")
)
writer = (
    Agent("report_writer")
    .model("gemini-2.5-flash")
    .instruct("Write a systematic review report from the combined evidence.")
)
research_pipeline = (
    (literature | trials)
    >> S.merge("literature_agent", "trial_agent", into="combined_evidence")
    >> S.default(confidence_interval=0.95, sample_size=0)
    >> writer
    >> S.compute(word_count=lambda s: len(s.get("report", "").split()))
)

# --- NEW EXPANSION METHODS ---

# S.accumulate — append each new value onto a growing list across iterations.
# Longitudinal study: collect repeated patient measurements over time.
bp_collector = S.accumulate("blood_pressure", into="bp_readings")
delta = bp_collector({"blood_pressure": 120, "bp_readings": [115, 118]})
assert isinstance(delta, StateDelta)
assert delta.updates == {"bp_readings": [115, 118, 120]}

# Without `into=`, values accumulate under the derived key "{key}_all".
glucose_collector = S.accumulate("glucose_level")
delta = glucose_collector({"glucose_level": 5.4})
assert isinstance(delta, StateDelta)
assert delta.updates == {"glucose_level_all": [5.4]}

# S.counter — bump a numeric field; tracks iteration counts or event frequency.
# Example: count how many times a screening test was administered.
screening_counter = S.counter("test_count")
delta = screening_counter({"test_count": 3})
assert isinstance(delta, StateDelta)
assert delta.updates == {"test_count": 4}

# A negative step counts down — e.g. doses remaining in a treatment course.
dose_tracker = S.counter("doses_remaining", step=-1)
delta = dose_tracker({"doses_remaining": 10})
assert isinstance(delta, StateDelta)
assert delta.updates == {"doses_remaining": 9}

# S.history — keep a rolling window of recent values under "{key}_history".
# Example: track recent temperature readings for trend detection.
temp_window = S.history("temperature", max_size=3)
delta = temp_window({"temperature": 37.2, "temperature_history": [36.8, 37.0]})
assert isinstance(delta, StateDelta)
assert delta.updates == {"temperature_history": [36.8, 37.0, 37.2]}

# Once the window is full, the oldest reading falls off the front.
delta_full = temp_window({"temperature": 37.5, "temperature_history": [36.8, 37.0, 37.2]})
assert isinstance(delta_full, StateDelta)
assert delta_full.updates == {"temperature_history": [37.0, 37.2, 37.5]}  # 36.8 dropped

# S.validate — check that state conforms to a schema (Pydantic or dataclass).
# Here a plain dataclass describes the expected research-record shape.
from dataclasses import dataclass


@dataclass
class ClinicalData:
    patient_id: str
    age: int
    treatment: str


schema_check = S.validate(ClinicalData)
conforming = {"patient_id": "P001", "age": 45, "treatment": "A"}
delta = schema_check(conforming)
assert isinstance(delta, StateDelta)
assert delta.updates == {}  # validation passed, no changes

# S.require — fail fast unless the critical keys are present in state.
consent_check = S.require("patient_id", "consent_signed")
delta = consent_check({"patient_id": "P001", "consent_signed": True, "notes": "optional"})
assert isinstance(delta, StateDelta)
assert delta.updates == {}  # all required keys present

# S.flatten — expand one nested field into flat dotted keys for tabular use.
lab_flattener = S.flatten("lab_results", separator=".")
hierarchical = {"lab_results": {"blood": {"hemoglobin": 14.2, "wbc": 7.5}, "urine": {"ph": 6.0}}}
delta = lab_flattener(hierarchical)
assert isinstance(delta, StateDelta)
assert delta.updates == {"blood.hemoglobin": 14.2, "blood.wbc": 7.5, "urine.ph": 6.0}

# S.unflatten — rebuild a nested structure from dotted keys (e.g. for JSON export).
# Returns a StateReplacement: the whole state is rewritten as a tree.
nest_builder = S.unflatten(separator=".")
replacement = nest_builder(
    {"patient.demographics.age": 45, "patient.demographics.gender": "F", "visit_id": "V123"}
)
assert isinstance(replacement, StateReplacement)
assert replacement.new_state == {
    "patient": {"demographics": {"age": 45, "gender": "F"}},
    "visit_id": "V123",
}

# S.zip — pair up parallel arrays, e.g. timestamps with sensor readings.
series_builder = S.zip("timestamps", "heart_rate", into="hr_timeseries")
delta = series_builder({"timestamps": [0, 60, 120], "heart_rate": [72, 75, 71]})
assert isinstance(delta, StateDelta)
assert delta.updates == {"hr_timeseries": [(0, 72), (60, 75), (120, 71)]}

# S.group_by — bucket a list of records by a key function (cohort stratification).
stratifier = S.group_by("patients", key_fn=lambda rec: rec["age_bracket"], into="cohorts")
cohort_members = [
    {"id": "P1", "age_bracket": "18-30"},
    {"id": "P2", "age_bracket": "31-50"},
    {"id": "P3", "age_bracket": "18-30"},
]
delta = stratifier({"patients": cohort_members})
assert isinstance(delta, StateDelta)
assert delta.updates == {
    "cohorts": {
        "18-30": [{"id": "P1", "age_bracket": "18-30"}, {"id": "P3", "age_bracket": "18-30"}],
        "31-50": [{"id": "P2", "age_bracket": "31-50"}],
    }
}
# Native ADK requires custom BaseAgent subclasses for any state transform.
# In a clinical research pipeline, each data cleaning step becomes a class:
from google.adk.agents.base_agent import BaseAgent as NativeBaseAgent


class SelectResearchFields(NativeBaseAgent):
    """Native-ADK counterpart of S.pick: keep only the two research fields."""

    async def _run_async_impl(self, ctx):
        # Delete every key that is not in the keep-list; snapshot keys first
        # so the dict is not mutated while being iterated.
        keep = ("clinical_findings", "lab_results")
        for key in list(ctx.session.state.keys()):
            if key not in keep:
                del ctx.session.state[key]


class RenameForReport(NativeBaseAgent):
    """Native-ADK counterpart of S.rename: move a value to its report-facing key."""

    async def _run_async_impl(self, ctx):
        state = ctx.session.state
        # pop-and-reassign renames in place; no-op when the source key is absent.
        if "clinical_findings" in state:
            state["analysis"] = state.pop("clinical_findings")


# Each transform = a new class. No composability.

Equivalence

# S transforms compose with >> into a Pipeline object.
assert isinstance(pipeline, Pipeline)
built = pipeline.build()
# One child per stage: extractor, pick, rename, default, analyzer.
assert len(built.sub_agents) == 5

# Every auto-generated transform agent name must be a valid Python identifier.
for sub in built.sub_agents:
    assert sub.name.isidentifier(), f"Invalid name: {sub.name}"

# The parallel research pipeline builds correctly as well.
assert isinstance(research_pipeline, Pipeline)
built_research = research_pipeline.build()
# One child per stage: fanout, merge, default, writer, compute.
assert len(built_research.sub_agents) == 5