T Module: Fluent Tool Composition and Dynamic Loading

Demonstrates the T module for composing, wrapping, and dynamically loading tools using the fluent API.

Key concepts:

  • TComposite: composable tool chain with | operator

  • T.fn(): wrap callable as FunctionTool

  • T.agent(): wrap agent as AgentTool

  • T.toolset(): wrap any ADK toolset

  • T.google_search(): built-in Google Search

  • T.schema(): attach ToolSchema for contract checking

  • T.search(): BM25-indexed dynamic tool loading

  • ToolRegistry: tool catalog with search

  • SearchToolset: two-phase discovery/execution

Tip

What you’ll learn How to attach tools to an agent using the fluent API.

Source: 66_t_module_tools.py

from adk_fluent._tools import T, TComposite

# --- 1. TComposite composition ---
a = TComposite(["tool_a"])
b = TComposite(["tool_b"])
c = a | b
assert len(c) == 2
assert c.to_tools() == ["tool_a", "tool_b"]

# Three-way composition
d = TComposite(["tool_c"])
result = a | b | d
assert len(result) == 3

# Raw value on left
mixed = "raw_tool" | TComposite(["wrapped"])
assert len(mixed) == 2
assert mixed.to_tools()[0] == "raw_tool"

# Repr
assert "TComposite" in repr(c)


# --- 2. T.fn() wrapping ---
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Results for {query}"


tc = T.fn(search_web)
assert len(tc) == 1

from google.adk.tools.function_tool import FunctionTool

assert isinstance(tc.to_tools()[0], FunctionTool)

# With confirmation
tc_confirm = T.fn(search_web, confirm=True)
assert isinstance(tc_confirm.to_tools()[0], FunctionTool)

# --- 3. T.agent() wrapping ---
from adk_fluent import Agent

helper = Agent("helper").instruct("Help the user.")
tc_agent = T.agent(helper)
assert len(tc_agent) == 1

from google.adk.tools.agent_tool import AgentTool

assert isinstance(tc_agent.to_tools()[0], AgentTool)

# --- 4. T.google_search() ---
tc_gs = T.google_search()
assert len(tc_gs) == 1

# --- 5. Composition: T.fn() | T.google_search() ---
composed = T.fn(search_web) | T.google_search()
assert len(composed) == 2
assert isinstance(composed.to_tools()[0], FunctionTool)

# --- 6. T.schema() ---
from adk_fluent._tools import _SchemaMarker


class FakeToolSchema:
    pass


tc_schema = T.schema(FakeToolSchema)
assert len(tc_schema) == 1
assert isinstance(tc_schema.to_tools()[0], _SchemaMarker)
assert tc_schema.to_tools()[0]._schema_cls is FakeToolSchema

# --- 7. ToolRegistry ---
from adk_fluent._tool_registry import ToolRegistry


def send_email(to: str, body: str) -> str:
    """Send an email message to a recipient."""
    return f"Email sent to {to}"


def calculate(expr: str) -> str:
    """Perform mathematical calculations."""
    return f"Result: {expr}"


def translate(text: str, lang: str) -> str:
    """Translate text to another language."""
    return f"Translated to {lang}: {text}"


registry = ToolRegistry.from_tools(search_web, send_email, calculate, translate)

# Get tool by name
assert registry.get_tool("search_web") is not None
assert registry.get_tool("nonexistent") is None

# Search
results = registry.search("search web", top_k=2)
assert len(results) <= 2
assert any("search_web" in r["name"] for r in results)

# --- 8. SearchToolset two-phase ---
from adk_fluent._tool_registry import SearchToolset

toolset = SearchToolset(registry, always_loaded=["calculate"], max_tools=10)

# Meta-tool functions
search_result = toolset._search_fn("email")
assert "send_email" in search_result

load_result = toolset._load_fn("send_email")
assert "send_email" in toolset._loaded_names

finalize_result = toolset._finalize_fn()
assert toolset._frozen is True
assert "finalized" in finalize_result.lower() or "frozen" in finalize_result.lower()

# --- 9. T.search() factory ---
registry2 = ToolRegistry.from_tools(search_web, send_email)
tc_search = T.search(registry2)
assert len(tc_search) == 1
assert isinstance(tc_search.to_tools()[0], SearchToolset)

# --- 10. compress_large_result ---
from adk_fluent._tool_registry import compress_large_result

small = "hello"
assert compress_large_result(small, threshold=100) == small

large = "x" * 200
compressed = compress_large_result(large, threshold=100)
assert len(compressed) < len(large)

# --- 11. _ResultVariator ---
from adk_fluent._tool_registry import _ResultVariator

variator = _ResultVariator()
r0 = variator.vary("data", 0)
r1 = variator.vary("data", 1)
assert "data" in r0
assert "data" in r1
assert r0 != r1  # Different prefixes


