gemini_genai_rs/buffer/
mod.rs

1//! Lock-free audio buffers for the hot path.
2//!
//! - [`SpscRing`]: Single-producer single-consumer ring buffer for allocation-free audio streaming.
4//! - [`AudioJitterBuffer`]: Adaptive jitter buffer for smooth playback of network audio.
5
6pub mod convert;
7pub mod jitter;
8
9pub use convert::{bytes_to_i16, i16_to_bytes, into_shared};
10pub use jitter::{AudioJitterBuffer, JitterConfig};
11
12use std::sync::atomic::{AtomicUsize, Ordering};
13
/// Cache-line padding to prevent false sharing between producer and consumer.
///
/// 128 bytes covers both x86_64 (64B) and Apple Silicon (128B) cache lines.
#[repr(align(128))]
struct CachePad<T>(T);

/// Lock-free single-producer single-consumer ring buffer.
///
/// The hot path for audio data. No heap allocation after initialization.
/// Uses atomic head/tail counters with cache-line padding to prevent false sharing.
///
/// `head` and `tail` are free-running counters that are only ever advanced
/// (with wrapping arithmetic); a slot index is obtained by masking a counter
/// with `cap_mask`. `head - tail` is therefore the number of buffered items.
///
/// # Performance
///
/// - Wait-free on the fast path (atomic store/load only)
/// - Power-of-two capacity for bitwise modulo (single-cycle AND vs multi-cycle DIV)
/// - Bounded memory, no allocation after init
///
/// # Safety
///
/// This buffer is safe for concurrent use by exactly one producer and one consumer.
/// Multiple producers or multiple consumers will cause data races.
///
/// - Producer-only methods: [`SpscRing::write`].
/// - Consumer-only methods: [`SpscRing::read`], [`SpscRing::clear`].
/// - Any thread: [`SpscRing::capacity`], [`SpscRing::len`],
///   [`SpscRing::is_empty`], [`SpscRing::available`].
pub struct SpscRing<T: Copy + Default> {
    /// Slot storage. Every slot is an `UnsafeCell` because the producer writes
    /// into it through a shared `&self`; mutating plain `T` behind a shared
    /// reference would be undefined behaviour.
    buf: Box<[std::cell::UnsafeCell<T>]>,
    /// `capacity - 1`. Capacity is a power of two, so `counter & cap_mask`
    /// maps a free-running counter onto a slot index.
    cap_mask: usize,
    /// Write counter. Stored only by the producer, loaded by everyone.
    head: CachePad<AtomicUsize>,
    /// Read counter. Stored only by the consumer, loaded by everyone.
    tail: CachePad<AtomicUsize>,
}

