1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed};
9use std::time::Instant;
10
11use serde_json::json;
12
/// Lock-free counters and timing aggregates for one session.
///
/// Every field except `start` is an atomic, so all recording methods take
/// `&self`. Fields ending in `_ns` hold nanoseconds; the timestamp fields are
/// offsets from `start`.
#[derive(Debug)]
pub struct SessionTelemetry {
    /// Reference instant captured by `new`; every timestamp is measured from it.
    start: Instant,

    /// Number of `record_audio_out` calls.
    audio_chunks_out: AtomicU64,
    /// Sum of the `byte_len` values passed to `record_audio_out`.
    audio_bytes_out: AtomicU64,

    /// Number of `record_interruption` calls.
    interruptions: AtomicU64,

    /// Timestamp stored by the most recent `record_vad_end`.
    vad_end_ns: AtomicU64,
    /// Set by `record_vad_end`; cleared by the next `record_audio_out`.
    awaiting_response: AtomicBool,
    /// Timestamp stored by the most recent `record_text_send`.
    text_send_ns: AtomicU64,
    /// Set by `record_text_send`; cleared by the next `record_text_out` or
    /// `record_audio_out`.
    awaiting_text_response: AtomicBool,

    /// Most recent latency sample.
    last_latency_ns: AtomicU64,
    /// Sum of all latency samples.
    latency_sum_ns: AtomicU64,
    /// Number of latency samples.
    latency_count: AtomicU64,
    /// Smallest latency sample; `u64::MAX` means "no sample yet".
    min_latency_ns: AtomicU64,
    /// Largest latency sample.
    max_latency_ns: AtomicU64,

    /// Number of `record_turn_complete` calls.
    turn_complete_count: AtomicU64,
    /// Start timestamp of the turn being timed; `0` means "not set".
    last_turn_start_ns: AtomicU64,
    /// Sum of all measured turn durations.
    turn_duration_sum_ns: AtomicU64,
    /// Number of measured turn durations.
    turn_duration_count: AtomicU64,