# --- 12. Builder integration ---
def tool_a(x: str) -> str:
    """Tool A."""
    return x


def tool_b(x: str) -> str:
    """Tool B."""
    return x


agent = Agent("composer").tools(T.fn(tool_a) | T.fn(tool_b))
ir = agent.to_ir()
assert len(ir.tools) >= 2

# .tools() replaces, .tool() appends — use .tools() first, then .tool() to add
agent2 = Agent("combined").tools(T.fn(tool_a)).tool(tool_b)
ir2 = agent2.to_ir()
assert len(ir2.tools) >= 2

# --- 13. T module with delegate pattern ---
specialist = Agent("specialist").instruct("Specialist work.")
coordinator = Agent("coordinator").tools(T.agent(specialist) | T.fn(calculate))
ir3 = coordinator.to_ir()
assert len(ir3.tools) >= 2

# --- 14. T.mock() ---
from adk_fluent._tools import _MockWrapper

# Create mock with fixed return value
tc_mock = T.mock("search_api", returns="mock result")
assert len(tc_mock) == 1
assert tc_mock._kind == "mock"
mock_tool = tc_mock.to_tools()[0]
assert isinstance(mock_tool, _MockWrapper)
assert mock_tool.name == "search_api"

# Mock with side-effect callable
call_count = 0


def side_effect_fn(query: str) -> str:
    global call_count
    call_count += 1
    return f"Called {call_count} times with {query}"


tc_mock_se = T.mock("counter", side_effect=side_effect_fn)
se_tool = tc_mock_se.to_tools()[0]
assert isinstance(se_tool, _MockWrapper)

# Verify async execution
import asyncio


async def _run_mock_async():
    result = await se_tool.run_async(args={"query": "test"}, tool_context=None)
    return result


result = asyncio.run(_run_mock_async())
assert "Called 1 times" in result

# --- 15. T.confirm() ---
from adk_fluent._tools import _ConfirmWrapper


def risky_operation(target: str) -> str:
    """Perform a risky operation."""
    return f"Executed on {target}"


# Wrap single tool
tc_confirm_single = T.confirm(T.fn(risky_operation))
assert len(tc_confirm_single) == 1
confirm_tool = tc_confirm_single.to_tools()[0]
assert isinstance(confirm_tool, _ConfirmWrapper)
assert confirm_tool.require_confirmation is True

# Wrap with custom message
tc_confirm_msg = T.confirm(T.fn(risky_operation), message="Are you sure?")
msg_tool = tc_confirm_msg.to_tools()[0]
assert isinstance(msg_tool, _ConfirmWrapper)
assert msg_tool._message == "Are you sure?"

# Confirm a composite (wraps each item)
tc_multi = T.fn(search_web) | T.fn(send_email)
tc_confirm_multi = T.confirm(tc_multi)
assert len(tc_confirm_multi) == 2
assert all(isinstance(t, _ConfirmWrapper) for t in tc_confirm_multi.to_tools())

# --- 16. T.timeout() ---
from adk_fluent._tools import _TimeoutWrapper


def slow_operation(data: str) -> str:
    """Potentially slow operation."""
    return f"Processed {data}"


# Wrap with default 30s timeout
tc_timeout = T.timeout(T.fn(slow_operation))
assert len(tc_timeout) == 1
timeout_tool = tc_timeout.to_tools()[0]
assert isinstance(timeout_tool, _TimeoutWrapper)
assert timeout_tool._seconds == 30

# Custom timeout
tc_timeout_5s = T.timeout(T.fn(slow_operation), seconds=5)
timeout_5s_tool = tc_timeout_5s.to_tools()[0]
assert timeout_5s_tool._seconds == 5

# Timeout a composite
tc_timeout_multi = T.timeout(T.fn(search_web) | T.fn(send_email), seconds=10)
assert len(tc_timeout_multi) == 2
assert all(isinstance(t, _TimeoutWrapper) for t in tc_timeout_multi.to_tools())
assert all(t._seconds == 10 for t in tc_timeout_multi.to_tools())

# --- 17. T.cache() ---
from adk_fluent._tools import _CachedWrapper


def expensive_query(query: str) -> str:
    """Expensive API call."""
    return f"Result for {query}"


# Wrap with default 300s TTL
tc_cache = T.cache(T.fn(expensive_query))
assert len(tc_cache) == 1
cache_tool = tc_cache.to_tools()[0]
assert isinstance(cache_tool, _CachedWrapper)
assert cache_tool._ttl == 300
assert cache_tool._cache == {}

# Custom TTL
tc_cache_60s = T.cache(T.fn(expensive_query), ttl=60)
cache_60s_tool = tc_cache_60s.to_tools()[0]
assert cache_60s_tool._ttl == 60


# Custom key function
def custom_key(args: dict) -> str:
    return args.get("query", "default")


