Analytics Data Quality: State Contract Assertions with expect()

Tip

What you’ll learn How to work with state keys and state transforms.

Source: 36_expect_assertions.py

from adk_fluent import Agent, Pipeline, expect

# expect(): assert a state contract at a pipeline step.
# In analytics, data quality gates prevent garbage-in-garbage-out.
analytics_pipeline = (
    Agent("metric_calculator")
    .model("gemini-2.5-flash")
    .instruct("Compute key business metrics: revenue, churn rate, and LTV from raw data.")
    .writes("metrics")
    >> expect(lambda s: "metrics" in s, "Metrics must be computed before dashboard generation")
    >> Agent("dashboard_generator")
    .model("gemini-2.5-flash")
    .instruct("Generate an executive dashboard with charts and insights from the metrics.")
)

# Multiple quality gates in a data pipeline — catch issues at each stage
validated_pipeline = (
    Agent("data_ingester")
    .model("gemini-2.5-flash")
    .instruct("Ingest raw event data from the warehouse and extract user behavior events.")
    >> expect(lambda s: "events" in s, "Ingestion must produce events data")
    >> Agent("aggregator")
    .model("gemini-2.5-flash")
    .instruct("Aggregate events into daily/weekly/monthly cohort metrics.")
    >> expect(lambda s: len(s.get("events", "")) > 0, "Events data must not be empty after aggregation")
    >> Agent("report_builder")
    .model("gemini-2.5-flash")
    .instruct("Build the final analytics report with trend analysis and recommendations.")
)
# Native ADK requires a custom BaseAgent to assert state contracts.
# In a data analytics pipeline, every quality gate is a full class:
from google.adk.agents.base_agent import BaseAgent as NativeBaseAgent
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents.sequential_agent import SequentialAgent


class AssertMetricsExist(NativeBaseAgent):
    """Custom agent that raises if 'metrics' is missing from state."""

    async def _run_async_impl(self, ctx):
        if "metrics" not in ctx.session.state:
            raise ValueError("Metrics must be computed before dashboard generation")
        # yield nothing


collector = LlmAgent(name="collector", model="gemini-2.5-flash", instruction="Collect raw analytics data.")
checker = AssertMetricsExist(name="checker")
dashboard = LlmAgent(name="dashboard", model="gemini-2.5-flash", instruction="Generate the dashboard.")

pipeline_native = SequentialAgent(name="pipeline", sub_agents=[collector, checker, dashboard])

Equivalence

import pytest

# expect() creates a builder
e = expect(lambda s: True, "test msg")
assert hasattr(e, "build")

# >> expect() creates a Pipeline
assert isinstance(analytics_pipeline, Pipeline)
built = analytics_pipeline.build()
assert len(built.sub_agents) == 3

# The internal function raises ValueError on failure
e_fail = expect(lambda s: False, "Data quality check failed: missing required fields")
with pytest.raises(ValueError, match="Data quality check failed"):
    e_fail._fn({})

# The internal function passes silently on success
e_pass = expect(lambda s: "revenue" in s)
result = e_pass._fn({"revenue": 42000})
assert result == {}

# Default message
e_default = expect(lambda s: False)
with pytest.raises(ValueError, match="State assertion failed"):
    e_default._fn({})