gemini_adk_fluent_rs/live/
mod.rs

1//! `Live` — Fluent builder for callback-driven Gemini Live sessions.
2//!
3//! Wraps L1's `LiveSessionBuilder` with ergonomic callback registration
4//! and integration with composition modules (M, T, P).
5//!
6//! # Callback Modes
7//!
8//! Control-lane callbacks support two execution modes via [`gemini_adk_rs::live::CallbackMode`]:
9//!
10//! - **Default methods** (e.g., `.on_turn_complete()`) → [`gemini_adk_rs::live::CallbackMode::Blocking`]
11//! - **`_concurrent` methods** (e.g., `.on_turn_complete_concurrent()`) → [`gemini_adk_rs::live::CallbackMode::Concurrent`]
12//!
13//! Use concurrent mode for fire-and-forget work (logging, analytics, webhook dispatch).
14//!
15//! # Background Tool Execution
16//!
17//! Mark tools for background execution to eliminate dead air in voice sessions:
18//!
19//! ```rust,ignore
20//! Live::builder()
21//!     .tools(dispatcher)
22//!     .tool_background("search_kb")
23//!     .connect_vertex(project, location, token)
24//!     .await?;
25//! ```
26
27mod callbacks;
28mod config;
29mod connect;
30mod extraction;
31mod phases;
32
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36
37use gemini_adk_rs::live::extractor::TurnExtractor;
38use gemini_adk_rs::live::needs::RepairConfig;
39use gemini_adk_rs::live::persistence::SessionPersistence;
40use gemini_adk_rs::live::steering::{ContextDelivery, SteeringMode};
41use gemini_adk_rs::live::{
42    ComputedRegistry, EventCallbacks, InstructionModifier, Phase, TemporalRegistry,
43    ToolExecutionMode, WatcherRegistry,
44};
45use gemini_adk_rs::llm::BaseLlm;
46use gemini_adk_rs::tool::ToolDispatcher;
47use gemini_genai_rs::prelude::*;
48
49/// A deferred agent tool registration (resolved at connect time when State is available).
50pub(crate) struct DeferredAgentTool {
51    pub(crate) name: String,
52    pub(crate) description: String,
53    pub(crate) agent: Arc<dyn gemini_adk_rs::text::TextAgent>,
54}
55
56/// Fluent builder for constructing and connecting Gemini Live sessions.
57///
58/// Accumulates model configuration, callbacks, extractors, phases, watchers,
59/// temporal patterns, and tool execution modes, then connects via one of
60/// the `connect_*` methods.
61///
62/// Control-lane callbacks can be registered with `_concurrent` suffixed
63/// methods for fire-and-forget execution. Tools can be marked for background
64/// execution via [`tool_background()`](Self::tool_background).
65///
66/// # Example
67/// ```ignore
68/// let session = Live::builder()
69///     .model(GeminiModel::Gemini2_0FlashLive)
70///     .voice(Voice::Kore)
71///     .instruction("You are a weather assistant")
72///     .tools(dispatcher)
73///     .on_audio(|data| playback_tx.send(data.clone()).ok())
74///     .on_text(|t| print!("{t}"))
75///     .on_interrupted(|| async { playback.flush().await; })
76///     .connect_vertex("project", "us-central1", token)
77///     .await?;
78/// ```
79///
80/// # Extraction Pipeline
81/// ```ignore
82/// let handle = Live::builder()
83///     .model(GeminiModel::Gemini2_0FlashLive)
84///     .instruction("You are a restaurant order assistant")
85///     .extract_turns::<OrderState>(
86///         flash_llm,
87///         "Extract: items ordered, quantities, modifications, order_phase",
88///     )
89///     .on_extracted(|name, value| async move {
90///         println!("Extracted {name}: {value}");
91///     })
92///     .connect_vertex(project, location, token)
93///     .await?;
94///
95/// // Read latest extraction from shared State at any time:
96/// let order: Option<OrderState> = handle.extracted("OrderState");
97/// ```
98pub struct Live {
99    pub(crate) config: SessionConfig,
100    pub(crate) callbacks: EventCallbacks,
101    pub(crate) dispatcher: Option<ToolDispatcher>,
102    pub(crate) extractors: Vec<Arc<dyn TurnExtractor>>,
103    // L1 registries
104    pub(crate) computed: ComputedRegistry,
105    pub(crate) phases: Vec<Phase>,
106    pub(crate) initial_phase: Option<String>,
107    pub(crate) watchers: WatcherRegistry,
108    pub(crate) temporal: TemporalRegistry,
109    pub(crate) greeting: Option<String>,
110    // Phase defaults: modifiers + prompt_on_enter inherited by all phases.
111    pub(crate) phase_default_modifiers: Vec<InstructionModifier>,
112    pub(crate) phase_default_prompt_on_enter: bool,
113    // Per-tool execution modes (standard vs background).
114    pub(crate) tool_execution_modes: HashMap<String, ToolExecutionMode>,
115    // Deferred agent tools (resolved at connect time).
116    pub(crate) deferred_agent_tools: Vec<DeferredAgentTool>,
117    // LLMs to warm up at connect time.
118    pub(crate) warm_up_llms: Vec<Arc<dyn BaseLlm>>,
119    // Control plane configuration.
120    pub(crate) soft_turn_timeout: Option<Duration>,
121    pub(crate) steering_mode: SteeringMode,
122    pub(crate) context_delivery: ContextDelivery,
123    pub(crate) repair_config: Option<RepairConfig>,
124    pub(crate) persistence: Option<Arc<dyn SessionPersistence>>,
125    pub(crate) session_id: Option<String>,
126    pub(crate) tool_advisory: bool,
127    pub(crate) telemetry_interval: Option<Duration>,
128}
129
130impl Live {
131    /// Start building a Live session.
132    ///
133    /// # Examples
134    ///
135    /// Minimal live session setup:
136    ///
137    /// ```rust,ignore
138    /// use gemini_adk_fluent_rs::prelude::*;
139    ///
140    /// let handle = Live::builder()
141    ///     .model(GeminiModel::Gemini2_0FlashLive)
142    ///     .voice(Voice::Kore)
143    ///     .instruction("You are a helpful assistant")
144    ///     .greeting("Hello! How can I help?")
145    ///     .on_audio(|data| { /* send to speaker */ })
146    ///     .on_text(|t| print!("{t}"))
147    ///     .connect_google_ai("API_KEY")
148    ///     .await?;
149    ///
150    /// handle.send_text("What is the weather?").await?;
151    /// handle.disconnect().await?;
152    /// ```
153    ///
154    /// With phases and state-based transitions:
155    ///
156    /// ```rust,ignore
157    /// let handle = Live::builder()
158    ///     .model(GeminiModel::Gemini2_0FlashLive)
159    ///     .phase("greeting")
160    ///         .instruction("Welcome the user")
161    ///         .transition("main", S::is_true("greeted"))
162    ///         .done()
163    ///     .phase("main")
164    ///         .instruction("Help the user")
165    ///         .terminal()
166    ///         .done()
167    ///     .initial_phase("greeting")
168    ///     .connect_google_ai("API_KEY")
169    ///     .await?;
170    /// ```
171    pub fn builder() -> Self {
172        Self {
173            config: SessionConfig::from_endpoint(ApiEndpoint::google_ai("")),
174            callbacks: EventCallbacks::default(),
175            dispatcher: None,
176            extractors: Vec::new(),
177            computed: ComputedRegistry::new(),
178            phases: Vec::new(),
179            initial_phase: None,
180            watchers: WatcherRegistry::new(),
181            temporal: TemporalRegistry::new(),
182            greeting: None,
183            phase_default_modifiers: Vec::new(),
184            phase_default_prompt_on_enter: false,
185            tool_execution_modes: HashMap::new(),
186            deferred_agent_tools: Vec::new(),
187            warm_up_llms: Vec::new(),
188            soft_turn_timeout: None,
189            steering_mode: SteeringMode::default(),
190            context_delivery: ContextDelivery::default(),
191            repair_config: None,
192            persistence: None,
193            session_id: None,
194            tool_advisory: true,
195            telemetry_interval: None,
196        }
197    }
198
199    /// Set the periodic telemetry emission interval.
200    ///
201    /// When set, the processor emits `LiveEvent::Telemetry` snapshots
202    /// and `LiveEvent::TurnMetrics` at this rate.
203    pub fn telemetry_interval(mut self, interval: Duration) -> Self {
204        self.telemetry_interval = Some(interval);
205        self
206    }
207}
208
#[cfg(test)]
mod tests {
    // Most tests here are compile-smoke tests for the fluent chain. The last two
    // also pin the observable state produced by `builder()` and
    // `telemetry_interval()`.
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;

