1use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use gemini_genai_rs::prelude::Content;
11
12use crate::state::StateMutation;
13
14use super::events::LiveEvent;
15
/// Input events consumed by [`LiveReactor::react`].
///
/// Within this file only `PlaybackDrained`, `UserSpeechStarted` and
/// `UserSpeechEnded` change [`VoiceRuntimeState`] or trigger the built-in rules;
/// every variant is nevertheless handed to each registered [`ReactorRule`].
#[derive(Debug, Clone)]
pub enum ReactorEvent {
    /// Wraps a [`LiveEvent`]. Not interpreted by anything in this file.
    Live(LiveEvent),
    /// Carries a list of [`StateMutation`]s. Not interpreted by anything in this file.
    StateChanged(Vec<StateMutation>),
    /// Timer tick. Not interpreted by anything in this file.
    TimerTick {
        /// Instant attached to the tick (presumably the time it fired — confirm
        /// with the producer).
        now: Instant,
    },
    /// Playback has drained: clears `playback_active` and records the drain time
    /// in [`VoiceRuntimeState`].
    PlaybackDrained {
        /// Pending-prompt flag supplied by the sender; copied verbatim into
        /// `VoiceRuntimeState::prompt_pending`.
        prompt_pending: bool,
    },
    /// The user started speaking: sets `user_speaking`, clears `playback_active`
    /// and drops any pending prompt.
    UserSpeechStarted,
    /// The user stopped speaking: clears `user_speaking`.
    UserSpeechEnded {
        /// Pending-prompt flag supplied by the sender; it is stored as pending
        /// only when playback is not active.
        prompt_pending: bool,
    },
    /// Not interpreted by anything in this file.
    /// NOTE(review): presumably a soft end-of-turn signal — confirm with the producer.
    SoftTurnComplete,
}
43
/// Snapshot of the voice-interaction flags tracked by [`LiveReactor`].
///
/// `PartialEq`/`Eq` are derived so that snapshots (for example the value returned
/// by `LiveReactor::voice_state`) can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VoiceRuntimeState {
    /// `true` between a `UserSpeechStarted` and the following `UserSpeechEnded`.
    pub user_speaking: bool,
    /// Cleared by `PlaybackDrained` and `UserSpeechStarted`. Nothing in this file
    /// sets it to `true`; that has to be done by code outside this file.
    pub playback_active: bool,
    /// Whether a model prompt is waiting to be issued.
    pub prompt_pending: bool,
    /// Incremented (saturating) each time user speech starts while a prompt was
    /// pending.
    pub prompt_epoch: u64,
    /// When the most recent `UserSpeechStarted` was applied, if any.
    pub last_barge_in_at: Option<Instant>,
    /// When the most recent `PlaybackDrained` was applied, if any.
    pub last_playback_drained_at: Option<Instant>,
}
60
61impl VoiceRuntimeState {
62 pub fn apply_event(&mut self, event: &ReactorEvent) {
64 match event {
65 ReactorEvent::PlaybackDrained { prompt_pending } => {
66 self.playback_active = false;
67 self.prompt_pending = *prompt_pending;
68 self.last_playback_drained_at = Some(Instant::now());
69 }
70 ReactorEvent::UserSpeechStarted => {
71 self.user_speaking = true;
72 self.playback_active = false;
73 if self.prompt_pending {
74 self.prompt_epoch = self.prompt_epoch.saturating_add(1);
75 }
76 self.prompt_pending = false;
77 self.last_barge_in_at = Some(Instant::now());
78 }
79 ReactorEvent::UserSpeechEnded { prompt_pending } => {
80 self.user_speaking = false;
81 self.prompt_pending = *prompt_pending && !self.playback_active;
82 }
83 _ => {}
84 }
85 }
86}
87
/// Execution policy attached to a [`Reaction`].
///
/// The `Default` implementation yields [`EffectMode::Blocking`] with no timeout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EffectPolicy {
    /// Execution mode requested for the effect.
    pub mode: EffectMode,
    /// Optional time limit; `None` means no limit was requested.
    /// NOTE(review): how the limit is enforced is not visible in this file.
    pub timeout: Option<Duration>,
}
96
97impl Default for EffectPolicy {
98 fn default() -> Self {
99 Self {
100 mode: EffectMode::Blocking,
101 timeout: None,
102 }
103 }
104}
105
/// Execution mode of an effect, stored in [`EffectPolicy::mode`].
///
/// NOTE(review): the executor that interprets these modes is not part of this
/// file; the variant descriptions below are inferred from the names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EffectMode {
    /// Mode used by `EffectPolicy::default()` and `Reaction::blocking`
    /// (presumably: the effect is awaited before continuing).
    Blocking,
    /// Mode used by `Reaction::concurrent` (presumably: the effect runs without
    /// being awaited).
    Concurrent,
}
114
/// Side effects a rule can request through a [`Reaction`].
///
/// This file only constructs effect values; how each one is carried out is
/// decided by whatever consumes the reactions (not visible here).
#[derive(Debug, Clone)]
pub enum LiveEffect {
    /// No-op effect. Not produced by the built-in rules.
    Noop,
    /// Carries a list of [`Content`] items (presumably context to send to the
    /// model — confirm with the executor). Not produced by the built-in rules.
    SendContext(Vec<Content>),
    /// Requested by the built-in rules when a prompt is pending and neither user
    /// speech nor playback stands in the way.
    PromptModel,
    /// Requested by [`UserSpeechActivityRule`] when the user starts speaking.
    CancelDeferredPrompt,
    /// Requested by [`UserSpeechActivityRule`] when the user starts speaking.
    SignalUserActivityStart,
    /// Requested by [`UserSpeechActivityRule`] when the user stops speaking.
    SignalUserActivityEnd,
    /// Carries a `String` (presumably a replacement instruction text — confirm).
    /// Not produced by the built-in rules.
    UpdateInstruction(String),
    /// Carries a [`LiveEvent`] (presumably to be emitted to listeners — confirm).
    /// Not produced by the built-in rules.
    Emit(LiveEvent),
}
135
/// One effect requested by a rule, together with how it should be executed.
#[derive(Debug, Clone)]
pub struct Reaction {
    /// Label of the rule that produced this reaction; the built-in rules pass the
    /// same string their `name()` returns.
    pub source: &'static str,
    /// The effect being requested.
    pub effect: LiveEffect,
    /// Execution policy for `effect`.
    pub policy: EffectPolicy,
}
146
147impl Reaction {
148 pub fn blocking(source: &'static str, effect: LiveEffect) -> Self {
150 Self {
151 source,
152 effect,
153 policy: EffectPolicy::default(),
154 }
155 }
156
157 pub fn concurrent(source: &'static str, effect: LiveEffect) -> Self {
159 Self {
160 source,
161 effect,
162 policy: EffectPolicy {
163 mode: EffectMode::Concurrent,
164 ..EffectPolicy::default()
165 },
166 }
167 }
168}
169
/// A rule that turns reactor events into zero or more [`Reaction`]s.
///
/// Implementations must be `Send + Sync`; [`LiveReactor`] stores them as boxed
/// trait objects.
pub trait ReactorRule: Send + Sync {
    /// Identifier of the rule.
    fn name(&self) -> &str;
    /// Produces the reactions for `event`.
    ///
    /// When called through [`LiveReactor::react`], `voice` is a snapshot of the
    /// voice state taken *after* `event` has been applied to it.
    fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction>;
}
177
/// Applies events to a shared [`VoiceRuntimeState`] and collects the reactions of
/// the registered rules.
pub struct LiveReactor {
    // Rules in registration order; reactions are collected in this order.
    rules: Vec<Box<dyn ReactorRule>>,
    // Voice state behind a mutex so `react` can update it through `&self`.
    voice: Mutex<VoiceRuntimeState>,
}
183
184impl Default for LiveReactor {
185 fn default() -> Self {
186 Self {
187 rules: Vec::new(),
188 voice: Mutex::new(VoiceRuntimeState::default()),
189 }
190 }
191}
192
193impl LiveReactor {
194 pub fn new() -> Self {
196 Self::default()
197 }
198
199 pub fn voice_defaults() -> Self {
201 let mut reactor = Self::new();
202 reactor.add_rule(PromptOnPlaybackDrained);
203 reactor.add_rule(UserSpeechActivityRule);
204 reactor
205 }
206
207 pub fn add_rule(&mut self, rule: impl ReactorRule + 'static) {
209 self.rules.push(Box::new(rule));
210 }
211
212 pub fn react(&self, event: &ReactorEvent) -> Vec<Reaction> {
214 let voice = {
215 let mut voice = self.voice.lock().expect("voice reactor state poisoned");
216 voice.apply_event(event);
217 voice.clone()
218 };
219
220 self.rules
221 .iter()
222 .flat_map(|rule| rule.react(event, &voice))
223 .collect()
224 }
225
226 pub fn voice_state(&self) -> VoiceRuntimeState {
228 self.voice
229 .lock()
230 .expect("voice reactor state poisoned")
231 .clone()
232 }
233}
234
/// Built-in rule that requests a model prompt when playback drains while a prompt
/// is pending and the user is not speaking.
///
/// Stateless marker type; the common traits are derived so it can be printed,
/// copied and default-constructed like any other public value type.
#[derive(Debug, Clone, Copy, Default)]
pub struct PromptOnPlaybackDrained;
237
238impl ReactorRule for PromptOnPlaybackDrained {
239 fn name(&self) -> &str {
240 "prompt_on_playback_drained"
241 }
242
243 fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
244 if matches!(event, ReactorEvent::PlaybackDrained { .. })
245 && voice.prompt_pending
246 && !voice.user_speaking
247 && !voice.playback_active
248 {
249 vec![Reaction::blocking(
250 "prompt_on_playback_drained",
251 LiveEffect::PromptModel,
252 )]
253 } else {
254 Vec::new()
255 }
256 }
257}
258
/// Built-in rule that signals the start and end of user speech, cancels a deferred
/// prompt when speech starts, and requests a prompt when speech ends with one
/// still pending.
///
/// Stateless marker type; the common traits are derived so it can be printed,
/// copied and default-constructed like any other public value type.
#[derive(Debug, Clone, Copy, Default)]
pub struct UserSpeechActivityRule;
261
262impl ReactorRule for UserSpeechActivityRule {
263 fn name(&self) -> &str {
264 "user_speech_activity"
265 }
266
267 fn react(&self, event: &ReactorEvent, voice: &VoiceRuntimeState) -> Vec<Reaction> {
268 match event {
269 ReactorEvent::UserSpeechStarted => vec![
270 Reaction::blocking("user_speech_activity", LiveEffect::CancelDeferredPrompt),
271 Reaction::blocking("user_speech_activity", LiveEffect::SignalUserActivityStart),
272 ],
273 ReactorEvent::UserSpeechEnded { .. } => {
274 let mut reactions = vec![Reaction::blocking(
275 "user_speech_activity",
276 LiveEffect::SignalUserActivityEnd,
277 )];
278 if voice.prompt_pending && !voice.playback_active {
279 reactions.push(Reaction::blocking(
280 "user_speech_activity",
281 LiveEffect::PromptModel,
282 ));
283 }
284 reactions
285 }
286 _ => Vec::new(),
287 }
288 }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-only rule that answers every event with a `Noop` reaction labelled
    /// with the wrapped string.
    struct AlwaysNoop(&'static str);

    impl ReactorRule for AlwaysNoop {
        fn name(&self) -> &str {
            self.0
        }

        fn react(&self, _event: &ReactorEvent, _voice: &VoiceRuntimeState) -> Vec<Reaction> {
            vec![Reaction::blocking(self.0, LiveEffect::Noop)]
        }
    }

    // Two rules react to the same event; their reactions must come back in the
    // order the rules were registered.
    #[test]
    fn reactor_collects_reactions_in_rule_order() {
        let mut reactor = LiveReactor::new();
        reactor.add_rule(PromptOnPlaybackDrained);
        reactor.add_rule(AlwaysNoop("always_noop"));

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert_eq!(reactions.len(), 2);
        assert_eq!(reactions[0].source, "prompt_on_playback_drained");
        assert_eq!(reactions[0].policy.mode, EffectMode::Blocking);
        assert!(matches!(reactions[0].effect, LiveEffect::PromptModel));
        assert_eq!(reactions[1].source, "always_noop");
        assert!(matches!(reactions[1].effect, LiveEffect::Noop));
    }

    // Without a pending prompt, a drain produces nothing.
    #[test]
    fn playback_drained_without_pending_prompt_is_noop() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
    }

    // Speech start after a pending prompt cancels it, signals activity and bumps
    // the prompt epoch.
    #[test]
    fn user_speech_started_cancels_prompt_and_signals_activity() {
        let reactor = LiveReactor::voice_defaults();

        let prompt_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert_eq!(prompt_reactions.len(), 1);

        let reactions = reactor.react(&ReactorEvent::UserSpeechStarted);

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::CancelDeferredPrompt
        ));
        assert!(matches!(
            reactions[1].effect,
            LiveEffect::SignalUserActivityStart
        ));
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }

    // Speech end with nothing pending only signals the end of activity.
    #[test]
    fn user_speech_ended_signals_activity_end() {
        let reactor = LiveReactor::voice_defaults();

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: false,
        });

        assert_eq!(reactions.len(), 1);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(!reactor.voice_state().user_speaking);
    }

    // The drain happened while the user was speaking, so the prompt is issued
    // when the speech ends instead.
    #[test]
    fn speech_end_prompts_when_playback_already_drained_and_prompt_pending() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let drain_reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        assert!(drain_reactions.is_empty());

        let reactions = reactor.react(&ReactorEvent::UserSpeechEnded {
            prompt_pending: true,
        });

        assert_eq!(reactions.len(), 2);
        assert!(matches!(
            reactions[0].effect,
            LiveEffect::SignalUserActivityEnd
        ));
        assert!(matches!(reactions[1].effect, LiveEffect::PromptModel));
    }

    // A drain during user speech keeps the prompt pending but does not fire it.
    #[test]
    fn playback_drained_does_not_prompt_while_user_is_speaking() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(voice.prompt_pending);
    }

    // Once speech has started, a later drain that reports nothing pending must not
    // revive the cancelled prompt or bump the epoch again.
    #[test]
    fn speech_start_wins_after_pending_prompt_snapshot() {
        let reactor = LiveReactor::voice_defaults();

        reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: true,
        });
        reactor.react(&ReactorEvent::UserSpeechStarted);
        let reactions = reactor.react(&ReactorEvent::PlaybackDrained {
            prompt_pending: false,
        });

        assert!(reactions.is_empty());
        let voice = reactor.voice_state();
        assert!(voice.user_speaking);
        assert!(!voice.prompt_pending);
        assert_eq!(voice.prompt_epoch, 1);
    }
}