gemini_adk_rs/live/
replay.rs

1//! Offline session replay — feed a recorded wire log through the **real**
2//! control plane.
3//!
4//! Recording happens at L0 via
5//! [`SessionConfig::record_wire`](gemini_genai_rs::prelude::SessionConfig::record_wire)
6//! (every wire byte, both directions, as [`WireEntry`] JSONL). This module
7//! closes the loop: [`replay_session`] opens a
8//! [`ReplayTransport`] over the log's inbound frames and attaches the same three-lane processor a
9//! live connection would get — phase machine, extractors, watchers, tool
10//! dispatch, flow governance all run for real. Nothing is mocked above the
11//! transport seam.
12//!
13//! What replay does and does not do:
14//!
15//! - **Does**: re-decode every recorded inbound frame, re-drive the L1
16//!   processor (events, state writes, tool dispatch through whatever
17//!   dispatcher you attach), and collect the outbound frames the processor
18//!   regenerates (setup, tool responses) for comparison against the log.
19//! - **Does not**: re-execute the model. The model's outputs are *in* the
20//!   recorded inbound frames. User-originated sends (text/audio) are in the
21//!   log's outbound entries but are not re-sent — they only ever existed to
22//!   provoke the recorded inbound frames.
23//!
24//! ```rust,no_run
25//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
26//! use gemini_adk_rs::live::replay::replay_session;
27//! use gemini_adk_rs::live::LiveSessionBuilder;
28//! use gemini_genai_rs::prelude::{read_wire_log, SessionConfig};
29//!
30//! let entries = read_wire_log("session.wire.jsonl")?;
31//! let config = SessionConfig::new("offline");
32//! let builder = LiveSessionBuilder::new(config.clone());
33//! let replay = replay_session(config, builder, &entries).await?;
34//!
35//! let mut events = replay.handle().events();
36//! replay.release(); // start streaming recorded frames
37//! replay.drained().await; // all frames handed to the session loop
38//! # let _ = events.try_recv();
39//! # Ok(())
40//! # }
41//! ```
42
43use std::time::Duration;
44
45use gemini_genai_rs::prelude::{SessionConfig, SessionPhase, WireEntry};
46use gemini_genai_rs::session::SessionHandle;
47use gemini_genai_rs::transport::replay::{ReplayControl, ReplayTransport};
48use gemini_genai_rs::transport::{connect_with, JsonCodec, TransportConfig};
49
50use crate::error::AgentError;
51
52use super::builder::{build_runtime, spawn_lanes, LiveSessionBuilder};
53use super::events::LiveEvent;
54use super::handle::LiveHandle;
55
56/// Attach the full L1 control plane (three-lane processor, phase machine,
57/// extractors, watchers, tool dispatch, …) to an **already connected** L0
58/// session.
59///
60/// This is the seam that makes replay possible without touching the network:
61/// connect the L0 session over any [`Transport`](gemini_genai_rs::transport::Transport)
62/// (e.g. [`ReplayTransport`] or [`MockTransport`](gemini_genai_rs::transport::MockTransport)), then hand
63/// it here together with a configured [`LiveSessionBuilder`].
64///
65/// Note: the builder's own `SessionConfig` is *not* re-sent — the setup
66/// message was already encoded from the config given to the L0 connect call.
67/// Subscribe to events **after** this returns and only then let the transport
68/// stream (for `ReplayTransport`, call
69/// [`ReplayControl::release`](gemini_genai_rs::transport::replay::ReplayControl::release)),
70/// otherwise early frames race the subscription.
71pub async fn attach_session(
72    builder: LiveSessionBuilder,
73    session: SessionHandle,
74) -> Result<LiveHandle, AgentError> {
75    let plan = builder.into_plan()?;
76    session.wait_for_phase(SessionPhase::Active).await;
77    let runtime = build_runtime(plan, session);
78    spawn_lanes(runtime).await
79}
80
81/// A replayed session: the live handle plus the replay controls.
82pub struct ReplaySession {
83    handle: LiveHandle,
84    control: ReplayControl,
85}
86
87impl ReplaySession {
88    /// The live handle — same type a real connection returns. `state()`,
89    /// `events()`, `telemetry()`, `extracted()` all work.
90    pub fn handle(&self) -> &LiveHandle {
91        &self.handle
92    }
93
94    /// Start streaming the recorded inbound frames. Call after subscribing
95    /// to [`LiveHandle::events`].
96    pub fn release(&self) {
97        self.control.release();
98    }
99
100    /// Wait until every recorded inbound frame has been handed to the session
101    /// loop. The last frame's effects may still be propagating through the
102    /// processor — use [`collect_events_until_idle`] (or assert on state) to
103    /// settle.
104    pub async fn drained(&self) {
105        self.control.drained().await;
106    }
107
108    /// Outbound frames the replayed session has sent so far (setup, tool
109    /// responses, …), in send order, for comparison against the recorded log.
110    pub fn outbound_frames(&self) -> Vec<Vec<u8>> {
111        self.control.outbound_frames()
112    }
113
114    /// Disconnect the replayed session.
115    pub async fn disconnect(&self) -> Result<(), gemini_genai_rs::session::SessionError> {
116        self.handle.disconnect().await
117    }
118}
119
120/// Replay a recorded wire log through the real L1 processor, offline.
121///
122/// - `config` is used to open the replay transport (its re-encoded setup
123///   message becomes the first outbound frame, mirroring the original run).
124///   Use the same configuration as the recorded session for a faithful setup
125///   comparison. No network is touched and no credential is used.
126/// - `builder` supplies the control plane: dispatcher, phases, extractors,
127///   watchers, state, callbacks. Attach the original tool implementations to
128///   re-execute tools deterministically; without a dispatcher, recorded tool
129///   calls surface as events but produce no responses.
130/// - `entries` is the recorded log; only its inbound frames are replayed
131///   (outbound entries are kept in the log purely for comparison/audit).
132///
133/// Frames are delivered as fast as the session loop consumes them (no
134/// original-timing pacing). The replay is gated: nothing past the setup
135/// handshake flows until [`ReplaySession::release`] is called, so subscribe
136/// to events first.
137pub async fn replay_session(
138    config: SessionConfig,
139    builder: LiveSessionBuilder,
140    entries: &[WireEntry],
141) -> Result<ReplaySession, AgentError> {
142    let (transport, control) = ReplayTransport::from_wire_log(entries);
143    let transport_config = TransportConfig {
144        max_reconnect_attempts: 0,
145        connect_timeout_secs: 5,
146        setup_timeout_secs: 5,
147        ..TransportConfig::default()
148    };
149    let session = connect_with(config, transport_config, transport, JsonCodec)
150        .await
151        .map_err(AgentError::Session)?;
152    let handle = attach_session(builder, session).await?;
153    Ok(ReplaySession { handle, control })
154}
155
156/// Collect [`LiveEvent`]s until the stream stays idle for `idle` (or `max`
157/// elapses). Useful for settling an as-fast-as-possible replay where "done"
158/// means "no more effects are propagating".
159pub async fn collect_events_until_idle(
160    rx: &mut tokio::sync::broadcast::Receiver<LiveEvent>,
161    idle: Duration,
162    max: Duration,
163) -> Vec<LiveEvent> {
164    let mut events = Vec::new();
165    let deadline = tokio::time::Instant::now() + max;
166    loop {
167        let timeout = idle.min(deadline.saturating_duration_since(tokio::time::Instant::now()));
168        if timeout.is_zero() {
169            break;
170        }
171        match tokio::time::timeout(timeout, rx.recv()).await {
172            Ok(Ok(event)) => events.push(event),
173            Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
174            Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break,
175            Err(_) => break, // idle
176        }
177    }
178    events
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use gemini_genai_rs::prelude::{GeminiModel, WireDirection};

    /// Replays a two-frame log (setup ack + one text turn) and checks that the
    /// processor emits the text and turn-complete events and that the setup
    /// message shows up as the first outbound frame.
    #[tokio::test]
    async fn replay_session_reaches_active_and_emits_events() {
        // Inbound frame 1: the server's acknowledgement of the setup message.
        let setup_complete = WireEntry {
            seq: 1,
            dir: WireDirection::Inbound,
            ts_ms: 1,
            payload: br#"{"setupComplete":{}}"#.to_vec(),
        };
        // Inbound frame 2: a single-part model turn that also completes the turn.
        let model_turn = WireEntry {
            seq: 2,
            dir: WireDirection::Inbound,
            ts_ms: 2,
            payload:
                br#"{"serverContent":{"modelTurn":{"parts":[{"text":"Hi"}]},"turnComplete":true}}"#
                    .to_vec(),
        };
        let entries = vec![setup_complete, model_turn];

        let config = SessionConfig::new("offline").model(GeminiModel::Gemini2_0FlashLive);
        let builder = LiveSessionBuilder::new(config.clone());
        let replay = replay_session(config, builder, &entries).await.unwrap();

        // Subscribe before opening the gate so no event is missed.
        let mut events = replay.handle().events();
        replay.release();
        replay.drained().await;

        let idle = Duration::from_millis(200);
        let max = Duration::from_secs(5);
        let collected = collect_events_until_idle(&mut events, idle, max).await;

        let saw_text = collected
            .iter()
            .any(|e| matches!(e, LiveEvent::TextDelta(t) if t == "Hi"));
        assert!(saw_text, "expected replayed TextDelta, got {collected:?}");
        assert!(collected
            .iter()
            .any(|e| matches!(e, LiveEvent::TurnComplete)));

        // The replayed session re-encoded and "sent" the setup message.
        let outbound = replay.outbound_frames();
        assert!(!outbound.is_empty());
        assert!(String::from_utf8(outbound[0].clone())
            .unwrap()
            .contains("\"setup\""));

        replay.disconnect().await.unwrap();
    }
}