gemini_adk_rs/live/
callbacks.rs

1//! Typed callback registry for Live session events.
2//!
3//! Fast lane callbacks (sync, < 1ms): audio, text, transcripts, VAD.
4//! Control lane callbacks (async, can block): tool calls, lifecycle, interruptions.
5//! Outbound interceptors: transform tool responses, inject context at turn boundaries.
6//!
7//! # Callback Modes
8//!
9//! Each control-lane callback has an associated [`CallbackMode`]:
10//!
11//! - [`Blocking`](CallbackMode::Blocking) — awaited inline. The event loop
12//!   waits for completion before processing the next event. Guarantees
13//!   ordering and state consistency.
14//! - [`Concurrent`](CallbackMode::Concurrent) — spawned as a detached tokio
15//!   task. The event loop continues immediately. Use for fire-and-forget
16//!   work (logging, background agent dispatch, analytics).
17//!
18//! Fast-lane callbacks (audio, text, VAD) are always sync and inline.
19//! Interceptors (`before_tool_response`, `on_turn_boundary`) are always blocking.
20//!
21//! Some control-lane callbacks are forced-blocking (no concurrent variant):
22//! `on_interrupted` (must clear state before audio resumes),
23//! `on_tool_call` (return value is the tool response).
24
25use std::sync::Arc;
26use std::time::Duration;
27
28use bytes::Bytes;
29use gemini_genai_rs::prelude::{FunctionCall, FunctionResponse, SessionPhase, UsageMetadata};
30use gemini_genai_rs::session::SessionWriter;
31
32use super::BoxFuture;
33use crate::state::State;
34
35/// Controls how a control-lane callback is executed relative to the event loop.
36///
37/// Each control-lane callback in [`EventCallbacks`] has a companion `_mode` field
38/// (e.g., `on_turn_complete_mode`) that determines execution semantics.
39///
40/// At the L2 fluent API level, use `_concurrent` suffixed methods (e.g.,
41/// `on_turn_complete_concurrent()`) to set both the callback and its mode
42/// in a single call.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum CallbackMode {
45    /// Callback is awaited inline — the event loop waits for completion.
46    ///
47    /// Use when subsequent events depend on the callback's side effects,
48    /// or when ordering guarantees are required.
49    #[default]
50    Blocking,
51    /// Callback is spawned as a concurrent task — the event loop continues immediately.
52    ///
53    /// Use for fire-and-forget work: logging, analytics, webhook dispatch,
54    /// background agent triggering. The callback runs in a detached tokio task.
55    Concurrent,
56}
57
58// ── Named callback types ──────────────────────────────────────────────────
59// These aliases are the vocabulary of the callback registry: every field
60// below (and the corresponding L2 setters) is one of these shapes.
61
62/// Fast-lane sync callback over a raw audio chunk.
63pub type AudioCallback = Box<dyn Fn(&Bytes) + Send + Sync>;
64/// Fast-lane sync callback over a text payload (delta, accumulated text, thought).
65pub type TextCallback = Box<dyn Fn(&str) + Send + Sync>;
66/// Fast-lane sync callback over a transcript chunk with its `is_final` flag.
67pub type TranscriptCallback = Box<dyn Fn(&str, bool) + Send + Sync>;
68/// Fast-lane sync callback with no payload (VAD start/end).
69pub type SignalCallback = Box<dyn Fn() + Send + Sync>;
70/// Fast-lane sync callback over a session phase change.
71pub type PhaseCallback = Box<dyn Fn(SessionPhase) + Send + Sync>;
72/// Fast-lane sync callback over usage metadata.
73pub type UsageCallback = Box<dyn Fn(&UsageMetadata) + Send + Sync>;
74
75/// Control-lane async callback with no payload.
76pub type AsyncCallback = Arc<dyn Fn() -> BoxFuture<()> + Send + Sync>;
77/// Control-lane async callback over one payload value.
78pub type AsyncCallbackWith<T> = Arc<dyn Fn(T) -> BoxFuture<()> + Send + Sync>;
79/// Control-lane async callback over two payload values.
80pub type AsyncCallbackWith2<A, B> = Arc<dyn Fn(A, B) -> BoxFuture<()> + Send + Sync>;
81/// Tool-call override: return `Some(responses)` to reply, `None` to defer to
82/// auto-dispatch via the registered `ToolDispatcher`.
83pub type ToolCallCallback =
84    Arc<dyn Fn(Vec<FunctionCall>, State) -> BoxFuture<Option<Vec<FunctionResponse>>> + Send + Sync>;
85/// Middleware over outgoing tool responses (inspect/rewrite before send).
86pub type BeforeToolResponseCallback =
87    Arc<dyn Fn(Vec<FunctionResponse>, State) -> BoxFuture<Vec<FunctionResponse>> + Send + Sync>;
88/// Sync state-reactive instruction generator (`None` = leave unchanged).
89pub type InstructionFn = Arc<dyn Fn(&State) -> Option<String> + Send + Sync>;
90
91/// Typed callback registry for Live session events.
92///
93/// Callbacks are divided into two lanes:
94/// - **Fast lane** (sync): Called inline, must be < 1ms. For audio, text, transcripts, VAD.
95/// - **Control lane** (async): Awaited on a dedicated task. For tool calls, lifecycle, interruptions.
96pub struct EventCallbacks {
97    // -- Fast lane (sync callbacks) --
98    /// Called for each audio chunk from the model (PCM16 24kHz).
99    pub on_audio: Option<AudioCallback>,
100    /// Called for each incremental text delta from the model.
101    pub on_text: Option<TextCallback>,
102    /// Called when the model completes a text response.
103    pub on_text_complete: Option<TextCallback>,
104    /// Called for input (user speech) transcription updates.
105    pub on_input_transcript: Option<TranscriptCallback>,
106    /// Called for output (model speech) transcription updates.
107    pub on_output_transcript: Option<TranscriptCallback>,
108    /// Called when the model emits a thought/reasoning summary (when includeThoughts is enabled).
109    pub on_thought: Option<TextCallback>,
110    /// Called when server-side VAD detects voice activity start.
111    pub on_vad_start: Option<SignalCallback>,
112    /// Called when server-side VAD detects voice activity end.
113    pub on_vad_end: Option<SignalCallback>,
114    /// Called on session phase transitions.
115    pub on_phase: Option<PhaseCallback>,
116    /// Called when server sends token usage metadata.
117    pub on_usage: Option<UsageCallback>,
118
119    // -- Control lane (async callbacks) --
120    /// Called when the model is interrupted by barge-in.
121    pub on_interrupted: Option<AsyncCallback>,
122    /// Called when model requests tool execution.
123    /// Return `None` to use auto-dispatch (ToolDispatcher), `Some` to override.
124    /// Receives State for natural state promotion from tool results.
125    pub on_tool_call: Option<ToolCallCallback>,
126    /// Called when server cancels pending tool calls.
127    pub on_tool_cancelled: Option<AsyncCallbackWith<Vec<String>>>,
128    /// Called when the model completes its turn.
129    pub on_turn_complete: Option<AsyncCallback>,
130    /// Called when the model finishes generating its full intended response,
131    /// before any interruption truncation (the wire `GenerationComplete`).
132    pub on_generation_complete: Option<AsyncCallback>,
133    /// Called when server sends GoAway (session ending soon).
134    pub on_go_away: Option<AsyncCallbackWith<Duration>>,
135    /// Called when session setup completes (connected).
136    ///
137    /// Receives a `SessionWriter` for sending messages on connect (e.g. greeting prompts).
138    pub on_connected: Option<AsyncCallbackWith<Arc<dyn SessionWriter>>>,
139    /// Called when session disconnects.
140    pub on_disconnected: Option<AsyncCallbackWith<Option<String>>>,
141    /// Called after session resumes from GoAway.
142    pub on_resumed: Option<AsyncCallback>,
143    /// Called on non-fatal errors.
144    pub on_error: Option<AsyncCallbackWith<String>>,
145    /// Called when agent transfer occurs (from, to).
146    pub on_transfer: Option<AsyncCallbackWith2<String, String>>,
147    /// Called when a TurnExtractor produces a result (extractor_name, value).
148    pub on_extracted: Option<AsyncCallbackWith2<String, serde_json::Value>>,
149    /// Called when a TurnExtractor fails (extractor_name, error_message).
150    ///
151    /// By default, extraction failures are logged via `tracing::warn!`.
152    /// Register this callback to implement custom error handling (retry, alert, etc.).
153    pub on_extraction_error: Option<AsyncCallbackWith2<String, String>>,
154
155    // -- Callback modes (control-lane only) --
156    /// Execution mode for [`on_turn_complete`](Self::on_turn_complete).
157    pub on_turn_complete_mode: CallbackMode,
158    /// Execution mode for [`on_generation_complete`](Self::on_generation_complete).
159    pub on_generation_complete_mode: CallbackMode,
160    /// Execution mode for [`on_connected`](Self::on_connected).
161    pub on_connected_mode: CallbackMode,
162    /// Execution mode for [`on_disconnected`](Self::on_disconnected).
163    pub on_disconnected_mode: CallbackMode,
164    /// Execution mode for [`on_error`](Self::on_error).
165    pub on_error_mode: CallbackMode,
166    /// Execution mode for [`on_go_away`](Self::on_go_away).
167    pub on_go_away_mode: CallbackMode,
168    /// Execution mode for [`on_extracted`](Self::on_extracted).
169    pub on_extracted_mode: CallbackMode,
170    /// Execution mode for [`on_extraction_error`](Self::on_extraction_error).
171    pub on_extraction_error_mode: CallbackMode,
172    /// Execution mode for [`on_tool_cancelled`](Self::on_tool_cancelled).
173    pub on_tool_cancelled_mode: CallbackMode,
174    /// Execution mode for [`on_transfer`](Self::on_transfer).
175    pub on_transfer_mode: CallbackMode,
176    /// Execution mode for [`on_resumed`](Self::on_resumed).
177    pub on_resumed_mode: CallbackMode,
178
179    // -- Outbound interceptors (transform data going to Gemini) --
180    /// Intercept tool responses before sending to Gemini.
181    ///
182    /// Receives the tool responses and shared State. Returns (potentially modified)
183    /// responses. Use this to rewrite, augment, or filter tool results based on
184    /// conversation state.
185    pub before_tool_response: Option<BeforeToolResponseCallback>,
186
187    /// Called at turn boundaries (after extractors, before `on_turn_complete`).
188    ///
189    /// Receives shared State and a SessionWriter for injecting content into
190    /// the conversation. Use this for context stuffing, K/V injection, condensed
191    /// state summaries, or any outbound content interleaving.
192    pub on_turn_boundary: Option<AsyncCallbackWith2<State, Arc<dyn SessionWriter>>>,
193
194    /// State-reactive system instruction template (full replacement).
195    ///
196    /// Called after extractors run on each TurnComplete. If it returns
197    /// `Some(instruction)`, the system instruction is updated mid-session.
198    /// Returns `None` to leave the instruction unchanged.
199    ///
200    /// This is sync (no async) because instruction generation should be fast.
201    pub instruction_template: Option<InstructionFn>,
202
203    /// State-reactive instruction amendment (additive, not replacement).
204    ///
205    /// Called after extractors and phase transitions on each TurnComplete.
206    /// If it returns `Some(text)`, the text is appended to the current phase
207    /// instruction (separated by `\n\n`). Returns `None` to skip amendment.
208    ///
209    /// Unlike `instruction_template` (which replaces the entire instruction),
210    /// this only adds to the phase instruction — the developer never needs to
211    /// know or repeat the base instruction.
212    pub instruction_amendment: Option<InstructionFn>,
213}
214
215impl Default for EventCallbacks {
216    fn default() -> Self {
217        Self {
218            on_audio: None,
219            on_text: None,
220            on_text_complete: None,
221            on_input_transcript: None,
222            on_output_transcript: None,
223            on_thought: None,
224            on_vad_start: None,
225            on_vad_end: None,
226            on_phase: None,
227            on_usage: None,
228            on_interrupted: None,
229            on_tool_call: None,
230            on_tool_cancelled: None,
231            on_turn_complete: None,
232            on_generation_complete: None,
233            on_go_away: None,
234            on_connected: None,
235            on_disconnected: None,
236            on_resumed: None,
237            on_error: None,
238            on_transfer: None,
239            on_extracted: None,
240            on_extraction_error: None,
241            on_turn_complete_mode: CallbackMode::Blocking,
242            on_generation_complete_mode: CallbackMode::Blocking,
243            on_connected_mode: CallbackMode::Blocking,
244            on_disconnected_mode: CallbackMode::Blocking,
245            on_error_mode: CallbackMode::Blocking,
246            on_go_away_mode: CallbackMode::Blocking,
247            on_extracted_mode: CallbackMode::Blocking,
248            on_extraction_error_mode: CallbackMode::Blocking,
249            on_tool_cancelled_mode: CallbackMode::Blocking,
250            on_transfer_mode: CallbackMode::Blocking,
251            on_resumed_mode: CallbackMode::Blocking,
252            before_tool_response: None,
253            on_turn_boundary: None,
254            instruction_template: None,
255            instruction_amendment: None,
256        }
257    }
258}
259
260impl std::fmt::Debug for EventCallbacks {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        f.debug_struct("EventCallbacks")
263            .field("on_audio", &self.on_audio.is_some())
264            .field("on_text", &self.on_text.is_some())
265            .field("on_text_complete", &self.on_text_complete.is_some())
266            .field("on_input_transcript", &self.on_input_transcript.is_some())
267            .field("on_output_transcript", &self.on_output_transcript.is_some())
268            .field("on_thought", &self.on_thought.is_some())
269            .field("on_vad_start", &self.on_vad_start.is_some())
270            .field("on_vad_end", &self.on_vad_end.is_some())
271            .field("on_phase", &self.on_phase.is_some())
272            .field("on_usage", &self.on_usage.is_some())
273            .field("on_interrupted", &self.on_interrupted.is_some())
274            .field("on_tool_call", &self.on_tool_call.is_some())
275            .field("on_tool_cancelled", &self.on_tool_cancelled.is_some())
276            .field("on_turn_complete", &self.on_turn_complete.is_some())
277            .field("on_go_away", &self.on_go_away.is_some())
278            .field("on_connected", &self.on_connected.is_some())
279            .field("on_disconnected", &self.on_disconnected.is_some())
280            .field("on_resumed", &self.on_resumed.is_some())
281            .field("on_error", &self.on_error.is_some())
282            .field("on_transfer", &self.on_transfer.is_some())
283            .field("on_extracted", &self.on_extracted.is_some())
284            .field("on_extraction_error", &self.on_extraction_error.is_some())
285            .field("on_turn_complete_mode", &self.on_turn_complete_mode)
286            .field("on_connected_mode", &self.on_connected_mode)
287            .field("on_disconnected_mode", &self.on_disconnected_mode)
288            .field("on_error_mode", &self.on_error_mode)
289            .field("on_go_away_mode", &self.on_go_away_mode)
290            .field("on_extracted_mode", &self.on_extracted_mode)
291            .field("on_extraction_error_mode", &self.on_extraction_error_mode)
292            .field("on_tool_cancelled_mode", &self.on_tool_cancelled_mode)
293            .field("on_transfer_mode", &self.on_transfer_mode)
294            .field("on_resumed_mode", &self.on_resumed_mode)
295            .field("before_tool_response", &self.before_tool_response.is_some())
296            .field("on_turn_boundary", &self.on_turn_boundary.is_some())
297            .field("instruction_template", &self.instruction_template.is_some())
298            .field(
299                "instruction_amendment",
300                &self.instruction_amendment.is_some(),
301            )
302            .finish()
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309
310    #[test]
311    fn default_callbacks_all_none() {
312        let cb = EventCallbacks::default();
313        assert!(cb.on_audio.is_none());
314        assert!(cb.on_text.is_none());
315        assert!(cb.on_interrupted.is_none());
316        assert!(cb.on_tool_call.is_none());
317    }
318
319    #[test]
320    fn sync_callback_callable() {
321        let mut cb = EventCallbacks::default();
322        let called = Arc::new(std::sync::atomic::AtomicBool::new(false));
323        let called_clone = called.clone();
324        cb.on_text = Some(Box::new(move |_text| {
325            called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
326        }));
327        if let Some(f) = &cb.on_text {
328            f("hello");
329        }
330        assert!(called.load(std::sync::atomic::Ordering::SeqCst));
331    }
332
333    #[test]
334    fn callback_mode_defaults_to_blocking() {
335        let cb = EventCallbacks::default();
336        assert_eq!(cb.on_turn_complete_mode, CallbackMode::Blocking);
337        assert_eq!(cb.on_connected_mode, CallbackMode::Blocking);
338        assert_eq!(cb.on_disconnected_mode, CallbackMode::Blocking);
339        assert_eq!(cb.on_error_mode, CallbackMode::Blocking);
340        assert_eq!(cb.on_go_away_mode, CallbackMode::Blocking);
341        assert_eq!(cb.on_extracted_mode, CallbackMode::Blocking);
342        assert_eq!(cb.on_extraction_error_mode, CallbackMode::Blocking);
343        assert_eq!(cb.on_tool_cancelled_mode, CallbackMode::Blocking);
344        assert_eq!(cb.on_transfer_mode, CallbackMode::Blocking);
345        assert_eq!(cb.on_resumed_mode, CallbackMode::Blocking);
346    }
347
348    #[test]
349    fn debug_shows_registered() {
350        let cb = EventCallbacks {
351            on_audio: Some(Box::new(|_| {})),
352            ..Default::default()
353        };
354        let debug = format!("{:?}", cb);
355        assert!(debug.contains("on_audio: true"));
356        assert!(debug.contains("on_text: false"));
357    }
358}