G Module: Declarative Guard Composition

Demonstrates the G module – a fluent composition surface for safety, validation, and policy guards. Guards compile into before/after model callbacks automatically.

Key concepts:

  • GGuard: single guard unit with phase and compile function

  • GComposite: composable chain with | operator

  • G.json(), G.length(), G.regex(): structural guards

  • G.output(), G.input(): schema validation guards

  • G.pii(), G.toxicity(), G.topic(): content safety guards

  • G.budget(), G.rate_limit(), G.max_turns(): policy guards

  • G.grounded(), G.hallucination(): grounding guards

  • G.when(predicate, guard): conditional guards

  • Provider protocols: PIIDetector, ContentJudge

Tip

What you’ll learn How to register lifecycle callbacks with accumulation semantics.

Source: 67_g_module_guards.py

from adk_fluent import Agent, Pipeline
from adk_fluent._guards import (
    G,
    GComposite,
    GGuard,
    GuardViolation,
    PIIDetector,
    PIIFinding,
    ContentJudge,
    JudgmentResult,
)

# --- 1. GComposite composition with | operator ---
json_guard = G.json()
assert isinstance(json_guard, GComposite)
assert len(json_guard) == 1

length_guard = G.length(min=10, max=500)
assert isinstance(length_guard, GComposite)

# Compose with pipe operator
chain = G.json() | G.length(max=500)
assert isinstance(chain, GComposite)
assert len(chain) == 2  # two guards

# Chaining multiple guards
multi_chain = G.json() | G.length(max=500) | G.pii("redact")
assert len(multi_chain) == 3

# --- 2. G.json() compiles to after_model_callback ---
agent = Agent("validator").model("gemini-2.5-flash")
agent.guard(G.json())

# Guard compiles into after_model_callback
callbacks = agent._callbacks.get("after_model_callback", [])
assert any(name == "guard:json" for name, _ in callbacks)

# Guard spec is also tracked in config
guard_specs = agent._config.get("_guard_specs", ())
assert len(guard_specs) == 1
assert guard_specs[0]._kind == "json"

# --- 3. G.length(min, max) validates output length ---
agent2 = Agent("length_checker").model("gemini-2.5-flash")
agent2.guard(G.length(min=20, max=1000))

callbacks2 = agent2._callbacks.get("after_model_callback", [])
assert any(name == "guard:length" for name, _ in callbacks2)

# The config is stored in the callback
for name, config in callbacks2:
    if name == "guard:length":
        assert config["min"] == 20
        assert config["max"] == 1000

# --- 4. G.regex(pattern, action) enforces pattern matching ---
# Block mode: raises GuardViolation on match
block_guard = G.regex(r"\b(password|secret)\b", action="block")
agent3 = Agent("regex_blocker").model("gemini-2.5-flash")
agent3.guard(block_guard)

callbacks3 = agent3._callbacks.get("after_model_callback", [])
assert any(name == "guard:regex" for name, _ in callbacks3)

# Redact mode: replaces matches
redact_guard = G.regex(
    r"\b\d{3}-\d{2}-\d{4}\b",  # SSN pattern
    action="redact",
    replacement="[REDACTED-SSN]",
)
agent4 = Agent("regex_redactor").model("gemini-2.5-flash")
agent4.guard(redact_guard)

for name, config in agent4._callbacks.get("after_model_callback", []):
    if name == "guard:regex":
        assert config["action"] == "redact"
        assert config["replacement"] == "[REDACTED-SSN]"

# --- 5. G.output(schema) and G.input(schema) validate schemas ---


class ResponseSchema:
    """Example output schema."""

    answer: str
    confidence: float


class RequestSchema:
    """Example input schema."""

    query: str
    context: str


# Output validation (after_model_callback)
agent5 = Agent("output_validator").model("gemini-2.5-flash")
agent5.guard(G.output(ResponseSchema))

output_callbacks = agent5._callbacks.get("after_model_callback", [])
assert any(name == "guard:output" for name, _ in output_callbacks)

for name, schema in output_callbacks:
    if name == "guard:output":
        assert schema is ResponseSchema

# Input validation (before_model_callback)
agent6 = Agent("input_validator").model("gemini-2.5-flash")
agent6.guard(G.input(RequestSchema))

