1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use bytes::Bytes;
14use tokio::sync::{broadcast, mpsc};
15use tokio_util::sync::CancellationToken;
16
17use gemini_genai_rs::prelude::{SessionEvent, SessionPhase};
18use gemini_genai_rs::session::SessionWriter;
19
20use crate::state::State;
21use crate::tool::ToolDispatcher;
22
23use super::background_tool::BackgroundToolTracker;
24use super::callbacks::EventCallbacks;
25use super::computed::ComputedRegistry;
26use super::context_writer::PendingContext;
27use super::control_plane::run_control_lane;
28use super::events::LiveEvent;
29use super::extractor::TurnExtractor;
30use super::needs::NeedsFulfillment;
31use super::persistence::SessionPersistence;
32use super::phase::PhaseMachine;
33use super::session_signals::SessionSignals;
34use super::soft_turn::SoftTurnDetector;
35use super::steering::{ContextDelivery, SteeringMode};
36use super::telemetry::SessionTelemetry;
37use super::temporal::TemporalRegistry;
38use super::watcher::WatcherRegistry;
39
/// Back-pressure policy applied when the router forwards a fast-lane frame.
///
/// `deliver_fast` maps each variant onto a concrete channel operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Delivery {
    /// Never drop: the router awaits channel capacity (`Sender::send`), so a
    /// full fast-lane channel suspends routing until the fast lane catches up.
    ///
    /// This is the default policy.
    #[default]
    Lossless,
    /// Never wait: the frame is offered with `try_send`; if the channel is
    /// full the *incoming* frame is discarded and the matching drop counter
    /// is incremented.
    LossyDropNewest,
}
73
/// Per-class [`Delivery`] policy for frames routed to the fast lane.
///
/// `route_event` consults one field per incoming `SessionEvent` class.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeliveryConfig {
    /// Policy for `SessionEvent::AudioData`.
    pub audio: Delivery,
    /// Policy for `SessionEvent::TextDelta` and `SessionEvent::TextComplete`.
    pub text: Delivery,
    /// Policy for the fast-lane copy of input/output transcription events.
    /// The control-lane copy of the same events is always sent with a
    /// waiting `send`.
    pub transcript: Delivery,
    /// Policy for `SessionEvent::Thought`.
    pub thought: Delivery,
    /// Policy for `SessionEvent::VoiceActivityStart` / `VoiceActivityEnd`.
    pub vad: Delivery,
    /// Policy for `SessionEvent::PhaseChanged`.
    pub phase: Delivery,
}
97
98impl Default for DeliveryConfig {
99 fn default() -> Self {
100 Self {
101 audio: Delivery::Lossless,
102 text: Delivery::Lossless,
103 transcript: Delivery::Lossless,
104 thought: Delivery::Lossless,
105 vad: Delivery::Lossless,
106 phase: Delivery::Lossless,
107 }
108 }
109}
110
111impl DeliveryConfig {
112 pub fn lossless() -> Self {
115 Self::default()
116 }
117
118 pub fn audio(mut self, d: Delivery) -> Self {
120 self.audio = d;
121 self
122 }
123
124 pub fn text(mut self, d: Delivery) -> Self {
126 self.text = d;
127 self
128 }
129
130 pub fn transcript(mut self, d: Delivery) -> Self {
132 self.transcript = d;
133 self
134 }
135
136 pub fn thought(mut self, d: Delivery) -> Self {
138 self.thought = d;
139 self
140 }
141
142 pub fn vad(mut self, d: Delivery) -> Self {
144 self.vad = d;
145 self
146 }
147
148 pub fn phase(mut self, d: Delivery) -> Self {
150 self.phase = d;
151 self
152 }
153}
154
/// Per-class counters of fast-lane frames that were discarded.
///
/// `route_event` hands the matching counter to `deliver_fast`, which
/// increments it only when the policy is `Delivery::LossyDropNewest` and the
/// fast-lane channel reports `Full`. All counters start at zero.
#[derive(Debug, Default)]
pub(crate) struct DroppedFrames {
    /// Dropped audio chunks.
    pub audio: AtomicU64,
    /// Dropped text deltas / text-complete frames.
    pub text: AtomicU64,
    /// Dropped input/output transcript frames (fast-lane copy only).
    pub transcript: AtomicU64,
    /// Dropped thought frames.
    pub thought: AtomicU64,
    /// Dropped voice-activity start/end frames.
    pub vad: AtomicU64,
    /// Dropped phase-change frames.
    pub phase: AtomicU64,
}
169
170impl DroppedFrames {
171 #[cfg(test)]
176 pub fn total(&self) -> u64 {
177 self.audio.load(Ordering::Relaxed)
178 + self.text.load(Ordering::Relaxed)
179 + self.transcript.load(Ordering::Relaxed)
180 + self.thought.load(Ordering::Relaxed)
181 + self.vad.load(Ordering::Relaxed)
182 + self.phase.load(Ordering::Relaxed)
183 }
184}
185
186async fn deliver_fast(
195 tx: &mpsc::Sender<FastEvent>,
196 ev: FastEvent,
197 policy: Delivery,
198 dropped: &AtomicU64,
199) {
200 match policy {
201 Delivery::Lossless => {
202 let _ = tx.send(ev).await;
203 }
204 Delivery::LossyDropNewest => {
205 if let Err(mpsc::error::TrySendError::Full(_)) = tx.try_send(ev) {
206 dropped.fetch_add(1, Ordering::Relaxed);
207 }
208 }
211 }
212}
213
/// Frames consumed by the fast lane (`run_fast_lane`).
///
/// `route_event` produces them from the matching `SessionEvent` variants.
pub(crate) enum FastEvent {
    /// Audio chunk from `SessionEvent::AudioData`. The fast lane skips it
    /// while the shared `interrupted` flag is set.
    Audio(Bytes),
    /// Incremental text from `SessionEvent::TextDelta`.
    Text(String),
    /// Text from `SessionEvent::TextComplete`.
    TextComplete(String),
    /// Input transcription text; the fast lane reports it with
    /// `is_final: false`.
    InputTranscript(String),
    /// Output transcription text; the fast lane reports it with
    /// `is_final: false`.
    OutputTranscript(String),
    /// Text from `SessionEvent::Thought`.
    Thought(String),
    /// From `SessionEvent::VoiceActivityStart`.
    VadStart,
    /// From `SessionEvent::VoiceActivityEnd`.
    VadEnd,
    /// From `SessionEvent::PhaseChanged`; the fast lane only invokes the
    /// `on_phase` callback for it.
    Phase(SessionPhase),
    /// Queued by the router on `SessionEvent::Interrupted`, after it has set
    /// the shared `interrupted` flag. The fast lane currently takes no action
    /// when it dequeues this frame.
    Interrupted,
}
228
/// Events consumed by the control lane (`run_control_lane`).
///
/// Unless noted otherwise, `route_event` produces each variant from the
/// `SessionEvent` of the same name and always sends it with a waiting `send`.
pub(crate) enum ControlEvent {
    /// Function calls requested by the model.
    ToolCall(Vec<gemini_genai_rs::prelude::FunctionCall>),
    /// Identifiers carried by `SessionEvent::ToolCallCancelled`.
    ToolCallCancelled(Vec<String>),
    /// NOTE(review): not produced by `route_event`; presumably sent back into
    /// the control lane (via its weak sender) when a tool finishes — confirm
    /// against the control-plane module.
    ToolCompleted {
        /// Presumably the id of the originating function call — TODO confirm.
        call_id: String,
        /// Presumably the tool name — TODO confirm.
        name: String,
        /// Presumably whether the tool succeeded — TODO confirm.
        ok: bool,
    },
    /// The model was interrupted; the router sets the shared `interrupted`
    /// flag and cancels the barge-in token before sending this.
    Interrupted,
    /// From `SessionEvent::TurnComplete`.
    TurnComplete,
    /// From `SessionEvent::GenerationComplete`.
    GenerationComplete,
    /// Payload of `SessionEvent::GoAway`, forwarded unchanged.
    GoAway(Option<String>),
    /// From `SessionEvent::Connected`.
    Connected,
    /// Payload of `SessionEvent::Disconnected`, forwarded unchanged. This is
    /// the last event the router forwards before it stops.
    Disconnected(Option<String>),
    /// Resume information; the router has already stored `info.handle` in
    /// `SharedState::resume_handle` when this arrives.
    SessionResumeUpdate(gemini_genai_rs::session::ResumeInfo),
    /// Payload of `SessionEvent::Error`.
    Error(String),
    /// Control-lane copy of an input transcription (the fast lane receives
    /// its own clone).
    InputTranscript(String),
    /// Control-lane copy of an output transcription (the fast lane receives
    /// its own clone).
    OutputTranscript(String),
}
257
/// State shared (behind an `Arc`) between the router, the fast lane and the
/// control lane.
pub(crate) struct SharedState {
    /// Set to `true` (Release) by the router on `SessionEvent::Interrupted`;
    /// read (Acquire) by the fast lane, which skips audio frames while it is
    /// set. Nothing in this file clears it — presumably the control lane
    /// does; confirm.
    pub interrupted: AtomicBool,
    /// Token cancelled by the router on `SessionEvent::Interrupted`.
    /// NOTE(review): held in a mutex presumably so it can be swapped for a
    /// fresh token after an interruption — the swap is not visible here.
    pub barge_in: parking_lot::Mutex<CancellationToken>,
    /// Most recent `handle` from a `SessionEvent::SessionResumeUpdate`;
    /// `None` until the first update arrives.
    pub resume_handle: parking_lot::Mutex<Option<String>>,
    /// Starts as `None`; not written in this file. Presumably the last
    /// instruction sent to the model — TODO confirm with the control lane.
    pub last_instruction: parking_lot::Mutex<Option<String>>,
    /// Clone of `ControlPlaneConfig::pending_context`.
    pub pending_context: Option<Arc<PendingContext>>,
    /// Copy of `ControlPlaneConfig::delivery`, consulted by `route_event`.
    pub delivery: DeliveryConfig,
    /// Counters for frames discarded under `Delivery::LossyDropNewest`.
    pub dropped: DroppedFrames,
}
281
/// Configuration handed to `spawn_event_processor` and passed on, whole, to
/// `run_control_lane`.
///
/// Only `pending_context` and `delivery` are read in this file (they are
/// copied into `SharedState`); the remaining fields are interpreted by the
/// control lane, so their descriptions below are limited to what the types
/// and defaults show.
pub(crate) struct ControlPlaneConfig {
    /// Optional soft-turn detector; `None` by default.
    pub soft_turn: Option<SoftTurnDetector>,
    /// Steering mode; defaults to `SteeringMode::default()`.
    pub steering_mode: SteeringMode,
    /// Context-delivery mode; defaults to `ContextDelivery::default()`.
    pub context_delivery: ContextDelivery,
    /// Optional needs-fulfillment component; `None` by default.
    pub needs_fulfillment: Option<NeedsFulfillment>,
    /// Optional session-persistence backend; `None` by default.
    pub persistence: Option<Arc<dyn SessionPersistence>>,
    /// Optional session identifier; `None` by default.
    /// NOTE(review): presumably the key used with `persistence` — confirm.
    pub session_id: Option<String>,
    /// Defaults to `true`. Exact effect is defined by the control lane.
    pub tool_advisory: bool,
    /// Optional pending-context handle; also cloned into
    /// `SharedState::pending_context`.
    pub pending_context: Option<Arc<PendingContext>>,
    /// Middleware chain; defaults to a freshly constructed (`new()`) chain.
    pub middleware: Arc<crate::middleware::MiddlewareChain>,
    /// Optional flow monitor; `None` by default.
    pub flow: Option<crate::flow::SharedFlowMonitor>,
    /// Fast-lane delivery policies; also copied into `SharedState::delivery`.
    pub delivery: DeliveryConfig,
}
321
322impl Default for ControlPlaneConfig {
323 fn default() -> Self {
324 Self {
325 soft_turn: None,
326 steering_mode: SteeringMode::default(),
327 context_delivery: ContextDelivery::default(),
328 needs_fulfillment: None,
329 persistence: None,
330 session_id: None,
331 tool_advisory: true,
332 pending_context: None,
333 middleware: Arc::new(crate::middleware::MiddlewareChain::new()),
334 flow: None,
335 delivery: DeliveryConfig::default(),
336 }
337 }
338}
339
340#[allow(
341 clippy::too_many_arguments,
342 reason = "lane spawn site: parameters are the owned subsystem handles split between the fast and control lanes"
343)]
344pub(crate) fn spawn_event_processor(
345 mut event_rx: broadcast::Receiver<SessionEvent>,
346 callbacks: Arc<EventCallbacks>,
347 dispatcher: Option<Arc<ToolDispatcher>>,
348 writer: Arc<dyn SessionWriter>,
349 extractors: Vec<Arc<dyn TurnExtractor>>,
350 state: State,
351 computed: Option<ComputedRegistry>,
352 phase_machine: Option<tokio::sync::Mutex<PhaseMachine>>,
353 watchers: Option<WatcherRegistry>,
354 temporal: Option<Arc<TemporalRegistry>>,
355 background_tracker: Option<Arc<BackgroundToolTracker>>,
356 execution_modes: std::collections::HashMap<String, super::background_tool::ToolExecutionMode>,
357 control_plane: ControlPlaneConfig,
358 live_event_tx: broadcast::Sender<LiveEvent>,
359) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
360 let shared = Arc::new(SharedState {
361 interrupted: AtomicBool::new(false),
362 barge_in: parking_lot::Mutex::new(CancellationToken::new()),
363 resume_handle: parking_lot::Mutex::new(None),
364 last_instruction: parking_lot::Mutex::new(None),
365 pending_context: control_plane.pending_context.clone(),
366 delivery: control_plane.delivery,
367 dropped: DroppedFrames::default(),
368 });
369
370 let timer_cancel = CancellationToken::new();
371
372 let (fast_tx, fast_rx) = mpsc::channel::<FastEvent>(512);
383 let (ctrl_tx, ctrl_rx) = mpsc::channel::<ControlEvent>(512);
384
385 let fast_tx_clone = fast_tx.clone();
388 let ctrl_tx_clone = ctrl_tx.clone();
389 let shared_clone = shared.clone();
390 tokio::spawn(async move {
391 loop {
392 match event_rx.recv().await {
393 Ok(event) => {
394 let terminal = matches!(event, SessionEvent::Disconnected(_));
402 route_event(event, &fast_tx_clone, &ctrl_tx_clone, &shared_clone).await;
403 if terminal {
404 break;
405 }
406 }
407 Err(broadcast::error::RecvError::Lagged(n)) => {
408 #[cfg(feature = "tracing-support")]
409 tracing::warn!(skipped = n, "Event processor lagged, skipped events");
410 let _ = n;
411 }
412 Err(broadcast::error::RecvError::Closed) => break,
413 }
414 }
415 });
416
417 let fast_callbacks = callbacks.clone();
419 let fast_shared = shared.clone();
420 let fast_event_tx = live_event_tx.clone();
421 let fast_handle = tokio::spawn(async move {
422 run_fast_lane(fast_rx, fast_callbacks, fast_shared, fast_event_tx).await;
423 });
424
425 let timer_temporal = temporal.clone();
427 let timer_state = state.clone();
428 let timer_writer = writer.clone();
429
430 let ctrl_callbacks = callbacks;
432 let ctrl_shared = shared;
433 let ctrl_timer_cancel = timer_cancel.clone();
434 let ctrl_tx_weak = ctrl_tx.downgrade();
439 let ctrl_handle = tokio::spawn(async move {
440 run_control_lane(
441 ctrl_rx,
442 ctrl_tx_weak,
443 ctrl_callbacks,
444 dispatcher,
445 writer,
446 ctrl_shared,
447 extractors,
448 state,
449 computed,
450 phase_machine,
451 watchers,
452 temporal,
453 background_tracker,
454 execution_modes,
455 control_plane,
456 live_event_tx,
457 )
458 .await;
459 ctrl_timer_cancel.cancel();
460 });
461
462 if let Some(ref temporal_ref) = timer_temporal {
464 if temporal_ref.needs_timer() {
465 let t = temporal_ref.clone();
466 let cancel = timer_cancel.clone();
467 tokio::spawn(async move {
468 let mut interval = tokio::time::interval(Duration::from_millis(500));
469 loop {
470 tokio::select! {
471 _ = cancel.cancelled() => break,
472 _ = interval.tick() => {
473 for action in t.check_all(&timer_state, None, &timer_writer) {
474 tokio::spawn(action);
475 }
476 }
477 }
478 }
479 });
480 }
481 }
482
483 (fast_handle, ctrl_handle)
484}
485
486pub(crate) fn spawn_telemetry_lane(
492 mut telem_rx: broadcast::Receiver<SessionEvent>,
493 signals: SessionSignals,
494 telemetry: Arc<SessionTelemetry>,
495 cancel: CancellationToken,
496 on_usage: Option<super::callbacks::UsageCallback>,
497) -> tokio::task::JoinHandle<()> {
498 tokio::spawn(async move {
499 let mut debounce = tokio::time::interval(Duration::from_millis(100));
500 debounce.tick().await;
502 loop {
503 tokio::select! {
504 biased;
505 result = telem_rx.recv() => {
506 match result {
507 Ok(event) => {
508 match &event {
510 SessionEvent::AudioData(data) => {
511 telemetry.record_audio_out(data.len());
512 }
513 SessionEvent::TextDelta(_) => {
514 telemetry.record_text_out();
515 }
516 SessionEvent::VoiceActivityEnd => {
517 telemetry.record_vad_end();
518 }
519 SessionEvent::Interrupted => {
520 telemetry.record_interruption();
521 }
522 SessionEvent::TurnComplete => {
523 telemetry.record_turn_complete();
524 }
525 SessionEvent::VoiceActivityStart => {
526 telemetry.mark_turn_start();
527 }
528 SessionEvent::Usage(ref usage) => {
529 telemetry.record_usage(
530 usage.total_token_count,
531 usage.prompt_token_count,
532 usage.response_token_count,
533 usage.cached_content_token_count,
534 usage.thoughts_token_count,
535 );
536 if let Some(cb) = &on_usage {
537 cb(usage);
538 }
539 }
540 _ => {}
541 }
542 signals.on_event(&event);
544 }
545 Err(broadcast::error::RecvError::Lagged(n)) => {
546 #[cfg(feature = "tracing-support")]
547 tracing::warn!(skipped = n, "Telemetry lane lagged");
548 let _ = n;
549 }
550 Err(broadcast::error::RecvError::Closed) => break,
551 }
552 }
553 _ = debounce.tick() => {
554 signals.flush_timing();
556 }
557 _ = cancel.cancelled() => break,
558 }
559 }
560 })
561}
562
563async fn route_event(
565 event: SessionEvent,
566 fast_tx: &mpsc::Sender<FastEvent>,
567 ctrl_tx: &mpsc::Sender<ControlEvent>,
568 shared: &SharedState,
569) {
570 let delivery = &shared.delivery;
571 let dropped = &shared.dropped;
572 match event {
573 SessionEvent::AudioData(data) => {
575 deliver_fast(
576 fast_tx,
577 FastEvent::Audio(data),
578 delivery.audio,
579 &dropped.audio,
580 )
581 .await;
582 }
583 SessionEvent::TextDelta(text) => {
584 deliver_fast(fast_tx, FastEvent::Text(text), delivery.text, &dropped.text).await;
585 }
586 SessionEvent::TextComplete(text) => {
587 deliver_fast(
588 fast_tx,
589 FastEvent::TextComplete(text),
590 delivery.text,
591 &dropped.text,
592 )
593 .await;
594 }
595 SessionEvent::InputTranscription(text) => {
598 deliver_fast(
599 fast_tx,
600 FastEvent::InputTranscript(text.clone()),
601 delivery.transcript,
602 &dropped.transcript,
603 )
604 .await;
605 let _ = ctrl_tx.send(ControlEvent::InputTranscript(text)).await;
606 }
607 SessionEvent::OutputTranscription(text) => {
608 deliver_fast(
609 fast_tx,
610 FastEvent::OutputTranscript(text.clone()),
611 delivery.transcript,
612 &dropped.transcript,
613 )
614 .await;
615 let _ = ctrl_tx.send(ControlEvent::OutputTranscript(text)).await;
616 }
617 SessionEvent::Thought(text) => {
618 deliver_fast(
619 fast_tx,
620 FastEvent::Thought(text),
621 delivery.thought,
622 &dropped.thought,
623 )
624 .await;
625 }
626 SessionEvent::VoiceActivityStart => {
627 deliver_fast(fast_tx, FastEvent::VadStart, delivery.vad, &dropped.vad).await;
628 }
629 SessionEvent::VoiceActivityEnd => {
630 deliver_fast(fast_tx, FastEvent::VadEnd, delivery.vad, &dropped.vad).await;
631 }
632 SessionEvent::PhaseChanged(phase) => {
633 deliver_fast(
634 fast_tx,
635 FastEvent::Phase(phase),
636 delivery.phase,
637 &dropped.phase,
638 )
639 .await;
640 }
641 SessionEvent::SessionResumeUpdate(info) => {
642 *shared.resume_handle.lock() = Some(info.handle.clone());
643 let _ = ctrl_tx.send(ControlEvent::SessionResumeUpdate(info)).await;
644 }
645 SessionEvent::GenerationComplete => {
646 let _ = ctrl_tx.send(ControlEvent::GenerationComplete).await;
647 }
648
649 SessionEvent::ToolCall(calls) => {
651 let _ = ctrl_tx.send(ControlEvent::ToolCall(calls)).await;
652 }
653 SessionEvent::ToolCallCancelled(ids) => {
654 let _ = ctrl_tx.send(ControlEvent::ToolCallCancelled(ids)).await;
655 }
656 SessionEvent::Interrupted => {
657 shared.interrupted.store(true, Ordering::Release);
659 shared.barge_in.lock().cancel();
663 let _ = fast_tx.send(FastEvent::Interrupted).await;
664 let _ = ctrl_tx.send(ControlEvent::Interrupted).await;
665 }
666 SessionEvent::TurnComplete => {
667 let _ = ctrl_tx.send(ControlEvent::TurnComplete).await;
668 }
669 SessionEvent::Usage(_) => {}
671 SessionEvent::GoAway(time_left) => {
672 let _ = ctrl_tx.send(ControlEvent::GoAway(time_left)).await;
673 }
674 SessionEvent::Connected => {
675 let _ = ctrl_tx.send(ControlEvent::Connected).await;
676 }
677 SessionEvent::Disconnected(reason) => {
678 let _ = ctrl_tx.send(ControlEvent::Disconnected(reason)).await;
679 }
680 SessionEvent::Error(err) => {
681 let _ = ctrl_tx.send(ControlEvent::Error(err)).await;
682 }
683 other => {
687 #[cfg(feature = "tracing-support")]
688 tracing::debug!(?other, "unhandled SessionEvent variant (newer wire event?)");
689 #[cfg(not(feature = "tracing-support"))]
690 let _ = other;
691 }
692 }
693}
694
695async fn run_fast_lane(
698 mut rx: mpsc::Receiver<FastEvent>,
699 callbacks: Arc<EventCallbacks>,
700 shared: Arc<SharedState>,
701 event_tx: broadcast::Sender<LiveEvent>,
702) {
703 while let Some(event) = rx.recv().await {
704 match event {
705 FastEvent::Audio(data) => {
706 if !shared.interrupted.load(Ordering::Acquire) {
708 if let Some(cb) = &callbacks.on_audio {
709 cb(&data);
710 }
711 let _ = event_tx.send(LiveEvent::Audio(data));
712 }
713 }
714 FastEvent::Text(delta) => {
715 if let Some(cb) = &callbacks.on_text {
716 cb(&delta);
717 }
718 let _ = event_tx.send(LiveEvent::TextDelta(delta));
719 }
720 FastEvent::TextComplete(text) => {
721 if let Some(cb) = &callbacks.on_text_complete {
722 cb(&text);
723 }
724 let _ = event_tx.send(LiveEvent::TextComplete(text));
725 }
726 FastEvent::InputTranscript(text) => {
727 if let Some(cb) = &callbacks.on_input_transcript {
729 cb(&text, false);
730 }
731 let _ = event_tx.send(LiveEvent::InputTranscript {
732 text,
733 is_final: false,
734 });
735 }
736 FastEvent::OutputTranscript(text) => {
737 if let Some(cb) = &callbacks.on_output_transcript {
739 cb(&text, false);
740 }
741 let _ = event_tx.send(LiveEvent::OutputTranscript {
742 text,
743 is_final: false,
744 });
745 }
746 FastEvent::Thought(text) => {
747 if let Some(cb) = &callbacks.on_thought {
748 cb(&text);
749 }
750 let _ = event_tx.send(LiveEvent::Thought(text));
751 }
752 FastEvent::VadStart => {
753 if let Some(cb) = &callbacks.on_vad_start {
754 cb();
755 }
756 let _ = event_tx.send(LiveEvent::VadStart);
757 }
758 FastEvent::VadEnd => {
759 if let Some(cb) = &callbacks.on_vad_end {
760 cb();
761 }
762 let _ = event_tx.send(LiveEvent::VadEnd);
763 }
764 FastEvent::Phase(phase) => {
765 if let Some(cb) = &callbacks.on_phase {
766 cb(phase);
767 }
768 }
770 FastEvent::Interrupted => {
771 }
774 }
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use super::*;
781 use std::sync::atomic::AtomicUsize;
782
783 use crate::live::events::LiveEvent;
784 use gemini_genai_rs::prelude::FunctionResponse;
785
786 fn dummy_event_tx() -> broadcast::Sender<LiveEvent> {
787 broadcast::channel::<LiveEvent>(16).0
788 }
789
790 #[test]
791 fn delivery_config_default_is_all_lossless() {
792 let cfg = DeliveryConfig::default();
793 assert_eq!(cfg.audio, Delivery::Lossless);
794 assert_eq!(cfg.text, Delivery::Lossless);
795 assert_eq!(cfg.transcript, Delivery::Lossless);
796 assert_eq!(cfg.thought, Delivery::Lossless);
797 assert_eq!(cfg.vad, Delivery::Lossless);
798 assert_eq!(cfg.phase, Delivery::Lossless);
799 assert_eq!(Delivery::default(), Delivery::Lossless);
801 }
802
803 #[tokio::test]
804 async fn lossy_drop_newest_does_not_block_and_counts_drops() {
805 let (tx, _rx) = mpsc::channel::<FastEvent>(1);
808 tx.send(FastEvent::VadStart).await.unwrap(); let dropped = AtomicU64::new(0);
810
811 let res = tokio::time::timeout(
814 Duration::from_millis(100),
815 deliver_fast(&tx, FastEvent::VadEnd, Delivery::LossyDropNewest, &dropped),
816 )
817 .await;
818 assert!(res.is_ok(), "deliver_fast blocked under LossyDropNewest");
819 assert_eq!(dropped.load(Ordering::Relaxed), 1);
820 }
821
822 #[tokio::test]
823 async fn lossless_delivers_on_non_full_channel() {
824 let (tx, mut rx) = mpsc::channel::<FastEvent>(4);
825 let dropped = AtomicU64::new(0);
826
827 deliver_fast(
828 &tx,
829 FastEvent::Text("hello".into()),
830 Delivery::Lossless,
831 &dropped,
832 )
833 .await;
834
835 assert_eq!(dropped.load(Ordering::Relaxed), 0);
837 match rx.recv().await {
838 Some(FastEvent::Text(s)) => assert_eq!(s, "hello"),
839 other => panic!("expected Text frame, got {:?}", other.is_some()),
840 }
841 }
842
843 #[test]
844 fn dropped_frames_total_sums_classes() {
845 let d = DroppedFrames::default();
846 d.audio.fetch_add(2, Ordering::Relaxed);
847 d.transcript.fetch_add(3, Ordering::Relaxed);
848 assert_eq!(d.total(), 5);
849 }
850
851 #[tokio::test]
852 async fn fast_lane_routes_audio() {
853 let count = Arc::new(AtomicUsize::new(0));
854 let count_clone = count.clone();
855
856 let callbacks = EventCallbacks {
857 on_audio: Some(Box::new(move |_| {
858 count_clone.fetch_add(1, Ordering::SeqCst);
859 })),
860 ..Default::default()
861 };
862 let callbacks = Arc::new(callbacks);
863
864 let (event_tx, _) = broadcast::channel(16);
865 let event_rx = event_tx.subscribe();
866
867 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
868
869 let (fast_handle, ctrl_handle) = spawn_event_processor(
870 event_rx,
871 callbacks,
872 None,
873 writer,
874 vec![],
875 State::new(),
876 None,
877 None,
878 None,
879 None,
880 None,
881 std::collections::HashMap::new(),
882 ControlPlaneConfig::default(),
883 dummy_event_tx(),
884 );
885
886 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"audio1")));
888 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"audio2")));
889
890 tokio::time::sleep(Duration::from_millis(50)).await;
892
893 assert_eq!(count.load(Ordering::SeqCst), 2);
894
895 drop(event_tx);
897 let _ = fast_handle.await;
898 let _ = ctrl_handle.await;
899 }
900
901 #[tokio::test]
902 async fn interrupt_suppresses_audio() {
903 let count = Arc::new(AtomicUsize::new(0));
904 let count_clone = count.clone();
905
906 let callbacks = EventCallbacks {
907 on_audio: Some(Box::new(move |_| {
908 count_clone.fetch_add(1, Ordering::SeqCst);
909 })),
910 ..Default::default()
911 };
912 let callbacks = Arc::new(callbacks);
913
914 let (event_tx, _) = broadcast::channel(16);
915 let event_rx = event_tx.subscribe();
916
917 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
918
919 let (fast_handle, ctrl_handle) = spawn_event_processor(
920 event_rx,
921 callbacks,
922 None,
923 writer,
924 vec![],
925 State::new(),
926 None,
927 None,
928 None,
929 None,
930 None,
931 std::collections::HashMap::new(),
932 ControlPlaneConfig::default(),
933 dummy_event_tx(),
934 );
935
936 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"before")));
938 tokio::time::sleep(Duration::from_millis(20)).await;
939 let _ = event_tx.send(SessionEvent::Interrupted);
940 tokio::time::sleep(Duration::from_millis(20)).await;
941 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"during")));
942 tokio::time::sleep(Duration::from_millis(50)).await;
943
944 assert!(count.load(Ordering::SeqCst) >= 1);
946
947 drop(event_tx);
948 let _ = fast_handle.await;
949 let _ = ctrl_handle.await;
950 }
951
952 #[tokio::test]
953 async fn control_lane_routes_turn_complete() {
954 let called = Arc::new(AtomicBool::new(false));
955 let called_clone = called.clone();
956
957 let callbacks = EventCallbacks {
958 on_turn_complete: Some(Arc::new(move || {
959 let c = called_clone.clone();
960 Box::pin(async move {
961 c.store(true, Ordering::SeqCst);
962 })
963 })),
964 ..Default::default()
965 };
966 let callbacks = Arc::new(callbacks);
967
968 let (event_tx, _) = broadcast::channel(16);
969 let event_rx = event_tx.subscribe();
970
971 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
972
973 let (fast_handle, ctrl_handle) = spawn_event_processor(
974 event_rx,
975 callbacks,
976 None,
977 writer,
978 vec![],
979 State::new(),
980 None,
981 None,
982 None,
983 None,
984 None,
985 std::collections::HashMap::new(),
986 ControlPlaneConfig::default(),
987 dummy_event_tx(),
988 );
989
990 let _ = event_tx.send(SessionEvent::TurnComplete);
991 tokio::time::sleep(Duration::from_millis(50)).await;
992
993 assert!(called.load(Ordering::SeqCst));
994
995 drop(event_tx);
996 let _ = fast_handle.await;
997 let _ = ctrl_handle.await;
998 }
999
1000 #[tokio::test]
1001 async fn transcript_accumulates_in_control_lane() {
1002 let callbacks = Arc::new(EventCallbacks::default());
1003
1004 let (event_tx, _) = broadcast::channel(16);
1005 let event_rx = event_tx.subscribe();
1006
1007 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1008
1009 let state = State::new();
1010 let (fast_handle, ctrl_handle) = spawn_event_processor(
1011 event_rx,
1012 callbacks,
1013 None,
1014 writer,
1015 vec![],
1016 state.clone(),
1017 None,
1018 None,
1019 None,
1020 None,
1021 None,
1022 std::collections::HashMap::new(),
1023 ControlPlaneConfig::default(),
1024 dummy_event_tx(),
1025 );
1026
1027 let _ = event_tx.send(SessionEvent::InputTranscription("Hello ".to_string()));
1029 let _ = event_tx.send(SessionEvent::InputTranscription("world".to_string()));
1030 let _ = event_tx.send(SessionEvent::OutputTranscription("Hi there!".to_string()));
1031 tokio::time::sleep(Duration::from_millis(50)).await;
1032
1033 let _ = event_tx.send(SessionEvent::TurnComplete);
1035 tokio::time::sleep(Duration::from_millis(50)).await;
1036
1037 let tc: u32 = state.session().get("turn_count").unwrap_or(0);
1039 assert_eq!(tc, 1);
1040
1041 drop(event_tx);
1042 let _ = fast_handle.await;
1043 let _ = ctrl_handle.await;
1044 }
1045
1046 #[tokio::test]
1047 async fn extractor_runs_on_turn_complete() {
1048 use crate::live::extractor::TurnExtractor;
1049 use crate::live::transcript::TranscriptTurn;
1050 use crate::llm::LlmError;
1051
1052 struct FixedExtractor;
1053
1054 #[async_trait::async_trait]
1055 impl TurnExtractor for FixedExtractor {
1056 fn name(&self) -> &str {
1057 "TestExtractor"
1058 }
1059 fn window_size(&self) -> usize {
1060 3
1061 }
1062 async fn extract(
1063 &self,
1064 _turns: &[TranscriptTurn],
1065 ) -> Result<serde_json::Value, LlmError> {
1066 Ok(serde_json::json!({"score": 0.9, "mood": "happy"}))
1067 }
1068 }
1069
1070 let callbacks = Arc::new(EventCallbacks::default());
1071
1072 let (event_tx, _) = broadcast::channel(16);
1073 let event_rx = event_tx.subscribe();
1074
1075 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1076
1077 let state = State::new();
1078
1079 let extractors: Vec<Arc<dyn TurnExtractor>> = vec![Arc::new(FixedExtractor)];
1080
1081 let (fast_handle, ctrl_handle) = spawn_event_processor(
1082 event_rx,
1083 callbacks,
1084 None,
1085 writer,
1086 extractors,
1087 state.clone(),
1088 None,
1089 None,
1090 None,
1091 None,
1092 None,
1093 std::collections::HashMap::new(),
1094 ControlPlaneConfig::default(),
1095 dummy_event_tx(),
1096 );
1097
1098 let _ = event_tx.send(SessionEvent::InputTranscription("hi".to_string()));
1100 tokio::time::sleep(Duration::from_millis(20)).await;
1101 let _ = event_tx.send(SessionEvent::TurnComplete);
1102 tokio::time::sleep(Duration::from_millis(100)).await;
1103
1104 let score: Option<f64> = state.get("score");
1106 assert_eq!(score, Some(0.9));
1107 let mood: Option<String> = state.get("mood");
1108 assert_eq!(mood, Some("happy".to_string()));
1109
1110 drop(event_tx);
1111 let _ = fast_handle.await;
1112 let _ = ctrl_handle.await;
1113 }
1114
1115 #[tokio::test]
1116 async fn telemetry_lane_auto_collects() {
1117 let (event_tx, _) = broadcast::channel(16);
1118 let telem_rx = event_tx.subscribe();
1119
1120 let telemetry = Arc::new(SessionTelemetry::new());
1121 let signals = SessionSignals::new(State::new());
1122 let cancel = CancellationToken::new();
1123
1124 let telem_handle =
1125 spawn_telemetry_lane(telem_rx, signals, telemetry.clone(), cancel.clone(), None);
1126
1127 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"chunk1")));
1129 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"chunk2")));
1130 let _ = event_tx.send(SessionEvent::VoiceActivityEnd);
1131 tokio::time::sleep(Duration::from_millis(50)).await;
1132 let _ = event_tx.send(SessionEvent::AudioData(Bytes::from_static(b"response")));
1133 tokio::time::sleep(Duration::from_millis(50)).await;
1134
1135 let snap = telemetry.snapshot();
1136 assert_eq!(snap["audio_chunks_out"], 3);
1137 assert!(snap["response_count"].as_u64().unwrap() >= 1);
1138
1139 cancel.cancel();
1140 let _ = telem_handle.await;
1141 }
1142
1143 #[tokio::test]
1144 async fn background_tool_sends_ack_immediately() {
1145 use crate::live::background_tool::{BackgroundToolTracker, ToolExecutionMode};
1146 use crate::tool::{SimpleTool, ToolDispatcher};
1147
1148 let tool = SimpleTool::new(
1150 "slow_search",
1151 "A slow search tool",
1152 Some(serde_json::json!({"type": "object", "properties": {"q": {"type": "string"}}})),
1153 |_args| async move {
1154 tokio::time::sleep(Duration::from_millis(200)).await;
1155 Ok(serde_json::json!({"results": ["found"]}))
1156 },
1157 );
1158
1159 let mut dispatcher = ToolDispatcher::new();
1160 dispatcher.register(tool);
1161
1162 let mut execution_modes = std::collections::HashMap::new();
1163 execution_modes.insert(
1164 "slow_search".to_string(),
1165 ToolExecutionMode::Background {
1166 formatter: None,
1167 scheduling: None,
1168 },
1169 );
1170
1171 let sent = Arc::new(parking_lot::Mutex::new(Vec::<Vec<FunctionResponse>>::new()));
1172 let sent_clone = sent.clone();
1173
1174 struct RecordingWriter {
1176 sent: Arc<parking_lot::Mutex<Vec<Vec<FunctionResponse>>>>,
1177 }
1178
1179 #[async_trait::async_trait]
1180 impl SessionWriter for RecordingWriter {
1181 async fn send_audio(
1182 &self,
1183 _data: Vec<u8>,
1184 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1185 Ok(())
1186 }
1187 async fn send_text(
1188 &self,
1189 _text: String,
1190 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1191 Ok(())
1192 }
1193 async fn send_video(
1194 &self,
1195 _data: Vec<u8>,
1196 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1197 Ok(())
1198 }
1199 async fn send_tool_response(
1200 &self,
1201 responses: Vec<FunctionResponse>,
1202 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1203 self.sent.lock().push(responses);
1204 Ok(())
1205 }
1206 async fn update_instruction(
1207 &self,
1208 _instruction: String,
1209 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1210 Ok(())
1211 }
1212 async fn send_client_content(
1213 &self,
1214 _content: Vec<gemini_genai_rs::prelude::Content>,
1215 _turn_complete: bool,
1216 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1217 Ok(())
1218 }
1219 async fn signal_activity_start(
1220 &self,
1221 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1222 Ok(())
1223 }
1224 async fn signal_activity_end(
1225 &self,
1226 ) -> Result<(), gemini_genai_rs::session::SessionError> {
1227 Ok(())
1228 }
1229 async fn disconnect(&self) -> Result<(), gemini_genai_rs::session::SessionError> {
1230 Ok(())
1231 }
1232 }
1233
1234 let writer: Arc<dyn SessionWriter> = Arc::new(RecordingWriter { sent: sent_clone });
1235 let callbacks = Arc::new(EventCallbacks::default());
1236 let tracker = Arc::new(BackgroundToolTracker::new());
1237
1238 let (event_tx, _) = broadcast::channel(16);
1239 let event_rx = event_tx.subscribe();
1240
1241 let (fast_handle, ctrl_handle) = spawn_event_processor(
1242 event_rx,
1243 callbacks,
1244 Some(Arc::new(dispatcher)),
1245 writer,
1246 vec![],
1247 State::new(),
1248 None,
1249 None,
1250 None,
1251 None,
1252 Some(tracker.clone()),
1253 execution_modes,
1254 ControlPlaneConfig::default(),
1255 dummy_event_tx(),
1256 );
1257
1258 let _ = event_tx.send(SessionEvent::ToolCall(vec![
1260 gemini_genai_rs::prelude::FunctionCall {
1261 name: "slow_search".to_string(),
1262 args: serde_json::json!({"q": "test"}),
1263 id: Some("fc_1".to_string()),
1264 },
1265 ]));
1266
1267 tokio::time::sleep(Duration::from_millis(50)).await;
1269
1270 {
1272 let responses = sent.lock();
1273 assert!(!responses.is_empty(), "Should have sent ack immediately");
1275 assert_eq!(responses[0][0].response["status"], "running");
1276 }
1277
1278 tokio::time::sleep(Duration::from_millis(300)).await;
1280
1281 {
1282 let responses = sent.lock();
1283 assert!(
1285 responses.len() >= 2,
1286 "Should have sent result after completion"
1287 );
1288 assert_eq!(responses[1][0].response["status"], "completed");
1289 }
1290
1291 drop(event_tx);
1292 let _ = fast_handle.await;
1293 let _ = ctrl_handle.await;
1294 }
1295
1296 #[tokio::test]
1297 async fn callback_mode_blocking_awaits_inline() {
1298 use crate::live::callbacks::CallbackMode;
1299 use std::sync::atomic::AtomicU32;
1300
1301 let order = Arc::new(AtomicU32::new(0));
1302 let order_clone = order.clone();
1303
1304 let callbacks = EventCallbacks {
1305 on_turn_complete: Some(Arc::new(move || {
1307 let o = order_clone.clone();
1308 Box::pin(async move {
1309 tokio::time::sleep(Duration::from_millis(10)).await;
1311 o.store(1, Ordering::SeqCst);
1312 })
1313 })),
1314 on_turn_complete_mode: CallbackMode::Blocking,
1315 ..Default::default()
1316 };
1317 let callbacks = Arc::new(callbacks);
1318
1319 let (event_tx, _) = broadcast::channel(16);
1320 let event_rx = event_tx.subscribe();
1321
1322 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1323
1324 let (fast_handle, ctrl_handle) = spawn_event_processor(
1325 event_rx,
1326 callbacks,
1327 None,
1328 writer,
1329 vec![],
1330 State::new(),
1331 None,
1332 None,
1333 None,
1334 None,
1335 None,
1336 std::collections::HashMap::new(),
1337 ControlPlaneConfig::default(),
1338 dummy_event_tx(),
1339 );
1340
1341 let _ = event_tx.send(SessionEvent::TurnComplete);
1342 tokio::time::sleep(Duration::from_millis(100)).await;
1343
1344 assert_eq!(order.load(Ordering::SeqCst), 1);
1346
1347 drop(event_tx);
1348 let _ = fast_handle.await;
1349 let _ = ctrl_handle.await;
1350 }
1351
1352 #[tokio::test]
1353 async fn interruption_beats_slow_inline_tool() {
1354 use crate::tool::{SimpleTool, ToolDispatcher};
1355
1356 let mut dispatcher = ToolDispatcher::new();
1358 dispatcher.register(SimpleTool::new("slow", "slow", None, |_args| async move {
1359 tokio::time::sleep(Duration::from_secs(5)).await;
1360 Ok(serde_json::json!({"done": true}))
1361 }));
1362
1363 let interrupted_at = Arc::new(parking_lot::Mutex::new(None::<std::time::Instant>));
1364 let flag = interrupted_at.clone();
1365 let callbacks = EventCallbacks {
1366 on_interrupted: Some(Arc::new(move || {
1367 let flag = flag.clone();
1368 Box::pin(async move {
1369 *flag.lock() = Some(std::time::Instant::now());
1370 })
1371 })),
1372 ..Default::default()
1373 };
1374
1375 let (event_tx, _) = broadcast::channel(16);
1376 let event_rx = event_tx.subscribe();
1377 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1378
1379 let (fast_handle, ctrl_handle) = spawn_event_processor(
1380 event_rx,
1381 Arc::new(callbacks),
1382 Some(Arc::new(dispatcher)),
1383 writer,
1384 vec![],
1385 State::new(),
1386 None,
1387 None,
1388 None,
1389 None,
1390 None,
1391 std::collections::HashMap::new(),
1392 ControlPlaneConfig::default(),
1393 dummy_event_tx(),
1394 );
1395
1396 let start = std::time::Instant::now();
1398 let _ = event_tx.send(SessionEvent::ToolCall(vec![
1399 gemini_genai_rs::prelude::FunctionCall {
1400 name: "slow".to_string(),
1401 args: serde_json::json!({}),
1402 id: Some("fc_slow".to_string()),
1403 },
1404 ]));
1405 tokio::time::sleep(Duration::from_millis(50)).await;
1406 let _ = event_tx.send(SessionEvent::Interrupted);
1407
1408 let mut waited = Duration::ZERO;
1411 while interrupted_at.lock().is_none() && waited < Duration::from_secs(2) {
1412 tokio::time::sleep(Duration::from_millis(25)).await;
1413 waited += Duration::from_millis(25);
1414 }
1415 let fired = (*interrupted_at.lock()).expect("on_interrupted must fire");
1416 assert!(
1417 fired.duration_since(start) < Duration::from_secs(2),
1418 "interruption must not wait for the slow tool"
1419 );
1420
1421 drop(event_tx);
1422 let _ = fast_handle.await;
1423 let _ = ctrl_handle.await;
1424 }
1425
1426 #[tokio::test]
1427 async fn control_lane_exit_persists_final_snapshot_synchronously() {
1428 use crate::live::persistence::{MemoryPersistence, SessionPersistence};
1429
1430 let persistence = Arc::new(MemoryPersistence::new());
1431 let control_plane = ControlPlaneConfig {
1432 persistence: Some(persistence.clone()),
1433 session_id: Some("final-drain".to_string()),
1434 ..Default::default()
1435 };
1436
1437 let (event_tx, _) = broadcast::channel(16);
1438 let event_rx = event_tx.subscribe();
1439 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1440
1441 let (fast_handle, ctrl_handle) = spawn_event_processor(
1442 event_rx,
1443 Arc::new(EventCallbacks::default()),
1444 None,
1445 writer,
1446 vec![],
1447 State::new(),
1448 None,
1449 None,
1450 None,
1451 None,
1452 None,
1453 std::collections::HashMap::new(),
1454 control_plane,
1455 dummy_event_tx(),
1456 );
1457
1458 let _ = event_tx.send(SessionEvent::InputTranscription("last words".to_string()));
1461 tokio::time::sleep(Duration::from_millis(50)).await;
1462
1463 drop(event_tx);
1466 let _ = fast_handle.await;
1467 let _ = ctrl_handle.await;
1468
1469 let snap = persistence
1470 .load("final-drain")
1471 .await
1472 .unwrap()
1473 .expect("control-lane exit must persist a final snapshot");
1474 assert_eq!(snap.turn_count, 0);
1475 }
1476
1477 #[tokio::test]
1478 async fn lanes_exit_after_terminal_disconnected_event() {
1479 let callbacks = Arc::new(EventCallbacks::default());
1484 let (event_tx, _) = broadcast::channel(16);
1485 let event_rx = event_tx.subscribe();
1486 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1487
1488 let (fast_handle, ctrl_handle) = spawn_event_processor(
1489 event_rx,
1490 callbacks,
1491 None,
1492 writer,
1493 vec![],
1494 State::new(),
1495 None,
1496 None,
1497 None,
1498 None,
1499 None,
1500 std::collections::HashMap::new(),
1501 ControlPlaneConfig::default(),
1502 dummy_event_tx(),
1503 );
1504
1505 let _ = event_tx.send(SessionEvent::Disconnected(None));
1506
1507 let joined = tokio::time::timeout(Duration::from_secs(2), async {
1510 let _ = fast_handle.await;
1511 let _ = ctrl_handle.await;
1512 })
1513 .await;
1514 assert!(
1515 joined.is_ok(),
1516 "lanes must exit after the terminal Disconnected event"
1517 );
1518 drop(event_tx);
1519 }
1520
1521 #[tokio::test]
1522 async fn callback_mode_concurrent_spawns_task() {
1523 use crate::live::callbacks::CallbackMode;
1524
1525 let called = Arc::new(AtomicBool::new(false));
1526 let called_clone = called.clone();
1527
1528 let callbacks = EventCallbacks {
1529 on_turn_complete: Some(Arc::new(move || {
1530 let c = called_clone.clone();
1531 Box::pin(async move {
1532 tokio::time::sleep(Duration::from_millis(10)).await;
1533 c.store(true, Ordering::SeqCst);
1534 })
1535 })),
1536 on_turn_complete_mode: CallbackMode::Concurrent,
1537 ..Default::default()
1538 };
1539 let callbacks = Arc::new(callbacks);
1540
1541 let (event_tx, _) = broadcast::channel(16);
1542 let event_rx = event_tx.subscribe();
1543
1544 let writer: Arc<dyn SessionWriter> = Arc::new(crate::agent_session::NoOpSessionWriter);
1545
1546 let (fast_handle, ctrl_handle) = spawn_event_processor(
1547 event_rx,
1548 callbacks,
1549 None,
1550 writer,
1551 vec![],
1552 State::new(),
1553 None,
1554 None,
1555 None,
1556 None,
1557 None,
1558 std::collections::HashMap::new(),
1559 ControlPlaneConfig::default(),
1560 dummy_event_tx(),
1561 );
1562
1563 let _ = event_tx.send(SessionEvent::TurnComplete);
1564 tokio::time::sleep(Duration::from_millis(100)).await;
1566
1567 assert!(called.load(Ordering::SeqCst));
1569
1570 drop(event_tx);
1571 let _ = fast_handle.await;
1572 let _ = ctrl_handle.await;
1573 }
1574}