gemini_adk_fluent_rs/live/
callbacks.rs

1//! Event callback registration methods for `Live`.
2
3use std::future::Future;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bytes::Bytes;
8
9use gemini_adk_rs::live::CallbackMode;
10use gemini_adk_rs::State;
11use gemini_genai_rs::prelude::*;
12
13use super::Live;
14
15impl Live {
16    // -- Outbound Interceptors --
17
18    /// Intercept tool responses before they are sent back to Gemini.
19    ///
20    /// Use this to rewrite, augment, or filter tool results based on
21    /// conversation state. The callback receives the tool responses and the
22    /// shared `State`, and returns (potentially modified) responses.
23    ///
24    /// # Example
25    /// ```ignore
26    /// .before_tool_response(|responses, state| async move {
27    ///     let order: OrderState = state.get("OrderState").unwrap_or_default();
28    ///     responses.into_iter().map(|mut r| {
29    ///         r.response["current_order"] = serde_json::to_value(&order).unwrap();
30    ///         r
31    ///     }).collect()
32    /// })
33    /// ```
34    pub fn before_tool_response<F, Fut>(mut self, f: F) -> Self
35    where
36        F: Fn(Vec<FunctionResponse>, gemini_adk_rs::State) -> Fut + Send + Sync + 'static,
37        Fut: Future<Output = Vec<FunctionResponse>> + Send + 'static,
38    {
39        self.callbacks.before_tool_response = Some(Arc::new(move |responses, state| {
40            Box::pin(f(responses, state))
41        }));
42        self
43    }
44
45    /// Hook called at turn boundaries — after extractors run, before `on_turn_complete`.
46    ///
47    /// Receives the shared `State` and a `SessionWriter` for injecting content
48    /// into the conversation. Use for context stuffing, K/V data injection,
49    /// condensed state summaries, or any outbound content interleaving.
50    ///
51    /// # Example
52    /// ```ignore
53    /// .on_turn_boundary(|state, writer| async move {
54    ///     let summary = state.get::<String>("summary").unwrap_or_default();
55    ///     writer.send_client_content(
56    ///         vec![Content::user().text(format!("[Context: {summary}]"))],
57    ///         false,
58    ///     ).await.ok();
59    /// })
60    /// ```
61    pub fn on_turn_boundary<F, Fut>(mut self, f: F) -> Self
62    where
63        F: Fn(gemini_adk_rs::State, Arc<dyn gemini_genai_rs::session::SessionWriter>) -> Fut
64            + Send
65            + Sync
66            + 'static,
67        Fut: Future<Output = ()> + Send + 'static,
68    {
69        self.callbacks.on_turn_boundary =
70            Some(Arc::new(move |state, writer| Box::pin(f(state, writer))));
71        self
72    }
73
74    // -- Fast Lane Callbacks (sync, < 1ms) --
75
76    /// Called for each audio chunk from the model (PCM16 24kHz).
77    pub fn on_audio(mut self, f: impl Fn(&Bytes) + Send + Sync + 'static) -> Self {
78        self.callbacks.on_audio = Some(Box::new(f));
79        self
80    }
81
82    /// Called for each incremental text delta.
83    pub fn on_text(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
84        self.callbacks.on_text = Some(Box::new(f));
85        self
86    }
87
88    /// Called when model completes a text response.
89    pub fn on_text_complete(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
90        self.callbacks.on_text_complete = Some(Box::new(f));
91        self
92    }
93
94    /// Called for input (user speech) transcription.
95    pub fn on_input_transcript(mut self, f: impl Fn(&str, bool) + Send + Sync + 'static) -> Self {
96        self.callbacks.on_input_transcript = Some(Box::new(f));
97        self
98    }
99
100    /// Called for output (model speech) transcription.
101    pub fn on_output_transcript(mut self, f: impl Fn(&str, bool) + Send + Sync + 'static) -> Self {
102        self.callbacks.on_output_transcript = Some(Box::new(f));
103        self
104    }
105
106    /// Called when the model emits a thought/reasoning summary.
107    ///
108    /// Requires `.include_thoughts()` on the session config. Fast lane callback
109    /// (sync, must complete in < 1ms).
110    pub fn on_thought(mut self, f: impl Fn(&str) + Send + Sync + 'static) -> Self {
111        self.callbacks.on_thought = Some(Box::new(f));
112        self
113    }
114
115    /// Called when server VAD detects voice activity start.
116    pub fn on_vad_start(mut self, f: impl Fn() + Send + Sync + 'static) -> Self {
117        self.callbacks.on_vad_start = Some(Box::new(f));
118        self
119    }
120
121    /// Called when server VAD detects voice activity end.
122    pub fn on_vad_end(mut self, f: impl Fn() + Send + Sync + 'static) -> Self {
123        self.callbacks.on_vad_end = Some(Box::new(f));
124        self
125    }
126
127    /// Called when server sends token usage metadata.
128    ///
129    /// Receives a reference to the full [`UsageMetadata`] including prompt,
130    /// response, cached, tool-use, and thoughts token counts plus per-modality
131    /// breakdowns. Fires on the telemetry lane (not the fast lane).
132    pub fn on_usage(mut self, f: impl Fn(&UsageMetadata) + Send + Sync + 'static) -> Self {
133        self.callbacks.on_usage = Some(Box::new(f));
134        self
135    }
136
137    // -- Control Lane Callbacks (async, can block) --
138
139    /// Called when model is interrupted by barge-in.
140    pub fn on_interrupted<F, Fut>(mut self, f: F) -> Self
141    where
142        F: Fn() -> Fut + Send + Sync + 'static,
143        Fut: Future<Output = ()> + Send + 'static,
144    {
145        self.callbacks.on_interrupted = Some(Arc::new(move || Box::pin(f())));
146        self
147    }
148
149    /// Called when model requests tool execution.
150    /// Return `None` to auto-dispatch, `Some(responses)` to override.
151    /// Receives State for natural state promotion from tool results.
152    pub fn on_tool_call<F, Fut>(mut self, f: F) -> Self
153    where
154        F: Fn(Vec<FunctionCall>, State) -> Fut + Send + Sync + 'static,
155        Fut: Future<Output = Option<Vec<FunctionResponse>>> + Send + 'static,
156    {
157        self.callbacks.on_tool_call = Some(Arc::new(move |calls, state| Box::pin(f(calls, state))));
158        self
159    }
160
161    /// Called when model turn completes.
162    pub fn on_turn_complete<F, Fut>(mut self, f: F) -> Self
163    where
164        F: Fn() -> Fut + Send + Sync + 'static,
165        Fut: Future<Output = ()> + Send + 'static,
166    {
167        self.callbacks.on_turn_complete = Some(Arc::new(move || Box::pin(f())));
168        self
169    }
170
171    /// Called when server sends GoAway.
172    pub fn on_go_away<F, Fut>(mut self, f: F) -> Self
173    where
174        F: Fn(Duration) -> Fut + Send + Sync + 'static,
175        Fut: Future<Output = ()> + Send + 'static,
176    {
177        self.callbacks.on_go_away = Some(Arc::new(move |d| Box::pin(f(d))));
178        self
179    }
180
181    /// Called when session connects (setup complete).
182    ///
183    /// Receives a `SessionWriter` for sending messages on connect.
184    pub fn on_connected<F, Fut>(mut self, f: F) -> Self
185    where
186        F: Fn(Arc<dyn gemini_genai_rs::session::SessionWriter>) -> Fut + Send + Sync + 'static,
187        Fut: Future<Output = ()> + Send + 'static,
188    {
189        self.callbacks.on_connected = Some(Arc::new(move |w| Box::pin(f(w))));
190        self
191    }
192
193    /// Called when session disconnects.
194    pub fn on_disconnected<F, Fut>(mut self, f: F) -> Self
195    where
196        F: Fn(Option<String>) -> Fut + Send + Sync + 'static,
197        Fut: Future<Output = ()> + Send + 'static,
198    {
199        self.callbacks.on_disconnected = Some(Arc::new(move |r| Box::pin(f(r))));
200        self
201    }
202
203    /// Called on non-fatal errors.
204    pub fn on_error<F, Fut>(mut self, f: F) -> Self
205    where
206        F: Fn(String) -> Fut + Send + Sync + 'static,
207        Fut: Future<Output = ()> + Send + 'static,
208    {
209        self.callbacks.on_error = Some(Arc::new(move |e| Box::pin(f(e))));
210        self
211    }
212
213    // -- Concurrent callback variants --
214    // These set CallbackMode::Concurrent so the callback is spawned as a
215    // detached tokio task instead of being awaited inline.
216
217    /// Called when model turn completes (spawned concurrently).
218    pub fn on_turn_complete_concurrent<F, Fut>(mut self, f: F) -> Self
219    where
220        F: Fn() -> Fut + Send + Sync + 'static,
221        Fut: Future<Output = ()> + Send + 'static,
222    {
223        self.callbacks.on_turn_complete = Some(Arc::new(move || Box::pin(f())));
224        self.callbacks.on_turn_complete_mode = CallbackMode::Concurrent;
225        self
226    }
227
228    /// Called when session connects (spawned concurrently).
229    pub fn on_connected_concurrent<F, Fut>(mut self, f: F) -> Self
230    where
231        F: Fn(Arc<dyn gemini_genai_rs::session::SessionWriter>) -> Fut + Send + Sync + 'static,
232        Fut: Future<Output = ()> + Send + 'static,
233    {
234        self.callbacks.on_connected = Some(Arc::new(move |w| Box::pin(f(w))));
235        self.callbacks.on_connected_mode = CallbackMode::Concurrent;
236        self
237    }
238
239    /// Called when session disconnects (spawned concurrently).
240    pub fn on_disconnected_concurrent<F, Fut>(mut self, f: F) -> Self
241    where
242        F: Fn(Option<String>) -> Fut + Send + Sync + 'static,
243        Fut: Future<Output = ()> + Send + 'static,
244    {
245        self.callbacks.on_disconnected = Some(Arc::new(move |r| Box::pin(f(r))));
246        self.callbacks.on_disconnected_mode = CallbackMode::Concurrent;
247        self
248    }
249
250    /// Called on non-fatal errors (spawned concurrently).
251    pub fn on_error_concurrent<F, Fut>(mut self, f: F) -> Self
252    where
253        F: Fn(String) -> Fut + Send + Sync + 'static,
254        Fut: Future<Output = ()> + Send + 'static,
255    {
256        self.callbacks.on_error = Some(Arc::new(move |e| Box::pin(f(e))));
257        self.callbacks.on_error_mode = CallbackMode::Concurrent;
258        self
259    }
260
261    /// Called when server sends GoAway (spawned concurrently).
262    pub fn on_go_away_concurrent<F, Fut>(mut self, f: F) -> Self
263    where
264        F: Fn(Duration) -> Fut + Send + Sync + 'static,
265        Fut: Future<Output = ()> + Send + 'static,
266    {
267        self.callbacks.on_go_away = Some(Arc::new(move |d| Box::pin(f(d))));
268        self.callbacks.on_go_away_mode = CallbackMode::Concurrent;
269        self
270    }
271
272    /// Called when a TurnExtractor produces a result (spawned concurrently).
273    pub fn on_extracted_concurrent<F, Fut>(mut self, f: F) -> Self
274    where
275        F: Fn(String, serde_json::Value) -> Fut + Send + Sync + 'static,
276        Fut: Future<Output = ()> + Send + 'static,
277    {
278        self.callbacks.on_extracted = Some(Arc::new(move |name, value| Box::pin(f(name, value))));
279        self.callbacks.on_extracted_mode = CallbackMode::Concurrent;
280        self
281    }
282
283    /// Called when a TurnExtractor fails (spawned concurrently).
284    pub fn on_extraction_error_concurrent<F, Fut>(mut self, f: F) -> Self
285    where
286        F: Fn(String, String) -> Fut + Send + Sync + 'static,
287        Fut: Future<Output = ()> + Send + 'static,
288    {
289        self.callbacks.on_extraction_error =
290            Some(Arc::new(move |name, error| Box::pin(f(name, error))));
291        self.callbacks.on_extraction_error_mode = CallbackMode::Concurrent;
292        self
293    }
294}