input_callbacks = agent6._callbacks.get("before_model_callback", [])
assert any(name == "guard:input" for name, _ in input_callbacks)

for name, schema in input_callbacks:
    if name == "guard:input":
        assert schema is RequestSchema

# --- 6. G.pii() with default regex detector ---
agent7 = Agent("pii_scanner").model("gemini-2.5-flash")
agent7.guard(G.pii("redact"))

pii_callbacks = agent7._callbacks.get("after_model_callback", [])
assert any(name == "guard:pii" for name, _ in pii_callbacks)

for name, config in pii_callbacks:
    if name == "guard:pii":
        assert config["action"] == "redact"
        assert config["threshold"] == 0.5
        assert config["replacement"] == "[PII]"
        # Default detector is _RegexDetector
        assert hasattr(config["detector"], "detect")

# --- 7. G.pii() with custom detector (PIIDetector protocol) ---


class CustomPIIDetector:
    """Custom PII detector implementing PIIDetector protocol."""

    async def detect(self, text: str) -> list[PIIFinding]:
        """Detect custom PII patterns."""
        findings = []
        # Example: detect employee IDs like EMP-12345
        import re

        for match in re.finditer(r"\bEMP-\d{5}\b", text):
            findings.append(
                PIIFinding(
                    kind="EMPLOYEE_ID",
                    start=match.start(),
                    end=match.end(),
                    confidence=1.0,
                    text=match.group(),
                )
            )
        return findings


custom_detector = CustomPIIDetector()
assert isinstance(custom_detector, PIIDetector)  # Protocol check

agent8 = Agent("custom_pii").model("gemini-2.5-flash")
agent8.guard(G.pii("block", detector=custom_detector, threshold=0.8))

for name, config in agent8._callbacks.get("after_model_callback", []):
    if name == "guard:pii":
        assert config["detector"] is custom_detector
        assert config["threshold"] == 0.8
        assert config["action"] == "block"

# --- 8. G.toxicity() and G.topic() for content safety ---
agent9 = Agent("toxicity_filter").model("gemini-2.5-flash")
agent9.guard(G.toxicity(threshold=0.8))

toxicity_callbacks = agent9._callbacks.get("after_model_callback", [])
assert any(name == "guard:toxicity" for name, _ in toxicity_callbacks)

for name, config in toxicity_callbacks:
    if name == "guard:toxicity":
        assert config["threshold"] == 0.8
        assert hasattr(config["judge"], "judge")

# Topic blocking
agent10 = Agent("topic_filter").model("gemini-2.5-flash")
denied_topics = ["politics", "religion", "financial_advice"]
agent10.guard(G.topic(deny=denied_topics))

topic_callbacks = agent10._callbacks.get("after_model_callback", [])
assert any(name == "guard:topic" for name, _ in topic_callbacks)

for name, config in topic_callbacks:
    if name == "guard:topic":
        assert config["deny"] == denied_topics

# --- 9. G.budget(), G.rate_limit(), G.max_turns() policy guards ---

# Token budget enforcement
agent11 = Agent("budget_enforcer").model("gemini-2.5-flash")
agent11.guard(G.budget(max_tokens=5000))

budget_callbacks = agent11._callbacks.get("after_model_callback", [])
assert any(name == "guard:budget" for name, _ in budget_callbacks)

for name, config in budget_callbacks:
    if name == "guard:budget":
        assert config["max_tokens"] == 5000

# Rate limiting
agent12 = Agent("rate_limiter").model("gemini-2.5-flash")
agent12.guard(G.rate_limit(rpm=60))

rate_callbacks = agent12._callbacks.get("before_model_callback", [])
assert any(name == "guard:rate_limit" for name, _ in rate_callbacks)

for name, config in rate_callbacks:
    if name == "guard:rate_limit":
        assert config["rpm"] == 60

# Max turns limit
agent13 = Agent("turn_limiter").model("gemini-2.5-flash")
agent13.guard(G.max_turns(n=10))

turns_callbacks = agent13._callbacks.get("before_model_callback", [])
assert any(name == "guard:max_turns" for name, _ in turns_callbacks)

for name, config in turns_callbacks:
    if name == "guard:max_turns":
        assert config["n"] == 10

# --- 10. G.grounded() and G.hallucination() for grounding ---

