gemini_adk_rs/live/callbacks.rs
1//! Typed callback registry for Live session events.
2//!
3//! Fast lane callbacks (sync, < 1ms): audio, text, transcripts, VAD.
4//! Control lane callbacks (async, can block): tool calls, lifecycle, interruptions.
5//! Outbound interceptors: transform tool responses, inject context at turn boundaries.
6//!
7//! # Callback Modes
8//!
9//! Each control-lane callback has an associated [`CallbackMode`]:
10//!
11//! - [`Blocking`](CallbackMode::Blocking) — awaited inline. The event loop
12//! waits for completion before processing the next event. Guarantees
13//! ordering and state consistency.
14//! - [`Concurrent`](CallbackMode::Concurrent) — spawned as a detached tokio
15//! task. The event loop continues immediately. Use for fire-and-forget
16//! work (logging, background agent dispatch, analytics).
17//!
18//! Fast-lane callbacks (audio, text, VAD) are always sync and inline.
19//! Interceptors (`before_tool_response`, `on_turn_boundary`) are always blocking.
20//!
21//! Some control-lane callbacks are forced-blocking (no concurrent variant):
22//! `on_interrupted` (must clear state before audio resumes),
23//! `on_tool_call` (return value is the tool response).
24
25use std::sync::Arc;
26use std::time::Duration;
27
28use bytes::Bytes;
29use gemini_genai_rs::prelude::{FunctionCall, FunctionResponse, SessionPhase, UsageMetadata};
30use gemini_genai_rs::session::SessionWriter;
31
32use super::BoxFuture;
33use crate::state::State;
34
35/// Controls how a control-lane callback is executed relative to the event loop.
36///
37/// Each control-lane callback in [`EventCallbacks`] has a companion `_mode` field
38/// (e.g., `on_turn_complete_mode`) that determines execution semantics.
39///
40/// At the L2 fluent API level, use `_concurrent` suffixed methods (e.g.,
41/// `on_turn_complete_concurrent()`) to set both the callback and its mode
42/// in a single call.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum CallbackMode {
45 /// Callback is awaited inline — the event loop waits for completion.
46 ///
47 /// Use when subsequent events depend on the callback's side effects,
48 /// or when ordering guarantees are required.
49 #[default]
50 Blocking,
51 /// Callback is spawned as a concurrent task — the event loop continues immediately.
52 ///
53 /// Use for fire-and-forget work: logging, analytics, webhook dispatch,
54 /// background agent triggering. The callback runs in a detached tokio task.
55 Concurrent,
56}
57
58// ── Named callback types ──────────────────────────────────────────────────
59// These aliases are the vocabulary of the callback registry: every field
60// below (and the corresponding L2 setters) is one of these shapes.
61
62/// Fast-lane sync callback over a raw audio chunk.
63pub type AudioCallback = Box<dyn Fn(&Bytes) + Send + Sync>;
64/// Fast-lane sync callback over a text payload (delta, accumulated text, thought).
65pub type TextCallback = Box<dyn Fn(&str) + Send + Sync>;
66/// Fast-lane sync callback over a transcript chunk with its `is_final` flag.
67pub type TranscriptCallback = Box<dyn Fn(&str, bool) + Send + Sync>;
68/// Fast-lane sync callback with no payload (VAD start/end).
69pub type SignalCallback = Box<dyn Fn() + Send + Sync>;
70/// Fast-lane sync callback over a session phase change.
71pub type PhaseCallback = Box<dyn Fn(SessionPhase) + Send + Sync>;
72/// Fast-lane sync callback over usage metadata.
73pub type UsageCallback = Box<dyn Fn(&UsageMetadata) + Send + Sync>;
74
75/// Control-lane async callback with no payload.
76pub type AsyncCallback = Arc<dyn Fn() -> BoxFuture<()> + Send + Sync>;
77/// Control-lane async callback over one payload value.
78pub type AsyncCallbackWith<T> = Arc<dyn Fn(T) -> BoxFuture<()> + Send + Sync>;
79/// Control-lane async callback over two payload values.
80pub type AsyncCallbackWith2<A, B> = Arc<dyn Fn(A, B) -> BoxFuture<()> + Send + Sync>;
81/// Tool-call override: return `Some(responses)` to reply, `None` to defer to
82/// auto-dispatch via the registered `ToolDispatcher`.
83pub type ToolCallCallback =
84 Arc<dyn Fn(Vec<FunctionCall>, State) -> BoxFuture<Option<Vec<FunctionResponse>>> + Send + Sync>;
85/// Middleware over outgoing tool responses (inspect/rewrite before send).
86pub type BeforeToolResponseCallback =
87 Arc<dyn Fn(Vec<FunctionResponse>, State) -> BoxFuture<Vec<FunctionResponse>> + Send + Sync>;
88/// Sync state-reactive instruction generator (`None` = leave unchanged).
89pub type InstructionFn = Arc<dyn Fn(&State) -> Option<String> + Send + Sync>;
90
91/// Typed callback registry for Live session events.
92///
93/// Callbacks are divided into two lanes:
94/// - **Fast lane** (sync): Called inline, must be < 1ms. For audio, text, transcripts, VAD.
95/// - **Control lane** (async): Awaited on a dedicated task. For tool calls, lifecycle, interruptions.
96pub struct EventCallbacks {
97 // -- Fast lane (sync callbacks) --
98 /// Called for each audio chunk from the model (PCM16 24kHz).
99 pub on_audio: Option<AudioCallback>,
100 /// Called for each incremental text delta from the model.
101 pub on_text: Option<TextCallback>,
102 /// Called when the model completes a text response.
103 pub on_text_complete: Option<TextCallback>,
104 /// Called for input (user speech) transcription updates.
105 pub on_input_transcript: Option<TranscriptCallback>,
106 /// Called for output (model speech) transcription updates.
107 pub on_output_transcript: Option<TranscriptCallback>,
108 /// Called when the model emits a thought/reasoning summary (when includeThoughts is enabled).
109 pub on_thought: Option<TextCallback>,
110 /// Called when server-side VAD detects voice activity start.
111 pub on_vad_start: Option<SignalCallback>,
112 /// Called when server-side VAD detects voice activity end.
113 pub on_vad_end: Option<SignalCallback>,
114 /// Called on session phase transitions.
115 pub on_phase: Option<PhaseCallback>,
116 /// Called when server sends token usage metadata.
117 pub on_usage: Option<UsageCallback>,
118
119 // -- Control lane (async callbacks) --
120 /// Called when the model is interrupted by barge-in.
121 pub on_interrupted: Option<AsyncCallback>,
122 /// Called when model requests tool execution.
123 /// Return `None` to use auto-dispatch (ToolDispatcher), `Some` to override.
124 /// Receives State for natural state promotion from tool results.
125 pub on_tool_call: Option<ToolCallCallback>,
126 /// Called when server cancels pending tool calls.
127 pub on_tool_cancelled: Option<AsyncCallbackWith<Vec<String>>>,
128 /// Called when the model completes its turn.
129 pub on_turn_complete: Option<AsyncCallback>,
130 /// Called when the model finishes generating its full intended response,
131 /// before any interruption truncation (the wire `GenerationComplete`).
132 pub on_generation_complete: Option<AsyncCallback>,
133 /// Called when server sends GoAway (session ending soon).
134 pub on_go_away: Option<AsyncCallbackWith<Duration>>,
135 /// Called when session setup completes (connected).
136 ///
137 /// Receives a `SessionWriter` for sending messages on connect (e.g. greeting prompts).
138 pub on_connected: Option<AsyncCallbackWith<Arc<dyn SessionWriter>>>,
139 /// Called when session disconnects.
140 pub on_disconnected: Option<AsyncCallbackWith<Option<String>>>,
141 /// Called after session resumes from GoAway.
142 pub on_resumed: Option<AsyncCallback>,
143 /// Called on non-fatal errors.
144 pub on_error: Option<AsyncCallbackWith<String>>,
145 /// Called when agent transfer occurs (from, to).
146 pub on_transfer: Option<AsyncCallbackWith2<String, String>>,
147 /// Called when a TurnExtractor produces a result (extractor_name, value).
148 pub on_extracted: Option<AsyncCallbackWith2<String, serde_json::Value>>,
149 /// Called when a TurnExtractor fails (extractor_name, error_message).
150 ///
151 /// By default, extraction failures are logged via `tracing::warn!`.
152 /// Register this callback to implement custom error handling (retry, alert, etc.).
153 pub on_extraction_error: Option<AsyncCallbackWith2<String, String>>,
154
155 // -- Callback modes (control-lane only) --
156 /// Execution mode for [`on_turn_complete`](Self::on_turn_complete).
157 pub on_turn_complete_mode: CallbackMode,
158 /// Execution mode for [`on_generation_complete`](Self::on_generation_complete).
159 pub on_generation_complete_mode: CallbackMode,
160 /// Execution mode for [`on_connected`](Self::on_connected).
161 pub on_connected_mode: CallbackMode,
162 /// Execution mode for [`on_disconnected`](Self::on_disconnected).
163 pub on_disconnected_mode: CallbackMode,
164 /// Execution mode for [`on_error`](Self::on_error).
165 pub on_error_mode: CallbackMode,
166 /// Execution mode for [`on_go_away`](Self::on_go_away).
167 pub on_go_away_mode: CallbackMode,
168 /// Execution mode for [`on_extracted`](Self::on_extracted).
169 pub on_extracted_mode: CallbackMode,
170 /// Execution mode for [`on_extraction_error`](Self::on_extraction_error).
171 pub on_extraction_error_mode: CallbackMode,
172 /// Execution mode for [`on_tool_cancelled`](Self::on_tool_cancelled).
173 pub on_tool_cancelled_mode: CallbackMode,
174 /// Execution mode for [`on_transfer`](Self::on_transfer).
175 pub on_transfer_mode: CallbackMode,
176 /// Execution mode for [`on_resumed`](Self::on_resumed).
177 pub on_resumed_mode: CallbackMode,
178
179 // -- Outbound interceptors (transform data going to Gemini) --
180 /// Intercept tool responses before sending to Gemini.
181 ///
182 /// Receives the tool responses and shared State. Returns (potentially modified)
183 /// responses. Use this to rewrite, augment, or filter tool results based on
184 /// conversation state.
185 pub before_tool_response: Option<BeforeToolResponseCallback>,
186
187 /// Called at turn boundaries (after extractors, before `on_turn_complete`).
188 ///
189 /// Receives shared State and a SessionWriter for injecting content into
190 /// the conversation. Use this for context stuffing, K/V injection, condensed
191 /// state summaries, or any outbound content interleaving.
192 pub on_turn_boundary: Option<AsyncCallbackWith2<State, Arc<dyn SessionWriter>>>,
193
194 /// State-reactive system instruction template (full replacement).
195 ///
196 /// Called after extractors run on each TurnComplete. If it returns
197 /// `Some(instruction)`, the system instruction is updated mid-session.
198 /// Returns `None` to leave the instruction unchanged.
199 ///
200 /// This is sync (no async) because instruction generation should be fast.
201 pub instruction_template: Option<InstructionFn>,
202
203 /// State-reactive instruction amendment (additive, not replacement).
204 ///
205 /// Called after extractors and phase transitions on each TurnComplete.
206 /// If it returns `Some(text)`, the text is appended to the current phase
207 /// instruction (separated by `\n\n`). Returns `None` to skip amendment.
208 ///
209 /// Unlike `instruction_template` (which replaces the entire instruction),
210 /// this only adds to the phase instruction — the developer never needs to
211 /// know or repeat the base instruction.
212 pub instruction_amendment: Option<InstructionFn>,
213}
214
215impl Default for EventCallbacks {
216 fn default() -> Self {
217 Self {
218 on_audio: None,
219 on_text: None,
220 on_text_complete: None,
221 on_input_transcript: None,
222 on_output_transcript: None,
223 on_thought: None,
224 on_vad_start: None,
225 on_vad_end: None,
226 on_phase: None,
227 on_usage: None,
228 on_interrupted: None,
229 on_tool_call: None,
230 on_tool_cancelled: None,
231 on_turn_complete: None,
232 on_generation_complete: None,
233 on_go_away: None,
234 on_connected: None,
235 on_disconnected: None,
236 on_resumed: None,
237 on_error: None,
238 on_transfer: None,
239 on_extracted: None,
240 on_extraction_error: None,
241 on_turn_complete_mode: CallbackMode::Blocking,
242 on_generation_complete_mode: CallbackMode::Blocking,
243 on_connected_mode: CallbackMode::Blocking,
244 on_disconnected_mode: CallbackMode::Blocking,
245 on_error_mode: CallbackMode::Blocking,
246 on_go_away_mode: CallbackMode::Blocking,
247 on_extracted_mode: CallbackMode::Blocking,
248 on_extraction_error_mode: CallbackMode::Blocking,
249 on_tool_cancelled_mode: CallbackMode::Blocking,
250 on_transfer_mode: CallbackMode::Blocking,
251 on_resumed_mode: CallbackMode::Blocking,
252 before_tool_response: None,
253 on_turn_boundary: None,
254 instruction_template: None,
255 instruction_amendment: None,
256 }
257 }
258}
259
260impl std::fmt::Debug for EventCallbacks {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 f.debug_struct("EventCallbacks")
263 .field("on_audio", &self.on_audio.is_some())
264 .field("on_text", &self.on_text.is_some())
265 .field("on_text_complete", &self.on_text_complete.is_some())
266 .field("on_input_transcript", &self.on_input_transcript.is_some())
267 .field("on_output_transcript", &self.on_output_transcript.is_some())
268 .field("on_thought", &self.on_thought.is_some())
269 .field("on_vad_start", &self.on_vad_start.is_some())
270 .field("on_vad_end", &self.on_vad_end.is_some())
271 .field("on_phase", &self.on_phase.is_some())
272 .field("on_usage", &self.on_usage.is_some())
273 .field("on_interrupted", &self.on_interrupted.is_some())
274 .field("on_tool_call", &self.on_tool_call.is_some())
275 .field("on_tool_cancelled", &self.on_tool_cancelled.is_some())
276 .field("on_turn_complete", &self.on_turn_complete.is_some())
277 .field("on_go_away", &self.on_go_away.is_some())
278 .field("on_connected", &self.on_connected.is_some())
279 .field("on_disconnected", &self.on_disconnected.is_some())
280 .field("on_resumed", &self.on_resumed.is_some())
281 .field("on_error", &self.on_error.is_some())
282 .field("on_transfer", &self.on_transfer.is_some())
283 .field("on_extracted", &self.on_extracted.is_some())
284 .field("on_extraction_error", &self.on_extraction_error.is_some())
285 .field("on_turn_complete_mode", &self.on_turn_complete_mode)
286 .field("on_connected_mode", &self.on_connected_mode)
287 .field("on_disconnected_mode", &self.on_disconnected_mode)
288 .field("on_error_mode", &self.on_error_mode)
289 .field("on_go_away_mode", &self.on_go_away_mode)
290 .field("on_extracted_mode", &self.on_extracted_mode)
291 .field("on_extraction_error_mode", &self.on_extraction_error_mode)
292 .field("on_tool_cancelled_mode", &self.on_tool_cancelled_mode)
293 .field("on_transfer_mode", &self.on_transfer_mode)
294 .field("on_resumed_mode", &self.on_resumed_mode)
295 .field("before_tool_response", &self.before_tool_response.is_some())
296 .field("on_turn_boundary", &self.on_turn_boundary.is_some())
297 .field("instruction_template", &self.instruction_template.is_some())
298 .field(
299 "instruction_amendment",
300 &self.instruction_amendment.is_some(),
301 )
302 .finish()
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309
310 #[test]
311 fn default_callbacks_all_none() {
312 let cb = EventCallbacks::default();
313 assert!(cb.on_audio.is_none());
314 assert!(cb.on_text.is_none());
315 assert!(cb.on_interrupted.is_none());
316 assert!(cb.on_tool_call.is_none());
317 }
318
319 #[test]
320 fn sync_callback_callable() {
321 let mut cb = EventCallbacks::default();
322 let called = Arc::new(std::sync::atomic::AtomicBool::new(false));
323 let called_clone = called.clone();
324 cb.on_text = Some(Box::new(move |_text| {
325 called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
326 }));
327 if let Some(f) = &cb.on_text {
328 f("hello");
329 }
330 assert!(called.load(std::sync::atomic::Ordering::SeqCst));
331 }
332
333 #[test]
334 fn callback_mode_defaults_to_blocking() {
335 let cb = EventCallbacks::default();
336 assert_eq!(cb.on_turn_complete_mode, CallbackMode::Blocking);
337 assert_eq!(cb.on_connected_mode, CallbackMode::Blocking);
338 assert_eq!(cb.on_disconnected_mode, CallbackMode::Blocking);
339 assert_eq!(cb.on_error_mode, CallbackMode::Blocking);
340 assert_eq!(cb.on_go_away_mode, CallbackMode::Blocking);
341 assert_eq!(cb.on_extracted_mode, CallbackMode::Blocking);
342 assert_eq!(cb.on_extraction_error_mode, CallbackMode::Blocking);
343 assert_eq!(cb.on_tool_cancelled_mode, CallbackMode::Blocking);
344 assert_eq!(cb.on_transfer_mode, CallbackMode::Blocking);
345 assert_eq!(cb.on_resumed_mode, CallbackMode::Blocking);
346 }
347
348 #[test]
349 fn debug_shows_registered() {
350 let cb = EventCallbacks {
351 on_audio: Some(Box::new(|_| {})),
352 ..Default::default()
353 };
354 let debug = format!("{:?}", cb);
355 assert!(debug.contains("on_audio: true"));
356 assert!(debug.contains("on_text: false"));
357 }
358}