gemini_adk_rs/live/
processor.rs

//! Three-lane event processor for Live sessions.
//!
//! - **Fast lane**: audio, text, VAD (sync callbacks, never blocks).
//! - **Control lane**: tool calls, interruptions, lifecycle, transcript
//!   accumulation, extractors, phases, watchers (async callbacks, can block).
//! - **Telemetry lane**: SessionSignals + SessionTelemetry (debounced state
//!   writes, runs on its own broadcast receiver — zero work on the router hot
//!   path).
8
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use bytes::Bytes;
14use tokio::sync::{broadcast, mpsc};
15use tokio_util::sync::CancellationToken;
16
17use gemini_genai_rs::prelude::{SessionEvent, SessionPhase};
18use gemini_genai_rs::session::SessionWriter;
19
20use crate::state::State;
21use crate::tool::ToolDispatcher;
22
23use super::background_tool::BackgroundToolTracker;
24use super::callbacks::EventCallbacks;
25use super::computed::ComputedRegistry;
26use super::context_writer::PendingContext;
27use super::control_plane::run_control_lane;
28use super::events::LiveEvent;
29use super::extractor::TurnExtractor;
30use super::needs::NeedsFulfillment;
31use super::persistence::SessionPersistence;
32use super::phase::PhaseMachine;
33use super::session_signals::SessionSignals;
34use super::soft_turn::SoftTurnDetector;
35use super::steering::{ContextDelivery, SteeringMode};
36use super::telemetry::SessionTelemetry;
37use super::temporal::TemporalRegistry;
38use super::watcher::WatcherRegistry;
39
/// Backpressure (delivery) policy for a single class of fast-lane events.
///
/// The event router forwards fast-lane frames (audio, text, transcripts,
/// thoughts, VAD, phase) over a bounded channel to the fast-lane consumer. When
/// that consumer falls behind and the channel fills, the policy decides what the
/// router does — and crucially, whether the router *blocks*. Because the router
/// is shared by both the fast lane and the control lane, a blocking fast-lane
/// send stalls routing for *all* events, including control-lane lifecycle and
/// tool events. The policy lets callers trade frame durability for router
/// responsiveness on a per-class basis.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Delivery {
    /// Never drop a frame: `tx.send(ev).await` — the router awaits when the
    /// channel is full. This is the historical (and default) behavior and is
    /// byte-for-byte identical to the pre-policy code path. Use it when every
    /// frame matters and a slow consumer applying backpressure to the router is
    /// acceptable.
    #[default]
    Lossless,
    /// Drop the *newest* frame on overflow: `tx.try_send(ev)` and, on
    /// [`TrySendError::Full`](tokio::sync::mpsc::error::TrySendError::Full),
    /// discard the just-produced frame and bump a dropped-frame counter. The
    /// router never blocks on this class, so a slow fast-lane consumer can no
    /// longer stall control-lane routing. Use it for high-frequency, loss-
    /// tolerant streams (e.g. partial transcripts, thoughts) where freshness of
    /// already-queued frames matters less than keeping the router moving.
    ///
    /// A drop-oldest / latest-only variant is intentionally *not* provided:
    /// tokio's `mpsc` has no clean "evict the oldest queued item" primitive, so
    /// implementing it correctly would require a custom ring buffer. That is
    /// left as future work rather than shipped half-working.
    LossyDropNewest,
}
73
74/// Per-event-class delivery (backpressure) policy for the fast lane.
75///
76/// Each fast-lane event class carries its own [`Delivery`] policy. The
77/// [`Default`] impl sets **every** class to [`Delivery::Lossless`], which makes
78/// the whole feature behavior-preserving: with the default config the router
79/// uses the same `send().await` path it always has. Callers opt into lossy
80/// behavior per class via the builder setters.
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub struct DeliveryConfig {
83    /// Policy for raw PCM audio frames.
84    pub audio: Delivery,
85    /// Policy for incremental text deltas (and text-complete frames).
86    pub text: Delivery,
87    /// Policy for input/output transcript frames (fast-lane callback copy only;
88    /// control-lane accumulation is unaffected and always lossless).
89    pub transcript: Delivery,
90    /// Policy for thought-summary frames.
91    pub thought: Delivery,
92    /// Policy for VAD start/end frames.
93    pub vad: Delivery,
94    /// Policy for phase-changed frames.
95    pub phase: Delivery,
96}
97
98impl Default for DeliveryConfig {
99    fn default() -> Self {
100        Self {
101            audio: Delivery::Lossless,
102            text: Delivery::Lossless,
103            transcript: Delivery::Lossless,
104            thought: Delivery::Lossless,
105            vad: Delivery::Lossless,
106            phase: Delivery::Lossless,
107        }
108    }
109}
110
111impl DeliveryConfig {
112    /// A config with every class set to [`Delivery::Lossless`] (same as
113    /// [`Default`]).
114    pub fn lossless() -> Self {
115        Self::default()
116    }
117
118    /// Set the audio policy.
119    pub fn audio(mut self, d: Delivery) -> Self {
120        self.audio = d;
121        self
122    }
123
124    /// Set the text policy.
125    pub fn text(mut self, d: Delivery) -> Self {
126        self.text = d;
127        self
128    }
129
130    /// Set the transcript policy.
131    pub fn transcript(mut self, d: Delivery) -> Self {
132        self.transcript = d;
133        self
134    }
135
136    /// Set the thought policy.
137    pub fn thought(mut self, d: Delivery) -> Self {
138        self.thought = d;
139        self
140    }
141
142    /// Set the VAD policy.
143    pub fn vad(mut self, d: Delivery) -> Self {
144        self.vad = d;
145        self
146    }
147
148    /// Set the phase policy.
149    pub fn phase(mut self, d: Delivery) -> Self {
150        self.phase = d;
151        self
152    }
153}
154
/// Per-class counters for fast-lane frames dropped under a lossy policy.
///
/// Incremented with a single relaxed atomic add on the router hot path when a
/// [`Delivery::LossyDropNewest`] send overflows. Reads are for observability /
/// tests and never gate the hot path.
#[derive(Debug, Default)]
pub(crate) struct DroppedFrames {
    /// Dropped raw audio frames.
    pub audio: AtomicU64,
    /// Dropped text frames (deltas and text-complete share this counter).
    pub text: AtomicU64,
    /// Dropped transcript frames (input and output share this counter).
    pub transcript: AtomicU64,
    /// Dropped thought frames.
    pub thought: AtomicU64,
    /// Dropped VAD frames (start and end share this counter).
    pub vad: AtomicU64,
    /// Dropped phase-changed frames.
    pub phase: AtomicU64,
}
169
170impl DroppedFrames {
171    /// Total dropped frames across all classes.
172    ///
173    /// Currently only consumed by tests; the per-class atomics are read
174    /// directly elsewhere. Kept test-gated until a handle accessor surfaces it.
175    #[cfg(test)]
176    pub fn total(&self) -> u64 {
177        self.audio.load(Ordering::Relaxed)
178            + self.text.load(Ordering::Relaxed)
179            + self.transcript.load(Ordering::Relaxed)
180            + self.thought.load(Ordering::Relaxed)
181            + self.vad.load(Ordering::Relaxed)
182            + self.phase.load(Ordering::Relaxed)
183    }
184}
185
186/// Forward one fast-lane frame according to its class delivery policy.
187///
188/// - [`Delivery::Lossless`]: `tx.send(ev).await` — awaits when the channel is
189///   full (identical to the pre-policy behavior).
190/// - [`Delivery::LossyDropNewest`]: `tx.try_send(ev)` — on a full channel, drop
191///   the frame and increment `dropped`.
192///
193/// Returns without ever blocking the router under a lossy policy.
194async fn deliver_fast(
195    tx: &mpsc::Sender<FastEvent>,
196    ev: FastEvent,
197    policy: Delivery,
198    dropped: &AtomicU64,
199) {
200    match policy {
201        Delivery::Lossless => {
202            let _ = tx.send(ev).await;
203        }
204        Delivery::LossyDropNewest => {
205            if let Err(mpsc::error::TrySendError::Full(_)) = tx.try_send(ev) {
206                dropped.fetch_add(1, Ordering::Relaxed);
207            }
208            // `TrySendError::Closed` is ignored, matching the `let _ = send`
209            // pattern used elsewhere (the consumer is gone; nothing to do).
210        }
211    }
212}
213
214/// Events routed to the fast lane (sync processing).
215pub(crate) enum FastEvent {
216    Audio(Bytes),
217    Text(String),
218    TextComplete(String),
219    InputTranscript(String),
220    OutputTranscript(String),
221    Thought(String),
222    VadStart,
223    VadEnd,
224    Phase(SessionPhase),
225    /// Interruption flag — tells fast lane to stop forwarding audio.
226    Interrupted,
227}
228
229/// Events routed to the control lane (async processing).
230pub(crate) enum ControlEvent {
231    ToolCall(Vec<gemini_genai_rs::prelude::FunctionCall>),
232    ToolCallCancelled(Vec<String>),
233    /// A background tool finished. Posted by the detached background task (which
234    /// can't reach the synchronous `FlowMonitor`) so the control lane can advance
235    /// the governed flow through the same gate as inline tools (#7).
236    ToolCompleted {
237        /// The tool call's correlation id (for once-per-call_id flow dedup).
238        call_id: String,
239        /// The tool name (matches `FunctionCall::name`).
240        name: String,
241        /// Whether the tool completed successfully.
242        ok: bool,
243    },
244    Interrupted,
245    TurnComplete,
246    /// Model finished generating (even if interrupted). Fires before TurnComplete.
247    GenerationComplete,
248    GoAway(Option<String>),
249    Connected,
250    Disconnected(Option<String>),
251    SessionResumeUpdate(gemini_genai_rs::session::ResumeInfo),
252    Error(String),
253    /// Transcript accumulation — pushed from router, exclusive to control lane.
254    InputTranscript(String),
255    OutputTranscript(String),
256}
257
258/// Shared state between the two lanes.
259pub(crate) struct SharedState {
260    /// When true, fast lane suppresses audio callbacks.
261    pub interrupted: AtomicBool,
262    /// Barge-in signal for in-flight inline tool dispatch.
263    ///
264    /// Cancelled by the ROUTER the moment an `Interrupted` event arrives,
265    /// then re-armed (replaced with a fresh token) by the control lane once
266    /// it has processed the interruption. The control lane races inline tool
267    /// dispatch against this token, so a user barge-in is never stuck waiting
268    /// behind a slow tool.
269    pub barge_in: parking_lot::Mutex<CancellationToken>,
270    /// Latest resume handle from server.
271    pub resume_handle: parking_lot::Mutex<Option<String>>,
272    /// Last instruction sent via instruction_template (for dedup).
273    pub last_instruction: parking_lot::Mutex<Option<String>>,
274    /// Pending context buffer for deferred delivery (None when Immediate mode).
275    pub pending_context: Option<Arc<PendingContext>>,
276    /// Fast-lane delivery policy per event class.
277    pub delivery: DeliveryConfig,
278    /// Per-class counters for frames dropped under a lossy delivery policy.
279    pub dropped: DroppedFrames,
280}
281
282/// Runs the three-lane event processor.
283///
284/// Returns JoinHandles for the fast consumer and control processor tasks.
285/// The telemetry lane is spawned separately via [`spawn_telemetry_lane`].
286/// Configuration for the control plane's new capabilities.
287pub(crate) struct ControlPlaneConfig {
288    /// Soft turn detector for proactive silence awareness.
289    pub soft_turn: Option<SoftTurnDetector>,
290    /// Steering mode for phase instruction delivery.
291    pub steering_mode: SteeringMode,
292    /// When to deliver context turns to the wire.
293    /// Deferred = synchronize with user activity (speech, interruption);
294    /// Immediate = send during TurnComplete processing.
295    pub context_delivery: ContextDelivery,
296    /// Conversation repair tracker.
297    pub needs_fulfillment: Option<NeedsFulfillment>,
298    /// Session persistence backend.
299    pub persistence: Option<Arc<dyn SessionPersistence>>,
300    /// Session ID for persistence key.
301    pub session_id: Option<String>,
302    /// Whether to inject tool availability advisory on phase transitions.
303    pub tool_advisory: bool,
304    /// Shared pending context buffer for deferred delivery (None when Immediate).
305    /// Must be the same Arc given to the DeferredWriter so the control lane
306    /// can push context and the DeferredWriter can drain it.
307    pub pending_context: Option<Arc<PendingContext>>,
308    /// Middleware layers run around tool dispatch in the control lane
309    /// (`before_tool` / `after_tool` / `on_tool_error`).
310    pub middleware: Arc<crate::middleware::MiddlewareChain>,
311    /// Optional governed-flow monitor: gates tool calls, projects active-step
312    /// postures into steering, and drives repair from unmet requirements.
313    /// Shared (`Arc<Mutex<..>>`) so the [`LiveHandle`](super::handle::LiveHandle)
314    /// can snapshot `explain`/`why_blocked` while the control lane advances it.
315    /// Lock briefly; never hold the guard across an `await`.
316    pub flow: Option<crate::flow::SharedFlowMonitor>,
317    /// Fast-lane delivery (backpressure) policy per event class. Defaults to
318    /// all-`Lossless`, preserving the historical `send().await` behavior.
319    pub delivery: DeliveryConfig,
320}
321
322impl Default for ControlPlaneConfig {
323    fn default() -> Self {
324        Self {
325            soft_turn: None,
326            steering_mode: SteeringMode::default(),
327            context_delivery: ContextDelivery::default(),
328            needs_fulfillment: None,
329            persistence: None,
330            session_id: None,
331            tool_advisory: true,
332            pending_context: None,
333            middleware: Arc::new(crate::middleware::MiddlewareChain::new()),
334            flow: None,
335            delivery: DeliveryConfig::default(),
336        }
337    }
338}
339
340#[allow(
341    clippy::too_many_arguments,
342    reason = "lane spawn site: parameters are the owned subsystem handles split between the fast and control lanes"
343)]
344pub(crate) fn spawn_event_processor(
345    mut event_rx: broadcast::Receiver<SessionEvent>,
346    callbacks: Arc<EventCallbacks>,
347    dispatcher: Option<Arc<ToolDispatcher>>,
348    writer: Arc<dyn SessionWriter>,
349    extractors: Vec<Arc<dyn TurnExtractor>>,
350    state: State,
351    computed: Option<ComputedRegistry>,
352    phase_machine: Option<tokio::sync::Mutex<PhaseMachine>>,
353    watchers: Option<WatcherRegistry>,
354    temporal: Option<Arc<TemporalRegistry>>,
355    background_tracker: Option<Arc<BackgroundToolTracker>>,
356    execution_modes: std::collections::HashMap<String, super::background_tool::ToolExecutionMode>,
357    control_plane: ControlPlaneConfig,
358    live_event_tx: broadcast::Sender<LiveEvent>,
359) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
360    let shared = Arc::new(SharedState {
361        interrupted: AtomicBool::new(false),
362        barge_in: parking_lot::Mutex::new(CancellationToken::new()),
363        resume_handle: parking_lot::Mutex::new(None),
364        last_instruction: parking_lot::Mutex::new(None),
365        pending_context: control_plane.pending_context.clone(),
366        delivery: control_plane.delivery,
367        dropped: DroppedFrames::default(),
368    });
369
370    let timer_cancel = CancellationToken::new();
371
372    // Channels between router and lanes.
373    //
374    // The control channel matches the fast channel at 512: control events
375    // are routed with a lossless `send().await`, so a *full* control queue
376    // blocks the shared router — and a blocked router stops forwarding audio
377    // frames too, causing playback glitches. Transcript accumulation events
378    // (one per ASR chunk) flow through this channel, so a slow control-lane
379    // consumer (e.g. a blocking turn-complete pipeline) could realistically
380    // fill 64 slots; 512 gives the lane room to fall behind transiently
381    // without starving the fast lane.
382    let (fast_tx, fast_rx) = mpsc::channel::<FastEvent>(512);
383    let (ctrl_tx, ctrl_rx) = mpsc::channel::<ControlEvent>(512);
384
385    // Spawn the router task (reads broadcast, routes to lanes)
386    // NOTE: SessionSignals is NOT called here — it runs on the telemetry lane.
387    let fast_tx_clone = fast_tx.clone();
388    let ctrl_tx_clone = ctrl_tx.clone();
389    let shared_clone = shared.clone();
390    tokio::spawn(async move {
391        loop {
392            match event_rx.recv().await {
393                Ok(event) => {
394                    // `Disconnected` is terminal in L0 (the session loop returns
395                    // after emitting it), so the router exits after routing it.
396                    // Dropping the router's lane senders closes the fast/control
397                    // channels, letting both lanes drain their queues and shut
398                    // down gracefully (final persistence drain, etc.) instead of
399                    // idling forever on a broadcast channel that never closes
400                    // while the `SessionHandle` is alive.
401                    let terminal = matches!(event, SessionEvent::Disconnected(_));
402                    route_event(event, &fast_tx_clone, &ctrl_tx_clone, &shared_clone).await;
403                    if terminal {
404                        break;
405                    }
406                }
407                Err(broadcast::error::RecvError::Lagged(n)) => {
408                    #[cfg(feature = "tracing-support")]
409                    tracing::warn!(skipped = n, "Event processor lagged, skipped events");
410                    let _ = n;
411                }
412                Err(broadcast::error::RecvError::Closed) => break,
413            }
414        }
415    });
416
417    // Spawn fast consumer (no transcript buffer — transcripts are in control lane)
418    let fast_callbacks = callbacks.clone();
419    let fast_shared = shared.clone();
420    let fast_event_tx = live_event_tx.clone();
421    let fast_handle = tokio::spawn(async move {
422        run_fast_lane(fast_rx, fast_callbacks, fast_shared, fast_event_tx).await;
423    });
424
425    // Clone for the timer task (before moving into ctrl spawn)
426    let timer_temporal = temporal.clone();
427    let timer_state = state.clone();
428    let timer_writer = writer.clone();
429
430    // Spawn control processor (owns TranscriptBuffer exclusively — no mutex needed)
431    let ctrl_callbacks = callbacks;
432    let ctrl_shared = shared;
433    let ctrl_timer_cancel = timer_cancel.clone();
434    // Weak sender handed to the control lane so background tool tasks can post
435    // completions back without keeping the channel open on shutdown (the lane
436    // upgrades it per background spawn; the channel closes once the router and
437    // all in-flight background tasks drop their strong senders).
438    let ctrl_tx_weak = ctrl_tx.downgrade();
439    let ctrl_handle = tokio::spawn(async move {
440        run_control_lane(
441            ctrl_rx,
442            ctrl_tx_weak,
443            ctrl_callbacks,
444            dispatcher,
445            writer,
446            ctrl_shared,
447            extractors,
448            state,
449            computed,
450            phase_machine,
451            watchers,
452            temporal,
453            background_tracker,
454            execution_modes,
455            control_plane,
456            live_event_tx,
457        )
458        .await;
459        ctrl_timer_cancel.cancel();
460    });
461
462    // Optional timer task for sustained temporal patterns
463    if let Some(ref temporal_ref) = timer_temporal {
464        if temporal_ref.needs_timer() {
465            let t = temporal_ref.clone();
466            let cancel = timer_cancel.clone();
467            tokio::spawn(async move {
468                let mut interval = tokio::time::interval(Duration::from_millis(500));
469                loop {
470                    tokio::select! {
471                        _ = cancel.cancelled() => break,
472                        _ = interval.tick() => {
473                            for action in t.check_all(&timer_state, None, &timer_writer) {
474                                tokio::spawn(action);
475                            }
476                        }
477                    }
478                }
479            });
480        }
481    }
482
483    (fast_handle, ctrl_handle)
484}
485
486/// Spawns the telemetry lane — processes events on its own broadcast receiver.
487///
488/// SessionSignals + SessionTelemetry run here, off the router hot path.
489/// Derived timing signals (silence_ms, elapsed_ms, remaining_budget_ms)
490/// are flushed every 100ms via debounced timer.
491pub(crate) fn spawn_telemetry_lane(
492    mut telem_rx: broadcast::Receiver<SessionEvent>,
493    signals: SessionSignals,
494    telemetry: Arc<SessionTelemetry>,
495    cancel: CancellationToken,
496    on_usage: Option<super::callbacks::UsageCallback>,
497) -> tokio::task::JoinHandle<()> {
498    tokio::spawn(async move {
499        let mut debounce = tokio::time::interval(Duration::from_millis(100));
500        // Consume the first immediate tick
501        debounce.tick().await;
502        loop {
503            tokio::select! {
504                biased;
505                result = telem_rx.recv() => {
506                    match result {
507                        Ok(event) => {
508                            // SessionTelemetry: record atomic counters
509                            match &event {
510                                SessionEvent::AudioData(data) => {
511                                    telemetry.record_audio_out(data.len());
512                                }
513                                SessionEvent::TextDelta(_) => {
514                                    telemetry.record_text_out();
515                                }
516                                SessionEvent::VoiceActivityEnd => {
517                                    telemetry.record_vad_end();
518                                }
519                                SessionEvent::Interrupted => {
520                                    telemetry.record_interruption();
521                                }
522                                SessionEvent::TurnComplete => {
523                                    telemetry.record_turn_complete();
524                                }
525                                SessionEvent::VoiceActivityStart => {
526                                    telemetry.mark_turn_start();
527                                }
528                                SessionEvent::Usage(ref usage) => {
529                                    telemetry.record_usage(
530                                        usage.total_token_count,
531                                        usage.prompt_token_count,
532                                        usage.response_token_count,
533                                        usage.cached_content_token_count,
534                                        usage.thoughts_token_count,
535                                    );
536                                    if let Some(cb) = &on_usage {
537                                        cb(usage);
538                                    }
539                                }
540                                _ => {}
541                            }
542                            // SessionSignals: update state keys + atomic timestamps
543                            signals.on_event(&event);
544                        }
545                        Err(broadcast::error::RecvError::Lagged(n)) => {
546                            #[cfg(feature = "tracing-support")]
547                            tracing::warn!(skipped = n, "Telemetry lane lagged");
548                            let _ = n;
549                        }
550                        Err(broadcast::error::RecvError::Closed) => break,
551                    }
552                }
553                _ = debounce.tick() => {
554                    // Flush derived timing signals to state (debounced)
555                    signals.flush_timing();
556                }
557                _ = cancel.cancelled() => break,
558            }
559        }
560    })
561}
562
563/// Routes a SessionEvent to the appropriate lane.
564async fn route_event(
565    event: SessionEvent,
566    fast_tx: &mpsc::Sender<FastEvent>,
567    ctrl_tx: &mpsc::Sender<ControlEvent>,
568    shared: &SharedState,
569) {
570    let delivery = &shared.delivery;
571    let dropped = &shared.dropped;
572    match event {
573        // Fast lane events
574        SessionEvent::AudioData(data) => {
575            deliver_fast(
576                fast_tx,
577                FastEvent::Audio(data),
578                delivery.audio,
579                &dropped.audio,
580            )
581            .await;
582        }
583        SessionEvent::TextDelta(text) => {
584            deliver_fast(fast_tx, FastEvent::Text(text), delivery.text, &dropped.text).await;
585        }
586        SessionEvent::TextComplete(text) => {
587            deliver_fast(
588                fast_tx,
589                FastEvent::TextComplete(text),
590                delivery.text,
591                &dropped.text,
592            )
593            .await;
594        }
595        // Transcripts: fast lane for callbacks, control lane for accumulation.
596        // The control-lane accumulation send keeps its lossless `send().await`.
597        SessionEvent::InputTranscription(text) => {
598            deliver_fast(
599                fast_tx,
600                FastEvent::InputTranscript(text.clone()),
601                delivery.transcript,
602                &dropped.transcript,
603            )
604            .await;
605            let _ = ctrl_tx.send(ControlEvent::InputTranscript(text)).await;
606        }
607        SessionEvent::OutputTranscription(text) => {
608            deliver_fast(
609                fast_tx,
610                FastEvent::OutputTranscript(text.clone()),
611                delivery.transcript,
612                &dropped.transcript,
613            )
614            .await;
615            let _ = ctrl_tx.send(ControlEvent::OutputTranscript(text)).await;
616        }
617        SessionEvent::Thought(text) => {
618            deliver_fast(
619                fast_tx,
620                FastEvent::Thought(text),
621                delivery.thought,
622                &dropped.thought,
623            )
624            .await;
625        }
626        SessionEvent::VoiceActivityStart => {
627            deliver_fast(fast_tx, FastEvent::VadStart, delivery.vad, &dropped.vad).await;
628        }
629        SessionEvent::VoiceActivityEnd => {
630            deliver_fast(fast_tx, FastEvent::VadEnd, delivery.vad, &dropped.vad).await;
631        }
632        SessionEvent::PhaseChanged(phase) => {
633            deliver_fast(
634                fast_tx,
635                FastEvent::Phase(phase),
636                delivery.phase,
637                &dropped.phase,
638            )
639            .await;
640        }
641        SessionEvent::SessionResumeUpdate(info) => {
642            *shared.resume_handle.lock() = Some(info.handle.clone());
643            let _ = ctrl_tx.send(ControlEvent::SessionResumeUpdate(info)).await;
644        }
645        SessionEvent::GenerationComplete => {
646            let _ = ctrl_tx.send(ControlEvent::GenerationComplete).await;
647        }
648
649        // Control lane events
650        SessionEvent::ToolCall(calls) => {
651            let _ = ctrl_tx.send(ControlEvent::ToolCall(calls)).await;
652        }
653        SessionEvent::ToolCallCancelled(ids) => {
654            let _ = ctrl_tx.send(ControlEvent::ToolCallCancelled(ids)).await;
655        }
656        SessionEvent::Interrupted => {
657            // Signal BOTH lanes
658            shared.interrupted.store(true, Ordering::Release);
659            // Cancel any in-flight inline tool dispatch immediately: the
660            // control lane may be blocked awaiting a slow tool and would
661            // otherwise not see this interruption until the tool finished.
662            shared.barge_in.lock().cancel();
663            let _ = fast_tx.send(FastEvent::Interrupted).await;
664            let _ = ctrl_tx.send(ControlEvent::Interrupted).await;
665        }
666        SessionEvent::TurnComplete => {
667            let _ = ctrl_tx.send(ControlEvent::TurnComplete).await;
668        }
669        // Usage metadata is handled by the telemetry lane (SessionSignals)
670        SessionEvent::Usage(_) => {}
671        SessionEvent::GoAway(time_left) => {
672            let _ = ctrl_tx.send(ControlEvent::GoAway(time_left)).await;
673        }
674        SessionEvent::Connected => {
675            let _ = ctrl_tx.send(ControlEvent::Connected).await;
676        }
677        SessionEvent::Disconnected(reason) => {
678            let _ = ctrl_tx.send(ControlEvent::Disconnected(reason)).await;
679        }
680        SessionEvent::Error(err) => {
681            let _ = ctrl_tx.send(ControlEvent::Error(err)).await;
682        }
683        // SessionEvent is #[non_exhaustive]: future wire events the runtime
684        // doesn't understand yet are surfaced (not silently dropped) so
685        // applications on an older runtime can observe them.
686        other => {
687            #[cfg(feature = "tracing-support")]
688            tracing::debug!(?other, "unhandled SessionEvent variant (newer wire event?)");
689            #[cfg(not(feature = "tracing-support"))]
690            let _ = other;
691        }
692    }
693}
694
695/// Fast lane consumer — processes high-frequency events with sync callbacks.
696/// No transcript buffer — transcripts are accumulated exclusively in the control lane.
697async fn run_fast_lane(
698    mut rx: mpsc::Receiver<FastEvent>,
699    callbacks: Arc<EventCallbacks>,
700    shared: Arc<SharedState>,
701    event_tx: broadcast::Sender<LiveEvent>,
702) {
703    while let Some(event) = rx.recv().await {
704        match event {
705            FastEvent::Audio(data) => {
706                // Suppress audio during interruption
707                if !shared.interrupted.load(Ordering::Acquire) {
708                    if let Some(cb) = &callbacks.on_audio {
709                        cb(&data);
710                    }
711                    let _ = event_tx.send(LiveEvent::Audio(data));
712                }
713            }
714            FastEvent::Text(delta) => {
715                if let Some(cb) = &callbacks.on_text {
716                    cb(&delta);
717                }
718                let _ = event_tx.send(LiveEvent::TextDelta(delta));
719            }
720            FastEvent::TextComplete(text) => {
721                if let Some(cb) = &callbacks.on_text_complete {
722                    cb(&text);
723                }
724                let _ = event_tx.send(LiveEvent::TextComplete(text));
725            }
726            FastEvent::InputTranscript(text) => {
727                // Callback only — accumulation happens in control lane
728                if let Some(cb) = &callbacks.on_input_transcript {
729                    cb(&text, false);
730                }
731                let _ = event_tx.send(LiveEvent::InputTranscript {
732                    text,
733                    is_final: false,
734                });
735            }
736            FastEvent::OutputTranscript(text) => {
737                // Callback only — accumulation happens in control lane
738                if let Some(cb) = &callbacks.on_output_transcript {
739                    cb(&text, false);
740                }
741                let _ = event_tx.send(LiveEvent::OutputTranscript {
742                    text,
743                    is_final: false,
744                });
745            }
746            FastEvent::Thought(text) => {
747                if let Some(cb) = &callbacks.on_thought {
748                    cb(&text);
749                }
750                let _ = event_tx.send(LiveEvent::Thought(text));
751            }
752            FastEvent::VadStart => {
753                if let Some(cb) = &callbacks.on_vad_start {
754                    cb();
755                }
756                let _ = event_tx.send(LiveEvent::VadStart);
757            }
758            FastEvent::VadEnd => {
759                if let Some(cb) = &callbacks.on_vad_end {
760                    cb();
761                }
762                let _ = event_tx.send(LiveEvent::VadEnd);
763            }
764            FastEvent::Phase(phase) => {
765                if let Some(cb) = &callbacks.on_phase {
766                    cb(phase);
767                }
768                // Phase is L0-level wire event, not emitted as LiveEvent
769            }
770            FastEvent::Interrupted => {
771                // Audio already suppressed via shared.interrupted flag
772                // Interrupted LiveEvent is emitted from control lane
773            }
774        }
775    }
776}
777
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::sync::atomic::AtomicUsize;

    use crate::live::events::LiveEvent;
    use gemini_genai_rs::prelude::FunctionResponse;

    /// Sender half of a throwaway `LiveEvent` broadcast channel, for tests
    /// that do not observe the outbound stream. The paired receiver is
    /// discarded.
    fn dummy_event_tx() -> broadcast::Sender<LiveEvent> {
        let (live_tx, _unused_rx) = broadcast::channel::<LiveEvent>(16);
        live_tx
    }

    /// The crate's `NoOpSessionWriter`, type-erased to `Arc<dyn SessionWriter>`.
    fn noop_writer() -> Arc<dyn SessionWriter> {
        Arc::new(crate::agent_session::NoOpSessionWriter)
    }

    /// Capacity-16 broadcast channel carrying the `SessionEvent`s fed to the
    /// processor under test.
    fn session_channel() -> (
        broadcast::Sender<SessionEvent>,
        broadcast::Receiver<SessionEvent>,
    ) {
        broadcast::channel(16)
    }

    /// Sleeps for `ms` milliseconds so the spawned lanes get a chance to run.
    async fn settle(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    /// Drops the session sender, then awaits the fast-lane and control-lane
    /// tasks in that order, ignoring whatever they resolve to.
    async fn shut_down<F, C>(session_tx: broadcast::Sender<SessionEvent>, fast: F, ctrl: C)
    where
        F: Future,
        C: Future,
    {
        drop(session_tx);
        let _ = fast.await;
        let _ = ctrl.await;
    }

    #[test]
    fn delivery_config_default_is_all_lossless() {
        // The standalone policy default must be Lossless...
        assert_eq!(Delivery::default(), Delivery::Lossless);

        // ...and so must the default of every per-class policy.
        let defaults = DeliveryConfig::default();
        let per_class = [
            ("audio", &defaults.audio),
            ("text", &defaults.text),
            ("transcript", &defaults.transcript),
            ("thought", &defaults.thought),
            ("vad", &defaults.vad),
            ("phase", &defaults.phase),
        ];
        for (class, policy) in per_class.iter() {
            assert_eq!(**policy, Delivery::Lossless, "{} default", class);
        }
    }

    #[tokio::test]
    async fn lossy_drop_newest_does_not_block_and_counts_drops() {
        // Pre-fill a capacity-1 channel. The receiver is kept alive but never
        // drained, so one more send would block under Lossless.
        let (fast_tx, _idle_rx) = mpsc::channel::<FastEvent>(1);
        fast_tx.send(FastEvent::VadStart).await.unwrap();
        let drop_counter = AtomicU64::new(0);

        // LossyDropNewest has to return right away and bump the counter; the
        // timeout is what proves the call did not block.
        let attempt = deliver_fast(
            &fast_tx,
            FastEvent::VadEnd,
            Delivery::LossyDropNewest,
            &drop_counter,
        );
        let outcome = tokio::time::timeout(Duration::from_millis(100), attempt).await;

        assert!(outcome.is_ok(), "deliver_fast blocked under LossyDropNewest");
        assert_eq!(drop_counter.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn lossless_delivers_on_non_full_channel() {
        let (fast_tx, mut fast_rx) = mpsc::channel::<FastEvent>(4);
        let drop_counter = AtomicU64::new(0);

        let frame = FastEvent::Text("hello".into());
        deliver_fast(&fast_tx, frame, Delivery::Lossless, &drop_counter).await;

        // Nothing was dropped, and the frame shows up on the receiving side.
        assert_eq!(drop_counter.load(Ordering::Relaxed), 0);
        let delivered = match fast_rx.recv().await {
            Some(FastEvent::Text(body)) => body,
            other => panic!("expected Text frame, got {:?}", other.is_some()),
        };
        assert_eq!(delivered, "hello");
    }

    #[test]
    fn dropped_frames_total_sums_classes() {
        let counters = DroppedFrames::default();
        counters.audio.fetch_add(2, Ordering::Relaxed);
        counters.transcript.fetch_add(3, Ordering::Relaxed);

        assert_eq!(counters.total(), 5);
    }

    #[tokio::test]
    async fn fast_lane_routes_audio() {
        let audio_hits = Arc::new(AtomicUsize::new(0));
        let callbacks = {
            let hits = Arc::clone(&audio_hits);
            Arc::new(EventCallbacks {
                on_audio: Some(Box::new(move |_| {
                    hits.fetch_add(1, Ordering::SeqCst);
                })),
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        // Two audio frames in; after the lanes had time to run, the callback
        // must have been invoked twice.
        let frames: [&'static [u8]; 2] = [b"audio1", b"audio2"];
        for &frame in frames.iter() {
            let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(frame)));
        }
        settle(50).await;

        assert_eq!(audio_hits.load(Ordering::SeqCst), 2);

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn interrupt_suppresses_audio() {
        let audio_hits = Arc::new(AtomicUsize::new(0));
        let callbacks = {
            let hits = Arc::clone(&audio_hits);
            Arc::new(EventCallbacks {
                on_audio: Some(Box::new(move |_| {
                    hits.fetch_add(1, Ordering::SeqCst);
                })),
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        // Sequence: audio -> interruption -> audio, with pauses in between.
        let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(b"before")));
        settle(20).await;
        let _ = session_tx.send(SessionEvent::Interrupted);
        settle(20).await;
        let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(b"during")));
        settle(50).await;

        // The audio sent before the interruption must have been delivered.
        // NOTE(review): this lower bound does not itself prove that the
        // "during" chunk was suppressed — confirm whether an exact count can
        // be asserted here.
        assert!(audio_hits.load(Ordering::SeqCst) >= 1);

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn control_lane_routes_turn_complete() {
        let fired = Arc::new(AtomicBool::new(false));
        let callbacks = {
            let flag = Arc::clone(&fired);
            Arc::new(EventCallbacks {
                on_turn_complete: Some(Arc::new(move || {
                    let flag = Arc::clone(&flag);
                    Box::pin(async move {
                        flag.store(true, Ordering::SeqCst);
                    })
                })),
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        let _ = session_tx.send(SessionEvent::TurnComplete);
        settle(50).await;

        assert!(fired.load(Ordering::SeqCst));

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn transcript_accumulates_in_control_lane() {
        let (session_tx, session_rx) = session_channel();
        let state = State::new();

        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            Arc::new(EventCallbacks::default()),
            None,
            noop_writer(),
            Vec::new(),
            state.clone(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        // Feed partial transcripts in both directions...
        let _ = session_tx.send(SessionEvent::InputTranscription("Hello ".to_string()));
        let _ = session_tx.send(SessionEvent::InputTranscription("world".to_string()));
        let _ = session_tx.send(SessionEvent::OutputTranscription("Hi there!".to_string()));
        settle(50).await;

        // ...then close the turn.
        let _ = session_tx.send(SessionEvent::TurnComplete);
        settle(50).await;

        // Exactly one turn must have been counted in session state.
        let turns: u32 = state.session().get("turn_count").unwrap_or(0);
        assert_eq!(turns, 1);

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn extractor_runs_on_turn_complete() {
        use crate::live::extractor::TurnExtractor;
        use crate::live::transcript::TranscriptTurn;
        use crate::llm::LlmError;

        /// Extractor that ignores its input and always yields the same JSON.
        struct CannedExtractor;

        #[async_trait::async_trait]
        impl TurnExtractor for CannedExtractor {
            fn name(&self) -> &str {
                "TestExtractor"
            }

            fn window_size(&self) -> usize {
                3
            }

            async fn extract(
                &self,
                _turns: &[TranscriptTurn],
            ) -> Result<serde_json::Value, LlmError> {
                Ok(serde_json::json!({"score": 0.9, "mood": "happy"}))
            }
        }

        let state = State::new();
        let extractors: Vec<Arc<dyn TurnExtractor>> = vec![Arc::new(CannedExtractor)];
        let (session_tx, session_rx) = session_channel();

        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            Arc::new(EventCallbacks::default()),
            None,
            noop_writer(),
            extractors,
            state.clone(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        // A turn with some content, followed by its completion.
        let _ = session_tx.send(SessionEvent::InputTranscription("hi".to_string()));
        settle(20).await;
        let _ = session_tx.send(SessionEvent::TurnComplete);
        settle(100).await;

        // Both extracted keys must now be readable from state.
        let score: Option<f64> = state.get("score");
        let mood: Option<String> = state.get("mood");
        assert_eq!(score, Some(0.9));
        assert_eq!(mood, Some("happy".to_string()));

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn telemetry_lane_auto_collects() {
        let (session_tx, telemetry_rx) = session_channel();

        let telemetry = Arc::new(SessionTelemetry::new());
        let cancel = CancellationToken::new();
        let telemetry_task = spawn_telemetry_lane(
            telemetry_rx,
            SessionSignals::new(State::new()),
            Arc::clone(&telemetry),
            cancel.clone(),
            None,
        );

        // Two chunks, a VAD end, a pause, then one more chunk.
        let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(b"chunk1")));
        let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(b"chunk2")));
        let _ = session_tx.send(SessionEvent::VoiceActivityEnd);
        settle(50).await;
        let _ = session_tx.send(SessionEvent::AudioData(Bytes::from_static(b"response")));
        settle(50).await;

        let snapshot = telemetry.snapshot();
        assert_eq!(snapshot["audio_chunks_out"], 3);
        assert!(snapshot["response_count"].as_u64().unwrap() >= 1);

        // This lane is stopped through its cancellation token.
        cancel.cancel();
        let _ = telemetry_task.await;
    }

    #[tokio::test]
    async fn background_tool_sends_ack_immediately() {
        use crate::live::background_tool::{BackgroundToolTracker, ToolExecutionMode};
        use crate::tool::{SimpleTool, ToolDispatcher};

        type SentBatches = Arc<parking_lot::Mutex<Vec<Vec<FunctionResponse>>>>;
        type WriteResult = Result<(), gemini_genai_rs::session::SessionError>;

        /// Writer that records every tool-response batch and accepts
        /// everything else without doing anything.
        struct RecordingWriter {
            sent: SentBatches,
        }

        #[async_trait::async_trait]
        impl SessionWriter for RecordingWriter {
            async fn send_audio(&self, _data: Vec<u8>) -> WriteResult {
                Ok(())
            }

            async fn send_text(&self, _text: String) -> WriteResult {
                Ok(())
            }

            async fn send_video(&self, _data: Vec<u8>) -> WriteResult {
                Ok(())
            }

            async fn send_tool_response(&self, responses: Vec<FunctionResponse>) -> WriteResult {
                self.sent.lock().push(responses);
                Ok(())
            }

            async fn update_instruction(&self, _instruction: String) -> WriteResult {
                Ok(())
            }

            async fn send_client_content(
                &self,
                _content: Vec<gemini_genai_rs::prelude::Content>,
                _turn_complete: bool,
            ) -> WriteResult {
                Ok(())
            }

            async fn signal_activity_start(&self) -> WriteResult {
                Ok(())
            }

            async fn signal_activity_end(&self) -> WriteResult {
                Ok(())
            }

            async fn disconnect(&self) -> WriteResult {
                Ok(())
            }
        }

        // A tool that takes 200ms to answer, registered for background
        // execution.
        let slow_search = SimpleTool::new(
            "slow_search",
            "A slow search tool",
            Some(serde_json::json!({"type": "object", "properties": {"q": {"type": "string"}}})),
            |_args| async move {
                tokio::time::sleep(Duration::from_millis(200)).await;
                Ok(serde_json::json!({"results": ["found"]}))
            },
        );
        let mut dispatcher = ToolDispatcher::new();
        dispatcher.register(slow_search);

        let execution_modes = std::collections::HashMap::from([(
            "slow_search".to_string(),
            ToolExecutionMode::Background {
                formatter: None,
                scheduling: None,
            },
        )]);

        let sent: SentBatches = Arc::new(parking_lot::Mutex::new(Vec::new()));
        let writer: Arc<dyn SessionWriter> = Arc::new(RecordingWriter {
            sent: Arc::clone(&sent),
        });
        let tracker = Arc::new(BackgroundToolTracker::new());

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            Arc::new(EventCallbacks::default()),
            Some(Arc::new(dispatcher)),
            writer,
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            Some(tracker.clone()),
            execution_modes,
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        let call = gemini_genai_rs::prelude::FunctionCall {
            name: "slow_search".to_string(),
            args: serde_json::json!({"q": "test"}),
            id: Some("fc_1".to_string()),
        };
        let _ = session_tx.send(SessionEvent::ToolCall(vec![call]));

        // Long enough for the ack, far too short for the 200ms tool itself.
        settle(50).await;

        // Each lock guard lives in its own scope so it is never held across
        // an await point.
        {
            let batches = sent.lock();
            assert!(!batches.is_empty(), "Should have sent ack immediately");
            assert_eq!(batches[0][0].response["status"], "running");
        }

        // Give the background tool time to finish.
        settle(300).await;

        {
            let batches = sent.lock();
            assert!(
                batches.len() >= 2,
                "Should have sent result after completion"
            );
            assert_eq!(batches[1][0].response["status"], "completed");
        }

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn callback_mode_blocking_awaits_inline() {
        use crate::live::callbacks::CallbackMode;
        use std::sync::atomic::AtomicU32;

        let marker = Arc::new(AtomicU32::new(0));
        let callbacks = {
            let marker = Arc::clone(&marker);
            Arc::new(EventCallbacks {
                // The blocking callback does a little work, then writes 1.
                on_turn_complete: Some(Arc::new(move || {
                    let marker = Arc::clone(&marker);
                    Box::pin(async move {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        marker.store(1, Ordering::SeqCst);
                    })
                })),
                on_turn_complete_mode: CallbackMode::Blocking,
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        let _ = session_tx.send(SessionEvent::TurnComplete);
        settle(100).await;

        // In Blocking mode the callback has run to completion by now.
        assert_eq!(marker.load(Ordering::SeqCst), 1);

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn interruption_beats_slow_inline_tool() {
        use crate::tool::{SimpleTool, ToolDispatcher};
        use std::time::Instant;

        // An inline tool whose dispatch takes five seconds.
        let mut dispatcher = ToolDispatcher::new();
        dispatcher.register(SimpleTool::new("slow", "slow", None, |_args| async move {
            tokio::time::sleep(Duration::from_secs(5)).await;
            Ok(serde_json::json!({"done": true}))
        }));

        // Records the instant at which `on_interrupted` ran.
        let interrupted_at = Arc::new(parking_lot::Mutex::new(None::<Instant>));
        let callbacks = {
            let slot = Arc::clone(&interrupted_at);
            Arc::new(EventCallbacks {
                on_interrupted: Some(Arc::new(move || {
                    let slot = Arc::clone(&slot);
                    Box::pin(async move {
                        *slot.lock() = Some(Instant::now());
                    })
                })),
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            Some(Arc::new(dispatcher)),
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        // Kick off the 5s dispatch, then barge in 50ms later.
        let started = Instant::now();
        let slow_call = gemini_genai_rs::prelude::FunctionCall {
            name: "slow".to_string(),
            args: serde_json::json!({}),
            id: Some("fc_slow".to_string()),
        };
        let _ = session_tx.send(SessionEvent::ToolCall(vec![slow_call]));
        settle(50).await;
        let _ = session_tx.send(SessionEvent::Interrupted);

        // Poll for the callback for at most two seconds. The interruption has
        // to be handled long before the tool's 5s elapse; before the fix it
        // queued behind the blocking dispatch.
        let poll_step = Duration::from_millis(25);
        let budget = Duration::from_secs(2);
        let mut waited = Duration::ZERO;
        loop {
            // The guard is released at the end of this statement, so it is
            // never held across the sleep below.
            let already_fired = interrupted_at.lock().is_some();
            if already_fired || waited >= budget {
                break;
            }
            tokio::time::sleep(poll_step).await;
            waited += poll_step;
        }

        let recorded: Option<Instant> = *interrupted_at.lock();
        let fired_at = recorded.expect("on_interrupted must fire");
        assert!(
            fired_at.duration_since(started) < Duration::from_secs(2),
            "interruption must not wait for the slow tool"
        );

        shut_down(session_tx, fast_task, ctrl_task).await;
    }

    #[tokio::test]
    async fn control_lane_exit_persists_final_snapshot_synchronously() {
        use crate::live::persistence::{MemoryPersistence, SessionPersistence};

        let store = Arc::new(MemoryPersistence::new());
        let control_plane = ControlPlaneConfig {
            persistence: Some(store.clone()),
            session_id: Some("final-drain".to_string()),
            ..Default::default()
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            Arc::new(EventCallbacks::default()),
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            control_plane,
            dummy_event_tx(),
        );

        // Build up some mid-turn state without ever sending TurnComplete, so
        // the per-turn (spawn-and-forget) save never fires.
        let _ = session_tx.send(SessionEvent::InputTranscription("last words".to_string()));
        settle(50).await;

        // End of session. The control lane has to run one final synchronous
        // save on its way out; before the fix nothing was persisted in this
        // scenario.
        shut_down(session_tx, fast_task, ctrl_task).await;

        let loaded = store.load("final-drain").await.unwrap();
        let snapshot = loaded.expect("control-lane exit must persist a final snapshot");
        assert_eq!(snapshot.turn_count, 0);
    }

    #[tokio::test]
    async fn lanes_exit_after_terminal_disconnected_event() {
        // Disconnected is terminal in L0. After routing it the router must
        // exit (dropping its lane senders) so both lanes can drain and shut
        // down gracefully, even though the broadcast sender stays alive for
        // the LiveHandle's lifetime.
        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            Arc::new(EventCallbacks::default()),
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        let _ = session_tx.send(SessionEvent::Disconnected(None));

        // The sender is intentionally still alive at this point: before the
        // fix the router only exited on channel close and both awaits hung.
        let join_both = async {
            let _ = fast_task.await;
            let _ = ctrl_task.await;
        };
        let joined = tokio::time::timeout(Duration::from_secs(2), join_both).await;
        assert!(
            joined.is_ok(),
            "lanes must exit after the terminal Disconnected event"
        );

        drop(session_tx);
    }

    #[tokio::test]
    async fn callback_mode_concurrent_spawns_task() {
        use crate::live::callbacks::CallbackMode;

        let fired = Arc::new(AtomicBool::new(false));
        let callbacks = {
            let flag = Arc::clone(&fired);
            Arc::new(EventCallbacks {
                on_turn_complete: Some(Arc::new(move || {
                    let flag = Arc::clone(&flag);
                    Box::pin(async move {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                        flag.store(true, Ordering::SeqCst);
                    })
                })),
                on_turn_complete_mode: CallbackMode::Concurrent,
                ..Default::default()
            })
        };

        let (session_tx, session_rx) = session_channel();
        let (fast_task, ctrl_task) = spawn_event_processor(
            session_rx,
            callbacks,
            None,
            noop_writer(),
            Vec::new(),
            State::new(),
            None,
            None,
            None,
            None,
            None,
            std::collections::HashMap::new(),
            ControlPlaneConfig::default(),
            dummy_event_tx(),
        );

        let _ = session_tx.send(SessionEvent::TurnComplete);
        // Leave the callback enough time to finish.
        settle(100).await;

        // In Concurrent mode the callback must still have completed by now.
        assert!(fired.load(Ordering::SeqCst));

        shut_down(session_tx, fast_task, ctrl_task).await;
    }
}