gemini_adk_rs/live/
handle.rs

1//! LiveHandle — runtime interaction with a Live session.
2
3use std::sync::Arc;
4
5use gemini_genai_rs::prelude::{FunctionResponse, SessionEvent, SessionPhase, VadEvent};
6use gemini_genai_rs::session::{SessionError, SessionHandle, SessionWriter};
7use parking_lot::Mutex;
8use serde::de::DeserializeOwned;
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11
12use crate::state::State;
13
14use super::context_writer::PendingContext;
15use super::effect_executor::LiveEffectExecutor;
16use super::input_vad::{BackendInputVad, BackendVadSnapshot};
17use super::reactor::{LiveReactor, ReactorEvent, VoiceRuntimeState};
18use super::telemetry::SessionTelemetry;
19
20/// Handle for interacting with a running Live session.
21///
22/// Provides send methods for audio/text/video, system instruction updates,
23/// event subscription, state access, telemetry, and graceful shutdown.
24///
25/// When [`ContextDelivery::Deferred`](super::steering::ContextDelivery::Deferred) is
26/// enabled, `send_audio`, `send_text`, and `send_video` automatically flush
27/// any pending context turns before forwarding the user content.
28#[derive(Clone)]
29pub struct LiveHandle {
30    session: SessionHandle,
31    /// Writer used for user-facing sends.  When deferred context delivery is
32    /// enabled, this is a `DeferredWriter` that flushes pending context.
33    /// Otherwise it's the raw `SessionHandle`.
34    writer: Arc<dyn SessionWriter>,
35    _fast_task: Arc<JoinHandle<()>>,
36    _ctrl_task: Arc<JoinHandle<()>>,
37    state: State,
38    telemetry: Arc<SessionTelemetry>,
39    event_tx: broadcast::Sender<super::events::LiveEvent>,
40    pending_context: Option<Arc<PendingContext>>,
41    reactor: Arc<LiveReactor>,
42    effect_executor: LiveEffectExecutor,
43    input_vad: Arc<Mutex<BackendInputVad>>,
44}
45
46impl LiveHandle {
47    pub(crate) fn new(
48        session: SessionHandle,
49        writer: Arc<dyn SessionWriter>,
50        fast_task: JoinHandle<()>,
51        ctrl_task: JoinHandle<()>,
52        state: State,
53        telemetry: Arc<SessionTelemetry>,
54        event_tx: broadcast::Sender<super::events::LiveEvent>,
55        pending_context: Option<Arc<PendingContext>>,
56    ) -> Self {
57        let reactor = Arc::new(LiveReactor::voice_defaults());
58        let effect_executor = LiveEffectExecutor::new(
59            Arc::new(session.clone()),
60            pending_context.clone(),
61            event_tx.clone(),
62        );
63
64        Self {
65            session,
66            writer,
67            _fast_task: Arc::new(fast_task),
68            _ctrl_task: Arc::new(ctrl_task),
69            state,
70            telemetry,
71            event_tx,
72            pending_context,
73            reactor,
74            effect_executor,
75            input_vad: Arc::new(Mutex::new(BackendInputVad::default())),
76        }
77    }
78
79    /// Send audio data (raw PCM16 16kHz bytes).
80    ///
81    /// When deferred context delivery is enabled, any pending model-role
82    /// context turns are flushed to the wire before the audio frame.
83    pub async fn send_audio(&self, data: Vec<u8>) -> Result<(), SessionError> {
84        let vad_events = {
85            let mut input_vad = self.input_vad.lock();
86            input_vad.process_pcm_bytes(&data)
87        };
88
89        if vad_events.contains(&VadEvent::SpeechStart) {
90            self.user_speech_started().await?;
91        }
92
93        self.writer.send_audio(data).await?;
94
95        if vad_events.contains(&VadEvent::SpeechEnd) {
96            self.user_speech_ended().await?;
97        }
98
99        Ok(())
100    }
101
102    /// Send a text message.
103    ///
104    /// When deferred context delivery is enabled, any pending model-role
105    /// context turns are flushed to the wire before the text message.
106    pub async fn send_text(&self, text: impl Into<String>) -> Result<(), SessionError> {
107        self.telemetry.record_text_send();
108        self.writer.send_text(text.into()).await
109    }
110
111    /// Send a video/image frame (raw JPEG bytes).
112    ///
113    /// When deferred context delivery is enabled, any pending model-role
114    /// context turns are flushed to the wire before the video frame.
115    pub async fn send_video(&self, jpeg_data: Vec<u8>) -> Result<(), SessionError> {
116        self.writer.send_video(jpeg_data).await
117    }
118
119    /// Update the system instruction mid-session.
120    pub async fn update_instruction(
121        &self,
122        instruction: impl Into<String>,
123    ) -> Result<(), SessionError> {
124        SessionWriter::update_instruction(&self.session, instruction.into()).await
125    }
126
127    /// Send tool responses manually (if not using auto-dispatch).
128    pub async fn send_tool_response(
129        &self,
130        responses: Vec<FunctionResponse>,
131    ) -> Result<(), SessionError> {
132        self.session.send_tool_response(responses).await
133    }
134
135    /// Notify the runtime that client-side playback has drained.
136    ///
137    /// Voice UIs should call this only when it is safe for the model to speak,
138    /// for example after browser speaker playback has drained and the user is
139    /// not actively speaking. User audio/text sends intentionally flush context
140    /// only and leave the prompt armed.
141    pub async fn playback_drained(&self) -> Result<(), SessionError> {
142        let prompt_pending = self
143            .pending_context
144            .as_ref()
145            .is_some_and(|pending| pending.has_prompt());
146        let reactions = self
147            .reactor
148            .react(&ReactorEvent::PlaybackDrained { prompt_pending });
149        self.effect_executor.execute_reactions(reactions).await
150    }
151
152    /// Notify the runtime that client-side user speech has started.
153    ///
154    /// This is the barge-in edge for voice clients: pending model prompts are
155    /// cancelled before they can race with user audio, while queued context is
156    /// kept so the next user send can still carry it.
157    pub async fn user_speech_started(&self) -> Result<(), SessionError> {
158        let reactions = self.reactor.react(&ReactorEvent::UserSpeechStarted);
159        self.effect_executor.execute_reactions(reactions).await
160    }
161
162    /// Notify the runtime that client-side user speech has ended.
163    pub async fn user_speech_ended(&self) -> Result<(), SessionError> {
164        let prompt_pending = self
165            .pending_context
166            .as_ref()
167            .is_some_and(|pending| pending.has_prompt());
168        let reactions = self
169            .reactor
170            .react(&ReactorEvent::UserSpeechEnded { prompt_pending });
171        self.effect_executor.execute_reactions(reactions).await
172    }
173
174    /// Snapshot the reactor-owned voice runtime state.
175    pub fn voice_state(&self) -> VoiceRuntimeState {
176        self.reactor.voice_state()
177    }
178
179    /// Snapshot backend input VAD state.
180    pub fn input_vad_state(&self) -> BackendVadSnapshot {
181        self.input_vad.lock().snapshot()
182    }
183
184    /// Flush deferred context and any pending model prompt.
185    ///
186    /// Prefer [`Self::playback_drained`] for voice clients. This compatibility
187    /// method routes through the same reactor/effect executor path.
188    pub async fn flush_deferred_prompt(&self) -> Result<(), SessionError> {
189        self.playback_drained().await
190    }
191
192    /// Get the user-facing session writer.
193    ///
194    /// When deferred context delivery is enabled, this returns the
195    /// `DeferredWriter` that flushes pending context before sends.
196    pub fn writer(&self) -> Arc<dyn SessionWriter> {
197        self.writer.clone()
198    }
199
200    /// Subscribe to raw session events (for custom processing).
201    pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
202        self.session.subscribe()
203    }
204
205    /// Get the current session phase.
206    pub fn phase(&self) -> SessionPhase {
207        self.session.phase()
208    }
209
210    /// Gracefully disconnect the session.
211    pub async fn disconnect(&self) -> Result<(), SessionError> {
212        SessionWriter::disconnect(&self.session).await
213    }
214
215    /// Wait for the session to end (disconnect, GoAway, or error).
216    pub async fn done(&self) -> Result<(), SessionError> {
217        self.session
218            .join()
219            .await
220            .map_err(|_| SessionError::ChannelClosed)
221    }
222
223    /// Get the underlying SessionHandle for advanced usage.
224    pub fn session(&self) -> &SessionHandle {
225        &self.session
226    }
227
228    /// Access the shared State container.
229    ///
230    /// Extraction results from `TurnExtractor`s are stored here under the
231    /// extractor's name. Use `state().get::<T>(name)` to read typed values.
232    pub fn state(&self) -> &State {
233        &self.state
234    }
235
236    /// Access the session telemetry (auto-collected by the telemetry lane).
237    ///
238    /// Use `telemetry().snapshot()` to get a JSON snapshot of all metrics.
239    pub fn telemetry(&self) -> &Arc<SessionTelemetry> {
240        &self.telemetry
241    }
242
243    /// Subscribe to semantic events from the processor.
244    ///
245    /// Returns a broadcast receiver. Call multiple times for independent
246    /// subscribers. Zero-cost when no subscribers exist.
247    pub fn events(&self) -> broadcast::Receiver<super::events::LiveEvent> {
248        self.event_tx.subscribe()
249    }
250
251    /// Convenience: get the latest extraction result by extractor name.
252    pub fn extracted<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
253        self.state.get(name)
254    }
255}