impl<T: Copy + Default> SpscRing<T> {
    /// Create a new ring buffer with the given capacity.
    ///
    /// Capacity is rounded up to the next power of two for efficient modulo.
    /// Every slot is initialised with `T::default()`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0, or if rounding `capacity` up to a power of
    /// two would overflow `usize`.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "ring buffer capacity must be > 0");
        // `next_power_of_two` silently yields 0 on overflow in release builds,
        // which would produce a corrupt mask; fail loudly instead.
        let cap = capacity
            .checked_next_power_of_two()
            .expect("ring buffer capacity overflows usize when rounded up to a power of two");
        let fill = T::default();
        let buf: Box<[std::cell::UnsafeCell<T>]> = (0..cap)
            .map(|_| std::cell::UnsafeCell::new(fill))
            .collect();
        Self {
            buf,
            cap_mask: cap - 1,
            head: CachePad(AtomicUsize::new(0)),
            tail: CachePad(AtomicUsize::new(0)),
        }
    }

    /// Raw pointer to the first slot.
    ///
    /// `UnsafeCell<T>` has the same layout as `T`, and a pointer obtained
    /// through it may legally be used for writes even though `self` is shared.
    fn slots(&self) -> *mut T {
        std::cell::UnsafeCell::raw_get(self.buf.as_ptr())
    }

    /// Returns the usable capacity of the buffer (the requested capacity
    /// rounded up to a power of two).
    pub fn capacity(&self) -> usize {
        self.cap_mask + 1
    }

    /// Returns the number of items currently available to read.
    ///
    /// When producer and consumer are running concurrently this is only a
    /// snapshot, but it is always within `0..=capacity()`.
    pub fn len(&self) -> usize {
        // Load `tail` *before* `head`: `head` never falls behind `tail`, so a
        // stale `tail` can only make the difference too large (clamped below),
        // never negative. Loading in the opposite order lets a third thread
        // observe `tail > head`, which wraps to a huge value.
        let tail = self.tail.0.load(Ordering::Acquire);
        let head = self.head.0.load(Ordering::Acquire);
        head.wrapping_sub(tail).min(self.capacity())
    }

    /// Returns true if the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of free slots available for writing.
    ///
    /// Like [`SpscRing::len`], this is a snapshot under concurrency.
    pub fn available(&self) -> usize {
        // `len()` is clamped to `capacity()`, so this cannot underflow.
        self.capacity() - self.len()
    }

    /// Write samples into the ring buffer.
    ///
    /// Returns the number of samples actually written (may be less than
    /// `data.len()` if the buffer is nearly full, and 0 if it is full or
    /// `data` is empty).
    ///
    /// This is the **producer** method — call from exactly one thread.
    pub fn write(&self, data: &[T]) -> usize {
        // Only the producer stores `head`, so a relaxed load sees our own value.
        let head = self.head.0.load(Ordering::Relaxed);
        // Acquire pairs with the consumer's release store of `tail`: every slot
        // before `tail` has been fully read and may be overwritten.
        let tail = self.tail.0.load(Ordering::Acquire);

        let free = self.capacity() - head.wrapping_sub(tail);
        let to_write = data.len().min(free);

        if to_write == 0 {
            return 0;
        }

        // Split into the run up to the end of the storage and the wrapped
        // remainder at the start (which is empty for a contiguous write).
        let start = head & self.cap_mask;
        let first_len = to_write.min(self.capacity() - start);
        let second_len = to_write - first_len;

        // SAFETY: `start + first_len <= capacity` and `second_len <= start`
        // (because `to_write <= free <= capacity`), so both destination ranges
        // lie inside `buf`. These slots are in the free region, which the
        // consumer does not read until `head` is published below, and we are
        // the sole producer. `data` is a caller-owned slice that cannot alias
        // the private storage, and `first_len + second_len <= data.len()`.
        unsafe {
            let dst = self.slots();
            std::ptr::copy_nonoverlapping(data.as_ptr(), dst.add(start), first_len);
            std::ptr::copy_nonoverlapping(data.as_ptr().add(first_len), dst, second_len);
        }

        // Release publishes the slot contents together with the new `head`.
        self.head
            .0
            .store(head.wrapping_add(to_write), Ordering::Release);
        to_write
    }

    /// Read samples from the ring buffer into `out`.
    ///
    /// Returns the number of samples actually read (may be less than
    /// `out.len()` if the buffer has fewer samples available). Only
    /// `out[..returned]` is modified.
    ///
    /// This is the **consumer** method — call from exactly one thread.
    pub fn read(&self, out: &mut [T]) -> usize {
        // Only the consumer stores `tail`, so a relaxed load sees our own value.
        let tail = self.tail.0.load(Ordering::Relaxed);
        // Acquire pairs with the producer's release store of `head`: every slot
        // before `head` is fully written.
        let head = self.head.0.load(Ordering::Acquire);

        let available = head.wrapping_sub(tail);
        let to_read = out.len().min(available);

        if to_read == 0 {
            return 0;
        }

        let start = tail & self.cap_mask;
        let first_len = to_read.min(self.capacity() - start);
        let second_len = to_read - first_len;

        // SAFETY: both source ranges lie inside `buf` (same bounds argument as
        // in `write`). They belong to the filled region, which the producer
        // does not touch until `tail` is published below, and we are the sole
        // consumer. `out` is an exclusive caller-owned slice with at least
        // `to_read` elements and cannot alias the private storage. Copying via
        // raw pointers avoids creating a `&[T]` over slots the producer may be
        // writing concurrently.
        unsafe {
            let src = self.slots() as *const T;
            let dst = out.as_mut_ptr();
            std::ptr::copy_nonoverlapping(src.add(start), dst, first_len);
            std::ptr::copy_nonoverlapping(src, dst.add(first_len), second_len);
        }

        // Release hands the consumed slots back to the producer.
        self.tail
            .0
            .store(tail.wrapping_add(to_read), Ordering::Release);
        to_read
    }

    /// Discard all buffered data without reading it.
    ///
    /// This advances the read counter, so it is a **consumer** method: call it
    /// only from the thread that calls [`SpscRing::read`]. Calling it from the
    /// producer thread would race with a concurrent `read` storing the read
    /// counter and can corrupt the buffer accounting.
    pub fn clear(&self) {
        let head = self.head.0.load(Ordering::Acquire);
        self.tail.0.store(head, Ordering::Release);
    }
}

// SAFETY: `SpscRing` owns its storage, so moving it to another thread only moves
// `T` values, which is sound for `T: Send`.
//
// Sharing (`Sync`) is sound for `T: Send` as well: the ring never hands out
// references to its slots, it only copies `T` values in (`write`) and out
// (`read`), i.e. values are transferred between threads, never shared. Access to
// the `UnsafeCell` slots is synchronized by the acquire/release operations on
// `head` and `tail`. The invariant that exactly one producer and one consumer
// exist must be upheld by the caller.
unsafe impl<T: Copy + Default + Send> Send for SpscRing<T> {}
unsafe impl<T: Copy + Default + Send> Sync for SpscRing<T> {}
175
#[cfg(test)]
mod tests {
    //! Unit tests for [`SpscRing`]: construction, FIFO ordering across the
    //! wrap point, partial transfers, `clear`, and a two-thread stress test
    //! that checks the content of every sample.

    use super::*;

