Gemini CLI / Claude Code Clone – Production Coding Agent Harness¶
Tip
What you’ll learn How to build a fully-functional autonomous coding runtime using adk-fluent’s harness primitives – the Harness Path. This is the proof that the same framework that builds single-purpose agents can build a Claude-Code-class system. Covers all 5 layers and all 4 foundation primitives.
Source: 79_coding_agent_harness.py · Pathway: Harness · 27 tests
The 5-Layer Architecture¶
Every production harness has five layers. Skip one and your harness will be fragile in a specific, predictable way.
┌──────────────────────────────────────────────────────────┐
│ 5. RUNTIME REPL, slash commands, interrupt │
│ 4. OBSERVABILITY EventBus, tape, hooks, renderer │
│ 3. SAFETY Permissions, sandbox, budgets │
│ 2. TOOLS Workspace, web, git, processes, MCP │
│ 1. INTELLIGENCE Agent + skills + manifold │
└──────────────────────────────────────────────────────────┘
Layer |
What it does |
What breaks without it |
|---|---|---|
Intelligence |
LLM + expertise + capability discovery |
Agent doesn’t know how to do the task |
Tools |
File I/O, shell, search, web, git |
Agent can think but can’t act |
Safety |
Permissions, sandboxing, token budgets |
Agent can act but might destroy things |
Observability |
Events, logging, hooks, replay |
You can’t debug when things go wrong |
Runtime |
REPL, streaming, compression, interrupts |
No way to interact with the agent |
Layer 1: Intelligence¶
Agent with domain expertise from skills and rolling context window:
from adk_fluent import Agent, H
from adk_fluent._context import C
agent = (
Agent("coder", "gemini-2.5-pro")
.use_skill("examples/skills/code_reviewer/")
.instruct(
"You are an expert coding assistant. Read the codebase, "
"edit files, run tests, and self-correct until the task is done."
)
.context(C.rolling(n=20, summarize=True))
)
built = agent.build()
# Skills compile to cached static_instruction (not re-sent every turn)
assert "<skills>" in (built.static_instruction or "")
Layer 2: Tools¶
Workspace tools with diff-mode and multimodal¶
tools = H.workspace(project, diff_mode=True, multimodal=True)
names = [t.__name__ for t in tools]
assert "apply_edit" in names # diff_mode adds apply_edit
assert "edit_file" in names
assert "read_file" in names # multimodal version for images/PDFs
assert "glob_search" in names
assert "grep_search" in names
assert "bash" in names
Tool composition with + operator¶
tools = (
H.workspace(project) # read, edit, write, glob, grep, bash, ls (7)
+ H.web() # fetch + search (2)
+ H.git_tools(project) # status, diff, log, commit, branch (5)
+ H.processes(project) # start, check, stop (3)
)
assert len(tools) >= 15
Streaming workspace (PTY-based)¶
chunks = []
tools = H.workspace(
project,
streaming=True,
on_output=lambda chunk: chunks.append(chunk),
)
Layer 3: Safety¶
Permission policies (allow / ask / deny)¶
permissions = (
H.auto_allow("read_file", "glob_search", "grep_search", "list_dir")
.merge(H.ask_before("edit_file", "write_file", "bash"))
.merge(H.deny("rm_rf"))
)
# Deny wins over ask wins over allow
assert "rm_rf" in permissions.deny
assert "bash" in permissions.ask
assert "read_file" in permissions.allow
Pattern-based permissions¶
permissions = (
H.allow_patterns("read_*", "list_*", "grep_*")
.merge(H.deny_patterns("*_delete", "*_destroy"))
)
Sandbox policy¶
sandbox = H.sandbox(
workspace=project,
allow_shell=True,
allow_network=True,
read_paths=["/usr/share/dict"],
write_paths=["/tmp/agent-work"],
)
Budget monitor with threshold callbacks¶
monitor = (
H.budget_monitor(200_000)
.on_threshold(0.7, lambda m: print(f"Warning: {m.utilization:.0%}"))
.on_threshold(0.9, lambda m: print("Critical: compressing..."))
)
# Simulate usage
for _ in range(5):
monitor.record_usage(input_tokens=20_000, output_tokens=10_000)
# At 150k/200k = 75%, warning fires
assert monitor.estimated_turns_remaining > 0
# After compression, adjust resets utilization
monitor.adjust(50_000)
Per-tool error recovery¶
policy = (
H.tool_policy()
.retry("bash", max_attempts=3, backoff=1.0)
.retry("web_fetch", max_attempts=2, backoff=0.5)
.skip("glob_search", fallback="No matching files found.")
.ask("edit_file", handler=lambda name, args, err: True)
)
assert policy.rule_for("bash").action == "retry"
assert policy.rule_for("glob_search").action == "skip"
assert policy.rule_for("unknown_tool").action == "propagate"
Merging tool policies from different sources¶
base = H.tool_policy().retry("bash", max_attempts=2).skip("glob_search")
override = (
H.tool_policy()
.retry("bash", max_attempts=5) # override base
.retry("web_fetch", max_attempts=3) # new rule
)
merged = base.merge(override)
assert merged.rule_for("bash").max_attempts == 5 # override wins
assert merged.rule_for("glob_search").action == "skip" # kept from base
assert merged.rule_for("web_fetch").action == "retry" # added from override
Layer 4: Observability¶
EventBus backbone¶
Everything subscribes to the EventBus instead of building its own observation layer:
bus = H.event_bus(max_buffer=100)
events = []
bus.on("tool_call_start", lambda e: events.append(("start", e.tool_name)))
bus.on("tool_call_end", lambda e: events.append(("end", e.tool_name)))
bus.emit(ToolCallStart(tool_name="read_file", args={"path": "main.py"}))
bus.emit(ToolCallEnd(tool_name="read_file", result="...", duration_ms=42.0))
assert events == [("start", "read_file"), ("end", "read_file")]
assert len(bus.buffer) == 2 # buffered for late subscribers
Error isolation¶
One failing subscriber never blocks others:
def bad_handler(e):
raise RuntimeError("observer crashed")
def good_handler(e):
results.append(e.tool_name)
bus.on("tool_call_start", bad_handler)
bus.on("tool_call_start", good_handler)
bus.emit(ToolCallStart(tool_name="bash"))
assert results == ["bash"] # good handler runs despite bad handler crash
Session tape (replayable recording)¶
tape = bus.tape(max_events=1000)
bus.emit(ToolCallStart(tool_name="read_file"))
bus.emit(ToolCallEnd(tool_name="read_file", duration_ms=15.0))
bus.emit(UsageUpdate(input_tokens=1000, output_tokens=500))
assert tape.size == 3
Hooks (shell commands on events)¶
hooks = (
H.hooks(project)
.on_edit("ruff check {file_path}")
.on_error("notify-send 'Agent error: {error}'")
.on("turn_complete", "echo 'turn done'")
)
Event renderer¶
renderer = H.renderer("plain", show_timing=True, show_args=True)
text = renderer.render(ToolCallStart(tool_name="edit_file", args={"path": "main.py"}))
assert "edit_file" in text
Foundation primitives compose through EventBus¶
bus = H.event_bus()
# All four foundations emit through the single bus
policy = H.tool_policy().retry("bash").with_bus(bus)
monitor = H.budget_monitor(100).on_threshold(0.9, lambda m: None).with_bus(bus)
ledger = H.task_ledger().with_bus(bus)
Layer 5: Runtime¶
Slash commands¶
cmds = H.commands()
cmds.register("model", lambda args: f"Model: {args}", description="Switch model")
cmds.register("clear", lambda args: "Context cleared.", description="Clear context")
cmds.register("compact", lambda args: "Compacted.", description="Compress context")
cmds.register("help", lambda args: cmds.help_text(), description="Show commands")
assert cmds.dispatch("/model gemini-2.5-flash") == "Model: gemini-2.5-flash"
Cooperative cancellation (interrupt and resume)¶
token = H.cancellation_token()
token.begin_turn("Fix the bug in auth.py")
token.record_tool_call("read_file", {"path": "auth.py"})
token.record_tool_call("grep_search", {"pattern": "authenticate"})
# User hits Ctrl-C
token.cancel()
assert token.is_cancelled
# Snapshot captures mid-turn state
snapshot = token.snapshot
assert snapshot is not None
assert "Fix the bug" in snapshot.prompt
assert len(snapshot.tool_calls_completed) == 2
# Resume prompt includes context of what was done
resume = snapshot.resume_prompt()
assert "Resuming" in resume
assert "read_file" in resume
assert "grep_search" in resume
# Reset for next turn
token.reset()
assert not token.is_cancelled
Cancellation callback (blocks tools when cancelled)¶
from adk_fluent._harness._interrupt import CancellationToken, make_cancellation_callback
token = CancellationToken()
callback = make_cancellation_callback(token)
# Normal operation: callback returns None (allow execution)
result = callback(None, type("T", (), {"name": "read_file"})(), {}, None)
assert result is None
# After cancellation: callback returns error dict
token.cancel()
result = callback(None, type("T", (), {"name": "bash"})(), {}, None)
assert isinstance(result, dict)
assert "cancelled" in result["error"].lower()
Task ledger (LLM-callable background tasks)¶
ledger = H.task_ledger(max_tasks=5).with_bus(bus)
tools = ledger.tools()
tool_names = [t.__name__ for t in tools]
assert tool_names == ["launch_task", "check_task", "list_tasks", "cancel_task"]
# LLM launches a task
launch, check, list_tasks, cancel = tools
result = launch("run-tests", "Execute pytest suite")
assert "registered" in result
# LLM checks status
result = check("run-tests")
assert "pending" in result
# Simulate completion
ledger.start("run-tests")
ledger.complete("run-tests", "All 47 tests passed")
result = check("run-tests")
assert "complete" in result
assert "47 tests passed" in result
# Lifecycle events emitted through bus
assert len(events) >= 3 # pending, running, complete
Git checkpoint (undo support)¶
cp = H.git(project)
# GitCheckpointer wraps git stash/tag operations
assert cp is not None
assert hasattr(cp, "create")
assert hasattr(cp, "restore")
REPL configuration¶
from adk_fluent._harness._repl import ReplConfig
config = ReplConfig(
prompt_prefix="coder> ",
welcome_message="Ready. Type /help for commands.",
max_turns=100,
auto_checkpoint=True,
)
assert config.prompt_prefix == "coder> "
assert "/exit" in config.exit_commands
Context compressor¶
compressor = H.compressor(threshold=100_000)
assert compressor.should_compress(150_000) is True
assert compressor.should_compress(50_000) is False
Full Assembly: Complete Coding Agent (~50 lines)¶
All 5 layers wired together into a production-ready autonomous runtime:
from adk_fluent import Agent, H
from adk_fluent._context import C
from adk_fluent._harness import make_cancellation_callback, ReplConfig
project = "/path/to/project"
# --- EventBus backbone ---
bus = H.event_bus(max_buffer=1000)
tape = bus.tape()
# --- Layer 1: Intelligence ---
agent = (
Agent("coder", "gemini-2.5-pro")
.use_skill("skills/code_review/")
.instruct("You are an expert coding assistant.")
.context(C.rolling(n=20, summarize=True))
)
# --- Layer 2: Tools ---
ledger = H.task_ledger().with_bus(bus)
agent = agent.tools(
H.workspace(project, diff_mode=True, multimodal=True)
+ H.web()
+ H.git_tools(project)
+ H.processes(project)
+ ledger.tools()
)
# --- Layer 3: Safety ---
agent = agent.harness(
permissions=(
H.auto_allow("read_file", "glob_search", "grep_search", "list_dir")
.merge(H.ask_before("edit_file", "write_file", "bash"))
.merge(H.deny("rm_rf"))
),
sandbox=H.workspace_only(project),
memory=H.memory(f"{project}/.agent-memory.md"),
on_error=H.on_error(retry={"bash", "web_fetch"}, skip={"glob_search"}),
)
# --- Layer 4: Observability ---
token = H.cancellation_token()
monitor = H.budget_monitor(200_000).on_threshold(0.9, lambda m: None).with_bus(bus)
policy = H.tool_policy().retry("bash", max_attempts=3).with_bus(bus)
hooks = H.hooks(project).on_edit("ruff check {file_path}")
bus.hooks(hooks)
agent = (
agent
.before_tool(bus.before_tool_hook())
.before_tool(make_cancellation_callback(token))
.after_tool(bus.after_tool_hook())
.after_tool(policy.after_tool_hook())
.after_model(bus.after_model_hook())
.after_model(monitor.after_model_hook())
)
# --- Layer 5: Runtime ---
cmds = H.commands()
cmds.register("clear", lambda a: "Context cleared.", description="Clear context")
cmds.register("model", lambda a: f"Model: {a}", description="Switch model")
cmds.register("help", lambda a: cmds.help_text(), description="Show commands")
built = agent.build()
repl = H.repl(
built,
hooks=hooks,
compressor=H.compressor(100_000),
config=ReplConfig(
prompt_prefix="coder> ",
welcome_message="Coding agent ready. Type /help for commands.",
auto_checkpoint=True,
),
)
# await repl.run() # Start the interactive loop
# ---- Verify the complete assembly ----
# Intelligence: skills loaded
assert "<skills>" in (built.static_instruction or "")
# Tools: workspace + web + git + processes + tasks
tool_count = len(built.tools)
assert tool_count >= 20, f"Expected 20+ tools, got {tool_count}"
# Safety: permission callback wired
assert built.before_tool_callback is not None
# Observability: bus has subscribers
assert bus.subscriber_count >= 2
# Runtime: commands registered
assert cmds.size == 5
assert cmds.dispatch("/help") is not None
# Runtime: cancellation token ready
assert not token.is_cancelled
# Runtime: budget monitor tracks usage
monitor.record_usage(input_tokens=5000, output_tokens=2000)
assert monitor.current_tokens == 7000
# Runtime: task ledger functional
ledger.register("test-run", "pytest execution")
ledger.start("test-run")
assert ledger.active_count == 1
# Runtime: REPL can be constructed
assert isinstance(repl, HarnessRepl)
assert repl.config.prompt_prefix == "coder> "
Manifold: Runtime Capability Discovery¶
When you have 100+ tools but the LLM can only handle ~30 at once:
manifold = H.manifold(
tools=None, # ToolRegistry (BM25-indexed)
skills="skills/", # Skill directory
always_loaded=["search_code"],
max_tools=30,
)
# Provides meta-tools: search_capabilities, load_capability, finalize_capabilities
result = manifold.search("code review")
Composition Proof: Foundation Primitives Working Together¶
All four foundation primitives compose through EventBus. One subscription point, one observation layer, zero duplication:
bus = H.event_bus(max_buffer=100)
all_events = []
bus.subscribe(lambda e: all_events.append(e.kind))
# ToolPolicy -> emits errors
_policy = H.tool_policy().retry("bash").with_bus(bus)
# BudgetMonitor -> emits compression triggers
monitor = H.budget_monitor(100).on_threshold(0.9, lambda m: None).with_bus(bus)
# TaskLedger -> emits task lifecycle
ledger = H.task_ledger().with_bus(bus)
# Simulate activity
monitor.record_usage(95, 0) # triggers 0.95 threshold
ledger.register("build", "npm run build")
ledger.start("build")
ledger.complete("build", "success")
# All events flow through the single bus
assert "compression_triggered" in all_events
assert "task_event" in all_events
assert all_events.count("task_event") == 3 # pending, running, complete
Complete Source Code¶
The full source with all 26 test functions is at examples/cookbook/79_coding_agent_harness.py.
Run all tests:
uv run pytest examples/cookbook/79_coding_agent_harness.py -v
Design Philosophy¶
Why composable layers, not a monolithic HarnessAgent?¶
Every harness is different:
Code review needs diff-mode editing + git tools but no web access
Research needs web tools but no file editing
DevOps needs process management + MCP servers but no notebooks
The H namespace gives you building blocks. You compose what you need. The composition is the configuration.
The Four Foundation Primitives¶
Primitive |
What it prevents |
Composes with |
|---|---|---|
|
Building separate observation layers |
SessionTape, HookRegistry, Renderer |
|
Reimplementing per-tool retry/fallback |
|
|
Reimplementing token tracking |
|
|
Reimplementing task tracking |
Core dispatch primitives |
They exist because the core framework (S, C, M, T, G) is declarative and agent-scoped. Harness building needs imperative, session-scoped control.
See also
Harness User Guide – full reference
Recipe 78: Harness + Skills – combining skills with harness
Recipe 77: Skill-Based Agents – pure skills patterns
Three Pathways – how Harness fits into the bigger picture