StreamRunner: Continuous Userless Agent Execution¶
Demonstrates the Source and StreamRunner for processing continuous data streams without a human in the loop.
Key concepts:
Source.from_iter(): wrap a sync iterable as an async stream
Source.from_async(): pass through an async generator
Source.poll(): poll a function at intervals
Source.callback() / Inbox: push-based source for webhooks
StreamRunner: bridges AsyncIterator → ADK runner.run_async()
Session strategies: per_item, shared, keyed
Callbacks: on_result, on_error (dead-letter queue)
StreamStats: live counters (processed, errors, throughput)
Tip
What you’ll learn How to register lifecycle callbacks with accumulation semantics.
Source: 60_stream_runner.py
Equivalence¶
# Source.callback() returns Inbox
inbox2 = Source.callback()
assert isinstance(inbox2, Inbox)
# Inbox push/close
inbox2.push("a")
inbox2.push("b")
assert inbox2.pending == 2
inbox2.close()
# StreamRunner is fluent
assert isinstance(runner, StreamRunner)
assert runner._concurrency == 5
assert runner._session_strategy == "per_item"
assert runner._shutdown_timeout == 10
# Session key implies keyed strategy
assert keyed_runner._session_strategy == "keyed"
assert keyed_runner._session_key_fn is not None
# Stats dataclass works
stats2 = StreamStats()
stats2.processed = 10
stats2.errors = 1
assert stats2.processed == 10
assert stats2.errors == 1
assert stats2.throughput > 0 # time has passed since creation
# StreamRunner requires source to start
import asyncio
try:
asyncio.run(StreamRunner(processor).start())
assert False, "Should have raised ValueError"
except ValueError as e:
assert "No source configured" in str(e)