gemini_adk_rs/live/events.rs
1//! Semantic events emitted by the L1 processor.
2//!
3//! Subscribe via `LiveHandle::events()` (broadcast receiver) or
4//! `LiveHandle::stream()` (a [`futures::Stream`]). Zero-cost when no
5//! subscribers.
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use bytes::Bytes;
12use futures::Stream;
13use tokio::sync::broadcast;
14
15/// Semantic events emitted by the Live session processor.
16///
17/// The L1 equivalent of L0's [`SessionEvent`](gemini_genai_rs::prelude::SessionEvent).
18/// L0 events are wire-level; LiveEvents are semantic (extractions completed,
19/// phases transitioned, tools executed).
20///
21/// Subscribe via [`LiveHandle::events()`](super::handle::LiveHandle::events).
22/// Multiple independent subscribers supported. Zero-cost when no subscribers
23/// exist (`broadcast::send` with 0 receivers is a no-op).
24#[derive(Debug, Clone)]
25#[non_exhaustive]
26pub enum LiveEvent {
27 // -- Fast-lane events (high frequency, sync emission) --
28 /// Raw PCM audio from model. Uses `Bytes` (refcounted) — clone is
29 /// a pointer increment (~2ns), not a deep copy.
30 Audio(Bytes),
31 /// Incremental text token from model.
32 TextDelta(String),
33 /// Complete text response (all deltas concatenated).
34 TextComplete(String),
35 /// User speech transcription.
36 InputTranscript {
37 /// The transcribed text content.
38 text: String,
39 /// Whether this is the final transcription for the utterance.
40 is_final: bool,
41 },
42 /// Model speech transcription.
43 OutputTranscript {
44 /// The transcribed text content.
45 text: String,
46 /// Whether this is the final transcription for the utterance.
47 is_final: bool,
48 },
49 /// Model reasoning/thinking content.
50 Thought(String),
51 /// Voice activity detected — user started speaking.
52 VadStart,
53 /// Voice activity ended — user stopped speaking.
54 VadEnd,
55
56 // -- Control-lane events (lower frequency, async emission) --
57 /// Extraction completed. Emitted for both the top-level result
58 /// AND each flattened key (e.g., "order.items", "order.phase").
59 Extraction {
60 /// Extractor name, or `"extractor.field"` for flattened keys.
61 name: String,
62 /// The extracted JSON value.
63 value: serde_json::Value,
64 },
65 /// Extraction failed.
66 ExtractionError {
67 /// Name of the extractor that failed.
68 name: String,
69 /// Human-readable error description.
70 error: String,
71 },
72 /// A raw extraction field was considered for promotion into authoritative state.
73 StatePromotion {
74 /// Extractor name that produced the field.
75 extractor: String,
76 /// Field name inside the extractor result.
77 field: String,
78 /// State key targeted by the promotion rule.
79 state_key: String,
80 /// Whether the promotion was accepted and written.
81 accepted: bool,
82 /// Human-readable reason for the decision.
83 reason: String,
84 /// Extracted value that was considered.
85 value: serde_json::Value,
86 },
87 /// Phase machine transitioned.
88 PhaseTransition {
89 /// Phase the machine transitioned from.
90 from: String,
91 /// Phase the machine transitioned to.
92 to: String,
93 /// Human-readable reason for the transition.
94 reason: String,
95 },
96 /// Tool dispatched and result obtained.
97 ToolExecution {
98 /// Name of the tool that was called.
99 name: String,
100 /// Arguments passed to the tool.
101 args: serde_json::Value,
102 /// Result returned by the tool.
103 result: serde_json::Value,
104 },
105 /// Tool calls cancelled — either by the server (a `ToolCallCancelled`
106 /// wire event) or locally when a user barge-in interrupted an in-flight
107 /// inline tool. No response is sent for a cancelled call, and a cancelled
108 /// call never advances the governed flow.
109 ToolCancelled {
110 /// IDs of the cancelled tool calls.
111 ids: Vec<String>,
112 },
113 /// Model completed a conversational turn.
114 TurnComplete,
115 /// Model output interrupted by user speech.
116 Interrupted,
117 /// Session connected to Gemini.
118 Connected,
119 /// Session disconnected.
120 Disconnected {
121 /// Optional reason for disconnection (server-provided or error message).
122 reason: Option<String>,
123 },
124 /// Unrecoverable error.
125 Error(String),
126 /// Server requesting session wind-down.
127 GoAway {
128 /// Time remaining before the server closes the connection.
129 time_left: Duration,
130 },
131
132 // -- Periodic events --
133 /// Aggregated session telemetry snapshot.
134 Telemetry(serde_json::Value),
135 /// Per-turn latency and token metrics.
136 TurnMetrics {
137 /// Turn number (1-indexed).
138 turn: u32,
139 /// End-to-end latency for this turn in milliseconds.
140 latency_ms: u32,
141 /// Number of prompt tokens consumed.
142 prompt_tokens: u32,
143 /// Number of response tokens generated.
144 response_tokens: u32,
145 },
146}
147
148/// A [`futures::Stream`] of [`LiveEvent`]s from a Live session.
149///
150/// Created by [`LiveHandle::stream()`](super::handle::LiveHandle::stream).
151/// Wraps the underlying [`broadcast::Receiver`] with stream semantics:
152///
153/// - **Lagged**: if this subscriber falls behind the broadcast buffer, the
154/// missed events are skipped and the stream continues with the next
155/// available event (no error item is yielded).
156/// - **Closed**: when the session's event channel closes, the stream ends
157/// (`next()` returns `None`).
158///
159/// Composes with all `futures`/`tokio-stream` combinators:
160///
161/// ```rust,ignore
162/// use futures::StreamExt;
163///
164/// let mut stream = handle.stream();
165/// while let Some(ev) = stream.next().await {
166/// match ev {
167/// LiveEvent::TextDelta(t) => print!("{t}"),
168/// LiveEvent::TurnComplete => println!(),
169/// _ => {}
170/// }
171/// }
172/// ```
173pub struct LiveEventStream {
174 inner: Pin<Box<dyn Stream<Item = LiveEvent> + Send>>,
175}
176
177impl LiveEventStream {
178 /// Wrap a broadcast receiver of [`LiveEvent`]s as a stream.
179 pub(crate) fn new(rx: broadcast::Receiver<LiveEvent>) -> Self {
180 let inner = futures::stream::unfold(rx, |mut rx| async move {
181 loop {
182 match rx.recv().await {
183 Ok(ev) => return Some((ev, rx)),
184 // Skip lagged (missed) events and keep going.
185 Err(broadcast::error::RecvError::Lagged(_)) => continue,
186 // Channel closed: end the stream.
187 Err(broadcast::error::RecvError::Closed) => return None,
188 }
189 }
190 });
191 Self {
192 inner: Box::pin(inner),
193 }
194 }
195}
196
197impl Stream for LiveEventStream {
198 type Item = LiveEvent;
199
200 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
201 self.inner.as_mut().poll_next(cx)
202 }
203}
204
205impl std::fmt::Debug for LiveEventStream {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 f.debug_struct("LiveEventStream").finish_non_exhaustive()
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use futures::StreamExt;
215
216 #[tokio::test]
217 async fn stream_yields_events_in_order_and_ends_on_close() {
218 let (tx, rx) = broadcast::channel::<LiveEvent>(16);
219 let mut stream = LiveEventStream::new(rx);
220
221 tx.send(LiveEvent::VadStart).unwrap();
222 tx.send(LiveEvent::TextDelta("hi".into())).unwrap();
223 tx.send(LiveEvent::TurnComplete).unwrap();
224
225 assert!(matches!(stream.next().await, Some(LiveEvent::VadStart)));
226 match stream.next().await {
227 Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "hi"),
228 other => panic!("expected TextDelta, got {other:?}"),
229 }
230 assert!(matches!(stream.next().await, Some(LiveEvent::TurnComplete)));
231
232 // Closing the channel ends the stream.
233 drop(tx);
234 assert!(stream.next().await.is_none(), "stream ends on Closed");
235 }
236
237 #[tokio::test]
238 async fn stream_skips_lagged_events_and_continues() {
239 // Capacity-2 channel: sending 5 events before polling forces a lag.
240 let (tx, rx) = broadcast::channel::<LiveEvent>(2);
241 let mut stream = LiveEventStream::new(rx);
242
243 for i in 0..5u32 {
244 tx.send(LiveEvent::TextDelta(format!("e{i}"))).unwrap();
245 }
246
247 // The first poll observes the lag, skips it, and yields the oldest
248 // event still buffered (e3), then e4 — no error, no end-of-stream.
249 match stream.next().await {
250 Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "e3"),
251 other => panic!("expected e3 after lag skip, got {other:?}"),
252 }
253 match stream.next().await {
254 Some(LiveEvent::TextDelta(t)) => assert_eq!(t, "e4"),
255 other => panic!("expected e4, got {other:?}"),
256 }
257
258 // The stream is still alive after the lag.
259 tx.send(LiveEvent::TurnComplete).unwrap();
260 assert!(matches!(stream.next().await, Some(LiveEvent::TurnComplete)));
261
262 drop(tx);
263 assert!(stream.next().await.is_none());
264 }
265}