1use std::collections::{HashMap, VecDeque};
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::SystemTime;
11
12use dashmap::DashMap;
13use serde_json::Value;
14
/// Number of mutations kept in the in-memory journal ring of a freshly created `State`.
const DEFAULT_MUTATION_JOURNAL_CAPACITY: usize = 1_024;
16
/// A state key tagged at the type level with the value type `T` stored under it.
///
/// `T` is carried only through `PhantomData<fn() -> T>`; no `T` value is stored.
pub struct StateKey<T> {
    /// The raw string key used for lookups.
    key: &'static str,
    /// Zero-sized marker tying the key to `T`.
    _phantom: PhantomData<fn() -> T>,
}

impl<T> StateKey<T> {
    /// Builds a typed key from a static string; usable in `const` contexts.
    pub const fn new(key: &'static str) -> Self {
        StateKey {
            key,
            _phantom: PhantomData,
        }
    }

    /// Returns the raw string key.
    pub const fn key(&self) -> &'static str {
        self.key
    }
}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum StateMutationOrigin {
52 Set,
54 SetCommitted,
56 Remove,
58 ClearPrefix,
60 Commit,
62}
63
64#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
69pub struct StateMutation {
70 pub sequence: u64,
72 pub key: String,
74 pub old: Option<Value>,
76 pub new: Option<Value>,
78 pub origin: StateMutationOrigin,
80 #[serde(rename = "timestamp_ms", with = "systemtime_epoch_millis")]
83 pub timestamp: SystemTime,
84 pub delta: bool,
86}
87
88mod systemtime_epoch_millis {
90 use std::time::{Duration, SystemTime, UNIX_EPOCH};
91
92 use serde::{Deserialize, Deserializer, Serializer};
93
94 pub fn serialize<S: Serializer>(t: &SystemTime, ser: S) -> Result<S::Ok, S::Error> {
95 let millis = t
96 .duration_since(UNIX_EPOCH)
97 .map(|d| d.as_millis() as u64)
98 .unwrap_or(0);
99 ser.serialize_u64(millis)
100 }
101
102 pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<SystemTime, D::Error> {
103 let millis = u64::deserialize(de)?;
104 Ok(UNIX_EPOCH + Duration::from_millis(millis))
105 }
106}
107
108pub trait JournalSink: Send + Sync {
118 fn write(&self, m: &StateMutation);
120}
121
122#[derive(Clone, Default)]
125struct JournalSinkSlot(Arc<parking_lot::RwLock<Option<Arc<dyn JournalSink>>>>);
126
127impl std::fmt::Debug for JournalSinkSlot {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 let installed = self.0.read().is_some();
130 f.debug_tuple("JournalSinkSlot").field(&installed).finish()
131 }
132}
133
/// Minimum time between opportunistic flushes performed by `FileJournalSink::write`.
const JOURNAL_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1_000);
135
/// Reports a journal failure without propagating it.
///
/// With the `tracing-support` feature the error is emitted as a `tracing` warning;
/// without it both arguments are deliberately discarded.
fn journal_log_error(context: &'static str, e: &dyn std::fmt::Display) {
    #[cfg(feature = "tracing-support")]
    {
        tracing::warn!(error = %e, "{context}");
    }
    #[cfg(not(feature = "tracing-support"))]
    {
        let _ = (context, e);
    }
}
145
146struct FileJournalInner {
147 writer: std::io::BufWriter<std::fs::File>,
148 last_flush: std::time::Instant,
149}
150
151pub struct FileJournalSink {
162 inner: parking_lot::Mutex<FileJournalInner>,
163}
164
165impl FileJournalSink {
166 pub fn create(path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
168 let file = std::fs::File::create(path)?;
169 Ok(Self {
170 inner: parking_lot::Mutex::new(FileJournalInner {
171 writer: std::io::BufWriter::new(file),
172 last_flush: std::time::Instant::now(),
173 }),
174 })
175 }
176
177 pub fn flush(&self) {
179 let mut inner = self.inner.lock();
180 if let Err(e) = std::io::Write::flush(&mut inner.writer) {
181 journal_log_error("FileJournalSink flush failed", &e);
182 }
183 inner.last_flush = std::time::Instant::now();
184 }
185}
186
187impl JournalSink for FileJournalSink {
188 fn write(&self, m: &StateMutation) {
189 let line = match serde_json::to_string(m) {
190 Ok(line) => line,
191 Err(e) => {
192 journal_log_error("FileJournalSink serialize failed", &e);
193 return;
194 }
195 };
196 let mut inner = self.inner.lock();
197 if let Err(e) = std::io::Write::write_all(&mut inner.writer, line.as_bytes())
198 .and_then(|()| std::io::Write::write_all(&mut inner.writer, b"\n"))
199 {
200 journal_log_error("FileJournalSink write failed", &e);
201 return;
202 }
203 if inner.last_flush.elapsed() >= JOURNAL_FLUSH_INTERVAL {
204 if let Err(e) = std::io::Write::flush(&mut inner.writer) {
205 journal_log_error("FileJournalSink flush failed", &e);
206 }
207 inner.last_flush = std::time::Instant::now();
208 }
209 }
210}
211
212impl Drop for FileJournalSink {
213 fn drop(&mut self) {
214 if let Err(e) = std::io::Write::flush(&mut self.inner.lock().writer) {
215 journal_log_error("FileJournalSink final flush failed", &e);
216 }
217 }
218}
219
220#[derive(Default)]
222pub struct MemoryJournalSink {
223 entries: parking_lot::Mutex<Vec<StateMutation>>,
224}
225
226impl MemoryJournalSink {
227 pub fn new() -> Self {
229 Self::default()
230 }
231
232 pub fn entries(&self) -> Vec<StateMutation> {
234 self.entries.lock().clone()
235 }
236
237 pub fn len(&self) -> usize {
239 self.entries.lock().len()
240 }
241
242 pub fn is_empty(&self) -> bool {
244 self.entries.lock().is_empty()
245 }
246}
247
248impl JournalSink for MemoryJournalSink {
249 fn write(&self, m: &StateMutation) {
250 self.entries.lock().push(m.clone());
251 }
252}
253
254#[derive(Debug, thiserror::Error)]
256pub enum StateError {
257 #[error("failed to serialize state value for key '{key}': {source}")]
259 Serialize {
260 key: String,
262 source: serde_json::Error,
264 },
265}
266
267#[derive(Debug, Clone)]
273enum DeltaOp {
274 Put(Value),
276 Delete,
278}
279
280#[derive(Debug, Clone, serde::Serialize)]
287pub struct SlotEvidence {
288 pub key: String,
290 pub present: bool,
292 pub value: Option<Value>,
294 pub source: Option<String>,
297 pub confidence: Option<f64>,
299 pub last_sequence: Option<u64>,
302 pub last_origin: Option<StateMutationOrigin>,
304}
305
306#[derive(Debug, Clone)]
312pub struct State {
313 inner: Arc<DashMap<String, Value>>,
314 delta: Arc<DashMap<String, DeltaOp>>,
315 mutations: Arc<std::sync::Mutex<VecDeque<StateMutation>>>,
316 next_mutation_sequence: Arc<AtomicU64>,
317 mutation_capacity: usize,
318 journal_sink: JournalSinkSlot,
319 track_delta: bool,
320}
321
322impl Default for State {
323 fn default() -> Self {
324 Self::new()
325 }
326}
327
328impl State {
329 pub fn new() -> Self {
331 Self {
332 inner: Arc::new(DashMap::new()),
333 delta: Arc::new(DashMap::new()),
334 mutations: Arc::new(std::sync::Mutex::new(VecDeque::new())),
335 next_mutation_sequence: Arc::new(AtomicU64::new(1)),
336 mutation_capacity: DEFAULT_MUTATION_JOURNAL_CAPACITY,
337 journal_sink: JournalSinkSlot::default(),
338 track_delta: false,
339 }
340 }
341
342 pub fn with_delta_tracking(&self) -> State {
345 State {
346 inner: self.inner.clone(),
347 delta: Arc::new(DashMap::new()),
348 mutations: self.mutations.clone(),
349 next_mutation_sequence: self.next_mutation_sequence.clone(),
350 mutation_capacity: self.mutation_capacity,
351 journal_sink: self.journal_sink.clone(),
352 track_delta: true,
353 }
354 }
355
356 pub fn set_journal_sink(&self, sink: Arc<dyn JournalSink>) {
364 *self.journal_sink.0.write() = Some(sink);
365 }
366
367 pub fn with_journal_sink(self, sink: Arc<dyn JournalSink>) -> Self {
369 self.set_journal_sink(sink);
370 self
371 }
372
373 pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
376 self.get_raw(key)
377 .and_then(|v| serde_json::from_value(v).ok())
378 }
379
380 pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
388 where
389 F: FnOnce(&Value) -> R,
390 {
391 if self.track_delta {
392 match self.delta.get(key).map(|r| r.value().clone()) {
393 Some(DeltaOp::Put(v)) => return Some(f(&v)),
394 Some(DeltaOp::Delete) => return None, None => {}
396 }
397 }
398 if let Some(ref_multi) = self.inner.get(key) {
399 return Some(f(ref_multi.value()));
400 }
401 if !key.contains(':') {
402 let mut derived_key = String::with_capacity(8 + key.len());
403 use std::fmt::Write;
404 let _ = write!(derived_key, "derived:{}", key);
405 if self.track_delta {
406 match self.delta.get(&derived_key).map(|r| r.value().clone()) {
407 Some(DeltaOp::Put(v)) => return Some(f(&v)),
408 Some(DeltaOp::Delete) => return None,
409 None => {}
410 }
411 }
412 if let Some(ref_multi) = self.inner.get(&derived_key) {
413 return Some(f(ref_multi.value()));
414 }
415 }
416 None
417 }
418
419 pub fn get_raw(&self, key: &str) -> Option<Value> {
424 if self.track_delta {
425 match self.delta.get(key).map(|r| r.value().clone()) {
426 Some(DeltaOp::Put(v)) => return Some(v),
427 Some(DeltaOp::Delete) => return None, None => {}
429 }
430 }
431 if let Some(v) = self.inner.get(key) {
432 return Some(v.value().clone());
433 }
434 if !key.contains(':') {
436 use std::fmt::Write;
437 let mut derived_key = String::with_capacity(8 + key.len());
438 let _ = write!(derived_key, "derived:{}", key);
439 if self.track_delta {
440 match self.delta.get(&derived_key).map(|r| r.value().clone()) {
441 Some(DeltaOp::Put(v)) => return Some(v),
442 Some(DeltaOp::Delete) => return None,
443 None => {}
444 }
445 }
446 return self.inner.get(&derived_key).map(|v| v.value().clone());
447 }
448 None
449 }
450
451 pub fn get_key<T: serde::de::DeserializeOwned>(&self, key: &StateKey<T>) -> Option<T> {
453 self.get(key.key())
454 }
455
456 pub fn set_key<T: serde::Serialize>(
460 &self,
461 key: &StateKey<T>,
462 value: T,
463 ) -> Result<(), StateError> {
464 self.set(key.key(), value)
465 }
466
467 pub fn with_key<T, F, R>(&self, key: &StateKey<T>, f: F) -> Option<R>
469 where
470 F: FnOnce(&Value) -> R,
471 {
472 self.with(key.key(), f)
473 }
474
475 pub fn set(
481 &self,
482 key: impl Into<String>,
483 value: impl serde::Serialize,
484 ) -> Result<(), StateError> {
485 let key = key.into();
486 let v = serde_json::to_value(value).map_err(|source| StateError::Serialize {
487 key: key.clone(),
488 source,
489 })?;
490 self.put_value(key, v, StateMutationOrigin::Set);
491 Ok(())
492 }
493
494 fn put_value(&self, key: String, v: Value, origin: StateMutationOrigin) {
499 let old = self.get_raw(&key);
500 if self.track_delta {
501 self.delta.insert(key.clone(), DeltaOp::Put(v.clone()));
502 } else {
503 self.inner.insert(key.clone(), v.clone());
504 }
505 self.record_mutation(key, old, Some(v), origin);
506 }
507
508 pub fn set_committed(
512 &self,
513 key: impl Into<String>,
514 value: impl serde::Serialize,
515 ) -> Result<(), StateError> {
516 let key = key.into();
517 let v = serde_json::to_value(value).map_err(|source| StateError::Serialize {
518 key: key.clone(),
519 source,
520 })?;
521 let old = self.inner.insert(key.clone(), v.clone());
522 self.record_mutation(key, old, Some(v), StateMutationOrigin::SetCommitted);
523 Ok(())
524 }
525
526 pub fn modify<T, F>(&self, key: &str, default: T, f: F) -> Result<T, StateError>
534 where
535 T: serde::Serialize + serde::de::DeserializeOwned,
536 F: FnOnce(T) -> T,
537 {
538 use dashmap::mapref::entry::Entry;
539
540 let serialize = |key: &str, val: &T| {
541 serde_json::to_value(val).map_err(|source| StateError::Serialize {
542 key: key.to_string(),
543 source,
544 })
545 };
546
547 if self.track_delta {
548 match self.delta.entry(key.to_string()) {
551 Entry::Occupied(mut o) => {
552 let current = match o.get() {
553 DeltaOp::Put(v) => serde_json::from_value(v.clone()).unwrap_or(default),
554 DeltaOp::Delete => default,
555 };
556 let old = self.inner.get(key).map(|r| r.value().clone());
557 let new_val = f(current);
558 let v = serialize(key, &new_val)?;
559 o.insert(DeltaOp::Put(v.clone()));
560 self.record_mutation(key.to_string(), old, Some(v), StateMutationOrigin::Set);
561 Ok(new_val)
562 }
563 Entry::Vacant(slot) => {
564 let base = self
565 .inner
566 .get(key)
567 .and_then(|r| serde_json::from_value(r.value().clone()).ok());
568 let old = self.inner.get(key).map(|r| r.value().clone());
569 let new_val = f(base.unwrap_or(default));
570 let v = serialize(key, &new_val)?;
571 slot.insert(DeltaOp::Put(v.clone()));
572 self.record_mutation(key.to_string(), old, Some(v), StateMutationOrigin::Set);
573 Ok(new_val)
574 }
575 }
576 } else {
577 match self.inner.entry(key.to_string()) {
578 Entry::Occupied(mut o) => {
579 let old = o.get().clone();
580 let current = serde_json::from_value(old.clone()).unwrap_or(default);
581 let new_val = f(current);
582 let v = serialize(key, &new_val)?;
583 o.insert(v.clone());
584 self.record_mutation(
585 key.to_string(),
586 Some(old),
587 Some(v),
588 StateMutationOrigin::Set,
589 );
590 Ok(new_val)
591 }
592 Entry::Vacant(slot) => {
593 let new_val = f(default);
594 let v = serialize(key, &new_val)?;
595 slot.insert(v.clone());
596 self.record_mutation(key.to_string(), None, Some(v), StateMutationOrigin::Set);
597 Ok(new_val)
598 }
599 }
600 }
601 }
602
603 pub fn contains(&self, key: &str) -> bool {
605 if self.track_delta {
606 match self.delta.get(key).map(|r| r.value().clone()) {
607 Some(DeltaOp::Put(_)) => return true,
608 Some(DeltaOp::Delete) => return false, None => {}
610 }
611 }
612 self.inner.contains_key(key)
613 }
614
615 pub fn remove(&self, key: &str) -> Option<Value> {
621 if self.track_delta {
622 let removed = self.get_raw(key);
623 self.delta.insert(key.to_string(), DeltaOp::Delete);
626 if let Some(ref old) = removed {
627 self.record_mutation(
628 key.to_string(),
629 Some(old.clone()),
630 None,
631 StateMutationOrigin::Remove,
632 );
633 }
634 removed
635 } else {
636 let removed = self.inner.remove(key).map(|(_, v)| v);
637 if let Some(ref old) = removed {
638 self.record_mutation(
639 key.to_string(),
640 Some(old.clone()),
641 None,
642 StateMutationOrigin::Remove,
643 );
644 }
645 removed
646 }
647 }
648
649 pub fn keys(&self) -> Vec<String> {
653 if !self.track_delta || self.delta.is_empty() {
654 return self.inner.iter().map(|r| r.key().clone()).collect();
655 }
656 let mut seen =
657 std::collections::HashSet::with_capacity(self.inner.len() + self.delta.len());
658 let mut keys = Vec::with_capacity(self.inner.len() + self.delta.len());
659 for entry in self.delta.iter() {
661 let key = entry.key().clone();
662 seen.insert(key.clone());
663 if matches!(entry.value(), DeltaOp::Put(_)) {
664 keys.push(key);
665 }
666 }
667 for entry in self.inner.iter() {
668 let key = entry.key().clone();
669 if seen.insert(key.clone()) {
670 keys.push(key);
671 }
672 }
673 keys
674 }
675
676 pub fn pick(&self, keys: &[&str]) -> State {
678 let new = State::new();
679 for key in keys {
680 if let Some(v) = self.get_raw(key) {
681 new.put_value((*key).to_string(), v, StateMutationOrigin::Set);
682 }
683 }
684 new
685 }
686
687 pub fn merge(&self, other: &State) {
689 for entry in other.inner.iter() {
690 self.put_value(
691 entry.key().clone(),
692 entry.value().clone(),
693 StateMutationOrigin::Set,
694 );
695 }
696 }
697
698 pub fn rename(&self, from: &str, to: &str) {
700 if let Some(v) = self.remove(from) {
701 self.put_value(to.to_string(), v, StateMutationOrigin::Set);
702 }
703 }
704
705 pub fn is_tracking_delta(&self) -> bool {
709 self.track_delta
710 }
711
712 pub fn has_delta(&self) -> bool {
714 self.track_delta && !self.delta.is_empty()
715 }
716
717 pub fn delta(&self) -> HashMap<String, Value> {
719 self.delta
720 .iter()
721 .filter_map(|entry| match entry.value() {
722 DeltaOp::Put(v) => Some((entry.key().clone(), v.clone())),
723 DeltaOp::Delete => None,
724 })
725 .collect()
726 }
727
728 pub fn commit(&self) {
733 let ops: Vec<(String, DeltaOp)> = self
735 .delta
736 .iter()
737 .map(|e| (e.key().clone(), e.value().clone()))
738 .collect();
739 for (key, op) in ops {
740 match op {
741 DeltaOp::Put(value) => {
742 let old = self.inner.insert(key.clone(), value.clone());
743 self.record_mutation_with_delta(
744 key,
745 old,
746 Some(value),
747 StateMutationOrigin::Commit,
748 false,
749 );
750 }
751 DeltaOp::Delete => {
752 if let Some((_, old)) = self.inner.remove(&key) {
753 self.record_mutation_with_delta(
754 key,
755 Some(old),
756 None,
757 StateMutationOrigin::Commit,
758 false,
759 );
760 }
761 }
762 }
763 }
764 self.delta.clear();
765 }
766
767 pub fn rollback(&self) {
773 self.delta.clear();
774 }
775
776 pub fn app(&self) -> PrefixedState<'_> {
780 PrefixedState {
781 state: self,
782 prefix: "app:",
783 }
784 }
785
786 pub fn user(&self) -> PrefixedState<'_> {
788 PrefixedState {
789 state: self,
790 prefix: "user:",
791 }
792 }
793
794 pub fn temp(&self) -> PrefixedState<'_> {
796 PrefixedState {
797 state: self,
798 prefix: "temp:",
799 }
800 }
801
802 pub fn session(&self) -> PrefixedState<'_> {
804 PrefixedState {
805 state: self,
806 prefix: "session:",
807 }
808 }
809
810 pub fn turn(&self) -> PrefixedState<'_> {
812 PrefixedState {
813 state: self,
814 prefix: "turn:",
815 }
816 }
817
818 pub fn bg(&self) -> PrefixedState<'_> {
820 PrefixedState {
821 state: self,
822 prefix: "bg:",
823 }
824 }
825
826 pub fn derived(&self) -> ReadOnlyPrefixedState<'_> {
828 ReadOnlyPrefixedState {
829 state: self,
830 prefix: "derived:",
831 }
832 }
833
834 pub fn snapshot_values(&self, keys: &[&str]) -> HashMap<String, Value> {
839 keys.iter()
840 .filter_map(|&k| self.get_raw(k).map(|v| (k.to_string(), v)))
841 .collect()
842 }
843
844 pub fn diff_values(
847 &self,
848 prev: &HashMap<String, Value>,
849 keys: &[&str],
850 ) -> Vec<(String, Value, Value)> {
851 keys.iter()
852 .filter_map(|&k| {
853 let old = prev.get(k);
854 let new = self.get_raw(k);
855 match (old, new) {
856 (Some(o), Some(n)) if o != &n => Some((k.to_string(), o.clone(), n)),
857 (None, Some(n)) => Some((k.to_string(), Value::Null, n)),
858 (Some(o), None) => Some((k.to_string(), o.clone(), Value::Null)),
859 _ => None,
860 }
861 })
862 .collect()
863 }
864
865 pub fn to_hashmap(&self) -> std::collections::HashMap<String, serde_json::Value> {
867 self.inner
868 .iter()
869 .map(|entry| (entry.key().clone(), entry.value().clone()))
870 .collect()
871 }
872
873 pub fn from_hashmap(&self, map: std::collections::HashMap<String, serde_json::Value>) {
875 for (key, value) in map {
876 let old = self.inner.insert(key.clone(), value.clone());
878 self.record_mutation(key, old, Some(value), StateMutationOrigin::SetCommitted);
879 }
880 }
881
882 pub fn clear_prefix(&self, prefix: &str) {
888 if self.track_delta {
889 let keys: Vec<String> = self
890 .keys()
891 .into_iter()
892 .filter(|k| k.starts_with(prefix))
893 .collect();
894 for key in keys {
895 let old = self.get_raw(&key);
896 self.delta.insert(key.clone(), DeltaOp::Delete);
897 if let Some(old) = old {
898 self.record_mutation(key, Some(old), None, StateMutationOrigin::ClearPrefix);
899 }
900 }
901 return;
902 }
903 let keys_to_remove: Vec<String> = self
904 .inner
905 .iter()
906 .filter(|entry| entry.key().starts_with(prefix))
907 .map(|entry| entry.key().clone())
908 .collect();
909 for key in keys_to_remove {
910 if let Some((_, old)) = self.inner.remove(&key) {
911 self.record_mutation(key, Some(old), None, StateMutationOrigin::ClearPrefix);
912 }
913 }
914 }
915
916 pub fn recent_mutations(&self) -> Vec<StateMutation> {
918 self.mutations
919 .lock()
920 .expect("state mutation journal poisoned")
921 .iter()
922 .cloned()
923 .collect()
924 }
925
926 pub fn mutation_cursor(&self) -> u64 {
928 self.next_mutation_sequence.load(Ordering::Relaxed) - 1
929 }
930
931 pub fn mutations_since(&self, cursor: u64) -> Vec<StateMutation> {
933 let mutations = self
934 .mutations
935 .lock()
936 .expect("state mutation journal poisoned");
937 mutations
938 .iter()
939 .filter(|mutation| mutation.sequence > cursor)
940 .cloned()
941 .collect()
942 }
943
944 pub fn drain_mutations(&self) -> Vec<StateMutation> {
946 self.mutations
947 .lock()
948 .expect("state mutation journal poisoned")
949 .drain(..)
950 .collect()
951 }
952
953 pub fn evidence(&self, key: &str) -> SlotEvidence {
956 let value = self.get_raw(key);
957 let meta = self.get::<Value>(&format!("state_meta:{key}"));
958 let source = meta
959 .as_ref()
960 .and_then(|m| m.get("source"))
961 .and_then(|s| s.as_str().map(String::from));
962 let confidence = meta
963 .as_ref()
964 .and_then(|m| m.get("confidence"))
965 .and_then(serde_json::Value::as_f64);
966
967 let mut last_sequence: Option<u64> = None;
968 let mut last_origin: Option<StateMutationOrigin> = None;
969 for m in self.recent_mutations() {
970 if m.key == key && last_sequence.is_none_or(|s| m.sequence > s) {
971 last_sequence = Some(m.sequence);
972 last_origin = Some(m.origin);
973 }
974 }
975
976 SlotEvidence {
977 key: key.to_string(),
978 present: value.is_some(),
979 value,
980 source,
981 confidence,
982 last_sequence,
983 last_origin,
984 }
985 }
986
987 fn record_mutation(
988 &self,
989 key: String,
990 old: Option<Value>,
991 new: Option<Value>,
992 origin: StateMutationOrigin,
993 ) {
994 self.record_mutation_with_delta(key, old, new, origin, self.track_delta);
995 }
996
997 fn record_mutation_with_delta(
998 &self,
999 key: String,
1000 old: Option<Value>,
1001 new: Option<Value>,
1002 origin: StateMutationOrigin,
1003 delta: bool,
1004 ) {
1005 let mut mutations = self
1006 .mutations
1007 .lock()
1008 .expect("state mutation journal poisoned");
1009 if mutations.len() >= self.mutation_capacity {
1010 mutations.pop_front();
1011 }
1012 let sequence = self.next_mutation_sequence.fetch_add(1, Ordering::Relaxed);
1013 let mutation = StateMutation {
1014 sequence,
1015 key,
1016 old,
1017 new,
1018 origin,
1019 timestamp: SystemTime::now(),
1020 delta,
1021 };
1022 if let Some(sink) = self.journal_sink.0.read().as_ref() {
1025 sink.write(&mutation);
1026 }
1027 mutations.push_back(mutation);
1028 }
1029}
1030
1031pub struct PrefixedState<'a> {
1033 state: &'a State,
1034 prefix: &'static str,
1035}
1036
1037impl<'a> PrefixedState<'a> {
1038 fn prefixed_key(&self, key: &str) -> String {
1039 format!("{}{}", self.prefix, key)
1040 }
1041
1042 pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
1044 self.state.get(&self.prefixed_key(key))
1045 }
1046
1047 pub fn get_raw(&self, key: &str) -> Option<Value> {
1049 self.state.get_raw(&self.prefixed_key(key))
1050 }
1051
1052 pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
1054 where
1055 F: FnOnce(&Value) -> R,
1056 {
1057 self.state.with(&self.prefixed_key(key), f)
1058 }
1059
1060 pub fn set(
1064 &self,
1065 key: impl AsRef<str>,
1066 value: impl serde::Serialize,
1067 ) -> Result<(), StateError> {
1068 self.state.set(self.prefixed_key(key.as_ref()), value)
1069 }
1070
1071 pub fn contains(&self, key: &str) -> bool {
1073 self.state.contains(&self.prefixed_key(key))
1074 }
1075
1076 pub fn remove(&self, key: &str) -> Option<Value> {
1078 self.state.remove(&self.prefixed_key(key))
1079 }
1080
1081 pub fn keys(&self) -> Vec<String> {
1083 self.state
1084 .keys()
1085 .into_iter()
1086 .filter_map(|k| k.strip_prefix(self.prefix).map(|s| s.to_string()))
1087 .collect()
1088 }
1089}
1090
1091pub struct ReadOnlyPrefixedState<'a> {
1096 state: &'a State,
1097 prefix: &'static str,
1098}
1099
1100impl<'a> ReadOnlyPrefixedState<'a> {
1101 fn prefixed_key(&self, key: &str) -> String {
1102 format!("{}{}", self.prefix, key)
1103 }
1104
1105 pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
1107 self.state.get(&self.prefixed_key(key))
1108 }
1109
1110 pub fn get_raw(&self, key: &str) -> Option<Value> {
1112 self.state.get_raw(&self.prefixed_key(key))
1113 }
1114
1115 pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
1117 where
1118 F: FnOnce(&Value) -> R,
1119 {
1120 self.state.with(&self.prefixed_key(key), f)
1121 }
1122
1123 pub fn contains(&self, key: &str) -> bool {
1125 self.state.contains(&self.prefixed_key(key))
1126 }
1127
1128 pub fn keys(&self) -> Vec<String> {
1130 self.state
1131 .keys()
1132 .into_iter()
1133 .filter_map(|k| k.strip_prefix(self.prefix).map(|s| s.to_string()))
1134 .collect()
1135 }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140 use super::*;
1141
1142 #[test]
1143 fn journal_sink_receives_every_mutation_in_ring_order() {
1144 let state = State::new();
1145 let sink = Arc::new(MemoryJournalSink::new());
1146 state.set_journal_sink(sink.clone());
1147
1148 let _ = state.set("a", 1);
1149 let _ = state.set("b", "two");
1150 state.remove("a");
1151
1152 let entries = sink.entries();
1153 assert_eq!(entries.len(), 3);
1154 assert_eq!(entries, state.recent_mutations());
1155 assert_eq!(entries[0].key, "a");
1156 assert_eq!(entries[2].origin, StateMutationOrigin::Remove);
1157 }
1158
1159 #[test]
1160 fn journal_sink_is_shared_with_clones_and_delta_views() {
1161 let state = State::new();
1162 let sink = Arc::new(MemoryJournalSink::new());
1163 state.set_journal_sink(sink.clone());
1164
1165 let clone = state.clone();
1166 let _ = clone.set("from_clone", true);
1167
1168 let tracked = state.with_delta_tracking();
1169 let _ = tracked.set("from_delta", 1);
1170 tracked.commit();
1171
1172 let keys: Vec<_> = sink.entries().iter().map(|m| m.key.clone()).collect();
1173 assert!(keys.contains(&"from_clone".to_string()));
1174 assert!(keys.contains(&"from_delta".to_string()));
1175 assert!(sink
1177 .entries()
1178 .iter()
1179 .any(|m| m.origin == StateMutationOrigin::Commit));
1180 }
1181
1182 #[test]
1183 fn journal_sink_outlives_ring_capacity() {
1184 let state = State::new();
1186 let sink = Arc::new(MemoryJournalSink::new());
1187 state.set_journal_sink(sink.clone());
1188
1189 for i in 0..(DEFAULT_MUTATION_JOURNAL_CAPACITY + 10) {
1190 let _ = state.set(format!("k{i}"), i);
1191 }
1192
1193 assert_eq!(
1194 state.recent_mutations().len(),
1195 DEFAULT_MUTATION_JOURNAL_CAPACITY
1196 );
1197 assert_eq!(sink.len(), DEFAULT_MUTATION_JOURNAL_CAPACITY + 10);
1198 assert_eq!(sink.entries()[0].key, "k0");
1199 }
1200
1201 #[test]
1202 fn state_mutation_serde_round_trip_uses_epoch_millis() {
1203 let m = StateMutation {
1204 sequence: 42,
1205 key: "app:last_city".into(),
1206 old: None,
1207 new: Some(serde_json::json!("London")),
1208 origin: StateMutationOrigin::Set,
1209 timestamp: std::time::UNIX_EPOCH + std::time::Duration::from_millis(1_718_000_000_123),
1210 delta: false,
1211 };
1212 let json = serde_json::to_string(&m).unwrap();
1213 assert!(json.contains("\"timestamp_ms\":1718000000123"));
1214 assert!(json.contains("\"origin\":\"set\""));
1215 let back: StateMutation = serde_json::from_str(&json).unwrap();
1216 assert_eq!(back, m);
1217 }
1218
1219 #[test]
1220 fn file_journal_sink_round_trip() {
1221 let dir = std::env::temp_dir().join(format!(
1222 "gemini-rs-journal-test-{}-{}",
1223 std::process::id(),
1224 std::time::SystemTime::now()
1225 .duration_since(std::time::UNIX_EPOCH)
1226 .unwrap()
1227 .as_nanos()
1228 ));
1229 std::fs::create_dir_all(&dir).unwrap();
1230 let path = dir.join("session.journal.jsonl");
1231
1232 let state = State::new();
1233 {
1234 let sink = Arc::new(FileJournalSink::create(&path).unwrap());
1235 state.set_journal_sink(sink);
1236 let _ = state.set("a", 1);
1237 let _ = state.set("a", 2);
1238 state.remove("a");
1239 state.set_journal_sink(Arc::new(MemoryJournalSink::new()));
1241 }
1242
1243 let data = std::fs::read_to_string(&path).unwrap();
1244 let parsed: Vec<StateMutation> = data
1245 .lines()
1246 .filter(|l| !l.trim().is_empty())
1247 .map(|l| serde_json::from_str(l).unwrap())
1248 .collect();
1249 assert_eq!(parsed.len(), 3);
1250 assert_eq!(parsed[0].new, Some(serde_json::json!(1)));
1251 assert_eq!(parsed[2].origin, StateMutationOrigin::Remove);
1252
1253 let _ = std::fs::remove_dir_all(&dir);
1254 }
1255
1256 #[test]
1257 fn set_and_get_string() {
1258 let state = State::new();
1259 let _ = state.set("name", "Alice");
1260 assert_eq!(state.get::<String>("name"), Some("Alice".to_string()));
1261 }
1262
1263 #[test]
1264 fn set_and_get_json() {
1265 let state = State::new();
1266 let _ = state.set("data", serde_json::json!({"temp": 22}));
1267 let v: Value = state.get("data").unwrap();
1268 assert_eq!(v["temp"], 22);
1269 }
1270
1271 #[test]
1272 fn pick_subset() {
1273 let state = State::new();
1274 let _ = state.set("a", 1);
1275 let _ = state.set("b", 2);
1276 let _ = state.set("c", 3);
1277 let picked = state.pick(&["a", "c"]);
1278 assert!(picked.contains("a"));
1279 assert!(!picked.contains("b"));
1280 assert!(picked.contains("c"));
1281 }
1282
1283 #[test]
1284 fn merge_states() {
1285 let s1 = State::new();
1286 let _ = s1.set("a", 1);
1287 let s2 = State::new();
1288 let _ = s2.set("b", 2);
1289 s1.merge(&s2);
1290 assert!(s1.contains("a"));
1291 assert!(s1.contains("b"));
1292 }
1293
1294 #[test]
1295 fn rename_key() {
1296 let state = State::new();
1297 let _ = state.set("old", "value");
1298 state.rename("old", "new");
1299 assert!(!state.contains("old"));
1300 assert_eq!(state.get::<String>("new"), Some("value".to_string()));
1301 }
1302
1303 #[test]
1304 fn remove_returns_value() {
1305 let state = State::new();
1306 let _ = state.set("key", 42);
1307 let removed = state.remove("key");
1308 assert!(removed.is_some());
1309 assert!(!state.contains("key"));
1310 }
1311
1312 #[test]
1313 fn get_missing_returns_none() {
1314 let state = State::new();
1315 assert_eq!(state.get::<String>("nope"), None);
1316 }
1317
1318 #[test]
1321 fn delta_tracking_writes_to_delta() {
1322 let state = State::new();
1323 let _ = state.set("committed", "yes");
1324
1325 let tracked = state.with_delta_tracking();
1326 let _ = tracked.set("new_key", "new_value");
1327
1328 assert_eq!(
1330 tracked.get::<String>("new_key"),
1331 Some("new_value".to_string())
1332 );
1333 assert!(!state.contains("new_key"));
1335 assert_eq!(tracked.get::<String>("committed"), Some("yes".to_string()));
1337 }
1338
1339 #[test]
1340 fn delta_has_delta_reports_correctly() {
1341 let state = State::new();
1342 let tracked = state.with_delta_tracking();
1343 assert!(!tracked.has_delta());
1344
1345 let _ = tracked.set("key", "val");
1346 assert!(tracked.has_delta());
1347 }
1348
1349 #[test]
1350 fn delta_commit_merges_to_inner() {
1351 let state = State::new();
1352 let tracked = state.with_delta_tracking();
1353 let _ = tracked.set("key", "val");
1354 assert!(!state.contains("key"));
1355
1356 tracked.commit();
1357 assert_eq!(state.get::<String>("key"), Some("val".to_string()));
1359 assert!(!tracked.has_delta());
1360 }
1361
1362 #[test]
1363 fn delta_rollback_discards_changes() {
1364 let state = State::new();
1365 let tracked = state.with_delta_tracking();
1366 let _ = tracked.set("key", "val");
1367 assert!(tracked.has_delta());
1368
1369 tracked.rollback();
1370 assert!(!tracked.has_delta());
1371 assert!(!state.contains("key"));
1372 assert!(!tracked.contains("key"));
1373 }
1374
1375 #[test]
1376 fn delta_snapshot() {
1377 let state = State::new();
1378 let tracked = state.with_delta_tracking();
1379 let _ = tracked.set("a", 1);
1380 let _ = tracked.set("b", 2);
1381
1382 let snapshot = tracked.delta();
1383 assert_eq!(snapshot.len(), 2);
1384 assert!(snapshot.contains_key("a"));
1385 assert!(snapshot.contains_key("b"));
1386 }
1387
1388 #[test]
1389 fn set_committed_bypasses_delta() {
1390 let state = State::new();
1391 let tracked = state.with_delta_tracking();
1392 let _ = tracked.set_committed("direct", "value");
1393
1394 assert_eq!(state.get::<String>("direct"), Some("value".to_string()));
1396 assert!(!tracked.has_delta());
1398 assert_eq!(tracked.get::<String>("direct"), Some("value".to_string()));
1400 }
1401
1402 #[test]
1403 fn mutation_journal_records_set_and_remove() {
1404 let state = State::new();
1405 let _ = state.set("key", "first");
1406 let _ = state.set("key", "second");
1407 state.remove("key");
1408
1409 let mutations = state.recent_mutations();
1410 assert_eq!(mutations.len(), 3);
1411 assert_eq!(mutations[0].key, "key");
1412 assert_eq!(mutations[0].old, None);
1413 assert_eq!(mutations[0].new, Some(serde_json::json!("first")));
1414 assert_eq!(mutations[0].origin, StateMutationOrigin::Set);
1415
1416 assert_eq!(mutations[1].old, Some(serde_json::json!("first")));
1417 assert_eq!(mutations[1].new, Some(serde_json::json!("second")));
1418
1419 assert_eq!(mutations[2].old, Some(serde_json::json!("second")));
1420 assert_eq!(mutations[2].new, None);
1421 assert_eq!(mutations[2].origin, StateMutationOrigin::Remove);
1422 }
1423
1424 #[test]
1425 fn mutation_journal_is_shared_with_delta_tracking() {
1426 let state = State::new();
1427 let _ = state.set("committed", "yes");
1428
1429 let tracked = state.with_delta_tracking();
1430 let _ = tracked.set("committed", "maybe");
1431 tracked.commit();
1432
1433 let mutations = state.recent_mutations();
1434 assert_eq!(mutations.len(), 3);
1435 assert_eq!(mutations[1].key, "committed");
1436 assert_eq!(mutations[1].old, Some(serde_json::json!("yes")));
1437 assert_eq!(mutations[1].new, Some(serde_json::json!("maybe")));
1438 assert_eq!(mutations[1].origin, StateMutationOrigin::Set);
1439 assert!(mutations[1].delta);
1440
1441 assert_eq!(mutations[2].origin, StateMutationOrigin::Commit);
1442 assert!(!mutations[2].delta);
1443 }
1444
1445 #[test]
1446 fn drain_mutations_clears_journal() {
1447 let state = State::new();
1448 let _ = state.set("a", 1);
1449 let _ = state.set("b", 2);
1450
1451 let drained = state.drain_mutations();
1452 assert_eq!(drained.len(), 2);
1453 assert!(state.recent_mutations().is_empty());
1454 }
1455
1456 #[test]
1457 fn mutation_cursor_reads_only_later_changes() {
1458 let state = State::new();
1459 let _ = state.set("before", 1);
1460 let cursor = state.mutation_cursor();
1461
1462 let _ = state.set("after", 2);
1463 state.remove("before");
1464
1465 let mutations = state.mutations_since(cursor);
1466 assert_eq!(mutations.len(), 2);
1467 assert_eq!(mutations[0].key, "after");
1468 assert_eq!(mutations[1].key, "before");
1469 }
1470
1471 #[test]
1472 fn no_delta_tracking_preserves_existing_behavior() {
1473 let state = State::new();
1474 assert!(!state.is_tracking_delta());
1475 let _ = state.set("key", "val");
1476 assert_eq!(state.get::<String>("key"), Some("val".to_string()));
1477 assert!(!state.has_delta());
1478 }
1479
1480 #[test]
1483 fn prefix_app_set_and_get() {
1484 let state = State::new();
1485 let _ = state.app().set("flag", true);
1486
1487 assert_eq!(state.app().get::<bool>("flag"), Some(true));
1489 assert_eq!(state.get::<bool>("app:flag"), Some(true));
1491 }
1492
1493 #[test]
1494 fn prefix_user_set_and_get() {
1495 let state = State::new();
1496 let _ = state.user().set("name", "Alice");
1497 assert_eq!(
1498 state.user().get::<String>("name"),
1499 Some("Alice".to_string())
1500 );
1501 assert_eq!(state.get::<String>("user:name"), Some("Alice".to_string()));
1502 }
1503
1504 #[test]
1505 fn prefix_temp_set_and_get() {
1506 let state = State::new();
1507 let _ = state.temp().set("scratch", 42);
1508 assert_eq!(state.temp().get::<i32>("scratch"), Some(42));
1509 }
1510
1511 #[test]
1512 fn prefix_contains_and_remove() {
1513 let state = State::new();
1514 let _ = state.app().set("x", 1);
1515 assert!(state.app().contains("x"));
1516 state.app().remove("x");
1517 assert!(!state.app().contains("x"));
1518 }
1519
1520 #[test]
1521 fn prefix_keys() {
1522 let state = State::new();
1523 let _ = state.app().set("a", 1);
1524 let _ = state.app().set("b", 2);
1525 let _ = state.user().set("c", 3);
1526
1527 let app_keys = state.app().keys();
1528 assert_eq!(app_keys.len(), 2);
1529 assert!(app_keys.contains(&"a".to_string()));
1530 assert!(app_keys.contains(&"b".to_string()));
1531
1532 let user_keys = state.user().keys();
1533 assert_eq!(user_keys.len(), 1);
1534 assert!(user_keys.contains(&"c".to_string()));
1535 }
1536
1537 #[test]
1538 fn prefix_with_delta_tracking() {
1539 let state = State::new();
1540 let tracked = state.with_delta_tracking();
1541 let _ = tracked.app().set("flag", true);
1542
1543 assert_eq!(tracked.app().get::<bool>("flag"), Some(true));
1545 assert!(tracked.has_delta());
1547 assert!(!state.contains("app:flag"));
1548
1549 tracked.commit();
1550 assert_eq!(state.get::<bool>("app:flag"), Some(true));
1551 }
1552
1553 #[test]
1556 fn prefix_session_set_and_get() {
1557 let state = State::new();
1558 let _ = state.session().set("turn_count", 5);
1559 assert_eq!(state.session().get::<i32>("turn_count"), Some(5));
1560 assert_eq!(state.get::<i32>("session:turn_count"), Some(5));
1561 }
1562
1563 #[test]
1564 fn prefix_turn_set_and_get() {
1565 let state = State::new();
1566 let _ = state.turn().set("transcript", "hello");
1567 assert_eq!(
1568 state.turn().get::<String>("transcript"),
1569 Some("hello".to_string())
1570 );
1571 assert_eq!(
1572 state.get::<String>("turn:transcript"),
1573 Some("hello".to_string())
1574 );
1575 }
1576
1577 #[test]
1578 fn prefix_bg_set_and_get() {
1579 let state = State::new();
1580 let _ = state.bg().set("task_id", "abc-123");
1581 assert_eq!(
1582 state.bg().get::<String>("task_id"),
1583 Some("abc-123".to_string())
1584 );
1585 assert_eq!(
1586 state.get::<String>("bg:task_id"),
1587 Some("abc-123".to_string())
1588 );
1589 }
1590
1591 #[test]
1592 fn prefix_session_contains_and_remove() {
1593 let state = State::new();
1594 let _ = state.session().set("x", 1);
1595 assert!(state.session().contains("x"));
1596 state.session().remove("x");
1597 assert!(!state.session().contains("x"));
1598 }
1599
1600 #[test]
1601 fn prefix_turn_keys() {
1602 let state = State::new();
1603 let _ = state.turn().set("a", 1);
1604 let _ = state.turn().set("b", 2);
1605 let _ = state.session().set("c", 3);
1606
1607 let turn_keys = state.turn().keys();
1608 assert_eq!(turn_keys.len(), 2);
1609 assert!(turn_keys.contains(&"a".to_string()));
1610 assert!(turn_keys.contains(&"b".to_string()));
1611 }
1612
1613 #[test]
1616 fn derived_read_only_get() {
1617 let state = State::new();
1618 let _ = state.set("derived:sentiment", "positive");
1620 assert_eq!(
1621 state.derived().get::<String>("sentiment"),
1622 Some("positive".to_string())
1623 );
1624 }
1625
1626 #[test]
1627 fn derived_read_only_get_raw() {
1628 let state = State::new();
1629 let _ = state.set("derived:score", serde_json::json!(0.95));
1630 let raw = state.derived().get_raw("score");
1631 assert!(raw.is_some());
1632 assert_eq!(raw.unwrap(), serde_json::json!(0.95));
1633 }
1634
1635 #[test]
1636 fn derived_read_only_contains() {
1637 let state = State::new();
1638 let _ = state.set("derived:exists", true);
1639 assert!(state.derived().contains("exists"));
1640 assert!(!state.derived().contains("missing"));
1641 }
1642
1643 #[test]
1644 fn derived_read_only_keys() {
1645 let state = State::new();
1646 let _ = state.set("derived:a", 1);
1647 let _ = state.set("derived:b", 2);
1648 let _ = state.set("app:c", 3);
1649
1650 let derived_keys = state.derived().keys();
1651 assert_eq!(derived_keys.len(), 2);
1652 assert!(derived_keys.contains(&"a".to_string()));
1653 assert!(derived_keys.contains(&"b".to_string()));
1654 }
1655
1656 #[test]
1657 fn derived_missing_key_returns_none() {
1658 let state = State::new();
1659 assert_eq!(state.derived().get::<String>("nope"), None);
1660 assert_eq!(state.derived().get_raw("nope"), None);
1661 }
1662
1663 #[test]
1666 fn snapshot_values_captures_existing_keys() {
1667 let state = State::new();
1668 let _ = state.set("a", 1);
1669 let _ = state.set("b", "hello");
1670 let _ = state.set("c", true);
1671
1672 let snap = state.snapshot_values(&["a", "b", "missing"]);
1673 assert_eq!(snap.len(), 2);
1674 assert_eq!(snap.get("a"), Some(&serde_json::json!(1)));
1675 assert_eq!(snap.get("b"), Some(&serde_json::json!("hello")));
1676 assert!(!snap.contains_key("missing"));
1677 }
1678
1679 #[test]
1680 fn snapshot_values_empty_keys() {
1681 let state = State::new();
1682 let _ = state.set("a", 1);
1683 let snap = state.snapshot_values(&[]);
1684 assert!(snap.is_empty());
1685 }
1686
1687 #[test]
1690 fn diff_values_detects_changed_value() {
1691 let state = State::new();
1692 let _ = state.set("x", 1);
1693 let snap = state.snapshot_values(&["x"]);
1694
1695 let _ = state.set("x", 2);
1696 let diffs = state.diff_values(&snap, &["x"]);
1697 assert_eq!(diffs.len(), 1);
1698 assert_eq!(diffs[0].0, "x");
1699 assert_eq!(diffs[0].1, serde_json::json!(1));
1700 assert_eq!(diffs[0].2, serde_json::json!(2));
1701 }
1702
1703 #[test]
1704 fn diff_values_detects_new_key() {
1705 let state = State::new();
1706 let snap = state.snapshot_values(&["y"]);
1707
1708 let _ = state.set("y", "new");
1709 let diffs = state.diff_values(&snap, &["y"]);
1710 assert_eq!(diffs.len(), 1);
1711 assert_eq!(diffs[0].0, "y");
1712 assert_eq!(diffs[0].1, Value::Null);
1713 assert_eq!(diffs[0].2, serde_json::json!("new"));
1714 }
1715
1716 #[test]
1717 fn diff_values_detects_removed_key() {
1718 let state = State::new();
1719 let _ = state.set("z", 42);
1720 let snap = state.snapshot_values(&["z"]);
1721
1722 state.remove("z");
1723 let diffs = state.diff_values(&snap, &["z"]);
1724 assert_eq!(diffs.len(), 1);
1725 assert_eq!(diffs[0].0, "z");
1726 assert_eq!(diffs[0].1, serde_json::json!(42));
1727 assert_eq!(diffs[0].2, Value::Null);
1728 }
1729
1730 #[test]
1731 fn diff_values_no_change() {
1732 let state = State::new();
1733 let _ = state.set("stable", 10);
1734 let snap = state.snapshot_values(&["stable"]);
1735
1736 let diffs = state.diff_values(&snap, &["stable"]);
1738 assert!(diffs.is_empty());
1739 }
1740
1741 #[test]
1742 fn diff_values_multiple_keys_mixed_changes() {
1743 let state = State::new();
1744 let _ = state.set("a", 1);
1745 let _ = state.set("b", 2);
1746 let snap = state.snapshot_values(&["a", "b", "c"]);
1747
1748 let _ = state.set("a", 10); let _ = state.set("c", 3); let diffs = state.diff_values(&snap, &["a", "b", "c"]);
1753 assert_eq!(diffs.len(), 2); let diff_keys: Vec<&str> = diffs.iter().map(|(k, _, _)| k.as_str()).collect();
1755 assert!(diff_keys.contains(&"a"));
1756 assert!(diff_keys.contains(&"c"));
1757 }
1758
1759 #[test]
1762 fn clear_prefix_removes_matching_keys() {
1763 let state = State::new();
1764 let _ = state.set("turn:a", 1);
1765 let _ = state.set("turn:b", 2);
1766 let _ = state.set("app:c", 3);
1767 let _ = state.set("session:d", 4);
1768
1769 state.clear_prefix("turn:");
1770 assert!(!state.contains("turn:a"));
1771 assert!(!state.contains("turn:b"));
1772 assert!(state.contains("app:c"));
1773 assert!(state.contains("session:d"));
1774 }
1775
1776 #[test]
1777 fn clear_prefix_no_matching_keys_is_noop() {
1778 let state = State::new();
1779 let _ = state.set("app:x", 1);
1780 state.clear_prefix("turn:");
1781 assert!(state.contains("app:x"));
1782 }
1783
1784 #[test]
1785 fn clear_prefix_also_clears_delta() {
1786 let state = State::new();
1787 let _ = state.set("turn:committed", 1);
1788 let tracked = state.with_delta_tracking();
1789 let _ = tracked.set("turn:delta_val", 2);
1790
1791 assert!(tracked.contains("turn:committed"));
1793 assert!(tracked.contains("turn:delta_val"));
1794
1795 tracked.clear_prefix("turn:");
1796 assert!(!tracked.contains("turn:committed"));
1797 assert!(!tracked.contains("turn:delta_val"));
1798 }
1799
1800 #[test]
1801 fn clear_prefix_via_turn_accessor() {
1802 let state = State::new();
1803 let _ = state.turn().set("x", 1);
1804 let _ = state.turn().set("y", 2);
1805 let _ = state.app().set("z", 3);
1806
1807 state.clear_prefix("turn:");
1808 assert!(state.turn().keys().is_empty());
1809 assert!(state.app().contains("z"));
1810 }
1811
1812 #[test]
1815 fn modify_increment_existing() {
1816 let state = State::new();
1817 let _ = state.set("count", 5u32);
1818 let result = state.modify("count", 0u32, |n| n + 1).unwrap();
1819 assert_eq!(result, 6);
1820 assert_eq!(state.get::<u32>("count"), Some(6));
1821 }
1822
1823 #[test]
1824 fn modify_uses_default_when_missing() {
1825 let state = State::new();
1826 let result = state.modify("new_count", 0u32, |n| n + 1).unwrap();
1827 assert_eq!(result, 1);
1828 assert_eq!(state.get::<u32>("new_count"), Some(1));
1829 }
1830
1831 #[test]
1832 fn modify_with_delta_tracking() {
1833 let state = State::new();
1834 let _ = state.set("x", 10u32);
1835 let tracked = state.with_delta_tracking();
1836 let result = tracked.modify("x", 0u32, |n| n * 2).unwrap();
1837 assert_eq!(result, 20);
1838 assert_eq!(tracked.get::<u32>("x"), Some(20));
1840 assert_eq!(state.get::<u32>("x"), Some(10)); }
1842
1843 #[test]
1846 fn get_falls_back_to_derived_prefix() {
1847 let state = State::new();
1848 let _ = state.set("derived:risk", 0.85);
1849 assert_eq!(state.get::<f64>("risk"), Some(0.85));
1851 }
1852
1853 #[test]
1854 fn get_prefers_direct_key_over_derived() {
1855 let state = State::new();
1856 let _ = state.set("score", 1.0);
1857 let _ = state.set("derived:score", 0.5);
1858 assert_eq!(state.get::<f64>("score"), Some(1.0));
1860 }
1861
1862 #[test]
1863 fn get_derived_fallback_skipped_for_prefixed_keys() {
1864 let state = State::new();
1865 let _ = state.set("derived:risk", 0.85);
1866 assert_eq!(state.get::<f64>("app:risk"), None);
1868 }
1869
1870 #[test]
1871 fn get_derived_fallback_with_delta_tracking() {
1872 let state = State::new();
1873 let tracked = state.with_delta_tracking();
1874 let _ = tracked.set("derived:computed_val", 42);
1875 assert_eq!(tracked.get::<i32>("computed_val"), Some(42));
1876 }
1877
1878 #[test]
1881 fn with_reads_from_inner() {
1882 let state = State::new();
1883 let _ = state.set("name", "Alice");
1884 let len = state.with("name", |v| v.as_str().unwrap().len());
1885 assert_eq!(len, Some(5));
1886 }
1887
1888 #[test]
1889 fn with_reads_from_delta_first() {
1890 let state = State::new();
1891 let _ = state.set("x", 1);
1892 let tracked = state.with_delta_tracking();
1893 let _ = tracked.set("x", 99);
1894 let val = tracked.with("x", |v| v.as_i64().unwrap());
1895 assert_eq!(val, Some(99));
1896 }
1897
1898 #[test]
1899 fn with_falls_back_to_inner_when_not_in_delta() {
1900 let state = State::new();
1901 let _ = state.set("committed", "yes");
1902 let tracked = state.with_delta_tracking();
1903 let val = tracked.with("committed", |v| v.as_str().unwrap().to_string());
1904 assert_eq!(val, Some("yes".to_string()));
1905 }
1906
1907 #[test]
1908 fn with_falls_back_to_derived() {
1909 let state = State::new();
1910 let _ = state.set("derived:risk", 0.85);
1911 let val = state.with("risk", |v| v.as_f64().unwrap());
1912 assert_eq!(val, Some(0.85));
1913 }
1914
1915 #[test]
1916 fn with_derived_fallback_skipped_for_prefixed() {
1917 let state = State::new();
1918 let _ = state.set("derived:risk", 0.85);
1919 let val = state.with("app:risk", |v| v.as_f64().unwrap());
1920 assert_eq!(val, None);
1921 }
1922
1923 #[test]
1924 fn with_returns_none_for_missing() {
1925 let state = State::new();
1926 let val = state.with("missing", |v| v.clone());
1927 assert_eq!(val, None);
1928 }
1929
1930 #[test]
1931 fn with_on_prefixed_state() {
1932 let state = State::new();
1933 let _ = state.app().set("flag", true);
1934 let val = state.app().with("flag", |v| v.as_bool().unwrap());
1935 assert_eq!(val, Some(true));
1936 }
1937
1938 #[test]
1939 fn with_on_read_only_prefixed_state() {
1940 let state = State::new();
1941 let _ = state.set("derived:score", serde_json::json!(0.95));
1942 let val = state.derived().with("score", |v| v.as_f64().unwrap());
1943 assert_eq!(val, Some(0.95));
1944 }
1945
    /// Typed key fixture: a `u32` stored under the full key `"session:turn_count"`.
    const TURN_COUNT: StateKey<u32> = StateKey::new("session:turn_count");
    /// Typed key fixture: a `String` stored under the full key `"user:name"`.
    const NAME: StateKey<String> = StateKey::new("user:name");
