R + Agent.on() — declarative reactors, zero ceremony.¶
The 0.17.0 reactor refresh makes signals and rules a first-class part of the fluent builder surface. Before, wiring a reactor required hand-building every object in sequence::
bus = H.event_bus()
tape = bus.tape()
temp = Signal("temp", 72, bus=bus)
reactor = Reactor(tape, bus=bus)
reactor.when(temp.rising.where(lambda v, _: v > 90), my_handler, priority=10)
await reactor.run()
Now::
temp = R.signal("temp", 72)
cooler = (
Agent("cooler", "gemini-2.5-flash")
.instruct("Plan a cool-down.")
.on(R.rising("temp").where(lambda v, _: v > 90), priority=10)
)
reactor = R.compile(cooler, tape=tape, bus=bus)
await reactor.run()
Run: uv run pytest examples/cookbook/81_reactor_native.py -v
Tip
What you’ll learn How to run inline smoke tests on agents.
Source: 81_reactor_native.py
from __future__ import annotations
import asyncio
import pytest
from adk_fluent import Agent, FanOut, Pipeline, R, ReactorPlugin, SessionTape
from adk_fluent._harness._event_bus import EventBus
@pytest.fixture(autouse=True)
def _reset():
R.clear()
yield
R.clear()
def test_r_signal_is_name_addressed() -> None:
"""``R.signal(name)`` is a get-or-create; same name → same instance."""
a = R.signal("temperature", 72)
b = R.signal("temperature")
assert a is b
assert R.get("temperature").value == 72
def test_r_predicates_are_name_addressed() -> None:
"""``R.rising('temp')`` resolves through the registry — no Signal object needed."""
R.signal("temperature", 0)
pred = R.rising("temperature").where(lambda v, prev: v > 90)
from adk_fluent._reactor._predicate import _Change
assert pred.matches(_Change("temperature", 95, 85)) # rising above 90
assert not pred.matches(_Change("temperature", 80, 95)) # falling
def test_on_attaches_rule_to_builder() -> None:
"""``.on(predicate)`` stores a declarative rule on the builder."""
R.signal("temp", 0)
agent = Agent("cooler").on(R.rising("temp"), lambda c: None, priority=10)
spec = agent._reactor_rules[0]
assert spec.predicate.deps == frozenset({"temp"})
assert spec.priority == 10
def test_compile_walks_composite_builders() -> None:
"""``R.compile`` picks up rules attached anywhere in a Pipeline / FanOut tree."""
R.signal("a", 0)
R.signal("b", 0)
pipeline = Pipeline("flow").step(
Agent("x").on(R.changed("a"), lambda c: None)
)
fanout = (
FanOut("parallel")
.branch(Agent("y").on(R.changed("a"), lambda c: None))
.branch(Agent("z").on(R.changed("b"), lambda c: None))
)
tape = SessionTape()
reactor_p = R.compile(pipeline, tape=tape)
reactor_f = R.compile(fanout, tape=tape)
assert len(reactor_p.rules) == 1
assert len(reactor_f.rules) == 2
@pytest.mark.asyncio
async def test_end_to_end_declarative_reactor() -> None:
"""Full chain: R.signal → .on(R.rising()) → R.compile → reactor fires handler."""
bus = EventBus()
tape = SessionTape()
bus.subscribe(tape.record)
R.attach(bus)
temp = R.signal("temp", 72)
fires: list[tuple] = []
async def handler(change) -> None:
fires.append((change.value, change.previous))
agent = (
Agent("cooler", "gemini-2.5-flash")
.instruct("Cool the building.")
.on(
R.rising("temp").where(lambda v, prev: v > 90),
handler,
priority=10,
)
)
reactor = R.compile(agent, tape=tape, bus=bus)
task = asyncio.create_task(reactor.run())
await asyncio.sleep(0.05)
temp.set(80) # rising but below 90 — no fire
temp.set(95) # rising above 90 — fires
temp.set(92) # falling — no fire
await asyncio.sleep(0.1)
reactor.stop()
await asyncio.sleep(0.02)
task.cancel()
assert fires == [(95, 80)]
@pytest.mark.asyncio
async def test_reactor_plugin_owns_lifecycle() -> None:
"""``ReactorPlugin`` starts/stops the reactor from ADK session callbacks."""
bus = EventBus()
tape = SessionTape()
bus.subscribe(tape.record)
R.attach(bus)
sig = R.signal("ping", False)
fired = asyncio.Event()
async def handler(_change) -> None:
fired.set()
agent = Agent("listener").on(R.changed("ping"), handler)
reactor = R.compile(agent, tape=tape, bus=bus)
plugin = ReactorPlugin(reactor)
await plugin.on_session_start()
sig.set(True)
await asyncio.wait_for(fired.wait(), timeout=0.3)
await plugin.on_session_end()
def test_debounce_throttle_are_immutable() -> None:
"""Fix from 0.17.0: ``.debounce()`` / ``.throttle()`` return fresh predicates."""
R.signal("temp", 0)
base = R.changed("temp")
debounced = base.debounce(50)
assert debounced is not base
assert base._debounce_ms == 0.0
assert debounced._debounce_ms == 50.0
if __name__ == "__main__":
pytest.main([__file__, "-v"])