gemini_adk_rs/
state.rs

1//! Typed key-value state container for agents.
2//!
3//! Supports optional delta tracking for transactional state management
4//! and prefix-scoped accessors for namespace isolation.
5
6use std::collections::{HashMap, VecDeque};
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::SystemTime;
11
12use dashmap::DashMap;
13use serde_json::Value;
14
/// Default value for `State::mutation_capacity`, assigned by `State::new()`.
const DEFAULT_MUTATION_JOURNAL_CAPACITY: usize = 1024;
16
/// A state key that carries its value type at compile time, guarding against
/// key typos and mismatched value types.
///
/// Declare keys as constants and pass them to `State::get_key()` /
/// `State::set_key()`:
///
/// ```rust,ignore
/// const TURN_COUNT: StateKey<u32> = StateKey::new("session:turn_count");
/// const SENTIMENT: StateKey<String> = StateKey::new("derived:sentiment");
///
/// state.set_key(&TURN_COUNT, 5);
/// let count: Option<u32> = state.get_key(&TURN_COUNT);
/// ```
pub struct StateKey<T> {
    key: &'static str,
    // Marker only: associates the key with `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}

impl<T> StateKey<T> {
    /// Build a typed key around the given string key. Usable in `const` context.
    pub const fn new(key: &'static str) -> Self {
        StateKey {
            _phantom: PhantomData,
            key,
        }
    }

