gemini_adk_rs/live/
telemetry.rs

//! Lightweight session telemetry — atomic fast-lane counters + periodic aggregation.
//!
//! All hot-path operations (counter increments, timestamp recording) are lock-free
//! and allocation-free: a counter update is a single atomic operation, and recording a
//! timestamp adds one read of the monotonic clock. Aggregation into JSON only happens
//! when `SessionTelemetry::snapshot` is called (periodically or at turn boundaries), so
//! the latency-sensitive audio pipeline never waits on it.
7
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::{Acquire, Relaxed, Release}};
use std::time::Instant;

use serde_json::json;
12
/// Low-overhead telemetry collector for speech-to-speech sessions.
///
/// Intended for the three-lane processor model:
/// - **Fast lane** (sync, <1ms): makes no telemetry calls — it only forwards audio/text.
/// - **Telemetry lane** (async, debounced): calls the `record_*` methods for each event.
///   Those methods only touch atomics (plus a monotonic-clock read where a timestamp is
///   needed) — no heap allocation and no locks.
/// - **Control lane** (async): calls `snapshot()` at turn boundaries to obtain the
///   aggregated stats as a JSON value ready to send to the browser.
///
/// Timestamps are stored as nanoseconds since `start` so that they fit in an
/// `AtomicU64`; a stored timestamp of `0` means "not set".
pub struct SessionTelemetry {
    /// Session clock origin; every `*_ns` timestamp below is relative to it.
    start: Instant,

    // ── Audio throughput ──
    /// Number of outgoing audio chunks recorded.
    audio_chunks_out: AtomicU64,
    /// Sum of the byte lengths of all outgoing audio chunks.
    audio_bytes_out: AtomicU64,

    // ── Interruptions ──
    /// Number of barge-in interruptions recorded.
    interruptions: AtomicU64,

    // ── Response latency tracking ──
    /// When the user last stopped speaking (VAD end), in ns since `start`.
    vad_end_ns: AtomicU64,
    /// Set at VAD end; cleared by the first audio chunk that follows it.
    awaiting_response: AtomicBool,
    /// When the user last sent a text message, in ns since `start`.
    text_send_ns: AtomicU64,
    /// Set on text send; cleared by the first model output that follows it.
    awaiting_text_response: AtomicBool,

    // ── Aggregated latency stats (voice and text samples share these) ──
    /// Most recent latency sample, in ns.
    last_latency_ns: AtomicU64,
    /// Running sum of all latency samples, in ns.
    latency_sum_ns: AtomicU64,
    /// Number of latency samples folded into the sum.
    latency_count: AtomicU64,
    /// Smallest latency seen; `u64::MAX` until the first sample arrives.
    min_latency_ns: AtomicU64,
    /// Largest latency seen, in ns.
    max_latency_ns: AtomicU64,

    // ── Turn timing ──
    /// Number of completed turns.
    turn_complete_count: AtomicU64,
    /// Start mark of the current turn, in ns since `start` (`0` = unset).
    last_turn_start_ns: AtomicU64,
    /// Running sum of measured turn durations, in ns.
    turn_duration_sum_ns: AtomicU64,
    /// Number of turn-duration samples folded into the sum.
    turn_duration_count: AtomicU64,