# Grounding guard reads from state
agent14 = Agent("grounded_agent").model("gemini-2.5-flash")
agent14.guard(G.grounded(sources_key="documents"))

grounded_callbacks = agent14._callbacks.get("after_model_callback", [])
assert any(name == "guard:grounded" for name, _ in grounded_callbacks)

for name, config in grounded_callbacks:
    if name == "guard:grounded":
        assert config["sources_key"] == "documents"

# Check that guard declares reads dependency
guard_specs14 = agent14._config.get("_guard_specs", ())
for spec in guard_specs14:
    if spec._kind == "grounded":
        assert spec._reads_keys == frozenset({"documents"})

# Hallucination detection
agent15 = Agent("hallucination_detector").model("gemini-2.5-flash")
agent15.guard(G.hallucination(threshold=0.7, sources_key="sources"))

halluc_callbacks = agent15._callbacks.get("after_model_callback", [])
assert any(name == "guard:hallucination" for name, _ in halluc_callbacks)

for name, config in halluc_callbacks:
    if name == "guard:hallucination":
        assert config["threshold"] == 0.7
        assert config["sources_key"] == "sources"

# Hallucination guard also declares reads dependency
guard_specs15 = agent15._config.get("_guard_specs", ())
for spec in guard_specs15:
    if spec._kind == "hallucination":
        assert spec._reads_keys == frozenset({"sources"})

# --- 11. G.when(predicate, guard) for conditional guards ---


def is_production(state):
    """Example predicate checking environment."""
    return state.get("env") == "production"


# Conditional PII guard: only in production
agent16 = Agent("conditional_guard").model("gemini-2.5-flash")
conditional = G.when(is_production, G.pii("redact"))
agent16.guard(conditional)

# The inner guard is still compiled
pii_found = any(name == "guard:pii" for name, _ in agent16._callbacks.get("after_model_callback", []))
assert pii_found

# Guard spec tracks the conditional wrapper
guard_specs16 = agent16._config.get("_guard_specs", ())
assert any(spec._kind == "when" for spec in guard_specs16)

# Conditional with multiple guards chained
conditional_multi = G.when(lambda s: s.get("strict_mode", False), G.json() | G.length(max=500) | G.pii("block"))
agent17 = Agent("multi_conditional").model("gemini-2.5-flash")
agent17.guard(conditional_multi)

# All inner guards compiled
callbacks17 = agent17._callbacks.get("after_model_callback", [])
assert any(name == "guard:json" for name, _ in callbacks17)
assert any(name == "guard:length" for name, _ in callbacks17)
assert any(name == "guard:pii" for name, _ in callbacks17)

# --- 12. Builder integration: full guard chain on agent ---
agent18 = (
    Agent("protected_agent")
    .model("gemini-2.5-flash")
    .instruct("You are a helpful assistant.")
    .guard(G.json() | G.pii("redact") | G.budget(10000))
)

# All three guards compiled
callbacks18 = agent18._callbacks.get("after_model_callback", [])
guard_names = [name for name, _ in callbacks18]
assert "guard:json" in guard_names
assert "guard:pii" in guard_names
assert "guard:budget" in guard_names

# Guard specs tracked
specs18 = agent18._config.get("_guard_specs", ())
assert len(specs18) == 3

# --- 13. NamespaceSpec protocol conformance ---
composite = G.json() | G.pii("redact")

# _kind discriminator
assert composite._kind == "guard_chain"

# _as_list() flattens guards
flattened = composite._as_list()
assert len(flattened) == 2
assert all(isinstance(g, GGuard) for g in flattened)

# _reads_keys union
json_guard_obj = G.json()
assert json_guard_obj._reads_keys == frozenset()

grounded_guard = G.grounded(sources_key="docs")
assert grounded_guard._reads_keys == frozenset({"docs"})

# Union of reads
combined_reads = G.json() | G.grounded("docs") | G.hallucination(sources_key="refs")
union_reads = combined_reads._reads_keys
assert union_reads is not None
assert "docs" in union_reads
assert "refs" in union_reads

# _writes_keys always empty for guards
assert composite._writes_keys == frozenset()

# --- 14. Guard specs stored in builder config ---
agent19 = Agent("spec_tracking").model("gemini-2.5-flash")

# No specs initially
assert "_guard_specs" not in agent19._config

