gemini_adk_rs/live/
handle.rs1use std::sync::Arc;
4
5use gemini_genai_rs::prelude::{FunctionResponse, SessionEvent, SessionPhase, VadEvent};
6use gemini_genai_rs::session::{SessionError, SessionHandle, SessionWriter};
7use parking_lot::Mutex;
8use serde::de::DeserializeOwned;
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11
12use crate::state::State;
13
14use super::context_writer::PendingContext;
15use super::effect_executor::LiveEffectExecutor;
16use super::input_vad::{BackendInputVad, BackendVadSnapshot};
17use super::reactor::{LiveReactor, ReactorEvent, VoiceRuntimeState};
18use super::telemetry::SessionTelemetry;
19
/// Handle to a running live session, bundling the low-level session, its
/// writer, the voice reactor / effect executor, the backend input VAD, and
/// the state / telemetry / event plumbing that callers interact with.
///
/// `Clone` is derived. The reactor, input VAD, telemetry, writer and the two
/// background-task handles are stored behind `Arc`, so every clone shares
/// the same instances of those.
#[derive(Clone)]
pub struct LiveHandle {
    /// Underlying session handle; used directly for tool responses,
    /// instruction updates, event subscription, phase queries, disconnect
    /// and join.
    session: SessionHandle,
    /// Writer used for outgoing audio, text and video.
    writer: Arc<dyn SessionWriter>,
    // Background tasks handed in by the constructor's caller. They are held
    // here but never awaited or aborted in this type.
    // NOTE(review): a tokio `JoinHandle` detaches its task when dropped rather
    // than cancelling it, so these tasks presumably keep running after the
    // last clone of the handle is dropped unless the session shuts them down
    // elsewhere — confirm this is intended.
    _fast_task: Arc<JoinHandle<()>>,
    _ctrl_task: Arc<JoinHandle<()>>,
    /// Session state store, read by `extracted`.
    state: State,
    /// Per-session telemetry counters.
    telemetry: Arc<SessionTelemetry>,
    /// Sender side of the `LiveEvent` broadcast channel; `events()` subscribes to it.
    event_tx: broadcast::Sender<super::events::LiveEvent>,
    /// Optional buffer queried for whether a deferred prompt is waiting.
    pending_context: Option<Arc<PendingContext>>,
    /// Voice reactor that turns playback / speech events into reactions.
    reactor: Arc<LiveReactor>,
    /// Executes the reactions produced by `reactor`.
    effect_executor: LiveEffectExecutor,
    /// Backend VAD run over outgoing PCM audio. Guarded by a synchronous
    /// mutex that is only locked for short, non-async sections.
    input_vad: Arc<Mutex<BackendInputVad>>,
}
45
46impl LiveHandle {
47 pub(crate) fn new(
48 session: SessionHandle,
49 writer: Arc<dyn SessionWriter>,
50 fast_task: JoinHandle<()>,
51 ctrl_task: JoinHandle<()>,
52 state: State,
53 telemetry: Arc<SessionTelemetry>,
54 event_tx: broadcast::Sender<super::events::LiveEvent>,
55 pending_context: Option<Arc<PendingContext>>,
56 ) -> Self {
57 let reactor = Arc::new(LiveReactor::voice_defaults());
58 let effect_executor = LiveEffectExecutor::new(
59 Arc::new(session.clone()),
60 pending_context.clone(),
61 event_tx.clone(),
62 );
63
64 Self {
65 session,
66 writer,
67 _fast_task: Arc::new(fast_task),
68 _ctrl_task: Arc::new(ctrl_task),
69 state,
70 telemetry,
71 event_tx,
72 pending_context,
73 reactor,
74 effect_executor,
75 input_vad: Arc::new(Mutex::new(BackendInputVad::default())),
76 }
77 }
78
79 pub async fn send_audio(&self, data: Vec<u8>) -> Result<(), SessionError> {
84 let vad_events = {
85 let mut input_vad = self.input_vad.lock();
86 input_vad.process_pcm_bytes(&data)
87 };
88
89 if vad_events.contains(&VadEvent::SpeechStart) {
90 self.user_speech_started().await?;
91 }
92
93 self.writer.send_audio(data).await?;
94
95 if vad_events.contains(&VadEvent::SpeechEnd) {
96 self.user_speech_ended().await?;
97 }
98
99 Ok(())
100 }
101
102 pub async fn send_text(&self, text: impl Into<String>) -> Result<(), SessionError> {
107 self.telemetry.record_text_send();
108 self.writer.send_text(text.into()).await
109 }
110
111 pub async fn send_video(&self, jpeg_data: Vec<u8>) -> Result<(), SessionError> {
116 self.writer.send_video(jpeg_data).await
117 }
118
119 pub async fn update_instruction(
121 &self,
122 instruction: impl Into<String>,
123 ) -> Result<(), SessionError> {
124 SessionWriter::update_instruction(&self.session, instruction.into()).await
125 }
126
127 pub async fn send_tool_response(
129 &self,
130 responses: Vec<FunctionResponse>,
131 ) -> Result<(), SessionError> {
132 self.session.send_tool_response(responses).await
133 }
134
135 pub async fn playback_drained(&self) -> Result<(), SessionError> {
142 let prompt_pending = self
143 .pending_context
144 .as_ref()
145 .is_some_and(|pending| pending.has_prompt());
146 let reactions = self
147 .reactor
148 .react(&ReactorEvent::PlaybackDrained { prompt_pending });
149 self.effect_executor.execute_reactions(reactions).await
150 }
151
152 pub async fn user_speech_started(&self) -> Result<(), SessionError> {
158 let reactions = self.reactor.react(&ReactorEvent::UserSpeechStarted);
159 self.effect_executor.execute_reactions(reactions).await
160 }
161
162 pub async fn user_speech_ended(&self) -> Result<(), SessionError> {
164 let prompt_pending = self
165 .pending_context
166 .as_ref()
167 .is_some_and(|pending| pending.has_prompt());
168 let reactions = self
169 .reactor
170 .react(&ReactorEvent::UserSpeechEnded { prompt_pending });
171 self.effect_executor.execute_reactions(reactions).await
172 }
173
174 pub fn voice_state(&self) -> VoiceRuntimeState {
176 self.reactor.voice_state()
177 }
178
179 pub fn input_vad_state(&self) -> BackendVadSnapshot {
181 self.input_vad.lock().snapshot()
182 }
183
184 pub async fn flush_deferred_prompt(&self) -> Result<(), SessionError> {
189 self.playback_drained().await
190 }
191
192 pub fn writer(&self) -> Arc<dyn SessionWriter> {
197 self.writer.clone()
198 }
199
200 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
202 self.session.subscribe()
203 }
204
205 pub fn phase(&self) -> SessionPhase {
207 self.session.phase()
208 }
209
210 pub async fn disconnect(&self) -> Result<(), SessionError> {
212 SessionWriter::disconnect(&self.session).await
213 }
214
215 pub async fn done(&self) -> Result<(), SessionError> {
217 self.session
218 .join()
219 .await
220 .map_err(|_| SessionError::ChannelClosed)
221 }
222
223 pub fn session(&self) -> &SessionHandle {
225 &self.session
226 }
227
228 pub fn state(&self) -> &State {
233 &self.state
234 }
235
236 pub fn telemetry(&self) -> &Arc<SessionTelemetry> {
240 &self.telemetry
241 }
242
243 pub fn events(&self) -> broadcast::Receiver<super::events::LiveEvent> {
248 self.event_tx.subscribe()
249 }
250
251 pub fn extracted<T: DeserializeOwned>(&self, name: &str) -> Option<T> {
253 self.state.get(name)
254 }
255}