Module: middleware¶
from adk_fluent import M
Fluent middleware composition. Consistent with P, C, S modules.
Quick Reference¶
Method |
Returns |
Description |
|---|---|---|
|
|
Retry middleware with exponential backoff |
|
|
Structured event logging middleware |
|
|
Token usage tracking middleware |
|
|
Per-agent latency tracking middleware |
|
|
Topology event logging (loops, fanout, routes, fallbacks, timeouts) |
|
|
Dispatch/join lifecycle logging |
|
|
Restrict middleware to specific agents |
|
|
Conditionally apply middleware |
|
|
Single-hook middleware: fires before each agent |
|
|
Single-hook middleware: fires after each agent |
|
|
Single-hook middleware: fires before each LLM request |
|
|
Single-hook middleware: fires after each LLM response |
|
|
Single-hook middleware: fires at each loop iteration |
|
|
Single-hook middleware: fires when a timeout completes/expires |
|
|
Single-hook middleware: fires when a route selects an agent |
|
|
Single-hook middleware: fires at each fallback attempt |
|
|
Circuit breaker — trips open after N consecutive model errors |
|
|
Per-agent execution timeout |
|
|
Cache LLM responses keyed by request content |
|
|
Auto-downgrade to fallback model on primary model failure |
|
|
Suppress duplicate model calls within a sliding window |
|
|
Probabilistic middleware — fires inner middleware only N% of the time |
|
|
OpenTelemetry span export (no-op if opentelemetry not installed) |
|
|
Metrics collection (no-op if no collector provided) |
|
|
A2A-specific retry middleware for remote agent failures |
|
|
Circuit breaker for A2A remote agents |
|
|
Per-delegation timeout for A2A remote agent calls |
|
|
Log A2UI surface operations (createSurface, updateComponents, etc.) |
Built-in factories¶
M.retry(max_attempts: int = 3, backoff: float = 1.0) -> MComposite¶
Retry middleware with exponential backoff.
Parameters:
max_attempts(int) — default:3backoff(float) — default:1.0
M.log() -> MComposite¶
Structured event logging middleware.
M.cost() -> MComposite¶
Token usage tracking middleware.
M.latency() -> MComposite¶
Per-agent latency tracking middleware.
M.topology_log() -> MComposite¶
Topology event logging (loops, fanout, routes, fallbacks, timeouts).
M.dispatch_log() -> MComposite¶
Dispatch/join lifecycle logging.
Composition operators¶
M.scope(agents: str | tuple[str, ...], mw: MComposite | Any) -> MComposite¶
Restrict middleware to specific agents.
Usage: M.scope(“writer”, M.cost()) M.scope((“writer”, “reviewer”), M.log())
Parameters:
agents(str | tuple[str, …])mw(MComposite | Any)
M.when(condition: str | Callable[[], bool] | type, mw: MComposite | Any) -> MComposite¶
Conditionally apply middleware.
condition can be:
- String shortcut: "stream", "dispatched", "pipeline"
matching ExecutionMode.
- Callable returning bool, evaluated at hook invocation time.
- PredicateSchema subclass, evaluated against session state
at hook invocation time.
Usage: M.when(“stream”, M.latency()) M.when(lambda: is_debug(), M.log()) M.when(PremiumOnly, M.scope(“writer”, M.cost()))
Parameters:
condition(str | Callable[[], bool] | type)mw(MComposite | Any)
Single-hook shortcuts¶
M.before_agent(fn: Callable) -> MComposite¶
Single-hook middleware: fires before each agent.
Parameters:
fn(Callable)
M.after_agent(fn: Callable) -> MComposite¶
Single-hook middleware: fires after each agent.
Parameters:
fn(Callable)
M.before_model(fn: Callable) -> MComposite¶
Single-hook middleware: fires before each LLM request.
Parameters:
fn(Callable)
M.after_model(fn: Callable) -> MComposite¶
Single-hook middleware: fires after each LLM response.
Parameters:
fn(Callable)
M.on_loop(fn: Callable) -> MComposite¶
Single-hook middleware: fires at each loop iteration.
Parameters:
fn(Callable)
M.on_timeout(fn: Callable) -> MComposite¶
Single-hook middleware: fires when a timeout completes/expires.
Parameters:
fn(Callable)
M.on_route(fn: Callable) -> MComposite¶
Single-hook middleware: fires when a route selects an agent.
Parameters:
fn(Callable)
M.on_fallback(fn: Callable) -> MComposite¶
Single-hook middleware: fires at each fallback attempt.
Parameters:
fn(Callable)
————————————————————¶
M.circuit_breaker(threshold: int = 5, reset_after: float = 60) -> MComposite¶
Circuit breaker — trips open after N consecutive model errors.
Parameters:
threshold(int) — default:5reset_after(float) — default:60
M.timeout(seconds: float = 30) -> MComposite¶
Per-agent execution timeout.
Parameters:
seconds(float) — default:30
M.cache(ttl: float = 300, key_fn: Any = None) -> MComposite¶
Cache LLM responses keyed by request content.
Parameters:
ttl(float) — default:300key_fn(Any) — default:None
M.fallback_model(model: str = gemini-2.0-flash) -> MComposite¶
Auto-downgrade to fallback model on primary model failure.
Parameters:
model(str) — default:'gemini-2.0-flash'
M.dedup(window: int = 10) -> MComposite¶
Suppress duplicate model calls within a sliding window.
Parameters:
window(int) — default:10
M.sample(rate: float, mw: MComposite | Any) -> MComposite¶
Probabilistic middleware — fires inner middleware only N% of the time.
Parameters:
rate(float)mw(MComposite | Any)
M.trace(exporter: Any = None) -> MComposite¶
OpenTelemetry span export (no-op if opentelemetry not installed).
Parameters:
exporter(Any) — default:None
M.metrics(collector: Any = None) -> MComposite¶
Metrics collection (no-op if no collector provided).
Parameters:
collector(Any) — default:None
A2A-specific middleware¶
M.a2a_retry(max_attempts: int = 3, backoff: float = 2.0, *, agents: str | tuple[str, ...] | None = None, on_retry: Callable | None = None) -> MComposite¶
A2A-specific retry middleware for remote agent failures.
Handles HTTP transport errors, A2A task FAILED/REJECTED states, and network-level transient failures. Uses exponential backoff.
Args:
max_attempts: Maximum number of retry attempts (default 3).backoff: Base delay in seconds for exponential backoff (default 2.0).agents: Scope to specific agent names (default: all agents).on_retry: Optional callback(ctx, agent_name, attempt, error)called before each retry.
Usage: pipeline.middleware(M.a2a_retry(max_attempts=3, backoff=2.0)) pipeline.middleware(M.scope(“remote_*”, M.a2a_retry()))
Parameters:
max_attempts(int) — default:3backoff(float) — default:2.0agents(str | tuple[str, …] | None) — default:Noneon_retry(Callable | None) — default:None
M.a2a_circuit_breaker(threshold: int = 5, reset_after: float = 60, *, agents: str | tuple[str, ...] | None = None, on_open: Callable | None = None, on_close: Callable | None = None) -> MComposite¶
Circuit breaker for A2A remote agents.
Opens after threshold consecutive failures. Stays open for
reset_after seconds, then allows a single probe call.
Args:
threshold: Number of failures before opening (default 5).reset_after: Seconds to stay open before half-open probe (default 60).agents: Scope to specific agent names (default: all agents).on_open: Callback(ctx, agent_name)when circuit opens.on_close: Callback(ctx, agent_name)when circuit closes.
Usage: pipeline.middleware(M.a2a_circuit_breaker(threshold=5, reset_after=60))
Parameters:
threshold(int) — default:5reset_after(float) — default:60agents(str | tuple[str, …] | None) — default:Noneon_open(Callable | None) — default:Noneon_close(Callable | None) — default:None
M.a2a_timeout(seconds: float = 30, *, agents: str | tuple[str, ...] | None = None, on_timeout: Callable | None = None) -> MComposite¶
Per-delegation timeout for A2A remote agent calls.
Enforces wall-clock time limits on entire agent invocations. Critical for remote A2A calls with network latency + remote LLM processing.
Args:
seconds: Maximum seconds for the agent invocation (default 30).agents: Scope to specific agent names (default: all agents).on_timeout: Callback(ctx, agent_name, seconds)on timeout.
Usage: pipeline.middleware(M.a2a_timeout(seconds=30)) pipeline.middleware(M.scope(“slow_agent”, M.a2a_timeout(120)))
Parameters:
seconds(float) — default:30agents(str | tuple[str, …] | None) — default:Noneon_timeout(Callable | None) — default:None
M.a2ui_log(*, level: str = info, agents: list[str] | None = None) -> MComposite¶
Log A2UI surface operations (createSurface, updateComponents, etc.).
Attaches an after_model callback that inspects tool calls for A2UI operations and logs them at the specified level.
Args:
level: Log level —"info"or"debug".agents: Optional list of agent names to scope the logging to.
Usage: agent.middleware(M.a2ui_log()) pipeline.middleware(M.a2ui_log(level=”debug”))
Parameters:
level(str) — default:'info'agents(list[str] | None) — default:None
Composition Operators¶
| (compose (MComposite))¶
Stack middleware into a chain
Types¶
Type |
Description |
|---|---|
|
Composable middleware chain. |