// gemini_adk_rs/live/reactor.rs

//! First-class reactor vocabulary for Live sessions.
//!
//! This module is intentionally small: it defines the normalized events,
//! reactions, and typed effects that existing mechanisms can migrate onto
//! incrementally, without rewriting the current control plane in one step.
6
7use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use gemini_genai_rs::prelude::Content;
11
12use crate::state::StateMutation;
13
14use super::events::LiveEvent;
15
16/// A normalized event that can drive ADK-level reactions.
17#[derive(Debug, Clone)]
18pub enum ReactorEvent {
19    /// Existing semantic Live event.
20    Live(LiveEvent),
21    /// State changed since the last reactor cursor.
22    StateChanged(Vec<StateMutation>),
23    /// Periodic tick for timers and sustained conditions.
24    TimerTick {
25        /// Time observed by the control lane for this tick.
26        now: Instant,
27    },
28    /// Client-side playback drained after model audio was generated.
29    PlaybackDrained {
30        /// Whether the control plane has armed a deferred model prompt.
31        prompt_pending: bool,
32    },
33    /// Client detected that the user started speaking.
34    UserSpeechStarted,
35    /// Client detected that the user stopped speaking.
36    UserSpeechEnded {
37        /// Whether the control plane has armed a deferred model prompt.
38        prompt_pending: bool,
39    },
40    /// User speech ended, but the model may or may not produce a turn.
41    SoftTurnComplete,
42}
43
/// Voice-flow state owned by the reactor.
///
/// [`LiveReactor`] folds every incoming event into this state via
/// [`VoiceRuntimeState::apply_event`] before any rule runs, and rules receive a
/// snapshot of the result. `Default` yields an idle state: nobody speaking, no
/// playback, no prompt armed, epoch `0`, and no timestamps recorded.
#[derive(Debug, Clone, Default)]
pub struct VoiceRuntimeState {
    /// Whether the client believes the user is currently speaking.
    pub user_speaking: bool,
    /// Whether browser playback is believed to be active.
    ///
    /// NOTE(review): no event handled by `apply_event` ever sets this to
    /// `true`; confirm where playback start is recorded.
    pub playback_active: bool,
    /// Whether a deferred model prompt is armed.
    pub prompt_pending: bool,
    /// Cancellation counter: incremented (saturating) each time the start of
    /// user speech cancels an armed deferred prompt. Arming or disarming a
    /// prompt through playback-drained or speech-ended events leaves it as is.
    pub prompt_epoch: u64,
    /// When the client last reported barge-in (user speech start).
    pub last_barge_in_at: Option<Instant>,
    /// When browser playback last reported that it drained.
    pub last_playback_drained_at: Option<Instant>,
}
60
61impl VoiceRuntimeState {
62    /// Apply an incoming event to the voice runtime state before rules run.
63    pub fn apply_event(&mut self, event: &ReactorEvent) {
64        match event {
65            ReactorEvent::PlaybackDrained { prompt_pending } => {
66                self.playback_active = false;
67                self.prompt_pending = *prompt_pending;
68                self.last_playback_drained_at = Some(Instant::now());
69            }
70            ReactorEvent::UserSpeechStarted => {
71                self.user_speaking = true;
72                self.playback_active = false;
73                if self.prompt_pending {
74                    self.prompt_epoch = self.prompt_epoch.saturating_add(1);
75                }
76                self.prompt_pending = false;
77                self.last_barge_in_at = Some(Instant::now());
78            }
79            ReactorEvent::UserSpeechEnded { prompt_pending } => {
80                self.user_speaking = false;
81                self.prompt_pending = *prompt_pending && !self.playback_active;
82            }
83            _ => {}
84        }
85    }
86}
87
88/// Execution policy for an effect.
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct EffectPolicy {
91    /// Whether this effect blocks later effects from running.
92    pub mode: EffectMode,
93    /// Optional maximum time budget for the effect.
94    pub timeout: Option<Duration>,
95    /// Optional key used to collapse duplicate queued effects.
96    pub dedupe_key: Option<String>,
97    /// Optional cancellation group for replacing stale concurrent work.
98    pub cancel_scope: Option<String>,
99}
100
101impl Default for EffectPolicy {
102    fn default() -> Self {
103        Self {
104            mode: EffectMode::Blocking,
105            timeout: None,
106            dedupe_key: None,
107            cancel_scope: None,
108        }
109    }
110}
111
/// Scheduling mode for an effect: whether it must finish before the reactor
/// continues.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EffectMode {
    /// Await this effect before moving on.
    Blocking,
    /// Run this effect independently of any effects that follow it.
    Concurrent,
}
120
121/// A typed runtime effect emitted by a reaction.
122#[derive(Debug, Clone)]
123pub enum LiveEffect {
124    /// No operation; useful for conditional reaction builders.
125    Noop,
126    /// Add state/context turns to the session.
127    SendContext(Vec<Content>),
128    /// Ask the model to generate from accumulated context.
129    PromptModel,
130    /// Cancel a deferred model prompt while leaving queued context intact.
131    CancelDeferredPrompt,
132    /// Tell the Live API that user speech activity started.
133    SignalUserActivityStart,
134    /// Tell the Live API that user speech activity ended.
135    SignalUserActivityEnd,
136    /// Replace or amend the active instruction.
137    UpdateInstruction(String),
138    /// Emit a semantic event for observers.
139    Emit(LiveEvent),
140    /// Request a phase transition by name.
141    TransitionPhase(String),
142}
143
144/// A policy-wrapped effect.
145#[derive(Debug, Clone)]
146pub struct Reaction {
147    /// Rule or subsystem that produced the reaction.
148    pub source: &'static str,
149    /// Runtime effect requested by the rule.
150    pub effect: LiveEffect,
151    /// Execution policy for the effect.
152    pub policy: EffectPolicy,
153}
154
155impl Reaction {
156    /// Create a blocking reaction.
157    pub fn blocking(source: &'static str, effect: LiveEffect) -> Self {
158        Self {
159            source,
160            effect,
161            policy: EffectPolicy::default(),
162        }
163    }
164
165    /// Create a concurrent reaction.
166    pub fn concurrent(source: &'static str, effect: LiveEffect) -> Self {
167        Self {
168            source,
169            effect,
170            policy: EffectPolicy {
171                mode: EffectMode::Concurrent,
172                ..EffectPolicy::default()
173            },
174        }
175    }
176}
177
178/// A rule that reacts to normalized events and emits typed effects.
179pub trait ReactorRule: Send + Sync {
180    /// Stable rule name for diagnostics and reaction provenance.
181    fn name(&self) -> &str;
182    /// Produce reactions for a normalized event.
183    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction>;
184}
185
186/// Ordered collection of reactor rules.
187pub struct LiveReactor {
188    rules: Vec<Box<dyn ReactorRule>>,
189    voice: Mutex<VoiceRuntimeState>,
190}
191
192impl Default for LiveReactor {
193    fn default() -> Self {
194        Self {
195            rules: Vec::new(),
196            voice: Mutex::new(VoiceRuntimeState::default()),
197        }
198    }
199}
200
201impl LiveReactor {
202    /// Create an empty reactor.
203    pub fn new() -> Self {
204        Self::default()
205    }
206
207    /// Create a reactor with the default voice-flow rules.
208    pub fn voice_defaults() -> Self {
209        let mut reactor = Self::new();
210        reactor.add_rule(PromptOnPlaybackDrained);
211        reactor.add_rule(UserSpeechActivityRule);
212        reactor
213    }
214
215    /// Add a rule to the end of the ordered rule list.
216    pub fn add_rule(&mut self, rule: impl ReactorRule + 'static) {
217        self.rules.push(Box::new(rule));
218    }
219
220    /// Run all rules against an event and collect reactions in rule order.
221    pub fn react(&self, event: &ReactorEvent) -> Vec<Reaction> {
222        let voice = {
223            let mut voice = self.voice.lock().expect("voice reactor state poisoned");
224            voice.apply_event(event);
225            voice.clone()
226        };
227
228        self.rules
229            .iter()
230            .flat_map(|rule| rule.react(event, &voice))
231            .collect()
232    }
233
234    /// Return a snapshot of the current voice runtime state.
235    pub fn voice_state(&self) -> VoiceRuntimeState {
236        self.voice
237            .lock()
238            .expect("voice reactor state poisoned")
239            .clone()
240    }
241}
242
243/// Prompt the model when browser playback is fully drained and a prompt is armed.
244pub struct PromptOnPlaybackDrained;
245
246impl ReactorRule for PromptOnPlaybackDrained {
247    fn name(&self) -> &str {
248        "prompt_on_playback_drained"
249    }
250
251    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
252        if matches!(event, ReactorEvent::PlaybackDrained { .. })
253            && voice.prompt_pending
254            && !voice.user_speaking
255            && !voice.playback_active
256        {
257            vec![Reaction::blocking(
258                "prompt_on_playback_drained",
259                LiveEffect::PromptModel,
260            )]
261        } else {
262            Vec::new()
263        }
264    }
265}
266
267/// Cancel pending model prompts and signal activity around user speech.
268pub struct UserSpeechActivityRule;
269
270impl ReactorRule for UserSpeechActivityRule {
271    fn name(&self) -> &str {
272        "user_speech_activity"
273    }
274
275    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
276        match event {
277            ReactorEvent::UserSpeechStarted => vec![
278                Reaction::blocking("user_speech_activity", LiveEffect::CancelDeferredPrompt),
279                Reaction::blocking("user_speech_activity", LiveEffect::SignalUserActivityStart),
280            ],
281            ReactorEvent::UserSpeechEnded { .. } => {
282                let mut reactions = vec![Reaction::blocking(
283                    "user_speech_activity",
284                    LiveEffect::SignalUserActivityEnd,
285                )];
286                if voice.prompt_pending && !voice.playback_active {
287                    reactions.push(Reaction::blocking(
288                        "user_speech_activity",
289                        LiveEffect::PromptModel,
290                    ));
291                }
292                reactions
293            }
294            _ => Vec::new(),
295        }
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-only rule that always emits one concurrent `Noop` whose source is
    /// the rule's own tag, so rule ordering is observable in the output.
    struct TaggedNoop(&'static str);

    impl ReactorRule for TaggedNoop {
        fn name(&self) -> &str {
            self.0
        }

        fn react(&self, _event: &ReactorEvent, _voice: &VoiceRuntimeState) -> Vec<Reaction> {
            vec![Reaction::concurrent(self.0, LiveEffect::Noop)]
        }
    }

    #[test]
    fn reactor_collects_reactions_in_rule_order() {
        let mut reactor = LiveReactor::new();
        reactor.add_rule(TaggedNoop("first"));
        reactor.add_rule(PromptOnPlaybackDrained);
        reactor.add_rule(TaggedNoop("last"));

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });

        // Several rules are needed for this test to actually exercise ordering.
        let sources: Vec<&str> = reactions.iter().map(|r| r.source).collect();
        assert_eq!(sources, ["first", "prompt_on_playback_drained", "last"]);
        assert_eq!(reactions[0].policy.mode, EffectMode::Concurrent);
        assert!(matches!(reactions[0].effect, LiveEffect::Noop));
        assert_eq!(reactions[1].policy.mode, EffectMode::Blocking);
        assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
    }

    #[test]
    fn concurrent_reaction_keeps_other_policy_defaults() {
        let reaction = Reaction::concurrent("test", LiveEffect::Noop);

        assert_eq!(reaction.source, "test");
        assert_eq!(
            reaction.policy,
            EffectPolicy {
                mode: EffectMode::Concurrent,
                ..EffectPolicy::default()
            }
        );
    }

    #[test]
    fn playback_drained_without_pending_prompt_is_noop() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
    }

    #[test]
    fn user_speech_started_cancels_prompt_and_signals_activity() {
        let reactor = LiveReactor::voice_defaults();

        let prompt_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert_eq!(prompt_reactions.len(), 1);

        let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::CancelDeferredPrompt
        ));
        assert!(matches!(
            reactions[1].effect,
            LiveEffect::SignalUserActivityStart
        ));
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }

    #[test]
    fn speech_start_without_pending_prompt_keeps_epoch() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);

        assert_eq!(reactions.len(), 2);
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 0);
        assert!(voice.last_barge_in_at.is_some());
    }

    #[test]
    fn user_speech_ended_signals_activity_end() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: false,
        });

        assert_eq!(reactions.len(), 1);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(!reactor.voice_state().user_speaking);
    }

    #[test]
    fn speech_end_prompts_when_playback_already_drained_and_prompt_pending() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let drain_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert!(drain_reactions.is_empty());

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: true,
        });

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
    }

    #[test]
    fn playback_drained_does_not_prompt_while_user_is_speaking() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(voice.prompt_pending);
    }

    #[test]
    fn speech_start_wins_after_pending_prompt_snapshot() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }

    #[test]
    fn unrelated_events_produce_no_reactions_or_state_changes() {
        let reactor = LiveReactor::voice_defaults();

        for event in [
            ReactorEvent::StateChanged(Vec::new()),
            ReactorEvent::TimerTick {
                now: Instant::now(),
            },
            ReactorEvent::SoftTurnComplete,
        ] {
            assert!(reactor.react(&event).is_empty());
        }

        let voice = reactor.voice_state();
        assert!(!voice.user_speaking);
        assert!(!voice.playback_active);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 0);
        assert!(voice.last_barge_in_at.is_none());
        assert!(voice.last_playback_drained_at.is_none());
    }
}