Text Agent Combinators

Text agents are composable units for text-based LLM pipelines. Each one implements the TextAgent trait, making a standard request/response call to a language model (no WebSocket session required). You can snap them together -- sequential, parallel, branching, looping -- to build multi-step reasoning pipelines.

The TextAgent Trait

Every text agent implements one method:

#[async_trait]
pub trait TextAgent: Send + Sync {
    fn name(&self) -> &str;
    async fn run(&self, state: &State) -> Result<String, AgentError>;
}

State is a concurrent typed key-value store shared across the pipeline. Agents read input from state.get::<String>("input") and write output to state.set("output", &result). That is the entire contract.

Combinator Reference

CombinatorPurposeAnalogy
LlmTextAgentCore agent -- generate, tool dispatch, loopGemini(prompt)
FnTextAgentWrap a closure as an agent (no LLM call)|state| { ... }
SequentialTextAgentRun agents in order, pipe output forwardA >> B >> C
ParallelTextAgentRun agents concurrently, collect all results[A, B, C]
RaceTextAgentRun concurrently, return first successA | B | C
RouteTextAgentRoute to agent based on state predicateif X -> A, if Y -> B
FallbackTextAgentTry agents in order until one succeedsA ?? B ?? C
LoopTextAgentRepeat until max iterations or predicatewhile(!done) { A }
MapOverTextAgentApply agent to each item in a state listitems.map(A)
TapTextAgentRead-only side effect (logging, metrics)tap(log)
TimeoutTextAgentWrap an agent with a time limittimeout(5s, A)
DispatchTextAgentFire-and-forget background tasksspawn(A, B)
JoinTextAgentWait for dispatched tasks to completejoin(tasks)

LlmTextAgent -- The Core Agent

LlmTextAgent is the workhorse. It calls BaseLlm::generate(), dispatches any tool calls the model makes, feeds tool results back, and loops until the model produces a final text response (up to 10 rounds).

use gemini_adk_rs::text::LlmTextAgent;
use gemini_adk_rs::llm::GeminiLlm;

let llm = Arc::new(GeminiLlm::new(GeminiModel::Gemini2_0Flash));

let agent = LlmTextAgent::new("analyst", llm)
    .instruction("Analyze the given topic and produce a summary.")
    .temperature(0.3)
    .max_output_tokens(2048)
    .tools(Arc::new(tool_dispatcher));

let state = State::new();
state.set("input", "Explain Rust's ownership model");

let result = agent.run(&state).await?;
println!("{result}");

FnTextAgent -- Zero-Cost Transforms

When you need a pipeline step that does not call an LLM -- data formatting, validation, state manipulation -- use FnTextAgent:

use gemini_adk_rs::text::FnTextAgent;

let formatter = FnTextAgent::new("format_output", |state| {
    let raw = state.get::<String>("input").unwrap_or_default();
    let formatted = format!("## Summary\n\n{raw}");
    state.set("output", &formatted);
    Ok(formatted)
});

Building Pipelines

Sequential: A >> B >> C

Each agent's output becomes the next agent's input via state.set("input", &output). The final agent's output is the pipeline result.

use gemini_adk_rs::text::SequentialTextAgent;

let pipeline = SequentialTextAgent::new("analysis_pipeline", vec![
    Arc::new(LlmTextAgent::new("extract", llm.clone())
        .instruction("Extract key claims from the text.")),
    Arc::new(LlmTextAgent::new("validate", llm.clone())
        .instruction("Fact-check each claim. Flag unsupported ones.")),
    Arc::new(LlmTextAgent::new("summarize", llm.clone())
        .instruction("Produce a final summary with confidence ratings.")),
]);

let state = State::new();
state.set("input", raw_document);
let summary = pipeline.run(&state).await?;

Parallel: Run concurrently, collect all

All branches execute concurrently via tokio::spawn. Results are joined with newlines.

use gemini_adk_rs::text::ParallelTextAgent;

let multi_perspective = ParallelTextAgent::new("perspectives", vec![
    Arc::new(LlmTextAgent::new("technical", llm.clone())
        .instruction("Analyze from a technical perspective.")),
    Arc::new(LlmTextAgent::new("business", llm.clone())
        .instruction("Analyze from a business perspective.")),
    Arc::new(LlmTextAgent::new("legal", llm.clone())
        .instruction("Analyze from a legal perspective.")),
]);