    /// Return the underlying string key.
    pub const fn key(&self) -> &'static str {
        let raw: &'static str = self.key;
        raw
    }
}
47
/// Where a state mutation came from.
///
/// Variant names are serialized in `snake_case` (see the `serde` attribute).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StateMutationOrigin {
    /// Regular `State::set` or prefixed state write.
    Set,
    /// Direct committed-store write that bypasses delta tracking.
    SetCommitted,
    /// Removal of a single key.
    Remove,
    /// Removal caused by clearing a prefix.
    ClearPrefix,
    /// Delta changes committed into the base state.
    Commit,
}
63
/// A single state mutation recorded in the bounded mutation journal.
///
/// Serializes to/from JSON for durable journaling (see [`JournalSink`]);
/// `timestamp` is encoded as integer milliseconds since the Unix epoch under
/// the JSON field name `timestamp_ms`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct StateMutation {
    /// Monotonic sequence number assigned when the mutation was recorded.
    pub sequence: u64,
    /// State key that changed.
    pub key: String,
    /// Value before the mutation, or `None` when the key did not exist.
    pub old: Option<Value>,
    /// Value after the mutation, or `None` when the key was removed.
    pub new: Option<Value>,
    /// Operation that recorded the mutation.
    pub origin: StateMutationOrigin,
    /// Wall-clock time at which the mutation was recorded.
    /// Serialized as milliseconds since the Unix epoch (`timestamp_ms`).
    #[serde(rename = "timestamp_ms", with = "systemtime_epoch_millis")]
    pub timestamp: SystemTime,
    /// Whether the mutation was written to a delta-tracked view.
    pub delta: bool,
}
87
88/// Serde codec mapping [`SystemTime`] to/from integer epoch milliseconds.
89mod systemtime_epoch_millis {
90    use std::time::{Duration, SystemTime, UNIX_EPOCH};
91
92    use serde::{Deserialize, Deserializer, Serializer};
93
94    pub fn serialize<S: Serializer>(t: &SystemTime, ser: S) -> Result<S::Ok, S::Error> {
95        let millis = t
96            .duration_since(UNIX_EPOCH)
97            .map(|d| d.as_millis() as u64)
98            .unwrap_or(0);
99        ser.serialize_u64(millis)
100    }
101
102    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<SystemTime, D::Error> {
103        let millis = u64::deserialize(de)?;
104        Ok(UNIX_EPOCH + Duration::from_millis(millis))
105    }
106}
107
/// Synchronous, durable sink for state mutations.
///
/// The in-memory mutation journal is a bounded ring (1024 entries by default)
/// — long sessions lose history. A `JournalSink` receives every mutation as it
/// is recorded so it can be persisted in full.
///
/// `write` runs on the state-write hot path (under the journal lock): it must
/// be cheap, must not await, and must not panic — implementations log internal
/// errors instead of surfacing them.
///
/// Implementations must be `Send + Sync`; the sink is stored behind an `Arc`
/// and shared between `State` clones.
pub trait JournalSink: Send + Sync {
    /// Persist one mutation. Must not panic; log errors internally.
    fn write(&self, m: &StateMutation);
}
121
/// Shared, swappable [`JournalSink`] slot — one slot per [`State`] family
/// (clones and delta views share it, like the in-memory ring).
///
/// Cloning the slot clones the outer `Arc`, so every clone observes the same
/// installed sink. The default slot is empty (`None`).
#[derive(Clone, Default)]
struct JournalSinkSlot(Arc<parking_lot::RwLock<Option<Arc<dyn JournalSink>>>>);
126
127impl std::fmt::Debug for JournalSinkSlot {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        let installed = self.0.read().is_some();
130        f.debug_tuple("JournalSinkSlot").field(&installed).finish()
131    }
132}
133
/// Minimum time since the last flush before `FileJournalSink::write` flushes
/// its buffer again.
const JOURNAL_FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
135
/// Report an internal journal-sink error without ever panicking the write path.
///
/// With the `tracing-support` feature this emits a `tracing::warn!` carrying
/// the error and `context`; without it both arguments are discarded, because
/// journaling is infallible by contract.
fn journal_log_error(context: &'static str, e: &dyn std::fmt::Display) {
    #[cfg(feature = "tracing-support")]
    {
        tracing::warn!(error = %e, "{context}");
    }
    #[cfg(not(feature = "tracing-support"))]
    {
        // Explicitly consume both arguments so the build stays warning-free.
        let _ = context;
        let _ = e;
    }
}
145
/// Mutable state of a [`FileJournalSink`], kept together behind one mutex.
struct FileJournalInner {
    /// Buffered writer over the journal file.
    writer: std::io::BufWriter<std::fs::File>,
    /// When the writer was last flushed (or created); compared against
    /// `JOURNAL_FLUSH_INTERVAL` on each write.
    last_flush: std::time::Instant,
}
150
/// Durable [`JournalSink`] writing one JSON object per line (JSONL).
///
/// Writes are buffered behind a `parking_lot::Mutex`. The buffer is flushed by
/// the first write that arrives at least one second after the previous flush,
/// by an explicit [`flush`](Self::flush), and on drop — there is no background
/// timer, so an idle sink keeps its buffered tail until one of those happens.
/// I/O errors are logged (via `tracing::warn!` when the `tracing-support`
/// feature is enabled) — journaling never panics a state write.
///
/// ```jsonl
/// {"sequence":1,"key":"app:last_city","old":null,"new":"London","origin":"set","timestamp_ms":1718000000000,"delta":false}
/// ```
pub struct FileJournalSink {
    inner: parking_lot::Mutex<FileJournalInner>,
}
164
165impl FileJournalSink {
166    /// Create (truncating) the journal file at `path`.
167    pub fn create(path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
168        let file = std::fs::File::create(path)?;
169        Ok(Self {
170            inner: parking_lot::Mutex::new(FileJournalInner {
171                writer: std::io::BufWriter::new(file),
172                last_flush: std::time::Instant::now(),
173            }),
174        })
175    }
176
177    /// Flush buffered mutations to disk now.
178    pub fn flush(&self) {
179        let mut inner = self.inner.lock();
180        if let Err(e) = std::io::Write::flush(&mut inner.writer) {
181            journal_log_error("FileJournalSink flush failed", &e);
182        }
183        inner.last_flush = std::time::Instant::now();
184    }
185}
186
187impl JournalSink for FileJournalSink {
188    fn write(&self, m: &StateMutation) {
189        let line = match serde_json::to_string(m) {
190            Ok(line) => line,
191            Err(e) => {
192                journal_log_error("FileJournalSink serialize failed", &e);
193                return;
194            }
195        };
196        let mut inner = self.inner.lock();
197        if let Err(e) = std::io::Write::write_all(&mut inner.writer, line.as_bytes())
198            .and_then(|()| std::io::Write::write_all(&mut inner.writer, b"\n"))
199        {
200            journal_log_error("FileJournalSink write failed", &e);
201            return;
202        }
203        if inner.last_flush.elapsed() >= JOURNAL_FLUSH_INTERVAL {
204            if let Err(e) = std::io::Write::flush(&mut inner.writer) {
205                journal_log_error("FileJournalSink flush failed", &e);
206            }
207            inner.last_flush = std::time::Instant::now();
208        }
209    }
210}
211
212impl Drop for FileJournalSink {
213    fn drop(&mut self) {
214        if let Err(e) = std::io::Write::flush(&mut self.inner.lock().writer) {
215            journal_log_error("FileJournalSink final flush failed", &e);
216        }
217    }
218}
219
/// In-memory [`JournalSink`] for tests and replay harnesses. Unbounded.
///
/// Every written mutation is cloned into a `Vec` guarded by a mutex, so memory
/// grows with the number of writes.
#[derive(Default)]
pub struct MemoryJournalSink {
    /// Recorded mutations in write order.
    entries: parking_lot::Mutex<Vec<StateMutation>>,
}
225
226impl MemoryJournalSink {
227    /// Create an empty sink.
228    pub fn new() -> Self {
229        Self::default()
230    }
231
232    /// Snapshot all recorded mutations (in write order).
233    pub fn entries(&self) -> Vec<StateMutation> {
234        self.entries.lock().clone()
235    }
236
237    /// Number of recorded mutations.
238    pub fn len(&self) -> usize {
239        self.entries.lock().len()
240    }
241
242    /// Whether nothing has been recorded yet.
243    pub fn is_empty(&self) -> bool {
244        self.entries.lock().is_empty()
245    }
246}
247
248impl JournalSink for MemoryJournalSink {
249    fn write(&self, m: &StateMutation) {
250        self.entries.lock().push(m.clone());
251    }
252}
253
/// Error returned by fallible state writes.
///
/// Currently the only failure mode is JSON serialization of the caller's value.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// The value could not be serialized to JSON.
    #[error("failed to serialize state value for key '{key}': {source}")]
    Serialize {
        /// The key that was being written.
        key: String,
        /// The underlying serde error.
        source: serde_json::Error,
    },
}
266
/// A pending write in a delta-tracked view.
///
/// Unlike a bare value, this distinguishes a *write* from a *removal* so that a
/// delta can record tombstones and `rollback()` can restore the base state
/// after removals and prefix clears.
#[derive(Debug, Clone)]
enum DeltaOp {
    /// Set the key to this value on commit.
    Put(Value),
    /// Remove the key on commit (tombstone — shadows the committed value
    /// for reads through the delta view until then).
    Delete,
}
279
/// Provenance and confidence for a single state slot — the evidence behind a
/// value, aggregated from the mutation journal and the `state_meta:{key}` record.
///
/// This is what lets the model confirm a value in a principled way ("I heard
/// 6, right?"): whether a slot was directly set, resolved from a system, or
/// carries low confidence, and when it last changed.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SlotEvidence {
    /// The state key.
    pub key: String,
    /// Whether the key currently has a value.
    pub present: bool,
    /// The current value, if any.
    pub value: Option<Value>,
    /// Provenance source from `state_meta:{key}.source` (e.g. `agent`/`fetch`/
    /// `llm`/`extraction`), if recorded.
    pub source: Option<String>,
    /// Confidence from `state_meta:{key}.confidence` (0.0–1.0), if recorded.
    pub confidence: Option<f64>,
    /// Journal sequence of the most recent write to this key, if still in the
    /// bounded journal window.
    pub last_sequence: Option<u64>,
    /// Origin of the most recent recorded write, if known.
    pub last_origin: Option<StateMutationOrigin>,
}
305
/// A concurrent, type-safe state container that agents read from and write to.
///
/// By default, `set()` writes directly to the inner store. When delta tracking
/// is enabled via `with_delta_tracking()`, writes go to a separate delta map
/// (with tombstones) that can be atomically committed or rolled back.
///
/// Cloning a `State` clones the `Arc`s, so clones share the same stores,
/// journal, sequence counter and sink slot.
#[derive(Debug, Clone)]
pub struct State {
    /// Committed key/value store.
    inner: Arc<DashMap<String, Value>>,
    /// Pending writes and tombstones; only consulted when `track_delta` is set.
    delta: Arc<DashMap<String, DeltaOp>>,
    /// In-memory mutation journal.
    mutations: Arc<std::sync::Mutex<VecDeque<StateMutation>>>,
    /// Source of mutation sequence numbers; starts at 1.
    next_mutation_sequence: Arc<AtomicU64>,
    /// Capacity configured for the in-memory journal.
    mutation_capacity: usize,
    /// Optional durable sink shared by the whole `State` family.
    journal_sink: JournalSinkSlot,
    /// Whether writes are routed to `delta` instead of `inner`.
    track_delta: bool,
}
321
322impl Default for State {
323    fn default() -> Self {
324        Self::new()
325    }
326}
327
328impl State {
329    /// Create a new empty state container.
330    pub fn new() -> Self {
331        Self {
332            inner: Arc::new(DashMap::new()),
333            delta: Arc::new(DashMap::new()),
334            mutations: Arc::new(std::sync::Mutex::new(VecDeque::new())),
335            next_mutation_sequence: Arc::new(AtomicU64::new(1)),
336            mutation_capacity: DEFAULT_MUTATION_JOURNAL_CAPACITY,
337            journal_sink: JournalSinkSlot::default(),
338            track_delta: false,
339        }
340    }
341
342    /// Create a new State with delta tracking enabled.
343    /// Writes go to the delta map; reads check delta first, then inner.
344    pub fn with_delta_tracking(&self) -> State {
345        State {
346            inner: self.inner.clone(),
347            delta: Arc::new(DashMap::new()),
348            mutations: self.mutations.clone(),
349            next_mutation_sequence: self.next_mutation_sequence.clone(),
350            mutation_capacity: self.mutation_capacity,
351            journal_sink: self.journal_sink.clone(),
352            track_delta: true,
353        }
354    }
355
356    /// Install a durable [`JournalSink`] that receives every state mutation.
357    ///
358    /// The sink is shared with all clones and delta views of this `State`
359    /// (like the in-memory ring) and is invoked synchronously on the write
360    /// path — keep it cheap. The in-memory ring keeps serving
361    /// [`recent_mutations`](Self::recent_mutations)/[`evidence`](Self::evidence);
362    /// the sink adds unbounded durability.
363    pub fn set_journal_sink(&self, sink: Arc<dyn JournalSink>) {
364        *self.journal_sink.0.write() = Some(sink);
365    }
366
367    /// Builder-style variant of [`set_journal_sink`](Self::set_journal_sink).
368    pub fn with_journal_sink(self, sink: Arc<dyn JournalSink>) -> Self {
369        self.set_journal_sink(sink);
370        self
371    }
372
373    /// Get a value by key, attempting to deserialize to the requested type.
374    /// When delta tracking is enabled, checks delta first, then inner.
375    pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
376        self.get_raw(key)
377            .and_then(|v| serde_json::from_value(v).ok())
378    }
379
380    /// Borrow a value by key without cloning, applying `f` to the reference.
381    ///
382    /// This is the zero-copy alternative to `get_raw()`. The closure receives
383    /// a `&Value` directly from the DashMap ref-guard, avoiding the
384    /// `Value::clone()` + `serde_json::from_value()` overhead of `get()`.
385    ///
386    /// Lookup order: delta (if tracking) → inner → derived fallback.
387    pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
388    where
389        F: FnOnce(&Value) -> R,
390    {
391        if self.track_delta {
392            match self.delta.get(key).map(|r| r.value().clone()) {
393                Some(DeltaOp::Put(v)) => return Some(f(&v)),
394                Some(DeltaOp::Delete) => return None, // tombstone shadows inner
395                None => {}
396            }
397        }
398        if let Some(ref_multi) = self.inner.get(key) {
399            return Some(f(ref_multi.value()));
400        }
401        if !key.contains(':') {
402            let mut derived_key = String::with_capacity(8 + key.len());
403            use std::fmt::Write;
404            let _ = write!(derived_key, "derived:{}", key);
405            if self.track_delta {
406                match self.delta.get(&derived_key).map(|r| r.value().clone()) {
407                    Some(DeltaOp::Put(v)) => return Some(f(&v)),
408                    Some(DeltaOp::Delete) => return None,
409                    None => {}
410                }
411            }
412            if let Some(ref_multi) = self.inner.get(&derived_key) {
413                return Some(f(ref_multi.value()));
414            }
415        }
416        None
417    }
418
419    /// Get a raw JSON value by key.
420    /// When delta tracking is enabled, checks delta first, then inner.
421    /// If the key is not found and doesn't contain a prefix, also checks `derived:{key}`
422    /// as a transparent fallback for computed variables.
423    pub fn get_raw(&self, key: &str) -> Option<Value> {
424        if self.track_delta {
425            match self.delta.get(key).map(|r| r.value().clone()) {
426                Some(DeltaOp::Put(v)) => return Some(v),
427                Some(DeltaOp::Delete) => return None, // tombstone shadows inner
428                None => {}
429            }
430        }
431        if let Some(v) = self.inner.get(key) {
432            return Some(v.value().clone());
433        }
434        // Transparent derived fallback: if key has no prefix, check derived:{key}
435        if !key.contains(':') {
436            use std::fmt::Write;
437            let mut derived_key = String::with_capacity(8 + key.len());
438            let _ = write!(derived_key, "derived:{}", key);
439            if self.track_delta {
440                match self.delta.get(&derived_key).map(|r| r.value().clone()) {
441                    Some(DeltaOp::Put(v)) => return Some(v),
442                    Some(DeltaOp::Delete) => return None,
443                    None => {}
444                }
445            }
446            return self.inner.get(&derived_key).map(|v| v.value().clone());
447        }
448        None
449    }
450
451    /// Get a typed value using a `StateKey<T>`.
452    pub fn get_key<T: serde::de::DeserializeOwned>(&self, key: &StateKey<T>) -> Option<T> {
453        self.get(key.key())
454    }
455
456    /// Set a typed value using a `StateKey<T>`.
457    ///
458    /// Returns [`StateError`] if `value` cannot be serialized to JSON.
459    pub fn set_key<T: serde::Serialize>(
460        &self,
461        key: &StateKey<T>,
462        value: T,
463    ) -> Result<(), StateError> {
464        self.set(key.key(), value)
465    }
466
467    /// Zero-copy borrow using a `StateKey<T>`.
468    pub fn with_key<T, F, R>(&self, key: &StateKey<T>, f: F) -> Option<R>
469    where
470        F: FnOnce(&Value) -> R,
471    {
472        self.with(key.key(), f)
473    }
474
475    /// Set a value by key.
476    ///
477    /// When delta tracking is enabled, writes to the delta view instead of the
478    /// committed store. Returns [`StateError`] if `value` cannot be serialized
479    /// to JSON — a public SDK write never panics on caller data.
480    pub fn set(
481        &self,
482        key: impl Into<String>,
483        value: impl serde::Serialize,
484    ) -> Result<(), StateError> {
485        let key = key.into();
486        let v = serde_json::to_value(value).map_err(|source| StateError::Serialize {
487            key: key.clone(),
488            source,
489        })?;
490        self.put_value(key, v, StateMutationOrigin::Set);
491        Ok(())
492    }
493
494    /// Infallible internal write of an already-serialized [`Value`].
495    ///
496    /// Shared by `set` and the value-level helpers (`merge`/`pick`/`rename`/
497    /// `from_hashmap`) so those do not re-serialize and cannot fail.
498    fn put_value(&self, key: String, v: Value, origin: StateMutationOrigin) {
499        let old = self.get_raw(&key);
500        if self.track_delta {
501            self.delta.insert(key.clone(), DeltaOp::Put(v.clone()));
502        } else {
503            self.inner.insert(key.clone(), v.clone());
504        }
505        self.record_mutation(key, old, Some(v), origin);
506    }
507
508    /// Set a value directly in the committed store, bypassing delta tracking.
509    ///
510    /// Returns [`StateError`] if `value` cannot be serialized to JSON.
511    pub fn set_committed(
512        &self,
513        key: impl Into<String>,
514        value: impl serde::Serialize,
515    ) -> Result<(), StateError> {
516        let key = key.into();
517        let v = serde_json::to_value(value).map_err(|source| StateError::Serialize {
518            key: key.clone(),
519            source,
520        })?;
521        let old = self.inner.insert(key.clone(), v.clone());
522        self.record_mutation(key, old, Some(v), StateMutationOrigin::SetCommitted);
523        Ok(())
524    }
525
526    /// Atomically read-modify-write a value under a per-key lock.
527    ///
528    /// If the key doesn't exist, `default` is used as the initial value. The
529    /// function `f` receives the current value and returns the new value. The
530    /// read-modify-write is performed while holding the map shard for `key`, so
531    /// concurrent `modify` calls on the same key do not lose updates. Returns
532    /// the new value, or [`StateError`] if it cannot be serialized.
533    pub fn modify<T, F>(&self, key: &str, default: T, f: F) -> Result<T, StateError>
534    where
535        T: serde::Serialize + serde::de::DeserializeOwned,
536        F: FnOnce(T) -> T,
537    {
538        use dashmap::mapref::entry::Entry;
539
540        let serialize = |key: &str, val: &T| {
541            serde_json::to_value(val).map_err(|source| StateError::Serialize {
542                key: key.to_string(),
543                source,
544            })
545        };
546
547        if self.track_delta {
548            // Atomic w.r.t. the delta shard; the committed base is read as the
549            // initial value only when the delta has no entry for this key.
550            match self.delta.entry(key.to_string()) {
551                Entry::Occupied(mut o) => {
552                    let current = match o.get() {
553                        DeltaOp::Put(v) => serde_json::from_value(v.clone()).unwrap_or(default),
554                        DeltaOp::Delete => default,
555                    };
556                    let old = self.inner.get(key).map(|r| r.value().clone());
557                    let new_val = f(current);
558                    let v = serialize(key, &new_val)?;
559                    o.insert(DeltaOp::Put(v.clone()));
560                    self.record_mutation(key.to_string(), old, Some(v), StateMutationOrigin::Set);
561                    Ok(new_val)
562                }
563                Entry::Vacant(slot) => {
564                    let base = self
565                        .inner
566                        .get(key)
567                        .and_then(|r| serde_json::from_value(r.value().clone()).ok());
568                    let old = self.inner.get(key).map(|r| r.value().clone());
569                    let new_val = f(base.unwrap_or(default));
570                    let v = serialize(key, &new_val)?;
571                    slot.insert(DeltaOp::Put(v.clone()));
572                    self.record_mutation(key.to_string(), old, Some(v), StateMutationOrigin::Set);
573                    Ok(new_val)
574                }
575            }
576        } else {
577            match self.inner.entry(key.to_string()) {
578                Entry::Occupied(mut o) => {
579                    let old = o.get().clone();
580                    let current = serde_json::from_value(old.clone()).unwrap_or(default);
581                    let new_val = f(current);
582                    let v = serialize(key, &new_val)?;
583                    o.insert(v.clone());
584                    self.record_mutation(
585                        key.to_string(),
586                        Some(old),
587                        Some(v),
588                        StateMutationOrigin::Set,
589                    );
590                    Ok(new_val)
591                }
592                Entry::Vacant(slot) => {
593                    let new_val = f(default);
594                    let v = serialize(key, &new_val)?;
595                    slot.insert(v.clone());
596                    self.record_mutation(key.to_string(), None, Some(v), StateMutationOrigin::Set);
597                    Ok(new_val)
598                }
599            }
600        }
601    }
602
603    /// Check if a key exists (in delta or inner).
604    pub fn contains(&self, key: &str) -> bool {
605        if self.track_delta {
606            match self.delta.get(key).map(|r| r.value().clone()) {
607                Some(DeltaOp::Put(_)) => return true,
608                Some(DeltaOp::Delete) => return false, // tombstone shadows inner
609                None => {}
610            }
611        }
612        self.inner.contains_key(key)
613    }
614
615    /// Remove a key.
616    ///
617    /// In delta-tracking mode this records a tombstone in the delta view and
618    /// leaves the committed store untouched, so a subsequent `rollback()` fully
619    /// restores the base state. Returns the value that was visible before removal.
620    pub fn remove(&self, key: &str) -> Option<Value> {
621        if self.track_delta {
622            let removed = self.get_raw(key);
623            // Tombstone in the delta — never mutate `inner` directly, so rollback
624            // can restore the committed value.
625            self.delta.insert(key.to_string(), DeltaOp::Delete);
626            if let Some(ref old) = removed {
627                self.record_mutation(
628                    key.to_string(),
629                    Some(old.clone()),
630                    None,
631                    StateMutationOrigin::Remove,
632                );
633            }
634            removed
635        } else {
636            let removed = self.inner.remove(key).map(|(_, v)| v);
637            if let Some(ref old) = removed {
638                self.record_mutation(
639                    key.to_string(),
640                    Some(old.clone()),
641                    None,
642                    StateMutationOrigin::Remove,
643                );
644            }
645            removed
646        }
647    }
648
649    /// Get all keys (from both inner and delta when tracking).
650    ///
651    /// Keys tombstoned in the delta are excluded.
652    pub fn keys(&self) -> Vec<String> {
653        if !self.track_delta || self.delta.is_empty() {
654            return self.inner.iter().map(|r| r.key().clone()).collect();
655        }
656        let mut seen =
657            std::collections::HashSet::with_capacity(self.inner.len() + self.delta.len());
658        let mut keys = Vec::with_capacity(self.inner.len() + self.delta.len());
659        // Delta first so tombstones win over committed entries.
660        for entry in self.delta.iter() {
661            let key = entry.key().clone();
662            seen.insert(key.clone());
663            if matches!(entry.value(), DeltaOp::Put(_)) {
664                keys.push(key);
665            }
666        }
667        for entry in self.inner.iter() {
668            let key = entry.key().clone();
669            if seen.insert(key.clone()) {
670                keys.push(key);
671            }
672        }
673        keys
674    }
675
676    /// Create a new State containing only the specified keys.
677    pub fn pick(&self, keys: &[&str]) -> State {
678        let new = State::new();
679        for key in keys {
680            if let Some(v) = self.get_raw(key) {
681                new.put_value((*key).to_string(), v, StateMutationOrigin::Set);
682            }
683        }
684        new
685    }
686
687    /// Merge another state into this one (other's values overwrite on conflict).
688    pub fn merge(&self, other: &State) {
689        for entry in other.inner.iter() {
690            self.put_value(
691                entry.key().clone(),
692                entry.value().clone(),
693                StateMutationOrigin::Set,
694            );
695        }
696    }
697
698    /// Rename a key.
699    pub fn rename(&self, from: &str, to: &str) {
700        if let Some(v) = self.remove(from) {
701            self.put_value(to.to_string(), v, StateMutationOrigin::Set);
702        }
703    }
704
705    // ── Delta methods ──────────────────────────────────────────────────────
706
707    /// Whether delta tracking is enabled.
708    pub fn is_tracking_delta(&self) -> bool {
709        self.track_delta
710    }
711
712    /// Whether there are uncommitted delta changes.
713    pub fn has_delta(&self) -> bool {
714        self.track_delta && !self.delta.is_empty()
715    }
716
717    /// Get a snapshot of the current delta's pending writes (tombstones omitted).
718    pub fn delta(&self) -> HashMap<String, Value> {
719        self.delta
720            .iter()
721            .filter_map(|entry| match entry.value() {
722                DeltaOp::Put(v) => Some((entry.key().clone(), v.clone())),
723                DeltaOp::Delete => None,
724            })
725            .collect()
726    }
727
728    /// Commit delta changes into the inner store, then clear the delta.
729    ///
730    /// Pending puts are applied and tombstones remove the committed key, so a
731    /// removal made under delta tracking becomes durable only at commit time.
732    pub fn commit(&self) {
733        // Snapshot first so we don't iterate the delta while mutating `inner`.
734        let ops: Vec<(String, DeltaOp)> = self
735            .delta
736            .iter()
737            .map(|e| (e.key().clone(), e.value().clone()))
738            .collect();
739        for (key, op) in ops {
740            match op {
741                DeltaOp::Put(value) => {
742                    let old = self.inner.insert(key.clone(), value.clone());
743                    self.record_mutation_with_delta(
744                        key,
745                        old,
746                        Some(value),
747                        StateMutationOrigin::Commit,
748                        false,
749                    );
750                }
751                DeltaOp::Delete => {
752                    if let Some((_, old)) = self.inner.remove(&key) {
753                        self.record_mutation_with_delta(
754                            key,
755                            Some(old),
756                            None,
757                            StateMutationOrigin::Commit,
758                            false,
759                        );
760                    }
761                }
762            }
763        }
764        self.delta.clear();
765    }
766
767    /// Discard all uncommitted delta changes, restoring the committed base state.
768    ///
769    /// Because removals and prefix clears under delta tracking only write
770    /// tombstones (never mutating `inner`), dropping the delta is sufficient to
771    /// restore the base — including keys that were removed in the transaction.
772    pub fn rollback(&self) {
773        self.delta.clear();
774    }
775
776    // ── Prefix accessors ───────────────────────────────────────────────────
777
778    /// Access state with the `app:` prefix scope.
779    pub fn app(&self) -> PrefixedState<'_> {
780        PrefixedState {
781            state: self,
782            prefix: "app:",
783        }
784    }
785
786    /// Access state with the `user:` prefix scope.
787    pub fn user(&self) -> PrefixedState<'_> {
788        PrefixedState {
789            state: self,
790            prefix: "user:",
791        }
792    }
793
794    /// Access state with the `temp:` prefix scope.
795    pub fn temp(&self) -> PrefixedState<'_> {
796        PrefixedState {
797            state: self,
798            prefix: "temp:",
799        }
800    }
801
802    /// Access state with the `session:` prefix scope (auto-tracked signals).
803    pub fn session(&self) -> PrefixedState<'_> {
804        PrefixedState {
805            state: self,
806            prefix: "session:",
807        }
808    }
809
810    /// Access state with the `turn:` prefix scope (reset each turn).
811    pub fn turn(&self) -> PrefixedState<'_> {
812        PrefixedState {
813            state: self,
814            prefix: "turn:",
815        }
816    }
817
818    /// Access state with the `bg:` prefix scope (background tasks).
819    pub fn bg(&self) -> PrefixedState<'_> {
820        PrefixedState {
821            state: self,
822            prefix: "bg:",
823        }
824    }
825
826    /// Access read-only state with the `derived:` prefix scope (computed vars only).
827    pub fn derived(&self) -> ReadOnlyPrefixedState<'_> {
828        ReadOnlyPrefixedState {
829            state: self,
830            prefix: "derived:",
831        }
832    }
833
834    // ── Utility methods ───────────────────────────────────────────────────
835
836    /// Snapshot the values of specific keys. Returns HashMap of key -> current value.
837    /// Used by watchers to capture state before mutations.
838    pub fn snapshot_values(&self, keys: &[&str]) -> HashMap<String, Value> {
839        keys.iter()
840            .filter_map(|&k| self.get_raw(k).map(|v| (k.to_string(), v)))
841            .collect()
842    }
843
844    /// Diff current state against a previous snapshot.
845    /// Returns Vec of (key, old_value, new_value) for keys that changed.
846    pub fn diff_values(
847        &self,
848        prev: &HashMap<String, Value>,
849        keys: &[&str],
850    ) -> Vec<(String, Value, Value)> {
851        keys.iter()
852            .filter_map(|&k| {
853                let old = prev.get(k);
854                let new = self.get_raw(k);
855                match (old, new) {
856                    (Some(o), Some(n)) if o != &n => Some((k.to_string(), o.clone(), n)),
857                    (None, Some(n)) => Some((k.to_string(), Value::Null, n)),
858                    (Some(o), None) => Some((k.to_string(), o.clone(), Value::Null)),
859                    _ => None,
860                }
861            })
862            .collect()
863    }
864
865    /// Export all state as a HashMap (for persistence/serialization).
866    pub fn to_hashmap(&self) -> std::collections::HashMap<String, serde_json::Value> {
867        self.inner
868            .iter()
869            .map(|entry| (entry.key().clone(), entry.value().clone()))
870            .collect()
871    }
872
873    /// Restore state from a HashMap (for persistence/deserialization).
874    pub fn from_hashmap(&self, map: std::collections::HashMap<String, serde_json::Value>) {
875        for (key, value) in map {
876            // Values are already `Value`, so this write cannot fail to serialize.
877            let old = self.inner.insert(key.clone(), value.clone());
878            self.record_mutation(key, old, Some(value), StateMutationOrigin::SetCommitted);
879        }
880    }
881
882    /// Remove all keys with the given prefix.
883    ///
884    /// In delta-tracking mode this writes tombstones for matching keys (from both
885    /// the committed store and pending delta puts) without mutating the committed
886    /// store, so `rollback()` restores everything that was cleared.
887    pub fn clear_prefix(&self, prefix: &str) {
888        if self.track_delta {
889            let keys: Vec<String> = self
890                .keys()
891                .into_iter()
892                .filter(|k| k.starts_with(prefix))
893                .collect();
894            for key in keys {
895                let old = self.get_raw(&key);
896                self.delta.insert(key.clone(), DeltaOp::Delete);
897                if let Some(old) = old {
898                    self.record_mutation(key, Some(old), None, StateMutationOrigin::ClearPrefix);
899                }
900            }
901            return;
902        }
903        let keys_to_remove: Vec<String> = self
904            .inner
905            .iter()
906            .filter(|entry| entry.key().starts_with(prefix))
907            .map(|entry| entry.key().clone())
908            .collect();
909        for key in keys_to_remove {
910            if let Some((_, old)) = self.inner.remove(&key) {
911                self.record_mutation(key, Some(old), None, StateMutationOrigin::ClearPrefix);
912            }
913        }
914    }
915
916    /// Return a snapshot of recent state mutations.
917    pub fn recent_mutations(&self) -> Vec<StateMutation> {
918        self.mutations
919            .lock()
920            .expect("state mutation journal poisoned")
921            .iter()
922            .cloned()
923            .collect()
924    }
925
926    /// Return the current monotonic cursor for the mutation journal.
927    pub fn mutation_cursor(&self) -> u64 {
928        self.next_mutation_sequence.load(Ordering::Relaxed) - 1
929    }
930
931    /// Return mutations appended after a previously captured cursor.
932    pub fn mutations_since(&self, cursor: u64) -> Vec<StateMutation> {
933        let mutations = self
934            .mutations
935            .lock()
936            .expect("state mutation journal poisoned");
937        mutations
938            .iter()
939            .filter(|mutation| mutation.sequence > cursor)
940            .cloned()
941            .collect()
942    }
943
944    /// Drain and return all recorded state mutations.
945    pub fn drain_mutations(&self) -> Vec<StateMutation> {
946        self.mutations
947            .lock()
948            .expect("state mutation journal poisoned")
949            .drain(..)
950            .collect()
951    }
952
953    /// Aggregate the [`SlotEvidence`] for a key: its current value, provenance
954    /// (`state_meta:{key}`), confidence, and most-recent journal write.
955    pub fn evidence(&self, key: &str) -> SlotEvidence {
956        let value = self.get_raw(key);
957        let meta = self.get::<Value>(&format!("state_meta:{key}"));
958        let source = meta
959            .as_ref()
960            .and_then(|m| m.get("source"))
961            .and_then(|s| s.as_str().map(String::from));
962        let confidence = meta
963            .as_ref()
964            .and_then(|m| m.get("confidence"))
965            .and_then(serde_json::Value::as_f64);
966
967        let mut last_sequence: Option<u64> = None;
968        let mut last_origin: Option<StateMutationOrigin> = None;
969        for m in self.recent_mutations() {
970            if m.key == key && last_sequence.is_none_or(|s| m.sequence > s) {
971                last_sequence = Some(m.sequence);
972                last_origin = Some(m.origin);
973            }
974        }
975
976        SlotEvidence {
977            key: key.to_string(),
978            present: value.is_some(),
979            value,
980            source,
981            confidence,
982            last_sequence,
983            last_origin,
984        }
985    }
986
987    fn record_mutation(
988        &self,
989        key: String,
990        old: Option<Value>,
991        new: Option<Value>,
992        origin: StateMutationOrigin,
993    ) {
994        self.record_mutation_with_delta(key, old, new, origin, self.track_delta);
995    }
996
997    fn record_mutation_with_delta(
998        &self,
999        key: String,
1000        old: Option<Value>,
1001        new: Option<Value>,
1002        origin: StateMutationOrigin,
1003        delta: bool,
1004    ) {
1005        let mut mutations = self
1006            .mutations
1007            .lock()
1008            .expect("state mutation journal poisoned");
1009        if mutations.len() >= self.mutation_capacity {
1010            mutations.pop_front();
1011        }
1012        let sequence = self.next_mutation_sequence.fetch_add(1, Ordering::Relaxed);
1013        let mutation = StateMutation {
1014            sequence,
1015            key,
1016            old,
1017            new,
1018            origin,
1019            timestamp: SystemTime::now(),
1020            delta,
1021        };
1022        // Durable sink runs under the journal lock so the file order matches
1023        // the ring order exactly. Sinks are sync + cheap by contract.
1024        if let Some(sink) = self.journal_sink.0.read().as_ref() {
1025            sink.write(&mutation);
1026        }
1027        mutations.push_back(mutation);
1028    }
1029}
1030
/// A borrowed view of state that automatically prepends a prefix to all keys.
///
/// The view holds a reference to the underlying [`State`] and forwards every
/// operation to it using `prefix + key` as the storage key.
pub struct PrefixedState<'a> {
    /// The state every operation is forwarded to.
    state: &'a State,
    /// Text prepended to each caller-supplied key (for example `"app:"`).
    prefix: &'static str,
}
1036
1037impl<'a> PrefixedState<'a> {
1038    fn prefixed_key(&self, key: &str) -> String {
1039        format!("{}{}", self.prefix, key)
1040    }
1041
1042    /// Get a value by key (with prefix applied).
1043    pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
1044        self.state.get(&self.prefixed_key(key))
1045    }
1046
1047    /// Get a raw JSON value by key (with prefix applied).
1048    pub fn get_raw(&self, key: &str) -> Option<Value> {
1049        self.state.get_raw(&self.prefixed_key(key))
1050    }
1051
1052    /// Zero-copy borrow a value by key (with prefix applied).
1053    pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
1054    where
1055        F: FnOnce(&Value) -> R,
1056    {
1057        self.state.with(&self.prefixed_key(key), f)
1058    }
1059
1060    /// Set a value by key (with prefix applied).
1061    ///
1062    /// Returns [`StateError`] if `value` cannot be serialized to JSON.
1063    pub fn set(
1064        &self,
1065        key: impl AsRef<str>,
1066        value: impl serde::Serialize,
1067    ) -> Result<(), StateError> {
1068        self.state.set(self.prefixed_key(key.as_ref()), value)
1069    }
1070
1071    /// Check if a key exists (with prefix applied).
1072    pub fn contains(&self, key: &str) -> bool {
1073        self.state.contains(&self.prefixed_key(key))
1074    }
1075
1076    /// Remove a key (with prefix applied).
1077    pub fn remove(&self, key: &str) -> Option<Value> {
1078        self.state.remove(&self.prefixed_key(key))
1079    }
1080
1081    /// Get all keys within this prefix scope (prefix stripped from results).
1082    pub fn keys(&self) -> Vec<String> {
1083        self.state
1084            .keys()
1085            .into_iter()
1086            .filter_map(|k| k.strip_prefix(self.prefix).map(|s| s.to_string()))
1087            .collect()
1088    }
1089}
1090
/// A borrowed, read-only view of state that automatically prepends a prefix to all keys.
///
/// Unlike `PrefixedState`, this does not expose `set()` or `remove()` methods,
/// making it suitable for computed/derived state that user code should not mutate.
pub struct ReadOnlyPrefixedState<'a> {
    /// The state every lookup is forwarded to.
    state: &'a State,
    /// Text prepended to each caller-supplied key (for example `"derived:"`).
    prefix: &'static str,
}
1099
1100impl<'a> ReadOnlyPrefixedState<'a> {
1101    fn prefixed_key(&self, key: &str) -> String {
1102        format!("{}{}", self.prefix, key)
1103    }
1104
1105    /// Get a value by key (with prefix applied).
1106    pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
1107        self.state.get(&self.prefixed_key(key))
1108    }
1109
1110    /// Get a raw JSON value by key (with prefix applied).
1111    pub fn get_raw(&self, key: &str) -> Option<Value> {
1112        self.state.get_raw(&self.prefixed_key(key))
1113    }
1114
1115    /// Zero-copy borrow a value by key (with prefix applied).
1116    pub fn with<F, R>(&self, key: &str, f: F) -> Option<R>
1117    where
1118        F: FnOnce(&Value) -> R,
1119    {
1120        self.state.with(&self.prefixed_key(key), f)
1121    }
1122
1123    /// Check if a key exists (with prefix applied).
1124    pub fn contains(&self, key: &str) -> bool {
1125        self.state.contains(&self.prefixed_key(key))
1126    }
1127
1128    /// Get all keys within this prefix scope (prefix stripped from results).
1129    pub fn keys(&self) -> Vec<String> {
1130        self.state
1131            .keys()
1132            .into_iter()
1133            .filter_map(|k| k.strip_prefix(self.prefix).map(|s| s.to_string()))
1134            .collect()
1135    }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140    use super::*;
1141
1142    #[test]
1143    fn journal_sink_receives_every_mutation_in_ring_order() {
1144        let state = State::new();
1145        let sink = Arc::new(MemoryJournalSink::new());
1146        state.set_journal_sink(sink.clone());
1147
1148        let _ = state.set("a", 1);
1149        let _ = state.set("b", "two");
1150        state.remove("a");
1151
1152        let entries = sink.entries();
1153        assert_eq!(entries.len(), 3);
1154        assert_eq!(entries, state.recent_mutations());
1155        assert_eq!(entries[0].key, "a");
1156        assert_eq!(entries[2].origin, StateMutationOrigin::Remove);
1157    }
1158
1159    #[test]
1160    fn journal_sink_is_shared_with_clones_and_delta_views() {
1161        let state = State::new();
1162        let sink = Arc::new(MemoryJournalSink::new());
1163        state.set_journal_sink(sink.clone());
1164
1165        let clone = state.clone();
1166        let _ = clone.set("from_clone", true);
1167
1168        let tracked = state.with_delta_tracking();
1169        let _ = tracked.set("from_delta", 1);
1170        tracked.commit();
1171
1172        let keys: Vec<_> = sink.entries().iter().map(|m| m.key.clone()).collect();
1173        assert!(keys.contains(&"from_clone".to_string()));
1174        assert!(keys.contains(&"from_delta".to_string()));
1175        // Commit re-records the delta write into the committed store.
1176        assert!(sink
1177            .entries()
1178            .iter()
1179            .any(|m| m.origin == StateMutationOrigin::Commit));
1180    }
1181
1182    #[test]
1183    fn journal_sink_outlives_ring_capacity() {
1184        // The ring is bounded; the sink is not.
1185        let state = State::new();
1186        let sink = Arc::new(MemoryJournalSink::new());
1187        state.set_journal_sink(sink.clone());
1188
1189        for i in 0..(DEFAULT_MUTATION_JOURNAL_CAPACITY + 10) {
1190            let _ = state.set(format!("k{i}"), i);
1191        }
1192
1193        assert_eq!(
1194            state.recent_mutations().len(),
1195            DEFAULT_MUTATION_JOURNAL_CAPACITY
1196        );
1197        assert_eq!(sink.len(), DEFAULT_MUTATION_JOURNAL_CAPACITY + 10);
1198        assert_eq!(sink.entries()[0].key, "k0");
1199    }
1200
1201    #[test]
1202    fn state_mutation_serde_round_trip_uses_epoch_millis() {
1203        let m = StateMutation {
1204            sequence: 42,
1205            key: "app:last_city".into(),
1206            old: None,
1207            new: Some(serde_json::json!("London")),
1208            origin: StateMutationOrigin::Set,
1209            timestamp: std::time::UNIX_EPOCH + std::time::Duration::from_millis(1_718_000_000_123),
1210            delta: false,
1211        };
1212        let json = serde_json::to_string(&m).unwrap();
1213        assert!(json.contains("\"timestamp_ms\":1718000000123"));
1214        assert!(json.contains("\"origin\":\"set\""));
1215        let back: StateMutation = serde_json::from_str(&json).unwrap();
1216        assert_eq!(back, m);
1217    }
1218
1219    #[test]
1220    fn file_journal_sink_round_trip() {
1221        let dir = std::env::temp_dir().join(format!(
1222            "gemini-rs-journal-test-{}-{}",
1223            std::process::id(),
1224            std::time::SystemTime::now()
1225                .duration_since(std::time::UNIX_EPOCH)
1226                .unwrap()
1227                .as_nanos()
1228        ));
1229        std::fs::create_dir_all(&dir).unwrap();
1230        let path = dir.join("session.journal.jsonl");
1231
1232        let state = State::new();
1233        {
1234            let sink = Arc::new(FileJournalSink::create(&path).unwrap());
1235            state.set_journal_sink(sink);
1236            let _ = state.set("a", 1);
1237            let _ = state.set("a", 2);
1238            state.remove("a");
1239            // Replace the sink so the file sink drops (and flushes).
1240            state.set_journal_sink(Arc::new(MemoryJournalSink::new()));
1241        }
1242
1243        let data = std::fs::read_to_string(&path).unwrap();
1244        let parsed: Vec<StateMutation> = data
1245            .lines()
1246            .filter(|l| !l.trim().is_empty())
1247            .map(|l| serde_json::from_str(l).unwrap())
1248            .collect();
1249        assert_eq!(parsed.len(), 3);
1250        assert_eq!(parsed[0].new, Some(serde_json::json!(1)));
1251        assert_eq!(parsed[2].origin, StateMutationOrigin::Remove);
1252
1253        let _ = std::fs::remove_dir_all(&dir);
1254    }
1255
1256    #[test]
1257    fn set_and_get_string() {
1258        let state = State::new();
1259        let _ = state.set("name", "Alice");
1260        assert_eq!(state.get::<String>("name"), Some("Alice".to_string()));
1261    }
1262
1263    #[test]
1264    fn set_and_get_json() {
1265        let state = State::new();
1266        let _ = state.set("data", serde_json::json!({"temp": 22}));
1267        let v: Value = state.get("data").unwrap();
1268        assert_eq!(v["temp"], 22);
1269    }
1270
1271    #[test]
1272    fn pick_subset() {
1273        let state = State::new();
1274        let _ = state.set("a", 1);
1275        let _ = state.set("b", 2);
1276        let _ = state.set("c", 3);
1277        let picked = state.pick(&["a", "c"]);
1278        assert!(picked.contains("a"));
1279        assert!(!picked.contains("b"));
1280        assert!(picked.contains("c"));
1281    }
1282
1283    #[test]
1284    fn merge_states() {
1285        let s1 = State::new();
1286        let _ = s1.set("a", 1);
1287        let s2 = State::new();
1288        let _ = s2.set("b", 2);
1289        s1.merge(&s2);
1290        assert!(s1.contains("a"));
1291        assert!(s1.contains("b"));
1292    }
1293
1294    #[test]
1295    fn rename_key() {
1296        let state = State::new();
1297        let _ = state.set("old", "value");
1298        state.rename("old", "new");
1299        assert!(!state.contains("old"));
1300        assert_eq!(state.get::<String>("new"), Some("value".to_string()));
1301    }
1302
1303    #[test]
1304    fn remove_returns_value() {
1305        let state = State::new();
1306        let _ = state.set("key", 42);
1307        let removed = state.remove("key");
1308        assert!(removed.is_some());
1309        assert!(!state.contains("key"));
1310    }
1311
1312    #[test]
1313    fn get_missing_returns_none() {
1314        let state = State::new();
1315        assert_eq!(state.get::<String>("nope"), None);
1316    }
1317
1318    // ── Delta tracking tests ──────────────────────────────────────────────
1319
1320    #[test]
1321    fn delta_tracking_writes_to_delta() {
1322        let state = State::new();
1323        let _ = state.set("committed", "yes");
1324
1325        let tracked = state.with_delta_tracking();
1326        let _ = tracked.set("new_key", "new_value");
1327
1328        // New key visible through tracked state
1329        assert_eq!(
1330            tracked.get::<String>("new_key"),
1331            Some("new_value".to_string())
1332        );
1333        // But NOT visible in original (non-delta) state's inner
1334        assert!(!state.contains("new_key"));
1335        // Committed key still visible through tracked state
1336        assert_eq!(tracked.get::<String>("committed"), Some("yes".to_string()));
1337    }
1338
1339    #[test]
1340    fn delta_has_delta_reports_correctly() {
1341        let state = State::new();
1342        let tracked = state.with_delta_tracking();
1343        assert!(!tracked.has_delta());
1344
1345        let _ = tracked.set("key", "val");
1346        assert!(tracked.has_delta());
1347    }
1348
1349    #[test]
1350    fn delta_commit_merges_to_inner() {
1351        let state = State::new();
1352        let tracked = state.with_delta_tracking();
1353        let _ = tracked.set("key", "val");
1354        assert!(!state.contains("key"));
1355
1356        tracked.commit();
1357        // Now visible in original state
1358        assert_eq!(state.get::<String>("key"), Some("val".to_string()));
1359        assert!(!tracked.has_delta());
1360    }
1361
1362    #[test]
1363    fn delta_rollback_discards_changes() {
1364        let state = State::new();
1365        let tracked = state.with_delta_tracking();
1366        let _ = tracked.set("key", "val");
1367        assert!(tracked.has_delta());
1368
1369        tracked.rollback();
1370        assert!(!tracked.has_delta());
1371        assert!(!state.contains("key"));
1372        assert!(!tracked.contains("key"));
1373    }
1374
1375    #[test]
1376    fn delta_snapshot() {
1377        let state = State::new();
1378        let tracked = state.with_delta_tracking();
1379        let _ = tracked.set("a", 1);
1380        let _ = tracked.set("b", 2);
1381
1382        let snapshot = tracked.delta();
1383        assert_eq!(snapshot.len(), 2);
1384        assert!(snapshot.contains_key("a"));
1385        assert!(snapshot.contains_key("b"));
1386    }
1387
1388    #[test]
1389    fn set_committed_bypasses_delta() {
1390        let state = State::new();
1391        let tracked = state.with_delta_tracking();
1392        let _ = tracked.set_committed("direct", "value");
1393
1394        // Visible immediately in inner
1395        assert_eq!(state.get::<String>("direct"), Some("value".to_string()));
1396        // Not in delta
1397        assert!(!tracked.has_delta());
1398        // Still visible through tracked (reads inner too)
1399        assert_eq!(tracked.get::<String>("direct"), Some("value".to_string()));
1400    }
1401
1402    #[test]
1403    fn mutation_journal_records_set_and_remove() {
1404        let state = State::new();
1405        let _ = state.set("key", "first");
1406        let _ = state.set("key", "second");
1407        state.remove("key");
1408
1409        let mutations = state.recent_mutations();
1410        assert_eq!(mutations.len(), 3);
1411        assert_eq!(mutations[0].key, "key");
1412        assert_eq!(mutations[0].old, None);
1413        assert_eq!(mutations[0].new, Some(serde_json::json!("first")));
1414        assert_eq!(mutations[0].origin, StateMutationOrigin::Set);
1415
1416        assert_eq!(mutations[1].old, Some(serde_json::json!("first")));
1417        assert_eq!(mutations[1].new, Some(serde_json::json!("second")));
1418
1419        assert_eq!(mutations[2].old, Some(serde_json::json!("second")));
1420        assert_eq!(mutations[2].new, None);
1421        assert_eq!(mutations[2].origin, StateMutationOrigin::Remove);
1422    }
1423
1424    #[test]
1425    fn mutation_journal_is_shared_with_delta_tracking() {
1426        let state = State::new();
1427        let _ = state.set("committed", "yes");
1428
1429        let tracked = state.with_delta_tracking();
1430        let _ = tracked.set("committed", "maybe");
1431        tracked.commit();
1432
1433        let mutations = state.recent_mutations();
1434        assert_eq!(mutations.len(), 3);
1435        assert_eq!(mutations[1].key, "committed");
1436        assert_eq!(mutations[1].old, Some(serde_json::json!("yes")));
1437        assert_eq!(mutations[1].new, Some(serde_json::json!("maybe")));
1438        assert_eq!(mutations[1].origin, StateMutationOrigin::Set);
1439        assert!(mutations[1].delta);
1440
1441        assert_eq!(mutations[2].origin, StateMutationOrigin::Commit);
1442        assert!(!mutations[2].delta);
1443    }
1444
1445    #[test]
1446    fn drain_mutations_clears_journal() {
1447        let state = State::new();
1448        let _ = state.set("a", 1);
1449        let _ = state.set("b", 2);
1450
1451        let drained = state.drain_mutations();
1452        assert_eq!(drained.len(), 2);
1453        assert!(state.recent_mutations().is_empty());
1454    }
1455
1456    #[test]
1457    fn mutation_cursor_reads_only_later_changes() {
1458        let state = State::new();
1459        let _ = state.set("before", 1);
1460        let cursor = state.mutation_cursor();
1461
1462        let _ = state.set("after", 2);
1463        state.remove("before");
1464
1465        let mutations = state.mutations_since(cursor);
1466        assert_eq!(mutations.len(), 2);
1467        assert_eq!(mutations[0].key, "after");
1468        assert_eq!(mutations[1].key, "before");
1469    }
1470
1471    #[test]
1472    fn no_delta_tracking_preserves_existing_behavior() {
1473        let state = State::new();
1474        assert!(!state.is_tracking_delta());
1475        let _ = state.set("key", "val");
1476        assert_eq!(state.get::<String>("key"), Some("val".to_string()));
1477        assert!(!state.has_delta());
1478    }
1479
1480    // ── Prefix tests ──────────────────────────────────────────────────────
1481
1482    #[test]
1483    fn prefix_app_set_and_get() {
1484        let state = State::new();
1485        let _ = state.app().set("flag", true);
1486
1487        // Accessible via prefix accessor
1488        assert_eq!(state.app().get::<bool>("flag"), Some(true));
1489        // Also accessible via raw key
1490        assert_eq!(state.get::<bool>("app:flag"), Some(true));
1491    }
1492
1493    #[test]
1494    fn prefix_user_set_and_get() {
1495        let state = State::new();
1496        let _ = state.user().set("name", "Alice");
1497        assert_eq!(
1498            state.user().get::<String>("name"),
1499            Some("Alice".to_string())
1500        );
1501        assert_eq!(state.get::<String>("user:name"), Some("Alice".to_string()));
1502    }
1503
1504    #[test]
1505    fn prefix_temp_set_and_get() {
1506        let state = State::new();
1507        let _ = state.temp().set("scratch", 42);
1508        assert_eq!(state.temp().get::<i32>("scratch"), Some(42));
1509    }
1510
1511    #[test]
1512    fn prefix_contains_and_remove() {
1513        let state = State::new();
1514        let _ = state.app().set("x", 1);
1515        assert!(state.app().contains("x"));
1516        state.app().remove("x");
1517        assert!(!state.app().contains("x"));
1518    }
1519
1520    #[test]
1521    fn prefix_keys() {
1522        let state = State::new();
1523        let _ = state.app().set("a", 1);
1524        let _ = state.app().set("b", 2);
1525        let _ = state.user().set("c", 3);
1526
1527        let app_keys = state.app().keys();
1528        assert_eq!(app_keys.len(), 2);
1529        assert!(app_keys.contains(&"a".to_string()));
1530        assert!(app_keys.contains(&"b".to_string()));
1531
1532        let user_keys = state.user().keys();
1533        assert_eq!(user_keys.len(), 1);
1534        assert!(user_keys.contains(&"c".to_string()));
1535    }
1536
1537    #[test]
1538    fn prefix_with_delta_tracking() {
1539        let state = State::new();
1540        let tracked = state.with_delta_tracking();
1541        let _ = tracked.app().set("flag", true);
1542
1543        // Visible in tracked state via prefix
1544        assert_eq!(tracked.app().get::<bool>("flag"), Some(true));
1545        // In delta, not committed
1546        assert!(tracked.has_delta());
1547        assert!(!state.contains("app:flag"));
1548
1549        tracked.commit();
1550        assert_eq!(state.get::<bool>("app:flag"), Some(true));
1551    }
1552
1553    // ── New prefix accessor tests ────────────────────────────────────────
1554
1555    #[test]
1556    fn prefix_session_set_and_get() {
1557        let state = State::new();
1558        let _ = state.session().set("turn_count", 5);
1559        assert_eq!(state.session().get::<i32>("turn_count"), Some(5));
1560        assert_eq!(state.get::<i32>("session:turn_count"), Some(5));
1561    }
1562
1563    #[test]
1564    fn prefix_turn_set_and_get() {
1565        let state = State::new();
1566        let _ = state.turn().set("transcript", "hello");
1567        assert_eq!(
1568            state.turn().get::<String>("transcript"),
1569            Some("hello".to_string())
1570        );
1571        assert_eq!(
1572            state.get::<String>("turn:transcript"),
1573            Some("hello".to_string())
1574        );
1575    }
1576
1577    #[test]
1578    fn prefix_bg_set_and_get() {
1579        let state = State::new();
1580        let _ = state.bg().set("task_id", "abc-123");
1581        assert_eq!(
1582            state.bg().get::<String>("task_id"),
1583            Some("abc-123".to_string())
1584        );
1585        assert_eq!(
1586            state.get::<String>("bg:task_id"),
1587            Some("abc-123".to_string())
1588        );
1589    }
1590
1591    #[test]
1592    fn prefix_session_contains_and_remove() {
1593        let state = State::new();
1594        let _ = state.session().set("x", 1);
1595        assert!(state.session().contains("x"));
1596        state.session().remove("x");
1597        assert!(!state.session().contains("x"));
1598    }
1599
1600    #[test]
1601    fn prefix_turn_keys() {
1602        let state = State::new();
1603        let _ = state.turn().set("a", 1);
1604        let _ = state.turn().set("b", 2);
1605        let _ = state.session().set("c", 3);
1606
1607        let turn_keys = state.turn().keys();
1608        assert_eq!(turn_keys.len(), 2);
1609        assert!(turn_keys.contains(&"a".to_string()));
1610        assert!(turn_keys.contains(&"b".to_string()));
1611    }
1612
1613    // ── ReadOnlyPrefixedState (derived) tests ────────────────────────────
1614
1615    #[test]
1616    fn derived_read_only_get() {
1617        let state = State::new();
1618        // Write via raw key (simulating ComputedRegistry)
1619        let _ = state.set("derived:sentiment", "positive");
1620        assert_eq!(
1621            state.derived().get::<String>("sentiment"),
1622            Some("positive".to_string())
1623        );
1624    }
1625
1626    #[test]
1627    fn derived_read_only_get_raw() {
1628        let state = State::new();
1629        let _ = state.set("derived:score", serde_json::json!(0.95));
1630        let raw = state.derived().get_raw("score");
1631        assert!(raw.is_some());
1632        assert_eq!(raw.unwrap(), serde_json::json!(0.95));
1633    }
1634
1635    #[test]
1636    fn derived_read_only_contains() {
1637        let state = State::new();
1638        let _ = state.set("derived:exists", true);
1639        assert!(state.derived().contains("exists"));
1640        assert!(!state.derived().contains("missing"));
1641    }
1642
1643    #[test]
1644    fn derived_read_only_keys() {
1645        let state = State::new();
1646        let _ = state.set("derived:a", 1);
1647        let _ = state.set("derived:b", 2);
1648        let _ = state.set("app:c", 3);
1649
1650        let derived_keys = state.derived().keys();
1651        assert_eq!(derived_keys.len(), 2);
1652        assert!(derived_keys.contains(&"a".to_string()));
1653        assert!(derived_keys.contains(&"b".to_string()));
1654    }
1655
1656    #[test]
1657    fn derived_missing_key_returns_none() {
1658        let state = State::new();
1659        assert_eq!(state.derived().get::<String>("nope"), None);
1660        assert_eq!(state.derived().get_raw("nope"), None);
1661    }
1662
1663    // ── snapshot_values tests ────────────────────────────────────────────
1664
1665    #[test]
1666    fn snapshot_values_captures_existing_keys() {
1667        let state = State::new();
1668        let _ = state.set("a", 1);
1669        let _ = state.set("b", "hello");
1670        let _ = state.set("c", true);
1671
1672        let snap = state.snapshot_values(&["a", "b", "missing"]);
1673        assert_eq!(snap.len(), 2);
1674        assert_eq!(snap.get("a"), Some(&serde_json::json!(1)));
1675        assert_eq!(snap.get("b"), Some(&serde_json::json!("hello")));
1676        assert!(!snap.contains_key("missing"));
1677    }
1678
1679    #[test]
1680    fn snapshot_values_empty_keys() {
1681        let state = State::new();
1682        let _ = state.set("a", 1);
1683        let snap = state.snapshot_values(&[]);
1684        assert!(snap.is_empty());
1685    }
1686
1687    // ── diff_values tests ────────────────────────────────────────────────
1688
1689    #[test]
1690    fn diff_values_detects_changed_value() {
1691        let state = State::new();
1692        let _ = state.set("x", 1);
1693        let snap = state.snapshot_values(&["x"]);
1694
1695        let _ = state.set("x", 2);
1696        let diffs = state.diff_values(&snap, &["x"]);
1697        assert_eq!(diffs.len(), 1);
1698        assert_eq!(diffs[0].0, "x");
1699        assert_eq!(diffs[0].1, serde_json::json!(1));
1700        assert_eq!(diffs[0].2, serde_json::json!(2));
1701    }
1702
1703    #[test]
1704    fn diff_values_detects_new_key() {
1705        let state = State::new();
1706        let snap = state.snapshot_values(&["y"]);
1707
1708        let _ = state.set("y", "new");
1709        let diffs = state.diff_values(&snap, &["y"]);
1710        assert_eq!(diffs.len(), 1);
1711        assert_eq!(diffs[0].0, "y");
1712        assert_eq!(diffs[0].1, Value::Null);
1713        assert_eq!(diffs[0].2, serde_json::json!("new"));
1714    }
1715
1716    #[test]
1717    fn diff_values_detects_removed_key() {
1718        let state = State::new();
1719        let _ = state.set("z", 42);
1720        let snap = state.snapshot_values(&["z"]);
1721
1722        state.remove("z");
1723        let diffs = state.diff_values(&snap, &["z"]);
1724        assert_eq!(diffs.len(), 1);
1725        assert_eq!(diffs[0].0, "z");
1726        assert_eq!(diffs[0].1, serde_json::json!(42));
1727        assert_eq!(diffs[0].2, Value::Null);
1728    }
1729
1730    #[test]
1731    fn diff_values_no_change() {
1732        let state = State::new();
1733        let _ = state.set("stable", 10);
1734        let snap = state.snapshot_values(&["stable"]);
1735
1736        // No mutation
1737        let diffs = state.diff_values(&snap, &["stable"]);
1738        assert!(diffs.is_empty());
1739    }
1740
1741    #[test]
1742    fn diff_values_multiple_keys_mixed_changes() {
1743        let state = State::new();
1744        let _ = state.set("a", 1);
1745        let _ = state.set("b", 2);
1746        let snap = state.snapshot_values(&["a", "b", "c"]);
1747
1748        let _ = state.set("a", 10); // changed
1749                                    // b unchanged
1750        let _ = state.set("c", 3); // new
1751
1752        let diffs = state.diff_values(&snap, &["a", "b", "c"]);
1753        assert_eq!(diffs.len(), 2); // a changed, c new; b unchanged
1754        let diff_keys: Vec<&str> = diffs.iter().map(|(k, _, _)| k.as_str()).collect();
1755        assert!(diff_keys.contains(&"a"));
1756        assert!(diff_keys.contains(&"c"));
1757    }
1758
1759    // ── clear_prefix tests ───────────────────────────────────────────────
1760
1761    #[test]
1762    fn clear_prefix_removes_matching_keys() {
1763        let state = State::new();
1764        let _ = state.set("turn:a", 1);
1765        let _ = state.set("turn:b", 2);
1766        let _ = state.set("app:c", 3);
1767        let _ = state.set("session:d", 4);
1768
1769        state.clear_prefix("turn:");
1770        assert!(!state.contains("turn:a"));
1771        assert!(!state.contains("turn:b"));
1772        assert!(state.contains("app:c"));
1773        assert!(state.contains("session:d"));
1774    }
1775
1776    #[test]
1777    fn clear_prefix_no_matching_keys_is_noop() {
1778        let state = State::new();
1779        let _ = state.set("app:x", 1);
1780        state.clear_prefix("turn:");
1781        assert!(state.contains("app:x"));
1782    }
1783
1784    #[test]
1785    fn clear_prefix_also_clears_delta() {
1786        let state = State::new();
1787        let _ = state.set("turn:committed", 1);
1788        let tracked = state.with_delta_tracking();
1789        let _ = tracked.set("turn:delta_val", 2);
1790
1791        // Both committed and delta have turn: keys
1792        assert!(tracked.contains("turn:committed"));
1793        assert!(tracked.contains("turn:delta_val"));
1794
1795        tracked.clear_prefix("turn:");
1796        assert!(!tracked.contains("turn:committed"));
1797        assert!(!tracked.contains("turn:delta_val"));
1798    }
1799
1800    #[test]
1801    fn clear_prefix_via_turn_accessor() {
1802        let state = State::new();
1803        let _ = state.turn().set("x", 1);
1804        let _ = state.turn().set("y", 2);
1805        let _ = state.app().set("z", 3);
1806
1807        state.clear_prefix("turn:");
1808        assert!(state.turn().keys().is_empty());
1809        assert!(state.app().contains("z"));
1810    }
1811
1812    // ── modify() tests ──────────────────────────────────────────────────
1813
1814    #[test]
1815    fn modify_increment_existing() {
1816        let state = State::new();
1817        let _ = state.set("count", 5u32);
1818        let result = state.modify("count", 0u32, |n| n + 1).unwrap();
1819        assert_eq!(result, 6);
1820        assert_eq!(state.get::<u32>("count"), Some(6));
1821    }
1822
1823    #[test]
1824    fn modify_uses_default_when_missing() {
1825        let state = State::new();
1826        let result = state.modify("new_count", 0u32, |n| n + 1).unwrap();
1827        assert_eq!(result, 1);
1828        assert_eq!(state.get::<u32>("new_count"), Some(1));
1829    }
1830
1831    #[test]
1832    fn modify_with_delta_tracking() {
1833        let state = State::new();
1834        let _ = state.set("x", 10u32);
1835        let tracked = state.with_delta_tracking();
1836        let result = tracked.modify("x", 0u32, |n| n * 2).unwrap();
1837        assert_eq!(result, 20);
1838        // Written to delta, not committed
1839        assert_eq!(tracked.get::<u32>("x"), Some(20));
1840        assert_eq!(state.get::<u32>("x"), Some(10)); // original unchanged
1841    }
1842
1843    // ── derived fallback tests ──────────────────────────────────────────
1844
1845    #[test]
1846    fn get_falls_back_to_derived_prefix() {
1847        let state = State::new();
1848        let _ = state.set("derived:risk", 0.85);
1849        // Access without prefix — should find derived:risk
1850        assert_eq!(state.get::<f64>("risk"), Some(0.85));
1851    }
1852
1853    #[test]
1854    fn get_prefers_direct_key_over_derived() {
1855        let state = State::new();
1856        let _ = state.set("score", 1.0);
1857        let _ = state.set("derived:score", 0.5);
1858        // Direct key should win
1859        assert_eq!(state.get::<f64>("score"), Some(1.0));
1860    }
1861
1862    #[test]
1863    fn get_derived_fallback_skipped_for_prefixed_keys() {
1864        let state = State::new();
1865        let _ = state.set("derived:risk", 0.85);
1866        // Prefixed key should NOT trigger fallback
1867        assert_eq!(state.get::<f64>("app:risk"), None);
1868    }
1869
1870    #[test]
1871    fn get_derived_fallback_with_delta_tracking() {
1872        let state = State::new();
1873        let tracked = state.with_delta_tracking();
1874        let _ = tracked.set("derived:computed_val", 42);
1875        assert_eq!(tracked.get::<i32>("computed_val"), Some(42));
1876    }
1877
1878    // ── with() zero-copy borrow tests ──────────────────────────────────
1879
1880    #[test]
1881    fn with_reads_from_inner() {
1882        let state = State::new();
1883        let _ = state.set("name", "Alice");
1884        let len = state.with("name", |v| v.as_str().unwrap().len());
1885        assert_eq!(len, Some(5));
1886    }
1887
1888    #[test]
1889    fn with_reads_from_delta_first() {
1890        let state = State::new();
1891        let _ = state.set("x", 1);
1892        let tracked = state.with_delta_tracking();
1893        let _ = tracked.set("x", 99);
1894        let val = tracked.with("x", |v| v.as_i64().unwrap());
1895        assert_eq!(val, Some(99));
1896    }
1897
1898    #[test]
1899    fn with_falls_back_to_inner_when_not_in_delta() {
1900        let state = State::new();
1901        let _ = state.set("committed", "yes");
1902        let tracked = state.with_delta_tracking();
1903        let val = tracked.with("committed", |v| v.as_str().unwrap().to_string());
1904        assert_eq!(val, Some("yes".to_string()));
1905    }
1906
1907    #[test]
1908    fn with_falls_back_to_derived() {
1909        let state = State::new();
1910        let _ = state.set("derived:risk", 0.85);
1911        let val = state.with("risk", |v| v.as_f64().unwrap());
1912        assert_eq!(val, Some(0.85));
1913    }
1914
1915    #[test]
1916    fn with_derived_fallback_skipped_for_prefixed() {
1917        let state = State::new();
1918        let _ = state.set("derived:risk", 0.85);
1919        let val = state.with("app:risk", |v| v.as_f64().unwrap());
1920        assert_eq!(val, None);
1921    }
1922
1923    #[test]
1924    fn with_returns_none_for_missing() {
1925        let state = State::new();
1926        let val = state.with("missing", |v| v.clone());
1927        assert_eq!(val, None);
1928    }
1929
1930    #[test]
1931    fn with_on_prefixed_state() {
1932        let state = State::new();
1933        let _ = state.app().set("flag", true);
1934        let val = state.app().with("flag", |v| v.as_bool().unwrap());
1935        assert_eq!(val, Some(true));
1936    }
1937
1938    #[test]
1939    fn with_on_read_only_prefixed_state() {
1940        let state = State::new();
1941        let _ = state.set("derived:score", serde_json::json!(0.95));
1942        let val = state.derived().with("score", |v| v.as_f64().unwrap());
1943        assert_eq!(val, Some(0.95));
1944    }
1945
1946    // ── StateKey typed key tests ───────────────────────────────────────
1947
1948    const TURN_COUNT: StateKey<u32> = StateKey::new("session:turn_count");
1949    const NAME: StateKey<String> = StateKey::new("user:name");
1950
1951    #[test]
1952    fn state_key_get_and_set() {
1953        let state = State::new();
1954        let _ = state.set_key(&TURN_COUNT, 5);
1955        assert_eq!(state.get_key(&TURN_COUNT), Some(5));
1956    }
1957
1958    #[test]
1959    fn state_key_get_missing() {
1960        let state = State::new();
1961        assert_eq!(state.get_key(&TURN_COUNT), None);
1962    }
1963
1964    #[test]
1965    fn state_key_string_type() {
1966        let state = State::new();
1967        let _ = state.set_key(&NAME, "Alice".to_string());
1968        assert_eq!(state.get_key(&NAME), Some("Alice".to_string()));
1969    }
1970
1971    #[test]
1972    fn state_key_with() {
1973        let state = State::new();
1974        let _ = state.set_key(&TURN_COUNT, 42);
1975        let val = state.with_key(&TURN_COUNT, |v| v.as_u64().unwrap());
1976        assert_eq!(val, Some(42));
1977    }
1978
1979    #[test]
1980    fn state_key_interop_with_raw() {
1981        let state = State::new();
1982        let _ = state.set_key(&TURN_COUNT, 10);
1983        // Can also read via raw key
1984        assert_eq!(state.get::<u32>("session:turn_count"), Some(10));
1985    }
1986
1987    #[test]
1988    fn slot_evidence_aggregates_value_provenance_and_journal() {
1989        let state = State::new();
1990        let _ = state.set("party_size", 6u8);
1991        // Provenance written under the state_meta convention (as resolvers do).
1992        let _ = state.set(
1993            "state_meta:party_size",
1994            serde_json::json!({ "source": "extraction", "confidence": 0.9 }),
1995        );
1996
1997        let ev = state.evidence("party_size");
1998        assert!(ev.present);
1999        assert_eq!(ev.value, Some(serde_json::json!(6)));
2000        assert_eq!(ev.source.as_deref(), Some("extraction"));
2001        assert_eq!(ev.confidence, Some(0.9));
2002        assert!(ev.last_sequence.is_some());
2003        assert_eq!(ev.last_origin, Some(StateMutationOrigin::Set));
2004
2005        // An absent key reports no evidence.
2006        let missing = state.evidence("nope");
2007        assert!(!missing.present);
2008        assert!(missing.source.is_none());
2009    }
2010
2011    // ── Transaction-invariant tests (the verified correctness bugs) ──────────
2012
2013    #[test]
2014    fn rollback_restores_base_after_remove() {
2015        // Regression: previously remove() in delta mode deleted from the committed
2016        // store, so rollback() could not restore it.
2017        let base = State::new();
2018        let _ = base.set("k", "original");
2019
2020        let tx = base.with_delta_tracking();
2021        assert_eq!(tx.remove("k"), Some(serde_json::json!("original")));
2022        assert_eq!(tx.get::<String>("k"), None); // tombstoned in the tx view
2023        assert_eq!(base.get::<String>("k"), Some("original".into())); // base intact
2024
2025        tx.rollback();
2026        assert_eq!(tx.get::<String>("k"), Some("original".into()));
2027        assert_eq!(base.get::<String>("k"), Some("original".into()));
2028    }
2029
2030    #[test]
2031    fn rollback_restores_base_after_clear_prefix() {
2032        // Regression: clear_prefix() used to mutate the committed store directly.
2033        let base = State::new();
2034        let _ = base.set("app:a", 1u32);
2035        let _ = base.set("app:b", 2u32);
2036        let _ = base.set("user:c", 3u32);
2037
2038        let tx = base.with_delta_tracking();
2039        tx.clear_prefix("app:");
2040        assert_eq!(tx.get::<u32>("app:a"), None);
2041        assert_eq!(tx.get::<u32>("app:b"), None);
2042        assert_eq!(tx.get::<u32>("user:c"), Some(3));
2043        // Base untouched until commit.
2044        assert_eq!(base.get::<u32>("app:a"), Some(1));
2045
2046        tx.rollback();
2047        assert_eq!(tx.get::<u32>("app:a"), Some(1));
2048        assert_eq!(tx.get::<u32>("app:b"), Some(2));
2049    }
2050
2051    #[test]
2052    fn commit_applies_removals() {
2053        let base = State::new();
2054        let _ = base.set("k", "v");
2055        let tx = base.with_delta_tracking();
2056        tx.remove("k");
2057        tx.commit();
2058        assert_eq!(base.get::<String>("k"), None);
2059    }
2060
2061    #[test]
2062    fn commit_applies_prefix_clear() {
2063        let base = State::new();
2064        let _ = base.set("app:a", 1u32);
2065        let _ = base.set("user:c", 3u32);
2066        let tx = base.with_delta_tracking();
2067        tx.clear_prefix("app:");
2068        tx.commit();
2069        assert_eq!(base.get::<u32>("app:a"), None);
2070        assert_eq!(base.get::<u32>("user:c"), Some(3));
2071    }
2072
2073    #[test]
2074    fn modify_is_atomic_under_concurrency() {
2075        use std::sync::Arc;
2076        use std::thread;
2077
2078        let state = Arc::new(State::new());
2079        let _ = state.set("count", 0u64);
2080
2081        let threads = 8;
2082        let per_thread = 1000;
2083        let handles: Vec<_> = (0..threads)
2084            .map(|_| {
2085                let state = state.clone();
2086                thread::spawn(move || {
2087                    for _ in 0..per_thread {
2088                        let _ = state.modify("count", 0u64, |n| n + 1);
2089                    }
2090                })
2091            })
2092            .collect();
2093        for h in handles {
2094            h.join().unwrap();
2095        }
2096        // With a real per-key atomic RMW, no increments are lost.
2097        assert_eq!(
2098            state.get::<u64>("count"),
2099            Some((threads * per_thread) as u64)
2100        );
2101    }
2102}
2103
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    // Property: a transaction's puts and removes never leak to the base while
    // the transaction is open, and a rollback leaves both the transaction view
    // and the base equal to the originally committed contents.
    //
    // Keys come from the small alphabet `[a-c]` so that generated operations
    // frequently hit keys already in the base.
    proptest! {
        #[test]
        fn rollback_always_restores_base(
            base_keys in proptest::collection::vec(("[a-c]", 0u32..5), 0..6),
            ops in proptest::collection::vec(
                prop_oneof![
                    ("[a-c]", 0u32..5).prop_map(|(k, v)| (k, Some(v))),
                    "[a-c]".prop_map(|k| (k, None)),
                ],
                0..12,
            ),
        ) {
            let base = State::new();
            for (k, v) in &base_keys {
                let _ = base.set(k.clone(), *v);
            }
            // Ordered key -> raw value view of a state, for whole-state comparison.
            let snapshot = |s: &State| -> std::collections::BTreeMap<String, Value> {
                s.keys().into_iter().filter_map(|k| s.get_raw(&k).map(|v| (k, v))).collect()
            };
            let before = snapshot(&base);

            let tx = base.with_delta_tracking();
            // `Some(v)` is a put, `None` is a remove.
            for (k, v) in &ops {
                match v {
                    Some(v) => { let _ = tx.set(k.clone(), *v); }
                    None => { tx.remove(k); }
                }
            }
            // Base is never mutated while the tx is open.
            prop_assert_eq!(&before, &snapshot(&base));

            tx.rollback();
            // After rollback the tx view matches the committed contents again...
            prop_assert_eq!(&before, &snapshot(&tx));
            // ...and the rollback itself did not disturb the base.
            prop_assert_eq!(&before, &snapshot(&base));
        }
    }
}