    // Token counts hold the latest value handed to `record_usage`; they are
    // overwritten on every report, not accumulated.
    total_token_count: AtomicU64,
    prompt_token_count: AtomicU64,
    response_token_count: AtomicU64,
    cached_content_token_count: AtomicU64,
    thoughts_token_count: AtomicU64,
}
64
65impl SessionTelemetry {
66 pub fn new() -> Self {
68 Self {
69 start: Instant::now(),
70 audio_chunks_out: AtomicU64::new(0),
71 audio_bytes_out: AtomicU64::new(0),
72 interruptions: AtomicU64::new(0),
73 vad_end_ns: AtomicU64::new(0),
74 awaiting_response: AtomicBool::new(false),
75 text_send_ns: AtomicU64::new(0),
76 awaiting_text_response: AtomicBool::new(false),
77 last_latency_ns: AtomicU64::new(0),
78 latency_sum_ns: AtomicU64::new(0),
79 latency_count: AtomicU64::new(0),
80 min_latency_ns: AtomicU64::new(u64::MAX),
81 max_latency_ns: AtomicU64::new(0),
82 turn_complete_count: AtomicU64::new(0),
83 last_turn_start_ns: AtomicU64::new(0),
84 turn_duration_sum_ns: AtomicU64::new(0),
85 turn_duration_count: AtomicU64::new(0),
86 total_token_count: AtomicU64::new(0),
87 prompt_token_count: AtomicU64::new(0),
88 response_token_count: AtomicU64::new(0),
89 cached_content_token_count: AtomicU64::new(0),
90 thoughts_token_count: AtomicU64::new(0),
91 }
92 }
93
94 #[inline]
98 pub fn record_audio_out(&self, byte_len: usize) {
99 self.audio_chunks_out.fetch_add(1, Relaxed);
100 self.audio_bytes_out.fetch_add(byte_len as u64, Relaxed);
101
102 self.record_text_response_latency();
104
105 if self
108 .awaiting_response
109 .compare_exchange(true, false, Relaxed, Relaxed)
110 .is_ok()
111 {
112 let now_ns = self.elapsed_ns();
113 let vad_end = self.vad_end_ns.load(Relaxed);
114 if now_ns > vad_end && vad_end > 0 {
115 let latency = now_ns - vad_end;
116 self.last_latency_ns.store(latency, Relaxed);
117 self.latency_sum_ns.fetch_add(latency, Relaxed);
118 self.latency_count.fetch_add(1, Relaxed);
119 let mut current_min = self.min_latency_ns.load(Relaxed);
121 while latency < current_min {
122 match self.min_latency_ns.compare_exchange_weak(
123 current_min,
124 latency,
125 Relaxed,
126 Relaxed,
127 ) {
128 Ok(_) => break,
129 Err(actual) => current_min = actual,
130 }
131 }
132 let mut current_max = self.max_latency_ns.load(Relaxed);
134 while latency > current_max {
135 match self.max_latency_ns.compare_exchange_weak(
136 current_max,
137 latency,
138 Relaxed,
139 Relaxed,
140 ) {
141 Ok(_) => break,
142 Err(actual) => current_max = actual,
143 }
144 }
145 }
146 }
147 }
148
149 #[inline]
151 pub fn record_vad_end(&self) {
152 self.vad_end_ns.store(self.elapsed_ns(), Relaxed);
153 self.awaiting_response.store(true, Relaxed);
154 }
155
156 #[inline]
158 pub fn record_text_send(&self) {
159 self.text_send_ns.store(self.elapsed_ns(), Relaxed);
160 self.awaiting_text_response.store(true, Relaxed);
161 }
162
163 #[inline]
166 fn record_text_response_latency(&self) {
167 if self
168 .awaiting_text_response
169 .compare_exchange(true, false, Relaxed, Relaxed)
170 .is_ok()
171 {
172 let now_ns = self.elapsed_ns();
173 let send_ns = self.text_send_ns.load(Relaxed);
174 if now_ns > send_ns && send_ns > 0 {
175 let latency = now_ns - send_ns;
176 self.last_latency_ns.store(latency, Relaxed);
177 self.latency_sum_ns.fetch_add(latency, Relaxed);
178 self.latency_count.fetch_add(1, Relaxed);
179 let mut current_min = self.min_latency_ns.load(Relaxed);
181 while latency < current_min {
182 match self.min_latency_ns.compare_exchange_weak(
183 current_min,
184 latency,
185 Relaxed,
186 Relaxed,
187 ) {
188 Ok(_) => break,
189 Err(actual) => current_min = actual,
190 }
191 }
192 let mut current_max = self.max_latency_ns.load(Relaxed);
194 while latency > current_max {
195 match self.max_latency_ns.compare_exchange_weak(
196 current_max,
197 latency,
198 Relaxed,
199 Relaxed,
200 ) {
201 Ok(_) => break,
202 Err(actual) => current_max = actual,
203 }
204 }
205 }
206 }
207 }
208
209 #[inline]
211 pub fn record_text_out(&self) {
212 self.record_text_response_latency();
213 }
214
215 #[inline]
217 pub fn record_interruption(&self) {
218 self.interruptions.fetch_add(1, Relaxed);
219 }
220
221 #[inline]
223 pub fn record_turn_complete(&self) {
224 self.turn_complete_count.fetch_add(1, Relaxed);
225 let now = self.elapsed_ns();
226 let turn_start = self.last_turn_start_ns.swap(now, Relaxed);
227 if turn_start > 0 {
228 let duration = now.saturating_sub(turn_start);
229 self.turn_duration_sum_ns.fetch_add(duration, Relaxed);
230 self.turn_duration_count.fetch_add(1, Relaxed);
231 }
232 }
233
234 #[inline]
236 pub fn record_usage(
237 &self,
238 total: Option<u32>,
239 prompt: Option<u32>,
240 response: Option<u32>,
241 cached: Option<u32>,
242 thoughts: Option<u32>,
243 ) {
244 if let Some(v) = total {
245 self.total_token_count.store(v as u64, Relaxed);
246 }
247 if let Some(v) = prompt {
248 self.prompt_token_count.store(v as u64, Relaxed);
249 }
250 if let Some(v) = response {
251 self.response_token_count.store(v as u64, Relaxed);
252 }
253 if let Some(v) = cached {
254 self.cached_content_token_count.store(v as u64, Relaxed);
255 }
256 if let Some(v) = thoughts {
257 self.thoughts_token_count.store(v as u64, Relaxed);
258 }
259 }
260
261 #[inline]
263 pub fn mark_turn_start(&self) {
264 let now = self.elapsed_ns();
265 self.last_turn_start_ns
267 .compare_exchange(0, now, Relaxed, Relaxed)
268 .ok();
269 }
270
271 pub fn snapshot(&self) -> serde_json::Value {
275 let elapsed = self.start.elapsed();
276 let elapsed_secs = elapsed.as_secs_f64();
277
278 let chunks = self.audio_chunks_out.load(Relaxed);
279 let bytes = self.audio_bytes_out.load(Relaxed);
280 let latency_count = self.latency_count.load(Relaxed);
281
282 let avg_latency_ms = if latency_count > 0 {
283 self.latency_sum_ns.load(Relaxed) / latency_count / 1_000_000
284 } else {
285 0
286 };
287
288 let last_latency_ms = self.last_latency_ns.load(Relaxed) / 1_000_000;
289
290 let min_latency_ms = {
291 let v = self.min_latency_ns.load(Relaxed);
292 if v == u64::MAX {
293 0
294 } else {
295 v / 1_000_000
296 }
297 };
298 let max_latency_ms = self.max_latency_ns.load(Relaxed) / 1_000_000;
299
300 let turn_count = self.turn_duration_count.load(Relaxed);
301 let turn_complete_count = self.turn_complete_count.load(Relaxed);
302 let avg_turn_ms = if turn_count > 0 {
303 self.turn_duration_sum_ns.load(Relaxed) / turn_count / 1_000_000
304 } else {
305 0
306 };
307
308 let throughput_kbps = if elapsed_secs > 0.0 {
310 (bytes as f64 / 1024.0) / elapsed_secs
311 } else {
312 0.0
313 };
314
315 let total_tokens = self.total_token_count.load(Relaxed);
316 let prompt_tokens = self.prompt_token_count.load(Relaxed);
317 let response_tokens = self.response_token_count.load(Relaxed);
318 let cached_tokens = self.cached_content_token_count.load(Relaxed);
319 let thoughts_tokens = self.thoughts_token_count.load(Relaxed);
320
321 json!({
322 "uptime_secs": elapsed.as_secs(),
323 "audio_chunks_out": chunks,
324 "audio_kbytes_out": bytes / 1024,
325 "audio_throughput_kbps": (throughput_kbps * 10.0).round() / 10.0,
326 "interruptions": self.interruptions.load(Relaxed),
327 "last_response_latency_ms": last_latency_ms,
328 "avg_response_latency_ms": avg_latency_ms,
329 "min_response_latency_ms": min_latency_ms,
330 "max_response_latency_ms": max_latency_ms,
331 "response_count": latency_count,
332 "turn_count": turn_complete_count,
333 "avg_turn_duration_ms": avg_turn_ms,
334 "total_token_count": total_tokens,
335 "prompt_token_count": prompt_tokens,
336 "response_token_count": response_tokens,
337 "cached_content_token_count": cached_tokens,
338 "thoughts_token_count": thoughts_tokens,
339 })
340 }
341
342 #[inline]
343 fn elapsed_ns(&self) -> u64 {
344 self.start.elapsed().as_nanos() as u64
345 }
346}
347
348impl Default for SessionTelemetry {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Gap inserted between a trigger and the response that answers it, so
    /// the measured latency is comfortably above the 5 ms the tests assert.
    fn pause() {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    #[test]
    fn new_snapshot_is_zeroed() {
        let report = SessionTelemetry::new().snapshot();
        assert_eq!(report["audio_chunks_out"], 0);
        assert_eq!(report["interruptions"], 0);
        assert_eq!(report["last_response_latency_ms"], 0);
        assert_eq!(report["response_count"], 0);
        assert_eq!(report["turn_count"], 0);
    }

    #[test]
    fn audio_counters_accumulate() {
        let telemetry = SessionTelemetry::new();
        for _ in 0..3 {
            telemetry.record_audio_out(480);
        }
        let report = telemetry.snapshot();
        assert_eq!(report["audio_chunks_out"], 3);
    }

    #[test]
    fn interruption_counter() {
        let telemetry = SessionTelemetry::new();
        telemetry.record_interruption();
        telemetry.record_interruption();
        assert_eq!(telemetry.snapshot()["interruptions"], 2);
    }

    #[test]
    fn turn_complete_counter_is_independent_of_latency() {
        let telemetry = SessionTelemetry::new();
        telemetry.record_turn_complete();
        telemetry.record_turn_complete();

        let report = telemetry.snapshot();
        assert_eq!(report["turn_count"], 2);
        assert_eq!(report["response_count"], 0);
    }

    #[test]
    fn latency_tracking() {
        let telemetry = SessionTelemetry::new();
        telemetry.record_vad_end();
        pause();
        // Only the first chunk after the trigger may produce a sample.
        telemetry.record_audio_out(480);
        telemetry.record_audio_out(480);
        telemetry.record_audio_out(480);

        let report = telemetry.snapshot();
        assert_eq!(report["response_count"], 1);
        assert!(report["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn multiple_turns_average_latency() {
        let telemetry = SessionTelemetry::new();

        telemetry.record_vad_end();
        pause();
        telemetry.record_audio_out(480);

        telemetry.record_vad_end();
        pause();
        telemetry.record_audio_out(480);

        let report = telemetry.snapshot();
        assert_eq!(report["response_count"], 2);
        assert!(report["avg_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn text_input_latency_via_text_out() {
        let telemetry = SessionTelemetry::new();
        telemetry.record_text_send();
        pause();
        // The second call finds nothing pending and must not add a sample.
        telemetry.record_text_out();
        telemetry.record_text_out();

        let report = telemetry.snapshot();
        assert_eq!(report["response_count"], 1);
        assert!(report["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn text_input_latency_via_audio_out() {
        let telemetry = SessionTelemetry::new();
        telemetry.record_text_send();
        pause();
        telemetry.record_audio_out(480);

        let report = telemetry.snapshot();
        assert_eq!(report["response_count"], 1);
        assert!(report["last_response_latency_ms"].as_u64().unwrap() >= 5);
    }

    #[test]
    fn mixed_voice_and_text_turns() {
        let telemetry = SessionTelemetry::new();

        telemetry.record_vad_end();
        pause();
        telemetry.record_audio_out(480);

        telemetry.record_text_send();
        pause();
        telemetry.record_text_out();

        let report = telemetry.snapshot();
        assert_eq!(report["response_count"], 2);
    }
}