Live Translation Pipeline – Streaming with .stream()

Real-world use case: Real-time translation pipeline for live event transcription. Transcribes audio, translates, and formats subtitles – all streaming. Critical for live conferences, court interpreting, and broadcast captioning where latency matters.

In other frameworks: LangGraph supports streaming via astream_events but requires graph compilation and manual event filtering. adk-fluent exposes .stream() directly on any pipeline, making token-by-token output a single async for loop.

Tip

What you’ll learn How to compose agents into a sequential pipeline.

Source: 09_streaming.py

from adk_fluent import Agent

# The fluent API makes streaming a single async for loop:
# async for chunk in pipeline.stream("audio data here"):
#     print(chunk, end="")

transcriber = (
    Agent("transcriber")
    .model("gemini-2.5-flash")
    .instruct("Transcribe the incoming audio stream to text. Preserve speaker labels and timestamps.")
    .writes("transcript")
)

translator = (
    Agent("translator")
    .model("gemini-2.5-flash")
    .instruct("Translate the transcript to Spanish. Preserve speaker labels and formatting.")
)

pipeline_fluent = transcriber >> translator

# Build both to compare
built_native = pipeline_native
built_fluent = pipeline_fluent.build()
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents.sequential_agent import SequentialAgent

# Native ADK: streaming requires manual event iteration
#   async for event in runner.run_async(...):
#       if event.content and event.content.parts:
#           for part in event.content.parts:
#               if part.text:
#                   yield part.text

transcriber_native = LlmAgent(
    name="transcriber",
    model="gemini-2.5-flash",
    instruction=("Transcribe the incoming audio stream to text. Preserve speaker labels and timestamps."),
    output_key="transcript",
)

translator_native = LlmAgent(
    name="translator",
    model="gemini-2.5-flash",
    instruction=("Translate the transcript to Spanish. Preserve speaker labels and formatting."),
)

pipeline_native = SequentialAgent(
    name="translation_pipeline",
    sub_agents=[transcriber_native, translator_native],
)
        graph TD
    n1[["transcriber_then_translator (sequence)"]]
    n2["transcriber"]
    n3["translator"]
    n2 --> n3
    

Equivalence

# Both produce SequentialAgent with 2 sub-agents
assert type(built_native) == type(built_fluent)
assert len(built_native.sub_agents) == len(built_fluent.sub_agents)
assert built_fluent.sub_agents[0].name == "transcriber"
assert built_fluent.sub_agents[1].name == "translator"

# Transcriber stores output in state for downstream consumption
assert built_fluent.sub_agents[0].output_key == "transcript"

# stream() is available on Agent builders
assert hasattr(transcriber, "stream")
assert callable(transcriber.stream)

# stream() is also available on the translator Agent builder
assert hasattr(translator, "stream")
assert callable(translator.stream)