    // ── Token usage (latest values reported via UsageMetadata) ──
    /// Latest total token count from the server.
    total_token_count: AtomicU64,
    /// Latest prompt token count from the server.
    prompt_token_count: AtomicU64,
    /// Latest response token count from the server.
    response_token_count: AtomicU64,
    /// Latest cached-content token count from the server.
    cached_content_token_count: AtomicU64,
    /// Latest thoughts token count (thinking models).
    thoughts_token_count: AtomicU64,
}
64
65impl SessionTelemetry {
66    /// Create a new telemetry tracker, starting the session clock.
67    pub fn new() -> Self {
68        Self {
69            start: Instant::now(),
70            audio_chunks_out: AtomicU64::new(0),
71            audio_bytes_out: AtomicU64::new(0),
72            interruptions: AtomicU64::new(0),
73            vad_end_ns: AtomicU64::new(0),
74            awaiting_response: AtomicBool::new(false),
75            text_send_ns: AtomicU64::new(0),
76            awaiting_text_response: AtomicBool::new(false),
77            last_latency_ns: AtomicU64::new(0),
78            latency_sum_ns: AtomicU64::new(0),
79            latency_count: AtomicU64::new(0),
80            min_latency_ns: AtomicU64::new(u64::MAX),
81            max_latency_ns: AtomicU64::new(0),
82            turn_complete_count: AtomicU64::new(0),
83            last_turn_start_ns: AtomicU64::new(0),
84            turn_duration_sum_ns: AtomicU64::new(0),
85            turn_duration_count: AtomicU64::new(0),
86            total_token_count: AtomicU64::new(0),
87            prompt_token_count: AtomicU64::new(0),
88            response_token_count: AtomicU64::new(0),
89            cached_content_token_count: AtomicU64::new(0),
90            thoughts_token_count: AtomicU64::new(0),
91        }
92    }
93
94    // ── Atomic methods (~1ns each) ──
95
96    /// Record an outgoing audio chunk. Called from the telemetry lane.
97    #[inline]
98    pub fn record_audio_out(&self, byte_len: usize) {
99        self.audio_chunks_out.fetch_add(1, Relaxed);
100        self.audio_bytes_out.fetch_add(byte_len as u64, Relaxed);
101
102        // Also check text-send latency (user sent text, model responds with audio)
103        self.record_text_response_latency();
104
105        // Latency: if we're awaiting the model's first byte after VAD end,
106        // record the response latency via CAS (only the first chunk wins).
107        if self
108            .awaiting_response
109            .compare_exchange(true, false, Relaxed, Relaxed)
110            .is_ok()
111        {
112            let now_ns = self.elapsed_ns();
113            let vad_end = self.vad_end_ns.load(Relaxed);
114            if now_ns > vad_end && vad_end > 0 {
115                let latency = now_ns - vad_end;
116                self.last_latency_ns.store(latency, Relaxed);
117                self.latency_sum_ns.fetch_add(latency, Relaxed);
118                self.latency_count.fetch_add(1, Relaxed);
119                // Update min (CAS loop)
120                let mut current_min = self.min_latency_ns.load(Relaxed);
121                while latency < current_min {
122                    match self.min_latency_ns.compare_exchange_weak(
123                        current_min,
124                        latency,
125                        Relaxed,
126                        Relaxed,
127                    ) {
128                        Ok(_) => break,
129                        Err(actual) => current_min = actual,
130                    }
131                }
132                // Update max (CAS loop)
133                let mut current_max = self.max_latency_ns.load(Relaxed);
134                while latency > current_max {
135                    match self.max_latency_ns.compare_exchange_weak(
136                        current_max,
137                        latency,
138                        Relaxed,
139                        Relaxed,
140                    ) {
141                        Ok(_) => break,
142                        Err(actual) => current_max = actual,
143                    }
144                }
145            }
146        }
147    }
148
149    /// Record VAD end (user stopped speaking).
150    #[inline]
151    pub fn record_vad_end(&self) {
152        self.vad_end_ns.store(self.elapsed_ns(), Relaxed);
153        self.awaiting_response.store(true, Relaxed);
154    }
155
156    /// Record that user sent a text message (for text-input latency tracking).
157    #[inline]
158    pub fn record_text_send(&self) {
159        self.text_send_ns.store(self.elapsed_ns(), Relaxed);
160        self.awaiting_text_response.store(true, Relaxed);
161    }
162
163    /// Record first model output for text-input latency.
164    /// Call on first TextDelta or AudioData after a text send.
165    #[inline]
166    fn record_text_response_latency(&self) {
167        if self
168            .awaiting_text_response
169            .compare_exchange(true, false, Relaxed, Relaxed)
170            .is_ok()
171        {
172            let now_ns = self.elapsed_ns();
173            let send_ns = self.text_send_ns.load(Relaxed);
174            if now_ns > send_ns && send_ns > 0 {
175                let latency = now_ns - send_ns;
176                self.last_latency_ns.store(latency, Relaxed);
177                self.latency_sum_ns.fetch_add(latency, Relaxed);
178                self.latency_count.fetch_add(1, Relaxed);
179                // Update min (CAS loop)
180                let mut current_min = self.min_latency_ns.load(Relaxed);
181                while latency < current_min {
182                    match self.min_latency_ns.compare_exchange_weak(
183                        current_min,
184                        latency,
185                        Relaxed,
186                        Relaxed,
187                    ) {
188                        Ok(_) => break,
189                        Err(actual) => current_min = actual,
190                    }
191                }
192                // Update max (CAS loop)
193                let mut current_max = self.max_latency_ns.load(Relaxed);
194                while latency > current_max {
195                    match self.max_latency_ns.compare_exchange_weak(
196                        current_max,
197                        latency,
198                        Relaxed,
199                        Relaxed,
200                    ) {
201                        Ok(_) => break,
202                        Err(actual) => current_max = actual,
203                    }
204                }
205            }
206        }
207    }
208
209    /// Record first model text output (TextDelta). Tracks text-input latency.
210    #[inline]
211    pub fn record_text_out(&self) {
212        self.record_text_response_latency();
213    }
214
215    /// Record an interruption (barge-in).
216    #[inline]
217    pub fn record_interruption(&self) {
218        self.interruptions.fetch_add(1, Relaxed);
219    }
220
221    /// Record turn completion for duration tracking.
222    #[inline]
223    pub fn record_turn_complete(&self) {
224        self.turn_complete_count.fetch_add(1, Relaxed);
225        let now = self.elapsed_ns();
226        let turn_start = self.last_turn_start_ns.swap(now, Relaxed);
227        if turn_start > 0 {
228            let duration = now.saturating_sub(turn_start);
229            self.turn_duration_sum_ns.fetch_add(duration, Relaxed);
230            self.turn_duration_count.fetch_add(1, Relaxed);
231        }
232    }
233
234    /// Record token usage from a `UsageMetadata` event.
235    #[inline]
236    pub fn record_usage(
237        &self,
238        total: Option<u32>,
239        prompt: Option<u32>,
240        response: Option<u32>,
241        cached: Option<u32>,
242        thoughts: Option<u32>,
243    ) {
244        if let Some(v) = total {
245            self.total_token_count.store(v as u64, Relaxed);
246        }
247        if let Some(v) = prompt {
248            self.prompt_token_count.store(v as u64, Relaxed);
249        }
250        if let Some(v) = response {
251            self.response_token_count.store(v as u64, Relaxed);
252        }
253        if let Some(v) = cached {
254            self.cached_content_token_count.store(v as u64, Relaxed);
255        }
256        if let Some(v) = thoughts {
257            self.thoughts_token_count.store(v as u64, Relaxed);
258        }
259    }
260
261    /// Mark the beginning of a new turn (e.g., when model starts responding).
262    #[inline]
263    pub fn mark_turn_start(&self) {
264        let now = self.elapsed_ns();
265        // Only set if not already set (first call per turn wins)
266        self.last_turn_start_ns
267            .compare_exchange(0, now, Relaxed, Relaxed)
268            .ok();
269    }
270
271    // ── Aggregation (called at turn boundaries / periodic flush) ──
272
273    /// Snapshot all metrics as a JSON value.
274    pub fn snapshot(&self) -> serde_json::Value {
275        let elapsed = self.start.elapsed();
276        let elapsed_secs = elapsed.as_secs_f64();
277
278        let chunks = self.audio_chunks_out.load(Relaxed);
279        let bytes = self.audio_bytes_out.load(Relaxed);
280        let latency_count = self.latency_count.load(Relaxed);
281
282        let avg_latency_ms = if latency_count > 0 {
283            self.latency_sum_ns.load(Relaxed) / latency_count / 1_000_000
284        } else {
285            0
286        };
287
288        let last_latency_ms = self.last_latency_ns.load(Relaxed) / 1_000_000;
289
290        let min_latency_ms = {
291            let v = self.min_latency_ns.load(Relaxed);
292            if v == u64::MAX {
293                0
294            } else {
295                v / 1_000_000
296            }
297        };
298        let max_latency_ms = self.max_latency_ns.load(Relaxed) / 1_000_000;
299
300        let turn_count = self.turn_duration_count.load(Relaxed);
301        let turn_complete_count = self.turn_complete_count.load(Relaxed);
302        let avg_turn_ms = if turn_count > 0 {
303            self.turn_duration_sum_ns.load(Relaxed) / turn_count / 1_000_000
304        } else {
305            0
306        };
307
308        // Audio throughput (KB/s over session lifetime)
309        let throughput_kbps = if elapsed_secs > 0.0 {
310            (bytes as f64 / 1024.0) / elapsed_secs
311        } else {
312            0.0
313        };
314
315        let total_tokens = self.total_token_count.load(Relaxed);
316        let prompt_tokens = self.prompt_token_count.load(Relaxed);
317        let response_tokens = self.response_token_count.load(Relaxed);
318        let cached_tokens = self.cached_content_token_count.load(Relaxed);
319        let thoughts_tokens = self.thoughts_token_count.load(Relaxed);
320
321        json!({
322            "uptime_secs": elapsed.as_secs(),
323            "audio_chunks_out": chunks,
324            "audio_kbytes_out": bytes / 1024,
325            "audio_throughput_kbps": (throughput_kbps * 10.0).round() / 10.0,
326            "interruptions": self.interruptions.load(Relaxed),
327            "last_response_latency_ms": last_latency_ms,
328            "avg_response_latency_ms": avg_latency_ms,
329            "min_response_latency_ms": min_latency_ms,
330            "max_response_latency_ms": max_latency_ms,
331            "response_count": latency_count,
332            "turn_count": turn_complete_count,
333            "avg_turn_duration_ms": avg_turn_ms,
334            "total_token_count": total_tokens,
335            "prompt_token_count": prompt_tokens,
336            "response_token_count": response_tokens,
337            "cached_content_token_count": cached_tokens,
338            "thoughts_token_count": thoughts_tokens,
339        })
340    }
341
342    #[inline]
343    fn elapsed_ns(&self) -> u64 {
344        self.start.elapsed().as_nanos() as u64
345    }
346}
347
348impl Default for SessionTelemetry {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    #[test]
    fn new_snapshot_is_zeroed() {
        let t = SessionTelemetry::new();
        let snap = t.snapshot();
        assert_eq!(snap["audio_chunks_out"], 0);
        assert_eq!(snap["interruptions"], 0);
        assert_eq!(snap["last_response_latency_ms"], 0);
        assert_eq!(snap["avg_response_latency_ms"], 0);
        // The internal `u64::MAX` sentinel must be reported as 0, not leaked.
        assert_eq!(snap["min_response_latency_ms"], 0);
        assert_eq!(snap["max_response_latency_ms"], 0);
        assert_eq!(snap["response_count"], 0);
        assert_eq!(snap["turn_count"], 0);
        assert_eq!(snap["avg_turn_duration_ms"], 0);
        assert_eq!(snap["total_token_count"], 0);
    }

    #[test]
    fn audio_counters_accumulate() {
        let t = SessionTelemetry::new();
        t.record_audio_out(480);
        t.record_audio_out(480);
        t.record_audio_out(480);
        let snap = t.snapshot();
        assert_eq!(snap["audio_chunks_out"], 3);
        // 3 * 480 = 1440 bytes -> 1 whole KiB.
        assert_eq!(snap["audio_kbytes_out"], 1);
    }

    #[test]
    fn interruption_counter() {
        let t = SessionTelemetry::new();
        t.record_interruption();
        t.record_interruption();
        assert_eq!(t.snapshot()["interruptions"], 2);
    }

    #[test]
    fn turn_complete_counter_is_independent_of_latency() {
        let t = SessionTelemetry::new();
        t.record_turn_complete();
        t.record_turn_complete();

        let snap = t.snapshot();
        assert_eq!(snap["turn_count"], 2);
        assert_eq!(snap["response_count"], 0);
    }

    #[test]
    fn latency_tracking() {
        let t = SessionTelemetry::new();
        // Simulate: VAD end → short delay → first audio chunk
        t.record_vad_end();
        sleep(Duration::from_millis(10));
        t.record_audio_out(480);
        // Subsequent chunks should not re-record latency
        t.record_audio_out(480);
        t.record_audio_out(480);

        let snap = t.snapshot();
        assert_eq!(snap["response_count"], 1);
        // We slept 10ms; the bound is deliberately looser to tolerate coarse timers.
        assert!(snap["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn multiple_turns_average_latency() {
        let t = SessionTelemetry::new();

        // Turn 1
        t.record_vad_end();
        sleep(Duration::from_millis(10));
        t.record_audio_out(480);

        // Turn 2
        t.record_vad_end();
        sleep(Duration::from_millis(10));
        t.record_audio_out(480);

        let snap = t.snapshot();
        assert_eq!(snap["response_count"], 2);
        assert!(snap["avg_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn min_max_latency_bracket_average() {
        let t = SessionTelemetry::new();
        for _ in 0..2 {
            t.record_vad_end();
            sleep(Duration::from_millis(2));
            t.record_audio_out(480);
        }

        let snap = t.snapshot();
        let min = snap["min_response_latency_ms"].as_u64().unwrap();
        let avg = snap["avg_response_latency_ms"].as_u64().unwrap();
        let max = snap["max_response_latency_ms"].as_u64().unwrap();
        assert_eq!(snap["response_count"], 2);
        assert!(min >= 1, "each sample slept at least 2ms, got min={min}");
        assert!(min <= avg && avg <= max, "min={min} avg={avg} max={max}");
    }

    #[test]
    fn text_input_latency_via_text_out() {
        let t = SessionTelemetry::new();
        // Simulate: user sends text → delay → model responds with text
        t.record_text_send();
        sleep(Duration::from_millis(10));
        t.record_text_out();
        // Subsequent text outputs should not re-record
        t.record_text_out();

        let snap = t.snapshot();
        assert_eq!(snap["response_count"], 1);
        assert!(snap["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn text_input_latency_via_audio_out() {
        let t = SessionTelemetry::new();
        // Simulate: user sends text → delay → model responds with audio
        t.record_text_send();
        sleep(Duration::from_millis(10));
        t.record_audio_out(480);

        let snap = t.snapshot();
        // Should record text-send latency (response_count = 1)
        assert_eq!(snap["response_count"], 1);
        assert!(snap["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn mixed_voice_and_text_turns() {
        let t = SessionTelemetry::new();

        // Voice turn
        t.record_vad_end();
        sleep(Duration::from_millis(10));
        t.record_audio_out(480);

        // Text turn
        t.record_text_send();
        sleep(Duration::from_millis(10));
        t.record_text_out();

        let snap = t.snapshot();
        assert_eq!(snap["response_count"], 2);
    }
}