gemini_genai_rs/session/events.rs
1//! Session events, commands, and turn tracking.
2//!
3//! [`SessionEvent`] — events emitted by the server for application consumption.
4//! [`SessionCommand`] — commands sent from the application to the transport.
5//! [`Turn`] — tracking for a single model response turn.
6//! [`recv_event`] — broadcast lag-tolerant event receiver.
7
8use super::state::SessionPhase;
9use crate::protocol::{Content, FunctionCall, FunctionResponse, UsageMetadata};
10use std::time::Instant;
11use tokio::sync::broadcast;
12
13// ---------------------------------------------------------------------------
14// Events (server -> application)
15// ---------------------------------------------------------------------------
16
17/// Events emitted by the session, consumed by application code.
18#[derive(Debug, Clone)]
19#[non_exhaustive]
20pub enum SessionEvent {
21 /// Session connected and setup complete.
22 Connected,
23 /// Incremental text from model response.
24 TextDelta(String),
25 /// Complete text of a finished model turn.
26 TextComplete(String),
27 /// Audio data from model response (PCM16 samples, base64-decoded).
28 ///
29 /// Uses [`bytes::Bytes`] for zero-copy fan-out: cloning a `Bytes` handle
30 /// bumps an `Arc` refcount instead of copying the underlying data.
31 AudioData(bytes::Bytes),
32 /// Input transcription from server.
33 InputTranscription(String),
34 /// Output transcription from server.
35 OutputTranscription(String),
36 /// Thought/reasoning summary from the model (when includeThoughts is enabled).
37 Thought(String),
38 /// Model requested tool calls.
39 ToolCall(Vec<FunctionCall>),
40 /// Server cancelled pending tool calls.
41 ToolCallCancelled(Vec<String>),
42 /// Model turn is complete (it's the user's turn now).
43 TurnComplete,
44 /// Model finished generating its full response.
45 ///
46 /// Fires even if the generation was interrupted — tells you the model's
47 /// internal generation pipeline has stopped. Distinct from `TurnComplete`
48 /// which is the turn-taking signal.
49 GenerationComplete,
50 /// Model was interrupted by barge-in.
51 Interrupted,
52 /// Session phase changed.
53 PhaseChanged(SessionPhase),
54 /// Server sent GoAway signal with optional time remaining.
55 GoAway(Option<String>),
56 /// Session disconnected (with optional reason).
57 Disconnected(Option<String>),
58 /// Non-fatal error.
59 Error(String),
60 /// Session resumption update with handle, resumability, and consumed index.
61 SessionResumeUpdate(ResumeInfo),
62 /// Server-side voice activity detected (user started speaking).
63 VoiceActivityStart,
64 /// Server-side voice activity ended (user stopped speaking).
65 VoiceActivityEnd,
66 /// Token usage metadata from server (for context window tracking).
67 ///
68 /// Contains full token breakdown: prompt, response, cached, tool-use,
69 /// thinking tokens, plus per-modality details.
70 Usage(UsageMetadata),
71}
72
/// Resumption details the server reports for the current session.
#[derive(Clone, Debug)]
pub struct ResumeInfo {
    /// Opaque handle to present when resuming the session.
    pub handle: String,
    /// `true` while the session can currently be resumed.
    pub resumable: bool,
    /// Index of the last client message the server consumed, if reported.
    pub last_consumed_index: Option<String>,
}
83
84// ---------------------------------------------------------------------------
85// Commands (application -> server)
86// ---------------------------------------------------------------------------
87
88/// Commands sent from application code to the session transport.
89#[derive(Debug, Clone)]
90pub enum SessionCommand {
91 /// Send audio data (raw PCM16 bytes, will be base64-encoded).
92 SendAudio(Vec<u8>),
93 /// Send a text message.
94 SendText(String),
95 /// Send tool responses.
96 SendToolResponse(Vec<FunctionResponse>),
97 /// Signal activity start (client VAD detected speech).
98 ActivityStart,
99 /// Signal activity end (client VAD detected silence).
100 ActivityEnd,
101 /// Send client content (conversation history or context injection).
102 SendClientContent {
103 /// Conversation turns to include.
104 turns: Vec<Content>,
105 /// Whether this completes the client's turn.
106 turn_complete: bool,
107 },
108 /// Send video/image data (raw JPEG bytes, will be base64-encoded).
109 SendVideo(Vec<u8>),
110 /// Update system instruction mid-session (sends client_content with role=system).
111 UpdateInstruction(String),
112 /// Gracefully disconnect.
113 Disconnect,
114}
115
116// ---------------------------------------------------------------------------
117// Turn tracking
118// ---------------------------------------------------------------------------
119
120/// Represents a single model response turn.
121#[derive(Debug, Clone)]
122pub struct Turn {
123 /// Unique turn identifier.
124 pub id: String,
125 /// Accumulated text parts.
126 pub text: String,
127 /// Whether this turn included audio.
128 pub has_audio: bool,
129 /// Tool calls requested in this turn.
130 pub tool_calls: Vec<FunctionCall>,
131 /// When the turn started.
132 pub started_at: Instant,
133 /// When the turn completed (if complete).
134 pub completed_at: Option<Instant>,
135 /// Whether the turn was interrupted.
136 pub interrupted: bool,
137}
138
139impl Turn {
140 /// Create a new turn.
141 pub fn new() -> Self {
142 Self {
143 id: uuid::Uuid::new_v4().to_string(),
144 text: String::new(),
145 has_audio: false,
146 tool_calls: Vec::new(),
147 started_at: Instant::now(),
148 completed_at: None,
149 interrupted: false,
150 }
151 }
152
153 /// Duration of the turn.
154 pub fn duration(&self) -> std::time::Duration {
155 let end = self.completed_at.unwrap_or_else(Instant::now);
156 end.duration_since(self.started_at)
157 }
158}
159
160impl Default for Turn {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166// ---------------------------------------------------------------------------
167// Broadcast lag helper
168// ---------------------------------------------------------------------------
169
170/// Receive the next event from a broadcast receiver, handling lag gracefully.
171///
172/// If the receiver falls behind (too slow to keep up with the sender), the
173/// skipped events are logged and the next available event is returned.
174/// Returns `None` when the channel is closed.
175///
176/// # Example
177///
178/// ```ignore
179/// let mut events = handle.subscribe();
180/// while let Some(event) = recv_event(&mut events).await {
181/// // handle event
182/// }
183/// ```
184pub async fn recv_event(rx: &mut broadcast::Receiver<SessionEvent>) -> Option<SessionEvent> {
185 loop {
186 match rx.recv().await {
187 Ok(event) => return Some(event),
188 Err(broadcast::error::RecvError::Lagged(n)) => {
189 tracing::warn!(skipped = n, "Event subscriber lagged, skipped {n} events");
190 continue;
191 }
192 Err(broadcast::error::RecvError::Closed) => return None,
193 }
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Events queued before the receiver polls come back in send order.
    #[tokio::test]
    async fn recv_event_returns_events_normally() {
        let (sender, mut receiver) = broadcast::channel(16);

        sender.send(SessionEvent::Connected).unwrap();
        sender.send(SessionEvent::TurnComplete).unwrap();

        assert!(matches!(
            recv_event(&mut receiver).await,
            Some(SessionEvent::Connected)
        ));
        assert!(matches!(
            recv_event(&mut receiver).await,
            Some(SessionEvent::TurnComplete)
        ));
    }

    /// Dropping the only sender closes the channel, which surfaces as `None`.
    #[tokio::test]
    async fn recv_event_returns_none_on_closed_channel() {
        let (sender, mut receiver) = broadcast::channel::<SessionEvent>(16);
        drop(sender);

        let received = recv_event(&mut receiver).await;
        assert!(
            received.is_none(),
            "should return None when channel is closed"
        );
    }

    /// A lagging receiver still gets an event instead of an error.
    #[tokio::test]
    async fn recv_event_handles_lag() {
        // Tiny channel (capacity 2) that is sent 4 events, so the receiver
        // has fallen behind before its first receive.
        let (sender, mut receiver) = broadcast::channel(2);
        (0..4).for_each(|i| {
            let _ = sender.send(SessionEvent::TextDelta(format!("msg{i}")));
        });

        // The lag is skipped over and the next available event is returned.
        let received = recv_event(&mut receiver).await;
        assert!(received.is_some(), "should get an event after lag");
    }
}