Dispatch & Join — Fire-and-Continue Background Execution

Modules in play: dispatch() background tasks, join() barriers, >> sequential, named tasks, callbacks, progress streaming

The Real-World Problem

Your content publishing pipeline writes a blog post, then needs to send an email notification and optimize for SEO. But email and SEO don’t depend on each other, and neither blocks the main pipeline — the post should be formatted and published immediately while email and SEO run in the background. Only after publishing do you want to collect the SEO results to update metadata.

FanOut (|) won’t work — it blocks until all branches complete. race() won’t work — it cancels the losers. You need fire-and-continue: launch background tasks, keep going, and optionally wait later.

The Fluent Solution

from adk_fluent import Agent, Pipeline, dispatch, join

writer = Agent("content_writer", "gemini-2.5-flash").instruct("Write a blog post.")
email_sender = Agent("email_sender", "gemini-2.5-flash").instruct("Send email notification.")
seo_optimizer = Agent("seo_optimizer", "gemini-2.5-flash").instruct("Optimize for SEO.")
formatter = Agent("formatter", "gemini-2.5-flash").instruct("Format for the website.")
publisher = Agent("publisher", "gemini-2.5-flash").instruct("Publish the formatted content.")

# Pattern 1: Basic — fire background tasks, continue, then wait
basic_pipeline = (
    writer
    >> dispatch(email_sender, seo_optimizer)   # non-blocking
    >> formatter                                # runs immediately
    >> join()                                   # barrier: wait for all
    >> publisher
)

# Pattern 2: Named + selective join — wait for SEO before publishing,
# collect email results at the very end
named_pipeline = (
    writer
    >> dispatch(email_sender, seo_optimizer, names=["email", "seo"])
    >> formatter
    >> join("seo", timeout=30)     # wait only for SEO before publishing
    >> publisher
    >> join("email")               # collect email result at the end
)

# Pattern 3: Dispatch with callbacks
callback_results = []
callback_pipeline = (
    writer
    >> dispatch(
        email_sender,
        on_complete=lambda name, result: callback_results.append((name, "ok")),
        on_error=lambda name, exc: callback_results.append((name, "error")),
    )
    >> formatter
    >> join()
)

# Pattern 4: Progress streaming — partial results visible while running
progress_pipeline = (
    writer
    >> dispatch(seo_optimizer, progress_key="seo_progress")
    >> formatter    # can read state["seo_progress"] for live updates
    >> join()
)

The Interplay Breakdown

Why dispatch() instead of | (FanOut)? | creates a ParallelAgent that blocks until all branches complete. If email takes 10 seconds and SEO takes 2 seconds, the pipeline waits 10 seconds before proceeding. dispatch() launches both as asyncio.Tasks and returns immediately — the formatter starts while email and SEO run concurrently in the background.
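The fire-and-continue mechanics can be sketched with plain asyncio — this is not the adk_fluent implementation, just the underlying pattern it describes, with hypothetical coroutines standing in for the agents:

```python
import asyncio

async def email_task():
    await asyncio.sleep(0.2)   # simulates the slow background step
    return "email sent"

async def seo_task():
    await asyncio.sleep(0.05)
    return "seo done"

async def format_step():
    return "formatted"

async def pipeline():
    # dispatch(): launch background work and return immediately
    background = [asyncio.create_task(email_task()),
                  asyncio.create_task(seo_task())]
    # main pipeline continues without waiting
    formatted = await format_step()
    # join(): barrier that waits for all background tasks
    results = await asyncio.gather(*background)
    return formatted, results

formatted, results = asyncio.run(pipeline())
```

With FanOut semantics, `format_step` could not start until both sleeps finished; here it runs while they are still pending.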

Why join() as a separate step? The barrier is explicit and positional. In basic_pipeline, join() appears after the formatter — meaning formatting runs concurrently with background tasks. Moving join() before the formatter would force formatting to wait for the slowest background task, negating the benefit.
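The positional nature of the barrier is observable in plain asyncio (a sketch, not adk_fluent code): with the barrier placed after the formatter, the formatter's work completes before the slow background task does.

```python
import asyncio

events = []

async def slow_background():
    await asyncio.sleep(0.1)
    events.append("background done")

async def format_step():
    events.append("formatter ran")

async def main():
    task = asyncio.create_task(slow_background())  # dispatch: non-blocking
    await format_step()                            # runs before the barrier
    await task                                     # join(): barrier here
    events.append("publisher ran")

asyncio.run(main())
```

Swapping the last two awaits (barrier before formatter) would yield the strictly serial order instead.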

Why named tasks + selective join()? Sometimes you need one background result before proceeding but can collect another later. join("seo") waits only for the SEO optimizer (its result might affect metadata), while join("email") at the end collects email status for logging. This is impossible with FanOut — it’s all-or-nothing.
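Selective joining amounts to keeping named handles and awaiting them individually. A minimal sketch under plain asyncio (the result payloads are illustrative, not what the real agents return):

```python
import asyncio

async def seo():
    await asyncio.sleep(0.05)
    return {"keywords": ["ai", "safety"]}

async def email():
    await asyncio.sleep(0.2)
    return "sent"

async def pipeline():
    # named dispatch: hold handles in a dict keyed by name
    tasks = {"seo": asyncio.create_task(seo()),
             "email": asyncio.create_task(email())}
    # join("seo", timeout=30): wait only for the named task
    seo_result = await asyncio.wait_for(tasks["seo"], timeout=30)
    # ... publish using seo_result while email keeps running ...
    # join("email"): collect the remaining result at the very end
    email_result = await tasks["email"]
    return seo_result, email_result

seo_result, email_result = asyncio.run(pipeline())
```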

Why progress_key? Long-running background tasks can stream partial results via progress_key. The main pipeline reads state["seo_progress"] for live updates without waiting for completion. This is essential for user-facing progress bars or dashboard updates.
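Progress streaming reduces to the background task writing partial values into shared state that the main pipeline can read before the barrier. A sketch assuming a plain dict for state (adk_fluent's actual state object may differ):

```python
import asyncio

state = {}

async def seo_optimizer():
    # stream partial results under the progress key while running
    for pct in (25, 50, 75, 100):
        await asyncio.sleep(0.01)
        state["seo_progress"] = pct

async def main():
    task = asyncio.create_task(seo_optimizer())
    await asyncio.sleep(0.025)            # main pipeline does other work
    partial = state.get("seo_progress")   # live value; task still running
    await task                            # join(): barrier
    return partial, state["seo_progress"]

partial, final = asyncio.run(main())
```

The mid-flight read returns whatever was last written, which is exactly what a progress bar needs.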

Why callbacks on dispatch? on_complete and on_error provide hooks for monitoring without blocking. Log completion times, send alerts on failure, update metrics — all without adding agents to the pipeline.
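The callback hooks can be sketched as a thin wrapper around each background coroutine — completion and error handlers fire when the task settles, without the main pipeline ever blocking on them (a sketch, not the library's internals):

```python
import asyncio

results = []

def on_complete(name, result):
    results.append((name, "ok"))

def on_error(name, exc):
    results.append((name, "error"))

async def with_hooks(name, coro):
    # monitoring wrapper: routes outcome to the right hook
    try:
        value = await coro
        on_complete(name, value)
    except Exception as exc:
        on_error(name, exc)

async def ok():
    return "sent"

async def boom():
    raise RuntimeError("smtp down")

async def main():
    tasks = [asyncio.create_task(with_hooks("email", ok())),
             asyncio.create_task(with_hooks("push", boom()))]
    # ... main pipeline keeps going here ...
    await asyncio.gather(*tasks)  # join()

asyncio.run(main())
```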

Running on Different Backends

# In-process synchronous run (default)
response = basic_pipeline.ask("Write a blog post about AI safety")

# Asyncio backend — background tasks run as asyncio.Tasks
response = await basic_pipeline.engine("asyncio").ask_async("Write a blog post")

# Temporal backend — durable execution
from temporalio.client import Client
client = await Client.connect("localhost:7233")

# dispatch() → Temporal child workflow (fire-and-forget)
# join() → await child workflow handle
# Background tasks survive main pipeline crashes
durable = basic_pipeline.engine("temporal", client=client, task_queue="content")
response = await durable.ask_async("Write a blog post about AI safety")

Pipeline Topology

writer ──► dispatch(email_sender, seo_optimizer)
           │
           ├──► formatter ──► join() ──► publisher   [main pipeline]
           │
           ├──► email_sender     [background task]
           └──► seo_optimizer    [background task]