# Middleware
Middleware provides app-global, cross-cutting behavior. Unlike callbacks, which are per-agent, middleware applies to the entire execution across all agents. Unlike presets, which share configuration across agents, middleware operates at the pipeline level.
## When to Use Middleware vs. Callbacks vs. Presets vs. Guards
| Mechanism | Scope | Purpose | Example |
|---|---|---|---|
| Callbacks | Single agent | Agent-specific behavior | Audit logging on one agent |
| Presets | Multiple agents | Shared configuration | Same model + callbacks on all agents |
| Middleware | Entire pipeline | Cross-cutting infrastructure | Retry, circuit breaker, tracing |
| Guards | Single agent | Safety / validation | PII redaction, output length |
Use middleware when the concern is infrastructure-level and should apply uniformly across the pipeline. If you find yourself adding retry logic, logging, or tracing to individual agents one at a time, reach for middleware instead.
## Attaching Middleware

Use `.middleware()` on any builder, then `.to_app()` to compile:
```python
from adk_fluent import Agent
from adk_fluent._middleware import M

pipeline = (
    Agent("a") >> Agent("b")
).middleware(M.retry(3) | M.log())

app = pipeline.to_app()
```
Multiple `.middleware()` calls accumulate. When builders are combined with `>>` or `|`, middleware from all operands is merged.
## The M Module

The `M` module provides composable middleware factories. Compose them with `|` (chain):
```python
from adk_fluent import Agent
from adk_fluent._middleware import M

# Compose a middleware stack
stack = M.retry(3) | M.log() | M.latency() | M.cost()
pipeline = (Agent("a") >> Agent("b")).middleware(stack)
```
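Under the hood, `|` composition can be pictured as concatenating factories into one ordered stack. Below is a minimal pure-Python sketch of that idea; the `Chain` class and factory functions are illustrative stand-ins, not the real `M` internals:

```python
# Illustrative sketch of "|" composition: each middleware chain knows how
# to combine with another, flattening into an ordered list. Hypothetical
# classes; not the actual adk_fluent implementation.

class Chain:
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, other):
        # Chaining concatenates, preserving left-to-right order
        other_items = other.items if isinstance(other, Chain) else [other]
        return Chain(self.items + other_items)

def retry(n):
    return Chain([f"retry({n})"])

def log():
    return Chain(["log"])

def cost():
    return Chain(["cost"])

stack = retry(3) | log() | cost()
print(stack.items)  # left-to-right order is preserved
```

The left-to-right order of the flattened list is what the "Composition Order Matters" section below relies on.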
## Built-in Middleware
| Factory | Purpose | Key args |
|---|---|---|
| `M.retry()` | Retry with exponential backoff | |
| `M.log()` | Structured event logging | – |
| `M.cost()` | Token usage tracking | – |
| `M.latency()` | Per-agent latency tracking | – |
| | Stop calling a failing model | |
| | Per-agent timeout | – |
| | Response caching | – |
| | Fallback to different model on error | – |
| | Deduplicate identical requests | – |
| | Probabilistic sampling | – |
| | Distributed tracing | – |
| | Metrics collection | – |
| `M.scope()` | Restrict middleware to specific agents | – |
| `M.when()` | Conditional middleware | – |
## Composition Order Matters

Middleware runs in the order you compose it. For non-void hooks (`before_model`, `before_tool`), the first middleware to return a non-`None` value short-circuits the rest:
```python
# Retry wraps logging wraps cost tracking
stack = M.retry(3) | M.log() | M.cost()

# Execution order for before_model:
#   1. retry.before_model  (retries if an error occurs downstream)
#   2. log.before_model    (logs the request)
#   3. cost.before_model   (starts token counting)
#
# Execution order for after_model (reverse):
#   3. cost.after_model    (records tokens)
#   2. log.after_model     (logs the response)
#   1. retry.after_model   (checks for retryable errors)
```
**Rule of thumb:** put retry first (outermost), logging next, then specific concerns (cost, latency, caching). This way retry wraps everything, and logging captures both successful and retried calls.
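The short-circuit rule can be sketched as a plain dispatch loop over the stack. This is an illustrative model, not the library's actual dispatcher; `AlwaysNone` and `CachedResponse` are hypothetical middleware:

```python
import asyncio

# Sketch: dispatching a non-void hook (before_model) over an ordered
# middleware stack, where the first non-None return value wins.

class AlwaysNone:
    async def before_model(self, *, request, **kw):
        return None  # void result: fall through to the next middleware

class CachedResponse:
    async def before_model(self, *, request, **kw):
        return f"cached:{request}"  # non-None: short-circuits the stack

async def run_before_model(stack, request):
    for mw in stack:
        result = await mw.before_model(request=request)
        if result is not None:
            return result  # first non-None return wins
    return None  # all middleware were void: proceed to the real model call

result = asyncio.run(run_before_model([AlwaysNone(), CachedResponse()], "hi"))
print(result)  # → cached:hi
```

Because dispatch stops at the first non-`None` return, a caching middleware placed early in the stack would hide a retry middleware placed after it, which is why ordering matters.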
## Scoping Middleware

Apply middleware to specific agents only:
```python
# Only retry the LLM-calling agents, not the state transforms
stack = M.scope(["classifier", "resolver"], M.retry(3)) | M.log()
pipeline = (Agent("classifier") >> Agent("resolver")).middleware(stack)
```
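Conceptually, scoping wraps a middleware and forwards hooks only for allow-listed agents. A sketch under that assumption (the `Scoped` and `CountingMiddleware` classes are hypothetical, not the real `M.scope`):

```python
import asyncio

# Sketch: a scoping wrapper that forwards hooks only when the current
# agent's name is in the allow-list. Illustrative only.

class CountingMiddleware:
    def __init__(self):
        self.calls = []

    async def before_agent(self, *, agent, **kw):
        self.calls.append(agent)

class Scoped:
    def __init__(self, names, inner):
        self.names = set(names)
        self.inner = inner

    async def before_agent(self, *, agent, **kw):
        if agent in self.names:  # forward only for scoped agents
            await self.inner.before_agent(agent=agent, **kw)

async def demo():
    counter = CountingMiddleware()
    scoped = Scoped(["classifier"], counter)
    for name in ["classifier", "resolver", "formatter"]:
        await scoped.before_agent(agent=name)
    return counter.calls

calls = asyncio.run(demo())
print(calls)  # → ['classifier']
```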
## Conditional Middleware

Apply middleware based on runtime conditions:
```python
import os

# Only enable cost tracking in production
stack = M.log() | M.when(os.getenv("ENV") == "production", M.cost())
```
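One way to picture conditional middleware is as a factory that returns either the real middleware or an inert no-op, so the stack shape stays uniform either way. A hypothetical sketch, not the library's `M.when` implementation:

```python
# Sketch: a when(condition, middleware) helper. When the condition is
# false, an inert middleware with no hooks is substituted, so composition
# and dispatch don't need a special case. Hypothetical names throughout.

class _Noop:
    """Middleware with no hooks: every dispatch simply skips it."""

def when(condition, middleware):
    return middleware if condition else _Noop()

class CostTracker:
    """Stand-in for a real cost-tracking middleware."""

prod = when(True, CostTracker())   # condition holds: real middleware
dev = when(False, CostTracker())   # condition fails: inert no-op
print(type(prod).__name__, type(dev).__name__)  # → CostTracker _Noop
```

Evaluating the condition once at composition time keeps the pipeline code itself free of environment branches.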
## Middleware Protocol

A middleware is any object with optional async lifecycle methods:
```python
from typing import Protocol

class Middleware(Protocol):
    # Runner lifecycle
    async def before_run(self, *, runner, session, **kw): ...
    async def after_run(self, *, runner, session, **kw): ...

    # Agent lifecycle
    async def before_agent(self, *, agent, context, **kw): ...
    async def after_agent(self, *, agent, context, **kw): ...

    # Model lifecycle
    async def before_model(self, *, agent, request, context, **kw): ...
    async def after_model(self, *, agent, response, context, **kw): ...
    async def on_model_error(self, *, agent, error, context, **kw): ...

    # Tool lifecycle
    async def before_tool(self, *, agent, tool, args, context, **kw): ...
    async def after_tool(self, *, agent, tool, result, context, **kw): ...
    async def on_tool_error(self, *, agent, tool, error, context, **kw): ...

    # Cleanup
    async def close(self): ...
```
All methods are optional. Implement only what you need.
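Because every hook is optional, a dispatcher can look each hook up with `getattr` and skip middleware that doesn't implement it. An illustrative sketch of that pattern (assumed behavior, not the library's code):

```python
import asyncio

# Sketch: optional-hook dispatch. Middleware lacking a hook is skipped
# rather than required to implement every method.

class OnlyBeforeAgent:
    async def before_agent(self, *, agent, **kw):
        print(f"before_agent: {agent}")

async def dispatch(stack, hook_name, **kw):
    fired = []
    for mw in stack:
        hook = getattr(mw, hook_name, None)
        if hook is not None:  # skip middleware without this hook
            await hook(**kw)
            fired.append(type(mw).__name__)
    return fired

# object() implements no hooks at all, so only OnlyBeforeAgent fires
fired = asyncio.run(dispatch([OnlyBeforeAgent(), object()], "before_agent", agent="a"))
print(fired)  # → ['OnlyBeforeAgent']
```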
## Writing Custom Middleware
```python
import time

class TimingMiddleware:
    """Track execution time for each agent."""

    def __init__(self):
        self.timings = {}

    async def before_agent(self, *, agent, context, **kw):
        context.state["_start"] = time.time()

    async def after_agent(self, *, agent, context, **kw):
        elapsed = time.time() - context.state.get("_start", time.time())
        self.timings[agent.name] = elapsed
```
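Since custom middleware is plain Python, it can be unit-tested directly with stub objects, no pipeline required. Here is a sketch using `SimpleNamespace` stand-ins for the agent and context; the stubs are hypothetical and match only the attributes the middleware touches:

```python
import asyncio
import time
from types import SimpleNamespace

class TimingMiddleware:
    """Track execution time for each agent (same class as above)."""

    def __init__(self):
        self.timings = {}

    async def before_agent(self, *, agent, context, **kw):
        context.state["_start"] = time.time()

    async def after_agent(self, *, agent, context, **kw):
        elapsed = time.time() - context.state.get("_start", time.time())
        self.timings[agent.name] = elapsed

async def demo():
    mw = TimingMiddleware()
    agent = SimpleNamespace(name="classifier")   # stub agent with a .name
    context = SimpleNamespace(state={})          # stub context with a state dict
    await mw.before_agent(agent=agent, context=context)
    await asyncio.sleep(0.01)                    # simulated agent work
    await mw.after_agent(agent=agent, context=context)
    return mw.timings

timings = asyncio.run(demo())
print(timings)
```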
## How Middleware Works

When `.to_app()` is called:
1. Builder middleware (from `.middleware()`) is merged with `ExecutionConfig.middlewares`
2. The middleware stack is compiled into a single `_MiddlewarePlugin` (an ADK `BasePlugin`)
3. The plugin is attached to the `App` via `plugins=[plugin]`
Middleware Lifecycle (execution order):
```text
before_run
    │
    ▼
before_agent
    │
    ▼
before_model ─► LLM call ─► after_model
    │               │
    │               └─► on_model_error (on failure)
    ▼
before_tool ──► tool() ──► after_tool
                  │
                  └─► on_tool_error (on failure)
    │
    ▼
after_agent
    │
    ▼
after_run

Stack order:   mw[0] → mw[1] → mw[2]
Short-circuit: first non-None return wins
Void hooks:    ALL middleware always called
```
## Interplay with Other Modules
### Middleware + Guards

Guards are per-agent safety checks. Middleware is pipeline-wide infrastructure. Use both:
```python
from adk_fluent import Agent, G
from adk_fluent._middleware import M

agent = Agent("service").instruct("Help.").guard(G.pii("redact") | G.length(max=500))
pipeline = (agent >> Agent("auditor")).middleware(M.retry(3) | M.log())

# Guards: PII redaction on the service agent only
# Middleware: retry + logging on the entire pipeline
```
See Guards.
### Middleware + Visibility

Middleware sees all agents regardless of visibility. `M.log()` captures events from hidden agents:
```python
pipeline = (
    Agent("hidden").instruct("Internal.").hide()
    >> Agent("visible").instruct("User-facing.")
).middleware(M.log())

# M.log() captures events from BOTH agents
```
See Visibility.
### Middleware + Context Engineering

Middleware and context engineering don't interact directly, but they complement each other. Context engineering controls what the LLM sees; middleware controls how the pipeline behaves:
```python
from adk_fluent import Agent, C
from adk_fluent._middleware import M

pipeline = (
    Agent("classifier").context(C.none()).writes("intent")
    >> Agent("resolver").context(C.from_state("intent"))
).middleware(M.retry(3) | M.log() | M.cost())

# Context: each agent sees only what it needs
# Middleware: retry, logging, cost tracking across all agents
```
### Middleware + Testing

Use `M.log()` in tests to capture events for assertions:
```python
from adk_fluent import Agent
from adk_fluent._middleware import M

logger = M.log()
pipeline = (Agent("a") >> Agent("b")).middleware(logger)
app = pipeline.to_app()

# After execution: inspect logger.events for assertions
```
See Testing.
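The same pattern works with any event-recording middleware you write yourself. Below is a minimal hypothetical recorder (not `M.log()` itself) whose `events` list tests can assert against; the pipeline traversal is simulated with a plain loop:

```python
import asyncio

# Sketch: an event-capturing middleware for test assertions. Each hook
# invocation is appended to an events list the test inspects afterwards.

class EventRecorder:
    def __init__(self):
        self.events = []

    async def before_agent(self, *, agent, **kw):
        self.events.append(("before_agent", agent))

    async def after_agent(self, *, agent, **kw):
        self.events.append(("after_agent", agent))

async def demo():
    recorder = EventRecorder()
    for name in ["a", "b"]:  # simulated two-agent pipeline traversal
        await recorder.before_agent(agent=name)
        await recorder.after_agent(agent=name)
    return recorder.events

events = asyncio.run(demo())
print(events)
```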
## Best Practices
- **Don't scatter callbacks when middleware will do.** If 5 agents all need retry logic, use `M.retry()` once on the pipeline, not `.on_model_error()` 5 times.
- **Put retry outermost in the middleware stack.** `M.retry(3) | M.log()` means retry wraps logging, so retried calls are logged correctly.
- **Use `M.scope()` for agent-specific middleware.** Not all agents need the same middleware; scope expensive operations (caching, circuit breaker) to the agents that benefit.
- **Use `M.when()` for environment-specific behavior.** Don't branch in your pipeline code; let middleware handle it.
- **Middleware is for infrastructure, not business logic.** Retry, logging, tracing, and caching are middleware concerns. Routing, classification, and data transformation are agent/function concerns.
## Backend Awareness
Middleware works across all execution backends:
- **ADK backend:** middleware runs as ADK `BasePlugin` instances compiled into the `App`
- **Temporal backend (in development):** middleware runs as runtime hooks around workflow/activity execution
- **asyncio backend (in development):** middleware runs as direct Python hooks
The middleware definition is identical regardless of backend. Only the execution mechanism differs:
```python
# Same middleware definition, works with any engine
pipeline = (Agent("a") >> Agent("b")).middleware(M.retry(3) | M.log())

# ADK (default)
app = pipeline.to_app()

# Temporal (in development)
pipeline_t = pipeline.engine("temporal", client=client)
response = await pipeline_t.ask_async("Go")
```
See also

- Execution Backends — backend selection and capability matrix
- Temporal Guide — durable execution and how middleware interacts with Temporal
- Callbacks — per-agent callback attachment
- Presets — shared agent configuration
- Guards — per-agent safety and validation
- Visibility — controlling user-facing output
- Best Practices — the "Callbacks vs. Middleware" decision tree