1950
1951 #[test]
1952 fn state_key_get_and_set() {
1953 let state = State::new();
1954 let _ = state.set_key(&TURN_COUNT, 5);
1955 assert_eq!(state.get_key(&TURN_COUNT), Some(5));
1956 }
1957
1958 #[test]
1959 fn state_key_get_missing() {
1960 let state = State::new();
1961 assert_eq!(state.get_key(&TURN_COUNT), None);
1962 }
1963
1964 #[test]
1965 fn state_key_string_type() {
1966 let state = State::new();
1967 let _ = state.set_key(&NAME, "Alice".to_string());
1968 assert_eq!(state.get_key(&NAME), Some("Alice".to_string()));
1969 }
1970
1971 #[test]
1972 fn state_key_with() {
1973 let state = State::new();
1974 let _ = state.set_key(&TURN_COUNT, 42);
1975 let val = state.with_key(&TURN_COUNT, |v| v.as_u64().unwrap());
1976 assert_eq!(val, Some(42));
1977 }
1978
1979 #[test]
1980 fn state_key_interop_with_raw() {
1981 let state = State::new();
1982 let _ = state.set_key(&TURN_COUNT, 10);
1983 assert_eq!(state.get::<u32>("session:turn_count"), Some(10));
1985 }
1986
1987 #[test]
1988 fn slot_evidence_aggregates_value_provenance_and_journal() {
1989 let state = State::new();
1990 let _ = state.set("party_size", 6u8);
1991 let _ = state.set(
1993 "state_meta:party_size",
1994 serde_json::json!({ "source": "extraction", "confidence": 0.9 }),
1995 );
1996
1997 let ev = state.evidence("party_size");
1998 assert!(ev.present);
1999 assert_eq!(ev.value, Some(serde_json::json!(6)));
2000 assert_eq!(ev.source.as_deref(), Some("extraction"));
2001 assert_eq!(ev.confidence, Some(0.9));
2002 assert!(ev.last_sequence.is_some());
2003 assert_eq!(ev.last_origin, Some(StateMutationOrigin::Set));
2004
2005 let missing = state.evidence("nope");
2007 assert!(!missing.present);
2008 assert!(missing.source.is_none());
2009 }
2010
2011 #[test]
2014 fn rollback_restores_base_after_remove() {
2015 let base = State::new();
2018 let _ = base.set("k", "original");
2019
2020 let tx = base.with_delta_tracking();
2021 assert_eq!(tx.remove("k"), Some(serde_json::json!("original")));
2022 assert_eq!(tx.get::<String>("k"), None); assert_eq!(base.get::<String>("k"), Some("original".into())); tx.rollback();
2026 assert_eq!(tx.get::<String>("k"), Some("original".into()));
2027 assert_eq!(base.get::<String>("k"), Some("original".into()));
2028 }
2029
2030 #[test]
2031 fn rollback_restores_base_after_clear_prefix() {
2032 let base = State::new();
2034 let _ = base.set("app:a", 1u32);
2035 let _ = base.set("app:b", 2u32);
2036 let _ = base.set("user:c", 3u32);
2037
2038 let tx = base.with_delta_tracking();
2039 tx.clear_prefix("app:");
2040 assert_eq!(tx.get::<u32>("app:a"), None);
2041 assert_eq!(tx.get::<u32>("app:b"), None);
2042 assert_eq!(tx.get::<u32>("user:c"), Some(3));
2043 assert_eq!(base.get::<u32>("app:a"), Some(1));
2045
2046 tx.rollback();
2047 assert_eq!(tx.get::<u32>("app:a"), Some(1));
2048 assert_eq!(tx.get::<u32>("app:b"), Some(2));
2049 }
2050
2051 #[test]
2052 fn commit_applies_removals() {
2053 let base = State::new();
2054 let _ = base.set("k", "v");
2055 let tx = base.with_delta_tracking();
2056 tx.remove("k");
2057 tx.commit();
2058 assert_eq!(base.get::<String>("k"), None);
2059 }
2060
2061 #[test]
2062 fn commit_applies_prefix_clear() {
2063 let base = State::new();
2064 let _ = base.set("app:a", 1u32);
2065 let _ = base.set("user:c", 3u32);
2066 let tx = base.with_delta_tracking();
2067 tx.clear_prefix("app:");
2068 tx.commit();
2069 assert_eq!(base.get::<u32>("app:a"), None);
2070 assert_eq!(base.get::<u32>("user:c"), Some(3));
2071 }
2072
2073 #[test]
2074 fn modify_is_atomic_under_concurrency() {
2075 use std::sync::Arc;
2076 use std::thread;
2077
2078 let state = Arc::new(State::new());
2079 let _ = state.set("count", 0u64);
2080
2081 let threads = 8;
2082 let per_thread = 1000;
2083 let handles: Vec<_> = (0..threads)
2084 .map(|_| {
2085 let state = state.clone();
2086 thread::spawn(move || {
2087 for _ in 0..per_thread {
2088 let _ = state.modify("count", 0u64, |n| n + 1);
2089 }
2090 })
2091 })
2092 .collect();
2093 for h in handles {
2094 h.join().unwrap();
2095 }
2096 assert_eq!(
2098 state.get::<u64>("count"),
2099 Some((threads * per_thread) as u64)
2100 );
2101 }
2102}
2103
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    /// Collects every key reported by `keys()` that `get_raw` resolves to a
    /// value, ordered by key so two snapshots can be compared directly.
    fn contents(state: &State) -> std::collections::BTreeMap<String, Value> {
        let mut map = std::collections::BTreeMap::new();
        for key in state.keys() {
            if let Some(value) = state.get_raw(&key) {
                map.insert(key, value);
            }
        }
        map
    }

    proptest! {
        /// For any base contents and any sequence of sets and removes applied
        /// through a tracking view, the base is unchanged by the operations
        /// and the view matches the base again after `rollback`.
        #[test]
        fn rollback_always_restores_base(
            base_keys in proptest::collection::vec(("[a-c]", 0u32..5), 0..6),
            ops in proptest::collection::vec(
                prop_oneof![
                    ("[a-c]", 0u32..5).prop_map(|(k, v)| (k, Some(v))),
                    "[a-c]".prop_map(|k| (k, None)),
                ],
                0..12,
            ),
        ) {
            let base = State::new();
            for (key, value) in &base_keys {
                let _ = base.set(key.clone(), *value);
            }
            let before = contents(&base);

            // `Some(v)` is a set, `None` is a remove.
            let tx = base.with_delta_tracking();
            for (key, op) in &ops {
                if let Some(value) = op {
                    let _ = tx.set(key.clone(), *value);
                } else {
                    tx.remove(key);
                }
            }
            prop_assert_eq!(&before, &contents(&base));

            tx.rollback();
            prop_assert_eq!(&before, &contents(&tx));
        }
    }
}