1use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use gemini_genai_rs::prelude::Content;
11
12use crate::state::StateMutation;
13
14use super::events::LiveEvent;
15
16#[derive(Debug, Clone)]
18pub enum ReactorEvent {
19 Live(LiveEvent),
21 StateChanged(Vec<StateMutation>),
23 TimerTick {
25 now: Instant,
27 },
28 PlaybackDrained {
30 prompt_pending: bool,
32 },
33 UserSpeechStarted,
35 UserSpeechEnded {
37 prompt_pending: bool,
39 },
40 SoftTurnComplete,
42}
43
44#[derive(Debug, Clone, Default)]
46pub struct VoiceRuntimeState {
47 pub user_speaking: bool,
49 pub playback_active: bool,
51 pub prompt_pending: bool,
53 pub prompt_epoch: u64,
55 pub last_barge_in_at: Option<Instant>,
57 pub last_playback_drained_at: Option<Instant>,
59}
60
61impl VoiceRuntimeState {
62 pub fn apply_event(&mut self, event: &ReactorEvent) {
64 match event {
65 ReactorEvent::PlaybackDrained { prompt_pending } => {
66 self.playback_active = false;
67 self.prompt_pending = *prompt_pending;
68 self.last_playback_drained_at = Some(Instant::now());
69 }
70 ReactorEvent::UserSpeechStarted => {
71 self.user_speaking = true;
72 self.playback_active = false;
73 if self.prompt_pending {
74 self.prompt_epoch = self.prompt_epoch.saturating_add(1);
75 }
76 self.prompt_pending = false;
77 self.last_barge_in_at = Some(Instant::now());
78 }
79 ReactorEvent::UserSpeechEnded { prompt_pending } => {
80 self.user_speaking = false;
81 self.prompt_pending = *prompt_pending && !self.playback_active;
82 }
83 _ => {}
84 }
85 }
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct EffectPolicy {
91 pub mode: EffectMode,
93 pub timeout: Option<Duration>,
95 pub dedupe_key: Option<String>,
97 pub cancel_scope: Option<String>,
99}
100
101impl Default for EffectPolicy {
102 fn default() -> Self {
103 Self {
104 mode: EffectMode::Blocking,
105 timeout: None,
106 dedupe_key: None,
107 cancel_scope: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum EffectMode {
115 Blocking,
117 Concurrent,
119}
120
121#[derive(Debug, Clone)]
123pub enum LiveEffect {
124 Noop,
126 SendContext(Vec<Content>),
128 PromptModel,
130 CancelDeferredPrompt,
132 SignalUserActivityStart,
134 SignalUserActivityEnd,
136 UpdateInstruction(String),
138 Emit(LiveEvent),
140 TransitionPhase(String),
142}
143
144#[derive(Debug, Clone)]
146pub struct Reaction {
147 pub source: &'static str,
149 pub effect: LiveEffect,
151 pub policy: EffectPolicy,
153}
154
155impl Reaction {
156 pub fn blocking(source: &'static str, effect: LiveEffect) -> Self {
158 Self {
159 source,
160 effect,
161 policy: EffectPolicy::default(),
162 }
163 }
164
165 pub fn concurrent(source: &'static str, effect: LiveEffect) -> Self {
167 Self {
168 source,
169 effect,
170 policy: EffectPolicy {
171 mode: EffectMode::Concurrent,
172 ..EffectPolicy::default()
173 },
174 }
175 }
176}
177
178pub trait ReactorRule: Send + Sync {
180 fn name(&self) -> &str;
182 fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction>;
184}
185
186pub struct LiveReactor {
188 rules: Vec<Box<dyn ReactorRule>>,
189 voice: Mutex<VoiceRuntimeState>,
190}
191
192impl Default for LiveReactor {
193 fn default() -> Self {
194 Self {
195 rules: Vec::new(),
196 voice: Mutex::new(VoiceRuntimeState::default()),
197 }
198 }
199}
200
201impl LiveReactor {
202 pub fn new() -> Self {
204 Self::default()
205 }
206
207 pub fn voice_defaults() -> Self {
209 let mut reactor = Self::new();
210 reactor.add_rule(PromptOnPlaybackDrained);
211 reactor.add_rule(UserSpeechActivityRule);
212 reactor
213 }
214
215 pub fn add_rule(&mut self, rule: impl ReactorRule + 'static) {
217 self.rules.push(Box::new(rule));
218 }
219
220 pub fn react(&self, event: &ReactorEvent) -> Vec<Reaction> {
222 let voice = {
223 let mut voice = self.voice.lock().expect("voice reactor state poisoned");
224 voice.apply_event(event);
225 voice.clone()
226 };
227
228 self.rules
229 .iter()
230 .flat_map(|rule| rule.react(event, &voice))
231 .collect()
232 }
233
234 pub fn voice_state(&self) -> VoiceRuntimeState {
236 self.voice
237 .lock()
238 .expect("voice reactor state poisoned")
239 .clone()
240 }
241}
242
243pub struct PromptOnPlaybackDrained;
245
246impl ReactorRule for PromptOnPlaybackDrained {
247 fn name(&self) -> &str {
248 "prompt_on_playback_drained"
249 }
250
251 fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
252 if matches!(event, ReactorEvent::PlaybackDrained { .. })
253 && voice.prompt_pending
254 && !voice.user_speaking
255 && !voice.playback_active
256 {
257 vec![Reaction::blocking(
258 "prompt_on_playback_drained",
259 LiveEffect::PromptModel,
260 )]
261 } else {
262 Vec::new()
263 }
264 }
265}
266
267pub struct UserSpeechActivityRule;
269
270impl ReactorRule for UserSpeechActivityRule {
271 fn name(&self) -> &str {
272 "user_speech_activity"
273 }
274
275 fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
276 match event {
277 ReactorEvent::UserSpeechStarted => vec![
278 Reaction::blocking("user_speech_activity", LiveEffect::CancelDeferredPrompt),
279 Reaction::blocking("user_speech_activity", LiveEffect::SignalUserActivityStart),
280 ],
281 ReactorEvent::UserSpeechEnded { .. } => {
282 let mut reactions = vec![Reaction::blocking(
283 "user_speech_activity",
284 LiveEffect::SignalUserActivityEnd,
285 )];
286 if voice.prompt_pending && !voice.playback_active {
287 reactions.push(Reaction::blocking(
288 "user_speech_activity",
289 LiveEffect::PromptModel,
290 ));
291 }
292 reactions
293 }
294 _ => Vec::new(),
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn reactor_collects_reactions_in_rule_order() {
305 let mut reactor = LiveReactor::new();
306 reactor.add_rule(PromptOnPlaybackDrained);
307
308 let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
309 prompt_pending: true,
310 });
311 assert_eq!(reactions.len(), 1);
312 assert_eq!(reactions[0].source, "prompt_on_playback_drained");
313 assert_eq!(reactions[0].policy.mode, EffectMode::Blocking);
314 assert!(matches!(reactions[0].effect, LiveEffect::PromptModel));
315 }
316
317 #[test]
318 fn playback_drained_without_pending_prompt_is_noop() {
319 let reactor = LiveReactor::voice_defaults();
320
321 let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
322 prompt_pending: false,
323 });
324
325 assert!(reactions.is_empty());
326 }
327
328 #[test]
329 fn user_speech_started_cancels_prompt_and_signals_activity() {
330 let reactor = LiveReactor::voice_defaults();
331
332 let prompt_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
333 prompt_pending: true,
334 });
335 assert_eq!(prompt_reactions.len(), 1);
336
337 let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);
338
339 assert_eq!(reactions.len(), 2);
340 assert!(matches!(
341 reactions[0].effect,
342 LiveEffect::CancelDeferredPrompt
343 ));
344 assert!(matches!(
345 reactions[1].effect,
346 LiveEffect::SignalUserActivityStart
347 ));
348 let voice = reactor.voice_state();
349 assert!(voice.user_speaking);
350 assert!(!voice.prompt_pending);
351 assert_eq!(voice.prompt_epoch, 1);
352 }
353
354 #[test]
355 fn user_speech_ended_signals_activity_end() {
356 let reactor = LiveReactor::voice_defaults();
357
358 let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
359 prompt_pending: false,
360 });
361
362 assert_eq!(reactions.len(), 1);
363 assert!(matches!(
364 reactions[0].effect,
365 LiveEffect::SignalUserActivityEnd
366 ));
367 assert!(!reactor.voice_state().user_speaking);
368 }
369
370 #[test]
371 fn speech_end_prompts_when_playback_already_drained_and_prompt_pending() {
372 let reactor = LiveReactor::voice_defaults();
373
374 reactor.react(&ReactorEvent::UserSpeechStarted);
375 let drain_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
376 prompt_pending: true,
377 });
378 assert!(drain_reactions.is_empty());
379
380 let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
381 prompt_pending: true,
382 });
383
384 assert_eq!(reactions.len(), 2);
385 assert!(matches!(
386 reactions[0].effect,
387 LiveEffect::SignalUserActivityEnd
388 ));
389 assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
390 }
391
392 #[test]
393 fn playback_drained_does_not_prompt_while_user_is_speaking() {
394 let reactor = LiveReactor::voice_defaults();
395
396 reactor.react(&ReactorEvent::UserSpeechStarted);
397 let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
398 prompt_pending: true,
399 });
400
401 assert!(reactions.is_empty());
402 let voice = reactor.voice_state();
403 assert!(voice.user_speaking);
404 assert!(voice.prompt_pending);
405 }
406
407 #[test]
408 fn speech_start_wins_after_pending_prompt_snapshot() {
409 let reactor = LiveReactor::voice_defaults();
410
411 reactor.react(&ReactorEvent::PlaybackDrained {
412 prompt_pending: true,
413 });
414 reactor.react(&ReactorEvent::UserSpeechStarted);
415 let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
416 prompt_pending: false,
417 });
418
419 assert!(reactions.is_empty());
420 let voice = reactor.voice_state();
421 assert!(voice.user_speaking);
422 assert!(!voice.prompt_pending);
423 assert_eq!(voice.prompt_epoch, 1);
424 }
425}