gemini-rs

Full Rust SDK for the Gemini Multimodal Live API — wire protocol, agent runtime, and fluent DX in three layered crates.

┌─────────────────────────────────────────────────────┐
│  gemini-adk-fluent-rs (L2 - Fluent DX)             │
│  AgentBuilder · Live · S·C·T·P·M·A operators       │
├─────────────────────────────────────────────────────┤
│  gemini-adk-rs (L1 - Agent Runtime)                │
│  Agent · Tools · State · Phases · TextAgent · LLM  │
├─────────────────────────────────────────────────────┤
│  gemini-genai-rs (L0 - Wire Protocol)              │
│  Transport · Session · Protocol · VAD · Buffers    │
└─────────────────────────────────────────────────────┘

Quick Start

use gemini_adk_fluent_rs::prelude::*;

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0Flash)
    .voice(Voice::Kore)
    .instruction("You are a helpful assistant.")
    .on_audio(|audio| { /* play audio */ })
    .on_text(|text| { /* display text */ })
    .connect()
    .await?;

Guide Structure

This book is organized into six sections:

  • Getting Started — Local setup, architecture overview, migration guide, and best practices
  • Voice & Live Sessions — Building real-time voice agents with phases, state, and watchers
  • Tools & Extraction — Tool system and structured data extraction from conversations
  • Composition & Patterns — Text agent combinators, S·C·T·P·M·A operators, middleware
  • Examples — 30 progressive cookbook examples (Crawl/Walk/Run) plus interactive gemini-adk-web-rs demos
  • ADK Web UI — Design system, dark/light mode, DevTools panels, and the cookbook browser

API Reference

For detailed type and method documentation, see the rustdoc API reference.

Crate                 Layer               API Docs
--------------------  ------------------  --------------------
gemini-genai-rs       L0 - Wire Protocol  gemini_genai_rs
gemini-adk-rs         L1 - Agent Runtime  gemini_adk_rs
gemini-adk-fluent-rs  L2 - Fluent DX      gemini_adk_fluent_rs

Setup and Running

This guide gets the workspace, cookbook examples, and ADK Web UI running from a fresh checkout.

1. Install Prerequisites

Ubuntu or Debian

sudo apt-get update
sudo apt-get install -y pkg-config libssl-dev libasound2-dev build-essential

macOS

xcode-select --install
brew install pkg-config openssl

You also need a stable Rust toolchain:

rustup update stable
rustup default stable

2. Choose Authentication

Create .env in the repository root.

Google AI

Use this for the fastest local setup.

GOOGLE_API_KEY=your-api-key

Vertex AI

Use this for project-scoped Google Cloud usage.

GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_CLOUD_LOCATION=us-central1
GOOGLE_GENAI_USE_VERTEXAI=TRUE

Then authenticate application default credentials:

gcloud auth application-default login
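
As a rough illustration of how these variables select a backend, here is a minimal sketch in plain Rust. AuthMode and choose_auth are hypothetical names, not SDK API, and the precedence rule (Vertex AI wins when GOOGLE_GENAI_USE_VERTEXAI is TRUE) is an assumption made for the example; the SDK's own detection may differ.

```rust
use std::collections::HashMap;

/// Hypothetical result type for this sketch; not part of the SDK.
#[derive(Debug, PartialEq)]
pub enum AuthMode {
    GoogleAi { api_key: String },
    VertexAi { project: String, location: String },
}

/// Decide which backend a set of .env-style variables selects.
/// Assumption: Vertex AI takes precedence when its flag is TRUE.
pub fn choose_auth(vars: &HashMap<String, String>) -> Result<AuthMode, String> {
    let use_vertex = vars
        .get("GOOGLE_GENAI_USE_VERTEXAI")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false);

    if use_vertex {
        // Both Vertex AI variables are required once the flag is set.
        let project = vars
            .get("GOOGLE_CLOUD_PROJECT")
            .ok_or("GOOGLE_CLOUD_PROJECT is not set")?
            .clone();
        let location = vars
            .get("GOOGLE_CLOUD_LOCATION")
            .ok_or("GOOGLE_CLOUD_LOCATION is not set")?
            .clone();
        return Ok(AuthMode::VertexAi { project, location });
    }

    // Otherwise fall back to the Google AI key.
    vars.get("GOOGLE_API_KEY")
        .map(|key| AuthMode::GoogleAi { api_key: key.clone() })
        .ok_or_else(|| "set GOOGLE_API_KEY or the Vertex AI variables".to_string())
}
```

A check like this is mainly useful for producing a clear error message before any connection attempt is made.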

3. Run ADK Web

cargo run -p gemini-adk-web-rs

Open:

http://localhost:25125

The landing page lists every bundled app. Open a voice app such as voice-chat, call-screening, or debt-collection, allow microphone access, and use the DevTools panel on the right to inspect state, phases, metrics, tools, traces, and cookbook guidance.

4. Run Cookbook Examples

The cookbook examples are plain Rust binaries:

cargo run -p example-cookbook --example 01_simple_agent
cargo run -p example-cookbook --example 17_evaluation_suite
cargo run -p example-cookbook --example 29_live_voice

The learning path is:

Tier   Examples  Focus
-----  --------  -------------------------------------------------------------
Crawl  01-10     Single-agent foundations, tools, callbacks, state, guards
Walk   11-20     Routing, fallback, middleware, context, evaluation, artifacts
Run    21-30     Production compositions, testing, voice, dispatch, telemetry

5. Verify the Workspace

Useful checks while developing:

cargo test -p gemini-adk-rs
cargo test -p gemini-adk-fluent-rs
cargo test -p gemini-adk-web-rs

For frontend-only changes:

node --check apps/gemini-adk-web-rs/static/js/app.js
node --check apps/gemini-adk-web-rs/static/js/devtools.js

Troubleshooting

Symptom                          Check
-------------------------------  -----------------------------------------------------------------
Web UI does not open             Confirm the server printed http://localhost:25125 and no firewall is blocking the port.
Microphone is silent             Browser microphone permission must be allowed; Linux also needs libasound2-dev.
Live API auth fails              Confirm .env is in the repository root and contains either GOOGLE_API_KEY or Vertex AI settings.
Vertex AI rejects setup fields   The SDK strips Google AI-only fields automatically; confirm GOOGLE_GENAI_USE_VERTEXAI=TRUE.
Linker fails with ld terminated  Retry after closing other large builds; this is usually local linker memory pressure, not Rust code.

What to Inspect in DevTools

Panel     Use it for
--------  ------------------------------------------------------------------
Timeline  Event ordering, interruptions, tool calls, turn boundaries
Events    Raw JSON payloads for exact debugging
State     Canonical state, raw extractor output, state_meta:* provenance
Phases    Current phase, requirements, transitions, state promotion decisions
Metrics   Latency, tokens, interruptions, playback buffer health
Traces    Span timing across model, tools, and runtime work
Cookbook  Source path, run command, and app-specific inspection checklist

Architecture Overview

This guide explains how the gemini-rs workspace is structured, how data flows through the system, and how to decide which layer to build on.

The Three-Crate Stack

The workspace is organized into three crates, each adding a layer of abstraction on top of the one below:

+--------------------------------------------------+
|  gemini-adk-fluent-rs  (L2)  - Fluent DX          |
|  Live::builder(), operator algebra, composition   |
+--------------------------------------------------+
|  gemini-adk-rs  (L1)  - Agent Runtime             |
|  LiveSessionBuilder, callbacks, tool dispatch,    |
|  state, phases, watchers, extractors, telemetry   |
+--------------------------------------------------+
|  gemini-genai-rs  (L0)  - Wire Protocol           |
|  SessionHandle, SessionConfig, Transport, Codec,  |
|  AuthProvider, events, commands, VAD, buffers     |
+--------------------------------------------------+
|          Gemini Multimodal Live API               |
|       (WebSocket, full-duplex audio/text)         |
+--------------------------------------------------+

L0: gemini-genai-rs

The wire protocol crate. Maps 1:1 to the Gemini API surface. No application logic, no opinions about how you structure your app.

What it provides:

  • SessionConfig for building the setup message (model, voice, tools, VAD)
  • SessionHandle for sending commands and subscribing to events
  • ConnectBuilder for establishing the WebSocket connection
  • Transport / Codec / AuthProvider traits for pluggable I/O
  • SessionEvent enum (17 variants) for everything the server can send
  • SessionCommand enum (9 variants) for everything you can send
  • SessionPhase FSM with validated transitions
  • Audio buffers (lock-free SPSC ring buffer, adaptive jitter buffer)
  • Client-side VAD (Voice Activity Detection)
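
To illustrate what "validated transitions" means for the SessionPhase FSM in the list above, here is a minimal sketch in plain Rust. It is not the crate's implementation: only the Active phase is named in this guide, so the other phase names and the transition table are assumptions chosen for the example.

```rust
/// Hypothetical phases for illustration; only `Active` is named in this guide.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Disconnected,
    Connecting,
    Active,
    Closed,
}

impl Phase {
    /// Assumed transition table: which (from, to) pairs are legal.
    pub fn can_transition_to(self, next: Phase) -> bool {
        matches!(
            (self, next),
            (Phase::Disconnected, Phase::Connecting)
                | (Phase::Connecting, Phase::Active)
                | (Phase::Connecting, Phase::Closed)
                | (Phase::Active, Phase::Closed)
        )
    }
}

/// A tiny state machine that refuses transitions not in the table.
pub struct PhaseFsm {
    current: Phase,
}

impl PhaseFsm {
    pub fn new() -> Self {
        PhaseFsm { current: Phase::Disconnected }
    }

    pub fn current(&self) -> Phase {
        self.current
    }

    /// Move to `next` if allowed; otherwise leave the state unchanged.
    pub fn transition(&mut self, next: Phase) -> Result<Phase, String> {
        if self.current.can_transition_to(next) {
            self.current = next;
            Ok(next)
        } else {
            Err(format!("invalid transition: {:?} -> {:?}", self.current, next))
        }
    }
}
```

Rejecting illegal transitions at one choke point means the rest of the code can assume, for example, that text is only sent while the session is active.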

L1: gemini-adk-rs

The agent runtime. Adds the event processing loop, typed callbacks, automatic tool dispatch, state management, and the phase machine.

What it provides:

  • LiveSessionBuilder to wire up config + callbacks + tools in one place
  • EventCallbacks with typed fast-lane (sync) and control-lane (async) hooks
  • ToolDispatcher with ToolFunction, StreamingTool, InputStreamingTool
  • State (concurrent key-value store with prefix scoping and delta tracking)
  • PhaseMachine for multi-step conversation flows
  • WatcherRegistry for state-reactive triggers
  • TranscriptBuffer for accumulating conversation history
  • TurnExtractor / LlmExtractor for structured data extraction
  • LiveHandle as the runtime API surface
  • SessionSignals + SessionTelemetry for observability
  • TextAgent trait and combinators for text-based LLM pipelines
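
A "state-reactive trigger" in the list above can be pictured as an edge detector over a state value. The sketch below is a hypothetical, std-only illustration of that idea (fire once when a boolean flips from false to true); BecameTrue is an invented name and is not how WatcherRegistry is implemented.

```rust
/// Hypothetical edge-triggered watcher over a boolean state value.
#[derive(Debug, Default)]
pub struct BecameTrue {
    last: bool,
    fired: u32,
}

impl BecameTrue {
    pub fn new() -> Self {
        Self::default()
    }

    /// Feed the latest value; returns true only on a false -> true edge.
    pub fn observe(&mut self, value: bool) -> bool {
        let rising = value && !self.last;
        self.last = value;
        if rising {
            self.fired += 1;
        }
        rising
    }

    /// Number of times the trigger has fired so far.
    pub fn fired(&self) -> u32 {
        self.fired
    }
}
```

The point of edge triggering is that a value staying true across many turns fires the action once, not on every update.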

L2: gemini-adk-fluent-rs

The fluent developer experience layer. Wraps L1 in a chainable builder API and adds operator-algebra composition (S, C, T, P, M modules).

What it provides:

  • Live::builder() with method chaining for the entire configuration surface
  • .phase() / .watch() / .computed() sub-builders that flow back naturally
  • .connect_google_ai() / .connect_vertex() for one-line connection
  • T::simple(), T::google_search() for composable tool registration
  • S, C, T, P, M operator modules with | composition
  • let_clone! macro for reducing Arc::clone boilerplate in closures
  • Test utilities and mock helpers

Data Flow

Here is how data moves through the system during a live session:

  Client App                    gemini-genai-rs                   Gemini API
  ----------                    --------------                   ----------

  Microphone
      |
      v
  [PCM16 16kHz] --send_audio()--> SessionHandle --WebSocket--> Gemini Live
                                       |                          |
                                  SessionCommand                  |
                                  (mpsc channel)                  |
                                       |                          |
                                  Transport::send()               |
                                       |                          v
                                       |                    Model processes
                                       |                    audio/text/tools
                                       |                          |
                                  Transport::recv()               |
                                       |                          |
                                  Codec::decode()                 |
                                       |                          |
                                  SessionEvent          <--- WebSocket frames
                                  (broadcast channel)
                                       |
                              +--------+--------+
                              |        |        |
                          Fast Lane  Ctrl Lane  Telemetry Lane
                              |        |        |
                          on_audio  on_tool  SessionSignals
                          on_text   phases   SessionTelemetry
                          on_vad    extract
                              |        |
                              v        v
                          Speaker   State
                          Display   Updates

Outbound path: Your app calls send_audio() / send_text() on the LiveHandle (L1/L2) or SessionHandle (L0). These become SessionCommand variants sent through an mpsc channel to the transport loop, which encodes them via the Codec and sends them over the WebSocket.

Inbound path: The transport loop receives WebSocket frames, decodes them via the Codec into SessionEvent variants, and broadcasts them. The three-lane processor (L1) routes each event to the appropriate lane.

Three-Lane Processor

Audio arrives at 40-100 events per second. Tool dispatch can take 1-30 seconds. Sharing one processing loop causes audio stutter during tool execution. The solution: split the event stream into three priority lanes.

Fast Lane (sync, <1ms)

Handles latency-sensitive events with sync callbacks that must never block:

Event                Callback
-------------------  ---------------------------------------------
AudioData            on_audio(&Bytes)
TextDelta            on_text(&str)
TextComplete         on_text_complete(&str)
InputTranscription   on_input_transcript(&str, bool)
OutputTranscription  on_output_transcript(&str, bool)
VoiceActivityStart   on_vad_start()
VoiceActivityEnd     on_vad_end()
Interrupted          Sets interrupted flag, stops forwarding audio

Fast lane callbacks are Fn (not FnMut, not async). If your callback takes longer than 1ms, audio playback will stutter.

Control Lane (async, can block)

Handles events that require I/O, state mutation, or multi-step processing:

Event              Callback
-----------------  -------------------------------------------------
ToolCall           on_tool_call (auto-dispatch or manual)
ToolCallCancelled  Cancels pending tool tasks
Interrupted        on_interrupted()
TurnComplete       Extractors, phase transitions, on_turn_complete()
GoAway             on_go_away(Duration)
Connected          on_connected()
Disconnected       on_disconnected(Option<String>)
Error              on_error(String)

The control lane also owns the TranscriptBuffer (no Arc<Mutex<>>) and runs extractors concurrently via join_all.

Telemetry Lane (async, debounced)

Runs on its own broadcast receiver. Collects SessionSignals (activity timestamps, timing, token usage) and SessionTelemetry (atomic counters for audio chunks, tool calls, interruptions, latency tracking, token counts). Flushes periodically (100ms debounce) with zero overhead on the hot path.

The telemetry lane also handles UsageMetadata events from the Gemini API, recording prompt/response/cached/thoughts token counts in both SessionSignals (as session: state keys) and SessionTelemetry (as atomic counters). The .on_usage() callback fires here for real-time token observation.
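
The split between cheap counter updates and a periodic flush can be sketched with std atomics. Counters and Snapshot are hypothetical types for this example; the field names echo the counters described above but are not the SDK's actual SessionTelemetry layout.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter set; each update is a single relaxed atomic add.
#[derive(Default)]
pub struct Counters {
    audio_chunks: AtomicU64,
    tool_calls: AtomicU64,
    interruptions: AtomicU64,
}

/// Plain-value copy taken at flush time.
#[derive(Debug, PartialEq)]
pub struct Snapshot {
    pub audio_chunks: u64,
    pub tool_calls: u64,
    pub interruptions: u64,
}

impl Counters {
    pub fn record_audio_chunk(&self) {
        self.audio_chunks.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_tool_call(&self) {
        self.tool_calls.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_interruption(&self) {
        self.interruptions.fetch_add(1, Ordering::Relaxed);
    }

    /// Read all counters; intended to be called from a periodic flush.
    pub fn snapshot(&self) -> Snapshot {
        Snapshot {
            audio_chunks: self.audio_chunks.load(Ordering::Relaxed),
            tool_calls: self.tool_calls.load(Ordering::Relaxed),
            interruptions: self.interruptions.load(Ordering::Relaxed),
        }
    }
}
```

Because recording takes `&self` and never locks, the counters can be shared across tasks behind an Arc while the flush reads them on its own schedule.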

The Router

The router is the zero-work dispatcher that sits between the broadcast channel and the fast and control lanes (the telemetry lane reads from its own broadcast receiver). It pattern-matches each SessionEvent and sends it to the correct lane(s) via mpsc channels. The router itself does no signal tracking, no telemetry, and no allocation on the hot path.
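
The routing step can be sketched with std channels. This is an illustration only: Event is a cut-down stand-in for SessionEvent, Router is a hypothetical type, and std::sync::mpsc is used here in place of whatever channel type the runtime actually uses.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for SessionEvent; the real enum has more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    AudioData(Vec<u8>),
    TextDelta(String),
    ToolCall(String),
    TurnComplete,
    Interrupted,
}

/// Hypothetical router: one sender per lane, no other work.
pub struct Router {
    fast: Sender<Event>,
    control: Sender<Event>,
}

impl Router {
    /// Returns the router plus the receiving ends of the fast and control lanes.
    pub fn new() -> (Router, Receiver<Event>, Receiver<Event>) {
        let (fast, fast_rx) = channel();
        let (control, control_rx) = channel();
        (Router { fast, control }, fast_rx, control_rx)
    }

    /// Classify the event, then forward it. Send errors (lane dropped) are ignored.
    pub fn route(&self, event: Event) {
        let (to_fast, to_control) = match &event {
            Event::AudioData(_) | Event::TextDelta(_) => (true, false),
            Event::ToolCall(_) | Event::TurnComplete => (false, true),
            // Interrupted is listed for both lanes in this guide.
            Event::Interrupted => (true, true),
        };
        if to_fast && to_control {
            let _ = self.fast.send(event.clone());
            let _ = self.control.send(event);
        } else if to_fast {
            let _ = self.fast.send(event);
        } else if to_control {
            let _ = self.control.send(event);
        }
    }
}
```

Note that payload-carrying events are moved, not cloned; only the payload-free Interrupted event is duplicated.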

Key Traits

Trait               Crate                                   Purpose
------------------  --------------------------------------  -----------------------------------------------------------
Transport           L0 (gemini_genai_rs::transport::ws)     Bidirectional byte transport (WebSocket or mock)
Codec               L0 (gemini_genai_rs::transport::codec)  Encode commands / decode server messages (JSON default)
AuthProvider        L0 (gemini_genai_rs::transport::auth)   URL construction + auth headers (Google AI / Vertex AI)
SessionWriter       L0 (gemini_genai_rs::session)           Send audio/text/video/tools/instructions (trait object safe)
SessionReader       L0 (gemini_genai_rs::session)           Subscribe to events, observe phase
ToolFunction        L1 (gemini_adk_rs::tool)                One-shot tool: call(args) -> Result<Value>
StreamingTool       L1 (gemini_adk_rs::tool)                Background tool yielding multiple results
InputStreamingTool  L1 (gemini_adk_rs::tool)                Tool receiving live input while running
TurnExtractor       L1 (gemini_adk_rs::live::extractor)     Extract structured data from transcript window
TextAgent           L1 (gemini_adk_rs::text)                Text-based LLM agent (generate(), not Live)
BaseLlm             L1 (gemini_adk_rs::llm)                 LLM abstraction for generate() calls

Choosing Your Layer

Use L0 (gemini-genai-rs) if you need:

  • Raw WebSocket access with no abstraction overhead
  • Custom event loop logic that does not fit the callback model
  • A custom transport (e.g., Unix domain socket, QUIC)
  • To build your own agent runtime
  • Maximum control over every message sent and received

use gemini_genai_rs::prelude::*;

let config = SessionConfig::from_endpoint(ApiEndpoint::google_ai("YOUR_KEY"))
    .model(GeminiModel::Gemini2_0FlashLive);

let handle = ConnectBuilder::new(config).build().await?;
let mut events = handle.subscribe();

handle.send_text("Hello").await?;
while let Some(event) = recv_event(&mut events).await {
    match event {
        SessionEvent::TextDelta(text) => print!("{text}"),
        SessionEvent::TurnComplete => break,
        _ => {}
    }
}

Use L1 (gemini-adk-rs) if you need:

  • Automatic tool dispatch without manual message matching
  • State management with prefix scoping (session:, turn:, app:)
  • Phase machine for multi-step conversation flows
  • Turn extraction (LLM-based or custom) between turns
  • Telemetry and session signals
  • Full control over callback registration without the fluent syntax

Use L2 (gemini-adk-fluent-rs) if you want:

  • The fastest path from zero to working voice agent
  • Chainable builder API with sub-builders for phases and watchers
  • Operator algebra for composing tools (T::simple() | T::google_search())
  • One-line connection (connect_vertex(project, location, token))
  • Sensible defaults (auto-enables transcription when extractors are used)

use gemini_adk_fluent_rs::prelude::*;

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .voice(Voice::Kore)
    .instruction("You are a helpful assistant")
    .on_audio(|data| { /* play audio */ })
    .on_text(|t| print!("{t}"))
    .connect_google_ai("YOUR_KEY")
    .await?;

handle.send_text("Hello!").await?;
handle.done().await?;

Most developers should start at L2 and drop to L1/L0 only when they hit a specific limitation. The layers are designed to compose: you can use L0 types (like SessionConfig) with L2 builders via Live::builder().connect(config).

Migration Guide: L0 -> L1 -> L2

This guide shows the same voice agent implemented at all three layers, so you can see what each layer adds and decide where to build.

Why Migrate?

Each layer removes a category of boilerplate:

What you write            L0 (gemini-genai-rs)  L1 (gemini-adk-rs)    L2 (gemini-adk-fluent-rs)
------------------------  --------------------  --------------------  -------------------------
WebSocket connection      Manual                Manual                One line
Event loop (select!)      Manual                Automatic             Automatic
Tool dispatch + response  Manual                Automatic             Automatic
State management          None                  Built-in              Built-in
Phase transitions         Manual                PhaseMachine          .phase() builder
Turn extraction           None                  TurnExtractor         .extract_turns::<T>()
Telemetry                 None                  SessionTelemetry      Auto-collected
Instruction updates       Manual                instruction_template  .instruction_template()

The tradeoff is control. L0 gives you total control over every message. L2 handles the common patterns automatically but gives you less room to customize the event processing loop itself.

L0: Wire Protocol

At L0, you work directly with SessionHandle, SessionEvent, and SessionCommand. You write your own event loop, dispatch tools manually, and manage all state yourself.

Here is a weather assistant with one tool:

use gemini_genai_rs::prelude::*;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Build session config with tool declaration
    let config = SessionConfig::from_endpoint(
        ApiEndpoint::google_ai(std::env::var("GEMINI_API_KEY")?)
    )
        .model(GeminiModel::Gemini2_0FlashLive)
        .system_instruction("You are a weather assistant. Use get_weather for queries.")
        .add_tool(Tool {
            function_declarations: Some(vec![FunctionDeclaration {
                name: "get_weather".into(),
                description: "Get current weather for a city".into(),
                parameters: Some(json!({
                    "type": "object",
                    "properties": {
                        "city": { "type": "string", "description": "City name" }
                    },
                    "required": ["city"]
                })),
            }]),
            ..Default::default()
        });

    // 2. Connect
    let handle = ConnectBuilder::new(config).build().await?;
    handle.wait_for_phase(SessionPhase::Active).await;

    // 3. Subscribe to events
    let mut events = handle.subscribe();

    // 4. Send a question
    handle.send_text("What's the weather in Tokyo?").await?;

    // 5. Manual event loop
    while let Some(event) = recv_event(&mut events).await {
        match event {
            SessionEvent::TextDelta(text) => {
                print!("{text}");
            }
            SessionEvent::TurnComplete => {
                println!();
            }
            SessionEvent::ToolCall(calls) => {
                // Manual tool dispatch
                let mut responses = Vec::new();
                for call in calls {
                    let result = match call.name.as_str() {
                        "get_weather" => {
                            let city = call.args.get("city")
                                .and_then(|v| v.as_str())
                                .unwrap_or("unknown");
                            json!({ "city": city, "temp_c": 22, "condition": "sunny" })
                        }
                        _ => json!({ "error": "unknown tool" }),
                    };
                    responses.push(FunctionResponse {
                        name: call.name.clone(),
                        id: call.id.clone(),
                        response: result,
                    });
                }
                // Manual response send
                handle.send_tool_response(responses).await?;
            }
            SessionEvent::Disconnected(_) => break,
            _ => {}
        }
    }

    Ok(())
}

Lines of code: ~70
What you manage: Event loop, tool dispatch, tool response serialization, phase waiting, all state.

L1: Agent Runtime

At L1, LiveSessionBuilder handles the event loop, tool dispatch, and state. You register callbacks and a ToolDispatcher instead of writing a match over every event variant.

Same weather assistant:

use gemini_adk_rs::{SimpleTool, ToolDispatcher, LiveSessionBuilder};
use gemini_genai_rs::prelude::*;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Create tool dispatcher
    let mut dispatcher = ToolDispatcher::new();
    dispatcher.register(SimpleTool::new(
        "get_weather",
        "Get current weather for a city",
        |args| async move {
            let city = args["city"].as_str().unwrap_or("unknown");
            Ok(json!({ "city": city, "temp_c": 22, "condition": "sunny" }))
        },
    ));

    // 2. Build session config
    let config = SessionConfig::from_endpoint(
        ApiEndpoint::google_ai(std::env::var("GEMINI_API_KEY")?)
    )
        .model(GeminiModel::Gemini2_0FlashLive)
        .system_instruction("You are a weather assistant. Use get_weather for queries.");

    // 3. Build callbacks
    let mut callbacks = gemini_adk_rs::EventCallbacks::default();
    callbacks.on_text = Some(Box::new(|t| print!("{t}")));
    callbacks.on_turn_complete = Some(std::sync::Arc::new(|| {
        Box::pin(async { println!() })
    }));

    // 4. Build and connect
    let handle = LiveSessionBuilder::new(config)
        .dispatcher(dispatcher)
        .callbacks(callbacks)
        .connect()
        .await?;

    // 5. Send a question (tools are auto-dispatched)
    handle.send_text("What's the weather in Tokyo?").await?;
    handle.done().await?;

    Ok(())
}

Lines of code: ~40
What changed: No event loop. No manual tool dispatch. No manual send_tool_response. The ToolDispatcher handles tool calls automatically: it matches the function name, deserializes args, calls your function, and sends the response back to the model.

You also get State (via handle.state()), SessionTelemetry (via handle.telemetry()), and the full three-lane processor for free.
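
To make the auto-dispatch idea concrete, here is a minimal std-only sketch of name-based dispatch. It is not the ToolDispatcher implementation: MiniDispatcher is a hypothetical type, arguments and results are plain strings instead of JSON values, and tools are synchronous closures to keep the example dependency-free.

```rust
use std::collections::HashMap;

/// A tool is a closure from raw argument text to a result or an error message.
type ToolFn = Box<dyn Fn(&str) -> Result<String, String> + Send + Sync>;

/// Hypothetical dispatcher: a registry keyed by tool name.
#[derive(Default)]
pub struct MiniDispatcher {
    tools: HashMap<String, ToolFn>,
}

impl MiniDispatcher {
    /// Register a tool under `name`, replacing any earlier tool with that name.
    pub fn register<F>(&mut self, name: &str, f: F)
    where
        F: Fn(&str) -> Result<String, String> + Send + Sync + 'static,
    {
        self.tools.insert(name.to_string(), Box::new(f));
    }

    /// Look up the tool by name and run it; unknown names become errors.
    pub fn dispatch(&self, name: &str, args: &str) -> Result<String, String> {
        match self.tools.get(name) {
            Some(tool) => tool(args),
            None => Err(format!("unknown tool: {name}")),
        }
    }
}
```

This is the same match-by-name logic as the manual L0 loop, moved behind a registry so that adding a tool no longer means editing the event loop.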

L2: Fluent DX

At L2, Live::builder() wraps everything in a chainable API. The same weather assistant:

use gemini_adk_fluent_rs::prelude::*;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handle = Live::builder()
        .model(GeminiModel::Gemini2_0FlashLive)
        .instruction("You are a weather assistant. Use get_weather for queries.")
        .with_tools(
            T::simple("get_weather", "Get current weather for a city", |args| async move {
                let city = args["city"].as_str().unwrap_or("unknown");
                Ok(json!({ "city": city, "temp_c": 22, "condition": "sunny" }))
            })
        )
        .on_text(|t| print!("{t}"))
        .on_turn_complete(|| async { println!() })
        .connect_google_ai(std::env::var("GEMINI_API_KEY")?)
        .await?;

    handle.send_text("What's the weather in Tokyo?").await?;
    handle.done().await?;
    Ok(())
}

Lines of code: ~20
What changed: No SessionConfig construction. No ToolDispatcher setup. No EventCallbacks struct. The builder infers everything:

  • .with_tools() creates and configures the ToolDispatcher
  • .instruction() sets the system instruction on the underlying SessionConfig
  • .connect_google_ai() builds the endpoint and connects in one call

L2 with Multiple Tools

Tools compose with the | operator:

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .instruction("You are a helpful assistant with access to tools.")
    .with_tools(
        T::simple("get_weather", "Get weather", |args| async move {
            Ok(json!({ "temp_c": 22 }))
        })
        | T::simple("get_time", "Get current time", |_| async move {
            Ok(json!({ "time": "14:30" }))
        })
        | T::google_search()
    )
    .on_text(|t| print!("{t}"))
    .connect_google_ai(api_key)
    .await?;
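
For readers curious how a | operator can combine tool sets in Rust, the sketch below shows the general technique with std::ops::BitOr. ToolSet is a hypothetical type that only tracks tool names; the real T module carries full tool definitions and may be implemented differently.

```rust
use std::ops::BitOr;

/// Hypothetical tool collection that records tool names in registration order.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolSet {
    names: Vec<String>,
}

impl ToolSet {
    /// A set containing a single named tool.
    pub fn single(name: &str) -> Self {
        ToolSet { names: vec![name.to_string()] }
    }

    pub fn names(&self) -> &[String] {
        &self.names
    }
}

/// `a | b` appends the tools of `b` after the tools of `a`.
impl BitOr for ToolSet {
    type Output = ToolSet;

    fn bitor(mut self, rhs: ToolSet) -> ToolSet {
        self.names.extend(rhs.names);
        self
    }
}
```

Because | is left-associative and returns the same type, any number of tools can be chained in one expression.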

Feature Comparison Table

Feature               L0                                    L1                                         L2
--------------------  ------------------------------------  -----------------------------------------  -------------------------------------
WebSocket connection  ConnectBuilder::new(config).build()   LiveSessionBuilder::new(config).connect()  Live::builder().connect_*()
Event loop            Manual while let + match              Automatic (three-lane processor)           Automatic
Audio callback        Manual match SessionEvent::AudioData  callbacks.on_audio = Some(...)             .on_audio(|data| ...)
Tool dispatch         Manual match + response send          ToolDispatcher auto-dispatch               .tools() or .with_tools()
Tool declaration      Manual Tool + FunctionDeclaration     Auto from ToolFunction::parameters()       Auto from T::simple()
State management      None (DIY)                            State with prefixes                        State with prefixes
Phase machine         None (DIY)                            PhaseMachine::new()                        .phase("name").instruction().done()
Watchers              None (DIY)                            WatcherRegistry                            .watch("key").became_true().then()
Turn extraction       None (DIY)                            TurnExtractor trait                        .extract_turns::<T>(llm, prompt)
Instruction template  handle.update_instruction()           callbacks.instruction_template             .instruction_template(|state| ...)
Greeting              handle.send_text() after connect      builder.greeting("...")                    .greeting("...")
Telemetry             None                                  SessionTelemetry auto-collected            Auto-collected
Session signals       None                                  SessionSignals auto-collected              Auto-collected
Transcription toggle  config.enable_input_transcription()   Same                                       .transcription(true, true)
Computed state        None                                  ComputedRegistry                           .computed("key", &["deps"], |s| ...)
Temporal patterns     None                                  TemporalRegistry                           .when_sustained() / .when_rate()
Text agent tools      None                                  TextAgentTool                              .agent_tool("name", "desc", agent)

When to Stay at L0

L0 is the right choice when you need:

Custom transport: You want to route WebSocket frames through a proxy, use a Unix socket, or implement a custom reconnection strategy.

let handle = ConnectBuilder::new(config)
    .transport(MyCustomTransport::new())
    .codec(MyCustomCodec::new())
    .build()
    .await?;

Non-standard event processing: Your application needs to process events in an order or pattern that does not fit the callback model (e.g., batching audio chunks before processing, custom priority queuing).

Embedding in a larger runtime: You are building your own agent framework and want wire-level access without the L1 runtime's task spawning.

Minimal binary size: L0 has fewer dependencies than L1/L2.

When to Stay at L1

L1 is the right choice when you need:

Programmatic callback registration: You build callbacks dynamically based on configuration or plugin systems, and the fluent builder syntax gets in the way.

let mut callbacks = EventCallbacks::default();
if config.enable_logging {
    callbacks.on_text = Some(Box::new(|t| println!("{t}")));
}
if config.enable_audio {
    callbacks.on_audio = Some(Box::new(move |data| {
        audio_tx.send(data.clone()).ok();
    }));
}

Custom PhaseMachine setup: You need to build the phase machine programmatically (e.g., phases loaded from a database at runtime).

Direct registry access: You want to add/configure ComputedRegistry, WatcherRegistry, or TemporalRegistry objects directly rather than through sub-builders.

Mixing Layers

The layers are designed to compose. Common patterns:

L0 config + L2 builder: Build a SessionConfig at L0 and pass it to the L2 builder. Useful when build_session_config() handles credential detection for you:

let config = build_session_config(Some("gemini-2.0-flash-live"))?
    .voice(Voice::Kore)
    .response_modalities(vec![Modality::Audio])
    .system_instruction("You are a helpful assistant.");

let handle = Live::builder()
    .on_audio(|data| { /* play */ })
    .on_text(|t| print!("{t}"))
    .connect(config)
    .await?;

L1 types in L2 callbacks: The on_tool_call callback receives State (an L1 type) that you can query and mutate:

let handle = Live::builder()
    .on_tool_call(|calls, state| async move {
        // Promote tool context to state
        state.set("last_tool", calls[0].name.clone());
        None // auto-dispatch
    })
    .connect_google_ai(api_key)
    .await?;

L0 handle from L2: Access the underlying SessionHandle for operations not exposed on LiveHandle:

let live_handle = Live::builder()
    .connect_google_ai(api_key)
    .await?;

// Access raw L0 handle
let session = live_handle.session();
let events = session.subscribe();
let phase = session.phase();

Migration Checklist

When migrating from L0 to L2:

  1. Replace SessionConfig::from_endpoint(...) with Live::builder().model().instruction()
  2. Replace manual Tool declarations with .tools(dispatcher) or .with_tools(T::simple(...))
  3. Replace the while let Some(event) = recv_event(...) loop with callbacks
  4. Replace match SessionEvent::AudioData with .on_audio()
  5. Replace match SessionEvent::TextDelta with .on_text()
  6. Replace manual send_tool_response() with ToolDispatcher auto-dispatch
  7. Replace ConnectBuilder::new(config).build() with .connect_google_ai() or .connect_vertex()
  8. Replace manual phase tracking with .phase("name").instruction().transition().done()
  9. Replace manual state HashMaps with .extract_turns::<T>() and handle.state()
  10. Remove the tokio::select! loop -- the three-lane processor handles it

Best Practices & Common Mistakes

Practical guidance for building with the gemini-rs stack. Organized by category: architecture decisions, performance constraints, common pitfalls, and testing patterns.

Architecture Best Practices

Use the highest-level crate that fits your needs

The three-crate stack is layered for a reason. Reach for the highest level that covers your use case:

L2 (gemini-adk-fluent-rs)  -- Fluent DX, operator algebra, AgentBuilder, Live builder
L1 (gemini-adk-rs)         -- Agent runtime, tools, state, phases, TextAgent
L0 (gemini-genai-rs)       -- Wire protocol, transport, auth, raw WebSocket

For applications, start with L2. Drop to L1 if you need custom processor logic. Drop to L0 only for raw WebSocket access or custom transport implementations.

// Recommended for applications
use gemini_adk_fluent_rs::prelude::*;

// Only if building custom processors
use gemini_adk_rs::*;

// Only for raw wire access
use gemini_genai_rs::prelude::*;

Use ContextInjection steering for multi-phase voice apps

Most multi-phase voice apps share a stable base persona across phases. Use SteeringMode::ContextInjection to set the persona once at connect and deliver phase-specific behavior as lightweight model-role context turns. This avoids the latency spike of system instruction replacement on every phase transition.

// Recommended for most apps
Live::builder()
    .instruction("You are a helpful restaurant reservation assistant.")
    .steering_mode(SteeringMode::ContextInjection)
    .phase("greeting")
        .instruction("Welcome the guest and ask how you can help.")
        .done()
    .phase("booking")
        .instruction("Help find an available time slot.")
        .done()
    .initial_phase("greeting")

Only use InstructionUpdate when phases represent genuinely different agent personas (e.g., switching from a receptionist to a triage nurse). See the Steering Modes guide for the full decision matrix and anti-patterns.

Use deferred context delivery for voice apps

When using ContextInjection, context turns sent during silence (between user utterances) can cause audio glitches or confuse the model. Use ContextDelivery::Deferred to queue context and flush it alongside the next user interaction:

// Voice app: context arrives with user audio, not during silence
Live::builder()
    .steering_mode(SteeringMode::ContextInjection)
    .context_delivery(ContextDelivery::Deferred)
    .phase("greeting")
        .instruction("Welcome the guest")
        .done()
    .initial_phase("greeting")

The DeferredWriter wraps the session writer at the LiveHandle level. When handle.send_audio() is called, it drains any pending context first, so the context and audio frames go out back to back with no gap. For text-only apps, Immediate (the default) is fine since there is no audio pipeline to disrupt.
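
The queue-then-flush behavior can be sketched in a few lines of plain Rust. DeferredSink and Frame are hypothetical names for this illustration, and the "wire" is just a Vec so the ordering is easy to inspect; the real DeferredWriter wraps a session writer instead.

```rust
/// What would go over the wire, in order.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Context(String),
    Audio(Vec<u8>),
}

/// Hypothetical deferred writer: context is held back until audio is sent.
#[derive(Default)]
pub struct DeferredSink {
    pending: Vec<String>,
    sent: Vec<Frame>,
}

impl DeferredSink {
    /// Queue a context turn without sending anything yet.
    pub fn queue_context(&mut self, text: &str) {
        self.pending.push(text.to_string());
    }

    /// Flush queued context first, then send the audio frame right behind it.
    pub fn send_audio(&mut self, pcm: Vec<u8>) {
        for ctx in self.pending.drain(..) {
            self.sent.push(Frame::Context(ctx));
        }
        self.sent.push(Frame::Audio(pcm));
    }

    /// Frames sent so far, in wire order.
    pub fn sent(&self) -> &[Frame] {
        &self.sent
    }
}
```

Nothing is emitted while the user is silent; the context only travels when there is audio to accompany it.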

Keep tool callbacks fast — or use background execution

The model waits for standard tool responses before continuing. A slow tool blocks the entire conversation turn. For tools that need to do expensive work (database queries, external API calls, LLM pipelines), you have two options:

  1. Set timeouts and cache for tools that must complete before the model continues
  2. Use background execution for tools where the model can continue speaking while results arrive

// Option 1: fast tool with timeout
let tool = SimpleTool::new("lookup", "Quick lookup", None, |args| async move {
    let result = tokio::time::timeout(
        Duration::from_secs(5),
        db.query(&args["id"]),
    ).await
    .map_err(|_| ToolError::ExecutionFailed("Database timeout".into()))?;
    Ok(json!(result))
});

// Option 2: background execution — model gets an ack immediately
Live::builder()
    .tools(dispatcher)
    .tool_background("search_knowledge_base")  // zero dead-air

Use concurrent callbacks for fire-and-forget work

Control-lane callbacks default to Blocking — the event loop waits for completion. For fire-and-forget work (logging, analytics, broadcasting to a UI), use _concurrent variants to avoid blocking the pipeline:

// Blocking: appropriate when ordering matters
.on_turn_complete(|| async { tx.send(TurnComplete).ok(); })

// Concurrent: fire-and-forget — doesn't block the next event
.on_extracted_concurrent(|name, val| async move {
    broadcast_to_ui(name, val).await;
})
.on_error_concurrent(|e| async move {
    send_to_error_tracker(&e).await;
})
.on_disconnected_concurrent(|reason| async move {
    info!("Disconnected: {reason:?}");
})

Use State::modify() for atomic updates

Calling state.get() and then state.set() is not atomic: under concurrent access another writer can update the key between the two calls, and one of the updates is lost. Use modify() for atomic read-modify-write:

// Bad: race condition
let count: u32 = state.get("count").unwrap_or(0);
state.set("count", count + 1);

// Good: atomic read-modify-write
let count = state.modify("count", 0u32, |n| n + 1);
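
To see why a single modify call is safe where get followed by set is not, here is a minimal std-only sketch. The Counters type is hypothetical and stores only u32 values, and returning the new value from modify is an assumption; the point is that the read and the write happen under one lock acquisition.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical counter store; the SDK's State is generic over value types.
#[derive(Default)]
pub struct Counters {
    inner: Mutex<HashMap<String, u32>>,
}

impl Counters {
    pub fn get(&self, key: &str) -> Option<u32> {
        self.inner.lock().unwrap().get(key).copied()
    }

    pub fn set(&self, key: &str, value: u32) {
        self.inner.lock().unwrap().insert(key.to_string(), value);
    }

    /// Read-modify-write under a single lock acquisition, so no other
    /// writer can slip in between the read and the write.
    pub fn modify(&self, key: &str, default: u32, f: impl FnOnce(u32) -> u32) -> u32 {
        let mut map = self.inner.lock().unwrap();
        let next = f(map.get(key).copied().unwrap_or(default));
        map.insert(key.to_string(), next);
        next
    }
}

/// Bump one counter from several threads using `modify`.
pub fn concurrent_increments(threads: u32, per_thread: u32) -> u32 {
    let counters = Counters::default();
    std::thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    counters.modify("count", 0, |n| n + 1);
                }
            });
        }
    });
    counters.get("count").unwrap_or(0)
}
```

With modify, four threads doing 1000 increments each always end at 4000. The same loop written as get followed by set can lose updates, because each call takes the lock separately.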

Extraction is out-of-band

Turn extractors run asynchronously after each turn completes. They do not block the model's response. This means:

  • Extracted values may not be available immediately after a turn
  • Do not rely on extraction results being instant for the next tool call
  • Use watchers if you need to react when extracted values change

// Extraction runs asynchronously -- the model may start its next turn
// before extraction completes
handle.extracted::<OrderState>("OrderState"); // may return stale data briefly

Phase transitions are reactive

Phase transitions fire on the next state check after the condition becomes true, not the instant state changes. This is by design -- it prevents mid-turn phase switching that would confuse the model.

// The transition predicate is checked after each turn, not continuously
.phase("greeting")
    .instruction("Welcome the user")
    .transition("main", S::is_true("greeted"))
    .done()

Declare tools at session start

Voice sessions (Live API) do not support adding or removing tool definitions mid-session. All tools must be declared in the SessionConfig before connecting. Only instructions can be updated during the session.

// Tools declared at build time -- cannot change after connect
let handle = Live::builder()
    .tools(dispatcher)  // fixed for the session's lifetime
    .connect_vertex(project, location, token)
    .await?;

// Instructions CAN be updated mid-session
handle.update_instruction("New instruction text").await?;

Use typed tools over simple tools

TypedTool auto-generates JSON Schema from your Rust struct via schemars::JsonSchema. This prevents schema drift and gives you compile-time type safety on arguments:

// Prefer TypedTool -- schema stays in sync with code
#[derive(Deserialize, JsonSchema)]
struct WeatherArgs {
    /// The city to get weather for
    city: String,
    /// Temperature unit (celsius or fahrenheit)
    unit: Option<String>,
}

let tool = TypedTool::new::<WeatherArgs>(
    "get_weather", "Get weather for a city",
    |args: WeatherArgs| async move {
        Ok(json!({"temp": 22, "city": args.city}))
    },
);

Use StateKey<T> for frequently accessed keys

Compile-time typed keys prevent typos and give you type inference:

const TURN_COUNT: StateKey<u32> = StateKey::new("session:turn_count");
const RISK_LEVEL: StateKey<f64> = StateKey::new("derived:risk");

// Type-safe access -- no risk of typos or wrong types
state.set_key(&TURN_COUNT, 5);
let count: Option<u32> = state.get_key(&TURN_COUNT);

Performance Best Practices

Fast lane callbacks must complete in under 1ms

The three-lane processor architecture separates hot-path audio processing (fast lane) from control logic (control lane) and usage reporting (telemetry lane). Fast lane callbacks are synchronous and must not:

  • Allocate heap memory
  • Acquire locks or mutexes
  • Perform async operations
  • Make system calls

// Good: fast lane callback -- just forward to a channel
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
.on_audio(move |data| { tx.send(data.clone()).ok(); })

// Bad: allocating and locking in the fast lane
.on_audio(move |data| {
    let processed = expensive_processing(data);  // too slow
    mutex.lock().push(processed);                 // blocks
})

Use Arc<dyn SessionWriter> -- do not clone session handles

When you need to share the session writer across tasks, wrap it in Arc:

// Good: share via Arc
let writer: Arc<dyn SessionWriter> = handle.writer();
let writer_clone = writer.clone();
tokio::spawn(async move { writer_clone.send_text("hello").await; });

// Bad: cloning the entire handle
let handle_clone = handle.clone();  // unnecessary overhead

Extractors run concurrently

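Multiple turn extractors execute via futures::future::join_all, not sequentially. Adding more extractors therefore does not increase latency linearly: they run concurrently, so the extraction step takes roughly as long as the slowest extractor rather than the sum of all of them.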

// These three extractors run concurrently after each turn
Live::builder()
    .extract_turns::<Sentiment>(flash, "Extract emotional state")
    .extract_turns::<OrderInfo>(flash, "Extract order details")
    .extract_turns::<RiskScore>(flash, "Assess compliance risk")

SessionSignals uses AtomicU64

last_activity_ns is tracked with atomic operations (~1ns overhead), not Mutex<Instant>. Telemetry counters use atomic CAS operations. This means telemetry collection has near-zero impact on the hot path.
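
As an illustration of the lock-free pattern, the sketch below tracks a last-activity timestamp and a frame counter with AtomicU64. The ActivitySignals type and its methods are hypothetical and do not reflect the SDK's SessionSignals API; only the std atomics are real.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Hypothetical signal block; field and method names are illustrative.
pub struct ActivitySignals {
    epoch: Instant,
    last_activity_ns: AtomicU64,
    frames_seen: AtomicU64,
}

impl ActivitySignals {
    pub fn new() -> Self {
        Self {
            epoch: Instant::now(),
            last_activity_ns: AtomicU64::new(0),
            frames_seen: AtomicU64::new(0),
        }
    }

    /// Hot path: one atomic store and one atomic add, no lock taken.
    pub fn touch(&self) {
        let ns = self.epoch.elapsed().as_nanos() as u64;
        self.last_activity_ns.store(ns, Ordering::Relaxed);
        self.frames_seen.fetch_add(1, Ordering::Relaxed);
    }

    /// Nanoseconds elapsed since the last recorded activity.
    pub fn idle_ns(&self) -> u64 {
        let now = self.epoch.elapsed().as_nanos() as u64;
        now.saturating_sub(self.last_activity_ns.load(Ordering::Relaxed))
    }

    pub fn frames_seen(&self) -> u64 {
        self.frames_seen.load(Ordering::Relaxed)
    }
}
```

Storing nanoseconds relative to a fixed epoch is what lets the timestamp fit in a single u64, so readers never wait on a writer.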

Common Mistakes

Vertex AI sends Binary WebSocket frames

Vertex AI sends Binary frames, not Text frames. The TungsteniteTransport handles this transparently, but if you are debugging at the WebSocket level, do not expect text frames.

Native audio model only supports AUDIO output

By default, the Gemini2_0FlashLive model is configured for Modality::Audio output, not Modality::Text. If you need text responses, use .text_only() on the builder, which sets Modality::Text explicitly:

// Voice output (default for live model)
Live::builder().model(GeminiModel::Gemini2_0FlashLive)
    // response_modalities defaults to [Audio]

// Text-only output
Live::builder().model(GeminiModel::Gemini2_0FlashLive).text_only()
    // response_modalities set to [Text]

Cannot update tool definitions mid-session

This is a Gemini Live API constraint. Tools are declared once at session start. Only instructions can be updated. If you need different tools in different phases, declare all tools up front and use phase-scoped tool filtering:

// Declare ALL tools at build time
Live::builder()
    .tools(all_tools_dispatcher)
    .phase("greeting")
        .tools(vec![])  // no tools in greeting phase
        .done()
    .phase("main")
        .tools(vec!["search".into(), "lookup".into()])  // filter to these
        .done()

The processor rejects tool calls not in the current phase's tools_enabled list.
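
The rejection rule can be modelled as a per-phase allow-list. This is a sketch only: the PhaseTools type is hypothetical, and treating a phase without a filter as "all declared tools allowed" is an assumption based on the filter being described as optional.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical per-phase allow-list, for illustration only.
#[derive(Default)]
pub struct PhaseTools {
    filters: HashMap<String, HashSet<String>>,
}

impl PhaseTools {
    /// Record which tool names a phase may call (may be empty).
    pub fn set_filter(&mut self, phase: &str, tools: &[&str]) {
        let allowed = tools.iter().map(|t| t.to_string()).collect();
        self.filters.insert(phase.to_string(), allowed);
    }

    /// A call is allowed if the phase lists the tool.
    pub fn allows(&self, phase: &str, tool: &str) -> bool {
        match self.filters.get(phase) {
            Some(allowed) => allowed.contains(tool),
            None => true, // assumption: no filter means every declared tool
        }
    }
}
```

An empty filter, like the greeting phase above, blocks every tool call, while the full set stays declared for the whole session.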

Wrong Vertex AI endpoint

The global Vertex AI endpoint is wss://aiplatform.googleapis.com/..., NOT wss://global-aiplatform.googleapis.com/.... This is handled automatically by the Platform enum, but matters if you are constructing URLs manually.

API version mismatch

Google AI uses v1beta, Vertex AI uses v1beta1. Again, the Platform enum handles this, but be aware when reading API docs.

State prefix confusion

State keys can have prefixes: session:, derived:, turn:, app:, bg:, user:, temp:. When using state.get("risk"), the derived fallback automatically checks derived:risk if risk is not found. You do not need to manually check both:

// The derived fallback handles this automatically
state.set("derived:risk", 0.85);
assert_eq!(state.get::<f64>("risk"), Some(0.85));

// Use scoped accessors for clarity
state.derived().set("risk", 0.85);  // writes "derived:risk"
state.app().set("mode", "production");  // writes "app:mode"
state.turn().set("transcript", text);   // writes "turn:transcript" (cleared each turn)
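
The fallback lookup can be pictured as a two-step search. The sketch below uses a hypothetical string-valued PrefixedState; restricting the fallback to unprefixed keys and letting an exact match win over the derived value are both assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical string-valued store modelling the `derived:` fallback.
#[derive(Default)]
pub struct PrefixedState {
    map: HashMap<String, String>,
}

impl PrefixedState {
    pub fn set(&mut self, key: &str, value: &str) {
        self.map.insert(key.to_string(), value.to_string());
    }

    /// Exact key first; an unprefixed miss retries under `derived:`.
    pub fn get(&self, key: &str) -> Option<&str> {
        if let Some(v) = self.map.get(key) {
            return Some(v.as_str());
        }
        if key.contains(':') {
            return None; // already prefixed: no fallback (assumption)
        }
        self.map.get(&format!("derived:{key}")).map(String::as_str)
    }
}
```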

Forgetting to declare tools in SessionConfig

Tools must be declared at session start. If you register tools in the ToolDispatcher but do not include their declarations in the session config, the model will not know they exist.

Blocking in on_audio callback

The on_audio callback runs on the fast lane. Blocking it stalls the entire audio pipeline:

// Bad: blocking the audio pipeline
.on_audio(|data| {
    std::thread::sleep(Duration::from_millis(10));  // stalls everything
})

// Good: non-blocking forward
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
.on_audio(move |data| { tx.send(data.clone()).ok(); })

Forgetting .done() on phase builders

Phase builder chains must end with .done() to return to the Live builder. Without it, you are still configuring the phase when you think you are configuring the session:

// Wrong: missing .done() -- next call configures the phase, not the session
Live::builder()
    .phase("greeting")
        .instruction("Welcome the user")
        // .done() is missing!
    .phase("main")  // this might not do what you expect
        .instruction("Handle the request")

// Correct
Live::builder()
    .phase("greeting")
        .instruction("Welcome the user")
        .done()  // returns to Live builder
    .phase("main")
        .instruction("Handle the request")
        .done()

Forgetting .initial_phase()

The phase machine requires an explicit initial phase. Without it, no phase is active and phase-scoped instructions will not apply:

Live::builder()
    .phase("greeting").instruction("...").done()
    .phase("main").instruction("...").done()
    .initial_phase("greeting")  // required

Using instruction_template with phases

instruction_template replaces the entire instruction, overwriting phase-specific instructions. For additive composition, use instruction_amendment or phase modifiers:

// Bad: template replaces everything, including phase instruction
.instruction_template(|state| format!("Context: {}", state.get::<String>("ctx").unwrap_or_default()))

// Good: amendment adds to the phase instruction
.instruction_amendment(|state| format!("\nContext: {}", state.get::<String>("ctx").unwrap_or_default()))

// Better: use P:: modifiers on phases
.phase("main")
    .instruction("Handle customer requests")
    .modifiers(vec![
        P::with_state(&["emotional_state"]),
        P::context_fn(|s| format!("Customer: {}", s.get::<String>("name").unwrap_or_default())),
    ])
    .done()

Testing Patterns

Use MockTransport for unit testing

MockTransport lets you test without real WebSocket connections. Inject scripted server responses:

use gemini_genai_rs::transport::MockTransport;

let mock = MockTransport::new(vec![
    // Scripted server messages
    ServerMessage::SetupComplete { ... },
    ServerMessage::ServerContent { ... },
]);

let (handle, _) = ConnectBuilder::new(config)
    .transport(mock)
    .build()
    .await?;

State is cheap to construct

State::new() creates an empty concurrent map. Use it freely in tests:

#[tokio::test]
async fn test_my_agent() {
    let state = State::new();
    state.set("input", "test query");
    state.set("user:name", "Test User");

    let result = my_agent.run(&state).await.unwrap();
    assert!(result.contains("expected output"));

    // Verify state mutations
    assert_eq!(state.get::<bool>("processed"), Some(true));
}

Test text agent pipelines with mock LLMs

Implement BaseLlm to create deterministic test fixtures:

struct MockLlm(String);

#[async_trait]
impl BaseLlm for MockLlm {
    fn model_id(&self) -> &str { "mock" }

    async fn generate(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
        Ok(LlmResponse {
            content: Content {
                role: Some(Role::Model),
                parts: vec![Part::Text { text: self.0.clone() }],
            },
            finish_reason: Some("STOP".into()),
            usage: None,
        })
    }
}

#[tokio::test]
async fn test_pipeline() {
    let llm: Arc<dyn BaseLlm> = Arc::new(MockLlm("mock output".into()));
    let agent = AgentBuilder::new("test")
        .instruction("Analyze this")
        .build(llm);

    let state = State::new();
    state.set("input", "test data");
    let result = agent.run(&state).await.unwrap();
    assert_eq!(result, "mock output");
}

Test composable operators structurally

Verify the operator tree structure without running agents:

#[test]
fn pipeline_structure() {
    let pipeline = AgentBuilder::new("a") >> AgentBuilder::new("b") >> AgentBuilder::new("c");
    match pipeline {
        Composable::Pipeline(p) => assert_eq!(p.steps.len(), 3),
        _ => panic!("expected Pipeline"),
    }
}

#[test]
fn fan_out_structure() {
    let fan = AgentBuilder::new("x") | AgentBuilder::new("y");
    match fan {
        Composable::FanOut(f) => assert_eq!(f.branches.len(), 2),
        _ => panic!("expected FanOut"),
    }
}

Test state transforms in isolation

S:: transforms operate on serde_json::Value and can be tested without any agent infrastructure:

#[test]
fn state_transform_chain() {
    let chain = S::pick(&["name", "age"]) >> S::rename(&[("name", "customer")]);
    let mut state = json!({"name": "Alice", "age": 30, "internal": "x"});
    chain.apply(&mut state);
    assert_eq!(state, json!({"customer": "Alice", "age": 30}));
}

Test context policies in isolation

C:: policies take the conversation history as a slice of Content (here, a borrowed Vec<Content>):

#[test]
fn context_window() {
    let history = vec![
        Content::user("a"),
        Content::model("b"),
        Content::user("c"),
    ];
    let result = C::window(2).apply(&history);
    assert_eq!(result.len(), 2);
}

Voice & Live Sessions

This guide covers everything you need to build voice-enabled agents with the Gemini Multimodal Live API using gemini-genai-rs.

What is a Live Session?

A Live Session is a full-duplex WebSocket connection to the Gemini API that supports simultaneous audio/video input and audio/text output. Unlike the standard generateContent REST API, a Live Session:

  • Streams audio bidirectionally (you talk while the model talks)
  • Uses server-side VAD (Voice Activity Detection) for turn management
  • Supports barge-in (interrupt the model mid-sentence)
  • Handles function calling inline with speech
  • Maintains conversation context server-side
  • Runs for up to ~10 minutes per session (with resumption support)

Audio formats:

  • Input: PCM16, 16 kHz, mono
  • Output: PCM16, 24 kHz, mono
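
These formats fix the byte rate of each stream, which is useful when sizing capture chunks and playback buffers. A small worked calculation follows; the function and constant names are illustrative and not part of the SDK.

```rust
/// Sample rates from the formats listed above.
pub const INPUT_RATE_HZ: u32 = 16_000;
pub const OUTPUT_RATE_HZ: u32 = 24_000;

/// Bytes of mono PCM16 audio for a given sample rate and duration.
/// PCM16 uses 2 bytes per sample and mono means one channel.
pub fn pcm16_mono_bytes(sample_rate_hz: u32, millis: u32) -> u64 {
    const BYTES_PER_SAMPLE: u64 = 2;
    sample_rate_hz as u64 * millis as u64 * BYTES_PER_SAMPLE / 1000
}
```

A 20 ms microphone chunk is 640 bytes and a 20 ms playback chunk is 960 bytes. One second of input is 32,000 bytes.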

Quick Start

A minimal live session in under 15 lines:

use gemini_adk_fluent_rs::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handle = Live::builder()
        .model(GeminiModel::Gemini2_0FlashLive)
        .instruction("You are a helpful voice assistant.")
        .on_text(|t| print!("{t}"))
        .on_turn_complete(|| async { println!() })
        .connect_google_ai(std::env::var("GEMINI_API_KEY")?)
        .await?;

    handle.send_text("What is the capital of France?").await?;
    handle.done().await?;
    Ok(())
}

This connects to Gemini, sends a text message, prints the streamed response, and waits for the session to end. For audio, replace send_text() with send_audio() and add an on_audio callback.

The Live Builder

Live::builder() returns a chainable builder that configures the entire session. Here is the full chain with all major options:

let handle = Live::builder()
    // Model and voice
    .model(GeminiModel::Gemini2_5FlashNativeAudio)
    .voice(Voice::Kore)
    .temperature(0.7)
    .instruction("You are a restaurant order assistant.")

    // Tools (auto-dispatched when model calls them)
    .tools(dispatcher)

    // Audio/transcription config
    .transcription(true, true)   // input, output
    .affective_dialog(true)      // emotionally expressive responses

    // Server-side VAD
    .vad(AutomaticActivityDetection::default())

    // Session lifecycle
    .session_resume(true)
    .context_compression(4000, 2000)  // trigger_tokens, target_tokens

    // Greeting (model speaks first)
    .greeting("Greet the customer and ask what they'd like to order.")

    // Fast-lane callbacks (sync, <1ms)
    .on_audio(|data| { /* forward to speaker */ })
    .on_text(|t| print!("{t}"))
    .on_input_transcript(|text, _is_final| { /* display what user said */ })
    .on_output_transcript(|text, _is_final| { /* display what model said */ })
    .on_vad_start(|| { /* user started speaking */ })
    .on_vad_end(|| { /* user stopped speaking */ })

    // Telemetry callback (sync, telemetry lane)
    .on_usage(|usage| {
        if let Some(total) = usage.total_token_count {
            println!("Tokens: {total}");
        }
    })

    // Control-lane callbacks (async)
    .on_tool_call(|calls, state| async move { None })  // None = auto-dispatch
    .on_interrupted(|| async { /* flush playback buffer */ })
    .on_turn_complete(|| async { /* turn finished */ })
    .on_connected(|writer| async move { /* session ready */ })
    .on_disconnected(|reason| async move { /* session ended */ })
    .on_error(|msg| async move { eprintln!("Error: {msg}") })

    // Connect
    .connect_vertex("my-project", "us-central1", access_token)
    .await?;

The builder validates configuration at connect time, not at each method call. All methods are optional except the connect method.

Callbacks

Callbacks are split into two categories based on latency requirements.

Fast Lane (Sync)

These fire on the fast lane and must complete in under 1ms. They receive references (not owned values) and cannot be async.

// Audio: receives zero-copy Bytes (cloning bumps an Arc refcount, not data)
.on_audio(|data: &Bytes| {
    playback_tx.send(data.clone()).ok();
})

// Text: incremental deltas as the model generates
.on_text(|text: &str| {
    print!("{text}");
})

// Text complete: full text when model finishes a text response
.on_text_complete(|text: &str| {
    println!("\nComplete: {text}");
})

// Transcription: text version of audio (input or output)
// Second parameter is `is_final` (true when transcription is finalized)
.on_input_transcript(|text: &str, is_final: bool| {
    if is_final { println!("User said: {text}"); }
})

// VAD: voice activity detection events from the server
.on_vad_start(|| { /* user started talking */ })
.on_vad_end(|| { /* user stopped talking */ })

// Usage metadata: token counts from the server (fires on telemetry lane)
.on_usage(|usage: &UsageMetadata| {
    if let Some(total) = usage.total_token_count {
        println!("Total tokens: {total}");
    }
    // Also available: prompt_token_count, response_token_count,
    // cached_content_token_count, thoughts_token_count,
    // tool_use_prompt_token_count, plus per-modality breakdowns
})

Control Lane (Async)

These fire on the control lane and can perform I/O, state access, or any async work. They block the control lane while running (other control events queue behind them).

// Tool calls: return None for auto-dispatch, Some for manual responses
.on_tool_call(|calls: Vec<FunctionCall>, state: State| async move {
    // Read state if needed
    let user_id: Option<String> = state.get("user_id");
    // Return None to let the ToolDispatcher handle it
    None
})

// Interrupted: model was interrupted by barge-in
.on_interrupted(|| async {
    playback_buffer.flush().await;
})

// Turn complete: model finished its response
.on_turn_complete(|| async {
    println!("--- turn complete ---");
})

// Connected: session is ready (receives SessionWriter for advanced use)
.on_connected(|writer: Arc<dyn SessionWriter>| async move {
    println!("Session connected");
})

// Disconnected: session ended (receives optional reason string)
.on_disconnected(|reason: Option<String>| async move {
    println!("Disconnected: {reason:?}");
})

// Error: non-fatal error (session continues)
.on_error(|msg: String| async move {
    eprintln!("Error: {msg}");
})

Callback Execution Modes

Control-lane callbacks support two execution modes via CallbackMode:

Blocking (default) — the event loop waits for the callback to complete. Use when subsequent events depend on the callback's side effects, or when ordering guarantees are required.

Concurrent — the callback is spawned as a detached tokio task. The event loop continues immediately. Use for fire-and-forget work: logging, analytics, webhook dispatch, or background agent triggering.
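
The difference between the two modes can be shown with a small dispatcher. This sketch uses OS threads in place of tokio tasks to stay std-only. The CallbackMode variants mirror the names above, but the dispatch function is a hypothetical stand-in and not the SDK's event loop.

```rust
use std::thread::{self, JoinHandle};

/// Mirrors the two modes described above.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum CallbackMode {
    Blocking,
    Concurrent,
}

/// Blocking runs the callback inline and returns `None`.
/// Concurrent spawns it and returns a handle the caller may ignore.
pub fn dispatch<F>(mode: CallbackMode, callback: F) -> Option<JoinHandle<()>>
where
    F: FnOnce() + Send + 'static,
{
    match mode {
        CallbackMode::Blocking => {
            callback(); // the caller waits here until the callback returns
            None
        }
        CallbackMode::Concurrent => Some(thread::spawn(callback)),
    }
}
```

In Blocking mode the side effects are visible as soon as dispatch returns. In Concurrent mode they are visible only once the spawned work finishes, which is why ordering-sensitive callbacks stay blocking.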

Use _concurrent suffixed methods to opt in:

Live::builder()
    // Blocking (default) — client depends on TurnComplete ordering
    .on_turn_complete(|| async { tx.send(TurnComplete).ok(); })

    // Concurrent — fire-and-forget broadcast, doesn't block the pipeline
    .on_extracted_concurrent(|name, val| async move {
        tx.send(StateUpdate { key: name, value: val }).ok();
    })
    .on_error_concurrent(|e| async move {
        webhook::send_alert(&e).await;
    })
    .on_disconnected_concurrent(|reason| async move {
        info!("Disconnected: {reason:?}");
    })

Forced-blocking callbacks (no concurrent variant):

Callback                Reason
on_interrupted          Must clear interrupted flag before audio resumes
on_tool_call            Return value is the tool response
before_tool_response    Transforms data in the pipeline
on_turn_boundary        Content injection must complete before turn_complete

Tool Dispatch

When the model calls a tool, the dispatch logic follows this priority:

  1. If on_tool_call is registered and returns Some(responses) -- use those responses.
  2. If on_tool_call returns None (or is not registered) and a ToolDispatcher is set -- auto-dispatch to the registered tool, send the result back to the model automatically.
  3. If neither -- log a warning and skip.
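
The priority order above can be sketched as a single function. Every name here (Resolution, resolve, and the closure shapes) is hypothetical: real tool calls carry JSON arguments and run asynchronously, while this model passes only the tool name to keep the control flow visible.

```rust
/// Outcome of resolving one tool call; names are illustrative.
#[derive(Debug, PartialEq)]
pub enum Resolution {
    /// on_tool_call returned Some(response).
    Manual(String),
    /// The dispatcher produced the response.
    AutoDispatched(String),
    /// Nothing could handle the call.
    Skipped,
}

pub fn resolve(
    call: &str,
    on_tool_call: Option<&dyn Fn(&str) -> Option<String>>,
    dispatcher: Option<&dyn Fn(&str) -> String>,
) -> Resolution {
    // 1. A registered callback that returns Some wins.
    if let Some(resp) = on_tool_call.and_then(|cb| cb(call)) {
        return Resolution::Manual(resp);
    }
    // 2. Otherwise fall through to the dispatcher, if one is set.
    if let Some(dispatch) = dispatcher {
        return Resolution::AutoDispatched(dispatch(call));
    }
    // 3. Neither: this is where a warning would be logged.
    Resolution::Skipped
}
```

Returning None from the callback is therefore not a refusal. It hands the call to the dispatcher.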

Register tools with the dispatcher:

use gemini_adk_rs::{SimpleTool, ToolDispatcher};

let mut dispatcher = ToolDispatcher::new();
dispatcher.register(SimpleTool::new(
    "get_weather",
    "Get current weather for a city",
    |args| async move {
        let city = args["city"].as_str().unwrap_or("unknown");
        Ok(serde_json::json!({ "city": city, "temp_c": 22, "condition": "sunny" }))
    },
));

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .instruction("You are a weather assistant. Use get_weather to answer questions.")
    .tools(dispatcher)
    .on_text(|t| print!("{t}"))
    .connect_google_ai(api_key)
    .await?;

Audio Pipeline

The audio pipeline for a typical voice agent:

Microphone (PCM16 16kHz)
    |
    v
handle.send_audio(bytes)       --- outbound --->  Gemini Live API
                                                       |
                                                  Server-side VAD
                                                  Model inference
                                                       |
on_audio(|data: &Bytes|)       <--- inbound ---   Audio response
    |                                             (PCM16 24kHz)
    v
Speaker / Playback buffer

Key points:

  • Input format: PCM16, 16 kHz, mono. Send raw bytes, not base64. The SDK handles base64 encoding on the wire.
  • Output format: PCM16, 24 kHz, mono. The on_audio callback receives decoded bytes ready for playback.
  • Buffer sizes: Audio arrives in variable-size chunks. Use an AudioJitterBuffer (from L0) if you need smooth playback.
  • Barge-in: When the user speaks while the model is responding, the server sends an Interrupted event. The fast lane sets the interrupted flag and stops forwarding audio; the control lane fires on_interrupted.
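
The barge-in behaviour amounts to a flag that gates audio forwarding. The sketch below models it with an AtomicBool. PlaybackGate and its method names are hypothetical, and having the control lane clear the flag follows the forced-blocking rationale for on_interrupted rather than any confirmed internal detail.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical gate modelling the interrupted flag described above.
#[derive(Default)]
pub struct PlaybackGate {
    interrupted: AtomicBool,
}

impl PlaybackGate {
    /// Fast lane: the server reported a barge-in.
    pub fn on_interrupted_event(&self) {
        self.interrupted.store(true, Ordering::Release);
    }

    /// Control lane: playback was flushed, audio may resume.
    pub fn clear(&self) {
        self.interrupted.store(false, Ordering::Release);
    }

    /// Fast lane: forward a chunk only while not interrupted.
    pub fn forward<'a>(&self, chunk: &'a [u8]) -> Option<&'a [u8]> {
        if self.interrupted.load(Ordering::Acquire) {
            None // drop stale model audio
        } else {
            Some(chunk)
        }
    }
}
```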

Greeting

Use .greeting() to make the model speak first without waiting for user input. The greeting prompt is sent immediately after the WebSocket setup completes.

let handle = Live::builder()
    .model(GeminiModel::Gemini2_5FlashNativeAudio)
    .voice(Voice::Kore)
    .instruction("You are a receptionist at a dental clinic.")
    .greeting("Greet the caller and ask how you can help them today.")
    .on_audio(|data| { playback_tx.send(data.clone()).ok(); })
    .connect_vertex(project, location, token)
    .await?;
// Model will immediately start speaking a greeting

The greeting text is sent as a user-role client_content message with turn_complete: true, which triggers the model to generate a response.

Transcription

Enable text transcription of audio streams to get text versions of what the user said and what the model said:

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .transcription(true, true)  // input, output
    .on_input_transcript(|text, is_final| {
        if is_final {
            println!("User: {text}");
        }
    })
    .on_output_transcript(|text, is_final| {
        if is_final {
            println!("Model: {text}");
        }
    })
    .connect_google_ai(api_key)
    .await?;

Transcription is required for turn extraction (.extract_turns()) to work. When you add an extractor, transcription is enabled automatically.

Session Lifecycle

A session progresses through these phases:

Disconnected --> Connecting --> SetupSent --> Active --> Disconnected
                                               |
                                               +--> GoAway (60s warning)
                                               +--> Interrupted (barge-in)

Phase           Description
Disconnected    Initial state, or after clean/unclean disconnect
Connecting      WebSocket handshake in progress
SetupSent       Setup message sent, waiting for setupComplete
Active          Session is live, audio/text flowing

The GoAway event signals the server will disconnect in ~60 seconds. Save state and prepare to reconnect. With .session_resume(true), you receive a SessionResumeHandle that can be used to continue the conversation in a new session.

Interacting with a Running Session

The LiveHandle returned by .connect_*() provides the runtime API:

// Send audio (raw PCM16 16kHz bytes)
handle.send_audio(pcm_bytes).await?;

// Send text
handle.send_text("What's the weather?").await?;

// Send video frame (raw JPEG bytes)
handle.send_video(jpeg_bytes).await?;

// Update system instruction mid-session
handle.update_instruction("Now focus on dessert orders.").await?;

// Read state (populated by extractors)
let order: Option<OrderState> = handle.extracted("OrderState");

// Access telemetry
let snapshot = handle.telemetry().snapshot();

// Get current session phase
let phase = handle.phase();

// Subscribe to raw events (for custom processing)
let mut events = handle.subscribe();

// Graceful disconnect
handle.disconnect().await?;

// Wait for session to end naturally
handle.done().await?;

Vertex AI vs Google AI

The SDK supports both Google AI (API key) and Vertex AI (OAuth2 token) backends. The wire protocol is the same; only the endpoint URL and authentication differ.

Google AI

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .connect_google_ai("YOUR_API_KEY")
    .await?;

  • Endpoint: wss://generativelanguage.googleapis.com/v1beta/models/{model}
  • Auth: API key in query parameter

Vertex AI

// Get token via gcloud: gcloud auth print-access-token
let token = std::env::var("GCLOUD_ACCESS_TOKEN")?;

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .connect_vertex("my-gcp-project", "us-central1", token)
    .await?;

  • Endpoint: wss://aiplatform.googleapis.com/v1beta1/projects/{project}/locations/{location}/publishers/google/models/{model}
  • Auth: Bearer token in WebSocket upgrade headers
  • Note: Uses the global endpoint (aiplatform.googleapis.com), not global-aiplatform.googleapis.com
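
If you ever need to build these URLs by hand, the two patterns differ only in host, API version, and path. The sketch below takes the endpoint patterns listed in this section at face value. The Platform enum here is a hypothetical mirror written for illustration, not the SDK's own type, and authentication is left out.

```rust
/// Hypothetical mirror of the platform choice, for illustration only.
pub enum Platform<'a> {
    GoogleAi,
    VertexAi { project: &'a str, location: &'a str },
}

impl Platform<'_> {
    /// Google AI uses v1beta; Vertex AI uses v1beta1.
    pub fn api_version(&self) -> &'static str {
        match self {
            Platform::GoogleAi => "v1beta",
            Platform::VertexAi { .. } => "v1beta1",
        }
    }

    /// Builds the endpoint following the patterns listed above.
    pub fn endpoint(&self, model: &str) -> String {
        let version = self.api_version();
        match self {
            Platform::GoogleAi => format!(
                "wss://generativelanguage.googleapis.com/{version}/models/{model}"
            ),
            Platform::VertexAi { project, location } => format!(
                "wss://aiplatform.googleapis.com/{version}/projects/{project}\
                 /locations/{location}/publishers/google/models/{model}"
            ),
        }
    }
}
```

Keeping the version inside the same match as the host makes it harder to pair v1beta with the Vertex AI host by accident.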

Pre-configured SessionConfig

For advanced scenarios (custom auth, proxy, etc.), build the config yourself and pass it to .connect():

use gemini_genai_rs::prelude::*;

let config = SessionConfig::from_endpoint(
    ApiEndpoint::vertex("my-project", "us-central1", token)
)
    .model(GeminiModel::Gemini2_5FlashNativeAudio)
    .voice(Voice::Kore)
    .response_modalities(vec![Modality::Audio])
    .system_instruction("You are a helpful assistant.")
    .enable_input_transcription()
    .enable_output_transcription();

let handle = Live::builder()
    .on_audio(|data| { /* play audio */ })
    .connect(config)
    .await?;

When using .connect(config), model/voice/instruction settings on the SessionConfig take precedence. The .model() / .voice() / .instruction() methods on the Live builder configure the same underlying SessionConfig, so you can use either approach -- just not both for the same setting.

Key Differences

Feature         Google AI               Vertex AI
Auth            API key (string)        OAuth2 Bearer token
API version     v1beta                  v1beta1
Frame format    Text WebSocket frames   Binary WebSocket frames
Billing         Per-token pricing       GCP billing account
Region          Global                  Regional (e.g., us-central1)

Phase System

The phase system models conversations as a state machine. Each phase carries its own instruction, tool filter, and transition rules. The SDK evaluates transition guards after every state mutation and automatically switches phases when conditions are met -- updating the model's instruction, running lifecycle callbacks, and filtering tools, all without manual wiring.

What Are Phases?

A phase is a named conversation stage. A debt collection call might have: disclosure -> verify_identity -> inform_debt -> negotiate -> close. A support call might have: greet -> identify -> investigate -> resolve -> close.

Each phase defines:

  • Instruction -- what the model should do in this stage
  • Transitions -- guard conditions that trigger moves to other phases
  • Tools -- which tools the model can call (optional filter)
  • Lifecycle callbacks -- on_enter / on_exit hooks

Defining Phases

Use the fluent Live::builder() API. Each .phase() call starts a PhaseBuilder that returns to the main builder via .done():

use gemini_adk_fluent_rs::prelude::*;

let handle = Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .phase("greeting")
        .instruction("Welcome the user warmly and ask how you can help.")
        .transition("main", |s| s.get::<bool>("greeted").unwrap_or(false))
        .done()
    .phase("main")
        .instruction("Handle the user's request.")
        .terminal()
        .done()
    .initial_phase("greeting")
    .connect_vertex(project, location, token)
    .await?;

Key points:

  • .initial_phase("greeting") declares which phase the machine starts in.
  • .terminal() marks a phase with no outbound transitions.
  • The machine is validated at connect time -- missing initial phase or dangling transition targets produce clear errors.
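
The connect-time checks mentioned in the last point can be sketched as a pass over the phase table. The PhaseTable alias, the PhaseError variants, and the validate function are hypothetical; the SDK's actual error types are not shown in this guide.

```rust
use std::collections::HashMap;

/// Hypothetical phase table: phase name -> transition target names.
pub type PhaseTable = HashMap<String, Vec<String>>;

#[derive(Debug, PartialEq)]
pub enum PhaseError {
    MissingInitialPhase,
    UnknownInitialPhase(String),
    DanglingTarget { from: String, to: String },
}

/// Checks performed once, before connecting.
pub fn validate(phases: &PhaseTable, initial: Option<&str>) -> Result<(), PhaseError> {
    let initial = initial.ok_or(PhaseError::MissingInitialPhase)?;
    if !phases.contains_key(initial) {
        return Err(PhaseError::UnknownInitialPhase(initial.to_string()));
    }
    for (from, targets) in phases {
        for to in targets {
            // Every transition must point at a declared phase.
            if !phases.contains_key(to) {
                return Err(PhaseError::DanglingTarget {
                    from: from.clone(),
                    to: to.clone(),
                });
            }
        }
    }
    Ok(())
}
```

Validating once at connect time means a typo in a transition target surfaces before any audio flows, not in the middle of a call.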

Phase Transitions

Transitions are guard-based: a closure that receives &State and returns bool. When the guard returns true, the machine transitions to the target phase.

.phase("disclosure")
    .instruction(DISCLOSURE_INSTRUCTION)
    // When disclosure_given becomes true, move to verify_identity
    .transition("verify_identity", |s| {
        s.get::<bool>("disclosure_given").unwrap_or(false)
    })
    // Emergency exit: cease-and-desist goes straight to close
    .transition("close", |s| {
        s.get::<bool>("cease_desist_requested").unwrap_or(false)
    })
    .done()

Transitions are evaluated in order -- the first guard that returns true wins. This means you should order transitions from most specific to most general.

Transition Descriptions

Use transition_with() to add a human-readable description to each transition. These descriptions are used by the phase navigation context (see below) to give the model awareness of where it can go and why:

.phase("identify_caller")
    .instruction("Get the caller's full name and organization.")
    .transition_with("determine_purpose", |s| {
        s.get::<String>("caller_name").is_some()
    }, "when caller provides their name")
    .transition_with("take_message", |s| {
        let tc: u32 = s.session().get("turn_count").unwrap_or(0);
        tc >= 12
    }, "after 12 turns if caller refuses to identify")
    .done()

The plain .transition() method still works and sets description: None.

S:: Predicates

The S module provides ergonomic predicate factories that eliminate boilerplate:

use gemini_adk_fluent_rs::prelude::S;

.transition("verify_identity", S::is_true("disclosure_given"))
.transition("negotiate", S::is_true("debt_acknowledged"))
.transition("arrange_payment", S::one_of("negotiation_intent", &["full_pay", "partial_pay"]))
.transition("tech_support", S::eq("issue_type", "technical"))

Available predicates:

  • S::is_true(key) -- key holds true
  • S::eq(key, value) -- key equals the given string
  • S::one_of(key, &[values]) -- key matches any of the given strings
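
One plausible way to build such factories is to have each return a closure over the key. The sketch below assumes a simplified state (MiniState) and value type (Value); both are hypothetical, and the SDK's actual implementation may differ.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified value type for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Bool(bool),
    Text(String),
}

type MiniState = HashMap<String, Value>;

/// Guard that passes when `key` holds `true`.
fn is_true(key: &'static str) -> impl Fn(&MiniState) -> bool {
    move |s: &MiniState| matches!(s.get(key), Some(Value::Bool(true)))
}

/// Guard that passes when `key` holds exactly `expected`.
fn eq(key: &'static str, expected: &'static str) -> impl Fn(&MiniState) -> bool {
    move |s: &MiniState| matches!(s.get(key), Some(Value::Text(v)) if v.as_str() == expected)
}

/// Guard that passes when `key` holds any one of `options`.
fn one_of(key: &'static str, options: &'static [&'static str]) -> impl Fn(&MiniState) -> bool {
    move |s: &MiniState| match s.get(key) {
        Some(Value::Text(v)) => options.iter().any(|o| *o == v.as_str()),
        _ => false,
    }
}
```

A missing key makes every guard return false here, which mirrors the unwrap_or(false) pattern used in the hand-written closures.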

Guards

A phase-level guard prevents the phase from being entered unless a condition is met. This is different from transition guards -- a transition guard decides when to leave a phase, while a phase guard decides whether the phase can be entered.

.phase("verify_identity")
    .instruction(VERIFY_IDENTITY_INSTRUCTION)
    // This phase can only be entered after disclosure is acknowledged
    .guard(S::is_true("disclosure_given"))
    .transition("inform_debt", S::is_true("identity_verified"))
    .done()

If a transition guard fires but the target phase's guard returns false, the machine skips that transition and evaluates the next one in order.

Dynamic Instructions

For instructions that depend on runtime state, use dynamic_instruction:

.phase("discuss")
    .dynamic_instruction(|s| {
        let topic: String = s.get("topic").unwrap_or_default();
        let mood: String = s.get("derived:sentiment").unwrap_or_default();
        format!("Discuss {topic}. The user's mood is {mood}. Adjust tone accordingly.")
    })
    .done()

The closure is evaluated at transition time, so the instruction always reflects current state.

Instruction Modifiers

Modifiers append context to a phase's instruction without replacing it. Three types are available:

StateAppend -- Inject Key/Value Context

.phase("negotiate")
    .instruction("Help the customer resolve their debt.")
    // Appends: [Context: emotional_state=frustrated, willingness_to_pay=0.3]
    .with_state(&["emotional_state", "willingness_to_pay", "derived:call_risk_level"])
    .done()

Conditional -- Append Text When True

fn risk_is_elevated(s: &State) -> bool {
    let risk: String = s.get("derived:call_risk_level").unwrap_or_default();
    risk == "high" || risk == "critical"
}

.phase("negotiate")
    .instruction("Help the customer resolve their debt.")
    .when(risk_is_elevated, "IMPORTANT: Use extra empathy. Never threaten.")
    .done()

CustomAppend -- Arbitrary Formatting

.phase("investigate")
    .instruction("Investigate the issue.")
    .with_context(|state| {
        let items: Vec<String> = state.get("order_items").unwrap_or_default();
        if items.is_empty() {
            String::new()
        } else {
            format!("Current order: {}", items.join(", "))
        }
    })
    .done()
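
To make the composition concrete, here is a minimal model of how the three modifier kinds could be applied to a base instruction. The Modifier enum, the Vars map, the newline separator, and the rule that missing keys are skipped are all assumptions for this sketch; only the [Context: ...] shape comes from the example above.

```rust
use std::collections::BTreeMap;

/// Hypothetical string-only state for this sketch.
type Vars = BTreeMap<String, String>;

/// Simplified model of the three modifier kinds.
enum Modifier {
    StateAppend(Vec<&'static str>),
    Conditional(fn(&Vars) -> bool, &'static str),
    CustomAppend(fn(&Vars) -> String),
}

/// Appends each non-empty modifier output to `base` on its own line.
fn compose(base: &str, modifiers: &[Modifier], vars: &Vars) -> String {
    let mut out = base.to_string();
    for m in modifiers {
        let extra = match m {
            Modifier::StateAppend(keys) => {
                // Keys that are not in state yet are skipped (assumption).
                let pairs: Vec<String> = keys
                    .iter()
                    .filter_map(|k| vars.get(*k).map(|v| format!("{k}={v}")))
                    .collect();
                if pairs.is_empty() {
                    String::new()
                } else {
                    format!("[Context: {}]", pairs.join(", "))
                }
            }
            Modifier::Conditional(pred, text) => {
                if pred(vars) { text.to_string() } else { String::new() }
            }
            Modifier::CustomAppend(render) => render(vars),
        };
        if !extra.is_empty() {
            out.push('\n');
            out.push_str(&extra);
        }
    }
    out
}

/// Same predicate as in the Conditional example, over the simplified state.
fn risk_is_elevated(vars: &Vars) -> bool {
    matches!(
        vars.get("derived:call_risk_level").map(String::as_str),
        Some("high") | Some("critical")
    )
}
```

The base instruction is never replaced in this model; modifiers only add lines after it, in declaration order.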

Tool Filtering

Each phase can restrict which tools the model is allowed to call. Tool calls for tools not in the filter are rejected by the processor:

.phase("verify_identity")
    .instruction("Verify the caller's identity.")
    .tools(vec!["verify_identity".into(), "log_compliance_event".into()])
    .done()
.phase("negotiate")
    .instruction("Negotiate a payment plan.")
    .tools(vec!["calculate_payment_plan".into(), "log_compliance_event".into()])
    .done()

Omitting .tools() means all registered tools are available in that phase.
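
The allow-list rule can be sketched in a few lines. tool_allowed is a hypothetical helper, not an SDK function; None stands for a phase that never called .tools().

```rust
/// Returns true if `tool` may be called in a phase.
/// `None` models a phase without .tools(): every registered tool is allowed.
fn tool_allowed(phase_tools: Option<&[&str]>, tool: &str) -> bool {
    match phase_tools {
        None => true,
        Some(allowed) => allowed.iter().any(|name| *name == tool),
    }
}
```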

Phase Needs

Declare what state keys a phase requires. The SDK uses these to generate the navigation context (see below), showing the model what information is still missing. This helps guide the conversation without over-constraining the LLM:

.phase("identify_caller")
    .instruction("Get the caller's full name and organization.")
    .needs(&["caller_name", "caller_org"])
    .transition_with("determine_purpose", |s| {
        s.get::<String>("caller_name").is_some()
    }, "when caller provides their name")
    .done()

At runtime, needs are filtered against the current state -- only keys not yet present are shown as "still needed" in the navigation context.
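
A minimal sketch of that filtering step, assuming the set of keys currently in state is available as a HashSet. still_needed is a hypothetical helper name.

```rust
use std::collections::HashSet;

/// Returns the declared needs that are not yet present in state,
/// preserving declaration order.
fn still_needed<'a>(needs: &[&'a str], present: &HashSet<&str>) -> Vec<&'a str> {
    needs
        .iter()
        .copied()
        .filter(|key| !present.contains(*key))
        .collect()
}
```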

Phase Navigation Context

The .navigation() modifier (available on both PhaseBuilder and phase_defaults) injects a structured description of the phase graph into the model's instruction. This gives the model awareness of where it is in the conversation, where it came from, and where it can go next:

[Navigation]
Current phase: identify_caller -- Get the caller's full name and organization.
Previous: greeting (turn 2)
Still needed: caller_org
Possible next:
  -> determine_purpose: when caller provides their name
  -> take_message: after 12 turns if caller refuses to identify

This is auto-generated from .needs() keys filtered by state, .transition_with() descriptions, and phase history. Apply it via phase_defaults so all phases benefit:

Live::builder()
    .phase_defaults(|d| d
        .with_state(&["caller_name", "caller_org"])
        .navigation()  // inject navigation context into every phase
    )

The navigation context is stored in session:navigation_context and regenerated on every turn and phase transition.
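
For illustration, the block shown above could be rendered from a handful of inputs as follows. NavInput and render_navigation are hypothetical names, and the rule that empty sections are omitted is an assumption; only the output layout is taken from the example.

```rust
/// Hypothetical inputs for rendering a navigation block.
struct NavInput<'a> {
    phase: &'a str,
    summary: &'a str,
    /// Previous phase name and the turn it was entered, if any.
    previous: Option<(&'a str, u32)>,
    still_needed: &'a [&'a str],
    /// (target phase, transition description) pairs.
    next: &'a [(&'a str, &'a str)],
}

/// Renders the navigation text, skipping sections that have no content.
fn render_navigation(nav: &NavInput) -> String {
    let mut out = String::from("[Navigation]\n");
    out.push_str(&format!("Current phase: {} -- {}\n", nav.phase, nav.summary));
    if let Some((name, turn)) = nav.previous {
        out.push_str(&format!("Previous: {name} (turn {turn})\n"));
    }
    if !nav.still_needed.is_empty() {
        out.push_str(&format!("Still needed: {}\n", nav.still_needed.join(", ")));
    }
    if !nav.next.is_empty() {
        out.push_str("Possible next:\n");
        for (target, why) in nav.next {
            out.push_str(&format!("  -> {target}: {why}\n"));
        }
    }
    out
}
```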

Phase Lifecycle Callbacks

on_enter and on_exit are async callbacks that run during transitions:

.phase("verify_identity")
    .instruction(VERIFY_IDENTITY_INSTRUCTION)
    .on_enter(|state, writer| async move {
        // Log the transition, initialize phase-specific state
        state.set("verification_attempts", 0u32);
        tracing::info!("Entered verify_identity phase");
    })
    .on_exit(|state, writer| async move {
        // Clean up, log compliance event
        tracing::info!("Exiting verify_identity phase");
    })
    .done()

The callbacks receive State and Arc<dyn SessionWriter>, so you can both mutate state and send messages to the model.

enter_prompt -- Model Speaks on Entry

Use enter_prompt to inject a model-role bridge message and prompt the model to respond immediately when entering a phase. This prevents the "cold start" problem where the model says "how can I help?" after a phase transition:

.phase("verify_identity")
    .instruction(VERIFY_IDENTITY_INSTRUCTION)
    // Injected as the model's own prior turn; the model then continues under the phase instruction
    .enter_prompt("The caller confirmed the disclosure. I'll now verify their identity.")
    .done()

For state-dependent prompts, use enter_prompt_fn:

.phase("close")
    .instruction(CLOSE_INSTRUCTION)
    .enter_prompt_fn(|state, _tw| {
        if state.get::<bool>("cease_desist_requested").unwrap_or(false) {
            "Cease-and-desist requested. Closing call respectfully.".into()
        } else {
            "Wrapping up the call.".into()
        }
    })
    .done()

Phase Defaults

Settings shared across all phases are declared with phase_defaults. These are merged into each phase -- phase-specific modifiers extend (not replace) the defaults:

const DEBT_STATE_KEYS: &[&str] = &[
    "emotional_state",
    "willingness_to_pay",
    "derived:call_risk_level",
    "identity_verified",
    "disclosure_given",
];

Live::builder()
    .phase_defaults(|d| d
        .with_state(DEBT_STATE_KEYS)
        .when(risk_is_elevated, "IMPORTANT: Use extra empathy.")
        .prompt_on_enter(true)
    )
    .phase("disclosure")
        .instruction(DISCLOSURE_INSTRUCTION)
        // Inherits with_state, when(), and prompt_on_enter from defaults
        .done()
    .phase("negotiate")
        .instruction(NEGOTIATE_INSTRUCTION)
        // Phase-specific modifier is appended after defaults
        .with_state(&["negotiation_intent"])
        .done()

Multi-Phase Example

A 3-phase conversation (greeting -> service -> close) combining the features covered above:

Live::builder()
    .model(GeminiModel::Gemini2_0FlashLive)
    .greeting("Greet the user warmly.")
    .phase_defaults(|d| d
        .with_state(&["customer_name", "derived:sentiment"])
        .prompt_on_enter(true)
    )
    .phase("greeting")
        .instruction("Welcome the customer. Ask for their name.")
        .transition("service", |s| s.contains("customer_name"))
        .done()
    .phase("service")
        .instruction("Help the customer with their request.")
        .guard(|s| s.contains("customer_name"))
        .tools(vec!["lookup_account".into(), "process_refund".into()])
        .transition("close", S::is_true("resolved"))
        .when(|s| s.get::<String>("derived:sentiment").unwrap_or_default() == "negative",
              "The customer seems upset. Use extra empathy.")
        .enter_prompt("I have the customer's name. I'll help them now.")
        .done()
    .phase("close")
        .instruction("Thank the customer and wrap up.")
        .terminal()
        .done()
    .initial_phase("greeting")
    .connect_vertex(project, location, token)
    .await?;

For a full 7-phase example with compliance gates, computed state, watchers, and temporal patterns, see apps/gemini-adk-web-rs/src/apps/debt_collection.rs.

How Transitions Are Evaluated

The processor evaluates transitions once per turn-complete cycle, after extractors have run and computed variables have updated, and before watchers fire. The evaluation is pure -- it checks guards without side effects:

  1. Get the current phase's transition list.
  2. For each transition (in order), check the guard.
  3. If the guard returns true, check the target phase's guard (if any).
  4. If both pass, execute the transition: on_exit -> update current -> on_enter.
  5. The new phase's instruction (with modifiers applied) is sent to the model.

Terminal phases skip transition evaluation entirely.
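
The five steps can be modeled with a small state machine. Everything below (Flags, Phase, Machine, the log strings, the demo phases) is a hypothetical simplification for illustration; real callbacks are async and the real State is typed.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified state: flag name -> value.
type Flags = HashMap<&'static str, bool>;
type Guard = fn(&Flags) -> bool;

struct Phase {
    name: &'static str,
    terminal: bool,
    entry_guard: Option<Guard>,
    transitions: Vec<(&'static str, Guard)>,
}

struct Machine {
    phases: Vec<Phase>,
    current: usize,
    /// Records side effects in the order they happen.
    log: Vec<String>,
}

impl Machine {
    /// Runs one evaluation pass; returns true if the machine changed phase.
    fn evaluate(&mut self, state: &Flags) -> bool {
        // Terminal phases skip evaluation entirely.
        if self.phases[self.current].terminal {
            return false;
        }
        // Steps 1-3: a side-effect-free scan in declaration order.
        let mut chosen = None;
        for (target, guard) in &self.phases[self.current].transitions {
            if !guard(state) {
                continue;
            }
            if let Some(idx) = self.phases.iter().position(|p| p.name == *target) {
                let admitted = match self.phases[idx].entry_guard {
                    Some(entry) => entry(state),
                    None => true,
                };
                if admitted {
                    chosen = Some(idx);
                    break;
                }
            }
        }
        let next = match chosen {
            Some(idx) => idx,
            None => return false,
        };
        // Step 4: on_exit -> update current -> on_enter.
        let (from, to) = (self.phases[self.current].name, self.phases[next].name);
        self.log.push(format!("on_exit:{from}"));
        self.current = next;
        self.log.push(format!("on_enter:{to}"));
        // Step 5: the new phase's instruction goes to the model.
        self.log.push(format!("send_instruction:{to}"));
        true
    }
}

fn has_name(s: &Flags) -> bool {
    s.get("caller_name_known").copied().unwrap_or(false)
}

fn disclosed(s: &Flags) -> bool {
    s.get("disclosure_given").copied().unwrap_or(false)
}

/// greeting -> verify_identity (entry-guarded), else take_message (terminal).
fn demo_machine() -> Machine {
    let greeting = Phase {
        name: "greeting",
        terminal: false,
        entry_guard: None,
        transitions: vec![
            ("verify_identity", has_name as Guard),
            ("take_message", has_name as Guard),
        ],
    };
    let verify = Phase { name: "verify_identity", terminal: false, entry_guard: Some(disclosed as Guard), transitions: vec![] };
    let message = Phase { name: "take_message", terminal: true, entry_guard: None, transitions: vec![] };
    Machine { phases: vec![greeting, verify, message], current: 0, log: Vec::new() }
}
```

With only caller_name_known set, the first transition's guard passes but verify_identity refuses entry, so the scan falls through to take_message. Nothing is logged unless a transition actually executes.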

For the full turn-complete pipeline, timing diagrams, background agent dispatch, and common pitfalls, see Phase Transitions Deep Dive.

Phase Transitions Deep Dive

How phases, state, extraction, and background agents interact in a live voice session. This guide covers timing, data flow, and common pitfalls with visual diagrams.

The Turn-Complete Pipeline

Every model response ends with a TurnComplete event from the Gemini Live API. This triggers a pipeline on the control lane:

  Gemini API                        Control Lane
  ─────────                         ────────────
  Model speaks...
  Model finishes ─── TurnComplete ──>  1. Reset turn state
                                       2. Finalize transcript
                                       3. Snapshot watched keys (before)
                                       4. Run extractors (filtered by trigger)
                                       5. Recompute derived state
                                       6. Build transcript window
                                       7. Evaluate phase transitions
                                       7b. Regenerate navigation context
                                       7c. Run OnPhaseChange extractors (if transitioned)
                                       8. Fire watchers (before vs after)
                                       9. Check temporal patterns
                                      10. Instruction amendment
                                      11. Instruction template
                                      12. Send instruction update (deduped)
                                      13. Send on_enter context
                                      14. Send turnComplete if prompt_on_enter
                                      15. Turn boundary hook
                                      16. User turn-complete callback
                                      17. Increment turn_count

Key insight: extractors (step 4) run BEFORE transitions (step 7). This means freshly extracted state is available for transition guards. Turn count is incremented LAST (step 17), so guards see the current turn number, not the next one.
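
The ordering can be seen in a toy turn-complete handler that keeps only steps 4, 7, and 17. Session, on_turn_complete, and the prefix-based extractor are hypothetical simplifications, not the SDK's pipeline.

```rust
use std::collections::HashMap;

/// Hypothetical, pared-down session for this sketch.
#[derive(Default)]
struct Session {
    state: HashMap<String, String>,
    turn_count: u32,
    phase: String,
}

/// Handles one TurnComplete and returns the turn_count the guard observed.
fn on_turn_complete(session: &mut Session, transcript: &str) -> u32 {
    // Step 4: a toy extractor that recognizes "I'm <name>".
    if let Some(name) = transcript.strip_prefix("I'm ") {
        session.state.insert("caller_name".to_string(), name.to_string());
    }
    // Step 7: the guard sees state written in this same cycle.
    let seen_by_guard = session.turn_count;
    if session.phase == "greeting" && session.state.contains_key("caller_name") {
        session.phase = "main".to_string();
    }
    // Step 17: the counter moves only after everything else has run.
    session.turn_count += 1;
    seen_by_guard
}
```

In this model a name extracted during a cycle triggers the transition in that same cycle, and the guard for the second TurnComplete observes turn_count = 1, not 2.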

Extraction triggers: Step 4 filters extractors by their trigger mode. EveryTurn extractors always run. Interval(n) extractors run only every n turns. AfterToolCall and OnPhaseChange extractors are skipped here and fire at their respective points (after tool dispatch and step 7c).
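
A sketch of the step 4 filter follows. The variant names come from the text above; the Trigger enum itself and the rule that Interval(n) fires when turn_count is a multiple of n are assumptions, since the exact interval arithmetic is not specified here.

```rust
/// Hypothetical mirror of the trigger modes named above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Trigger {
    EveryTurn,
    Interval(u32),
    AfterToolCall,
    OnPhaseChange,
}

/// Decides whether an extractor runs in step 4 of the turn-complete pipeline.
/// Assumption: Interval(n) fires when turn_count is a multiple of n.
fn runs_at_turn_complete(trigger: Trigger, turn_count: u32) -> bool {
    match trigger {
        Trigger::EveryTurn => true,
        // Interval(0) never fires; this also avoids a division by zero.
        Trigger::Interval(n) => n > 0 && turn_count % n == 0,
        // These fire elsewhere: after tool dispatch and at step 7c.
        Trigger::AfterToolCall | Trigger::OnPhaseChange => false,
    }
}
```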

Navigation context: Step 7b always regenerates the navigation context (stored in session:navigation_context), even if no transition fired. This keeps the model's awareness of its position in the phase graph up to date. Phases using .navigation() will include this context in the instruction.

State Flow: Conversation to Transition

Data flows through the system in one direction per turn cycle:

  ┌─────────────┐    ┌──────────────┐    ┌──────────────┐
  │ Conversation │    │  Extractors  │    │    State     │
  │ (transcript) │───>│ (LLM / regex)│───>│  (derived:)  │
  └─────────────┘    └──────────────┘    └──────┬───────┘
                                                │
                     ┌──────────────────────────┘
                     │
          ┌──────────▼───────────┐
          │   Computed Variables  │
          │  (dependency-sorted)  │
          └──────────┬───────────┘
                     │
       ┌─────────────┼──────────────┐
       │             │              │
  ┌────▼────┐  ┌─────▼─────┐  ┌────▼─────────┐
  │Watchers │  │ Temporal   │  │    Phase      │
  │ (diffs) │  │ Patterns   │  │ Transitions   │
  └────┬────┘  └─────┬─────┘  └────┬─────────┘
       │             │              │
       └─────────────┴──────────────┘
                     │
           ┌─────────▼──────────┐
           │ Instruction Update  │
           │ + prompt_on_enter   │
           └─────────┬──────────┘
                     │
             ┌───────▼───────┐
             │  Model speaks  │
             └───────────────┘

When Do Transitions Fire?

Transitions fire at step 7 of the turn-complete pipeline. By this point, all extractors have run and computed variables have been recalculated.

Timeline of a Typical Turn

Time ─────────────────────────────────────────────────────>

User speaks    Model responds       TurnComplete fires
   │               │                       │
   ▼               ▼                       ▼
┌──────┐    ┌────────────┐    ┌─────────────────────────────────┐
│ Audio │───>│ Model turn │───>│ Pipeline:                       │
│ input │    │ (speech)   │    │  4. Extract: "caller_name=Jane" │
└──────┘    └────────────┘    │  5. Computed: risk_level=low    │
                              │  7. Transition: greeting→main   │
                              │ 12. Update instruction          │
                              │ 14. prompt_on_enter → model     │
                              └────────────────────────────────┘
                                                │
                                     ┌──────────▼──────────┐
                                     │ Model speaks in new │
                                     │ phase with updated  │
                                     │ instruction         │
                                     └─────────────────────┘

Transition Guards: What Works, What Doesn't

Good: State-dependent guards

These wait for real data from the conversation:

// Wait for extraction to populate caller_name
.transition("identify", |s| s.get::<String>("caller_name").is_some())

// Wait for a boolean flag from tool execution
.transition("negotiate", S::is_true("debt_acknowledged"))

// Wait for one of several values
.transition("payment", S::one_of("intent", &["full_pay", "partial_pay"]))

Bad: Unconditional guards

// BUG: fires on the FIRST turn_complete — before user speaks!
.transition("next_phase", |_s| true)

Why this breaks:

  Session connects
       │
       ▼
  ┌───────────────────────┐
  │ greeting phase enters │
  │ prompt_on_enter fires │
  │ Model: "Hello!"       │
  │                       │ TurnComplete
  │ Guard: true ──────────┼────> Transition fires!
  └───────────────────────┘     (user hasn't spoken yet)
       │
       ▼
  ┌──────────────────────────────┐
  │ next_phase enters            │
  │ enter_prompt: "User said..." │  ← LIE: user said nothing
  │ Model HALLUCINATES response  │
  └──────────────────────────────┘

Fix: Turn-count guards for greeting phases

.phase("greeting")
    .instruction("Welcome the caller.")
    .transition("identify", |s| {
        // Turn 0 = prompt_on_enter (no user input yet)
        // Turn 1 = greeting model response
        // Turn 2+ = user has spoken at least once
        let tc: u32 = s.session().get("turn_count").unwrap_or(0);
        tc >= 2
    })
    .done()

Better: Combine turn count with state check

.transition("identify", |s| {
    let tc: u32 = s.session().get("turn_count").unwrap_or(0);
    let has_name = s.get::<String>("caller_name").is_some();
    tc >= 2 || has_name  // user spoke, or extraction already got the name
})
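
To compare these guard styles side by side, the sketch below finds the first turn_count at which a guard would pass, checking once per TurnComplete. first_firing_turn is a hypothetical helper, and the guard signature (turn count, name-known flag) is a simplification of the real &State argument.

```rust
/// Returns the first turn_count (starting at 0) at which `guard` passes.
/// `name_known_from` is the turn at which extraction first yields a name, if ever.
fn first_firing_turn(
    guard: impl Fn(u32, bool) -> bool,
    name_known_from: Option<u32>,
    max_turns: u32,
) -> Option<u32> {
    (0..max_turns).find(|&tc| {
        let has_name = name_known_from.map_or(false, |t| tc >= t);
        guard(tc, has_name)
    })
}
```

Under these assumptions an unconditional guard passes at turn 0, the turn-count guard at turn 2, and a state-only guard never passes if extraction never produces a name.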

enter_prompt: How It Works

enter_prompt injects a Content::model() message when entering a phase. This appears in the conversation as the model's own previous speech, giving it continuity across the phase boundary.

  Phase A (exiting)              Phase B (entering)
  ──────────────────             ──────────────────

  Model: "How can I help?"      Instruction updated to Phase B
                                enter_prompt injected as Content::model():
                                  "I have the caller's name. I'll verify."
                                turnComplete:true sent
                                       │
                                       ▼
                                Model sees its "own" previous output
                                and generates a coherent continuation

Pitfall: False context in enter_prompt

// BAD: claims something that hasn't happened
.enter_prompt("The caller has responded with their name and reason.")

// GOOD: states the agent's intent (doesn't assert facts about the user)
.enter_prompt("I'll now verify the caller's identity.")

// BEST: state-aware prompt that reflects actual state
.enter_prompt_fn(|state, _tw| {
    let name: String = state.get("caller_name").unwrap_or_default();
    format!("The caller identified as {name}. I'll check our records.")
})

Phase Transition + Extraction Interplay

The most common pattern: extractors populate state, transitions check it.

  Turn 1: User says "Hi, I'm Jane Smith from Acme Corp"
  ─────────────────────────────────────────────────────

  Model responds: "Hello Jane! How can I help?"

  TurnComplete fires:
    Step 4 ─ LlmExtractor runs ──> caller_name="Jane Smith"
                                    caller_org="Acme Corp"
                                    intent="unknown"

    Step 5 ─ Computed vars ──────> is_known_contact=true (lookup)

    Step 7 ─ Transitions:
             greeting guard: caller_name.is_some() ── true!
             ──> transition to identify_purpose

    Step 12 ─ Instruction update: "Ask Jane why she's calling"
    Step 14 ─ prompt_on_enter ──> model speaks in new phase

What happens when extraction fails

  Turn 1: User says "Hi, I'm Jane Smith"
  ─────────────────────────────────────────

  TurnComplete fires:
    Step 4 ─ LlmExtractor FAILS (401 auth error)
             ──> on_extraction_error callback fires
             ──> NO state written

    Step 7 ─ Transitions:
             greeting guard: caller_name.is_some() ── false
             ──> NO transition, stays in greeting

    Model continues in greeting phase (correct behavior)

This is why state-dependent guards are self-healing: if extraction fails, the guard simply doesn't fire, and the conversation stays in the current phase until extraction succeeds.
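
The self-healing property follows from one rule: a failed extraction writes nothing. A minimal model, with ToyState, apply_extraction, and greeting_guard as hypothetical names:

```rust
use std::collections::HashMap;

/// Hypothetical string-only state for this sketch.
type ToyState = HashMap<String, String>;

/// Applies an extraction result. Ok writes every field into state; Err writes
/// nothing and hands the error back so an error callback can report it.
fn apply_extraction(
    state: &mut ToyState,
    result: Result<Vec<(&str, &str)>, String>,
) -> Option<String> {
    match result {
        Ok(fields) => {
            for (key, value) in fields {
                state.insert(key.to_string(), value.to_string());
            }
            None
        }
        Err(error) => Some(error),
    }
}

/// State-dependent guard: passes only once caller_name exists.
fn greeting_guard(state: &ToyState) -> bool {
    state.contains_key("caller_name")
}
```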

Phase-Scoped Tool Filtering

Each phase can restrict which tools the model may call. The processor rejects calls to tools not in the phase's list.

  Phase: greeting              Phase: determine_purpose
  ┌──────────────────────┐     ┌──────────────────────────┐
  │ tools: [             │     │ tools: [                 │
  │   "check_contact"    │     │   "check_calendar"       │
  │ ]                    │     │   "check_availability"   │
  │                      │     │ ]                        │
  │ Model calls          │     │                          │
  │ "check_calendar" ──X │     │ Model calls              │
  │ REJECTED (not in     │     │ "check_calendar" ──✓     │
  │ phase tools)         │     │ ALLOWED                  │
  └──────────────────────┘     └──────────────────────────┘

If a phase omits .tools(), ALL registered tools are available.

Why tools become "unreachable"

  greeting ──(needs caller_name)──> determine_purpose
                                         │
                                    check_calendar
                                    is ONLY here

  If extraction fails:
    caller_name never set
    determine_purpose never reached
    check_calendar never available
    Model says "I can't check the calendar"

Fix: ensure extraction works (auth, schema), or make critical tools available in multiple phases.

Callback Modes: Blocking vs Concurrent

Control-lane callbacks support two execution modes:

  Blocking (default)              Concurrent
  ──────────────────              ──────────

  Event ──> callback ──> await    Event ──> tokio::spawn(callback)
            (blocks)     done               (fire-and-forget)
                          │                       │
                     next event              next event
                                          (immediately)

When to use each

  Use case            Mode               Why
  ──────────────────  ─────────────────  ────────────────────────────
  State mutation      Blocking           Next event needs the state
  Tool response       Blocking (forced)  Return value IS the response
  Logging             Concurrent         Don't block the pipeline
  Analytics webhook   Concurrent         Fire and forget
  Background agent    Concurrent         Long-running, don't block
  Error notification  Concurrent         Non-critical side effect
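
The difference between the two modes can be sketched without an async runtime. The example below uses std::thread as a stand-in for tokio::spawn, which is an assumption made to keep the sketch dependency-free; Mode and run_callback are hypothetical names.

```rust
use std::thread;

/// Hypothetical execution modes for a control-lane callback.
enum Mode {
    Blocking,
    Concurrent,
}

/// Runs `callback` for one event.
/// Blocking: returns only after the callback has finished.
/// Concurrent: hands the callback to a new thread and returns immediately
/// with a handle the caller may join or drop.
fn run_callback<F>(mode: Mode, callback: F) -> Option<thread::JoinHandle<()>>
where
    F: FnOnce() + Send + 'static,
{
    match mode {
        Mode::Blocking => {
            callback();
            None
        }
        Mode::Concurrent => Some(thread::spawn(callback)),
    }
}
```

A blocking callback's effects are visible by the time the next event is processed, which is why state mutation belongs there. A concurrent callback may still be running when the next event arrives.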

L2 API

Live::builder()
    // Blocking (default) — awaited inline
    .on_turn_complete(|| async { update_dashboard().await; })

    // Concurrent — spawned, doesn't block pipeline
    .on_turn_complete_concurrent(|| async { log_to_cloud().await; })

    // Concurrent error/lifecycle callbacks
    .on_error_concurrent(|msg| async move { webhook(&msg).await; })
    .on_disconnected_concurrent(|reason| async move { cleanup(reason).await; })
    .on_extracted_concurrent(|name, val| async move { broadcast(name, val).await; })

Forced-blocking callbacks (no concurrent variant)

  Callback              Why forced blocking
  ────────────────────  ─────────────────────────────────────
  on_tool_call          Return value IS the tool response
  on_interrupted        Must clear state before audio resumes
  before_tool_response  Transforms data in the pipeline
  on_turn_boundary      Content injection must complete first

Background Agent Dispatch

Callbacks can dispatch an agent in fire-and-forget fashion: the agent runs independently while the voice conversation continues.

  Voice Session (Live)            Background Agent
  ────────────────────            ────────────────

  Turn completes
       │
  on_turn_complete fires
       │
       ├── dispatch agent ──────> Agent runs generate()
       │   (fire-and-forget)      against flash LLM
       │                          │
  Next turn continues             │ Agent reads State
  (no blocking)                   │ Agent writes State
       │                          │
       │                          ▼
       │                     Agent completes
       │                     Results in State
       │                          │
  Next turn_complete              │
       │                          │
  Transition guard checks ◄───────┘
  state set by agent

Using BackgroundAgentDispatcher

use gemini_adk_rs::live::BackgroundAgentDispatcher;

let bg_dispatcher = BackgroundAgentDispatcher::new();

let handle = Live::builder()
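    .on_extracted_concurrent({
        let bg = bg_dispatcher.clone();
        let llm = flash_llm.clone();
        // Assumption: a shared `state` handle is already in scope here;
        // the dispatch call below needs it.
        let state = state.clone();
        move |name, value| {
            let bg = bg.clone();
            let llm = llm.clone();
            let state = state.clone();
            async move {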
                if name == "CallerState" {
                    // Dispatch a background agent to analyze the caller
                    let analyzer = AgentBuilder::new("caller_analyzer")
                        .instruction("Analyze caller risk profile")
                        .build(llm);
                    bg.dispatch("analyze_caller", analyzer, state.clone());
                }
            }
        }
    })
    .connect(config).await?;

Using agent_tool for synchronous agent dispatch

When the model needs to wait for the agent's result:

let verifier = AgentBuilder::new("verifier")
    .instruction("Verify caller identity against database")
    .build(llm.clone());

Live::builder()
    .agent_tool("verify_identity", "Verify caller", verifier)
    .phase("verify")
        .tools(vec!["verify_identity".into()])
        .transition("main", S::is_true("identity_verified"))
        .done()

  Model calls "verify_identity"
       │
       ▼
  TextAgentTool runs
  (synchronous — model waits)
       │
       ├── Agent calls generate() on flash LLM
       │   Agent reads/writes shared State
       │   Agent returns result
       │
       ▼
  FunctionResponse sent to model
  Model continues with result

Background Tool Execution (Zero Dead Air)

For tools that take seconds (DB queries, API calls, agent pipelines), background execution eliminates silence in voice sessions:

  Standard tool                 Background tool
  ─────────────                 ───────────────

  Model: "Let me check..."     Model: "Let me check..."
       │                             │
  ┌────▼────────────┐          ┌─────▼─────────────┐
  │ Tool executes   │          │ Ack sent: "running"│──> Model receives ack
  │ (3 seconds)     │          └─────┬─────────────┘    Model keeps talking:
  │                 │                │                   "While I look that up..."
  │ Dead air...     │          ┌─────▼─────────────┐
  │                 │          │ Tool executes      │
  └────┬────────────┘          │ (in background)    │
       │                       │ (3 seconds)        │
  Model gets result            └─────┬─────────────┘
  Model speaks                       │
                               Result injected
                               Model incorporates naturally
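
The ack-then-result flow can be modeled with a channel and a worker thread. This is a sketch of the pattern only: ToolMessage and run_in_background are hypothetical names, and std::thread stands in for whatever task system the SDK uses.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical messages delivered back to the model.
#[derive(Debug, PartialEq)]
enum ToolMessage {
    Ack(String),
    Output(String),
}

/// Starts `work` on a background thread. The acknowledgement is queued before
/// the work begins, so the consumer always receives Ack first, then Output.
fn run_in_background<F>(tool: &str, work: F) -> Receiver<ToolMessage>
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    tx.send(ToolMessage::Ack(format!("{tool}: running")))
        .expect("receiver is still held by this function");
    thread::spawn(move || {
        // Sending fails only if the receiver was dropped; ignore that case.
        let _ = tx.send(ToolMessage::Output(work()));
    });
    rx
}
```

The caller can forward the Ack right away so the model keeps talking, then inject the Output whenever it arrives.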

L2 API

Live::builder()
    .tools(dispatcher)
    .tool_background("search_knowledge_base")
    .tool_background_with_formatter("analyze_doc", Arc::new(VerboseFormatter))
    .connect_vertex(project, location, token)
    .await?;

Complete Example: Call Screening Pipeline

A 7-phase call screening system showing how all the pieces fit together:

  ┌─────────────────────────────────────────────────────────┐
  │                    SESSION START                         │
  │  Extraction LLM: gemini-2.5-flash (VertexAI)           │
  │  Live model: gemini-2.0-flash-live (VertexAI)          │
  │  Transcription: input + output enabled                  │
  └────────────────────────┬────────────────────────────────┘
                           │
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: greeting                                        │
  │  Tools: [check_contact_list]                            │
  │  Guard: tc >= 2 (user must speak before transitioning)  │
  │                                                         │
  │  Model: "Hello, you've reached Alex Rivera's office."   │
  │  User: "Hi, I'm Jane Smith from Marketing."             │
  │                                                         │
  │  TurnComplete:                                          │
  │    Extract: caller_name="Jane Smith"                    │
  │    Extract: caller_org="Marketing"                      │
  │    Computed: is_known → check_contact_list              │
  │    Watcher: is_known_contact=true fires                 │
  │    Guard: tc=2 → transition!                            │
  └────────────────────────┬────────────────────────────────┘
                           │
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: identify_caller                                 │
  │  Tools: [check_contact_list]                            │
  │  enter_prompt: "Ask for full name and organization."    │
  │                                                         │
  │  Guard: caller_name.is_some() → determine_purpose       │
  │  Guard: tc >= 3 && name.is_none() → take_message        │
  └────────────────────────┬────────────────────────────────┘
                           │ (caller_name already set)
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: determine_purpose                               │
  │  Tools: [check_calendar]     ← NOW AVAILABLE            │
  │                                                         │
  │  Model: "How can I help you today?"                     │
  │  User: "I need to discuss the Q3 budget."               │
  │                                                         │
  │  TurnComplete:                                          │
  │    Extract: call_purpose="Q3 budget discussion"         │
  │    Extract: urgency=0.5                                 │
  │    Guard: call_purpose.is_some() → screen_decision      │
  └────────────────────────┬────────────────────────────────┘
                           │
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: screen_decision                                 │
  │  Tools: [transfer_call, take_message, block_caller]     │
  │  Computed: screen_recommendation = "transfer"           │
  │            (known contact → auto-transfer)              │
  │                                                         │
  │  Guard: is_known || urgency > 0.8 → transfer            │
  │  Guard: caller_blocked → farewell                       │
  │  Guard: !known && urgency <= 0.8 → take_message         │
  └────────────────────────┬────────────────────────────────┘
                           │ (known contact)
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: transfer                                        │
  │  Tools: [transfer_call]                                 │
  │  Model calls transfer_call → state: call_transferred    │
  │  Guard: call_transferred → farewell                     │
  └────────────────────────┬────────────────────────────────┘
                           │
                           ▼
  ┌─────────────────────────────────────────────────────────┐
  │  PHASE: farewell (terminal)                             │
  │  Model: "I'm connecting you now. Have a great call!"    │
  └─────────────────────────────────────────────────────────┘

Reactive overlays running in parallel

  Watchers (fire on state diffs):
  ─────────────────────────────────────────────────
  urgency_level crossed_above(0.8)  → alert UI
  is_known_contact became_true      → prioritize call
  caller_sentiment changed_to("hostile") → show warning

  Temporal patterns (fire on sustained conditions):
  ─────────────────────────────────────────────────
  caller impatient for 20s  → inject de-escalation prompt
  screening stalled 4 turns → suggest taking a message

  Computed variables (recalculate on dependency change):
  ─────────────────────────────────────────────────
  screen_recommendation = f(is_known, urgency, sentiment)
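
The watcher conditions listed above compare a key's value before and after a turn. The free functions below borrow the names from the diagram, but their exact semantics (for example, whether crossed_above treats the threshold as inclusive) are assumptions.

```rust
/// Passes when the value moves from at-or-below `threshold` to above it.
fn crossed_above(before: f64, after: f64, threshold: f64) -> bool {
    before <= threshold && after > threshold
}

/// Passes when a flag flips from false to true.
fn became_true(before: bool, after: bool) -> bool {
    !before && after
}

/// Passes when the value changes into `target` from something else.
fn changed_to(before: &str, after: &str, target: &str) -> bool {
    before != target && after == target
}
```

Because each predicate needs both snapshots, a value that is already above the threshold on consecutive turns does not fire again.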

Design Rules for Phase Transitions

1. Greeting phases need turn-count guards

The greeting is model-initiated. The first TurnComplete is the greeting itself, not a user response. Always gate on tc >= 2:

.phase("greeting")
    .instruction("Welcome the caller.")
    .transition("next", |s| {
        s.session().get::<u32>("turn_count").unwrap_or(0) >= 2
    })
    .done()

2. Use state-dependent guards, not unconditional ones

// BAD: fires immediately, before any meaningful state exists
.transition("next", |_| true)

// GOOD: waits for real data
.transition("next", S::is_true("disclosure_given"))
.transition("next", |s| s.get::<String>("caller_name").is_some())

3. Order transitions from specific to general

Guards are evaluated in order. First match wins:

.phase("screening")
    // Most specific: hostile caller → decline immediately
    .transition("farewell", |s| {
        s.get::<String>("sentiment").as_deref() == Some("hostile")
    })
    // Specific: known contact or urgent → transfer
    .transition("transfer", |s| {
        s.get::<bool>("is_known").unwrap_or(false)
        || s.get::<f64>("urgency").unwrap_or(0.0) > 0.8
    })
    // General: unknown, not urgent → take message
    .transition("take_message", |s| {
        s.get::<String>("call_purpose").is_some()
    })
    .done()

4. Use phase guards for prerequisite enforcement

.phase("negotiate")
    // Cannot enter until identity is verified
    .guard(S::is_true("identity_verified"))
    .instruction("Negotiate a payment plan.")
    .done()

If a transition guard fires but the target's phase guard fails, the machine skips it and evaluates the next transition.
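
Rules 3 and 4 together can be sketched in plain Rust. This is a minimal model under stated assumptions, not the SDK's implementation: Flags, Transition, flag, and next_phase are hypothetical names, and state is reduced to boolean flags.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the SDK's state: flat boolean flags.
type Flags = HashMap<String, bool>;

// Hypothetical transition: a target phase name plus a guard over the flags.
struct Transition {
    target: &'static str,
    guard: fn(&Flags) -> bool,
}

// Missing flags read as false, like `unwrap_or(false)` in the guards above.
fn flag(flags: &Flags, key: &str) -> bool {
    flags.get(key).copied().unwrap_or(false)
}

// Walk transitions in declaration order. A transition is taken only if its own
// guard passes AND the target phase's entry guard (if any) passes; otherwise
// evaluation falls through to the next transition.
fn next_phase(
    transitions: &[Transition],
    phase_guards: &HashMap<&'static str, fn(&Flags) -> bool>,
    flags: &Flags,
) -> Option<&'static str> {
    transitions
        .iter()
        .filter(|t| (t.guard)(flags))
        .find(|t| phase_guards.get(t.target).map_or(true, |g| g(flags)))
        .map(|t| t.target)
}
```

With a "negotiate" transition listed first and a phase guard requiring identity_verified, the sketch falls through to the next matching transition until that flag is set.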

5. enter_prompt should state intent, not assert facts

// BAD: asserts something about the user that may be false
.enter_prompt("The caller provided their details and reason for calling.")

// GOOD: states the agent's intent (always true)
.enter_prompt("I'll now verify the caller's identity.")

// BEST: state-aware, reflects actual extracted data
.enter_prompt_fn(|state, _tw| {
    let name: String = state.get("caller_name").unwrap_or("the caller".into());
    format!("I'll verify {name}'s identity now.")
})

6. Make transitions resilient to extraction failure

If extraction fails (network error, 401, malformed response), no state is written. Your transition guards should handle this gracefully:

// Self-healing: if extraction fails, guard stays false, no transition
.transition("next_phase", |s| s.get::<String>("caller_name").is_some())

// Fallback: if stuck too long, offer an alternative
.transition("take_message", |s| {
    let tc: u32 = s.session().get("turn_count").unwrap_or(0);
    let name: Option<String> = s.get("caller_name");
    tc >= 5 && name.is_none()  // 5 turns without a name → give up
})

7. Use concurrent callbacks for fire-and-forget work

// BAD: blocks the pipeline for a webhook call
.on_extracted(|name, val| async move {
    slow_webhook(&name, &val).await;  // 500ms blocks next event!
})

// GOOD: fire-and-forget, pipeline continues immediately
.on_extracted_concurrent(|name, val| async move {
    slow_webhook(&name, &val).await;  // runs in background
})

Debugging Phase Transitions

Enable tracing

// In your main.rs or app setup
tracing_subscriber::fmt()
    .with_env_filter("gemini_adk_rs::live::processor=debug")
    .init();

Key log lines to watch

DEBUG processor: Phase transition: greeting -> identify_caller
DEBUG processor: Instruction updated (123 chars)
DEBUG processor: Extractor "CallerState" produced 5 fields
WARN  processor: Extraction failed: LLM request failed: API error 401
DEBUG processor: Turn 3 complete, turn_count=3

Common symptoms and causes

| Symptom | Likely Cause |
|---|---|
| Model hallucinates user input | Unconditional transition + misleading enter_prompt |
| Phase never transitions | Extraction failing (check on_extraction_error) |
| "Tool not available" | Tool scoped to unreachable phase |
| Model repeats itself | No transition guard matches (stuck in phase) |
| Callback blocks pipeline | Blocking callback doing slow I/O (use _concurrent) |

State Management

The State type is a shared, concurrent key-value store that flows through every component of a live session: callbacks, tool calls, extractors, watchers, phases, and computed variables. Values are stored as serde_json::Value and deserialized on read, giving you type safety without a rigid schema.

Reading and Writing

use gemini_adk_rs::State;

let state = State::new();

// Write any serializable value
state.set("customer_name", "Alice");
state.set("turn_count", 5u32);
state.set("scores", vec![0.8, 0.9, 0.7]);

// Read with type inference
let name: Option<String> = state.get("customer_name");
let count: Option<u32> = state.get("turn_count");

// Read with a default fallback
let count: u32 = state.get("turn_count").unwrap_or(0);

// Check existence
if state.contains("customer_name") {
    // ...
}

// Remove a key
let removed: Option<serde_json::Value> = state.remove("customer_name");

Zero-Copy Reads

For hot paths where you want to avoid cloning, use with() to borrow the underlying Value directly through the DashMap ref-guard:

// Borrow without cloning
let len = state.with("customer_name", |v| v.as_str().unwrap().len());

// Avoids: state.get_raw("key").map(|v| ...) which clones the Value

Prefix Scoping

Every state key belongs to a namespace defined by its prefix. Prefixes establish ownership, lifecycle, and read/write rules:

| Prefix | Who writes | Lifecycle | Example keys |
|---|---|---|---|
| session: | SDK (SessionSignals) | Entire session | session:is_user_speaking |
| derived: | SDK (ComputedRegistry) | Recomputed each turn | derived:sentiment_score |
| turn: | SDK / User code | Cleared each turn | turn:transcript |
| app: | User code | Entire session | app:order_total |
| user: | User code | Entire session | user:name |
| bg: | Background tasks | Entire session | bg:search_failed |
| temp: | User code | No automatic lifecycle | temp:scratch |

Keys without a prefix (e.g. "customer_name") are valid and commonly used for extractor-populated fields. The prefix convention is for organizational clarity, not enforcement -- the store does not reject unprefixed keys.

Scoped Accessors

Each prefix has a corresponding accessor that automatically prepends the prefix. This reduces typos and keeps code clean:

// These two are equivalent:
state.set("app:flag", true);
state.app().set("flag", true);

// Reading
let flag: Option<bool> = state.app().get("flag");

// Listing keys in a scope (prefix stripped from results)
let app_keys: Vec<String> = state.app().keys();
// Returns: ["flag"] not ["app:flag"]

// Other scoped accessors
state.session().set("turn_count", 5);
state.user().set("name", "Alice");
state.turn().set("transcript", "hello");
state.bg().set("task_id", "abc-123");
state.temp().set("scratch", 42);

// derived() is read-only -- no set() or remove()
let score: Option<f64> = state.derived().get("sentiment_score");

Atomic Modify

When multiple components read-modify-write the same key, use modify() to avoid lost updates. It reads the current value (or a default), applies your function, and writes the result back:

// Increment a counter (uses 0 if key doesn't exist yet)
let new_count = state.modify("turn_count", 0u32, |n| n + 1);

// Toggle a boolean
state.modify("muted", false, |b| !b);

// Append to a running total
state.modify("app:total_score", 0.0f64, |total| total + new_score);

Note: modify() uses the same DashMap as get/set. It is atomic in the sense that no other modify on the same key can interleave, but it is not a database transaction.
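
Why a single read-modify-write call avoids lost updates can be shown with the standard library alone. The sketch below is an illustration under stated assumptions: Counters and hammer are hypothetical names, and a Mutex stands in for the DashMap locking that the real State relies on.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical store: one Mutex stands in for the SDK's concurrent map.
#[derive(Default)]
struct Counters {
    inner: Mutex<HashMap<String, u32>>,
}

impl Counters {
    /// Read, apply `f`, and write back while holding one lock, so two callers
    /// can never both read the same stale value.
    fn modify(&self, key: &str, default: u32, f: impl FnOnce(u32) -> u32) -> u32 {
        let mut map = self.inner.lock().unwrap();
        let current = map.get(key).copied().unwrap_or(default);
        let next = f(current);
        map.insert(key.to_string(), next);
        next
    }
}

/// Spawns `workers` threads that each increment `key` `per_worker` times,
/// then returns the final value.
fn hammer(store: Arc<Counters>, key: &'static str, workers: u32, per_worker: u32) -> u32 {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                for _ in 0..per_worker {
                    store.modify(key, 0, |n| n + 1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    store.modify(key, 0, |n| n)
}
```

A separate get followed by set would release the lock between the two calls, which is exactly where concurrent increments get lost.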

Derived Fallback

When you call state.get("risk") and the key "risk" does not exist, State automatically checks "derived:risk" as a fallback. This means computed variables are accessible without the prefix tax:

// ComputedRegistry writes to "derived:risk_level"
// You can read it either way:
let risk: Option<String> = state.get("derived:risk_level");
let risk: Option<String> = state.get("risk_level"); // same result

// Direct key wins if both exist:
state.set("score", 1.0);
state.set("derived:score", 0.5);
let score: f64 = state.get("score").unwrap(); // returns 1.0

The fallback only triggers for unprefixed keys. state.get("app:risk") will never fall back to "derived:risk".
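
The lookup rule can be sketched over a plain map. This is a simplified model, not the SDK's code: get_with_fallback is a hypothetical helper, values are strings for brevity, and treating any key that contains ':' as prefixed is an assumption of this sketch.

```rust
use std::collections::HashMap;

// Hypothetical lookup mirroring the documented fallback rule.
fn get_with_fallback<'a>(store: &'a HashMap<String, String>, key: &str) -> Option<&'a str> {
    // A direct hit always wins, even if a derived: twin exists.
    if let Some(value) = store.get(key) {
        return Some(value.as_str());
    }
    // Assumption: a ':' marks a prefixed key, and prefixed keys never fall back.
    if key.contains(':') {
        return None;
    }
    // Unprefixed miss: retry under the derived: prefix.
    store.get(&format!("derived:{key}")).map(String::as_str)
}
```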

StateKey -- Type-Safe Keys

For keys used in multiple places, define a StateKey<T> constant to eliminate string typos and enforce type consistency at compile time:

use gemini_adk_rs::state::StateKey;

const TURN_COUNT: StateKey<u32> = StateKey::new("session:turn_count");
const SENTIMENT: StateKey<f64> = StateKey::new("derived:sentiment_score");
const USER_NAME: StateKey<String> = StateKey::new("user:name");

// Usage
state.set_key(&TURN_COUNT, 5);
let count: Option<u32> = state.get_key(&TURN_COUNT);

// Zero-copy borrow with typed key
let val = state.with_key(&TURN_COUNT, |v| v.as_u64().unwrap());

// Interoperable with raw string access
assert_eq!(state.get::<u32>("session:turn_count"), Some(5));

Delta Tracking

Delta tracking creates a transactional view of state. Writes go to a separate delta map that can be committed or rolled back:

let state = State::new();
state.set("committed_key", "original");

// Create a delta-tracking view (shares the same backing store)
let tracked = state.with_delta_tracking();

// Writes go to delta, not to the committed store
tracked.set("new_key", "pending");
assert!(tracked.contains("new_key"));    // visible through tracked
assert!(!state.contains("new_key"));     // NOT visible in original

// Reads check delta first, then committed store
let val: String = tracked.get("committed_key").unwrap(); // reads from committed

// Commit: merges delta into the committed store
tracked.commit();
assert!(state.contains("new_key")); // now visible everywhere

// Alternatively, call rollback() instead of commit() to discard all pending changes
tracked.rollback();

Useful for extractor pipelines where you want to validate extracted data before committing it to the shared state.
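
The two-layer read and write path can be modeled in a few lines. This sketch is illustrative only: Tracked is a hypothetical type that owns both maps, whereas the real tracked view shares its backing store with the original State.

```rust
use std::collections::HashMap;

// Hypothetical two-layer store: `committed` is the shared state,
// `delta` holds writes that are still pending.
#[derive(Default)]
struct Tracked {
    committed: HashMap<String, String>,
    delta: HashMap<String, String>,
}

impl Tracked {
    /// Writes land in the delta only.
    fn set(&mut self, key: &str, value: &str) {
        self.delta.insert(key.to_string(), value.to_string());
    }

    /// Reads check the delta first, then the committed store.
    fn get(&self, key: &str) -> Option<&str> {
        self.delta
            .get(key)
            .or_else(|| self.committed.get(key))
            .map(String::as_str)
    }

    /// Merge pending writes into the committed store and empty the delta.
    fn commit(&mut self) {
        self.committed.extend(self.delta.drain());
    }

    /// Drop pending writes without touching the committed store.
    fn rollback(&mut self) {
        self.delta.clear();
    }
}
```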

State in Tool Calls

Both on_tool_call and before_tool_response receive State, so you can promote tool results into state keys that watchers and phase transitions react to. In this example on_tool_call defers to auto-dispatch and before_tool_response performs the promotion:

Live::builder()
    .on_tool_call(|_calls, _state| async move {
        // Let the dispatcher execute the tools; results are promoted below
        None // returning None means "auto-dispatch"
    })
    .before_tool_response(|responses, state| async move {
        // Inspect tool results and promote to state
        for r in &responses {
            if r.name == "verify_identity" {
                if r.response.get("verified") == Some(&json!(true)) {
                    state.set("identity_verified", true);
                }
            }
        }
        responses
    })

Auto-Tracked Session State

SessionSignals automatically writes session-level signals to the session: prefix. You never need to set these manually:

| Key | Type | Updated on |
|---|---|---|
| session:is_user_speaking | bool | VoiceActivityStart/End |
| session:is_model_speaking | bool | PhaseChanged(ModelSpeaking) |
| session:interrupt_count | u64 | Each interruption |
| session:error_count | u64 | Each error event |
| session:last_error | String | Each error event |
| session:silence_ms | u64 | Periodic flush (~100ms) |
| session:elapsed_ms | u64 | Periodic flush (~100ms) |
| session:remaining_budget_ms | u64 | Periodic flush (~100ms) |
| session:go_away_received | bool | GoAway from server |
| session:go_away_time_left_ms | u64 | GoAway with time left |
| session:resumable | bool | SessionResumeHandle |
| session:total_token_count | u32 | Each UsageMetadata event |
| session:prompt_token_count | u32 | Each UsageMetadata event |
| session:response_token_count | u32 | Each UsageMetadata event |
| session:cached_content_token_count | u32 | Each UsageMetadata event |
| session:thoughts_token_count | u32 | Each UsageMetadata event |
| session:last_input_transcription | String | Each input transcription |
| session:last_output_transcription | String | Each output transcription |
| session:phase | String | PhaseChanged |
| session:session_type | String | Connected / mark_video_sent |
| session:disconnected | bool | Disconnected |

Read them anywhere:

let speaking: bool = state.session().get("is_user_speaking").unwrap_or(false);
let elapsed: u64 = state.session().get("elapsed_ms").unwrap_or(0);
let budget: u64 = state.session().get("remaining_budget_ms").unwrap_or(0);

Utility Methods

// Snapshot specific keys (for diffing later)
let snap = state.snapshot_values(&["score", "mood"]);

// Diff against a previous snapshot
state.set("score", 99);
let diffs = state.diff_values(&snap, &["score", "mood"]);
// diffs: [("score", old_value, new_value)]

// Pick a subset of keys into a new State
let subset = state.pick(&["name", "score"]);

// Merge another state in (overwrites on conflict)
state.merge(&other_state);

// Rename a key
state.rename("old_key", "new_key");

// Clear all keys with a given prefix
state.clear_prefix("turn:");

State Watchers & Temporal Patterns

Watchers and temporal patterns are reactive primitives that fire callbacks when state conditions are met. Watchers respond to value changes (numeric thresholds, boolean flips, value transitions). Temporal patterns respond to time-based conditions (sustained state, event rates, consecutive turns). Together they let you build escalation logic, compliance monitoring, and adaptive behavior without polling.

What Are Watchers?

A watcher observes a single state key and fires an async action when a predicate matches the state diff. The SDK evaluates all watchers after each mutation cycle (extractors + computed variables), comparing a snapshot of watched keys taken before mutations to the values after.

use gemini_adk_fluent_rs::prelude::*;

Live::builder()
    .watch("app:score")
        .crossed_above(0.9)
        .then(|old, new, state| async move {
            state.set("high_score_alert", true);
            tracing::info!("Score crossed 0.9: {old} -> {new}");
        })

The .watch(key) call starts a WatchBuilder. You chain a predicate, then .then(action) to complete it and return to the Live builder.

Numeric Watchers

Fire when a numeric value crosses a threshold in a specific direction.

crossed_above

Fires when the old value was below the threshold and the new value is at or above it. Does not fire again if the value stays above the threshold.

// Fire when willingness_to_pay crosses above 0.7
.watch("willingness_to_pay")
    .crossed_above(0.7)
    .then(|_old, new, _state| async move {
        tracing::info!("Willingness crossed above 0.7: {new}");
    })

crossed_below

Fires when the old value was at or above the threshold and the new value drops below it.

// Fire when sentiment drops below 0.3
.watch("derived:sentiment_score")
    .crossed_below(0.3)
    .then(|_old, new, state| async move {
        state.set("low_sentiment_alert", true);
        tracing::warn!("Sentiment dropped below 0.3: {new}");
    })

Both predicates require numeric JSON values. Non-numeric values (strings, bools) will not trigger the watcher.
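
The two threshold rules reduce to a pair of comparisons on the old and new values. The helpers below are hypothetical and only restate the semantics described above. How the SDK treats a key with no previous value is not covered here.

```rust
/// True when the value moved from below `threshold` to at or above it.
fn crossed_above(old: f64, new: f64, threshold: f64) -> bool {
    old < threshold && new >= threshold
}

/// True when the value moved from at or above `threshold` to below it.
fn crossed_below(old: f64, new: f64, threshold: f64) -> bool {
    old >= threshold && new < threshold
}
```

Because each rule also inspects the old value, a score that stays above the threshold across several turns does not fire again.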

Boolean Watchers

Fire on boolean state transitions.

became_true

Fires when the value changes from any non-true value to true. This includes the transition from not-set (null) to true.

// Fire when cease_desist_requested flips to true
.watch("cease_desist_requested")
    .became_true()
    .blocking()  // await this action before continuing
    .then(|_old, _new, state| async move {
        state.set("cease_desist_active", true);
        tracing::warn!("Cease-and-desist requested");
    })

became_false

Fires when the value changes from true to any non-true value.

// Fire when identity_verified reverts to false
.watch("identity_verified")
    .became_false()
    .then(|_old, _new, _state| async move {
        tracing::warn!("Identity verification revoked");
    })
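
The boolean predicates can be stated the same way. In this hypothetical sketch, None models a key that is not set, and every other non-true JSON value is collapsed into Some(false) for simplicity.

```rust
/// Fires on a change from any non-true value (including unset) to true.
fn became_true(old: Option<bool>, new: Option<bool>) -> bool {
    old != Some(true) && new == Some(true)
}

/// Fires on a change from true to any non-true value (including unset).
fn became_false(old: Option<bool>, new: Option<bool>) -> bool {
    old == Some(true) && new != Some(true)
}
```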

Value Watchers

changed

Fires on any change to the watched key, regardless of old or new value. This is the default predicate if you call .then() without setting one.

.watch("negotiation_intent")
    .changed()
    .then(|old, new, _state| async move {
        tracing::info!("Intent changed: {old} -> {new}");
    })

changed_to

Fires only when the new value equals a specific serde_json::Value.

use serde_json::json;

// Fire when negotiation_intent becomes "dispute"
.watch("negotiation_intent")
    .changed_to(json!("dispute"))
    .then(|_old, _new, _state| async move {
        tracing::warn!("Debtor is disputing the debt");
    })

Blocking vs Concurrent

By default, watcher actions are spawned concurrently. Use .blocking() to make the processor await the action before continuing. Use blocking for actions that set state other watchers or phases depend on. Use concurrent (the default) for fire-and-forget side effects like logging and notifications.

Temporal Patterns

Temporal patterns detect conditions that unfold over time. Unlike watchers (which react to single state diffs), temporal patterns track duration, rates, and consecutive counts.

when_sustained -- State Held for Duration

Fires when a state-based condition remains true for at least the specified duration. Resets if the condition becomes false. Requires periodic timer checks, which the SDK handles automatically.

// Fire when sentiment stays below 0.4 for 30 seconds
.when_sustained(
    "sustained_frustration",
    |s| {
        let sentiment: f64 = s.get("derived:sentiment_score").unwrap_or(0.5);
        sentiment < 0.4
    },
    Duration::from_secs(30),
    |state, writer| async move {
        // Inject a de-escalation prompt
        writer.send_client_content(
            vec![Content::user("[System: User appears frustrated. Use empathetic tone.]")],
            false,
        ).await.ok();
        state.set("de_escalation_triggered", true);
    },
)

How it works internally:

  1. First check where condition is true: records the start time. Does not fire.
  2. Subsequent checks while condition holds: compares elapsed time to duration.
  3. When elapsed >= duration: fires the action.
  4. If condition becomes false at any point: resets the start time.
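
The four steps above can be sketched as a small state machine. SustainedDetector is a hypothetical type, not the SDK's internal one. This version returns true on every check once the duration has elapsed, and whether the SDK fires once or repeatedly is not stated here.

```rust
use std::time::{Duration, Instant};

// Hypothetical detector following the four steps; the SDK's internals may differ.
struct SustainedDetector {
    required: Duration,
    started_at: Option<Instant>,
}

impl SustainedDetector {
    fn new(required: Duration) -> Self {
        Self { required, started_at: None }
    }

    /// Called on each periodic check. Returns true when the action should fire.
    fn check(&mut self, condition: bool, now: Instant) -> bool {
        if !condition {
            // Step 4: the condition broke, so forget the start time.
            self.started_at = None;
            return false;
        }
        match self.started_at {
            // Step 1: the first true check records the start time and does not fire.
            None => {
                self.started_at = Some(now);
                false
            }
            // Steps 2 and 3: fire once the condition has held for long enough.
            Some(start) => now.duration_since(start) >= self.required,
        }
    }
}
```

Passing the current time in as an argument keeps the logic deterministic and easy to test without sleeping.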

when_rate -- Event Rate Threshold

Fires when at least count matching events occur within a sliding time window. The filter function selects which SessionEvent types count.

use gemini_genai_rs::session::SessionEvent;

// Fire when 3+ interruptions happen within 60 seconds
.when_rate(
    "rapid_interruptions",
    |evt| matches!(evt, SessionEvent::Interrupted),
    3,
    Duration::from_secs(60),
    |_state, writer| async move {
        writer.update_instruction(
            "The user is interrupting frequently. Speak more concisely.".into()
        ).await.ok();
    },
)

Old timestamps outside the window are automatically expired on each check.
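
A sliding window of timestamps is enough to model this behavior. RateDetector below is a hypothetical sketch. Treating an event exactly at the window edge as still inside the window is an assumption.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Hypothetical sliding-window counter; not the SDK's implementation.
struct RateDetector {
    count: usize,
    window: Duration,
    hits: VecDeque<Instant>,
}

impl RateDetector {
    fn new(count: usize, window: Duration) -> Self {
        Self { count, window, hits: VecDeque::new() }
    }

    /// Record one matching event at `now`. Returns true when at least
    /// `count` events fall inside the window ending at `now`.
    fn record(&mut self, now: Instant) -> bool {
        self.hits.push_back(now);
        // Expire timestamps that have fallen out of the window.
        while let Some(&oldest) = self.hits.front() {
            if now.duration_since(oldest) > self.window {
                self.hits.pop_front();
            } else {
                break;
            }
        }
        self.hits.len() >= self.count
    }
}
```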

when_turns -- Consecutive Turn Threshold

Fires when a condition is true for N consecutive turns. Resets the counter if the condition is false on any turn.

// Fire when making_progress is false for 5 consecutive turns
.when_turns(
    "stalled_conversation",
    |s| {
        let progress: bool = s.get("making_progress").unwrap_or(true);
        !progress
    },
    5,
    |state, writer| async move {
        state.set("escalation_needed", true);
        writer.send_text(
            "It seems we're having difficulty. Let me connect you with a specialist.".into()
        ).await.ok();
    },
)
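
The consecutive-turn rule is a counter that resets on the first miss. TurnStreak is a hypothetical name for this sketch. It reports true on every turn once the streak reaches the threshold, and the SDK's exact re-fire behavior is not specified here.

```rust
// Hypothetical streak counter mirroring the when_turns description.
struct TurnStreak {
    required: u32,
    streak: u32,
}

impl TurnStreak {
    fn new(required: u32) -> Self {
        Self { required, streak: 0 }
    }

    /// Call once per completed turn with the condition's result.
    fn on_turn(&mut self, condition: bool) -> bool {
        if condition {
            self.streak += 1;
        } else {
            // Any turn where the condition is false resets the count.
            self.streak = 0;
        }
        self.streak >= self.required
    }
}
```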

Computed Variables

Computed variables are pure functions of other state keys. They auto-recalculate when dependencies change and write results to the derived: prefix.

Live::builder()
    // Simple computed var: sentiment from emotion
    .computed("sentiment_score", &["emotional_state"], |state| {
        let emotion: String = state.get("emotional_state")?;
        let score = match emotion.as_str() {
            "cooperative" => 0.9,
            "calm" => 0.7,
            "frustrated" => 0.4,
            "angry" => 0.2,
            _ => 0.5,
        };
        Some(serde_json::json!(score))
    })
    // Computed var that depends on another computed var
    .computed("call_risk_level", &["derived:sentiment_score", "cease_desist_requested"], |state| {
        let sentiment: f64 = state.get("derived:sentiment_score").unwrap_or(0.5);
        let cease_desist: bool = state.get("cease_desist_requested").unwrap_or(false);
        let level = if cease_desist { "critical" }
            else if sentiment < 0.3 { "high" }
            else if sentiment < 0.5 { "medium" }
            else { "low" };
        Some(serde_json::json!(level))
    })

Key behaviors:

  • Dependency ordering: topologically sorted, dependencies evaluated first
  • Change detection: watchers only fire on keys that actually changed
  • Derived fallback: written to derived:{key}, readable without prefix (see State Management)
  • Cycle detection: panics at registration if you create circular dependencies
  • Returning None: skips the key (no write, no change detection)
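
Dependency ordering and cycle detection can be illustrated with a small topological sort. This is a sketch under stated assumptions, not the ComputedRegistry's code: evaluation_order is a hypothetical function, variables are named with their derived: prefix so that dependencies match by plain string equality, and a cycle is reported as an Err rather than a panic.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Returns computed-variable names in evaluation order, or an error if the
/// dependencies form a cycle. `deps` maps each computed variable to the keys
/// it reads; keys that are not themselves computed impose no ordering.
fn evaluation_order<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Result<Vec<String>, String> {
    let mut order = Vec::new();
    let mut done: BTreeSet<&'a str> = BTreeSet::new();
    let mut remaining: BTreeSet<&'a str> = deps.keys().copied().collect();

    while !remaining.is_empty() {
        // A variable is ready when every computed dependency is already ordered.
        let ready: Vec<&'a str> = remaining
            .iter()
            .copied()
            .filter(|name| {
                deps[*name]
                    .iter()
                    .all(|d| !deps.contains_key(*d) || done.contains(*d))
            })
            .collect();

        // No progress means the remaining variables depend on each other.
        if ready.is_empty() {
            return Err(format!("dependency cycle among {:?}", remaining));
        }
        for name in ready {
            remaining.remove(name);
            done.insert(name);
            order.push(name.to_string());
        }
    }
    Ok(order)
}
```

For the two computed variables registered above, this yields the sentiment score first and the call risk level second.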

Watcher + Phase Integration

Watchers and phases work together naturally. A watcher can set state that triggers a phase transition, or a phase transition can create state that a watcher reacts to.

Pattern: Watcher triggers phase transition

Live::builder()
    // Watcher sets state when cease-and-desist is requested
    .watch("cease_desist_requested")
        .became_true()
        .blocking()
        .then(|_old, _new, state| async move {
            state.set("cease_desist_active", true);
        })
    // Phase transition reacts to that state
    .phase("negotiate")
        .instruction(NEGOTIATE_INSTRUCTION)
        .transition("close", S::is_true("cease_desist_active"))
        .done()

Pattern: Computed var drives phase transition

Live::builder()
    .computed("risk_level", &["derived:sentiment_score"], |state| {
        let sentiment: f64 = state.get("derived:sentiment_score").unwrap_or(0.5);
        if sentiment < 0.3 { Some(json!("high")) }
        else { Some(json!("low")) }
    })
    .phase("normal")
        .instruction("Handle the conversation normally.")
        .transition("de_escalate", S::eq("risk_level", "high"))
        .done()
    .phase("de_escalate")
        .instruction("The user is upset. De-escalate with empathy.")
        .terminal()
        .done()

Real-World Example: Debt Collection Escalation

The debt collection demo (apps/gemini-adk-web-rs/src/apps/debt_collection.rs) combines all reactive primitives in a single builder chain:

  1. Computed chain: emotional_state (from extractor) -> sentiment_score -> call_risk_level
  2. Watchers: crossed_below(0.3) on sentiment triggers alerts; became_true on cease_desist_requested runs a blocking action
  3. Temporal: when_sustained detects 30 seconds of frustration; when_rate catches 3+ interruptions in 60 seconds; when_turns flags 5 consecutive stalled turns
  4. Phase defaults: inject computed state into every phase instruction and conditionally append empathy warnings
  5. Transitions: guards use S::is_true, S::eq, S::one_of to move between 7 phases with compliance gates

The data flows in one direction: extractors populate raw state -> computed variables derive higher-level signals -> watchers react to changes -> temporal patterns detect sustained conditions -> phase transitions evaluate guards. All within a single turn cycle.

Evaluation Order

After each turn, the control lane processes mutations in this order:

  1. Extractors run and write to state (e.g. emotional_state, willingness_to_pay)
  2. Computed variables recompute in dependency order
  3. Watchers evaluate against the diff (snapshot before vs after)
  4. Temporal patterns check against current state and event
  5. Phase transitions evaluate guards

This means a computed variable can react to extractor output, a watcher can react to a computed variable's change, and a phase transition can react to state set by a watcher -- all within a single turn cycle.
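
The ordering can be traced with a toy turn cycle. Everything in this sketch is hypothetical (Store, run_turn, and the sentiment, derived:risk, and alert keys). It only shows that each stage sees what the previous stage wrote.

```rust
use std::collections::HashMap;

// Hypothetical flat store of numeric values; the real State holds JSON values.
type Store = HashMap<String, f64>;

/// Runs one simplified turn cycle and returns the stages that did work, in order.
fn run_turn(store: &mut Store, extracted_sentiment: f64) -> Vec<&'static str> {
    let mut log = Vec::new();
    // Snapshot the watched key before any mutation.
    let before = store.get("derived:risk").copied();

    // 1. Extractor writes raw state.
    store.insert("sentiment".to_string(), extracted_sentiment);
    log.push("extract");

    // 2. Computed variable recomputes from the extractor's output.
    let risk = if extracted_sentiment < 0.3 { 1.0 } else { 0.0 };
    store.insert("derived:risk".to_string(), risk);
    log.push("compute");

    // 3. Watcher compares the snapshot with the new value (risk became high).
    if before != Some(risk) && risk == 1.0 {
        store.insert("alert".to_string(), 1.0);
        log.push("watcher");
    }

    // 4. Temporal patterns would be checked here (omitted in this sketch).

    // 5. Phase transition guard sees the state written by the watcher.
    if store.get("alert") == Some(&1.0) {
        log.push("transition");
    }
    log
}
```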

Tool System

Tools let the model call your Rust functions during a live session. Gemini sends a FunctionCall, your tool executes, and you return a FunctionResponse.

SimpleTool

The quickest way to define a tool -- wrap an async closure:

use gemini_adk_rs::tool::SimpleTool;
use serde_json::json;

let weather = SimpleTool::new(
    "get_weather",
    "Get current weather for a city",
    Some(json!({
        "type": "object",
        "properties": {
            "city": { "type": "string", "description": "City name" }
        },
        "required": ["city"]
    })),
    |args| async move {
        let city = args["city"].as_str().unwrap_or("Unknown");
        Ok(json!({ "city": city, "temperature_c": 22, "condition": "Partly cloudy" }))
    },
);

The third argument is the JSON Schema for parameters. Pass None for parameterless tools.

TypedTool

Type-safe tools with auto-generated schemas. Define a struct with JsonSchema and Deserialize:

use gemini_adk_rs::tool::TypedTool;
use schemars::JsonSchema;
use serde::Deserialize;

#[derive(Deserialize, JsonSchema)]
struct WeatherArgs {
    /// The city to get weather for
    city: String,
    /// Temperature units (celsius or fahrenheit)
    #[serde(default = "default_units")]
    units: String,
}
fn default_units() -> String { "celsius".to_string() }

let tool = TypedTool::new(
    "get_weather",
    "Get current weather for a city",
    |args: WeatherArgs| async move {
        Ok(serde_json::json!({ "temp": 22, "city": args.city, "units": args.units }))
    },
);

Doc comments on fields become parameter descriptions. Required vs optional is inferred from #[serde(default)]. Invalid arguments return ToolError::InvalidArgs.

ToolFunction Trait

For full control, implement ToolFunction directly. Use this when your tool holds state (connection pools, caches):

use async_trait::async_trait;
use gemini_adk_rs::tool::ToolFunction;
use gemini_adk_rs::error::ToolError;

struct DatabaseLookup { pool: sqlx::PgPool }

#[async_trait]
impl ToolFunction for DatabaseLookup {
    fn name(&self) -> &str { "lookup_account" }
    fn description(&self) -> &str { "Look up an account by ID" }

    fn parameters(&self) -> Option<serde_json::Value> {
        Some(serde_json::json!({
            "type": "object",
            "properties": { "account_id": { "type": "string" } },
            "required": ["account_id"]
        }))
    }

    async fn call(&self, args: serde_json::Value) -> Result<serde_json::Value, ToolError> {
        let id = args["account_id"].as_str()
            .ok_or_else(|| ToolError::InvalidArgs("missing account_id".into()))?;
        // Placeholder result; a real implementation would query self.pool here
        Ok(serde_json::json!({ "account_id": id, "balance": 4250.00 }))
    }
}

StreamingTool

For tools that yield multiple results over time via an mpsc::Sender:

#[async_trait]
impl StreamingTool for ProgressTracker {
    fn name(&self) -> &str { "track_progress" }
    fn description(&self) -> &str { "Track a long-running operation" }
    fn parameters(&self) -> Option<serde_json::Value> { None }

    async fn run(
        &self,
        args: serde_json::Value,
        yield_tx: mpsc::Sender<serde_json::Value>,
    ) -> Result<(), ToolError> {
        for step in 0..5 {
            yield_tx.send(json!({ "step": step })).await
                .map_err(|e| ToolError::ExecutionFailed(e.to_string()))?;
        }
        Ok(())
    }
}

Register via dispatcher.register_streaming(Arc::new(tool)).

InputStreamingTool

For tools that receive live input (audio, video) while running. They get a broadcast::Receiver<InputEvent> alongside the yield channel:

async fn run(
    &self,
    _args: serde_json::Value,
    mut input_rx: broadcast::Receiver<InputEvent>,
    yield_tx: mpsc::Sender<serde_json::Value>,
) -> Result<(), ToolError> {
    while let Ok(event) = input_rx.recv().await {
        // Process input events, yield partial results
    }
    Ok(())
}

Built-in Tools

Gemini provides server-side tools requiring no implementation:

// Direct methods
Live::builder().google_search().code_execution().url_context()

// Or T:: composition with pipe operator
Live::builder().with_tools(T::google_search() | T::code_execution() | T::url_context())

Agent as Tool

TextAgentTool wraps a text-mode agent as a callable tool for voice sessions. The agent runs via BaseLlm::generate() and shares the session's State:

// Direct registration
let tool = TextAgentTool::new("verify_identity", "Verify caller", verifier, state.clone());
dispatcher.register(tool);

// Fluent API
Live::builder()
    .agent_tool("verify_identity", "Verify caller identity", verifier_agent)
    .agent_tool("calc_payment", "Calculate payment plans", calc_pipeline)

State sharing is bidirectional -- the text agent reads live-extracted values and its mutations are visible to watchers and phase transitions.

Tool Registration

ToolDispatcher (L1)

let mut dispatcher = ToolDispatcher::new();
dispatcher.register(my_tool);                        // impl ToolFunction
dispatcher.register_function(Arc::new(my_tool));     // Arc<dyn ToolFunction>
dispatcher.register_streaming(Arc::new(stream_tool));

Live::builder().tools(dispatcher).connect(config).await?;

T:: composition (fluent API)

Live::builder()
    .with_tools(
        T::function(Arc::new(weather_tool))
        | T::simple("calculate", "Evaluate expression", |args| async move {
            Ok(json!({"result": 42}))
        })
        | T::google_search()
    )

Toolset from a vec

let tools: Vec<Arc<dyn ToolFunction>> = vec![Arc::new(a), Arc::new(b), Arc::new(c)];
Live::builder().with_tools(T::toolset(tools))

Tool Call Handling

The on_tool_call callback fires when the model requests tool execution. Return Some(responses) to handle manually, or None for auto-dispatch:

.on_tool_call(|calls, state| async move {
    let responses: Vec<FunctionResponse> = calls.iter().map(|call| {
        let result = match call.name.as_str() {
            "get_weather" => execute_weather(&call.args),
            "verify_identity" => {
                let result = verify(&call.args);
                if result["verified"].as_bool() == Some(true) {
                    state.set("identity_verified", true);  // promote to state
                }
                result
            }
            _ => json!({"error": "unknown tool"}),
        };
        FunctionResponse { name: call.name.clone(), response: result, id: call.id.clone() }
    }).collect();
    Some(responses)
})

The callback receives State so you can promote tool results to keys that drive phase transitions and watchers.

Phase-Scoped Tools

Restrict available tools per conversation phase. The processor rejects calls to tools not in the phase's tools_enabled list:

.phase("verify_identity")
    .instruction("Verify the caller's identity")
    .tools(vec!["verify_identity".into(), "log_compliance_event".into()])
    .transition("inform_debt", S::is_true("identity_verified"))
    .done()
.phase("negotiate")
    .instruction("Negotiate a payment plan")
    .tools(vec!["calculate_payment_plan".into()])
    .transition("arrange_payment", S::is_true("plan_agreed"))
    .done()

If tools_enabled is None (default), all registered tools are available.
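
The scoping rule amounts to an optional allow-list check. The helper below is a hypothetical sketch of that rule, not the processor's actual code.

```rust
/// `enabled` mirrors a phase's optional tool list: `None` allows every
/// registered tool, `Some(list)` allows only the tools it names.
fn is_tool_allowed(enabled: Option<&[&str]>, tool: &str) -> bool {
    match enabled {
        None => true,
        Some(list) => list.contains(&tool),
    }
}
```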

Long-Running Tools

LongRunningFunctionTool wraps any ToolFunction and tells the model not to re-invoke while a previous call is pending:

use gemini_adk_rs::tools::LongRunningFunctionTool;

let long_running = LongRunningFunctionTool::new(Arc::new(MySlowTool::new()));
dispatcher.register(long_running);

The ToolDispatcher supports timeouts and cancellation:

// Custom timeout
dispatcher.call_function_with_timeout("slow_tool", args, Duration::from_secs(60)).await?;

// Cancel via token
dispatcher.call_function_with_cancel("slow_tool", args, cancel_token).await?;

// Configure default timeout (30s default)
let dispatcher = ToolDispatcher::new().with_timeout(Duration::from_secs(10));

Background Tool Execution

For tools that take significant time (database queries, API calls, LLM pipelines), background execution eliminates dead air in voice sessions.

How It Works

  1. Model calls a background tool
  2. An immediate "running" acknowledgment is sent back
  3. The model continues speaking (e.g., "Let me look that up for you...")
  4. When the tool completes, the result is injected into the conversation
  5. The model incorporates the result naturally
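
The acknowledge-then-deliver flow can be modeled with a channel and a worker. This sketch swaps in standard-library threads for the SDK's async tasks. ToolMessage and spawn_background are hypothetical names, and results are plain strings.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical messages sent back toward the conversation.
#[derive(Debug, PartialEq)]
enum ToolMessage {
    Running { tool: String },
    Done { tool: String, result: String },
}

/// Sends an immediate `Running` acknowledgment, then runs `work` on a
/// background thread and sends `Done` when it finishes. The caller is never
/// blocked by `work`.
fn spawn_background<F>(
    tool: &str,
    tx: mpsc::Sender<ToolMessage>,
    work: F,
) -> thread::JoinHandle<()>
where
    F: FnOnce() -> String + Send + 'static,
{
    let name = tool.to_string();
    // Step 2: acknowledge right away so the model can keep talking.
    tx.send(ToolMessage::Running { tool: name.clone() }).ok();
    thread::spawn(move || {
        // Step 4: the result is delivered whenever the work completes.
        let result = work();
        tx.send(ToolMessage::Done { tool: name, result }).ok();
    })
}
```

The receiver always sees Running before Done because the acknowledgment is sent before the worker starts.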

L2 API

Live::builder()
    .tools(dispatcher)
    .tool_background("search_knowledge_base")
    .tool_background_with_formatter("analyze_doc", Arc::new(MyFormatter))
    .connect_vertex(project, location, token)
    .await?;

L1 API

LiveSessionBuilder::new(config)
    .dispatcher(dispatcher)
    .tool_execution_mode("search_knowledge_base", ToolExecutionMode::Background {
        formatter: None,
    })
    .connect()
    .await?;

Custom Result Formatting

Implement ResultFormatter to control acknowledgment and result shapes:

struct VerboseFormatter;

impl ResultFormatter for VerboseFormatter {
    fn format_running(&self, call: &FunctionCall) -> Value {
        json!({ "status": "searching", "query": call.args["query"] })
    }

    fn format_result(&self, call: &FunctionCall, result: Result<Value, ToolError>) -> Value {
        match result {
            Ok(val) => json!({ "status": "done", "tool": call.name, "result": val }),
            Err(e) => json!({ "status": "error", "tool": call.name, "error": e.to_string() }),
        }
    }

    fn format_cancelled(&self, call_id: &str) -> Value {
        json!({ "status": "cancelled", "call_id": call_id })
    }
}

Cancellation

Background tools are automatically cancelled when:

  • The server sends ToolCallCancellation
  • The session disconnects
  • LiveHandle is dropped

The BackgroundToolTracker provides belt-and-suspenders cleanup: it triggers the CancellationToken and also aborts the JoinHandle.

Intercepting Tool Responses

Transform tool results before they reach Gemini. Use this for PII redaction, state promotion, or result augmentation:

.before_tool_response(|responses, state| async move {
    responses.into_iter().map(|mut r| {
        if r.name == "verify_identity" {
            if r.response["verified"].as_bool() == Some(true) {
                state.set("identity_verified", true);
            }
        }
        if r.name == "lookup_account" {
            r.response = redact_pii(&r.response);
        }
        r
    }).collect()
})

Extraction Pipeline

What Is Extraction?

Extraction turns unstructured conversation into structured data. As the user and model talk, extractors analyze the transcript and produce typed JSON: customer name, order items, emotional state, account numbers. These values flow into session State where they drive phase transitions, trigger watchers, and inform instruction composition.

Why Out-of-Band?

Extraction runs on the control lane, not on the conversation path. An LLM extraction call takes 1-5 seconds. Voice conversations cannot pause for that. Extractors run concurrently after each turn completes while the conversation continues uninterrupted.

Turn completes
  -> Transcript buffer finalizes the turn
  -> Extractors run concurrently (control lane)
  -> Results written to State under derived: prefix
  -> Watchers evaluate -> Phase transitions fire
  -> Conversation continues (no blocking)

TurnExtractor

The base trait for all extractors. Implement it directly for cheap, local extraction that needs no LLM call (regex, keyword matching, heuristics):

use async_trait::async_trait;
use gemini_adk_rs::live::extractor::TurnExtractor;
use gemini_adk_rs::live::transcript::TranscriptTurn;
use gemini_adk_rs::llm::LlmError;

struct OrderNumberExtractor;

#[async_trait]
impl TurnExtractor for OrderNumberExtractor {
    fn name(&self) -> &str { "order_info" }  // State key for results
    fn window_size(&self) -> usize { 5 }     // Look at last 5 turns

    fn should_extract(&self, window: &[TranscriptTurn]) -> bool {
        // Skip trivial turns -- checked before async extraction
        window.last()
            .map(|t| t.user.split_whitespace().count() >= 3)
            .unwrap_or(false)
    }

    async fn extract(&self, window: &[TranscriptTurn]) -> Result<serde_json::Value, LlmError> {
        let text: String = window.iter()
            .map(|t| format!("{} {}", t.user, t.model))
            .collect::<Vec<_>>().join(" ");

        let re = regex::Regex::new(r"order\s+#?(\d+)").unwrap();
        let mut result = serde_json::Map::new();
        if let Some(caps) = re.captures(&text) {
            result.insert("order_number".into(), serde_json::json!(caps[1].to_string()));
        }
        Ok(serde_json::Value::Object(result))
    }
}

should_extract is checked before launching async work. Return false to skip the LLM round-trip entirely on trivial turns.

LlmExtractor

For extraction that requires language understanding (sentiment, intent, entity recognition), LlmExtractor sends the transcript window to an out-of-band (OOB) LLM:

use gemini_adk_rs::live::extractor::LlmExtractor;

let extractor = LlmExtractor::new(
    "SentimentAnalysis",
    llm,  // Arc<dyn BaseLlm>
    "Analyze conversation sentiment and extract the customer's emotional state.",
    3,    // window size
)
.with_schema(serde_json::json!({
    "type": "object",
    "properties": {
        "sentiment": { "type": "string", "enum": ["positive", "neutral", "negative"] },
        "score": { "type": "number" }
    }
}))
.with_min_words(5);  // Skip "uh huh", "ok", "yes" turns

Schema Definition

The fluent API's extract_turns auto-generates the schema from a Rust struct:

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, JsonSchema)]
struct DebtorState {
    /// "calm", "cooperative", "frustrated", "angry"
    emotional_state: Option<String>,
    /// 0.0 (refusing) to 1.0 (eager)
    willingness_to_pay: Option<f32>,
    /// "full_pay", "partial_pay", "dispute", "refuse", "delay"
    negotiation_intent: Option<String>,
    /// Whether debtor explicitly requested cease-and-desist
    cease_desist_requested: Option<bool>,
}

Live::builder()
    .extract_turns::<DebtorState>(
        llm,
        "Extract: emotional state, willingness to pay, negotiation intent, cease-and-desist.",
    )
    .connect(config).await?;

The type name (DebtorState) becomes the extractor name and State key. extract_turns auto-enables transcription, generates the JSON schema, and defaults to a 3-turn window. Use extract_turns_windowed for custom sizes.

Extraction Triggers

By default, extractors run on every TurnComplete event. For many use cases this is wasteful -- trivial utterances ("yeah", "ok") rarely contain extractable data, and each extraction is an OOB LLM call. Extraction triggers control when extractors fire:

use gemini_adk_rs::live::extractor::ExtractionTrigger;

Live::builder()
    // Extract every 2 turns instead of every turn (reduces LLM costs by ~50%)
    .extract_turns_triggered::<DebtorState>(
        llm,
        "Extract debtor emotional state and negotiation intent",
        5,  // transcript window size
        ExtractionTrigger::Interval(2),
    )
    .connect(config).await?;

Trigger        When it fires                  Use case
EveryTurn      After every TurnComplete       Default -- high-frequency extraction
Interval(n)    Every N turns                  Reduce LLM costs for slow-changing data
AfterToolCall  After tool dispatch completes  Extract from tool results
OnPhaseChange  When phase transitions fire    Re-extract on context shift

The TurnExtractor trait also has a trigger() method whose default implementation returns EveryTurn, so custom extractors that do not override it keep running on every turn. Override it to change when the extractor fires:

impl TurnExtractor for MyExtractor {
    fn trigger(&self) -> ExtractionTrigger {
        ExtractionTrigger::AfterToolCall
    }
    // ...
}
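
To make the trigger semantics concrete, here is a small self-contained model of how a trigger could be matched against session events. Trigger, Event, and should_fire are hypothetical stand-ins, not the crate's types, and the assumption that Interval(n) fires on turns n, 2n, 3n (counting from 1) is mine.

```rust
/// Simplified stand-in for the trigger variants described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Trigger {
    EveryTurn,
    Interval(u32),
    AfterToolCall,
    OnPhaseChange,
}

/// Session events an extractor might react to (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    TurnComplete { turn_number: u32 },
    ToolCallComplete,
    PhaseChanged,
}

/// Decide whether an extractor with `trigger` should run for `event`.
fn should_fire(trigger: Trigger, event: Event) -> bool {
    match (trigger, event) {
        (Trigger::EveryTurn, Event::TurnComplete { .. }) => true,
        // Fire on turns n, 2n, 3n, ...; an interval of 0 never fires.
        (Trigger::Interval(n), Event::TurnComplete { turn_number }) => {
            n > 0 && turn_number > 0 && turn_number % n == 0
        }
        (Trigger::AfterToolCall, Event::ToolCallComplete) => true,
        (Trigger::OnPhaseChange, Event::PhaseChanged) => true,
        _ => false,
    }
}
```

Under this model, Interval(2) fires on two of every four turns, which is where the roughly 50% saving in extraction calls comes from.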

Transcript Window

Extractors receive a slice of TranscriptTurn values:

pub struct TranscriptTurn {
    pub turn_number: u32,
    pub user: String,                    // Accumulated user speech
    pub model: String,                   // Accumulated model speech
    pub tool_calls: Vec<ToolCallSummary>,
    pub timestamp: Instant,
}

The TranscriptBuffer is a ring buffer (default 50 turns) that evicts the oldest turns to prevent unbounded memory growth:

let mut buf = TranscriptBuffer::new();
let recent = buf.window(3);          // last 3 completed turns
let formatted = buf.format_window(3); // human-readable text
let snapshot = buf.snapshot_window(5); // cheap read-only clone for callbacks
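
The eviction behaviour can be illustrated with a VecDeque. This is a sketch of the ring-buffer idea only: TurnRing is a hypothetical type that stores plain strings, whereas the real TranscriptBuffer stores TranscriptTurn values and offers more methods.

```rust
use std::collections::VecDeque;

/// A bounded buffer of completed turns (illustrative stand-in).
struct TurnRing {
    capacity: usize,
    turns: VecDeque<String>,
}

impl TurnRing {
    fn new(capacity: usize) -> Self {
        TurnRing { capacity, turns: VecDeque::with_capacity(capacity) }
    }

    /// Append a completed turn, evicting the oldest one when full.
    fn push(&mut self, turn: String) {
        if self.capacity == 0 {
            return;
        }
        if self.turns.len() == self.capacity {
            self.turns.pop_front();
        }
        self.turns.push_back(turn);
    }

    /// The last `n` turns, oldest first (fewer if the buffer holds fewer).
    fn window(&self, n: usize) -> Vec<&str> {
        let skip = self.turns.len().saturating_sub(n);
        self.turns.iter().skip(skip).map(String::as_str).collect()
    }
}
```

Because window(n) is clamped to the number of stored turns, an extractor asking for more history than exists simply gets everything that is left.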

Auto-Flatten

When an extractor returns a JSON object, the framework automatically flattens it to individual state keys under the derived: prefix. Given this result:

{ "emotional_state": "frustrated", "willingness_to_pay": 0.3 }

The framework writes:

  • derived:emotional_state = "frustrated"
  • derived:willingness_to_pay = 0.3

The prefix is transparent -- state.get("emotional_state") auto-checks derived:emotional_state if the unprefixed key is not found:

.transition("close", S::is_true("cease_desist_requested"))
// Internally checks derived:cease_desist_requested
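
The flatten-then-fallback lookup can be modelled in a few lines. MiniState below is a hypothetical, string-only stand-in for the session State; it assumes an exact key match takes priority over the derived: fallback, as the description above implies.

```rust
use std::collections::HashMap;

const DERIVED_PREFIX: &str = "derived:";

/// A tiny string-valued state store (illustrative only).
#[derive(Default)]
struct MiniState {
    values: HashMap<String, String>,
}

impl MiniState {
    fn set(&mut self, key: &str, value: &str) {
        self.values.insert(key.to_string(), value.to_string());
    }

    /// Write each extracted field under the `derived:` prefix.
    fn flatten_extracted(&mut self, fields: &[(&str, &str)]) {
        for (field, value) in fields {
            let key = format!("{}{}", DERIVED_PREFIX, field);
            self.values.insert(key, value.to_string());
        }
    }

    /// Look up the exact key first, then fall back to `derived:<key>`.
    fn get(&self, key: &str) -> Option<&str> {
        self.values
            .get(key)
            .or_else(|| self.values.get(&format!("{}{}", DERIVED_PREFIX, key)))
            .map(String::as_str)
    }
}
```

With this lookup order, a guard written against the short key keeps working whether the value was set explicitly or produced by an extractor.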

Concurrent Extraction

Multiple extractors run in parallel via futures::future::join_all:

Live::builder()
    .extractor(Arc::new(regex_extractor))      // instant
    .extract_turns::<DebtorState>(llm, "...")   // 1-5 seconds (LLM call)
    .on_extracted(|name, value| async move {
        println!("Extractor '{name}' produced: {value}");
    })
    .on_extraction_error(|name, error| async move {
        eprintln!("Extractor '{name}' failed: {error}");
    })
    .connect(config).await?;

Extraction to State to Watchers

The full data flow after each turn:

  1. Extractors run concurrently on the control lane.
  2. Results auto-flatten -- each JSON field becomes a derived: key.
  3. Computed state evaluates -- derived variables that depend on extracted keys re-compute.
  4. Watchers fire -- any watcher observing a changed key triggers.
  5. Phase transitions evaluate -- guards check, machine transitions.

Live::builder()
    .extract_turns::<DebtorState>(llm, "Extract emotional state")
    .computed("call_risk_level", &["derived:sentiment_score"], |state| {
        let score: f64 = state.get("derived:sentiment_score").unwrap_or(0.5);
        if score < 0.3 { Some(json!("high")) } else { Some(json!("low")) }
    })
    .watch("derived:call_risk_level")
        .changed_to(json!("high"))
        .then(|_old, _new, state| async move {
            state.set("alert:risk_escalation", true);
        })
    .phase("negotiate")
        .instruction("Negotiate payment")
        .transition("close", S::is_true("cease_desist_requested"))
        .done()
    .connect(config).await?;

Real Example

The debt collection demo combines regex and LLM extractors:

// Regex: captures dollar amounts, phone numbers, disclosure acknowledgment
let regex_extractor = Arc::new(RegexExtractor::new("debt_fields", 10, |text, existing| {
    let mut extracted = HashMap::new();
    if !existing.contains_key("dollar_amount") {
        if let Some(m) = DOLLAR_RE.find(text) {
            extracted.insert("dollar_amount".into(), json!(m.as_str()));
        }
    }
    if !existing.contains_key("disclosure_given") {
        if DISCLOSURE_ACK_RE.is_match(text) {
            extracted.insert("disclosure_given".into(), json!(true));
        }
    }
    extracted
}));

let handle = Live::builder()
    .extractor(regex_extractor)
    .extract_turns::<DebtorState>(llm, "Extract debtor emotional state and intent")
    .computed("sentiment_score", &["emotional_state"], |state| {
        let emotion: String = state.get("emotional_state")?;
        Some(json!(match emotion.as_str() {
            "cooperative" => 0.9, "calm" => 0.7,
            "frustrated" => 0.4, "angry" => 0.2, _ => 0.5,
        }))
    })
    .computed("call_risk_level", &["derived:sentiment_score", "cease_desist_requested"], |state| {
        let sentiment: f64 = state.get("derived:sentiment_score").unwrap_or(0.5);
        let cease: bool = state.get("cease_desist_requested").unwrap_or(false);
        Some(json!(if cease { "critical" } else if sentiment < 0.3 { "high" } else { "low" }))
    })
    .phase("disclosure")
        .instruction("Deliver the Mini-Miranda disclosure")
        .transition("verify_identity", S::is_true("disclosure_given"))
        .transition("close", S::is_true("cease_desist_requested"))
        .done()
    .initial_phase("disclosure")
    .connect(config).await?;

Extracted fields flow through the full pipeline: extraction produces raw values, computed state derives higher-level signals, guards evaluate on every turn, and the phase machine transitions when conditions are met.

Text Agent Combinators

Text agents are composable units for text-based LLM pipelines. Each one implements the TextAgent trait, making a standard request/response call to a language model (no WebSocket session required). You can snap them together -- sequential, parallel, branching, looping -- to build multi-step reasoning pipelines.

The TextAgent Trait

Every text agent implements the same small trait: name() identifies the agent and run() does the work:

#[async_trait]
pub trait TextAgent: Send + Sync {
    fn name(&self) -> &str;
    async fn run(&self, state: &State) -> Result<String, AgentError>;
}

State is a concurrent typed key-value store shared across the pipeline. Agents read input from state.get::<String>("input") and write output to state.set("output", &result). That is the entire contract.

Combinator Reference

Combinator           Purpose                                       Analogy
LlmTextAgent         Core agent -- generate, tool dispatch, loop   Gemini(prompt)
FnTextAgent          Wrap a closure as an agent (no LLM call)      |state| { ... }
SequentialTextAgent  Run agents in order, pipe output forward      A >> B >> C
ParallelTextAgent    Run agents concurrently, collect all results  [A, B, C]
RaceTextAgent        Run concurrently, return first success        A | B | C
RouteTextAgent       Route to agent based on state predicate       if X -> A, if Y -> B
FallbackTextAgent    Try agents in order until one succeeds        A ?? B ?? C
LoopTextAgent        Repeat until max iterations or predicate      while(!done) { A }
MapOverTextAgent     Apply agent to each item in a state list      items.map(A)
TapTextAgent         Read-only side effect (logging, metrics)      tap(log)
TimeoutTextAgent     Wrap an agent with a time limit               timeout(5s, A)
DispatchTextAgent    Fire-and-forget background tasks              spawn(A, B)
JoinTextAgent        Wait for dispatched tasks to complete         join(tasks)

LlmTextAgent -- The Core Agent

LlmTextAgent is the workhorse. It calls BaseLlm::generate(), dispatches any tool calls the model makes, feeds tool results back, and loops until the model produces a final text response (up to 10 rounds).

use gemini_adk_rs::text::LlmTextAgent;
use gemini_adk_rs::llm::GeminiLlm;

let llm = Arc::new(GeminiLlm::new(GeminiModel::Gemini2_0Flash));

let agent = LlmTextAgent::new("analyst", llm)
    .instruction("Analyze the given topic and produce a summary.")
    .temperature(0.3)
    .max_output_tokens(2048)
    .tools(Arc::new(tool_dispatcher));

let state = State::new();
state.set("input", "Explain Rust's ownership model");

let result = agent.run(&state).await?;
println!("{result}");

FnTextAgent -- Zero-Cost Transforms

When you need a pipeline step that does not call an LLM -- data formatting, validation, state manipulation -- use FnTextAgent:

use gemini_adk_rs::text::FnTextAgent;

let formatter = FnTextAgent::new("format_output", |state| {
    let raw = state.get::<String>("input").unwrap_or_default();
    let formatted = format!("## Summary\n\n{raw}");
    state.set("output", &formatted);
    Ok(formatted)
});

Building Pipelines

Sequential: A >> B >> C

Each agent's output becomes the next agent's input via state.set("input", &output). The final agent's output is the pipeline result.

use gemini_adk_rs::text::SequentialTextAgent;

let pipeline = SequentialTextAgent::new("analysis_pipeline", vec![
    Arc::new(LlmTextAgent::new("extract", llm.clone())
        .instruction("Extract key claims from the text.")),
    Arc::new(LlmTextAgent::new("validate", llm.clone())
        .instruction("Fact-check each claim. Flag unsupported ones.")),
    Arc::new(LlmTextAgent::new("summarize", llm.clone())
        .instruction("Produce a final summary with confidence ratings.")),
]);

let state = State::new();
state.set("input", raw_document);
let summary = pipeline.run(&state).await?;
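
The output-to-input piping can be sketched without any LLM at all. In this simplified, synchronous model a step is just a boxed function over a string map; State, Step, run_sequential, and map_input are illustrative names, not the crate's API.

```rust
use std::collections::HashMap;

type State = HashMap<String, String>;
type Step = Box<dyn Fn(&mut State) -> Result<String, String>>;

/// Run steps in order; each output is written to "input" for the next step.
/// The first error aborts the pipeline.
fn run_sequential(steps: &[Step], state: &mut State) -> Result<String, String> {
    let mut last = String::new();
    for step in steps {
        last = step(&mut *state)?;
        state.insert("input".to_string(), last.clone());
    }
    Ok(last)
}

/// Helper: build a step that reads "input" and applies `f` to it.
fn map_input(f: fn(&str) -> String) -> Step {
    Box::new(move |state: &mut State| -> Result<String, String> {
        let input = state.get("input").cloned().unwrap_or_default();
        Ok(f(&input))
    })
}
```

The return value is the last step's output, matching the rule that the final agent's output is the pipeline result.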

Parallel: Run concurrently, collect all

All branches execute concurrently via tokio::spawn. Results are joined with newlines.

use gemini_adk_rs::text::ParallelTextAgent;

let multi_perspective = ParallelTextAgent::new("perspectives", vec![
    Arc::new(LlmTextAgent::new("technical", llm.clone())
        .instruction("Analyze from a technical perspective.")),
    Arc::new(LlmTextAgent::new("business", llm.clone())
        .instruction("Analyze from a business perspective.")),
    Arc::new(LlmTextAgent::new("legal", llm.clone())
        .instruction("Analyze from a legal perspective.")),
]);

Race: First to finish wins

Like ParallelTextAgent, but returns only the first result and cancels the rest. Useful for redundancy or trying multiple model configurations:

use gemini_adk_rs::text::RaceTextAgent;

let fastest = RaceTextAgent::new("race", vec![
    Arc::new(LlmTextAgent::new("fast", fast_llm.clone())
        .instruction("Answer the question.")),
    Arc::new(LlmTextAgent::new("thorough", slow_llm.clone())
        .instruction("Answer the question in depth.")),
]);

Route: State-driven branching

Evaluate predicates against state. First match wins, with a default fallback:

use gemini_adk_rs::text::{RouteTextAgent, RouteRule};

let router = RouteTextAgent::new(
    "issue_router",
    vec![
        RouteRule::new(
            |s| s.get::<String>("category") == Some("billing".into()),
            Arc::new(billing_agent),
        ),
        RouteRule::new(
            |s| s.get::<String>("category") == Some("technical".into()),
            Arc::new(tech_agent),
        ),
    ],
    Arc::new(general_agent), // default
);

Fallback: Try until one succeeds

Attempts each candidate in order. Returns the first Ok result. If all fail, returns the last error:

use gemini_adk_rs::text::FallbackTextAgent;

let robust = FallbackTextAgent::new("robust_lookup", vec![
    Arc::new(primary_agent),
    Arc::new(cache_agent),
    Arc::new(fallback_agent),
]);
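
The "first Ok, otherwise last error" rule is easy to get subtly wrong, so here is a minimal synchronous sketch of it. first_ok and the three candidate functions are hypothetical; returning None for an empty candidate list is a choice made for this example, not documented crate behaviour.

```rust
/// Try each candidate in order; return the first success, or the last
/// error if every candidate fails. `None` means the list was empty.
fn first_ok<T, E>(candidates: &[fn() -> Result<T, E>]) -> Option<Result<T, E>> {
    let mut last_err = None;
    for candidate in candidates {
        match candidate() {
            Ok(value) => return Some(Ok(value)),
            Err(err) => last_err = Some(Err(err)),
        }
    }
    last_err
}

// Example candidates standing in for primary, cache, and fallback agents.
fn primary() -> Result<&'static str, &'static str> {
    Err("primary down")
}

fn cache() -> Result<&'static str, &'static str> {
    Err("cache miss")
}

fn backup() -> Result<&'static str, &'static str> {
    Ok("from backup")
}
```

Later candidates never run once one succeeds, so the cheapest or most reliable agent should come first.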

Loop: Repeat with termination

Runs the body up to max times, optionally breaking early when a state predicate returns true:

use gemini_adk_rs::text::LoopTextAgent;

let refiner = LoopTextAgent::new("refine", Arc::new(draft_agent), 5)
    .until(|state| {
        state.get::<String>("quality")
            .map(|q| q == "good")
            .unwrap_or(false)
    });
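
The termination logic can be sketched as a plain loop. run_loop is a hypothetical helper over a string map; it assumes the predicate is checked after each iteration of the body, which the text above does not spell out.

```rust
use std::collections::HashMap;

type State = HashMap<String, String>;

/// Run `body` up to `max` times, stopping early once `done` returns true.
/// Returns how many iterations actually ran.
fn run_loop<B, P>(state: &mut State, max: u32, mut body: B, done: P) -> u32
where
    B: FnMut(&mut State),
    P: Fn(&State) -> bool,
{
    let mut iterations = 0;
    while iterations < max {
        body(&mut *state);
        iterations += 1;
        // Assumption: the predicate is evaluated after the body has run.
        if done(&*state) {
            break;
        }
    }
    iterations
}
```

The max bound is what keeps a refinement loop from running forever when the quality key never reaches its target value.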

MapOver: Apply agent to each item

Reads a list from state, runs the agent once per item, collects results:

use gemini_adk_rs::text::MapOverTextAgent;

let processor = MapOverTextAgent::new("process_items", Arc::new(item_agent), "items")
    .item_key("current_item")
    .output_key("processed_results");

// State must contain: state.set("items", vec!["item1", "item2", "item3"]);

Tap: Observe without mutation

For logging, metrics, or debugging. Returns an empty string and does not mutate the pipeline flow:

use gemini_adk_rs::text::TapTextAgent;

let logger = TapTextAgent::new("log_state", |state| {
    let input = state.get::<String>("input").unwrap_or_default();
    tracing::info!("Pipeline state - input length: {}", input.len());
});

Timeout: Time-limited execution

Wraps any agent with a deadline. Returns AgentError::Timeout if exceeded:

use gemini_adk_rs::text::TimeoutTextAgent;

let bounded = TimeoutTextAgent::new(
    "bounded_analysis",
    Arc::new(slow_agent),
    Duration::from_secs(30),
);
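
A deadline wrapper can be approximated with a helper thread and a channel. This is a sketch under stated assumptions: RunError and run_with_timeout are hypothetical names, and unlike an async timeout (which can drop the pending future) the helper thread here keeps running after the deadline passes.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RunError {
    Timeout,
}

/// Run `work` on a helper thread and wait at most `limit` for its result.
fn run_with_timeout<F>(limit: Duration, work: F) -> Result<String, RunError>
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The send fails harmlessly if the caller already gave up.
        let _ = tx.send(work());
    });
    // A disconnected channel (worker panicked) is also reported as Timeout
    // in this sketch.
    rx.recv_timeout(limit).map_err(|_| RunError::Timeout)
}
```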

Dispatch + Join: Background tasks

DispatchTextAgent spawns agents as background tasks with a concurrency budget. JoinTextAgent waits for them:

use gemini_adk_rs::text::{DispatchTextAgent, JoinTextAgent, TaskRegistry};

let registry = TaskRegistry::new();
let budget = Arc::new(tokio::sync::Semaphore::new(4)); // max 4 concurrent

let dispatcher = DispatchTextAgent::new(
    "spawn_tasks",
    vec![
        ("research".into(), Arc::new(research_agent) as Arc<dyn TextAgent>),
        ("analysis".into(), Arc::new(analysis_agent) as Arc<dyn TextAgent>),
    ],
    registry.clone(),
    budget,
);

let joiner = JoinTextAgent::new("collect", registry)
    .timeout(Duration::from_secs(60));

// In a pipeline: dispatch, do other work, then join
let pipeline = SequentialTextAgent::new("bg_pipeline", vec![
    Arc::new(dispatcher),
    Arc::new(other_work_agent),
    Arc::new(joiner),
]);

Agent as Tool: Bridging Voice and Text

TextAgentTool wraps any TextAgent as a ToolFunction, so the live voice model can dispatch text agent pipelines as tool calls. State is shared bidirectionally -- the text agent reads live-extracted values, and its mutations are visible to watchers and phase transitions.

use gemini_adk_rs::text_agent_tool::TextAgentTool;

// Build a multi-step verification pipeline
let verifier = SequentialTextAgent::new("verify_pipeline", vec![
    Arc::new(LlmTextAgent::new("lookup", flash_llm.clone())
        .instruction("Look up the account in the database")
        .tools(Arc::new(db_tools))),
    Arc::new(LlmTextAgent::new("cross_ref", flash_llm.clone())
        .instruction("Cross-reference identity against account record")),
]);

// Wrap as a tool for the voice session
let tool = TextAgentTool::new(
    "verify_identity",
    "Verify caller identity against account records",
    verifier,
    state.clone(), // shared state with the live session
);

dispatcher.register_function(Arc::new(tool));

When the voice model calls verify_identity, the entire sequential pipeline runs via BaseLlm::generate() (not over WebSocket), and the result is returned as the tool response.

Fluent Operator Algebra

If you use the gemini-adk-fluent-rs crate (L2), you get operator syntax for composing agents:

use gemini_adk_fluent_rs::prelude::*;

// Sequential pipeline: >>
let pipeline = AgentBuilder::new("writer").instruction("Write a draft")
    >> AgentBuilder::new("reviewer").instruction("Review and improve");

// Parallel fan-out: |
let analysis = AgentBuilder::new("tech").instruction("Technical analysis")
    | AgentBuilder::new("business").instruction("Business analysis");

// Fixed loop: * N
let polished = AgentBuilder::new("refiner").instruction("Polish the text") * 3;

// Conditional loop: * until(predicate)
let converge = AgentBuilder::new("iterate").instruction("Improve")
    * until(|v| v["quality"].as_str() == Some("good"));

// Fallback chain: /
let robust = AgentBuilder::new("primary").instruction("Try this first")
    / AgentBuilder::new("backup").instruction("Fall back to this");

// Compile the tree into an executable TextAgent
let agent = pipeline.compile(llm);
let result = agent.run(&state).await?;
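
Operator syntax like this is built on the std::ops traits. The following toy model shows the mechanism only: Node and agent are hypothetical, and the crate's actual composition types are not shown in this guide.

```rust
use std::ops::{BitOr, Div, Mul, Shr};

/// A toy composition tree built by operators.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Agent(String),
    Sequence(Vec<Node>),
    Parallel(Vec<Node>),
    Repeat(Box<Node>, u32),
    Fallback(Vec<Node>),
}

fn agent(name: &str) -> Node {
    Node::Agent(name.to_string())
}

impl Shr for Node {
    type Output = Node;
    fn shr(self, rhs: Node) -> Node {
        match self {
            // Keep `a >> b >> c` flat instead of nesting sequences.
            Node::Sequence(mut steps) => {
                steps.push(rhs);
                Node::Sequence(steps)
            }
            lhs => Node::Sequence(vec![lhs, rhs]),
        }
    }
}

impl BitOr for Node {
    type Output = Node;
    fn bitor(self, rhs: Node) -> Node {
        Node::Parallel(vec![self, rhs])
    }
}

impl Mul<u32> for Node {
    type Output = Node;
    fn mul(self, times: u32) -> Node {
        Node::Repeat(Box::new(self), times)
    }
}

impl Div for Node {
    type Output = Node;
    fn div(self, rhs: Node) -> Node {
        Node::Fallback(vec![self, rhs])
    }
}
```

Rust's operator precedence still applies: * and / bind tighter than >>, which binds tighter than |. A parallel group in the middle of a pipeline therefore needs parentheses, as in a >> (b | c) >> d.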

Real-World Pattern: Multi-Step Analysis Pipeline

Here is a complete pipeline that extracts claims from a document, validates them in parallel, and produces a confidence-rated summary:

use gemini_adk_fluent_rs::prelude::*;

let extract = AgentBuilder::new("extract")
    .instruction("Extract all factual claims from the text. Output one claim per line.")
    .temperature(0.1);

let validate = AgentBuilder::new("validate")
    .instruction("For each claim, determine if it is supported, unsupported, or misleading.")
    .google_search()
    .temperature(0.2);

let summarize = AgentBuilder::new("summarize")
    .instruction("Produce a final report with confidence ratings for each claim.")
    .temperature(0.3);

// extract -> validate -> summarize
let pipeline = extract >> validate >> summarize;

let agent = pipeline.compile(llm);
let state = State::new();
state.set("input", document_text);

let report = agent.run(&state).await?;

For pre-built patterns like review loops and supervised workflows, see the patterns module:

use gemini_adk_fluent_rs::patterns::{review_loop, cascade, fan_out_merge, supervised};

// Worker -> Reviewer -> repeat until quality passes
let reviewed = review_loop(
    AgentBuilder::new("writer").instruction("Write a report"),
    AgentBuilder::new("reviewer").instruction("Rate quality as 'good' or 'needs_work'"),
    "quality",  // state key to check
    "good",     // target value
    3,          // max rounds
);

// Try each model until one succeeds
let robust = cascade(vec![primary_agent, secondary_agent, fallback_agent]);

// Run all in parallel, merge results
let multi = fan_out_merge(vec![tech_analyst, business_analyst, legal_analyst]);

// Worker -> Supervisor with approval gate
let approved = supervised(
    AgentBuilder::new("drafter").instruction("Draft the document"),
    AgentBuilder::new("supervisor").instruction("Approve or request revisions"),
    "approved",  // boolean state key
    5,           // max revisions
);

S·C·T·P·M·A Operator Algebra

Six namespace modules for declaratively composing agent primitives. Each maps to a dimension of agent configuration: State, Context, Tools, Prompt, Middleware, Artifacts. They compose using Rust operators so agent definitions read like algebraic expressions.

The Six Operators

Namespace  Operator  Purpose              Key Methods
S::        >>        State transforms     pick, rename, merge, flatten, set, defaults, drop, map
C::        +         Context engineering  window, user_only, model_only, head, truncate, filter, from_state
T::        |         Tool composition     simple, function, google_search, code_execution, toolset
P::        +         Prompt composition   role, task, constraint, format, example, persona, guidelines
M::        |         Middleware layers    log, latency, timeout, retry, audit, circuit_breaker
A::        +         Artifact schemas     output, input, json_output, json_input, text_output, text_input

S -- State Transforms

State transforms mutate a serde_json::Value representing agent state. Chain them with >> for sequential application.

Methods

Method                             What it does
S::pick(&["a", "b"])               Keep only the listed keys, drop everything else
S::drop(&["x"])                    Remove the listed keys
S::rename(&[("old", "new")])       Rename keys according to mappings
S::merge(&["x", "y"], "combined")  Merge listed keys into a single nested object
S::flatten("nested")               Flatten a nested object into the top level
S::set("key", json!(42))           Set a key to a fixed value
S::defaults(json!({"k": "v"}))     Set default values for missing keys
S::map(fn)                         Apply a custom transformation function

State Predicates

S also provides predicates for phase transition guards and .when() modifiers:

Predicate                      What it checks
S::is_true("key")              Key holds a truthy boolean
S::eq("key", "value")          Key equals a specific string
S::one_of("key", &["a", "b"])  Key matches any of the listed strings

Example

use gemini_adk_fluent_rs::compose::S;

// Chain transforms: pick keys, then rename
let transform = S::pick(&["name", "age"]) >> S::rename(&[("name", "customer_name")]);

let mut state = json!({"name": "Alice", "age": 30, "internal_id": "x123"});
transform.apply(&mut state);
// Result: {"customer_name": "Alice", "age": 30}

// Predicates for phase transitions
Live::builder()
    .phase("verify")
        .instruction("Verify the customer's identity")
        .transition("main", S::is_true("verified"))
        .done()
    .phase("main")
        .instruction("Handle the request")
        .transition("billing", S::eq("issue_type", "billing"))
        .transition("tech", S::one_of("issue_type", &["technical", "setup"]))
        .done()
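
The >> chaining of transforms can be modelled as function composition. In this sketch Transform, Fields, pick, and rename are hypothetical and operate on a string map rather than a serde_json::Value, so it only illustrates how two transforms combine into one.

```rust
use std::collections::BTreeMap;
use std::ops::Shr;

type Fields = BTreeMap<String, String>;

/// A state transform: a boxed function that edits the map in place.
struct Transform(Box<dyn Fn(&mut Fields)>);

impl Transform {
    fn apply(&self, fields: &mut Fields) {
        (self.0)(fields)
    }
}

/// Keep only the listed keys.
fn pick(keys: &[&str]) -> Transform {
    let keep: Vec<String> = keys.iter().map(|k| k.to_string()).collect();
    Transform(Box::new(move |fields: &mut Fields| {
        fields.retain(|k, _| keep.contains(k));
    }))
}

/// Move the value stored under `from` to `to`, if present.
fn rename(from: &str, to: &str) -> Transform {
    let (from, to) = (from.to_string(), to.to_string());
    Transform(Box::new(move |fields: &mut Fields| {
        if let Some(value) = fields.remove(&from) {
            fields.insert(to.clone(), value);
        }
    }))
}

impl Shr for Transform {
    type Output = Transform;
    /// `a >> b` applies `a` first, then `b`.
    fn shr(self, next: Transform) -> Transform {
        Transform(Box::new(move |fields: &mut Fields| {
            self.apply(fields);
            next.apply(fields);
        }))
    }
}
```

Order matters: renaming before picking would drop the renamed key unless the pick list used the new name.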

C -- Context Engineering

Context policies filter and transform conversation history. Compose them with + to combine multiple policies.

Methods

Method                            What it does
C::window(n)                      Keep only the last n messages
C::head(n)                        Keep only the first n messages
C::last(n)                        Alias for window(n)
C::user_only()                    Keep only user messages
C::model_only()                   Keep only model messages
C::text_only()                    Keep only messages containing text parts
C::exclude_tools()                Remove messages with function call/response parts
C::sample(n)                      Keep every n-th message
C::truncate(max_chars)            Truncate to approximately max_chars total text (keeps most recent)
C::prepend(content)               Add a message at the start of context
C::append(content)                Add a message at the end of context
C::from_state(&["key1", "key2"])  Inject state values as a context preamble
C::dedup()                        Remove adjacent duplicate messages
C::empty()                        Return empty context (for isolated agents)
C::filter(fn)                     Filter messages by a custom predicate on Content
C::map(fn)                        Transform each message with a custom function
C::custom(fn)                     Full custom filter over the entire history

Example

use gemini_adk_fluent_rs::compose::C;

// Keep recent context, no tool noise, inject state
let policy = C::window(20) + C::exclude_tools() + C::from_state(&["user:name", "app:balance"]);

// For isolated sub-agents that should not see conversation history
let isolated = C::empty();

// Character-budget context for cost control
let budget = C::truncate(4000) + C::dedup();
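
One way to picture policy composition is as a list of history filters applied in turn. The sketch below is a hypothetical model (Message, Policy, window, and exclude_tools are illustrative names), and it assumes policies combined with + run left to right, which this guide does not state explicitly.

```rust
use std::ops::Add;

#[derive(Debug, Clone, PartialEq)]
enum Message {
    User(String),
    Model(String),
    ToolCall(String),
}

type Stage = Box<dyn Fn(Vec<Message>) -> Vec<Message>>;

/// A context policy: an ordered list of filtering stages.
struct Policy(Vec<Stage>);

impl Policy {
    fn single(stage: Stage) -> Policy {
        Policy(vec![stage])
    }

    /// Apply every stage in order (assumed left-to-right).
    fn apply(&self, history: Vec<Message>) -> Vec<Message> {
        self.0.iter().fold(history, |msgs, stage| stage(msgs))
    }
}

impl Add for Policy {
    type Output = Policy;
    fn add(mut self, mut rhs: Policy) -> Policy {
        self.0.append(&mut rhs.0);
        self
    }
}

/// Keep only the last `n` messages.
fn window(n: usize) -> Policy {
    Policy::single(Box::new(move |msgs: Vec<Message>| -> Vec<Message> {
        let skip = msgs.len().saturating_sub(n);
        msgs.into_iter().skip(skip).collect()
    }))
}

/// Drop tool-call messages.
fn exclude_tools() -> Policy {
    Policy::single(Box::new(|msgs: Vec<Message>| -> Vec<Message> {
        msgs.into_iter()
            .filter(|m| !matches!(m, Message::ToolCall(_)))
            .collect()
    }))
}
```

If stages do run in order, window(2) + exclude_tools() and exclude_tools() + window(2) can return different histories, so it is worth checking the ordering the real crate uses before relying on either.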

T -- Tool Composition

Compose tools with |. Mix runtime function tools with built-in Gemini tools.

Methods

Method                     What it does
T::simple(name, desc, fn)  Create a tool from a name, description, and async closure
T::function(arc_fn)        Register an existing Arc<dyn ToolFunction>
T::google_search()         Add built-in Google Search
T::url_context()           Add built-in URL context fetching
T::code_execution()        Add built-in code execution
T::toolset(vec)            Combine multiple tool functions into one composite

Example

use gemini_adk_fluent_rs::compose::T;

// Combine custom tools with built-ins
let tools = T::simple("get_weather", "Get weather for a city", |args| async move {
        let city = args["city"].as_str().unwrap_or("Unknown");
        Ok(json!({"temp": 22, "city": city}))
    })
    | T::google_search()
    | T::code_execution();

assert_eq!(tools.len(), 3);

// Use in a Live session builder
Live::builder()
    .with_tools(tools)

P -- Prompt Composition

Compose structured prompt sections with +. Each section has a semantic kind that determines its rendering format.

Methods

Method                             Renders as                           Kind
P::role("analyst")                 "You are analyst."                   Role
P::task("analyze data")            "Your task: analyze data"            Task
P::constraint("be concise")        "Constraint: be concise"             Constraint
P::format("JSON")                  "Output format: JSON"                Format
P::example("input", "output")      "Example:\nInput: ...\nOutput: ..."  Example
P::context("background info")      "Context: background info"           Context
P::persona("friendly, direct")     "Persona: friendly, direct"          Persona
P::guidelines(&["be clear", ...])  "Guidelines:\n- be clear\n- ..."     Guidelines
P::text("free-form text")          The text as-is                       Text

PromptSection Kinds

The PromptSectionKind enum provides semantic categories:

Kind        Purpose
Role        Agent role definition
Task        Task description
Constraint  Behavioral constraint
Format      Output format specification
Example     Input/output example
Context     Background context
Persona     Personality description
Guidelines  Bulleted guideline list
Text        Free-form text

Instruction Modifiers

P also provides instruction modifier factories that bridge the prompt module to the live phase system:

Method                            What it does
P::with_state(&["key1", "key2"])  Append selected state keys to the instruction
P::when(predicate, text)          Conditionally append text based on state
P::context_fn(fn)                 Append dynamic text from a formatting function

Example

use gemini_adk_fluent_rs::compose::P;

// Build a structured prompt
let prompt = P::role("a senior financial analyst")
    + P::task("Review the quarterly earnings report and identify trends")
    + P::constraint("Use only data from the provided report")
    + P::constraint("Flag any numbers that seem inconsistent")
    + P::format("Markdown with headers for each section")
    + P::guidelines(&[
        "Start with an executive summary",
        "Include specific numbers when citing trends",
        "End with a risk assessment",
    ]);

// Render to a single instruction string
let instruction: String = prompt.into();
// "You are a senior financial analyst.\n\nYour task: Review the quarterly...\n\n..."

// Instruction modifiers for phases
Live::builder()
    .phase("negotiation")
        .instruction("Negotiate a payment arrangement")
        .modifiers(vec![
            P::with_state(&["emotional_state", "willingness_to_pay"]),
            P::when(
                |s| s.get::<String>("risk").unwrap_or_default() == "high",
                "IMPORTANT: Show extra empathy and offer flexible options.",
            ),
            P::context_fn(|s| {
                let name = s.get::<String>("user:name").unwrap_or_default();
                format!("Customer: {name}")
            }),
        ])
        .done()
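
The rendering rules in the Methods table can be reproduced in a few lines. This is an illustrative model, not the crate's code: Prompt and the free functions are hypothetical, and joining sections with a blank line is inferred from the rendered-string comment in the example above.

```rust
use std::ops::Add;

/// An ordered list of already-rendered prompt sections.
struct Prompt(Vec<String>);

fn role(text: &str) -> Prompt {
    Prompt(vec![format!("You are {}.", text)])
}

fn task(text: &str) -> Prompt {
    Prompt(vec![format!("Your task: {}", text)])
}

fn constraint(text: &str) -> Prompt {
    Prompt(vec![format!("Constraint: {}", text)])
}

fn guidelines(items: &[&str]) -> Prompt {
    let bullets: Vec<String> = items.iter().map(|item| format!("- {}", item)).collect();
    Prompt(vec![format!("Guidelines:\n{}", bullets.join("\n"))])
}

impl Add for Prompt {
    type Output = Prompt;
    /// `a + b` keeps the sections of `a` followed by those of `b`.
    fn add(mut self, rhs: Prompt) -> Prompt {
        self.0.extend(rhs.0);
        self
    }
}

impl Prompt {
    /// Join sections with a blank line between them (assumed separator).
    fn render(&self) -> String {
        self.0.join("\n\n")
    }
}
```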

M -- Middleware Composition

Compose middleware layers with |. Middleware intercepts agent events, tool calls, and errors.

Methods

Method                         What it does
M::log()                       Log all agent events
M::latency()                   Track execution latency
M::timeout(duration)           Enforce a time limit
M::retry(max)                  Retry on failure up to max times
M::cost()                      Track tool call counts as a cost proxy
M::rate_limit(rps)             Enforce max requests per second
M::circuit_breaker(threshold)  Open circuit after consecutive failures
M::trace()                     Create distributed tracing spans
M::audit()                     Record all tool calls for review
M::tap(fn)                     Custom event observer
M::before_tool(fn)             Custom filter before each tool invocation
M::validate(fn)                Validate tool input arguments

Example

use gemini_adk_fluent_rs::compose::M;
use std::time::Duration;

// Production middleware stack
let middleware = M::log()
    | M::latency()
    | M::timeout(Duration::from_secs(30))
    | M::retry(3)
    | M::circuit_breaker(5)
    | M::audit();

assert_eq!(middleware.len(), 6);

// Custom validation
let validated = M::validate(|call| {
    if call.name == "delete_account" && call.args.get("confirm").is_none() {
        return Err("delete_account requires 'confirm' argument".into());
    }
    Ok(())
});
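
For readers unfamiliar with operator overloading, the | composition above can be pictured as an implementation of std::ops::BitOr that concatenates layers in order. This is a minimal sketch under that assumption; Stack and Stack::layer are hypothetical names for this illustration, not types from the SDK.

```rust
use std::ops::BitOr;

/// Hypothetical stand-in for a middleware stack: just the layer names, in order.
#[derive(Debug, Clone, PartialEq)]
struct Stack(Vec<&'static str>);

impl Stack {
    /// A stack holding a single layer.
    fn layer(name: &'static str) -> Self {
        Stack(vec![name])
    }

    /// Number of layers accumulated so far.
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl BitOr for Stack {
    type Output = Stack;

    /// `a | b` keeps the layers of `a` and appends the layers of `b`.
    fn bitor(mut self, rhs: Stack) -> Stack {
        self.0.extend(rhs.0);
        self
    }
}

fn main() {
    let stack = Stack::layer("log") | Stack::layer("latency") | Stack::layer("audit");
    assert_eq!(stack.len(), 3);
    println!("{:?}", stack);
}
```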

A -- Artifact Schemas

Declare input/output artifact schemas with +. Artifacts describe data that flows between agents as typed, named entities.

Methods

Method                         What it does
A::output(name, mime, desc)    Declare an output artifact
A::input(name, mime, desc)     Declare an input artifact
A::json_output(name, desc)     Shorthand for application/json output
A::json_input(name, desc)      Shorthand for application/json input
A::text_output(name, desc)     Shorthand for text/plain output
A::text_input(name, desc)      Shorthand for text/plain input

Example

use gemini_adk_fluent_rs::compose::A;

// Declare what an analysis agent produces and consumes
let artifacts = A::text_input("source_document", "The document to analyze")
    + A::json_output("analysis_report", "Structured analysis results")
    + A::json_output("risk_assessment", "Risk scores and flags");

assert_eq!(artifacts.all_inputs().len(), 1);
assert_eq!(artifacts.all_outputs().len(), 2);

Composable Operators

Beyond the six namespace modules, the operators module provides structural composition via Rust operators on AgentBuilder:

Operator   Type                 Meaning
>>         Shr                  Sequential pipeline
|          BitOr                Parallel fan-out
*          Mul<u32>             Fixed-count loop
*          Mul<LoopPredicate>   Conditional loop
/          Div                  Fallback chain

These produce Composable nodes that form a tree. Call .compile(llm) to turn the tree into an executable TextAgent.

use gemini_adk_fluent_rs::prelude::*;

// Build a tree
let workflow = AgentBuilder::new("research").instruction("Research the topic")
    >> (AgentBuilder::new("tech").instruction("Technical review")
        | AgentBuilder::new("biz").instruction("Business review"))
    >> AgentBuilder::new("merge").instruction("Merge perspectives");

// Compile and execute
let agent = workflow.compile(llm);
let result = agent.run(&state).await?;

The tree auto-flattens: a >> b >> c produces a single Pipeline with 3 steps, not nested pipelines.
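
One way such flattening can work is for the >> implementation to splice an existing pipeline's steps into the new one instead of nesting it. The sketch below illustrates this with a hypothetical Node enum; it is an assumption about the approach, not the SDK's Composable type.

```rust
use std::ops::Shr;

/// Hypothetical composition tree with only the two node kinds needed here.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Agent(&'static str),
    Pipeline(Vec<Node>),
}

impl Shr for Node {
    type Output = Node;

    /// `a >> b` yields one flat pipeline: steps of an existing pipeline on
    /// either side are spliced in rather than wrapped in a nested pipeline.
    fn shr(self, rhs: Node) -> Node {
        let mut steps = match self {
            Node::Pipeline(steps) => steps,
            other => vec![other],
        };
        match rhs {
            Node::Pipeline(more) => steps.extend(more),
            other => steps.push(other),
        }
        Node::Pipeline(steps)
    }
}

fn main() {
    let tree = Node::Agent("a") >> Node::Agent("b") >> Node::Agent("c");
    if let Node::Pipeline(steps) = &tree {
        assert_eq!(steps.len(), 3);
    }
    println!("{:?}", tree);
}
```

With this shape, the grouping of the operands does not matter: a >> (b >> c) produces the same three-step pipeline as (a >> b) >> c.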

Combining Operators into Full Agent Configuration

The six namespaces compose orthogonally. Each configures a separate dimension:

use gemini_adk_fluent_rs::prelude::*;

// S: transform state before agent sees it
let state_prep = S::pick(&["customer", "order"]) >> S::defaults(json!({"priority": "normal"}));

// C: control what context the agent sees
let context = C::window(10) + C::exclude_tools();

// T: equip the agent with tools
let tools = T::simple("lookup", "Look up order status", |args| async move {
        Ok(json!({"status": "shipped"}))
    })
    | T::google_search();

// P: compose the instruction
let prompt = P::role("a customer support specialist")
    + P::task("Help the customer with their order inquiry")
    + P::constraint("Never reveal internal order IDs")
    + P::format("Conversational, friendly tone");

// A: declare I/O artifacts
let artifacts = A::json_output("resolution", "How the issue was resolved");

// M: add operational middleware
let middleware = M::log() | M::latency() | M::audit();
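
To make one of these dimensions concrete, here is a standalone sketch of what a pick step followed by a defaults step amounts to on a plain map. The pick and defaults functions and the State alias are hypothetical helpers for illustration; the SDK's S:: transforms operate on its own state type and are chained with >>.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for agent state.
type State = BTreeMap<String, String>;

/// Keep only the listed keys, dropping everything else.
fn pick(state: &State, keys: &[&str]) -> State {
    state
        .iter()
        .filter(|(k, _)| keys.contains(&k.as_str()))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

/// Fill in values for keys that are missing; existing values are left alone.
fn defaults(mut state: State, fallback: &[(&str, &str)]) -> State {
    for (k, v) in fallback {
        state.entry(k.to_string()).or_insert_with(|| v.to_string());
    }
    state
}

fn main() {
    let mut state = State::new();
    state.insert("customer".into(), "Ada".into());
    state.insert("order".into(), "42".into());
    state.insert("internal_note".into(), "do not show".into());

    // Equivalent in spirit to: pick customer and order, then default the priority.
    let prepared = defaults(pick(&state, &["customer", "order"]), &[("priority", "normal")]);
    println!("{:?}", prepared);
}
```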

Builder Integration

AgentBuilder is the entry point for compiling these into executable agents:

use gemini_adk_fluent_rs::builder::AgentBuilder;

let agent = AgentBuilder::new("support")
    .model(GeminiModel::Gemini2_0Flash)
    .instruction("Help the customer")   // or use P:: composition
    .temperature(0.5)
    .google_search()                     // or use T:: composition
    .thinking(2048)
    .writes("resolution")
    .reads("customer_name")
    .build(llm);

let result = agent.run(&state).await?;

Copy-on-write semantics mean every setter returns a new builder. Use builders as templates:

let base = AgentBuilder::new("analyst")
    .instruction("You are a data analyst")
    .temperature(0.3);

// Variants share the base configuration
let conservative = base.clone().temperature(0.1);
let creative = base.clone().temperature(0.9);
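
The template pattern relies on setters that take the builder by value and return it, so a cloned base can be specialized without affecting the original. A minimal sketch with a hypothetical Builder struct (not the SDK's AgentBuilder) shows the effect:

```rust
/// Hypothetical builder with by-value setters.
#[derive(Debug, Clone, PartialEq)]
struct Builder {
    name: String,
    instruction: String,
    temperature: f32,
}

impl Builder {
    fn new(name: &str) -> Self {
        Builder { name: name.to_string(), instruction: String::new(), temperature: 1.0 }
    }

    /// Consumes the builder and returns the updated value.
    fn instruction(mut self, text: &str) -> Self {
        self.instruction = text.to_string();
        self
    }

    /// Consumes the builder and returns the updated value.
    fn temperature(mut self, t: f32) -> Self {
        self.temperature = t;
        self
    }
}

fn main() {
    let base = Builder::new("analyst")
        .instruction("You are a data analyst")
        .temperature(0.3);

    // Each variant starts from a clone, so `base` itself never changes.
    let conservative = base.clone().temperature(0.1);
    let creative = base.clone().temperature(0.9);

    assert_eq!(base.temperature, 0.3);
    assert_eq!(conservative.instruction, creative.instruction);
    println!("{:?}\n{:?}", conservative, creative);
}
```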

Pre-Built Patterns

The patterns module provides common multi-agent workflows built from these operators:

use gemini_adk_fluent_rs::patterns::*;

// Review loop: worker -> reviewer -> repeat until quality target
let reviewed = review_loop(writer, reviewer, "quality", "good", 3);

// Cascade: try each agent until one succeeds
let robust = cascade(vec![primary, secondary, fallback]);

// Fan-out merge: run all in parallel, merge results
let multi = fan_out_merge(vec![analyst_a, analyst_b, analyst_c]);

// Supervised: worker -> supervisor -> repeat until approval
let approved = supervised(drafter, supervisor, "approved", 5);

// Map-over: apply agent to each item with concurrency limit
let batch = map_over(item_processor, 4);
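
The control flow behind a review loop is simple: run the worker, run the reviewer, and stop once a state key reaches its target value or the round limit is hit. The sketch below shows that loop with synchronous closures over a string map. It is an illustration of the pattern only; the function signature, the State alias, and the Option return value are assumptions, not the patterns module's API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for shared agent state.
type State = HashMap<String, String>;

/// Run worker then reviewer until `state[key] == target` or `max_rounds` is reached.
/// Returns the round in which the target was met, or None if it never was.
fn review_loop(
    mut worker: impl FnMut(&mut State),
    mut reviewer: impl FnMut(&mut State),
    key: &str,
    target: &str,
    max_rounds: u32,
    state: &mut State,
) -> Option<u32> {
    for round in 1..=max_rounds {
        worker(state);
        reviewer(state);
        if state.get(key).map(String::as_str) == Some(target) {
            return Some(round);
        }
    }
    None
}

fn main() {
    let mut state = State::new();
    let mut drafts = 0;

    let rounds = review_loop(
        // Worker: writes a new draft version each round.
        |s| {
            drafts += 1;
            s.insert("draft".into(), format!("v{drafts}"));
        },
        // Reviewer: accepts the second draft, rejects anything else.
        |s| {
            let verdict = if s.get("draft").map(String::as_str) == Some("v2") {
                "good"
            } else {
                "poor"
            };
            s.insert("quality".into(), verdict.into());
        },
        "quality",
        "good",
        3,
        &mut state,
    );

    assert_eq!(rounds, Some(2));
}
```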

Middleware & Processors

Middleware wraps the agent lifecycle (before/after execution, tool calls, errors). Processors transform LLM requests and responses in flight. For live voice sessions, most interception uses EventCallbacks instead -- middleware and processors are primarily for text-mode agent pipelines.

Middleware Trait

Implement Middleware to hook into agent and tool lifecycle events. All methods are optional -- implement only what you need:

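use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::Mutex; // lock() returns the guard directly, as the hooks below assume
use gemini_adk_rs::middleware::Middleware;
use gemini_adk_rs::error::{AgentError, ToolError};
use gemini_genai_rs::prelude::FunctionCall;
// InvocationContext must also be in scope; import it from wherever gemini-adk-rs exports it.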

struct AuditMiddleware {
    log: Arc<Mutex<Vec<String>>>,
}

#[async_trait]
impl Middleware for AuditMiddleware {
    fn name(&self) -> &str { "audit" }

    async fn before_agent(&self, ctx: &InvocationContext) -> Result<(), AgentError> {
        self.log.lock().push("Agent started".into());
        Ok(())
    }

    async fn after_agent(&self, ctx: &InvocationContext) -> Result<(), AgentError> {
        self.log.lock().push("Agent completed".into());
        Ok(())
    }

    async fn before_tool(&self, call: &FunctionCall) -> Result<(), AgentError> {
        self.log.lock().push(format!("Tool '{}' called", call.name));
        Ok(())
    }

    async fn after_tool(
        &self, call: &FunctionCall, result: &serde_json::Value,
    ) -> Result<(), AgentError> {
        self.log.lock().push(format!("Tool '{}' returned", call.name));
        Ok(())
    }

    async fn on_tool_error(
        &self, call: &FunctionCall, err: &ToolError,
    ) -> Result<(), AgentError> {
        self.log.lock().push(format!("Tool '{}' failed: {err}", call.name));
        Ok(())
    }

    async fn on_error(&self, err: &AgentError) -> Result<(), AgentError> {
        self.log.lock().push(format!("Agent error: {err}"));
        Ok(())
    }
}

Returning Err from any hook aborts the pipeline.

MiddlewareChain

Compose multiple middleware into an ordered chain. before_* hooks run in registration order; after_* hooks run in reverse order (unwinding):

use gemini_adk_rs::middleware::MiddlewareChain;

let mut chain = MiddlewareChain::new();
chain.add(Arc::new(LogMiddleware::new()));
chain.add(Arc::new(LatencyMiddleware::new()));
chain.add(Arc::new(RetryMiddleware::new(3)));

// Insert at front
chain.prepend(Arc::new(SecurityMiddleware::new()));

assert_eq!(chain.len(), 4);
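
The ordering rule is easiest to see with names alone. In this minimal sketch, Chain is a hypothetical stand-in that stores layer names and reports the order in which before_* and after_* hooks would fire; it does not use the real MiddlewareChain.

```rust
/// Hypothetical stand-in for a middleware chain: layers are identified by name only.
struct Chain {
    layers: Vec<&'static str>,
}

impl Chain {
    fn new() -> Self {
        Chain { layers: Vec::new() }
    }

    /// Register a layer at the end of the chain.
    fn add(&mut self, name: &'static str) {
        self.layers.push(name);
    }

    /// Register a layer at the front of the chain.
    fn prepend(&mut self, name: &'static str) {
        self.layers.insert(0, name);
    }

    /// before_* hooks fire in registration order.
    fn before_order(&self) -> Vec<&'static str> {
        self.layers.clone()
    }

    /// after_* hooks fire in reverse order, unwinding the chain.
    fn after_order(&self) -> Vec<&'static str> {
        self.layers.iter().rev().copied().collect()
    }
}

fn main() {
    let mut chain = Chain::new();
    chain.add("log");
    chain.add("latency");
    chain.add("retry");
    chain.prepend("security");

    assert_eq!(chain.before_order(), ["security", "log", "latency", "retry"]);
    assert_eq!(chain.after_order(), ["retry", "latency", "log", "security"]);
}
```

The outermost layer (here, the prepended one) therefore sees the request first and the result last, which is the usual onion model for middleware.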

RequestProcessor

Transform outbound requests before they reach the LLM:

use async_trait::async_trait;
use gemini_adk_rs::processors::{RequestProcessor, ProcessorError};
use gemini_adk_rs::llm::LlmRequest;

struct ContextInjector { context: String }

#[async_trait]
impl RequestProcessor for ContextInjector {
    fn name(&self) -> &str { "context_injector" }

    async fn process_request(
        &self, mut request: LlmRequest,
    ) -> Result<LlmRequest, ProcessorError> {
        match &mut request.system_instruction {
            Some(existing) => { existing.push_str("\n\n"); existing.push_str(&self.context); }
            None => { request.system_instruction = Some(self.context.clone()); }
        }
        Ok(request)
    }
}

ResponseProcessor

Transform inbound responses after they come from the LLM:

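use async_trait::async_trait;
// Import paths assumed to mirror the request-side example above.
use gemini_adk_rs::processors::{ResponseProcessor, ProcessorError};
use gemini_adk_rs::llm::LlmResponse;

struct ResponseSanitizer;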

#[async_trait]
impl ResponseProcessor for ResponseSanitizer {
    fn name(&self) -> &str { "sanitizer" }

    async fn process_response(
        &self, mut response: LlmResponse,
    ) -> Result<LlmResponse, ProcessorError> {
        for part in &mut response.content.parts {
            if let gemini_genai_rs::prelude::Part::Text { text } = part {
                *text = text.replace("```", "");
            }
        }
        Ok(response)
    }
}

Built-in Processors

InstructionInserter -- prepends or appends a system instruction:

use gemini_adk_rs::processors::InstructionInserter;

let inserter = InstructionInserter::new("Always respond in JSON format.");
let processed = inserter.process_request(request).await?;
// Appends to existing instruction if one is already set

ContentFilter -- filters content parts by type:

use gemini_adk_rs::processors::ContentFilter;

let filter = ContentFilter::text_only();
// Removes inline images, audio -- keeps only text parts

Processor Chains

Chain multiple processors into a pipeline:

use gemini_adk_rs::processors::RequestProcessorChain;

let mut chain = RequestProcessorChain::new();
chain.add(InstructionInserter::new("Be concise."));
chain.add(InstructionInserter::new("Respond in English."));
chain.add(ContentFilter::text_only());

let processed = chain.process(request).await?;
// system_instruction = "Be concise.\nRespond in English."

ResponseProcessorChain works the same way for responses.
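
Conceptually, a processor chain is a fold: each processor receives the output of the previous one, and the first error stops the run. The following synchronous sketch illustrates this with hypothetical Request, Processor, AppendInstruction, and Chain types; the real traits are async and use the SDK's own request and error types.

```rust
/// Hypothetical request carrying only a system instruction.
#[derive(Debug, Clone, Default, PartialEq)]
struct Request {
    system_instruction: Option<String>,
}

/// Hypothetical synchronous processor trait.
trait Processor {
    fn process(&self, request: Request) -> Result<Request, String>;
}

/// Appends a line to the system instruction, or sets it if none exists.
struct AppendInstruction(&'static str);

impl Processor for AppendInstruction {
    fn process(&self, mut request: Request) -> Result<Request, String> {
        let merged = match request.system_instruction.take() {
            Some(existing) => format!("{existing}\n{}", self.0),
            None => self.0.to_string(),
        };
        request.system_instruction = Some(merged);
        Ok(request)
    }
}

/// Runs processors in order; the first Err short-circuits the rest.
struct Chain(Vec<Box<dyn Processor>>);

impl Chain {
    fn process(&self, request: Request) -> Result<Request, String> {
        self.0.iter().try_fold(request, |req, p| p.process(req))
    }
}

fn main() {
    let chain = Chain(vec![
        Box::new(AppendInstruction("Be concise.")),
        Box::new(AppendInstruction("Respond in English.")),
    ]);
    let out = chain.process(Request::default()).unwrap();
    assert_eq!(
        out.system_instruction.as_deref(),
        Some("Be concise.\nRespond in English.")
    );
}
```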

Built-in Middleware

LogMiddleware -- structured logging via tracing (requires tracing-support feature):

let log = LogMiddleware::new();
// Logs: agent starting/completed, tool call starting/completed/failed, errors

LatencyMiddleware -- records wall-clock timing for tool calls:

let latency = Arc::new(LatencyMiddleware::new());
chain.add(latency.clone());

// After some tool calls...
for record in latency.tool_latencies() {
    println!("{}: {:?} (success={})", record.name, record.elapsed, record.success);
}
latency.clear();  // reset for next window

RetryMiddleware -- advisory retry tracking. Counts errors and exposes should_retry():

let retry = Arc::new(RetryMiddleware::new(3));
chain.add(retry.clone());

// After running the agent...
if retry.should_retry() {
    retry.record_attempt();
    // re-run the agent
}
retry.reset();  // reuse for another run

RetryMiddleware does not automatically retry -- it tracks errors and the caller decides.
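
A caller-driven retry loop built on an advisory tracker might look like the sketch below. RetryTracker and run_with_retries are hypothetical names for this illustration, and record_error stands in for whatever the middleware's error hook does internally; only the should_retry / record_attempt / reset vocabulary comes from the text above.

```rust
/// Hypothetical advisory tracker: it counts, the caller decides.
struct RetryTracker {
    max_retries: u32,
    attempts: u32,
    saw_error: bool,
}

impl RetryTracker {
    fn new(max_retries: u32) -> Self {
        RetryTracker { max_retries, attempts: 0, saw_error: false }
    }

    /// Called when a run fails.
    fn record_error(&mut self) {
        self.saw_error = true;
    }

    /// True when the last run failed and the retry budget is not exhausted.
    fn should_retry(&self) -> bool {
        self.saw_error && self.attempts < self.max_retries
    }

    /// Called by the caller right before re-running.
    fn record_attempt(&mut self) {
        self.attempts += 1;
        self.saw_error = false;
    }

    /// Clear all counters so the tracker can be reused.
    fn reset(&mut self) {
        self.attempts = 0;
        self.saw_error = false;
    }
}

/// Re-run `run` while the tracker advises it; return the last result otherwise.
fn run_with_retries(
    mut run: impl FnMut() -> Result<String, String>,
    tracker: &mut RetryTracker,
) -> Result<String, String> {
    loop {
        match run() {
            Ok(value) => return Ok(value),
            Err(err) => {
                tracker.record_error();
                if !tracker.should_retry() {
                    return Err(err);
                }
                tracker.record_attempt();
            }
        }
    }
}

fn main() {
    let mut calls = 0;
    let mut tracker = RetryTracker::new(3);
    let result = run_with_retries(
        || {
            calls += 1;
            if calls < 3 { Err("transient".to_string()) } else { Ok("done".to_string()) }
        },
        &mut tracker,
    );
    assert_eq!(result, Ok("done".to_string()));
    assert_eq!(tracker.attempts, 2);
    tracker.reset();
}
```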

Custom Middleware Example

A rate-limiting middleware that tracks tool call frequency:

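use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
// async_trait, Middleware, AgentError, and FunctionCall are imported as in the AuditMiddleware example.

struct RateLimitMiddleware {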
    max_per_minute: u32,
    count: AtomicU32,
    window_start: parking_lot::Mutex<Instant>,
}

#[async_trait]
impl Middleware for RateLimitMiddleware {
    fn name(&self) -> &str { "rate_limit" }

    async fn before_tool(&self, call: &FunctionCall) -> Result<(), AgentError> {
        let mut start = self.window_start.lock();
        if start.elapsed() > Duration::from_secs(60) {
            *start = Instant::now();
            self.count.store(0, Ordering::SeqCst);
        }
        let n = self.count.fetch_add(1, Ordering::SeqCst);
        if n >= self.max_per_minute {
            return Err(AgentError::Other("Rate limit exceeded".into()));
        }
        Ok(())
    }
}

Middleware vs Callbacks

Use case                                 Mechanism
Log every tool call                      Middleware (before_tool / after_tool)
Track tool latency                       LatencyMiddleware
Handle tool results in live session      on_tool_call callback
Transform LLM requests                   RequestProcessor
Inject context at turn boundaries        on_turn_boundary callback
React to extracted state changes         watch() watcher
Intercept tool responses before Gemini   before_tool_response callback
Retry failed agent runs                  RetryMiddleware

In live voice sessions, most interception uses EventCallbacks because the session runs over a persistent WebSocket. Middleware and processors are for text-mode agent pipelines where request/response cycles are explicit.

Examples

The repository contains two sets of runnable examples:

  1. examples/cookbook/ — 30 progressive text-based examples demonstrating SDK composition patterns (no server required)
  2. gemini-adk-web-rs apps — Interactive voice/text demos bundled into a devtools-enabled web UI

Cookbook Examples (examples/cookbook/)

A structured Crawl → Walk → Run learning path. Each example is a self-contained Rust binary with detailed doc comments explaining every API used.

# Run any example directly
cargo run -p example-cookbook --example 01_simple_agent
cargo run -p example-cookbook --example 17_evaluation_suite
cargo run -p example-cookbook --example 30_production_pipeline

Crawl (01–10) — Foundations

The core builder API and composition primitives. No async runtime required for most examples.

#   Binary                   What it covers
01  01_simple_agent          AgentBuilder: name, model, instruction, temperature, thinking budget, copy-on-write semantics
02  02_agent_with_tools      SimpleTool with raw JSON args; TypedTool with auto-generated JSON Schema from schemars::JsonSchema
03  03_callbacks             Event callbacks: on_text, on_audio, on_thought, on_tool_call, on_interrupted, on_turn_complete
04  04_sequential_pipeline   >> operator: multi-step pipelines, state passing between agents, SequentialTextAgent
05  05_parallel_fanout       | operator: concurrent fan-out, ParallelTextAgent, merging results
06  06_loop_agent            * N fixed loop; * until(predicate) conditional loop; LoopTextAgent
07  07_state_transforms      S::pick, S::rename, S::merge, S::flatten, S::set, S::defaults, S::drop, S::map
08  08_prompt_composition    P::role, P::task, P::constraint, P::format, P::example, P::guidelines, P::with_state, P::when
09  09_tool_composition      T::simple | T::google_search | T::code_execution | T::url_context: the | operator for tools
10  10_guards                G::rate_limit, G::toxicity, G::grounded, G::hallucination, G::llm_judge — input/output validation

Walk (11–20) — Multi-Agent Patterns

Compound agent topologies, evaluation, artifacts, and advanced state management.

#   Binary                   What it covers
11  11_route_branching       RouteTextAgent, FnTextAgent, RouteRule, S::is_true, S::eq — deterministic state-driven routing
12  12_fallback_chain        / operator: graceful degradation, FallbackTextAgent, primary/secondary chains
13  13_review_loop           Reviewer + writer feedback cycle, * until(predicate) convergence, inter-agent state sharing
14  14_map_over              MapOverTextAgent: parallel item-level processing, collecting and aggregating results
15  15_middleware_stack      M::cache, M::dedup, M::metrics, M::fallback_model — composing middleware with |
16  16_context_engineering   C::window, C::user_only, C::model_only, C::summarize, C::priority, C::exclude_tools, C::dedup
17  17_evaluation_suite      E::suite, E::response_match, E::contains_match, E::trajectory, E::safety, E::semantic_match, E::hallucination, E::custom, E::persona
18  18_artifacts             A::json_output, A::text_output, A::publish, A::save, A::load, A::version — artifact I/O schemas
19  19_agent_tool            TextAgentTool: wrapping a TextAgent as a callable tool; agent-as-tool dispatch
20  20_supervised            Human-in-the-loop approval: TapTextAgent, approval callbacks, blocking and resuming pipelines

Run (21–30) — Production Patterns

Full-system compositions covering real-world architectures and every SDK capability.

#   Binary                   What it covers
21  21_full_algebra          All operators together (>>, |, *, /, * until), all six composition namespaces
22  22_contract_testing      Schema validation, JSON contract tests, A::json_output with schema enforcement
23  23_deep_research         Multi-source research pipeline with T::google_search, synthesis agent, and result merging
24  24_customer_support      Routing, escalation state machine, RouteTextAgent, multi-phase support flow
25  25_code_review           Automated code review: linting agent, security agent, summary agent in a >> pipeline
26  26_dispatch_join         DispatchTextAgent + JoinTextAgent: fire-and-forget dispatch with join synchronization
27  27_race_timeout          RaceTextAgent: first-to-finish wins; TimeoutTextAgent: deadline enforcement
28  28_a2a_remote            Agent-to-agent protocol: remote agent declaration, T::a2a tool composition
29  29_live_voice            Full Live::builder() API: phases, tools, extraction, watchers, steering, repair, persistence
30  30_production_pipeline   End-to-end production pipeline: telemetry, middleware, evaluation, artifact publishing

ADK Web UI (gemini-adk-web-rs)

The interactive multi-app web UI runs at http://localhost:25125 and bundles all demo apps into a single server with a shared DevTools panel.

cargo run -p gemini-adk-web-rs    # http://127.0.0.1:25125

For more on the web UI design system, dark/light mode, and DevTools panels, see the ADK Web UI guide.

Standalone Examples

These run independently of gemini-adk-web-rs, each with its own Axum server.

cargo run -p example-text-chat       # http://127.0.0.1:3001
cargo run -p example-voice-chat      # http://127.0.0.1:3002
cargo run -p example-tool-calling    # http://127.0.0.1:3003
cargo run -p example-transcription   # http://127.0.0.1:3004

Example         Layer   Features
text-chat       L0      Text-only session, streaming deltas, turn lifecycle
voice-chat      L0      Bidirectional audio, input/output transcription, VAD events
tool-calling    L1      TypedTool, ToolDispatcher, NonBlocking behavior, WhenIdle scheduling
transcription   L0      Every Gemini Live config option: VAD, compression, resumption, affective dialog

Web UI Apps

Crawl

text-chat — Minimal text session. Live::builder().text_only(), streaming.

voice-chat — Native audio. Modality::Audio, voice selection, transcription.

tool-calling — Three demo tools (get_weather, get_time, calculate). NonBlocking + WhenIdle.

Walk

all-config — Configuration playground. Every Gemini Live option exposed as a JSON config: modality, temperature, Google Search, code execution, session resumption, context compression.

guardrails — Policy monitoring with real-time corrective injection. RegexExtractor, .watch(), .instruction_amendment(). Policies: PII (SSN, credit cards), off-topic, negative sentiment.

playbook — 6-phase customer support state machine. .phase(), .transition_with(), .greeting(), .with_context(), RegexExtractor, .watch().

Run

support-assistant — Multi-agent handoff between billing and technical support. 10-phase dual state machine, .computed() derived state, .watch() escalation, cross-agent transitions, telemetry.

call-screening — Incoming call screening with sentiment analysis and smart routing. NonBlocking tools: check_contact_list, check_calendar, take_message, transfer_call, block_caller.

clinic — HIPAA-aware telehealth appointment scheduling. 8 tools with NonBlocking behavior. Patient intake, department routing, insurance check, appointment booking.

restaurant — Reservation assistant with menu context. 6 tools, dietary and occasion tracking.

debt-collection — FDCPA-compliant debt collection. StateKey<T> typed state, compliance watchers, identity verification, cease-and-desist handling.


Platform Support

All examples work with both Google AI (API key) and Vertex AI (project/location).

Feature                                   Google AI     Vertex AI
Async tool calling (NonBlocking)          ✓ Supported   Stripped automatically
Response scheduling (WhenIdle / Silent)   ✓ Supported   Stripped automatically
WebSocket frames                          Text          Binary (handled automatically)
Thinking config                           ✓ Supported   Stripped automatically

The SDK detects your authentication method and strips unsupported wire fields transparently — no code changes needed when switching platforms.
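
The stripping behaviour can be pictured as a normalization step applied to the outgoing configuration just before it is serialized. The sketch below is purely illustrative: Platform, SetupConfig, its field names, and strip_for are hypothetical and do not reflect the SDK's actual wire types.

```rust
/// Hypothetical platform selector, derived in practice from the auth method.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Platform {
    GoogleAi,
    VertexAi,
}

/// Hypothetical outgoing configuration; optional fields are omitted when None.
#[derive(Debug, Clone, PartialEq)]
struct SetupConfig {
    model: String,
    non_blocking_tools: Option<bool>,
    thinking_budget: Option<u32>,
}

/// Drop the fields the target platform does not accept; leave the rest untouched.
fn strip_for(platform: Platform, mut config: SetupConfig) -> SetupConfig {
    if platform == Platform::VertexAi {
        config.non_blocking_tools = None;
        config.thinking_budget = None;
    }
    config
}

fn main() {
    let config = SetupConfig {
        model: "example-model".to_string(),
        non_blocking_tools: Some(true),
        thinking_budget: Some(2048),
    };

    // Same application-level config, two different wire shapes.
    let google = strip_for(Platform::GoogleAi, config.clone());
    let vertex = strip_for(Platform::VertexAi, config.clone());

    assert_eq!(google, config);
    assert_eq!(vertex.non_blocking_tools, None);
    assert_eq!(vertex.thinking_budget, None);
}
```

Keeping the stripping in one place means application code can set every option unconditionally and stay identical across platforms.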

ADK Web UI

gemini-adk-web-rs is the interactive development environment for building and debugging Gemini Live agents. It runs a single Axum server at http://localhost:25125 that hosts all demo apps and a shared DevTools panel.

cargo run -p gemini-adk-web-rs
# → http://127.0.0.1:25125

Design System

The web UI is built on a CSS design system defined in apps/gemini-adk-web-rs/static/css/design-system.css.

Tokens

80+ CSS custom properties cover the full visual language:

Category   Examples
Brand      --brand, --brand-light, --brand-dark, --brand-glow
Surfaces   --bg-root, --bg-card, --bg-elevated, --bg-inset, --bg-code
Text       --text-1 (primary), --text-2 (secondary), --text-3 (muted), --text-inverse
Borders    --border-1 (subtle), --border-2 (default), --border-3 (strong)
Spacing    --space-1 through --space-16 (4px base scale)
Radii      --radius-sm, --radius-md, --radius-lg, --radius-xl, --radius-full
Shadows    --shadow-sm, --shadow-md, --shadow-lg, --shadow-xl, --shadow-glow
Motion     --ease-out, --ease-spring, --duration-fast, --duration-base, --duration-slow
Semantic   --success, --warning, --error, --info and their -light / -dark variants

Typography

Role               Font             Weights
UI text            Inter            300, 400, 500, 600, 700, 800, 900
Code / monospace   JetBrains Mono   400, 500, 600, 700

Dark / Light Mode

The theme is toggled with a button in the navigation bar and persisted to localStorage under the key theme. The active theme is set as a data-theme attribute on <html>:

<html data-theme="dark">   <!-- or "light" -->

All design tokens redefine their values under [data-theme="dark"], so every component inherits the correct colors without any extra CSS.


Landing Page

The index.html landing page showcases the SDK before a user starts a session.

Section                  Description
Hero                     Launch actions, run command, runtime preview, live stats counters (apps, layers, namespaces, recipes)
Architecture diagram     Three-layer crate stack (L0/L1/L2) with the three-lane processor (Fast/Control/Telemetry)
Operator algebra         Interactive showcase of S·C·T·P·M·A operators with syntax-highlighted composition examples
Pipeline visualization   Animated flow diagram of the >>, |, *, / agent combinators
Cookbook browser         Filterable example gallery with Crawl/Walk/Run difficulty tiers (see below)
Feature highlights       Cards covering key SDK capabilities: phases, extraction, watchers, async tools
Glassmorphism nav        Frosted-glass navigation bar with scroll-aware opacity and backdrop blur

Cookbook Browser

The landing page includes a browsable gallery of all 30 cookbook examples.

  • Filter by tier: Crawl / Walk / Run buttons filter by difficulty
  • Each card shows: example number, title, brief description, and a difficulty badge
  • Click to view: links directly to the source file on GitHub

The same data powers the Cookbook panel in DevTools (see below).


DevTools Panel

When a session is active (app.html), a DevTools sidebar gives real-time visibility into every layer of the SDK. Open DevTools by clicking the </> button in the top-right corner of any app.

Panels

Panel             What it shows
State             Live key-value state with prefix colouring (session:, turn:, app:, derived:) — updates on every turn
Timeline          Chronological event log with VirtualList rendering — AudioDelta, TextDelta, ToolCall, TurnComplete, etc.
Phases            Current phase, phase history with entry/exit timestamps, duration bars, active guards
Metrics           Token counts, cost estimation, latency sparkline, turns per minute, audio throughput
Transcript        Full turn-by-turn conversation transcript with role labels
Artifacts         Structured data extracted by the extraction pipeline, grouped by schema name
Eval              Evaluation results when the session runs an EvalSuite — scores per criterion
Event Inspector   Raw SessionEvent stream with JSON expansion for any event
Trace             OpenTelemetry-style span timeline, copy-as-OTLP button, trace ID badge
Cookbook          App-specific source path, run command, features, prompts to try, and inspection checklist

Telemetry Integration

The DevTools panel receives structured data from the server via the same WebSocket connection used for the session. The server sends additional message types alongside audio/text frames:

  • SpanEvent — individual OTel span start/end
  • TurnMetrics — per-turn latency, token counts, tool call count
  • StateUpdate — delta state snapshot after each control plane cycle

Architecture

Browser                              Server (Axum)
───────                              ─────────────
index.html ──── static files ──────► apps/gemini-adk-web-rs/static/
app.html   ──── WebSocket ─────────► ws_handler.rs
                                          │
                                     SessionBridge
                                          │
                                     LiveHandle (gemini-adk-rs)
                                          │
                                     Gemini Live API (WebSocket)

SessionBridge (in apps/gemini-adk-web-rs/src/bridge.rs) wires the LiveHandle event stream to the browser WebSocket connection. It translates LiveEvent values into JSON messages the DevTools panels consume.


CSS Files

File                Purpose
design-system.css   Design tokens, typography, theme variables
main.css            App shell layout: nav bar, sidebar, content areas, DevTools panel
landing.css         Landing page sections: hero, architecture, algebra, cookbook browser
devtools.css        DevTools panel chrome: tabs, panel containers, scrollable lists

API Reference

The full rustdoc API reference is generated from source code doc comments and deployed alongside this guide.

Crate                  Layer                API Docs
gemini-genai-rs        L0 — Wire Protocol   gemini_genai_rs
gemini-adk-rs          L1 — Agent Runtime   gemini_adk_rs
gemini-adk-fluent-rs   L2 — Fluent DX       gemini_adk_fluent_rs

These docs are regenerated on every push to main. Every PR builds them with RUSTDOCFLAGS="-D warnings", so any rustdoc warning fails the build.