gemini_genai_rs/session/
events.rs

1//! Session events, commands, and turn tracking.
2//!
3//! [`SessionEvent`] — events emitted by the server for application consumption.
4//! [`SessionCommand`] — commands sent from the application to the transport.
5//! [`Turn`] — tracking for a single model response turn.
6//! [`recv_event`] — broadcast lag-tolerant event receiver.
7
8use super::state::SessionPhase;
9use crate::protocol::{Content, FunctionCall, FunctionResponse, UsageMetadata};
10use std::time::Instant;
11use tokio::sync::broadcast;
12
13// ---------------------------------------------------------------------------
14// Events (server -> application)
15// ---------------------------------------------------------------------------
16
17/// Events emitted by the session, consumed by application code.
18#[derive(Debug, Clone)]
19#[non_exhaustive]
20pub enum SessionEvent {
21    /// Session connected and setup complete.
22    Connected,
23    /// Incremental text from model response.
24    TextDelta(String),
25    /// Complete text of a finished model turn.
26    TextComplete(String),
27    /// Audio data from model response (PCM16 samples, base64-decoded).
28    ///
29    /// Uses [`bytes::Bytes`] for zero-copy fan-out: cloning a `Bytes` handle
30    /// bumps an `Arc` refcount instead of copying the underlying data.
31    AudioData(bytes::Bytes),
32    /// Input transcription from server.
33    InputTranscription(String),
34    /// Output transcription from server.
35    OutputTranscription(String),
36    /// Thought/reasoning summary from the model (when includeThoughts is enabled).
37    Thought(String),
38    /// Model requested tool calls.
39    ToolCall(Vec<FunctionCall>),
40    /// Server cancelled pending tool calls.
41    ToolCallCancelled(Vec<String>),
42    /// Model turn is complete (it's the user's turn now).
43    TurnComplete,
44    /// Model finished generating its full response.
45    ///
46    /// Fires even if the generation was interrupted — tells you the model's
47    /// internal generation pipeline has stopped. Distinct from `TurnComplete`
48    /// which is the turn-taking signal.
49    GenerationComplete,
50    /// Model was interrupted by barge-in.
51    Interrupted,
52    /// Session phase changed.
53    PhaseChanged(SessionPhase),
54    /// Server sent GoAway signal with optional time remaining.
55    GoAway(Option<String>),
56    /// Session disconnected (with optional reason).
57    Disconnected(Option<String>),
58    /// Non-fatal error.
59    Error(String),
60    /// Session resumption update with handle, resumability, and consumed index.
61    SessionResumeUpdate(ResumeInfo),
62    /// Server-side voice activity detected (user started speaking).
63    VoiceActivityStart,
64    /// Server-side voice activity ended (user stopped speaking).
65    VoiceActivityEnd,
66    /// Token usage metadata from server (for context window tracking).
67    ///
68    /// Contains full token breakdown: prompt, response, cached, tool-use,
69    /// thinking tokens, plus per-modality details.
70    Usage(UsageMetadata),
71}
72
73/// Session resumption information from the server.
74#[derive(Debug, Clone)]
75pub struct ResumeInfo {
76    /// Opaque handle for session resumption.
77    pub handle: String,
78    /// Whether the session is currently resumable.
79    pub resumable: bool,
80    /// Index of the last client message consumed by the server.
81    pub last_consumed_index: Option<String>,
82}
83
84// ---------------------------------------------------------------------------
85// Commands (application -> server)
86// ---------------------------------------------------------------------------
87
88/// Commands sent from application code to the session transport.
89#[derive(Debug, Clone)]
90pub enum SessionCommand {
91    /// Send audio data (raw PCM16 bytes, will be base64-encoded).
92    SendAudio(Vec<u8>),
93    /// Send a text message.
94    SendText(String),
95    /// Send tool responses.
96    SendToolResponse(Vec<FunctionResponse>),
97    /// Signal activity start (client VAD detected speech).
98    ActivityStart,
99    /// Signal activity end (client VAD detected silence).
100    ActivityEnd,
101    /// Send client content (conversation history or context injection).
102    SendClientContent {
103        /// Conversation turns to include.
104        turns: Vec<Content>,
105        /// Whether this completes the client's turn.
106        turn_complete: bool,
107    },
108    /// Send video/image data (raw JPEG bytes, will be base64-encoded).
109    SendVideo(Vec<u8>),
110    /// Update system instruction mid-session (sends client_content with role=system).
111    UpdateInstruction(String),
112    /// Gracefully disconnect.
113    Disconnect,
114}
115
116// ---------------------------------------------------------------------------
117// Turn tracking
118// ---------------------------------------------------------------------------
119
120/// Represents a single model response turn.
121#[derive(Debug, Clone)]
122pub struct Turn {
123    /// Unique turn identifier.
124    pub id: String,
125    /// Accumulated text parts.
126    pub text: String,
127    /// Whether this turn included audio.
128    pub has_audio: bool,
129    /// Tool calls requested in this turn.
130    pub tool_calls: Vec<FunctionCall>,
131    /// When the turn started.
132    pub started_at: Instant,
133    /// When the turn completed (if complete).
134    pub completed_at: Option<Instant>,
135    /// Whether the turn was interrupted.
136    pub interrupted: bool,
137}
138
139impl Turn {
140    /// Create a new turn.
141    pub fn new() -> Self {
142        Self {
143            id: uuid::Uuid::new_v4().to_string(),
144            text: String::new(),
145            has_audio: false,
146            tool_calls: Vec::new(),
147            started_at: Instant::now(),
148            completed_at: None,
149            interrupted: false,
150        }
151    }
152
153    /// Duration of the turn.
154    pub fn duration(&self) -> std::time::Duration {
155        let end = self.completed_at.unwrap_or_else(Instant::now);
156        end.duration_since(self.started_at)
157    }
158}
159
160impl Default for Turn {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166// ---------------------------------------------------------------------------
167// Broadcast lag helper
168// ---------------------------------------------------------------------------
169
170/// Receive the next event from a broadcast receiver, handling lag gracefully.
171///
172/// If the receiver falls behind (too slow to keep up with the sender), the
173/// skipped events are logged and the next available event is returned.
174/// Returns `None` when the channel is closed.
175///
176/// # Example
177///
178/// ```ignore
179/// let mut events = handle.subscribe();
180/// while let Some(event) = recv_event(&mut events).await {
181///     // handle event
182/// }
183/// ```
184pub async fn recv_event(rx: &mut broadcast::Receiver<SessionEvent>) -> Option<SessionEvent> {
185    loop {
186        match rx.recv().await {
187            Ok(event) => return Some(event),
188            Err(broadcast::error::RecvError::Lagged(n)) => {
189                tracing::warn!(skipped = n, "Event subscriber lagged, skipped {n} events");
190                continue;
191            }
192            Err(broadcast::error::RecvError::Closed) => return None,
193        }
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[tokio::test]
202    async fn recv_event_returns_events_normally() {
203        let (tx, mut rx) = broadcast::channel(16);
204
205        tx.send(SessionEvent::Connected).unwrap();
206        tx.send(SessionEvent::TurnComplete).unwrap();
207
208        let event = recv_event(&mut rx).await;
209        assert!(matches!(event, Some(SessionEvent::Connected)));
210
211        let event = recv_event(&mut rx).await;
212        assert!(matches!(event, Some(SessionEvent::TurnComplete)));
213    }
214
215    #[tokio::test]
216    async fn recv_event_returns_none_on_closed_channel() {
217        let (tx, mut rx) = broadcast::channel::<SessionEvent>(16);
218        drop(tx);
219
220        let event = recv_event(&mut rx).await;
221        assert!(event.is_none(), "should return None when channel is closed");
222    }
223
224    #[tokio::test]
225    async fn recv_event_handles_lag() {
226        // Create a tiny broadcast channel (capacity 2)
227        let (tx, mut rx) = broadcast::channel(2);
228
229        // Send 4 events — the receiver will lag behind
230        for i in 0..4 {
231            let _ = tx.send(SessionEvent::TextDelta(format!("msg{i}")));
232        }
233
234        // recv_event should skip the lagged events and return the next available
235        let event = recv_event(&mut rx).await;
236        assert!(event.is_some(), "should get an event after lag");
237    }
238}