# Add first guard
agent19.guard(G.json())
specs = agent19._config.get("_guard_specs", ())
assert len(specs) == 1

# Add more guards -- they accumulate
agent19.guard(G.pii("redact"))
specs = agent19._config.get("_guard_specs", ())
assert len(specs) == 2

agent19.guard(G.budget(5000))
specs = agent19._config.get("_guard_specs", ())
assert len(specs) == 3

# Each spec is a GGuard instance
assert all(isinstance(s, GGuard) for s in specs)

# --- 15. Full production example: comprehensive guard chain on medical agent ---


class MedicalResponseSchema:
    """Structured medical information response."""

    answer: str
    disclaimer: str
    sources: list[str]


class CustomToxicityJudge:
    """Custom judge for medical content appropriateness."""

    async def judge(self, text: str, context: dict | None = None) -> JudgmentResult:
        """Check for harmful medical advice."""
        # Simplified example: block if text contains dangerous keywords
        dangerous_keywords = ["self-medicate", "stop taking", "ignore doctor"]
        if any(kw in text.lower() for kw in dangerous_keywords):
            return JudgmentResult(passed=False, score=0.95, reason="Contains potentially harmful medical advice")
        return JudgmentResult(passed=True, score=0.1, reason="Safe")


# Build a medical agent with comprehensive safety guards
medical_agent = (
    Agent("medical_advisor")
    .model("gemini-2.5-flash")
    .instruct(
        "You provide general health information only. "
        "Always include disclaimers. Never diagnose or prescribe. "
        "Direct emergencies to 911."
    )
    .guard(
        # Structural validation
        G.output(MedicalResponseSchema)
        | G.length(min=50, max=2000)
        # PII protection
        | G.pii("redact", replacement="[PATIENT-INFO]")
        # Content safety
        | G.toxicity(threshold=0.7, judge=CustomToxicityJudge())
        | G.topic(deny=["specific_diagnosis", "prescription_dosage"])
        # Grounding
        | G.grounded(sources_key="medical_sources")
        | G.hallucination(threshold=0.6, sources_key="medical_sources")
        # Policy limits
        | G.budget(max_tokens=8000)
        | G.max_turns(n=5)
    )
)

# Verify all guards compiled
after_callbacks = medical_agent._callbacks.get("after_model_callback", [])
before_callbacks = medical_agent._callbacks.get("before_model_callback", [])

# After model guards
after_guard_names = [name for name, _ in after_callbacks]
assert "guard:output" in after_guard_names
assert "guard:length" in after_guard_names
assert "guard:pii" in after_guard_names
assert "guard:toxicity" in after_guard_names
assert "guard:topic" in after_guard_names
assert "guard:grounded" in after_guard_names
assert "guard:hallucination" in after_guard_names
assert "guard:budget" in after_guard_names

# Before model guards
before_guard_names = [name for name, _ in before_callbacks]
assert "guard:max_turns" in before_guard_names

# Guard specs tracked
medical_specs = medical_agent._config.get("_guard_specs", ())
assert len(medical_specs) == 9  # All nine guards

# Verify reads dependencies declared correctly
reads_specs = [s for s in medical_specs if s._reads_keys]
assert len(reads_specs) == 2  # grounded and hallucination
assert all("medical_sources" in s._reads_keys for s in reads_specs)

# --- Integration with pipeline: guards on individual agents ---
writer = (
    Agent("medical_writer")
    .model("gemini-2.5-flash")
    .instruct("Draft response.")
    .guard(G.pii("redact") | G.length(max=1000))
)
reviewer = (
    Agent("medical_reviewer")
    .model("gemini-2.5-flash")
    .instruct("Review safety.")
    .guard(G.toxicity(threshold=0.8) | G.budget(5000))
)

# Build pipeline with guarded agents
pipeline = writer >> reviewer

# Each agent has its own guards
writer_specs = writer._config.get("_guard_specs", ())
assert len(writer_specs) == 2

reviewer_specs = reviewer._config.get("_guard_specs", ())
assert len(reviewer_specs) == 2

print("All G module guard composition assertions passed!")
        graph TD
    n1[["medical_writer_then_medical_reviewer (sequence)"]]
    n2["medical_writer"]
    n3["medical_reviewer"]
    n2 --> n3