Dispatch-Aware Middleware: Observability for Background Execution
Demonstrates the dispatch/join middleware hooks for observing background agent lifecycle events.
Key concepts:
DispatchLogMiddleware: built-in observability for dispatch/join
on_dispatch: fired when a task is dispatched as background
on_task_complete: fired when a dispatched task completes
on_task_error: fired when a dispatched task fails
on_join: fired after a join barrier completes
on_stream_item: fired after each stream item is processed
get_execution_mode(): query current mode (pipeline/dispatched/stream)
task_budget(): configure max concurrent dispatch tasks
Tip
What you’ll learn: How to compose agents into a sequential pipeline.
Source: 61_dispatch_middleware.py
from adk_fluent import Agent, dispatch, get_execution_mode, join
from adk_fluent._primitive_builders import BackgroundTask
from adk_fluent.backends.adk._primitives import _execution_mode
from adk_fluent._primitives import _middleware_dispatch_hooks
from adk_fluent.middleware import (
DispatchLogMiddleware,
Middleware,
_MiddlewarePlugin,
)
from adk_fluent.stream import StreamRunner
# Hook names checked on the built-in middleware, the Middleware protocol,
# and the plugin adapter in sections 1-3 below.
HOOK_NAMES = (
    "on_dispatch",
    "on_task_complete",
    "on_task_error",
    "on_join",
    "on_stream_item",
)

# --- 1. DispatchLogMiddleware exists and has the right hooks ---
mw = DispatchLogMiddleware()
for hook in HOOK_NAMES:
    assert hasattr(mw, hook)
# A freshly constructed instance exposes an empty list as its log.
assert isinstance(mw.log, list)
assert not mw.log

# --- 2. Middleware protocol includes dispatch/stream hooks ---
for hook in HOOK_NAMES:
    assert hasattr(Middleware, hook)

# --- 3. _MiddlewarePlugin has dispatch adapter methods ---
plugin = _MiddlewarePlugin(name="test", stack=[mw])
for hook in HOOK_NAMES:
    assert hasattr(plugin, hook)

# --- 4. get_execution_mode() works ---
KNOWN_MODES = ("pipeline", "dispatched", "stream")
assert get_execution_mode() in KNOWN_MODES
# Default mode is pipeline
assert get_execution_mode() == "pipeline"
# --- 5. Configurable task_budget on dispatch() ---
writer = (
    Agent("writer")
    .model("gemini-2.5-flash")
    .instruct("Write.")
)
emailer = (
    Agent("emailer")
    .model("gemini-2.5-flash")
    .instruct("Email.")
)

# Via factory parameter: the budget is visible on the builder and on the
# object returned by build().
d = dispatch(emailer, task_budget=25)
assert isinstance(d, BackgroundTask)
assert d._max_tasks == 25
built = d.build()
assert built._max_tasks == 25

# Via fluent method: same checks, with the budget set after construction.
d2 = dispatch(emailer)
d2 = d2.task_budget(10)
assert d2._max_tasks == 10
built2 = d2.build()
assert built2._max_tasks == 10
# --- 6. StreamRunner has .middleware() and .task_budget() ---
sr = StreamRunner(writer)
for setter_name in ("middleware", "task_budget"):
    assert hasattr(sr, setter_name)

# Chain both setters, then confirm the runner itself recorded the values.
(
    sr
    .task_budget(100)
    .middleware(mw)
)
assert sr._max_tasks == 100
assert len(sr._middlewares) == 1
# --- 7. Custom middleware with dispatch hooks ---
class TrackingMiddleware:
    """Custom middleware that tracks dispatch lifecycle.

    Every hook is a coroutine that only appends to one of the instance's
    lists, so the recorded events can be inspected after a run:

    * ``dispatched`` -- task names passed to ``on_dispatch``
    * ``completed``  -- task names passed to ``on_task_complete``
    * ``errors``     -- task names passed to ``on_task_error``
    * ``joins``      -- one ``{"joined": ..., "timed_out": ...}`` dict per
      ``on_join`` call
    """

    def __init__(self) -> None:
        self.dispatched = []
        self.completed = []
        self.errors = []
        self.joins = []

    async def on_dispatch(self, ctx, task_name, agent_name) -> None:
        """Record ``task_name``; ``ctx`` and ``agent_name`` are ignored."""
        self.dispatched.append(task_name)

    async def on_task_complete(self, ctx, task_name, result) -> None:
        """Record ``task_name``; ``ctx`` and ``result`` are ignored."""
        self.completed.append(task_name)

    async def on_task_error(self, ctx, task_name, error) -> None:
        """Record ``task_name``; ``ctx`` and the ``error`` itself are ignored."""
        self.errors.append(task_name)

    async def on_join(self, ctx, joined, timed_out) -> None:
        """Record the ``joined`` / ``timed_out`` arguments as one dict entry."""
        self.joins.append({"joined": joined, "timed_out": timed_out})
# Wrap the custom tracker in the plugin adapter and confirm the dispatch
# hook is exposed on the resulting plugin.
tracker = TrackingMiddleware()
plugin2 = _MiddlewarePlugin(name="tracker", stack=[tracker])
assert hasattr(plugin2, "on_dispatch")

# --- 8. Prelude exports ---
# The prelude must re-export the very same objects, not copies.
from adk_fluent.prelude import (
    DispatchLogMiddleware as DLM,
    get_execution_mode as gem,
)

assert DLM is DispatchLogMiddleware and gem is get_execution_mode

# --- 9. Top-level exports ---
# Same identity check against the package root.
from adk_fluent import (
    DispatchLogMiddleware as DLM2,
    get_execution_mode as gem2,
)

assert DLM2 is DispatchLogMiddleware and gem2 is get_execution_mode

print("All dispatch middleware assertions passed!")
graph TD
n1>"dispatch_7 dispatch(1)"]
n2["emailer"]
n1 --> n2