ML Inference Monitoring: Performance Tap for Pure Observation¶
Tip
What you’ll learn: how to monitor an ML inference pipeline with tap, a pure observation step, using the fluent API.
Source: 35_tap_observation.py
from adk_fluent import Agent, Pipeline, tap
# tap() builds an observation-only step: it reads the state and never mutates it.
# That makes it a good fit for watching ML pipeline health without touching
# the predictions themselves.
ml_pipeline = (
    # Stage 1: feature extraction.
    (
        Agent("feature_engineer")
        .model("gemini-2.5-flash")
        .instruct("Extract and normalize features from the raw input data.")
    )
    # Stage 2: observation only -- report how many keys the state holds.
    >> tap(lambda s: print(f"Features extracted: {len(s)} keys in state"))
    # Stage 3: inference on the prepared features.
    >> (
        Agent("inference_engine")
        .model("gemini-2.5-flash")
        .instruct("Run inference on the prepared features and return predictions.")
    )
)
# Named functions keep their name — better observability in production dashboards
def log_prediction_metrics(state) -> None:
    """Log prediction metadata to the monitoring system.

    Reads the ``confidence`` and ``latency_ms`` entries from ``state`` and
    prints them on a single ``[MONITOR]`` line. The state is only read,
    never modified.

    Args:
        state: Mapping-like object exposing ``.get(key, default)``. A missing
            key is reported as ``"unknown"``.
    """
    confidence = state.get("confidence", "unknown")
    latency = state.get("latency_ms", "unknown")
    print(f"[MONITOR] Confidence: {confidence}, Latency: {latency}ms")
# Same three-stage shape as before, but the observation step wraps the named
# function log_prediction_metrics instead of a lambda.
pipeline_with_monitoring = (
    # Stage 1: run the model.
    (
        Agent("model_server")
        .model("gemini-2.5-flash")
        .instruct("Execute the ML model and return predictions with confidence scores.")
    )
    # Stage 2: observation only -- print confidence and latency.
    >> tap(log_prediction_metrics)
    # Stage 3: format the prediction for the reader.
    >> (
        Agent("response_formatter")
        .model("gemini-2.5-flash")
        .instruct("Format the prediction into a human-readable response.")
    )
)
# Every builder also exposes a .tap() method, which is handy for attaching an
# inline monitor while debugging.
pipeline_method = (
    Agent("anomaly_detector")
    .model("gemini-2.5-flash")
    .instruct("Detect anomalies in the incoming data stream.")
    .tap(
        # Observation only: list the keys currently present in the state.
        lambda s: print(
            f"Anomaly detection complete, state keys: {list(s.keys())}"
        )
    )
)
# Native ADK requires subclassing BaseAgent for a pure observation step.
# In an ML inference pipeline, you need to log latency and prediction
# metadata without mutating the pipeline state:
from google.adk.agents.base_agent import BaseAgent as NativeBaseAgent
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents.sequential_agent import SequentialAgent
class LogInferenceMetrics(NativeBaseAgent):
    """Custom agent that only logs inference metrics.

    Reads ``prediction`` and ``confidence`` from the session state and prints
    them. It never writes to the state and emits no events.
    """

    async def _run_async_impl(self, ctx):
        """Print the current prediction and confidence, then finish.

        Args:
            ctx: Invocation context; only ``ctx.session.state`` is read.

        Yields:
            Nothing -- the generator completes without producing an event.
        """
        state = ctx.session.state
        prediction = state.get("prediction", {})
        print(f"Inference result: {prediction}")
        print(f"Model confidence: {state.get('confidence', 'N/A')}")
        # Pure observation: no events are emitted. The unreachable ``yield``
        # below is still required: it is what makes this method an async
        # generator, which the base agent consumes with ``async for``.
        # Without it the method is a plain coroutine and iterating over it
        # raises TypeError.
        return
        yield  # pragma: no cover
# Wire up the native pipeline: the metrics logger is listed between the two
# LLM agents in the sequence.
preprocessor = LlmAgent(
    name="preprocessor",
    model="gemini-2.5-flash",
    instruction="Preprocess input.",
)
logger = LogInferenceMetrics(name="metrics_logger")
postprocessor = LlmAgent(
    name="postprocessor",
    model="gemini-2.5-flash",
    instruction="Format output.",
)
pipeline_native = SequentialAgent(
    name="pipeline",
    sub_agents=[preprocessor, logger, postprocessor],
)
graph TD
n1[["feature_engineer_then_tap_1_then_inference_engine (sequence)"]]
n2["feature_engineer"]
n3>"tap_1 tap"]
n4["inference_engine"]
n2 --> n3
n3 --> n4
Equivalence¶
from adk_fluent._primitive_builders import _TapBuilder
# The tap() factory returns a _TapBuilder instance.
t = tap(lambda s: None)
assert isinstance(t, _TapBuilder)
# Composing with >> produces a Pipeline; once built, ml_pipeline has three
# sub-agents (feature_engineer, the tap step, inference_engine).
assert isinstance(ml_pipeline, Pipeline)
built = ml_pipeline.build()
assert len(built.sub_agents) == 3
# A named function passes its own name through to the tap's config.
named = tap(log_prediction_metrics)
assert named._config["name"] == "log_prediction_metrics"
# A lambda has no meaningful name of its own, so the tap is expected to get a
# generated "tap_..." name that is still a valid Python identifier.
anon = tap(lambda s: None)
assert anon._config["name"].startswith("tap_")
assert anon._config["name"].isidentifier()
# The .tap() builder method returns a Pipeline (self >> tap_step).
assert isinstance(pipeline_method, Pipeline)