gemini_adk_rs/live/replay.rs
1//! Offline session replay — feed a recorded wire log through the **real**
2//! control plane.
3//!
4//! Recording happens at L0 via
5//! [`SessionConfig::record_wire`](gemini_genai_rs::prelude::SessionConfig::record_wire)
6//! (every wire byte, both directions, as [`WireEntry`] JSONL). This module
7//! closes the loop: [`replay_session`] opens a
8//! [`ReplayTransport`] over the log's inbound frames and attaches the same three-lane processor a
9//! live connection would get — phase machine, extractors, watchers, tool
10//! dispatch, flow governance all run for real. Nothing is mocked above the
11//! transport seam.
12//!
13//! What replay does and does not do:
14//!
15//! - **Does**: re-decode every recorded inbound frame, re-drive the L1
16//! processor (events, state writes, tool dispatch through whatever
17//! dispatcher you attach), and collect the outbound frames the processor
18//! regenerates (setup, tool responses) for comparison against the log.
19//! - **Does not**: re-execute the model. The model's outputs are *in* the
20//! recorded inbound frames. User-originated sends (text/audio) are in the
21//! log's outbound entries but are not re-sent — they only ever existed to
22//! provoke the recorded inbound frames.
23//!
24//! ```rust,no_run
25//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
26//! use gemini_adk_rs::live::replay::replay_session;
27//! use gemini_adk_rs::live::LiveSessionBuilder;
28//! use gemini_genai_rs::prelude::{read_wire_log, SessionConfig};
29//!
30//! let entries = read_wire_log("session.wire.jsonl")?;
31//! let config = SessionConfig::new("offline");
32//! let builder = LiveSessionBuilder::new(config.clone());
33//! let replay = replay_session(config, builder, &entries).await?;
34//!
35//! let mut events = replay.handle().events();
36//! replay.release(); // start streaming recorded frames
37//! replay.drained().await; // all frames handed to the session loop
38//! # let _ = events.try_recv();
39//! # Ok(())
40//! # }
41//! ```
42
43use std::time::Duration;
44
45use gemini_genai_rs::prelude::{SessionConfig, SessionPhase, WireEntry};
46use gemini_genai_rs::session::SessionHandle;
47use gemini_genai_rs::transport::replay::{ReplayControl, ReplayTransport};
48use gemini_genai_rs::transport::{connect_with, JsonCodec, TransportConfig};
49
50use crate::error::AgentError;
51
52use super::builder::{build_runtime, spawn_lanes, LiveSessionBuilder};
53use super::events::LiveEvent;
54use super::handle::LiveHandle;
55
56/// Attach the full L1 control plane (three-lane processor, phase machine,
57/// extractors, watchers, tool dispatch, …) to an **already connected** L0
58/// session.
59///
60/// This is the seam that makes replay possible without touching the network:
61/// connect the L0 session over any [`Transport`](gemini_genai_rs::transport::Transport)
62/// (e.g. [`ReplayTransport`] or [`MockTransport`](gemini_genai_rs::transport::MockTransport)), then hand
63/// it here together with a configured [`LiveSessionBuilder`].
64///
65/// Note: the builder's own `SessionConfig` is *not* re-sent — the setup
66/// message was already encoded from the config given to the L0 connect call.
67/// Subscribe to events **after** this returns and only then let the transport
68/// stream (for `ReplayTransport`, call
69/// [`ReplayControl::release`](gemini_genai_rs::transport::replay::ReplayControl::release)),
70/// otherwise early frames race the subscription.
71pub async fn attach_session(
72 builder: LiveSessionBuilder,
73 session: SessionHandle,
74) -> Result<LiveHandle, AgentError> {
75 let plan = builder.into_plan()?;
76 session.wait_for_phase(SessionPhase::Active).await;
77 let runtime = build_runtime(plan, session);
78 spawn_lanes(runtime).await
79}
80
81/// A replayed session: the live handle plus the replay controls.
82pub struct ReplaySession {
83 handle: LiveHandle,
84 control: ReplayControl,
85}
86
87impl ReplaySession {
88 /// The live handle — same type a real connection returns. `state()`,
89 /// `events()`, `telemetry()`, `extracted()` all work.
90 pub fn handle(&self) -> &LiveHandle {
91 &self.handle
92 }
93
94 /// Start streaming the recorded inbound frames. Call after subscribing
95 /// to [`LiveHandle::events`].
96 pub fn release(&self) {
97 self.control.release();
98 }
99
100 /// Wait until every recorded inbound frame has been handed to the session
101 /// loop. The last frame's effects may still be propagating through the
102 /// processor — use [`collect_events_until_idle`] (or assert on state) to
103 /// settle.
104 pub async fn drained(&self) {
105 self.control.drained().await;
106 }
107
108 /// Outbound frames the replayed session has sent so far (setup, tool
109 /// responses, …), in send order, for comparison against the recorded log.
110 pub fn outbound_frames(&self) -> Vec<Vec<u8>> {
111 self.control.outbound_frames()
112 }
113
114 /// Disconnect the replayed session.
115 pub async fn disconnect(&self) -> Result<(), gemini_genai_rs::session::SessionError> {
116 self.handle.disconnect().await
117 }
118}
119
120/// Replay a recorded wire log through the real L1 processor, offline.
121///
122/// - `config` is used to open the replay transport (its re-encoded setup
123/// message becomes the first outbound frame, mirroring the original run).
124/// Use the same configuration as the recorded session for a faithful setup
125/// comparison. No network is touched and no credential is used.
126/// - `builder` supplies the control plane: dispatcher, phases, extractors,
127/// watchers, state, callbacks. Attach the original tool implementations to
128/// re-execute tools deterministically; without a dispatcher, recorded tool
129/// calls surface as events but produce no responses.
130/// - `entries` is the recorded log; only its inbound frames are replayed
131/// (outbound entries are kept in the log purely for comparison/audit).
132///
133/// Frames are delivered as fast as the session loop consumes them (no
134/// original-timing pacing). The replay is gated: nothing past the setup
135/// handshake flows until [`ReplaySession::release`] is called, so subscribe
136/// to events first.
137pub async fn replay_session(
138 config: SessionConfig,
139 builder: LiveSessionBuilder,
140 entries: &[WireEntry],
141) -> Result<ReplaySession, AgentError> {
142 let (transport, control) = ReplayTransport::from_wire_log(entries);
143 let transport_config = TransportConfig {
144 max_reconnect_attempts: 0,
145 connect_timeout_secs: 5,
146 setup_timeout_secs: 5,
147 ..TransportConfig::default()
148 };
149 let session = connect_with(config, transport_config, transport, JsonCodec)
150 .await
151 .map_err(AgentError::Session)?;
152 let handle = attach_session(builder, session).await?;
153 Ok(ReplaySession { handle, control })
154}
155
156/// Collect [`LiveEvent`]s until the stream stays idle for `idle` (or `max`
157/// elapses). Useful for settling an as-fast-as-possible replay where "done"
158/// means "no more effects are propagating".
159pub async fn collect_events_until_idle(
160 rx: &mut tokio::sync::broadcast::Receiver<LiveEvent>,
161 idle: Duration,
162 max: Duration,
163) -> Vec<LiveEvent> {
164 let mut events = Vec::new();
165 let deadline = tokio::time::Instant::now() + max;
166 loop {
167 let timeout = idle.min(deadline.saturating_duration_since(tokio::time::Instant::now()));
168 if timeout.is_zero() {
169 break;
170 }
171 match tokio::time::timeout(timeout, rx.recv()).await {
172 Ok(Ok(event)) => events.push(event),
173 Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
174 Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => break,
175 Err(_) => break, // idle
176 }
177 }
178 events
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use gemini_genai_rs::prelude::{GeminiModel, WireDirection};

    /// Replays a two-frame log (setup completion, then one model turn saying
    /// "Hi") and checks that the events come out of the live handle and that
    /// the setup message was recorded as the first outbound frame.
    #[tokio::test]
    async fn replay_session_reaches_active_and_emits_events() {
        let setup_complete = br#"{"setupComplete":{}}"#.to_vec();
        let model_turn =
            br#"{"serverContent":{"modelTurn":{"parts":[{"text":"Hi"}]},"turnComplete":true}}"#
                .to_vec();
        let entries = vec![
            WireEntry {
                seq: 1,
                dir: WireDirection::Inbound,
                ts_ms: 1,
                payload: setup_complete,
            },
            WireEntry {
                seq: 2,
                dir: WireDirection::Inbound,
                ts_ms: 2,
                payload: model_turn,
            },
        ];

        let config = SessionConfig::new("offline").model(GeminiModel::Gemini2_0FlashLive);
        let builder = LiveSessionBuilder::new(config.clone());
        let replay = replay_session(config, builder, &entries).await.unwrap();

        // Subscribe before opening the gate so no frame is missed.
        let mut events = replay.handle().events();
        replay.release();
        replay.drained().await;

        let idle = Duration::from_millis(200);
        let cap = Duration::from_secs(5);
        let collected = collect_events_until_idle(&mut events, idle, cap).await;

        let saw_text = collected
            .iter()
            .any(|e| matches!(e, LiveEvent::TextDelta(t) if t == "Hi"));
        assert!(saw_text, "expected replayed TextDelta, got {collected:?}");

        let saw_turn_complete = collected
            .iter()
            .any(|e| matches!(e, LiveEvent::TurnComplete));
        assert!(saw_turn_complete);

        // The replayed session re-encoded and "sent" the setup message.
        let outbound = replay.outbound_frames();
        assert!(!outbound.is_empty());
        let first_frame = std::str::from_utf8(&outbound[0]).unwrap();
        assert!(first_frame.contains("\"setup\""));

        replay.disconnect().await.unwrap();
    }
}