Signals + Reactor — reactive state over the durable tape.¶
Shows the three reactive primitives added in Phase F / Phase G / auto-tracking:
- class:
Signal— typed state cell with version tracking. Mutations emit :class:SignalChangedon the ambient :class:EventBus(which in turn lands on the :class:SessionTape).
- class:
Reactor— cursor-following scheduler. Registered rules fire when their :class:SignalPredicatematches a change on the tape.
computed(name, fn)— derived signal that auto-tracks reads of other signals and re-runs on any dep change.
Run: uv run pytest examples/cookbook/80_reactor_basic.py -v
Tip
What you’ll learn How to manage interactive sessions with agents.
Source: 80_reactor_basic.py
from __future__ import annotations
import asyncio
import pytest
from adk_fluent import H, Reactor, Signal, computed
def test_signal_emits_on_change() -> None:
"""Setting a new value emits; repeating the value is a no-op."""
bus = H.event_bus()
tape = bus.tape()
temp = Signal("temperature", 72.0, bus=bus)
assert temp.set(85.0) is True
assert temp.set(85.0) is False # equal — skipped
assert temp.set(85.0, force=True) is True
kinds = [e["kind"] for e in tape.events]
assert kinds.count("signal_changed") == 2
def test_signal_predicate_filters_by_edge() -> None:
"""``.rising`` only matches when the value actually rises above 90."""
temp = Signal("temperature", 72.0)
rising_90 = temp.rising.where(lambda v, prev: v > 90)
from adk_fluent._reactor._predicate import _Change
assert rising_90.matches(_Change("temperature", 95.0, 89.0)) # rising above 90
assert not rising_90.matches(_Change("temperature", 80.0, 95.0)) # falling
assert not rising_90.matches(_Change("temperature", 85.0, 72.0)) # rising but below 90
def test_computed_auto_tracks_reads() -> None:
"""``computed`` re-runs when any signal it read changes."""
price = Signal("price", 100.0)
tax_rate = Signal("tax", 0.1)
total = computed("total", lambda: price.get() * (1 + tax_rate.get()))
assert total.get() == pytest.approx(110.0)
price.set(200.0)
assert total.get() == pytest.approx(220.0)
tax_rate.set(0.2)
assert total.get() == pytest.approx(240.0)
@pytest.mark.asyncio
async def test_reactor_fires_rule_on_signal() -> None:
"""End-to-end: signal change → predicate match → handler fires."""
bus = H.event_bus()
tape = bus.tape()
temp = Signal("temp", 72.0, bus=bus)
alerts: list[float] = []
done = asyncio.Event()
async def alert_handler(change) -> None: # noqa: ANN001
alerts.append(change.value)
done.set()
reactor = Reactor(tape, bus=bus)
reactor.when(temp.rising.where(lambda v, prev: v > 90), alert_handler, name="hot_alert", priority=10)
# Run the reactor in the background with a small budget so the test ends.
run_task = asyncio.create_task(reactor.run(budget=1))
await asyncio.sleep(0) # hand control to the reactor
temp.set(95.0) # should trip the predicate
fires = await asyncio.wait_for(run_task, timeout=1.0)
assert fires == 1
# Wait for the spawned handler task to complete.
await asyncio.wait_for(done.wait(), timeout=1.0)
assert alerts == [95.0]
if __name__ == "__main__":
pytest.main([__file__, "-v"])