Race: First to finish wins

Like ParallelTextAgent, but returns only the first result and cancels the rest. Useful for redundancy or trying multiple model configurations:

use gemini_adk_rs::text::RaceTextAgent;

let fastest = RaceTextAgent::new("race", vec![
    Arc::new(LlmTextAgent::new("fast", fast_llm.clone())
        .instruction("Answer the question.")),
    Arc::new(LlmTextAgent::new("thorough", slow_llm.clone())
        .instruction("Answer the question in depth.")),
]);

Route: State-driven branching

Evaluate predicates against state. First match wins, with a default fallback:

use gemini_adk_rs::text::{RouteTextAgent, RouteRule};

let router = RouteTextAgent::new(
    "issue_router",
    vec![
        RouteRule::new(
            |s| s.get::<String>("category") == Some("billing".into()),
            Arc::new(billing_agent),
        ),
        RouteRule::new(
            |s| s.get::<String>("category") == Some("technical".into()),
            Arc::new(tech_agent),
        ),
    ],
    Arc::new(general_agent), // default
);

Fallback: Try until one succeeds

Attempts each candidate in order. Returns the first Ok result. If all fail, returns the last error:

use gemini_adk_rs::text::FallbackTextAgent;

let robust = FallbackTextAgent::new("robust_lookup", vec![
    Arc::new(primary_agent),
    Arc::new(cache_agent),
    Arc::new(fallback_agent),
]);

Loop: Repeat with termination

Runs the body up to max times, optionally breaking early when a state predicate returns true:

use gemini_adk_rs::text::LoopTextAgent;

let refiner = LoopTextAgent::new("refine", Arc::new(draft_agent), 5)
    .until(|state| {
        state.get::<String>("quality")
            .map(|q| q == "good")
            .unwrap_or(false)
    });

MapOver: Apply agent to each item

Reads a list from state, runs the agent once per item, collects results:

use gemini_adk_rs::text::MapOverTextAgent;

let processor = MapOverTextAgent::new("process_items", Arc::new(item_agent), "items")
    .item_key("current_item")
    .output_key("processed_results");

// State must contain: state.set("items", vec!["item1", "item2", "item3"]);

Tap: Observe without mutation

For logging, metrics, or debugging. Returns an empty string and does not mutate the pipeline flow:

use gemini_adk_rs::text::TapTextAgent;

let logger = TapTextAgent::new("log_state", |state| {
    let input = state.get::<String>("input").unwrap_or_default();
    tracing::info!("Pipeline state - input length: {}", input.len());
});

Timeout: Time-limited execution

Wraps any agent with a deadline. Returns AgentError::Timeout if exceeded:

use gemini_adk_rs::text::TimeoutTextAgent;

let bounded = TimeoutTextAgent::new(
    "bounded_analysis",
    Arc::new(slow_agent),
    Duration::from_secs(30),
);

Dispatch + Join: Background tasks

DispatchTextAgent spawns agents as background tasks with a concurrency budget. JoinTextAgent waits for them:

use gemini_adk_rs::text::{DispatchTextAgent, JoinTextAgent, TaskRegistry};

let registry = TaskRegistry::new();
let budget = Arc::new(tokio::sync::Semaphore::new(4)); // max 4 concurrent

let dispatcher = DispatchTextAgent::new(
    "spawn_tasks",
    vec![
        ("research".into(), Arc::new(research_agent) as Arc<dyn TextAgent>),
        ("analysis".into(), Arc::new(analysis_agent) as Arc<dyn TextAgent>),
    ],
    registry.clone(),
    budget,
);

let joiner = JoinTextAgent::new("collect", registry)
    .timeout(Duration::from_secs(60));

// In a pipeline: dispatch, do other work, then join
let pipeline = SequentialTextAgent::new("bg_pipeline", vec![
    Arc::new(dispatcher),
    Arc::new(other_work_agent),
    Arc::new(joiner),
]);