tc_cache_custom = T.cache(T.fn(expensive_query), ttl=120, key_fn=custom_key)
cache_custom_tool = tc_cache_custom.to_tools()[0]
assert cache_custom_tool._key_fn is custom_key

# Cache a composite
tc_cache_multi = T.cache(T.fn(search_web) | T.fn(translate), ttl=180)
assert len(tc_cache_multi) == 2
assert all(isinstance(t, _CachedWrapper) for t in tc_cache_multi.to_tools())
assert all(t._ttl == 180 for t in tc_cache_multi.to_tools())

# --- 18. T.transform() ---
from adk_fluent._tools import _TransformWrapper


def process_data(text: str) -> str:
    """Process some text."""
    return f"Processed: {text}"


# Pre-transform (modify arguments)
def pre_fn(args: dict) -> dict:
    args["text"] = args["text"].upper()
    return args


tc_transform_pre = T.transform(T.fn(process_data), pre=pre_fn)
assert len(tc_transform_pre) == 1
transform_tool = tc_transform_pre.to_tools()[0]
assert isinstance(transform_tool, _TransformWrapper)
assert transform_tool._pre is pre_fn
assert transform_tool._post is None


# Post-transform (modify result)
def post_fn(result: str) -> str:
    return result + " [verified]"


tc_transform_post = T.transform(T.fn(process_data), post=post_fn)
post_tool = tc_transform_post.to_tools()[0]
assert post_tool._post is post_fn

# Both pre and post
tc_transform_both = T.transform(T.fn(process_data), pre=pre_fn, post=post_fn)
both_tool = tc_transform_both.to_tools()[0]
assert both_tool._pre is pre_fn
assert both_tool._post is post_fn

# Transform a composite
tc_transform_multi = T.transform(T.fn(search_web) | T.fn(translate), pre=pre_fn)
assert len(tc_transform_multi) == 2
assert all(isinstance(t, _TransformWrapper) for t in tc_transform_multi.to_tools())

# --- 19. Wrapper composition (nesting) ---

# Cache → Timeout → Tool
nested_tool = T.fn(expensive_query)
nested_with_timeout = T.timeout(nested_tool, seconds=5)
nested_with_cache = T.cache(nested_with_timeout, ttl=60)

assert len(nested_with_cache) == 1
outer = nested_with_cache.to_tools()[0]
assert isinstance(outer, _CachedWrapper)
assert outer._ttl == 60

# The inner is a TimeoutWrapper
assert isinstance(outer._inner, _TimeoutWrapper)
assert outer._inner._seconds == 5

# Innermost is FunctionTool
assert isinstance(outer._inner._inner, FunctionTool)

# Multi-layer: Transform → Confirm → Cache → Tool
base = T.fn(process_data)
cached = T.cache(base, ttl=100)
confirmed = T.confirm(cached, message="Proceed?")
transformed = T.transform(confirmed, post=post_fn)

assert len(transformed) == 1
t_outer = transformed.to_tools()[0]
assert isinstance(t_outer, _TransformWrapper)
assert isinstance(t_outer._inner, _ConfirmWrapper)
assert isinstance(t_outer._inner._inner, _CachedWrapper)

# --- 20. Integration with Agent.tools() ---

# Single mock
agent_mock = Agent("tester").tools(T.mock("fake_search", returns="fake data"))
ir_mock = agent_mock.to_ir()
assert len(ir_mock.tools) >= 1

# Composite: mock | real function
agent_mixed = Agent("mixed").tools(T.mock("search", returns="ok") | T.fn(tool_a))
ir_mixed = agent_mixed.to_ir()
assert len(ir_mixed.tools) >= 2

# Cached timeout tool
cached_timeout = T.cache(T.timeout(T.fn(expensive_query), 10), ttl=120)
agent_wrapped = Agent("wrapped").tools(cached_timeout)
ir_wrapped = agent_wrapped.to_ir()
assert len(ir_wrapped.tools) >= 1


# Combine schema + wrapped tools
class ExampleToolSchema:
    pass


agent_schema = Agent("schema_agent").tools(T.schema(ExampleToolSchema) | T.confirm(T.fn(risky_operation)))
ir_schema = agent_schema.to_ir()
assert len(ir_schema.tools) >= 1  # schema marker is extracted, confirm wrapper remains


# Multi-tool integration
def fetch_data(url: str) -> str:
    """Fetch data from URL."""
    return f"Data from {url}"


agent_full = Agent("full_demo").tools(
    T.mock("db_query", returns="cached")
    | T.cache(T.fn(fetch_data), ttl=60)
    | T.timeout(T.fn(slow_operation), seconds=15)
    | T.confirm(T.fn(risky_operation), message="Confirm risky action")
)
ir_full = agent_full.to_ir()
assert len(ir_full.tools) >= 4

print("All T module assertions passed!")

See also

API reference: FunctionTool