// gemini_adk_rs/live/reactor.rs

//! First-class reactor vocabulary for Live sessions.
//!
//! This module is intentionally small: it defines the normalized events,
//! reactions, and typed effects that existing mechanisms can migrate onto
//! incrementally, without rewriting the current control plane in one step.
6
7use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use gemini_genai_rs::prelude::Content;
11
12use crate::state::StateMutation;
13
14use super::events::LiveEvent;
15
16/// A normalized event that can drive ADK-level reactions.
17#[derive(Debug, Clone)]
18pub enum ReactorEvent {
19    /// Existing semantic Live event.
20    Live(LiveEvent),
21    /// State changed since the last reactor cursor.
22    StateChanged(Vec<StateMutation>),
23    /// Periodic tick for timers and sustained conditions.
24    TimerTick {
25        /// Time observed by the control lane for this tick.
26        now: Instant,
27    },
28    /// Client-side playback drained after model audio was generated.
29    PlaybackDrained {
30        /// Whether the control plane has armed a deferred model prompt.
31        prompt_pending: bool,
32    },
33    /// Client detected that the user started speaking.
34    UserSpeechStarted,
35    /// Client detected that the user stopped speaking.
36    UserSpeechEnded {
37        /// Whether the control plane has armed a deferred model prompt.
38        prompt_pending: bool,
39    },
40    /// User speech ended, but the model may or may not produce a turn.
41    SoftTurnComplete,
42}
43
/// Voice-flow state owned by the reactor.
///
/// [`LiveReactor`] folds each incoming [`ReactorEvent`] into this state via
/// [`VoiceRuntimeState::apply_event`] before any rule runs. It then hands
/// rules a cloned snapshot.
#[derive(Debug, Clone, Default)]
pub struct VoiceRuntimeState {
    /// Whether the client believes the user is currently speaking.
    pub user_speaking: bool,
    /// Whether browser playback is believed to be active.
    ///
    /// NOTE(review): the voice events handled in this module only ever clear
    /// this flag; confirm which component is expected to set it.
    pub playback_active: bool,
    /// Whether a deferred model prompt is armed.
    pub prompt_pending: bool,
    /// Counter bumped (saturating) each time user speech cancels an armed
    /// prompt.
    ///
    /// Arming a prompt leaves the epoch unchanged. So does clearing one
    /// through `PlaybackDrained` or `UserSpeechEnded`.
    pub prompt_epoch: u64,
    /// When the client last reported barge-in or the start of user speech.
    pub last_barge_in_at: Option<Instant>,
    /// When browser playback last reported that it had drained.
    pub last_playback_drained_at: Option<Instant>,
}
60
61impl VoiceRuntimeState {
62    /// Apply an incoming event to the voice runtime state before rules run.
63    pub fn apply_event(&mut self, event: &ReactorEvent) {
64        match event {
65            ReactorEvent::PlaybackDrained { prompt_pending } => {
66                self.playback_active = false;
67                self.prompt_pending = *prompt_pending;
68                self.last_playback_drained_at = Some(Instant::now());
69            }
70            ReactorEvent::UserSpeechStarted => {
71                self.user_speaking = true;
72                self.playback_active = false;
73                if self.prompt_pending {
74                    self.prompt_epoch = self.prompt_epoch.saturating_add(1);
75                }
76                self.prompt_pending = false;
77                self.last_barge_in_at = Some(Instant::now());
78            }
79            ReactorEvent::UserSpeechEnded { prompt_pending } => {
80                self.user_speaking = false;
81                self.prompt_pending = *prompt_pending && !self.playback_active;
82            }
83            _ => {}
84        }
85    }
86}
87
/// How the reactor should execute a single effect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EffectPolicy {
    /// Whether this effect must finish before later effects run.
    pub mode: EffectMode,
    /// Optional upper bound on how long the effect may take.
    pub timeout: Option<Duration>,
}

impl Default for EffectPolicy {
    /// A blocking policy with no timeout budget.
    fn default() -> Self {
        let mode = EffectMode::Blocking;
        let timeout = None;
        Self { mode, timeout }
    }
}