Agent as Tool: Bridging Voice and Text

TextAgentTool wraps any TextAgent as a ToolFunction, so the live voice model can dispatch text agent pipelines as tool calls. State is shared bidirectionally -- the text agent reads live-extracted values, and its mutations are visible to watchers and phase transitions.

use gemini_adk_rs::text_agent_tool::TextAgentTool;

// Build a multi-step verification pipeline
let verifier = SequentialTextAgent::new("verify_pipeline", vec![
    Arc::new(LlmTextAgent::new("lookup", flash_llm.clone())
        .instruction("Look up the account in the database")
        .tools(Arc::new(db_tools))),
    Arc::new(LlmTextAgent::new("cross_ref", flash_llm.clone())
        .instruction("Cross-reference identity against account record")),
]);

// Wrap as a tool for the voice session
let tool = TextAgentTool::new(
    "verify_identity",
    "Verify caller identity against account records",
    verifier,
    state.clone(), // shared state with the live session
);

dispatcher.register_function(Arc::new(tool));

When the voice model calls verify_identity, the entire sequential pipeline runs via BaseLlm::generate() (not over WebSocket), and the result is returned as the tool response.

Fluent Operator Algebra

If you use the gemini-adk-fluent-rs crate (L2), you get operator syntax for composing agents:

use gemini_adk_fluent_rs::prelude::*;

// Sequential pipeline: >>
let pipeline = AgentBuilder::new("writer").instruction("Write a draft")
    >> AgentBuilder::new("reviewer").instruction("Review and improve");

// Parallel fan-out: |
let analysis = AgentBuilder::new("tech").instruction("Technical analysis")
    | AgentBuilder::new("business").instruction("Business analysis");

// Fixed loop: * N
let polished = AgentBuilder::new("refiner").instruction("Polish the text") * 3;

// Conditional loop: * until(predicate)
let converge = AgentBuilder::new("iterate").instruction("Improve")
    * until(|v| v["quality"].as_str() == Some("good"));

// Fallback chain: /
let robust = AgentBuilder::new("primary").instruction("Try this first")
    / AgentBuilder::new("backup").instruction("Fall back to this");

// Compile the tree into an executable TextAgent
let agent = pipeline.compile(llm);
let result = agent.run(&state).await?;

Real-World Pattern: Multi-Step Analysis Pipeline

Here is a complete pipeline that extracts claims from a document, validates them in parallel, and produces a confidence-rated summary:

use gemini_adk_fluent_rs::prelude::*;

let extract = AgentBuilder::new("extract")
    .instruction("Extract all factual claims from the text. Output one claim per line.")
    .temperature(0.1);

let validate = AgentBuilder::new("validate")
    .instruction("For each claim, determine if it is supported, unsupported, or misleading.")
    .google_search()
    .temperature(0.2);

let summarize = AgentBuilder::new("summarize")
    .instruction("Produce a final report with confidence ratings for each claim.")
    .temperature(0.3);

// extract -> validate -> summarize
let pipeline = extract >> validate >> summarize;

let agent = pipeline.compile(llm);
let state = State::new();
state.set("input", document_text);

let report = agent.run(&state).await?;

For pre-built patterns like review loops and supervised workflows, see the patterns module:

use gemini_adk_fluent_rs::patterns::{review_loop, cascade, fan_out_merge, supervised};

// Worker -> Reviewer -> repeat until quality passes
let reviewed = review_loop(
    AgentBuilder::new("writer").instruction("Write a report"),
    AgentBuilder::new("reviewer").instruction("Rate quality as 'good' or 'needs_work'"),
    "quality",  // state key to check
    "good",     // target value
    3,          // max rounds
);

// Try each model until one succeeds
let robust = cascade(vec![primary_agent, secondary_agent, fallback_agent]);

// Run all in parallel, merge results
let multi = fan_out_merge(vec![tech_analyst, business_analyst, legal_analyst]);

// Worker -> Supervisor with approval gate
let approved = supervised(
    AgentBuilder::new("drafter").instruction("Draft the document"),
    AgentBuilder::new("supervisor").instruction("Approve or request revisions"),
    "approved",  // boolean state key
    5,           // max revisions
);