Map Over: Batch Processing Customer Feedback with Iteration¶
Tip
What you’ll learn How to use map over: batch processing customer feedback with iteration with the fluent API.
Source: 39_map_over.py
from adk_fluent import Agent, Pipeline, map_over
# Scenario: A customer success platform ingests feedback from multiple channels.
# Each feedback entry needs individual sentiment analysis before aggregation.
# map_over(): iterate an agent over each item in a state list
# For each item in state["feedback_entries"], runs the sentiment analyzer
feedback_mapper = map_over(
"feedback_entries",
Agent("sentiment_analyzer")
.model("gemini-2.5-flash")
.instruct("Analyze the sentiment of the customer feedback in _item. Rate as positive, neutral, or negative."),
output_key="sentiment_scores",
)
# Custom item_key and output_key for processing support tickets
ticket_mapper = map_over(
"support_tickets",
Agent("priority_classifier")
.model("gemini-2.5-flash")
.instruct("Classify the urgency of the support ticket in _ticket. Assign P1, P2, or P3."),
item_key="_ticket",
output_key="priority_assignments",
)
# map_over in a full feedback processing pipeline:
# 1. Collect feedback from all channels
# 2. Analyze each piece individually
# 3. Generate an executive summary
feedback_pipeline = (
Agent("feedback_collector")
.model("gemini-2.5-flash")
.instruct("Collect customer feedback from all channels.")
.writes("feedback_entries")
>> map_over(
"feedback_entries",
Agent("sentiment_analyzer").model("gemini-2.5-flash").instruct("Analyze sentiment of this feedback entry."),
)
>> Agent("summary_writer")
.model("gemini-2.5-flash")
.instruct("Write an executive summary of the sentiment analysis results.")
)
# Native ADK requires a custom BaseAgent to iterate over list items:
from google.adk.agents.base_agent import BaseAgent as NativeBaseAgent
from google.adk.agents.llm_agent import LlmAgent
class MapOverAgent(NativeBaseAgent):
"""Custom agent that iterates a sub-agent over each item in a state list."""
async def _run_async_impl(self, ctx):
items = ctx.session.state.get("feedback_entries", [])
results = []
for item in items:
ctx.session.state["_item"] = item
async for event in self.sub_agents[0].run_async(ctx):
yield event
results.append(ctx.session.state.get("_item", None))
ctx.session.state["sentiment_scores"] = results
sentiment_analyzer = LlmAgent(
name="sentiment_analyzer",
model="gemini-2.5-flash",
instruction="Analyze the sentiment of the customer feedback in _item. Rate as positive, neutral, or negative.",
)
native_mapper = MapOverAgent(name="feedback_mapper", sub_agents=[sentiment_analyzer])
graph TD
n1[["feedback_collector_then_map_over_feedback_entries_3_then_summary_writer (sequence)"]]
n2["feedback_collector"]
n3(("map_over_feedback_entries_3 (map feedback_entries)"))
n4["sentiment_analyzer"]
n5["summary_writer"]
n3 --> n4
n2 --> n3
n3 --> n5
Equivalence¶
from adk_fluent._primitive_builders import _MapOverBuilder
from adk_fluent._base import BuilderBase
# map_over returns a _MapOverBuilder
assert isinstance(feedback_mapper, _MapOverBuilder)
assert isinstance(feedback_mapper, BuilderBase)
# Stores configuration correctly
assert feedback_mapper._list_key == "feedback_entries"
assert feedback_mapper._item_key == "_item"
assert feedback_mapper._output_key == "sentiment_scores"
# Custom keys are preserved
assert ticket_mapper._list_key == "support_tickets"
assert ticket_mapper._item_key == "_ticket"
assert ticket_mapper._output_key == "priority_assignments"
# Builds with the sub-agent
built = feedback_mapper.build()
assert len(built.sub_agents) == 1
assert built.sub_agents[0].name == "sentiment_analyzer"
# Composable in pipeline
assert isinstance(feedback_pipeline, Pipeline)
built_pipeline = feedback_pipeline.build()
assert len(built_pipeline.sub_agents) == 3
# Name includes the list key for traceability
assert "feedback_entries" in feedback_mapper._config["name"]