gemini_adk_rs/live/
events.rs

1//! Semantic events emitted by the L1 processor.
2//!
3//! Subscribe via `LiveHandle::events()` (broadcast receiver) or
4//! `LiveHandle::stream()` (a [`futures::Stream`]). Zero-cost when no
5//! subscribers.
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use bytes::Bytes;
12use futures::Stream;
13use tokio::sync::broadcast;
14
15/// Semantic events emitted by the Live session processor.
16///
17/// The L1 equivalent of L0's [`SessionEvent`](gemini_genai_rs::prelude::SessionEvent).
18/// L0 events are wire-level; LiveEvents are semantic (extractions completed,
19/// phases transitioned, tools executed).
20///
21/// Subscribe via [`LiveHandle::events()`](super::handle::LiveHandle::events).
22/// Multiple independent subscribers supported. Zero-cost when no subscribers
23/// exist (`broadcast::send` with 0 receivers is a no-op).
24#[derive(Debug, Clone)]
25#[non_exhaustive]
26pub enum LiveEvent {
27    // -- Fast-lane events (high frequency, sync emission) --
28    /// Raw PCM audio from model. Uses `Bytes` (refcounted) — clone is
29    /// a pointer increment (~2ns), not a deep copy.
30    Audio(Bytes),
31    /// Incremental text token from model.
32    TextDelta(String),
33    /// Complete text response (all deltas concatenated).
34    TextComplete(String),
35    /// User speech transcription.
36    InputTranscript {
37        /// The transcribed text content.
38        text: String,
39        /// Whether this is the final transcription for the utterance.
40        is_final: bool,
41    },
42    /// Model speech transcription.
43    OutputTranscript {
44        /// The transcribed text content.
45        text: String,
46        /// Whether this is the final transcription for the utterance.
47        is_final: bool,
48    },
49    /// Model reasoning/thinking content.
50    Thought(String),
51    /// Voice activity detected — user started speaking.
52    VadStart,
53    /// Voice activity ended — user stopped speaking.
54    VadEnd,
55
56    // -- Control-lane events (lower frequency, async emission) --
57    /// Extraction completed. Emitted for both the top-level result
58    /// AND each flattened key (e.g., "order.items", "order.phase").
59    Extraction {
60        /// Extractor name, or `"extractor.field"` for flattened keys.
61        name: String,
62        /// The extracted JSON value.
63        value: serde_json::Value,
64    },
65    /// Extraction failed.
66    ExtractionError {
67        /// Name of the extractor that failed.
68        name: String,
69        /// Human-readable error description.
70        error: String,
71    },
72    /// A raw extraction field was considered for promotion into authoritative state.
73    StatePromotion {
74        /// Extractor name that produced the field.
75        extractor: String,
76        /// Field name inside the extractor result.
77        field: String,
78        /// State key targeted by the promotion rule.
79        state_key: String,
80        /// Whether the promotion was accepted and written.
81        accepted: bool,
82        /// Human-readable reason for the decision.
83        reason: String,
84        /// Extracted value that was considered.
85        value: serde_json::Value,
86    },
87    /// Phase machine transitioned.
88    PhaseTransition {
89        /// Phase the machine transitioned from.
90        from: String,
91        /// Phase the machine transitioned to.
92        to: String,
93        /// Human-readable reason for the transition.
94        reason: String,
95    },
96    /// Tool dispatched and result obtained.
97    ToolExecution {
98        /// Name of the tool that was called.
99        name: String,
100        /// Arguments passed to the tool.
101        args: serde_json::Value,
102        /// Result returned by the tool.
103        result: serde_json::Value,
104    },
105    /// Tool calls cancelled — either by the server (a `ToolCallCancelled`
106    /// wire event) or locally when a user barge-in interrupted an in-flight
107    /// inline tool. No response is sent for a cancelled call, and a cancelled
108    /// call never advances the governed flow.
109    ToolCancelled {
110        /// IDs of the cancelled tool calls.
111        ids: Vec<String>,
112    },
113    /// Model completed a conversational turn.
114    TurnComplete,
115    /// Model output interrupted by user speech.
116    Interrupted,
117    /// Session connected to Gemini.
118    Connected,
119    /// Session disconnected.
120    Disconnected {
121        /// Optional reason for disconnection (server-provided or error message).
122        reason: Option<String>,
123    },
124    /// Unrecoverable error.
125    Error(String),
126    /// Server requesting session wind-down.
127    GoAway {
128        /// Time remaining before the server closes the connection.
129        time_left: Duration,
130    },
131
132    // -- Periodic events --
133    /// Aggregated session telemetry snapshot.
134    Telemetry(serde_json::Value),
135    /// Per-turn latency and token metrics.
136    TurnMetrics {
137        /// Turn number (1-indexed).
138        turn: u32,
139        /// End-to-end latency for this turn in milliseconds.
140        latency_ms: u32,
141        /// Number of prompt tokens consumed.
142        prompt_tokens: u32,
143        /// Number of response tokens generated.
144        response_tokens: u32,
145    },
146}
147
148/// A [`futures::Stream`] of [`LiveEvent`]s from a Live session.
149///
150/// Created by [`LiveHandle::stream()`](super::handle::LiveHandle::stream).
151/// Wraps the underlying [`broadcast::Receiver`] with stream semantics:
152///
153/// - **Lagged**: if this subscriber falls behind the broadcast buffer, the
154///   missed events are skipped and the stream continues with the next
155///   available event (no error item is yielded).
156/// - **Closed**: when the session's event channel closes, the stream ends
157///   (`next()` returns `None`).
158///
159/// Composes with all `futures`/`tokio-stream` combinators:
160///
161/// ```rust,ignore
162/// use futures::StreamExt;
163///
164/// let mut stream = handle.stream();
165/// while let Some(ev) = stream.next().await {
166///     match ev {
167///         LiveEvent::TextDelta(t) => print!("{t}"),
168///         LiveEvent::TurnComplete => println!(),
169///         _ => {}
170///     }
171/// }
172/// ```
173pub struct LiveEventStream {
174    inner: Pin<Box<dyn Stream<Item = LiveEvent> + Send>>,
175}
176
177impl LiveEventStream {
178    /// Wrap a broadcast receiver of [`LiveEvent`]s as a stream.
179    pub(crate) fn new(rx: broadcast::Receiver<LiveEvent>) -> Self {
180        let inner = futures::stream::unfold(rx, |mut rx| async move {
181            loop {
182                match rx.recv().await {
183                    Ok(ev) => return Some((ev, rx)),
184                    // Skip lagged (missed) events and keep going.
185                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
186                    // Channel closed: end the stream.
187                    Err(broadcast::error::RecvError::Closed) => return None,
188                }
189            }
190        });
191        Self {
192            inner: Box::pin(inner),
193        }
194    }
195}
196
197impl Stream for LiveEventStream {
198    type Item = LiveEvent;
199
200    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
201        self.inner.as_mut().poll_next(cx)
202    }
203}
204
205impl std::fmt::Debug for LiveEventStream {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("LiveEventStream").finish_non_exhaustive()
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    /// Events queued before the first poll come out in send order, and the
    /// stream terminates once the only sender has been dropped.
    #[tokio::test]
    async fn stream_yields_events_in_order_and_ends_on_close() {
        let (sender, receiver) = broadcast::channel::<LiveEvent>(16);
        let mut events = LiveEventStream::new(receiver);

        let queued = [
            LiveEvent::VadStart,
            LiveEvent::TextDelta("hi".into()),
            LiveEvent::TurnComplete,
        ];
        for event in queued {
            sender.send(event).unwrap();
        }

        let first = events.next().await;
        assert!(matches!(first, Some(LiveEvent::VadStart)));

        match events.next().await {
            Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "hi"),
            other => panic!("expected TextDelta, got {other:?}"),
        }

        let third = events.next().await;
        assert!(matches!(third, Some(LiveEvent::TurnComplete)));

        // With the last sender gone the channel is closed, so the stream ends.
        drop(sender);
        let after_close = events.next().await;
        assert!(after_close.is_none(), "stream ends on Closed");
    }

    /// A subscriber that overruns the channel buffer loses the overwritten
    /// events but keeps receiving afterwards.
    #[tokio::test]
    async fn stream_skips_lagged_events_and_continues() {
        // Five sends into a capacity-2 channel before any poll forces a lag.
        let (sender, receiver) = broadcast::channel::<LiveEvent>(2);
        let mut events = LiveEventStream::new(receiver);

        (0..5u32)
            .map(|i| LiveEvent::TextDelta(format!("e{i}")))
            .for_each(|event| {
                sender.send(event).unwrap();
            });

        // The first poll hits the lag, skips past it, and yields the oldest
        // event still buffered (e3), followed by e4: neither an error nor an
        // end-of-stream is surfaced.
        match events.next().await {
            Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "e3"),
            other => panic!("expected e3 after lag skip, got {other:?}"),
        }
        match events.next().await {
            Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "e4"),
            other => panic!("expected e4, got {other:?}"),
        }

        // Lagging did not kill the stream: new events still arrive.
        sender.send(LiveEvent::TurnComplete).unwrap();
        let after_lag = events.next().await;
        assert!(matches!(after_lag, Some(LiveEvent::TurnComplete)));

        drop(sender);
        let after_close = events.next().await;
        assert!(after_close.is_none());
    }
}