gemini_adk_rs/live/handle.rs
1//! LiveHandle — runtime interaction with a Live session.
2
3use std::sync::Arc;
4
5use gemini_genai_rs::prelude::{FunctionResponse, SessionEvent, SessionPhase, VadEvent};
6use gemini_genai_rs::session::{SessionError, SessionHandle, SessionWriter};
7use parking_lot::Mutex;
8use serde::de::DeserializeOwned;
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::flow::{FlowExplanation, SharedFlowMonitor};
14use crate::state::State;
15
16use super::background_tool::BackgroundToolTracker;
17use super::context_writer::PendingContext;
18use super::effect_executor::LiveEffectExecutor;
19use super::input_vad::{BackendInputVad, BackendVadSnapshot};
20use super::reactor::{LiveReactor, ReactorEvent, VoiceRuntimeState};
21use super::telemetry::SessionTelemetry;
22
23/// Handle for interacting with a running Live session.
24///
25/// Provides send methods for audio/text/video, system instruction updates,
26/// event subscription, state access, telemetry, and graceful shutdown.
27///
28/// When [`ContextDelivery::Deferred`](super::steering::ContextDelivery::Deferred) is
29/// enabled, `send_audio`, `send_text`, and `send_video` automatically flush
30/// any pending context turns before forwarding the user content.
31#[derive(Clone)]
32pub struct LiveHandle {
33 session: SessionHandle,
34 /// Writer used for user-facing sends. When deferred context delivery is
35 /// enabled, this is a `DeferredWriter` that flushes pending context.
36 /// Otherwise it's the raw `SessionHandle`.
37 writer: Arc<dyn SessionWriter>,
38 /// Fast-lane task. Held in `Arc<Mutex<Option<..>>>` so `LiveHandle` stays
39 /// `Clone` while [`disconnect`](Self::disconnect) can take ownership to
40 /// grace-await and then abort the lane.
41 fast_task: Arc<Mutex<Option<JoinHandle<()>>>>,
42 /// Control-lane task (same ownership scheme as `fast_task`).
43 ctrl_task: Arc<Mutex<Option<JoinHandle<()>>>>,
44 /// Cancellation token for the telemetry lane, cancelled on disconnect.
45 telem_cancel: CancellationToken,
46 state: State,
47 telemetry: Arc<SessionTelemetry>,
48 event_tx: broadcast::Sender<super::events::LiveEvent>,
49 pending_context: Option<Arc<PendingContext>>,
50 reactor: Arc<LiveReactor>,
51 effect_executor: LiveEffectExecutor,
52 input_vad: Arc<Mutex<BackendInputVad>>,
53 /// Governed-flow monitor shared with the control lane (None when the
54 /// session is not governed by a flow).
55 flow: Option<SharedFlowMonitor>,
56 /// Tracker for in-flight background tool tasks. Shared with the control
57 /// lane (which spawns/cancels per-call tasks) so [`disconnect`](Self::disconnect)
58 /// can cancel every outstanding background tool — otherwise orphaned tasks
59 /// could keep running and post stale `ToolCompleted` events after shutdown.
60 background_tracker: Arc<BackgroundToolTracker>,
61}
62
63impl LiveHandle {
64 #[allow(
65 clippy::too_many_arguments,
66 reason = "crate-internal constructor called once from spawn_lanes; the runtime parts are deliberately enumerated rather than re-bundled"
67 )]
68 pub(crate) fn new(
69 session: SessionHandle,
70 writer: Arc<dyn SessionWriter>,
71 fast_task: JoinHandle<()>,
72 ctrl_task: JoinHandle<()>,
73 state: State,
74 telemetry: Arc<SessionTelemetry>,
75 event_tx: broadcast::Sender<super::events::LiveEvent>,
76 pending_context: Option<Arc<PendingContext>>,
77 flow: Option<SharedFlowMonitor>,
78 background_tracker: Arc<BackgroundToolTracker>,
79 telem_cancel: CancellationToken,
80 ) -> Self {
81 let reactor = Arc::new(LiveReactor::voice_defaults());
82 let effect_executor = LiveEffectExecutor::new(
83 Arc::new(session.clone()),
84 pending_context.clone(),
85 event_tx.clone(),
86 );
87
88 Self {
89 session,
90 writer,
91 fast_task: Arc::new(Mutex::new(Some(fast_task))),
92 ctrl_task: Arc::new(Mutex::new(Some(ctrl_task))),
93 telem_cancel,
94 state,
95 telemetry,
96 event_tx,
97 pending_context,
98 reactor,
99 effect_executor,
100 input_vad: Arc::new(Mutex::new(BackendInputVad::default())),
101 flow,
102 background_tracker,
103 }
104 }
105
106 /// Send audio data (raw PCM16 16kHz bytes).
107 ///
108 /// When deferred context delivery is enabled, any pending model-role
109 /// context turns are flushed to the wire before the audio frame.
110 pub async fn send_audio(&self, data: Vec<u8>) -> Result<(), SessionError> {
111 let vad_events = {
112 let mut input_vad = self.input_vad.lock();
113 input_vad.process_pcm_bytes(&data)
114 };
115
116 if vad_events.contains(&VadEvent::SpeechStart) {
117 self.user_speech_started().await?;
118 }
119
120 self.writer.send_audio(data).await?;
121
122 if vad_events.contains(&VadEvent::SpeechEnd) {
123 self.user_speech_ended().await?;
124 }
125
126 Ok(())
127 }
128
129 /// Send a text message.
130 ///
131 /// When deferred context delivery is enabled, any pending model-role
132 /// context turns are flushed to the wire before the text message.
133 pub async fn send_text(&self, text: impl Into<String>) -> Result<(), SessionError> {
134 self.telemetry.record_text_send();
135 self.writer.send_text(text.into()).await
136 }
137
138 /// Send a video/image frame (raw JPEG bytes).
139 ///
140 /// When deferred context delivery is enabled, any pending model-role
141 /// context turns are flushed to the wire before the video frame.
142 pub async fn send_video(&self, jpeg_data: Vec<u8>) -> Result<(), SessionError> {
143 self.writer.send_video(jpeg_data).await
144 }
145
146 /// Update the system instruction mid-session.
147 pub async fn update_instruction(
148 &self,
149 instruction: impl Into<String>,
150 ) -> Result<(), SessionError> {
151 SessionWriter::update_instruction(&self.session, instruction.into()).await
152 }
153
154 /// Send tool responses manually (if not using auto-dispatch).
155 pub async fn send_tool_response(
156 &self,
157 responses: Vec<FunctionResponse>,
158 ) -> Result<(), SessionError> {
159 self.session.send_tool_response(responses).await
160 }
161
162 /// Notify the runtime that client-side playback has drained.
163 ///
164 /// Voice UIs should call this only when it is safe for the model to speak,
165 /// for example after browser speaker playback has drained and the user is
166 /// not actively speaking. User audio/text sends intentionally flush context
167 /// only and leave the prompt armed.
168 pub async fn playback_drained(&self) -> Result<(), SessionError> {
169 let prompt_pending = self
170 .pending_context
171 .as_ref()
172 .is_some_and(|pending| pending.has_prompt());
173 let reactions = self
174 .reactor
175 .react(&ReactorEvent::PlaybackDrained { prompt_pending });
176 self.effect_executor.execute_reactions(reactions).await
177 }
178
179 /// Notify the runtime that client-side user speech has started.
180 ///
181 /// This is the barge-in edge for voice clients: pending model prompts are
182 /// cancelled before they can race with user audio, while queued context is
183 /// kept so the next user send can still carry it.
184 pub async fn user_speech_started(&self) -> Result<(), SessionError> {
185 let reactions = self.reactor.react(&ReactorEvent::UserSpeechStarted);
186 self.effect_executor.execute_reactions(reactions).await
187 }
188
189 /// Notify the runtime that client-side user speech has ended.
190 pub async fn user_speech_ended(&self) -> Result<(), SessionError> {
191 let prompt_pending = self
192 .pending_context
193 .as_ref()
194 .is_some_and(|pending| pending.has_prompt());
195 let reactions = self
196 .reactor
197 .react(&ReactorEvent::UserSpeechEnded { prompt_pending });
198 self.effect_executor.execute_reactions(reactions).await
199 }
200
201 /// Snapshot the reactor-owned voice runtime state.
202 pub fn voice_state(&self) -> VoiceRuntimeState {
203 self.reactor.voice_state()
204 }
205
206 /// Snapshot backend input VAD state.
207 pub fn input_vad_state(&self) -> BackendVadSnapshot {
208 self.input_vad.lock().snapshot()
209 }
210
211 /// Flush deferred context and any pending model prompt.
212 ///
213 /// Prefer [`Self::playback_drained`] for voice clients. This compatibility
214 /// method routes through the same reactor/effect executor path.
215 pub async fn flush_deferred_prompt(&self) -> Result<(), SessionError> {
216 self.playback_drained().await
217 }
218
219 /// Get the user-facing session writer.
220 ///
221 /// When deferred context delivery is enabled, this returns the
222 /// `DeferredWriter` that flushes pending context before sends.
223 pub fn writer(&self) -> Arc<dyn SessionWriter> {
224 self.writer.clone()
225 }
226
227 /// Subscribe to raw session events (for custom processing).
228 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
229 self.session.subscribe()
230 }
231
232 /// Get the current session phase.
233 pub fn phase(&self) -> SessionPhase {
234 self.session.phase()
235 }
236
237 /// Gracefully disconnect the session.
238 ///
239 /// Shutdown sequence:
240 /// 1. Cancel all in-flight background tool tasks (they are aborted at an
241 /// await point; tool futures must therefore be drop-safe).
242 /// 2. Close the L0 session. The terminal `Disconnected` event makes the
243 /// event router exit, which closes the lane channels.
244 /// 3. Grace-await the fast and control lanes (~250 ms each) so they can
245 /// drain queued events and run their final persistence drain, then
246 /// abort whatever is still stuck (e.g. a lane blocked in a slow tool).
247 /// 4. Cancel the telemetry lane.
248 pub async fn disconnect(&self) -> Result<(), SessionError> {
249 // Cancel background tool tasks FIRST: once the session is closing,
250 // their results can no longer be delivered, and leaving them running
251 // would let them post stale ToolCompleted events to a dead lane.
252 self.background_tracker.cancel_all();
253 let result = SessionWriter::disconnect(&self.session).await;
254
255 // Grace-await the lanes, then abort. Taking the JoinHandles out of
256 // their mutexes gives us the ownership `await` requires; a second
257 // disconnect (or a clone's disconnect) simply finds them gone.
258 for lane in [&self.fast_task, &self.ctrl_task] {
259 let task = lane.lock().take();
260 if let Some(mut task) = task {
261 if tokio::time::timeout(Self::LANE_SHUTDOWN_GRACE, &mut task)
262 .await
263 .is_err()
264 {
265 task.abort();
266 }
267 }
268 }
269
270 // Stop the telemetry lane (it runs on its own broadcast receiver and
271 // would otherwise idle on its debounce timer for the handle's lifetime).
272 self.telem_cancel.cancel();
273 result
274 }
275
276 /// How long [`disconnect`](Self::disconnect) waits for each lane to drain
277 /// before aborting it.
278 const LANE_SHUTDOWN_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
279
280 /// Wait for the session to end (disconnect, GoAway, or error).
281 pub async fn done(&self) -> Result<(), SessionError> {
282 self.session
283 .join()
284 .await
285 .map_err(|_| SessionError::ChannelClosed)
286 }
287
288 /// Get the underlying SessionHandle for advanced usage.
289 pub fn session(&self) -> &SessionHandle {
290 &self.session
291 }
292
293 /// Latest session-resumption handle issued by the server, if any.
294 ///
295 /// While session resumption is enabled
296 /// ([`SessionConfig::session_resumption`](gemini_genai_rs::prelude::SessionConfig::session_resumption);
297 /// L2: `Live::builder().session_resume(true)`), the Gemini server
298 /// periodically sends `SessionResumptionUpdate` messages; this returns the
299 /// most recent handle (also captured in persistence snapshots as
300 /// [`SessionSnapshot::resume_handle`](crate::live::persistence::SessionSnapshot::resume_handle)).
301 ///
302 /// To survive a server-initiated `GoAway` or a planned restart, read this
303 /// handle (e.g. from the `on_go_away` callback) and pass it to
304 /// `session_resumption(Some(handle))` on the next connect's
305 /// [`SessionConfig`](gemini_genai_rs::prelude::SessionConfig). No
306 /// automatic reconnect is performed — resumption is an explicit caller
307 /// decision.
308 ///
309 /// Returns `None` when resumption is disabled or no update has arrived yet.
310 pub fn resume_handle(&self) -> Option<String> {
311 self.session.state.resume_handle.lock().clone()
312 }
313
314 /// Access the shared State container.
315 ///
316 /// Extraction results from `TurnExtractor`s are stored here under the
317 /// extractor's name. Use `state().get::<T>(name)` to read typed values.
318 pub fn state(&self) -> &State {
319 &self.state
320 }
321
322 /// Access the session telemetry (auto-collected by the telemetry lane).
323 ///
324 /// Use `telemetry().snapshot()` to get a JSON snapshot of all metrics.
325 pub fn telemetry(&self) -> &Arc<SessionTelemetry> {
326 &self.telemetry
327 }
328
329 /// Subscribe to semantic events from the processor.
330 ///
331 /// Returns a broadcast receiver. Call multiple times for independent
332 /// subscribers. Zero-cost when no subscribers exist.
333 pub fn events(&self) -> broadcast::Receiver<super::events::LiveEvent> {
334 self.event_tx.subscribe()
335 }
336
337 /// Subscribe to semantic events as a [`futures::Stream`].
338 ///
339 /// Stream-flavored sibling of [`events`](Self::events): each call creates
340 /// an independent subscriber starting from the current point in the event
341 /// flow. If the subscriber falls behind the broadcast buffer, the missed
342 /// events are skipped and the stream continues; the stream ends when the
343 /// session's event channel closes. See
344 /// [`LiveEventStream`](super::events::LiveEventStream).
345 ///
346 /// # Example
347 ///
348 /// ```rust,ignore
349 /// use futures::StreamExt;
350 ///
351 /// let mut stream = handle.stream();
352 /// while let Some(ev) = stream.next().await {
353 /// match ev {
354 /// LiveEvent::TextDelta(t) => print!("{t}"),
355 /// LiveEvent::TurnComplete => println!(),
356 /// _ => {}
357 /// }
358 /// }
359 /// ```
360 pub fn stream(&self) -> super::events::LiveEventStream {
361 super::events::LiveEventStream::new(self.event_tx.subscribe())
362 }
363
364 /// Convenience: get the latest extraction result by extractor name.
365 pub fn extracted<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
366 self.state.get(name)
367 }
368
369 /// Snapshot the governed flow's control-plane state: active steps, which
370 /// tools are admitted vs blocked (with reasons), and unmet requirements.
371 ///
372 /// The deterministic answer to "why did the assistant ask that?" — computed
373 /// against the live [`State`] and the marking the control lane maintains.
374 /// Returns `None` when the session is not governed by a flow
375 /// (`Live::govern`/`observe` was not used).
376 ///
377 /// This is a synchronous snapshot: it briefly locks the shared
378 /// [`FlowMonitor`](crate::flow::FlowMonitor) and never blocks on session
379 /// I/O.
380 pub fn explain(&self) -> Option<FlowExplanation> {
381 self.flow
382 .as_ref()
383 .map(|mon| mon.lock().explain(&self.state))
384 }
385
386 /// Why the governed flow is blocked right now — alias of
387 /// [`explain`](Self::explain), named for the common debugging question.
388 /// Returns `None` when the session is not governed by a flow.
389 pub fn why_blocked(&self) -> Option<FlowExplanation> {
390 self.explain()
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397 use crate::live::telemetry::SessionTelemetry;
398 use gemini_genai_rs::session::{SessionCommand, SessionState};
399 use tokio_util::sync::CancellationToken;
400
401 /// Build a LiveHandle wired to an in-memory SessionHandle (no transport).
402 /// The command receiver is returned so `disconnect()` sends succeed.
403 fn make_handle_with_lanes(
404 fast: JoinHandle<()>,
405 ctrl: JoinHandle<()>,
406 ) -> (LiveHandle, tokio::sync::mpsc::Receiver<SessionCommand>) {
407 let (command_tx, command_rx) = tokio::sync::mpsc::channel(8);
408 let (event_tx, _) = broadcast::channel(16);
409 let (phase_tx, phase_rx) = tokio::sync::watch::channel(SessionPhase::Active);
410 let state = Arc::new(SessionState::with_events(phase_tx, event_tx.clone()));
411 let session = SessionHandle::new(command_tx, event_tx, state, phase_rx);
412 let writer: Arc<dyn SessionWriter> = Arc::new(session.clone());
413 let (live_tx, _) = broadcast::channel(16);
414 let handle = LiveHandle::new(
415 session,
416 writer,
417 fast,
418 ctrl,
419 State::new(),
420 Arc::new(SessionTelemetry::new()),
421 live_tx,
422 None,
423 None,
424 Arc::new(BackgroundToolTracker::new()),
425 CancellationToken::new(),
426 );
427 (handle, command_rx)
428 }
429
430 fn make_handle() -> (LiveHandle, tokio::sync::mpsc::Receiver<SessionCommand>) {
431 make_handle_with_lanes(tokio::spawn(async {}), tokio::spawn(async {}))
432 }
433
434 /// Sets a flag when dropped — observes that an aborted task's future was
435 /// actually torn down.
436 struct SetOnDrop(Arc<std::sync::atomic::AtomicBool>);
437 impl Drop for SetOnDrop {
438 fn drop(&mut self) {
439 self.0.store(true, std::sync::atomic::Ordering::SeqCst);
440 }
441 }
442
443 #[tokio::test]
444 async fn disconnect_cancels_background_tool_tasks() {
445 let (handle, _cmd_rx) = make_handle();
446 let tracker = handle.background_tracker.clone();
447
448 // Register a never-finishing background tool task.
449 let token = CancellationToken::new();
450 let t = token.clone();
451 let task = tokio::spawn(async move {
452 t.cancelled().await;
453 std::future::pending::<()>().await;
454 });
455 tracker.spawn("call-1".into(), task, token.clone());
456 assert_eq!(tracker.active_count(), 1);
457
458 handle.disconnect().await.expect("disconnect");
459
460 assert_eq!(
461 tracker.active_count(),
462 0,
463 "disconnect must cancel all tracked background tool tasks"
464 );
465 assert!(token.is_cancelled(), "cooperative token must be cancelled");
466 }
467
468 #[tokio::test]
469 async fn disconnect_aborts_stuck_lanes_within_grace_period() {
470 use std::sync::atomic::{AtomicBool, Ordering};
471
472 // Lanes that never finish on their own (simulating a lane blocked in a
473 // slow tool); drop guards record that abort tore the futures down.
474 let fast_dropped = Arc::new(AtomicBool::new(false));
475 let ctrl_dropped = Arc::new(AtomicBool::new(false));
476 let f = fast_dropped.clone();
477 let c = ctrl_dropped.clone();
478 let fast = tokio::spawn(async move {
479 let _guard = SetOnDrop(f);
480 std::future::pending::<()>().await;
481 });
482 let ctrl = tokio::spawn(async move {
483 let _guard = SetOnDrop(c);
484 std::future::pending::<()>().await;
485 });
486
487 let (handle, _cmd_rx) = make_handle_with_lanes(fast, ctrl);
488 let telem_cancel = handle.telem_cancel.clone();
489
490 // disconnect() must return in bounded time even with stuck lanes.
491 tokio::time::timeout(std::time::Duration::from_secs(2), handle.disconnect())
492 .await
493 .expect("disconnect must not hang on stuck lanes")
494 .expect("disconnect");
495
496 // Give the aborts a beat to take effect, then verify teardown.
497 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
498 assert!(
499 fast_dropped.load(Ordering::SeqCst),
500 "fast lane must be aborted after the grace period"
501 );
502 assert!(
503 ctrl_dropped.load(Ordering::SeqCst),
504 "control lane must be aborted after the grace period"
505 );
506 assert!(
507 telem_cancel.is_cancelled(),
508 "telemetry lane must be cancelled on disconnect"
509 );
510 }
511
512 #[tokio::test]
513 async fn resume_handle_surfaces_latest_server_handle() {
514 let (handle, _cmd_rx) = make_handle();
515 assert_eq!(handle.resume_handle(), None, "no update yet");
516
517 // Simulate the L0 transport storing a SessionResumptionUpdate.
518 *handle.session.state.resume_handle.lock() = Some("rh-42".into());
519 assert_eq!(handle.resume_handle(), Some("rh-42".to_string()));
520 }
521
522 #[tokio::test]
523 async fn disconnect_is_idempotent_across_clones() {
524 let (handle, _cmd_rx) = make_handle();
525 let clone = handle.clone();
526 handle.disconnect().await.expect("first disconnect");
527 // The clone's disconnect finds the lane handles already taken.
528 clone.disconnect().await.expect("second disconnect");
529 }
530}