gemini_genai_rs/buffer/
mod.rs1pub mod convert;
7pub mod jitter;
8
9pub use convert::{bytes_to_i16, i16_to_bytes, into_shared};
10pub use jitter::{AudioJitterBuffer, JitterConfig};
11
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14#[repr(align(128))]
18struct CachePad<T>(T);
19
20pub struct SpscRing<T: Copy + Default> {
36 buf: Box<[T]>,
37 cap_mask: usize,
38 head: CachePad<AtomicUsize>,
39 tail: CachePad<AtomicUsize>,
40}
41
42impl<T: Copy + Default> SpscRing<T> {
43 pub fn new(capacity: usize) -> Self {
51 assert!(capacity > 0, "ring buffer capacity must be > 0");
52 let cap = capacity.next_power_of_two();
53 let buf = vec![T::default(); cap].into_boxed_slice();
54 Self {
55 buf,
56 cap_mask: cap - 1,
57 head: CachePad(AtomicUsize::new(0)),
58 tail: CachePad(AtomicUsize::new(0)),
59 }
60 }
61
62 pub fn capacity(&self) -> usize {
64 self.cap_mask + 1
65 }
66
67 pub fn len(&self) -> usize {
69 let head = self.head.0.load(Ordering::Acquire);
70 let tail = self.tail.0.load(Ordering::Acquire);
71 head.wrapping_sub(tail)
72 }
73
74 pub fn is_empty(&self) -> bool {
76 self.len() == 0
77 }
78
79 pub fn available(&self) -> usize {
81 self.capacity() - self.len()
82 }
83
84 pub fn write(&self, data: &[T]) -> usize {
91 let head = self.head.0.load(Ordering::Relaxed);
92 let tail = self.tail.0.load(Ordering::Acquire);
93
94 let free = self.capacity() - head.wrapping_sub(tail);
95 let to_write = data.len().min(free);
96
97 if to_write == 0 {
98 return 0;
99 }
100
101 let start = head & self.cap_mask;
102 let end = start + to_write;
103
104 if end <= self.capacity() {
105 unsafe {
108 let dst = self.buf.as_ptr() as *mut T;
109 std::ptr::copy_nonoverlapping(data.as_ptr(), dst.add(start), to_write);
110 }
111 } else {
112 let first_len = self.capacity() - start;
114 let second_len = to_write - first_len;
115 unsafe {
116 let dst = self.buf.as_ptr() as *mut T;
117 std::ptr::copy_nonoverlapping(data.as_ptr(), dst.add(start), first_len);
118 std::ptr::copy_nonoverlapping(data.as_ptr().add(first_len), dst, second_len);
119 }
120 }
121
122 self.head
123 .0
124 .store(head.wrapping_add(to_write), Ordering::Release);
125 to_write
126 }
127
128 pub fn read(&self, out: &mut [T]) -> usize {
135 let tail = self.tail.0.load(Ordering::Relaxed);
136 let head = self.head.0.load(Ordering::Acquire);
137
138 let available = head.wrapping_sub(tail);
139 let to_read = out.len().min(available);
140
141 if to_read == 0 {
142 return 0;
143 }
144
145 let start = tail & self.cap_mask;
146 let end = start + to_read;
147
148 if end <= self.capacity() {
149 out[..to_read].copy_from_slice(&self.buf[start..start + to_read]);
150 } else {
151 let first_len = self.capacity() - start;
152 let second_len = to_read - first_len;
153 out[..first_len].copy_from_slice(&self.buf[start..]);
154 out[first_len..to_read].copy_from_slice(&self.buf[..second_len]);
155 }
156
157 self.tail
158 .0
159 .store(tail.wrapping_add(to_read), Ordering::Release);
160 to_read
161 }
162
163 pub fn clear(&self) {
165 let head = self.head.0.load(Ordering::Acquire);
166 self.tail.0.store(head, Ordering::Release);
167 }
168}
169
170unsafe impl<T: Copy + Default + Send> Send for SpscRing<T> {}
174unsafe impl<T: Copy + Default + Send> Sync for SpscRing<T> {}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179
180 #[test]
181 fn create_ring() {
182 let ring = SpscRing::<i16>::new(100);
183 assert_eq!(ring.capacity(), 128);
185 assert!(ring.is_empty());
186 assert_eq!(ring.available(), 128);
187 }
188
189 #[test]
190 fn write_and_read() {
191 let ring = SpscRing::<i16>::new(16);
192 let data = [1i16, 2, 3, 4, 5];
193
194 let written = ring.write(&data);
195 assert_eq!(written, 5);
196 assert_eq!(ring.len(), 5);
197
198 let mut out = [0i16; 5];
199 let read = ring.read(&mut out);
200 assert_eq!(read, 5);
201 assert_eq!(out, [1, 2, 3, 4, 5]);
202 assert!(ring.is_empty());
203 }
204
205 #[test]
206 fn wraparound() {
207 let ring = SpscRing::<i16>::new(8); let data = [1i16, 2, 3, 4, 5, 6];
209
210 ring.write(&data);
211 let mut out = [0i16; 4];
212 ring.read(&mut out); assert_eq!(out, [1, 2, 3, 4]);
214
215 let data2 = [7i16, 8, 9, 10, 11, 12];
217 let written = ring.write(&data2);
218 assert_eq!(written, 6);
219
220 let mut out2 = [0i16; 8];
221 let read = ring.read(&mut out2);
222 assert_eq!(read, 8);
223 assert_eq!(&out2[..8], &[5, 6, 7, 8, 9, 10, 11, 12]);
224 }
225
226 #[test]
227 fn overflow_returns_partial() {
228 let ring = SpscRing::<i16>::new(4); let data = [1i16, 2, 3, 4, 5, 6]; let written = ring.write(&data);
231 assert_eq!(written, 4);
232 }
233
234 #[test]
235 fn underflow_returns_partial() {
236 let ring = SpscRing::<i16>::new(16);
237 ring.write(&[1i16, 2, 3]);
238
239 let mut out = [0i16; 10];
240 let read = ring.read(&mut out);
241 assert_eq!(read, 3);
242 assert_eq!(&out[..3], &[1, 2, 3]);
243 }
244
245 #[test]
246 fn clear_discards_data() {
247 let ring = SpscRing::<i16>::new(16);
248 ring.write(&[1i16, 2, 3, 4, 5]);
249 assert_eq!(ring.len(), 5);
250
251 ring.clear();
252 assert!(ring.is_empty());
253 assert_eq!(ring.available(), 16);
254 }
255
256 #[test]
257 fn concurrent_write_read() {
258 use std::sync::Arc;
259
260 let ring = Arc::new(SpscRing::<i16>::new(1024));
261 let ring_w = ring.clone();
262 let ring_r = ring.clone();
263
264 let writer = std::thread::spawn(move || {
265 let mut total = 0usize;
266 for i in 0..1000 {
267 let chunk: Vec<i16> = (0..16).map(|j| (i * 16 + j) as i16).collect();
268 loop {
269 let w = ring_w.write(&chunk[total % 16..]);
270 total += w;
271 if total >= (i as usize + 1) * 16 {
272 break;
273 }
274 std::thread::yield_now();
275 }
276 }
277 });
278
279 let reader = std::thread::spawn(move || {
280 let mut total = 0usize;
281 let mut buf = [0i16; 64];
282 while total < 16000 {
283 let r = ring_r.read(&mut buf);
284 total += r;
285 if r == 0 {
286 std::thread::yield_now();
287 }
288 }
289 total
290 });
291
292 writer.join().unwrap();
293 let total_read = reader.join().unwrap();
294 assert_eq!(total_read, 16000);
295 }
296
297 #[test]
298 #[should_panic(expected = "capacity must be > 0")]
299 fn zero_capacity_panics() {
300 SpscRing::<i16>::new(0);
301 }
302}