Timeout: Real-Time Trading Agent with Strict Execution Deadline¶
Tip
What you’ll learn How to use timeout: real-time trading agent with strict execution deadline with the fluent API.
Source: 40_timeout.py
from adk_fluent import Agent, Pipeline
# Scenario: A real-time trading system where market analysis must complete
# within strict time bounds. Stale analysis is worse than no analysis.
# .timeout(seconds): wrap any agent with a time limit
# Market data analysis must complete within 5 seconds or be discarded
market_analyzer = (
Agent("market_analyzer")
.model("gemini-2.5-flash")
.instruct(
"Analyze current market conditions for the requested ticker symbol. Identify trend direction and volatility."
)
.timeout(5)
)
# Timeout in a pipeline -- only the slow step is time-bounded
# The strategy computation gets 30 seconds; other steps run without limits
trading_pipeline = (
Agent("data_ingest").model("gemini-2.5-flash").instruct("Ingest real-time market data for the portfolio.")
>> Agent("strategy_engine")
.model("gemini-2.5-flash")
.instruct("Compute optimal trading strategy based on current positions and market conditions.")
.timeout(30)
>> Agent("order_formatter").model("gemini-2.5-flash").instruct("Format the strategy as executable trade orders.")
)
# Timeout on an entire pipeline -- the full analysis-to-execution flow
# must complete within 60 seconds to catch the trading window
bounded_execution = (
Agent("pre_trade_check").model("gemini-2.5-flash").instruct("Verify margin requirements and position limits.")
>> Agent("trade_executor").model("gemini-2.5-flash").instruct("Execute the trade orders against the exchange.")
).timeout(60)
# Native ADK has no built-in timeout mechanism. You'd need to:
# 1. Subclass BaseAgent
# 2. Run the sub-agent in an asyncio.create_task
# 3. Use asyncio.Queue to forward events with deadline tracking
# 4. Cancel the task on timeout
# For trading systems, a missed deadline can mean significant losses.
# This is ~40 lines of async boilerplate per timeout.
graph TD
n1[["data_ingest_then_strategy_engine_timeout_2_then_order_formatter (sequence)"]]
n2["data_ingest"]
n3["strategy_engine_timeout_2 (timeout 30s)"]
n4["strategy_engine"]
n5["order_formatter"]
n3 --> n4
n2 --> n3
n3 --> n5
Equivalence¶
from adk_fluent._primitive_builders import TimedAgent
from adk_fluent._base import BuilderBase
# .timeout() returns a TimedAgent
assert isinstance(market_analyzer, TimedAgent)
assert isinstance(market_analyzer, BuilderBase)
# Stores the timeout duration
assert market_analyzer._seconds == 5
# Builds with sub-agent
built = market_analyzer.build()
assert len(built.sub_agents) == 1
assert built.sub_agents[0].name == "market_analyzer"
# Name includes original agent name for tracing
assert "market_analyzer" in market_analyzer._config["name"]
# Composable in pipeline -- timeout is transparent to pipeline construction
assert isinstance(trading_pipeline, Pipeline)
built_pipeline = trading_pipeline.build()
assert len(built_pipeline.sub_agents) == 3
# Pipeline-level timeout
assert isinstance(bounded_execution, TimedAgent)
assert bounded_execution._seconds == 60