gemini_adk_rs/live/
handle.rs

1//! LiveHandle — runtime interaction with a Live session.
2
3use std::sync::Arc;
4
5use gemini_genai_rs::prelude::{FunctionResponse, SessionEvent, SessionPhase, VadEvent};
6use gemini_genai_rs::session::{SessionError, SessionHandle, SessionWriter};
7use parking_lot::Mutex;
8use serde::de::DeserializeOwned;
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::flow::{FlowExplanation, SharedFlowMonitor};
14use crate::state::State;
15
16use super::background_tool::BackgroundToolTracker;
17use super::context_writer::PendingContext;
18use super::effect_executor::LiveEffectExecutor;
19use super::input_vad::{BackendInputVad, BackendVadSnapshot};
20use super::reactor::{LiveReactor, ReactorEvent, VoiceRuntimeState};
21use super::telemetry::SessionTelemetry;
22
23/// Handle for interacting with a running Live session.
24///
25/// Provides send methods for audio/text/video, system instruction updates,
26/// event subscription, state access, telemetry, and graceful shutdown.
27///
28/// When [`ContextDelivery::Deferred`](super::steering::ContextDelivery::Deferred) is
29/// enabled, `send_audio`, `send_text`, and `send_video` automatically flush
30/// any pending context turns before forwarding the user content.
31#[derive(Clone)]
32pub struct LiveHandle {
33    session: SessionHandle,
34    /// Writer used for user-facing sends.  When deferred context delivery is
35    /// enabled, this is a `DeferredWriter` that flushes pending context.
36    /// Otherwise it's the raw `SessionHandle`.
37    writer: Arc<dyn SessionWriter>,
38    /// Fast-lane task. Held in `Arc<Mutex<Option<..>>>` so `LiveHandle` stays
39    /// `Clone` while [`disconnect`](Self::disconnect) can take ownership to
40    /// grace-await and then abort the lane.
41    fast_task: Arc<Mutex<Option<JoinHandle<()>>>>,
42    /// Control-lane task (same ownership scheme as `fast_task`).
43    ctrl_task: Arc<Mutex<Option<JoinHandle<()>>>>,
44    /// Cancellation token for the telemetry lane, cancelled on disconnect.
45    telem_cancel: CancellationToken,
46    state: State,
47    telemetry: Arc<SessionTelemetry>,
48    event_tx: broadcast::Sender<super::events::LiveEvent>,
49    pending_context: Option<Arc<PendingContext>>,
50    reactor: Arc<LiveReactor>,
51    effect_executor: LiveEffectExecutor,
52    input_vad: Arc<Mutex<BackendInputVad>>,
53    /// Governed-flow monitor shared with the control lane (None when the
54    /// session is not governed by a flow).
55    flow: Option<SharedFlowMonitor>,
56    /// Tracker for in-flight background tool tasks. Shared with the control
57    /// lane (which spawns/cancels per-call tasks) so [`disconnect`](Self::disconnect)
58    /// can cancel every outstanding background tool — otherwise orphaned tasks
59    /// could keep running and post stale `ToolCompleted` events after shutdown.
60    background_tracker: Arc<BackgroundToolTracker>,
61}
62
63impl LiveHandle {
64    #[allow(
65        clippy::too_many_arguments,
66        reason = "crate-internal constructor called once from spawn_lanes; the runtime parts are deliberately enumerated rather than re-bundled"
67    )]
68    pub(crate) fn new(
69        session: SessionHandle,
70        writer: Arc<dyn SessionWriter>,
71        fast_task: JoinHandle<()>,
72        ctrl_task: JoinHandle<()>,
73        state: State,
74        telemetry: Arc<SessionTelemetry>,
75        event_tx: broadcast::Sender<super::events::LiveEvent>,
76        pending_context: Option<Arc<PendingContext>>,
77        flow: Option<SharedFlowMonitor>,
78        background_tracker: Arc<BackgroundToolTracker>,
79        telem_cancel: CancellationToken,
80    ) -> Self {
81        let reactor = Arc::new(LiveReactor::voice_defaults());
82        let effect_executor = LiveEffectExecutor::new(
83            Arc::new(session.clone()),
84            pending_context.clone(),
85            event_tx.clone(),
86        );
87
88        Self {
89            session,
90            writer,
91            fast_task: Arc::new(Mutex::new(Some(fast_task))),
92            ctrl_task: Arc::new(Mutex::new(Some(ctrl_task))),
93            telem_cancel,
94            state,
95            telemetry,
96            event_tx,
97            pending_context,
98            reactor,
99            effect_executor,
100            input_vad: Arc::new(Mutex::new(BackendInputVad::default())),
101            flow,
102            background_tracker,
103        }
104    }
105
106    /// Send audio data (raw PCM16 16kHz bytes).
107    ///
108    /// When deferred context delivery is enabled, any pending model-role
109    /// context turns are flushed to the wire before the audio frame.
110    pub async fn send_audio(&self, data: Vec<u8>) -> Result<(), SessionError> {
111        let vad_events = {
112            let mut input_vad = self.input_vad.lock();
113            input_vad.process_pcm_bytes(&data)
114        };
115
116        if vad_events.contains(&VadEvent::SpeechStart) {
117            self.user_speech_started().await?;
118        }
119
120        self.writer.send_audio(data).await?;
121
122        if vad_events.contains(&VadEvent::SpeechEnd) {
123            self.user_speech_ended().await?;
124        }
125
126        Ok(())
127    }
128
129    /// Send a text message.
130    ///
131    /// When deferred context delivery is enabled, any pending model-role
132    /// context turns are flushed to the wire before the text message.
133    pub async fn send_text(&self, text: impl Into<String>) -> Result<(), SessionError> {
134        self.telemetry.record_text_send();
135        self.writer.send_text(text.into()).await
136    }
137
138    /// Send a video/image frame (raw JPEG bytes).
139    ///
140    /// When deferred context delivery is enabled, any pending model-role
141    /// context turns are flushed to the wire before the video frame.
142    pub async fn send_video(&self, jpeg_data: Vec<u8>) -> Result<(), SessionError> {
143        self.writer.send_video(jpeg_data).await
144    }
145
146    /// Update the system instruction mid-session.
147    pub async fn update_instruction(
148        &self,
149        instruction: impl Into<String>,
150    ) -> Result<(), SessionError> {
151        SessionWriter::update_instruction(&self.session, instruction.into()).await
152    }
153
154    /// Send tool responses manually (if not using auto-dispatch).
155    pub async fn send_tool_response(
156        &self,
157        responses: Vec<FunctionResponse>,
158    ) -> Result<(), SessionError> {
159        self.session.send_tool_response(responses).await
160    }
161
162    /// Notify the runtime that client-side playback has drained.
163    ///
164    /// Voice UIs should call this only when it is safe for the model to speak,
165    /// for example after browser speaker playback has drained and the user is
166    /// not actively speaking. User audio/text sends intentionally flush context
167    /// only and leave the prompt armed.
168    pub async fn playback_drained(&self) -> Result<(), SessionError> {
169        let prompt_pending = self
170            .pending_context
171            .as_ref()
172            .is_some_and(|pending| pending.has_prompt());
173        let reactions = self
174            .reactor
175            .react(&ReactorEvent::PlaybackDrained { prompt_pending });
176        self.effect_executor.execute_reactions(reactions).await
177    }
178
179    /// Notify the runtime that client-side user speech has started.
180    ///
181    /// This is the barge-in edge for voice clients: pending model prompts are
182    /// cancelled before they can race with user audio, while queued context is
183    /// kept so the next user send can still carry it.
184    pub async fn user_speech_started(&self) -> Result<(), SessionError> {
185        let reactions = self.reactor.react(&ReactorEvent::UserSpeechStarted);
186        self.effect_executor.execute_reactions(reactions).await
187    }
188
189    /// Notify the runtime that client-side user speech has ended.
190    pub async fn user_speech_ended(&self) -> Result<(), SessionError> {
191        let prompt_pending = self
192            .pending_context
193            .as_ref()
194            .is_some_and(|pending| pending.has_prompt());
195        let reactions = self
196            .reactor
197            .react(&ReactorEvent::UserSpeechEnded { prompt_pending });
198        self.effect_executor.execute_reactions(reactions).await
199    }
200
201    /// Snapshot the reactor-owned voice runtime state.
202    pub fn voice_state(&self) -> VoiceRuntimeState {
203        self.reactor.voice_state()
204    }
205
206    /// Snapshot backend input VAD state.
207    pub fn input_vad_state(&self) -> BackendVadSnapshot {
208        self.input_vad.lock().snapshot()
209    }
210
211    /// Flush deferred context and any pending model prompt.
212    ///
213    /// Prefer [`Self::playback_drained`] for voice clients. This compatibility
214    /// method routes through the same reactor/effect executor path.
215    pub async fn flush_deferred_prompt(&self) -> Result<(), SessionError> {
216        self.playback_drained().await
217    }
218
219    /// Get the user-facing session writer.
220    ///
221    /// When deferred context delivery is enabled, this returns the
222    /// `DeferredWriter` that flushes pending context before sends.
223    pub fn writer(&self) -> Arc<dyn SessionWriter> {
224        self.writer.clone()
225    }
226
227    /// Subscribe to raw session events (for custom processing).
228    pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
229        self.session.subscribe()
230    }
231
232    /// Get the current session phase.
233    pub fn phase(&self) -> SessionPhase {
234        self.session.phase()
235    }
236
237    /// Gracefully disconnect the session.
238    ///
239    /// Shutdown sequence:
240    /// 1. Cancel all in-flight background tool tasks (they are aborted at an
241    ///    await point; tool futures must therefore be drop-safe).
242    /// 2. Close the L0 session. The terminal `Disconnected` event makes the
243    ///    event router exit, which closes the lane channels.
244    /// 3. Grace-await the fast and control lanes (~250 ms each) so they can
245    ///    drain queued events and run their final persistence drain, then
246    ///    abort whatever is still stuck (e.g. a lane blocked in a slow tool).
247    /// 4. Cancel the telemetry lane.
248    pub async fn disconnect(&self) -> Result<(), SessionError> {
249        // Cancel background tool tasks FIRST: once the session is closing,
250        // their results can no longer be delivered, and leaving them running
251        // would let them post stale ToolCompleted events to a dead lane.
252        self.background_tracker.cancel_all();
253        let result = SessionWriter::disconnect(&self.session).await;
254
255        // Grace-await the lanes, then abort. Taking the JoinHandles out of
256        // their mutexes gives us the ownership `await` requires; a second
257        // disconnect (or a clone's disconnect) simply finds them gone.
258        for lane in [&self.fast_task, &self.ctrl_task] {
259            let task = lane.lock().take();
260            if let Some(mut task) = task {
261                if tokio::time::timeout(Self::LANE_SHUTDOWN_GRACE, &mut task)
262                    .await
263                    .is_err()
264                {
265                    task.abort();
266                }
267            }
268        }
269
270        // Stop the telemetry lane (it runs on its own broadcast receiver and
271        // would otherwise idle on its debounce timer for the handle's lifetime).
272        self.telem_cancel.cancel();
273        result
274    }
275
276    /// How long [`disconnect`](Self::disconnect) waits for each lane to drain
277    /// before aborting it.
278    const LANE_SHUTDOWN_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
279
280    /// Wait for the session to end (disconnect, GoAway, or error).
281    pub async fn done(&self) -> Result<(), SessionError> {
282        self.session
283            .join()
284            .await
285            .map_err(|_| SessionError::ChannelClosed)
286    }
287
288    /// Get the underlying SessionHandle for advanced usage.
289    pub fn session(&self) -> &SessionHandle {
290        &self.session
291    }
292
293    /// Latest session-resumption handle issued by the server, if any.
294    ///
295    /// While session resumption is enabled
296    /// ([`SessionConfig::session_resumption`](gemini_genai_rs::prelude::SessionConfig::session_resumption);
297    /// L2: `Live::builder().session_resume(true)`), the Gemini server
298    /// periodically sends `SessionResumptionUpdate` messages; this returns the
299    /// most recent handle (also captured in persistence snapshots as
300    /// [`SessionSnapshot::resume_handle`](crate::live::persistence::SessionSnapshot::resume_handle)).
301    ///
302    /// To survive a server-initiated `GoAway` or a planned restart, read this
303    /// handle (e.g. from the `on_go_away` callback) and pass it to
304    /// `session_resumption(Some(handle))` on the next connect's
305    /// [`SessionConfig`](gemini_genai_rs::prelude::SessionConfig). No
306    /// automatic reconnect is performed — resumption is an explicit caller
307    /// decision.
308    ///
309    /// Returns `None` when resumption is disabled or no update has arrived yet.
310    pub fn resume_handle(&self) -> Option<String> {
311        self.session.state.resume_handle.lock().clone()
312    }
313
314    /// Access the shared State container.
315    ///
316    /// Extraction results from `TurnExtractor`s are stored here under the
317    /// extractor's name. Use `state().get::<T>(name)` to read typed values.
318    pub fn state(&self) -> &State {
319        &self.state
320    }
321
322    /// Access the session telemetry (auto-collected by the telemetry lane).
323    ///
324    /// Use `telemetry().snapshot()` to get a JSON snapshot of all metrics.
325    pub fn telemetry(&self) -> &Arc<SessionTelemetry> {
326        &self.telemetry
327    }
328
329    /// Subscribe to semantic events from the processor.
330    ///
331    /// Returns a broadcast receiver. Call multiple times for independent
332    /// subscribers. Zero-cost when no subscribers exist.
333    pub fn events(&self) -> broadcast::Receiver<super::events::LiveEvent> {
334        self.event_tx.subscribe()
335    }
336
337    /// Subscribe to semantic events as a [`futures::Stream`].
338    ///
339    /// Stream-flavored sibling of [`events`](Self::events): each call creates
340    /// an independent subscriber starting from the current point in the event
341    /// flow. If the subscriber falls behind the broadcast buffer, the missed
342    /// events are skipped and the stream continues; the stream ends when the
343    /// session's event channel closes. See
344    /// [`LiveEventStream`](super::events::LiveEventStream).
345    ///
346    /// # Example
347    ///
348    /// ```rust,ignore
349    /// use futures::StreamExt;
350    ///
351    /// let mut stream = handle.stream();
352    /// while let Some(ev) = stream.next().await {
353    ///     match ev {
354    ///         LiveEvent::TextDelta(t) => print!("{t}"),
355    ///         LiveEvent::TurnComplete => println!(),
356    ///         _ => {}
357    ///     }
358    /// }
359    /// ```
360    pub fn stream(&self) -> super::events::LiveEventStream {
361        super::events::LiveEventStream::new(self.event_tx.subscribe())
362    }
363
364    /// Convenience: get the latest extraction result by extractor name.
365    pub fn extracted<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
366        self.state.get(name)
367    }
368
369    /// Snapshot the governed flow's control-plane state: active steps, which
370    /// tools are admitted vs blocked (with reasons), and unmet requirements.
371    ///
372    /// The deterministic answer to "why did the assistant ask that?" — computed
373    /// against the live [`State`] and the marking the control lane maintains.
374    /// Returns `None` when the session is not governed by a flow
375    /// (`Live::govern`/`observe` was not used).
376    ///
377    /// This is a synchronous snapshot: it briefly locks the shared
378    /// [`FlowMonitor`](crate::flow::FlowMonitor) and never blocks on session
379    /// I/O.
380    pub fn explain(&self) -> Option<FlowExplanation> {
381        self.flow
382            .as_ref()
383            .map(|mon| mon.lock().explain(&self.state))
384    }
385
386    /// Why the governed flow is blocked right now — alias of
387    /// [`explain`](Self::explain), named for the common debugging question.
388    /// Returns `None` when the session is not governed by a flow.
389    pub fn why_blocked(&self) -> Option<FlowExplanation> {
390        self.explain()
391    }
392}
393
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use gemini_genai_rs::session::{SessionCommand, SessionState};
    use tokio::sync::{mpsc, watch};
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::live::telemetry::SessionTelemetry;

    /// In-memory `LiveHandle` (no transport) whose lanes are the given tasks.
    ///
    /// The session's command receiver is handed back so the caller can keep
    /// it alive, which lets the command sent by `disconnect()` succeed.
    fn handle_with_lanes(
        fast: JoinHandle<()>,
        ctrl: JoinHandle<()>,
    ) -> (LiveHandle, mpsc::Receiver<SessionCommand>) {
        let (commands, command_rx) = mpsc::channel(8);
        let (session_events, _) = broadcast::channel(16);
        let (phase_tx, phase_rx) = watch::channel(SessionPhase::Active);
        let session_state = Arc::new(SessionState::with_events(phase_tx, session_events.clone()));
        let session = SessionHandle::new(commands, session_events, session_state, phase_rx);
        let writer: Arc<dyn SessionWriter> = Arc::new(session.clone());
        let (live_events, _) = broadcast::channel(16);

        let handle = LiveHandle::new(
            session,
            writer,
            fast,
            ctrl,
            State::new(),
            Arc::new(SessionTelemetry::new()),
            live_events,
            None, // no deferred context
            None, // not flow-governed
            Arc::new(BackgroundToolTracker::new()),
            CancellationToken::new(),
        );
        (handle, command_rx)
    }

    /// Handle whose lanes are trivially-completing no-op tasks.
    fn idle_handle() -> (LiveHandle, mpsc::Receiver<SessionCommand>) {
        let fast = tokio::spawn(async {});
        let ctrl = tokio::spawn(async {});
        handle_with_lanes(fast, ctrl)
    }

    /// Raises its flag when dropped, showing that an aborted task's future
    /// was actually torn down.
    struct FlagOnDrop(Arc<AtomicBool>);

    impl Drop for FlagOnDrop {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// Spawn a lane that never finishes on its own (like one wedged in a slow
    /// tool). `dropped` flips to `true` once its future is destroyed.
    fn spawn_stuck_lane(dropped: Arc<AtomicBool>) -> JoinHandle<()> {
        tokio::spawn(async move {
            let _flag = FlagOnDrop(dropped);
            std::future::pending::<()>().await;
        })
    }

    #[tokio::test]
    async fn disconnect_cancels_background_tool_tasks() {
        let (handle, _commands) = idle_handle();
        let tracker = Arc::clone(&handle.background_tracker);

        // A background tool call that never completes by itself.
        let token = CancellationToken::new();
        let tool_task = tokio::spawn({
            let token = token.clone();
            async move {
                token.cancelled().await;
                std::future::pending::<()>().await;
            }
        });
        tracker.spawn("call-1".into(), tool_task, token.clone());
        assert_eq!(tracker.active_count(), 1);

        handle.disconnect().await.expect("disconnect");

        assert_eq!(
            tracker.active_count(),
            0,
            "disconnect must cancel all tracked background tool tasks"
        );
        assert!(token.is_cancelled(), "cooperative token must be cancelled");
    }

    #[tokio::test]
    async fn disconnect_aborts_stuck_lanes_within_grace_period() {
        let fast_dropped = Arc::new(AtomicBool::new(false));
        let ctrl_dropped = Arc::new(AtomicBool::new(false));
        let fast = spawn_stuck_lane(Arc::clone(&fast_dropped));
        let ctrl = spawn_stuck_lane(Arc::clone(&ctrl_dropped));

        let (handle, _commands) = handle_with_lanes(fast, ctrl);
        let telem_cancel = handle.telem_cancel.clone();

        // Stuck lanes must not make disconnect() hang.
        tokio::time::timeout(Duration::from_secs(2), handle.disconnect())
            .await
            .expect("disconnect must not hang on stuck lanes")
            .expect("disconnect");

        // Let the aborts land before checking teardown.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            fast_dropped.load(Ordering::SeqCst),
            "fast lane must be aborted after the grace period"
        );
        assert!(
            ctrl_dropped.load(Ordering::SeqCst),
            "control lane must be aborted after the grace period"
        );
        assert!(
            telem_cancel.is_cancelled(),
            "telemetry lane must be cancelled on disconnect"
        );
    }

    #[tokio::test]
    async fn resume_handle_surfaces_latest_server_handle() {
        let (handle, _commands) = idle_handle();
        assert_eq!(handle.resume_handle(), None, "no update yet");

        // Pretend the L0 transport just stored a SessionResumptionUpdate.
        *handle.session.state.resume_handle.lock() = Some("rh-42".into());

        assert_eq!(handle.resume_handle(), Some("rh-42".to_string()));
    }

    #[tokio::test]
    async fn disconnect_is_idempotent_across_clones() {
        let (first, _commands) = idle_handle();
        let second = first.clone();

        first.disconnect().await.expect("first disconnect");
        // The lane handles were already taken by the first call.
        second.disconnect().await.expect("second disconnect");
    }
}