Investment Analysis Pipeline — Route, Loop, and Conditional Delivery¶
Modules in play:
Routedeterministic branching,>>sequential,|parallel,loop_untilquality review,proceed_ifconditional gating,Presetshared configuration
The Real-World Problem¶
Your portfolio management team handles three asset classes — equities, fixed income, and alternatives — each requiring different analysis workflows. Today, a human triages each request to the right team. Fixed income needs a credit analyst then a rate modeler (sequential). Alternatives need a quant modeler and sentiment analyzer running in parallel, then a risk aggregator. After analysis, a portfolio reviewer iterates until quality is “approved,” and only then does a client-ready report get generated.
You need: instant routing by asset class (no LLM waste), per-class analysis topologies (sequential, parallel, or hybrid), bounded quality iteration, and a gated final deliverable.
The Fluent Solution¶
from adk_fluent import Agent, Pipeline
from adk_fluent._routing import Route
from adk_fluent.presets import Preset
# Shared preset — every agent logs for SEC/FINRA compliance
def compliance_log(callback_context, llm_response):
"""Audit trail for all model responses."""
pass
production = Preset(model="gemini-2.5-flash", after_model=compliance_log)
# Step 1: Classify asset type
asset_classifier = (
Agent("asset_classifier")
.instruct("Classify as 'equity', 'fixed_income', or 'alternative'.")
.writes("asset_class")
.use(production)
)
# Step 2: Per-class analysis topologies
equity_analysis = (
Agent("equity_screener")
.instruct("Screen equities: P/E ratio, revenue growth, moat.")
.use(production)
)
fixed_income_analysis = (
Agent("credit_analyst")
.instruct("Analyze credit risk, yield curves, duration.")
.use(production)
>> Agent("rate_modeler")
.instruct("Model interest rate scenarios and bond price impact.")
.use(production)
)
alternative_analysis = (
Agent("quant_modeler")
.instruct("Build quantitative models for alternative assets.")
.use(production)
| Agent("market_sentiment")
.instruct("Analyze sentiment from news and social media.")
.use(production)
) >> Agent("risk_aggregator").instruct("Aggregate risk factors.").use(production)
# Step 3: Quality review loop — iterate until approved
quality_review = (
Agent("portfolio_reviewer")
.instruct("Review for completeness and accuracy. Rate quality.")
.writes("review_quality")
.use(production)
>> Agent("analysis_refiner")
.instruct("Refine based on feedback. Address gaps.")
.use(production)
).loop_until(lambda s: s.get("review_quality") == "approved", max_iterations=3)
# Step 4: Generate report ONLY if approved
report_generator = (
Agent("report_generator")
.instruct("Generate client-ready investment memo with thesis and risks.")
.proceed_if(lambda s: s.get("review_quality") == "approved")
.use(production)
)
# THE SYMPHONY
pipeline = (
asset_classifier
>> Route("asset_class")
.eq("equity", equity_analysis)
.eq("fixed_income", fixed_income_analysis)
.eq("alternative", alternative_analysis)
>> quality_review
>> report_generator
)
The Interplay Breakdown¶
Why Route() instead of LLM delegation?
The classifier already wrote "equity" or "fixed_income" to state. Sending
that string to another LLM to “decide” which team to call is pure waste.
Route("asset_class").eq("equity", ...) dispatches in microseconds with zero
API cost. Adding a new asset class (e.g., “crypto”) is one .eq() line — no
prompt engineering, no retraining.
Why different topologies per route? Each asset class has a structurally different analysis workflow:
Equity: single screener (one agent)
Fixed income: sequential
>>— credit analysis must precede rate modelingAlternative: parallel
|(quant + sentiment) then sequential>>aggregation
This is impossible in frameworks that force uniform topology. adk-fluent lets each route branch have its own composition.
Why Preset for compliance?
Every agent in a financial pipeline needs an audit trail. Without Preset,
you’d copy-paste .model("gemini-2.5-flash").after_model(compliance_log) on
every single agent. Preset applies shared configuration via .use(production) —
change the model or logging once, it propagates everywhere.
Why loop_until with max_iterations=3?
Quality review is inherently iterative — some analyses need one pass, others three.
A fixed number of rounds is wasteful or insufficient. loop_until(approved)
makes iteration adaptive. The max_iterations=3 cap prevents the reviewer from
endlessly rejecting, ensuring the pipeline always terminates.
Why proceed_if on the report generator?
If the review loop exhausts all 3 iterations without approval, generating a
client report would deliver substandard analysis. proceed_if(approved) gates
the final deliverable — if not approved after 3 rounds, the pipeline exits
without producing a report, signaling that human intervention is needed.
Running on Different Backends¶
response = pipeline.ask("Analyze NVIDIA as a potential portfolio addition")
from temporalio.client import Client
client = await Client.connect("localhost:7233")
# Route() is deterministic — replays identically (zero cost)
# Review loop iterations are individually checkpointed
durable = pipeline.engine("temporal", client=client, task_queue="invest")
response = await durable.ask_async("Analyze NVIDIA as a potential portfolio addition")
response = await pipeline.engine("asyncio").ask_async("Analyze NVIDIA")
Pipeline Topology¶
asset_classifier ──► Route("asset_class")
├─ "equity" → equity_screener
├─ "fixed_income" → credit_analyst ──► rate_modeler
└─ "alternative" → (quant_modeler | market_sentiment) ──► risk_aggregator
──► (portfolio_reviewer >> analysis_refiner) * until(approved)
──► report_generator [gated: approved only]