gemini_genai_rs/session/
events.rs

1//! Session events, commands, and turn tracking.
2//!
3//! [`SessionEvent`] — events emitted by the server for application consumption.
4//! [`SessionCommand`] — commands sent from the application to the transport.
5//! [`Turn`] — tracking for a single model response turn.
6//! [`recv_event`] — broadcast lag-tolerant event receiver.
7
8use super::state::SessionPhase;
9use crate::protocol::{Content, FunctionCall, FunctionResponse, UsageMetadata};
10use std::time::Instant;
11use tokio::sync::broadcast;
12
13// ---------------------------------------------------------------------------
14// Events (server -> application)
15// ---------------------------------------------------------------------------
16
17/// Events emitted by the session, consumed by application code.
18#[derive(Debug, Clone)]
19pub enum SessionEvent {
20    /// Session connected and setup complete.
21    Connected,
22    /// Incremental text from model response.
23    TextDelta(String),
24    /// Complete text of a finished model turn.
25    TextComplete(String),
26    /// Audio data from model response (PCM16 samples, base64-decoded).
27    ///
28    /// Uses [`bytes::Bytes`] for zero-copy fan-out: cloning a `Bytes` handle
29    /// bumps an `Arc` refcount instead of copying the underlying data.
30    AudioData(bytes::Bytes),
31    /// Input transcription from server.
32    InputTranscription(String),
33    /// Output transcription from server.
34    OutputTranscription(String),
35    /// Thought/reasoning summary from the model (when includeThoughts is enabled).
36    Thought(String),
37    /// Model requested tool calls.
38    ToolCall(Vec<FunctionCall>),
39    /// Server cancelled pending tool calls.
40    ToolCallCancelled(Vec<String>),
41    /// Model turn is complete (it's the user's turn now).
42    TurnComplete,
43    /// Model finished generating its full response.
44    ///
45    /// Fires even if the generation was interrupted — tells you the model's
46    /// internal generation pipeline has stopped. Distinct from `TurnComplete`
47    /// which is the turn-taking signal.
48    GenerationComplete,
49    /// Model was interrupted by barge-in.
50    Interrupted,
51    /// Session phase changed.
52    PhaseChanged(SessionPhase),
53    /// Server sent GoAway signal with optional time remaining.
54    GoAway(Option<String>),
55    /// Session disconnected (with optional reason).
56    Disconnected(Option<String>),
57    /// Non-fatal error.
58    Error(String),
59    /// Session resumption update with handle, resumability, and consumed index.
60    SessionResumeUpdate(ResumeInfo),
61    /// Server-side voice activity detected (user started speaking).
62    VoiceActivityStart,
63    /// Server-side voice activity ended (user stopped speaking).
64    VoiceActivityEnd,
65    /// Token usage metadata from server (for context window tracking).
66    ///
67    /// Contains full token breakdown: prompt, response, cached, tool-use,
68    /// thinking tokens, plus per-modality details.
69    Usage(UsageMetadata),
70}
71
72/// Session resumption information from the server.
73#[derive(Debug, Clone)]
74pub struct ResumeInfo {
75    /// Opaque handle for session resumption.
76    pub handle: String,
77    /// Whether the session is currently resumable.
78    pub resumable: bool,
79    /// Index of the last client message consumed by the server.
80    pub last_consumed_index: Option<String>,
81}
82
83// ---------------------------------------------------------------------------
84// Commands (application -> server)
85// ---------------------------------------------------------------------------
86
87/// Commands sent from application code to the session transport.
88#[derive(Debug, Clone)]
89pub enum SessionCommand {
90    /// Send audio data (raw PCM16 bytes, will be base64-encoded).
91    SendAudio(Vec<u8>),
92    /// Send a text message.
93    SendText(String),
94    /// Send tool responses.
95    SendToolResponse(Vec<FunctionResponse>),
96    /// Signal activity start (client VAD detected speech).
97    ActivityStart,
98    /// Signal activity end (client VAD detected silence).
99    ActivityEnd,
100    /// Send client content (conversation history or context injection).
101    SendClientContent {
102        /// Conversation turns to include.
103        turns: Vec<Content>,
104        /// Whether this completes the client's turn.
105        turn_complete: bool,
106    },
107    /// Send video/image data (raw JPEG bytes, will be base64-encoded).
108    SendVideo(Vec<u8>),
109    /// Update system instruction mid-session (sends client_content with role=system).
110    UpdateInstruction(String),
111    /// Gracefully disconnect.
112    Disconnect,
113}
114
115// ---------------------------------------------------------------------------
116// Turn tracking
117// ---------------------------------------------------------------------------
118
119/// Represents a single model response turn.
120#[derive(Debug, Clone)]
121pub struct Turn {
122    /// Unique turn identifier.
123    pub id: String,
124    /// Accumulated text parts.
125    pub text: String,
126    /// Whether this turn included audio.
127    pub has_audio: bool,
128    /// Tool calls requested in this turn.
129    pub tool_calls: Vec<FunctionCall>,
130    /// When the turn started.
131    pub started_at: Instant,
132    /// When the turn completed (if complete).
133    pub completed_at: Option<Instant>,
134    /// Whether the turn was interrupted.
135    pub interrupted: bool,
136}
137
138impl Turn {
139    /// Create a new turn.
140    pub fn new() -> Self {
141        Self {
142            id: uuid::Uuid::new_v4().to_string(),
143            text: String::new(),
144            has_audio: false,
145            tool_calls: Vec::new(),
146            started_at: Instant::now(),
147            completed_at: None,
148            interrupted: false,
149        }
150    }
151
152    /// Duration of the turn.
153    pub fn duration(&self) -> std::time::Duration {
154        let end = self.completed_at.unwrap_or_else(Instant::now);
155        end.duration_since(self.started_at)
156    }
157}
158
159impl Default for Turn {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165// ---------------------------------------------------------------------------
166// Broadcast lag helper
167// ---------------------------------------------------------------------------
168
169/// Receive the next event from a broadcast receiver, handling lag gracefully.
170///
171/// If the receiver falls behind (too slow to keep up with the sender), the
172/// skipped events are logged and the next available event is returned.
173/// Returns `None` when the channel is closed.
174///
175/// # Example
176///
177/// ```ignore
178/// let mut events = handle.subscribe();
179/// while let Some(event) = recv_event(&mut events).await {
180///     // handle event
181/// }
182/// ```
183pub async fn recv_event(rx: &mut broadcast::Receiver<SessionEvent>) -> Option<SessionEvent> {
184    loop {
185        match rx.recv().await {
186            Ok(event) => return Some(event),
187            Err(broadcast::error::RecvError::Lagged(n)) => {
188                #[cfg(feature = "tracing-support")]
189                tracing::warn!(skipped = n, "Event subscriber lagged, skipped {n} events");
190                // Without tracing, silently continue
191                let _ = n;
192                continue;
193            }
194            Err(broadcast::error::RecvError::Closed) => return None,
195        }
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[tokio::test]
204    async fn recv_event_returns_events_normally() {
205        let (tx, mut rx) = broadcast::channel(16);
206
207        tx.send(SessionEvent::Connected).unwrap();
208        tx.send(SessionEvent::TurnComplete).unwrap();
209
210        let event = recv_event(&mut rx).await;
211        assert!(matches!(event, Some(SessionEvent::Connected)));
212
213        let event = recv_event(&mut rx).await;
214        assert!(matches!(event, Some(SessionEvent::TurnComplete)));
215    }
216
217    #[tokio::test]
218    async fn recv_event_returns_none_on_closed_channel() {
219        let (tx, mut rx) = broadcast::channel::<SessionEvent>(16);
220        drop(tx);
221
222        let event = recv_event(&mut rx).await;
223        assert!(event.is_none(), "should return None when channel is closed");
224    }
225
226    #[tokio::test]
227    async fn recv_event_handles_lag() {
228        // Create a tiny broadcast channel (capacity 2)
229        let (tx, mut rx) = broadcast::channel(2);
230
231        // Send 4 events — the receiver will lag behind
232        for i in 0..4 {
233            let _ = tx.send(SessionEvent::TextDelta(format!("msg{i}")));
234        }
235
236        // recv_event should skip the lagged events and return the next available
237        let event = recv_event(&mut rx).await;
238        assert!(event.is_some(), "should get an event after lag");
239    }
240}