Deep Research Agent — Gemini Deep Research in 20 Lines¶
Modules in play:
>>sequential,|parallel,* until()conditional loop,@typed output,S.*state transforms,C.*context engineering
The Real-World Problem¶
Your analyst team spends 4 hours on a research brief: they decompose a question, search three sources independently, synthesize the findings, then review-revise until the report meets quality standards. Most of that time is mechanical — the same decompose-search-synthesize-review pattern every time. You need a pipeline that runs web, academic, and news searches in parallel, feeds results into a synthesizer, loops a quality review until confidence hits 85%, and outputs a typed report — not free-form text that breaks your downstream systems.
The Fluent Solution¶
from pydantic import BaseModel
from adk_fluent import Agent, Pipeline, S, C
MODEL = "gemini-2.5-flash"
class ResearchReport(BaseModel):
"""Structured output for the final research report."""
title: str
executive_summary: str
key_findings: list[str]
confidence_score: float
# Stage 1: Decompose the research query into sub-questions
query_analyzer = (
Agent("query_analyzer", MODEL)
.instruct(
"Analyze the research query. Identify the core question, "
"decompose it into 3-5 sub-questions, and determine which "
"sources are most relevant (academic, news, web)."
)
.writes("research_plan")
)
# Stage 2: Search three source types IN PARALLEL
parallel_search = (
Agent("web_searcher", MODEL)
.instruct("Search the web for relevant articles. Summarize key findings.")
.context(C.from_state("research_plan"))
.writes("web_results")
| Agent("academic_searcher", MODEL)
.instruct("Search academic databases for peer-reviewed papers.")
.context(C.from_state("research_plan"))
.writes("academic_results")
| Agent("news_searcher", MODEL)
.instruct("Search recent news for current developments.")
.context(C.from_state("research_plan"))
.writes("news_results")
)
# Stage 3: Synthesize findings from all sources
synthesizer = (
Agent("synthesizer", MODEL)
.instruct(
"Synthesize findings from web, academic, and news sources. "
"Identify consensus, contradictions, and gaps. "
"Rate confidence on a 0-1 scale."
)
.context(C.from_state("web_results", "academic_results", "news_results"))
.writes("synthesis")
)
# Stage 4: Quality review loop — reviewer scores until confident
quality_loop = (
Agent("quality_reviewer", MODEL)
.instruct(
"Review the synthesis for accuracy, completeness, and bias. "
"Score quality from 0 to 1. If below 0.85, specify improvements."
)
.context(C.from_state("synthesis"))
.writes("quality_score")
>> Agent("revision_agent", MODEL)
.instruct("Revise the synthesis based on reviewer feedback.")
.context(C.from_state("synthesis", "quality_score"))
.writes("synthesis")
).loop_until(lambda s: float(s.get("quality_score", 0)) >= 0.85, max_iterations=3)
# Stage 5: Format final report with TYPED output
report_writer = (
Agent("report_writer", MODEL)
.instruct("Write the final report with executive summary and key findings.")
.context(C.from_state("synthesis"))
@ ResearchReport
)
# THE SYMPHONY: one expression, five stages
deep_research = (
query_analyzer >> parallel_search >> synthesizer >> quality_loop >> report_writer
)
The Interplay Breakdown¶
This pipeline isn’t just “agents in sequence.” Every module choice solves a specific problem:
Why | (parallel) for the search stage?
Web, academic, and news searches are independent. Running them sequentially
would triple the latency for zero benefit. The | operator launches all three
as concurrent branches under a ParallelAgent. When all three finish, their
results are merged into shared state — web_results, academic_results,
news_results — available to the synthesizer.
Why C.from_state() instead of default context?
Without context engineering, each searcher would see the full conversation
history — including the query analyzer’s internal reasoning. That’s noise.
C.from_state("research_plan") gives each searcher only the decomposed
plan, not the raw user query or other agents’ outputs. The synthesizer uses
C.from_state("web_results", "academic_results", "news_results") to see
only the search results, not the plan or the original query.
Why * until() for quality review?
A fixed number of review rounds either wastes time (3 rounds when 1 suffices)
or produces low-quality output (1 round when 3 are needed). The loop_until
predicate — quality_score >= 0.85 — makes the loop adaptive. The max_iterations=3
cap prevents runaway loops when the reviewer is never satisfied.
Why @ (typed output) on the report writer?
Free-form text breaks downstream systems. The @ ResearchReport binding
forces the LLM to output JSON conforming to the Pydantic schema. If the
output doesn’t parse, ADK raises immediately — no silent corruption.
Why .writes() everywhere?
Each agent writes to a named state key. This creates an explicit data contract:
query_analyzer produces research_plan, searchers produce *_results,
synthesizer produces synthesis. If you rename a key, check_contracts()
catches the break at build time, not in production.
Pipeline Topology¶
query_analyzer ──► ┌─ web_searcher ────┐
│ academic_searcher │──► synthesizer ──► ┌─ quality_reviewer ─┐ * until(≥0.85)
│ news_searcher ────┘ │ revision_agent ───┘
──► report_writer @ ResearchReport
Execution Sequence¶
This is what actually happens at runtime — the order of LLM calls, data flow between agents, and where parallel execution occurs. Auto-generated with .to_sequence_diagram().
sequenceDiagram
participant QA as query_analyzer
participant WS as web_searcher
participant AS as academic_searcher
participant NS as news_searcher
participant SY as synthesizer
participant QR as quality_reviewer
participant RA as revision_agent
participant RW as report_writer
QA->>QA: LLM call
Note right of QA: writes research_plan
par Parallel execution
QA-->>WS: state[research_plan]
Note right of WS: C.from_state("research_plan")
WS->>WS: LLM call
Note right of WS: writes web_results
and
QA-->>AS: state[research_plan]
Note right of AS: C.from_state("research_plan")
AS->>AS: LLM call
Note right of AS: writes academic_results
and
QA-->>NS: state[research_plan]
Note right of NS: C.from_state("research_plan")
NS->>NS: LLM call
Note right of NS: writes news_results
end
WS-->>SY: state[web_results, academic_results, news_results]
Note right of SY: C.from_state("web_results", "academic_results", "news_results")
SY->>SY: LLM call
Note right of SY: writes synthesis
loop max 3 iterations (until quality_score >= 0.85)
SY-->>QR: state[synthesis]
Note right of QR: C.from_state("synthesis")
QR->>QR: LLM call
Note right of QR: writes quality_score
alt score < 0.85
QR-->>RA: state[synthesis, quality_score]
Note right of RA: C.from_state("synthesis", "quality_score")
RA->>RA: LLM call
Note right of RA: overwrites synthesis
else score >= 0.85
Note over QR,RA: Loop exits
end
end
RA-->>RW: state[synthesis]
Note right of RW: C.from_state("synthesis")
RW->>RW: LLM call
Note right of RW: @ ResearchReport (typed output)
Running on Different Backends¶
The pipeline definition above is backend-agnostic. Here’s how to execute it on each backend:
# Default backend — no .engine() needed
response = deep_research.ask("What are the latest advances in quantum computing?")
print(response)
from temporalio.client import Client
client = await Client.connect("localhost:7233")
# Same pipeline definition, durable execution
durable_research = deep_research.engine("temporal", client=client, task_queue="research")
response = await durable_research.ask_async("What are the latest advances in quantum computing?")
# If the process crashes during the quality loop, Temporal replays:
# - query_analyzer: cached (0 LLM cost)
# - 3 searchers: cached (0 LLM cost)
# - synthesizer: cached (0 LLM cost)
# - quality loop: resumes from last completed iteration
# Pure asyncio, no ADK dependency at runtime
async_research = deep_research.engine("asyncio")
response = await async_research.ask_async("What are the latest advances in quantum computing?")
Framework Comparison¶
Framework |
Lines |
Notes |
|---|---|---|
adk-fluent |
~50 |
One expression per stage, compose with |
Native ADK |
~120 |
7 LlmAgent + ParallelAgent + SequentialAgent + custom LoopAgent |
LangGraph |
~80 |
StateGraph + conditional back-edges + fan-out nodes |
CrewAI |
~70 |
No native parallel; no typed output; no quality loop control |
The adk-fluent version reads like a business process document. The native ADK version reads like infrastructure code. Both produce identical runtime objects.