    /// Requested capacity is rounded up to the next power of two.
    #[test]
    fn create_ring() {
        let ring = SpscRing::<i16>::new(100);
        // Rounds up to 128
        assert_eq!(ring.capacity(), 128);
        assert!(ring.is_empty());
        assert_eq!(ring.available(), 128);
    }

    /// Data comes out in the order it went in.
    #[test]
    fn write_and_read() {
        let ring = SpscRing::<i16>::new(16);
        let data = [1i16, 2, 3, 4, 5];

        let written = ring.write(&data);
        assert_eq!(written, 5);
        assert_eq!(ring.len(), 5);

        let mut out = [0i16; 5];
        let read = ring.read(&mut out);
        assert_eq!(read, 5);
        assert_eq!(out, [1, 2, 3, 4, 5]);
        assert!(ring.is_empty());
    }

    /// A write that straddles the end of the storage is split correctly.
    #[test]
    fn wraparound() {
        let ring = SpscRing::<i16>::new(8); // capacity = 8
        let data = [1i16, 2, 3, 4, 5, 6];

        assert_eq!(ring.write(&data), 6);
        let mut out = [0i16; 4];
        assert_eq!(ring.read(&mut out), 4); // consume 4, tail = 4
        assert_eq!(out, [1, 2, 3, 4]);

        // Write more — will wrap around
        let data2 = [7i16, 8, 9, 10, 11, 12];
        let written = ring.write(&data2);
        assert_eq!(written, 6);

        let mut out2 = [0i16; 8];
        let read = ring.read(&mut out2);
        assert_eq!(read, 8);
        assert_eq!(&out2[..8], &[5, 6, 7, 8, 9, 10, 11, 12]);
    }

    /// Writing more than fits keeps the leading samples and reports the count.
    #[test]
    fn overflow_returns_partial() {
        let ring = SpscRing::<i16>::new(4); // capacity = 4
        let data = [1i16, 2, 3, 4, 5, 6]; // too many
        let written = ring.write(&data);
        assert_eq!(written, 4);

        // A full ring accepts nothing further.
        assert_eq!(ring.write(&[7i16]), 0);

        // The samples that were kept are the first four, in order.
        let mut out = [0i16; 4];
        assert_eq!(ring.read(&mut out), 4);
        assert_eq!(out, [1, 2, 3, 4]);
    }

    /// Reading more than is buffered returns only what is available.
    #[test]
    fn underflow_returns_partial() {
        let ring = SpscRing::<i16>::new(16);
        ring.write(&[1i16, 2, 3]);

        let mut out = [0i16; 10];
        let read = ring.read(&mut out);
        assert_eq!(read, 3);
        assert_eq!(&out[..3], &[1, 2, 3]);
    }

    /// `clear` drops everything that is buffered.
    #[test]
    fn clear_discards_data() {
        let ring = SpscRing::<i16>::new(16);
        ring.write(&[1i16, 2, 3, 4, 5]);
        assert_eq!(ring.len(), 5);

        ring.clear();
        assert!(ring.is_empty());
        assert_eq!(ring.available(), 16);
    }

    /// One producer thread and one consumer thread move 16 000 samples through
    /// the ring; every sample must arrive exactly once and in order.
    #[test]
    fn concurrent_write_read() {
        use std::sync::Arc;

        const CHUNKS: usize = 1000;
        const CHUNK_LEN: usize = 16;
        const TOTAL: usize = CHUNKS * CHUNK_LEN;

        let ring = Arc::new(SpscRing::<i16>::new(1024));
        let ring_w = Arc::clone(&ring);
        let ring_r = Arc::clone(&ring);

        let writer = std::thread::spawn(move || {
            for i in 0..CHUNKS {
                // Values top out at 15 999, which fits in i16.
                let chunk: Vec<i16> = (0..CHUNK_LEN)
                    .map(|j| (i * CHUNK_LEN + j) as i16)
                    .collect();
                let mut sent = 0usize;
                while sent < CHUNK_LEN {
                    let w = ring_w.write(&chunk[sent..]);
                    sent += w;
                    if w == 0 {
                        std::thread::yield_now();
                    }
                }
            }
        });

        let reader = std::thread::spawn(move || {
            let mut total = 0usize;
            let mut buf = [0i16; 64];
            // Record the first mismatch instead of panicking here: a dead
            // reader would leave the writer spinning on a full ring forever.
            let mut first_bad: Option<(usize, i16)> = None;
            while total < TOTAL {
                let r = ring_r.read(&mut buf);
                for (k, &sample) in buf[..r].iter().enumerate() {
                    let position = total + k;
                    if sample != position as i16 && first_bad.is_none() {
                        first_bad = Some((position, sample));
                    }
                }
                total += r;
                if r == 0 {
                    std::thread::yield_now();
                }
            }
            (total, first_bad)
        });

        writer.join().unwrap();
        let (total_read, first_bad) = reader.join().unwrap();
        assert_eq!(total_read, TOTAL);
        assert_eq!(first_bad, None, "sample out of order: (position, value)");
    }

    /// A zero-capacity ring is rejected at construction.
    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn zero_capacity_panics() {
        let _ = SpscRing::<i16>::new(0);
    }
}