gemini_genai_rs/session/events.rs
1//! Session events, commands, and turn tracking.
2//!
3//! [`SessionEvent`] — events emitted by the server for application consumption.
4//! [`SessionCommand`] — commands sent from the application to the transport.
5//! [`Turn`] — tracking for a single model response turn.
6//! [`recv_event`] — broadcast lag-tolerant event receiver.
7
8use super::state::SessionPhase;
9use crate::protocol::{Content, FunctionCall, FunctionResponse, UsageMetadata};
10use std::time::Instant;
11use tokio::sync::broadcast;
12
13// ---------------------------------------------------------------------------
14// Events (server -> application)
15// ---------------------------------------------------------------------------
16
17/// Events emitted by the session, consumed by application code.
18#[derive(Debug, Clone)]
19pub enum SessionEvent {
20 /// Session connected and setup complete.
21 Connected,
22 /// Incremental text from model response.
23 TextDelta(String),
24 /// Complete text of a finished model turn.
25 TextComplete(String),
26 /// Audio data from model response (PCM16 samples, base64-decoded).
27 ///
28 /// Uses [`bytes::Bytes`] for zero-copy fan-out: cloning a `Bytes` handle
29 /// bumps an `Arc` refcount instead of copying the underlying data.
30 AudioData(bytes::Bytes),
31 /// Input transcription from server.
32 InputTranscription(String),
33 /// Output transcription from server.
34 OutputTranscription(String),
35 /// Thought/reasoning summary from the model (when includeThoughts is enabled).
36 Thought(String),
37 /// Model requested tool calls.
38 ToolCall(Vec<FunctionCall>),
39 /// Server cancelled pending tool calls.
40 ToolCallCancelled(Vec<String>),
41 /// Model turn is complete (it's the user's turn now).
42 TurnComplete,
43 /// Model finished generating its full response.
44 ///
45 /// Fires even if the generation was interrupted — tells you the model's
46 /// internal generation pipeline has stopped. Distinct from `TurnComplete`
47 /// which is the turn-taking signal.
48 GenerationComplete,
49 /// Model was interrupted by barge-in.
50 Interrupted,
51 /// Session phase changed.
52 PhaseChanged(SessionPhase),
53 /// Server sent GoAway signal with optional time remaining.
54 GoAway(Option<String>),
55 /// Session disconnected (with optional reason).
56 Disconnected(Option<String>),
57 /// Non-fatal error.
58 Error(String),
59 /// Session resumption update with handle, resumability, and consumed index.
60 SessionResumeUpdate(ResumeInfo),
61 /// Server-side voice activity detected (user started speaking).
62 VoiceActivityStart,
63 /// Server-side voice activity ended (user stopped speaking).
64 VoiceActivityEnd,
65 /// Token usage metadata from server (for context window tracking).
66 ///
67 /// Contains full token breakdown: prompt, response, cached, tool-use,
68 /// thinking tokens, plus per-modality details.
69 Usage(UsageMetadata),
70}
71
72/// Session resumption information from the server.
73#[derive(Debug, Clone)]
74pub struct ResumeInfo {
75 /// Opaque handle for session resumption.
76 pub handle: String,
77 /// Whether the session is currently resumable.
78 pub resumable: bool,
79 /// Index of the last client message consumed by the server.
80 pub last_consumed_index: Option<String>,
81}
82
83// ---------------------------------------------------------------------------
84// Commands (application -> server)
85// ---------------------------------------------------------------------------
86
87/// Commands sent from application code to the session transport.
88#[derive(Debug, Clone)]
89pub enum SessionCommand {
90 /// Send audio data (raw PCM16 bytes, will be base64-encoded).
91 SendAudio(Vec<u8>),
92 /// Send a text message.
93 SendText(String),
94 /// Send tool responses.
95 SendToolResponse(Vec<FunctionResponse>),
96 /// Signal activity start (client VAD detected speech).
97 ActivityStart,
98 /// Signal activity end (client VAD detected silence).
99 ActivityEnd,
100 /// Send client content (conversation history or context injection).
101 SendClientContent {
102 /// Conversation turns to include.
103 turns: Vec<Content>,
104 /// Whether this completes the client's turn.
105 turn_complete: bool,
106 },
107 /// Send video/image data (raw JPEG bytes, will be base64-encoded).
108 SendVideo(Vec<u8>),
109 /// Update system instruction mid-session (sends client_content with role=system).
110 UpdateInstruction(String),
111 /// Gracefully disconnect.
112 Disconnect,
113}
114
115// ---------------------------------------------------------------------------
116// Turn tracking
117// ---------------------------------------------------------------------------
118
119/// Represents a single model response turn.
120#[derive(Debug, Clone)]
121pub struct Turn {
122 /// Unique turn identifier.
123 pub id: String,
124 /// Accumulated text parts.
125 pub text: String,
126 /// Whether this turn included audio.
127 pub has_audio: bool,
128 /// Tool calls requested in this turn.
129 pub tool_calls: Vec<FunctionCall>,
130 /// When the turn started.
131 pub started_at: Instant,
132 /// When the turn completed (if complete).
133 pub completed_at: Option<Instant>,
134 /// Whether the turn was interrupted.
135 pub interrupted: bool,
136}
137
138impl Turn {
139 /// Create a new turn.
140 pub fn new() -> Self {
141 Self {
142 id: uuid::Uuid::new_v4().to_string(),
143 text: String::new(),
144 has_audio: false,
145 tool_calls: Vec::new(),
146 started_at: Instant::now(),
147 completed_at: None,
148 interrupted: false,
149 }
150 }
151
152 /// Duration of the turn.
153 pub fn duration(&self) -> std::time::Duration {
154 let end = self.completed_at.unwrap_or_else(Instant::now);
155 end.duration_since(self.started_at)
156 }
157}
158
159impl Default for Turn {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165// ---------------------------------------------------------------------------
166// Broadcast lag helper
167// ---------------------------------------------------------------------------
168
169/// Receive the next event from a broadcast receiver, handling lag gracefully.
170///
171/// If the receiver falls behind (too slow to keep up with the sender), the
172/// skipped events are logged and the next available event is returned.
173/// Returns `None` when the channel is closed.
174///
175/// # Example
176///
177/// ```ignore
178/// let mut events = handle.subscribe();
179/// while let Some(event) = recv_event(&mut events).await {
180/// // handle event
181/// }
182/// ```
183pub async fn recv_event(rx: &mut broadcast::Receiver<SessionEvent>) -> Option<SessionEvent> {
184 loop {
185 match rx.recv().await {
186 Ok(event) => return Some(event),
187 Err(broadcast::error::RecvError::Lagged(n)) => {
188 #[cfg(feature = "tracing-support")]
189 tracing::warn!(skipped = n, "Event subscriber lagged, skipped {n} events");
190 // Without tracing, silently continue
191 let _ = n;
192 continue;
193 }
194 Err(broadcast::error::RecvError::Closed) => return None,
195 }
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 #[tokio::test]
204 async fn recv_event_returns_events_normally() {
205 let (tx, mut rx) = broadcast::channel(16);
206
207 tx.send(SessionEvent::Connected).unwrap();
208 tx.send(SessionEvent::TurnComplete).unwrap();
209
210 let event = recv_event(&mut rx).await;
211 assert!(matches!(event, Some(SessionEvent::Connected)));
212
213 let event = recv_event(&mut rx).await;
214 assert!(matches!(event, Some(SessionEvent::TurnComplete)));
215 }
216
217 #[tokio::test]
218 async fn recv_event_returns_none_on_closed_channel() {
219 let (tx, mut rx) = broadcast::channel::<SessionEvent>(16);
220 drop(tx);
221
222 let event = recv_event(&mut rx).await;
223 assert!(event.is_none(), "should return None when channel is closed");
224 }
225
226 #[tokio::test]
227 async fn recv_event_handles_lag() {
228 // Create a tiny broadcast channel (capacity 2)
229 let (tx, mut rx) = broadcast::channel(2);
230
231 // Send 4 events — the receiver will lag behind
232 for i in 0..4 {
233 let _ = tx.send(SessionEvent::TextDelta(format!("msg{i}")));
234 }
235
236 // recv_event should skip the lagged events and return the next available
237 let event = recv_event(&mut rx).await;
238 assert!(event.is_some(), "should get an event after lag");
239 }
240}