    #[test]
    fn builder_chain_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .voice(Voice::Kore)
            .instruction("Test")
            .temperature(0.7)
            .google_search()
            .transcription(true, true)
            .affective_dialog(true)
            .session_resume(true)
            .context_compression(4000, 2000)
            .on_audio(|_data| {})
            .on_text(|_t| {})
            .on_vad_start(|| {})
            .on_interrupted(|| async {})
            .on_turn_complete(|| async {})
            .on_go_away(|_d| async {})
            .on_connected(|_writer| async {})
            .on_disconnected(|_r| async {})
            .on_error(|_e| async {});
        // Just verify the builder chain compiles
    }

    #[test]
    fn builder_with_extraction_compiles() {
        use gemini_adk_rs::llm::{BaseLlm, LlmError, LlmRequest, LlmResponse};
        use schemars::JsonSchema;

        #[derive(serde::Deserialize, serde::Serialize, JsonSchema)]
        struct OrderState {
            phase: String,
            items: Vec<String>,
        }

        struct FakeLlm;

        #[async_trait::async_trait]
        impl BaseLlm for FakeLlm {
            fn model_id(&self) -> &str {
                "fake"
            }
            async fn generate(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
                unimplemented!()
            }
        }

        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .instruction("Restaurant order assistant")
            .extract_turns::<OrderState>(
                Arc::new(FakeLlm),
                "Extract order state: items, quantities, phase",
            )
            .on_extracted(|name, value| async move {
                let _ = (name, value);
            })
            // Outbound interceptors
            .before_tool_response(|responses, _state| async move {
                responses // pass through
            })
            .on_turn_boundary(|_state, _writer| async move {
                // inject context
            })
            .instruction_template(|state| {
                let phase: String = state.get("phase").unwrap_or_default();
                match phase.as_str() {
                    "ordering" => Some("Take orders accurately.".into()),
                    _ => None,
                }
            });
        // Just verify the builder chain with all features compiles
    }

    #[test]
    fn builder_with_computed_state_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .instruction("Test computed state")
            .computed("doubled", &["app:count"], |state| {
                let count: i64 = state.get("app:count")?;
                Some(serde_json::json!(count * 2))
            })
            .computed("level", &["app:score"], |state| {
                let score: f64 = state.get("app:score")?;
                if score > 0.5 {
                    Some(serde_json::json!("high"))
                } else {
                    Some(serde_json::json!("low"))
                }
            });
    }

    #[test]
    fn builder_with_phases_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .phase("greeting")
            .instruction("Welcome the user warmly")
            .transition("main", |s| s.get::<bool>("greeted").unwrap_or(false))
            .on_enter(|state, _writer| async move {
                state.set("entered_greeting", true);
            })
            .done()
            .phase("main")
            .dynamic_instruction(|s| {
                let topic: String = s.get("topic").unwrap_or_default();
                format!("Discuss {topic}")
            })
            .tools(vec!["search".into(), "lookup".into()])
            .transition("farewell", |s| s.get::<bool>("done").unwrap_or(false))
            .done()
            .phase("farewell")
            .instruction("Say goodbye")
            .terminal()
            .done()
            .initial_phase("greeting");
    }

    #[test]
    fn builder_with_phase_guard_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .phase("start")
            .instruction("Begin")
            .transition("secure", |_| true)
            .done()
            .phase("secure")
            .instruction("Secure area")
            .guard(|s| s.get::<bool>("verified").unwrap_or(false))
            .on_exit(|state, _writer| async move {
                state.set("left_secure", true);
            })
            .terminal()
            .done()
            .initial_phase("start");
    }

    #[test]
    fn builder_with_watchers_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .watch("app:score")
            .crossed_above(0.9)
            .then(|_old, _new, state| async move {
                state.set("high_score_alert", true);
            })
            .watch("app:status")
            .changed_to(serde_json::json!("complete"))
            .blocking()
            .then(|_old, _new, _state| async move {
                // blocking action
            })
            .watch("app:flag")
            .became_true()
            .then(|_old, _new, _state| async move {
                // flag became true
            });
    }

    #[test]
    fn builder_with_temporal_patterns_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .when_sustained(
                "user_confused",
                |s| s.get::<bool>("confused").unwrap_or(false),
                Duration::from_secs(30),
                |_state, _writer| async move {
                    // offer help
                },
            )
            .when_rate(
                "rapid_errors",
                |evt| matches!(evt, SessionEvent::TextDelta(_)),
                5,
                Duration::from_secs(10),
                |_state, _writer| async move {
                    // throttle
                },
            )
            .when_turns(
                "stuck_in_loop",
                |s| s.get::<bool>("repeating").unwrap_or(false),
                3,
                |_state, _writer| async move {
                    // break loop
                },
            );
    }

    #[test]
    fn builder_full_l1_chain_compiles() {
        // Full chain combining all L1 features in a single builder
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .voice(Voice::Kore)
            .instruction("Full featured agent")
            // Computed state
            .computed("sentiment_level", &["app:sentiment_score"], |state| {
                let score: f64 = state.get("app:sentiment_score")?;
                if score > 0.7 {
                    Some(serde_json::json!("positive"))
                } else if score < 0.3 {
                    Some(serde_json::json!("negative"))
                } else {
                    Some(serde_json::json!("neutral"))
                }
            })
            // Phases
            .phase("greeting")
            .instruction("Greet the user")
            .transition("help", |s| s.get::<bool>("needs_help").unwrap_or(false))
            .done()
            .phase("help")
            .instruction("Help the user")
            .terminal()
            .done()
            .initial_phase("greeting")
            // Watchers
            .watch("app:sentiment_score")
            .crossed_below(0.2)
            .then(|_old, _new, state| async move {
                state.set("alert:low_sentiment", true);
            })
            // Temporal
            .when_turns(
                "repeated_confusion",
                |s| s.get::<bool>("confused").unwrap_or(false),
                3,
                |_state, _writer| async move {},
            )
            // Standard callbacks
            .on_audio(|_data| {})
            .on_text(|_t| {})
            .on_turn_complete(|| async {});
    }

    #[test]
    fn builder_with_callback_modes_compiles() {
        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .on_turn_complete_concurrent(|| async {})
            .on_error_concurrent(|_e| async {})
            .on_extracted_concurrent(|_name, _val| async {})
            .on_extraction_error_concurrent(|_name, _err| async {})
            .on_connected_concurrent(|_w| async {})
            .on_disconnected_concurrent(|_r| async {})
            .on_go_away_concurrent(|_d| async {});
    }

    #[test]
    fn builder_with_background_tools_compiles() {
        use gemini_adk_rs::live::DefaultResultFormatter;

        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .tool_background("search_kb")
            .tool_background_with_formatter(
                "analyze_document",
                Arc::new(DefaultResultFormatter),
            );
    }

    #[test]
    fn builder_mixed_callback_modes_and_bg_tools() {
        use gemini_adk_rs::live::DefaultResultFormatter;

        let _live = Live::builder()
            .model(GeminiModel::Gemini2_0FlashLive)
            .voice(Voice::Kore)
            .instruction("Full featured agent")
            .tool_background("slow_tool")
            .tool_background_with_formatter("kb_search", Arc::new(DefaultResultFormatter))
            .on_turn_complete_concurrent(|| async {})
            .on_extracted_concurrent(|_name, _val| async {})
            .on_audio(|_data| {})
            .on_text(|_t| {})
            .on_interrupted(|| async {});
    }

    /// `Live::builder()` must start with every optional feature unset and only
    /// `tool_advisory` enabled.
    #[test]
    fn builder_defaults() {
        let live = Live::builder();

        assert!(live.dispatcher.is_none());
        assert!(live.extractors.is_empty());
        assert!(live.phases.is_empty());
        assert!(live.initial_phase.is_none());
        assert!(live.greeting.is_none());
        assert!(live.phase_default_modifiers.is_empty());
        assert!(!live.phase_default_prompt_on_enter);
        assert!(live.tool_execution_modes.is_empty());
        assert!(live.deferred_agent_tools.is_empty());
        assert!(live.warm_up_llms.is_empty());
        assert!(live.soft_turn_timeout.is_none());
        assert!(live.repair_config.is_none());
        assert!(live.persistence.is_none());
        assert!(live.session_id.is_none());
        assert!(live.tool_advisory);
        assert!(live.telemetry_interval.is_none());
    }

    /// `telemetry_interval()` stores the interval, and a later call overrides
    /// an earlier one.
    #[test]
    fn telemetry_interval_last_call_wins() {
        let live = Live::builder()
            .telemetry_interval(Duration::from_secs(5))
            .telemetry_interval(Duration::from_millis(250));

        assert_eq!(live.telemetry_interval, Some(Duration::from_millis(250)));
    }
}