/// Ordering discipline for an effect relative to the effects after it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EffectMode {
    /// Await this effect before continuing with later effects.
    Blocking,
    /// Run this effect independently of the effects that follow it.
    Concurrent,
}
114
115/// A typed runtime effect emitted by a reaction.
116#[derive(Debug, Clone)]
117pub enum LiveEffect {
118    /// No operation; useful for conditional reaction builders.
119    Noop,
120    /// Add state/context turns to the session.
121    SendContext(Vec<Content>),
122    /// Ask the model to generate from accumulated context.
123    PromptModel,
124    /// Cancel a deferred model prompt while leaving queued context intact.
125    CancelDeferredPrompt,
126    /// Tell the Live API that user speech activity started.
127    SignalUserActivityStart,
128    /// Tell the Live API that user speech activity ended.
129    SignalUserActivityEnd,
130    /// Replace or amend the active instruction.
131    UpdateInstruction(String),
132    /// Emit a semantic event for observers.
133    Emit(LiveEvent),
134}
135
136/// A policy-wrapped effect.
137#[derive(Debug, Clone)]
138pub struct Reaction {
139    /// Rule or subsystem that produced the reaction.
140    pub source: &'static str,
141    /// Runtime effect requested by the rule.
142    pub effect: LiveEffect,
143    /// Execution policy for the effect.
144    pub policy: EffectPolicy,
145}
146
147impl Reaction {
148    /// Create a blocking reaction.
149    pub fn blocking(source: &'static str, effect: LiveEffect) -> Self {
150        Self {
151            source,
152            effect,
153            policy: EffectPolicy::default(),
154        }
155    }
156
157    /// Create a concurrent reaction.
158    pub fn concurrent(source: &'static str, effect: LiveEffect) -> Self {
159        Self {
160            source,
161            effect,
162            policy: EffectPolicy {
163                mode: EffectMode::Concurrent,
164                ..EffectPolicy::default()
165            },
166        }
167    }
168}
169
170/// A rule that reacts to normalized events and emits typed effects.
171pub trait ReactorRule: Send + Sync {
172    /// Stable rule name for diagnostics and reaction provenance.
173    fn name(&self) -> &str;
174    /// Produce reactions for a normalized event.
175    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction>;
176}
177
178/// Ordered collection of reactor rules.
179pub struct LiveReactor {
180    rules: Vec<Box<dyn ReactorRule>>,
181    voice: Mutex<VoiceRuntimeState>,
182}
183
184impl Default for LiveReactor {
185    fn default() -> Self {
186        Self {
187            rules: Vec::new(),
188            voice: Mutex::new(VoiceRuntimeState::default()),
189        }
190    }
191}
192
193impl LiveReactor {
194    /// Create an empty reactor.
195    pub fn new() -> Self {
196        Self::default()
197    }
198
199    /// Create a reactor with the default voice-flow rules.
200    pub fn voice_defaults() -> Self {
201        let mut reactor = Self::new();
202        reactor.add_rule(PromptOnPlaybackDrained);
203        reactor.add_rule(UserSpeechActivityRule);
204        reactor
205    }
206
207    /// Add a rule to the end of the ordered rule list.
208    pub fn add_rule(&mut self, rule: impl ReactorRule + 'static) {
209        self.rules.push(Box::new(rule));
210    }
211
212    /// Run all rules against an event and collect reactions in rule order.
213    pub fn react(&self, event: &ReactorEvent) -> Vec<Reaction> {
214        let voice = {
215            let mut voice = self.voice.lock().expect("voice reactor state poisoned");
216            voice.apply_event(event);
217            voice.clone()
218        };
219
220        self.rules
221            .iter()
222            .flat_map(|rule| rule.react(event, &voice))
223            .collect()
224    }
225
226    /// Return a snapshot of the current voice runtime state.
227    pub fn voice_state(&self) -> VoiceRuntimeState {
228        self.voice
229            .lock()
230            .expect("voice reactor state poisoned")
231            .clone()
232    }
233}
234
235/// Prompt the model when browser playback is fully drained and a prompt is armed.
236pub struct PromptOnPlaybackDrained;
237
238impl ReactorRule for PromptOnPlaybackDrained {
239    fn name(&self) -> &str {
240        "prompt_on_playback_drained"
241    }
242
243    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
244        if matches!(event, ReactorEvent::PlaybackDrained { .. })
245            && voice.prompt_pending
246            && !voice.user_speaking
247            && !voice.playback_active
248        {
249            vec![Reaction::blocking(
250                "prompt_on_playback_drained",
251                LiveEffect::PromptModel,
252            )]
253        } else {
254            Vec::new()
255        }
256    }
257}
258
259/// Cancel pending model prompts and signal activity around user speech.
260pub struct UserSpeechActivityRule;
261
262impl ReactorRule for UserSpeechActivityRule {
263    fn name(&self) -> &str {
264        "user_speech_activity"
265    }
266
267    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
268        match event {
269            ReactorEvent::UserSpeechStarted => vec![
270                Reaction::blocking("user_speech_activity", LiveEffect::CancelDeferredPrompt),
271                Reaction::blocking("user_speech_activity", LiveEffect::SignalUserActivityStart),
272            ],
273            ReactorEvent::UserSpeechEnded { .. } => {
274                let mut reactions = vec![Reaction::blocking(
275                    "user_speech_activity",
276                    LiveEffect::SignalUserActivityEnd,
277                )];
278                if voice.prompt_pending && !voice.playback_active {
279                    reactions.push(Reaction::blocking(
280                        "user_speech_activity",
281                        LiveEffect::PromptModel,
282                    ));
283                }
284                reactions
285            }
286            _ => Vec::new(),
287        }
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-only rule that emits one concurrent `Noop` tagged with its name,
    /// regardless of the event. Used to observe rule ordering.
    struct EmitNoop(&'static str);

    impl ReactorRule for EmitNoop {
        fn name(&self) -> &str {
            self.0
        }

        fn react(&self, _event: &ReactorEvent, _voice: &VoiceRuntimeState) -> Vec<Reaction> {
            vec![Reaction::concurrent(self.0, LiveEffect::Noop)]
        }
    }

    #[test]
    fn reactor_collects_reactions_in_rule_order() {
        // Register several rules around the real one so ordering is observable.
        let mut reactor = LiveReactor::new();
        reactor.add_rule(EmitNoop("first"));
        reactor.add_rule(PromptOnPlaybackDrained);
        reactor.add_rule(EmitNoop("last"));

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });

        let sources: Vec<&str> = reactions.iter().map(|r| r.source).collect();
        assert_eq!(sources, ["first", "prompt_on_playback_drained", "last"]);
        assert_eq!(reactions[0].policy.mode, EffectMode::Concurrent);
        assert_eq!(reactions[1].policy.mode, EffectMode::Blocking);
        assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
    }

    #[test]
    fn playback_drained_without_pending_prompt_is_noop() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
    }

    #[test]
    fn user_speech_started_cancels_prompt_and_signals_activity() {
        let reactor = LiveReactor::voice_defaults();

        let prompt_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert_eq!(prompt_reactions.len(), 1);

        let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::CancelDeferredPrompt
        ));
        assert!(matches!(
            reactions[1].effect,
            LiveEffect::SignalUserActivityStart
        ));
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }

    #[test]
    fn speech_start_without_pending_prompt_keeps_epoch() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);

        // The cancel effect is still emitted; only the epoch stays put.
        assert_eq!(reactions.len(), 2);
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert_eq!(voice.prompt_epoch, 0);
        assert!(voice.last_barge_in_at.is_some());
    }

    #[test]
    fn non_voice_events_leave_voice_state_untouched() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::SoftTurnComplete);

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(!voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 0);
        assert!(voice.last_barge_in_at.is_none());
        assert!(voice.last_playback_drained_at.is_none());
    }

    #[test]
    fn user_speech_ended_signals_activity_end() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: false,
        });

        assert_eq!(reactions.len(), 1);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(!reactor.voice_state().user_speaking);
    }

    #[test]
    fn speech_end_prompts_when_playback_already_drained_and_prompt_pending() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let drain_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert!(drain_reactions.is_empty());

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: true,
        });

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
    }

    #[test]
    fn playback_drained_does_not_prompt_while_user_is_speaking() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(voice.prompt_pending);
    }

    #[test]
    fn speech_start_wins_after_pending_prompt_snapshot() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }
}