Compression¶
The adk_fluent._compression package is adk-fluent’s message-level
compression mechanism. It complements C.* transforms and the
_budget package:
C.* shapes the context that the LLM sees on a single turn.
_budget tracks cumulative usage and fires threshold callbacks at the session level.
_compression rewrites the persistent message history when it exceeds a threshold.
The split matters: compression destroys information, so you want a
clear trigger (budget threshold, token count) and a clear veto point
(the pre_compact hook) before the rewrite runs.
The two pieces¶
CompressionStrategy¶
A frozen description of how to compress. Three constructors:
CompressionStrategy.keep_recent(n=10) # keep last N turn-pairs
CompressionStrategy.drop_old(keep_turns=5) # drop oldest, keep last
CompressionStrategy.summarize(model="gemini-2.5-flash")
System messages (role == "system") are always preserved. The
summarize method is a contract: the compressor passes older messages
to a summariser callable and replaces them with a single summary
message.
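To make the keep_recent contract concrete, here is a minimal, self-contained sketch. It is not adk-fluent's implementation: the helper name keep_recent_sketch, the reading of a "turn-pair" as two non-system messages, and the choice to place system messages first are all assumptions made for illustration.

```python
from typing import Any

Message = dict[str, Any]


def keep_recent_sketch(messages: list[Message], n: int = 10) -> list[Message]:
    """Keep every system message plus the last n turn-pairs.

    Assumption: one turn-pair is two non-system messages (user + assistant).
    """
    system = [m for m in messages if m.get("role") == "system"]
    rest = [m for m in messages if m.get("role") != "system"]
    # Guard n == 0 explicitly: rest[-0:] would return the whole list.
    kept = rest[-2 * n:] if n > 0 else []
    return system + kept


history: list[Message] = [{"role": "system", "content": "be terse"}]
for i in range(6):
    history.append({"role": "user", "content": f"q{i}"})
    history.append({"role": "assistant", "content": f"a{i}"})

compacted = keep_recent_sketch(history, n=2)
print([m["content"] for m in compacted])
# ['be terse', 'q4', 'a4', 'q5', 'a5']
```

The system message survives regardless of n, which matches the preservation rule stated above.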
ContextCompressor¶
ContextCompressor(
    threshold: int = 100_000,
    strategy: CompressionStrategy | None = None,
    on_compress: Callable[[int], None] | None = None,
    *,
    hook_registry: HookRegistry | None = None,
)
Two entry points:
compress_messages(messages): sync. For summarize it falls back to keep_recent because LLM summarisation is async.
compress_messages_async(messages, summarizer=...): async. Use this when you want real summaries.
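Both honour the pre_compact hook, with one exception covered in the design notes: the sync path skips the hook when it is called from inside a running event loop.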
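The summarise-and-replace contract can be sketched without the library. The example below is illustrative only: the Summarizer signature (an async callable that takes the older messages and returns a string), the keep_last parameter, and the role used for the summary message are assumptions, not the documented shape of compress_messages_async.

```python
import asyncio
from typing import Any, Awaitable, Callable

Message = dict[str, Any]
# Assumed shape of a summariser: async, takes older messages, returns text.
Summarizer = Callable[[list[Message]], Awaitable[str]]


async def summarize_older(
    messages: list[Message], summarizer: Summarizer, keep_last: int = 4
) -> list[Message]:
    """Replace all but the last keep_last non-system messages with one summary."""
    system = [m for m in messages if m.get("role") == "system"]
    rest = [m for m in messages if m.get("role") != "system"]
    split = len(rest) - keep_last
    if split <= 0:
        return list(messages)  # nothing old enough to summarise
    older, recent = rest[:split], rest[split:]
    summary = await summarizer(older)
    summary_msg = {"role": "user", "content": f"[summary] {summary}"}
    return system + [summary_msg] + recent


async def fake_summarizer(older: list[Message]) -> str:
    # Stand-in for an LLM call so the sketch runs offline.
    return f"{len(older)} earlier messages omitted"


msgs = [{"role": "system", "content": "sys"}] + [
    {"role": "user", "content": f"m{i}"} for i in range(10)
]
result = asyncio.run(summarize_older(msgs, fake_summarizer, keep_last=4))
print([m["content"] for m in result])
```

Because the summariser is awaited, this path needs an event loop, which is why the sync entry point cannot produce real summaries.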
Quick start¶
from adk_fluent import ContextCompressor, CompressionStrategy

compressor = ContextCompressor(
    threshold=100_000,
    strategy=CompressionStrategy.keep_recent(n=10),
    on_compress=lambda tokens: print(f"compressed at {tokens} tokens"),
)

if compressor.should_compress(current_tokens=120_000):
    messages = compressor.compress_messages(messages)
import { ContextCompressor, CompressionStrategy } from "adk-fluent-ts";

const compressor = new ContextCompressor({
  threshold: 100_000,
  strategy: CompressionStrategy.keepRecent({ n: 10 }),
  onCompress: (tokens) => console.log(`compressed at ${tokens} tokens`),
});

if (compressor.shouldCompress({ currentTokens: 120_000 })) {
  messages = compressor.compressMessages(messages);
}
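The quick start assumes you already have a token count to pass in. If you do not, a rough estimate in the spirit of the ~4 chars-per-token heuristic mentioned in the design notes might look like the sketch below. The helper names rough_token_count and over_threshold are hypothetical, and the >= comparison is an assumption about how a threshold check could behave, not a statement of what should_compress does.

```python
from typing import Any

Message = dict[str, Any]

CHARS_PER_TOKEN = 4  # rough heuristic, not a real tokenizer


def rough_token_count(messages: list[Message]) -> int:
    """Estimate tokens from the character length of each message's content."""
    total_chars = sum(len(str(m.get("content", ""))) for m in messages)
    return total_chars // CHARS_PER_TOKEN


def over_threshold(messages: list[Message], threshold: int = 100_000) -> bool:
    """Assumed trigger rule: compress once the estimate reaches the threshold."""
    return rough_token_count(messages) >= threshold


sample = [{"role": "user", "content": "x" * 400} for _ in range(10)]
print(rough_token_count(sample))      # 1000
print(over_threshold(sample, 1_000))  # True
print(over_threshold(sample, 1_001))  # False
```

For languages or content where 4 characters per token is a poor fit (code, non-Latin scripts), substitute a real tokenizer count.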
pre_compact hook integration¶
Wire a HookRegistry to get fine-grained control over every
compression pass. This mirrors the Claude Agent SDK’s PreCompact hook:
from adk_fluent import ContextCompressor, H
from adk_fluent._hooks import HookDecision, HookEvent

def audit(ctx):
    print(f"about to compress {ctx.extra['token_count']} tokens")
    return HookDecision.allow()

registry = H.hooks().on(HookEvent.PRE_COMPACT, audit)
compressor = ContextCompressor(threshold=100_000).with_hooks(registry)
The hook can return any of:
| Decision | Effect |
|---|---|
| HookDecision.allow() | Compression proceeds with the configured strategy. |
| HookDecision.deny(...) | Compression is cancelled; the original messages are returned unchanged. |
| | Hook supplies the compressed message list directly. The configured strategy is not run. |
| | Currently supported via |
The HookContext receives:
event: HookEvent.PRE_COMPACT
extra.messages: a shallow copy of the message list
extra.token_count: the pre-compression token estimate
extra.strategy: the strategy method name
Use cases for pre_compact¶
Audit logging — write the pre-compression transcript to disk before it’s destroyed.
Veto on sensitive content — refuse to compress messages that contain unresolved tool calls or pending user input.
Custom compression — replace the built-in strategies entirely with a project-specific summariser.
Slack/PagerDuty notification — alert the operator before the agent self-compresses in a production session.
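As an illustration of the veto use case, the predicate below checks whether any tool call is still waiting for a result. The message shape is an assumption (a "tool_calls" list with "id" fields on the requesting message and a "tool_call_id" on the result message); adapt it to whatever your history actually stores. The hook wiring in the trailing comment reuses only names shown elsewhere on this page.

```python
from typing import Any

Message = dict[str, Any]


def has_unresolved_tool_calls(messages: list[Message]) -> bool:
    """Return True if some requested tool call has no matching result message.

    Assumed shape: requests carry m["tool_calls"] = [{"id": ...}, ...] and
    results are messages with role "tool" and a "tool_call_id" key.
    """
    requested: set[str] = set()
    resolved: set[str] = set()
    for m in messages:
        for call in m.get("tool_calls") or []:
            requested.add(call["id"])
        if m.get("role") == "tool" and "tool_call_id" in m:
            resolved.add(m["tool_call_id"])
    return bool(requested - resolved)


pending = [
    {"role": "user", "content": "look it up"},
    {"role": "assistant", "content": "", "tool_calls": [{"id": "call-1"}]},
]
settled = pending + [{"role": "tool", "tool_call_id": "call-1", "content": "42"}]

print(has_unresolved_tool_calls(pending))  # True
print(has_unresolved_tool_calls(settled))  # False

# In a pre_compact hook the predicate could drive the decision, for example:
#   if has_unresolved_tool_calls(ctx.extra["messages"]):
#       return HookDecision.deny("pending tool call")
#   return HookDecision.allow()
```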
Bridge to BudgetMonitor¶
ContextCompressor.to_monitor() returns a BudgetMonitor wired to
fire on_compress at 95% of the compressor’s threshold. Use this when
you want a single call to tie token tracking and compression together:
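from adk_fluent import Agent, ContextCompressor

compressor = ContextCompressor(threshold=150_000)
# run_compression is a placeholder for your own compression routine.
compressor.on_compress = lambda tokens: run_compression()
monitor = compressor.to_monitor()
agent = Agent("coder").after_model(monitor.after_model_hook())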
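For intuition about the 95% trigger point, here is a hypothetical stand-in that fires a callback when cumulative usage crosses that fraction of the threshold. ThresholdTrigger is not adk-fluent's BudgetMonitor; the class name, the record method, and the fire-once behaviour are assumptions made purely to show the arithmetic.

```python
from typing import Callable, Optional


class ThresholdTrigger:
    """Hypothetical stand-in for a budget monitor, for illustration only."""

    def __init__(
        self,
        threshold: int,
        percent: int = 95,
        callback: Optional[Callable[[int], None]] = None,
    ) -> None:
        # Integer arithmetic avoids floating-point rounding surprises.
        self.trigger_at = threshold * percent // 100
        self.callback = callback
        self.fired = False

    def record(self, total_tokens: int) -> None:
        """Fire the callback the first time usage reaches the trigger point."""
        if not self.fired and total_tokens >= self.trigger_at:
            self.fired = True
            if self.callback is not None:
                self.callback(total_tokens)


events: list[int] = []
trigger = ThresholdTrigger(threshold=150_000, callback=events.append)
print(trigger.trigger_at)  # 142500

for usage in (100_000, 143_000, 149_000):
    trigger.record(usage)
print(events)  # [143000]
```

With a threshold of 150_000 the callback fires at 142_500 tokens, leaving headroom to compress before the limit itself is reached.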
Testing¶
Both the compressor and hook integration are plain Python, so tests
don’t need a live model. Use HookRegistry directly and assert on the
output:
from adk_fluent import ContextCompressor, CompressionStrategy
from adk_fluent._hooks import HookDecision
from adk_fluent._hooks._events import HookEvent
from adk_fluent._hooks._registry import HookRegistry
registry = HookRegistry()
registry.on(HookEvent.PRE_COMPACT, lambda ctx: HookDecision.deny("nope"))
compressor = ContextCompressor(
    threshold=10,
    strategy=CompressionStrategy.keep_recent(1),
    hook_registry=registry,
)
msgs = [{"role": "user", "content": f"msg-{i}"} for i in range(20)]
out = compressor.compress_messages(msgs)
assert out == msgs # hook vetoed
assert compressor.compression_count == 0
Design notes¶
The compressor is sync-first. Message rewriting is CPU-bound and should not require an event loop. The async variant exists only for LLM-backed summarisation.
CompressionStrategy is frozen: you can hash it, diff it, and share it across threads.
The pre_compact hook dispatch uses asyncio.run from sync compression only when no event loop is already running. Inside a running loop the sync path skips the hook and falls straight to the strategy. Use compress_messages_async when you need guaranteed hook dispatch.
The compressor does not call a tokenizer. estimate_tokens() uses a ~4 chars-per-token heuristic; swap in a real tokenizer at the caller when precision matters.
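The event-loop caveat in the design notes is easier to see in code. The sketch below shows the general pattern of running an async hook from sync code only when no loop is active, using the standard asyncio.get_running_loop() call. The function names are hypothetical and this is not the library's dispatch code; it only demonstrates why a sync call made inside a running loop ends up skipping the hook.

```python
import asyncio
from typing import Optional


def in_running_loop() -> bool:
    """Return True when called from code that an event loop is executing."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def notify_hook(label: str) -> str:
    # Stand-in for an async pre_compact hook.
    return f"hook ran ({label})"


def dispatch_from_sync(label: str) -> Optional[str]:
    """Run the async hook only if no loop is running; otherwise skip it."""
    if in_running_loop():
        return None  # asyncio.run() would raise here, so the hook is skipped
    return asyncio.run(notify_hook(label))


async def main() -> Optional[str]:
    # Calling the sync path from inside a coroutine: a loop is running.
    return dispatch_from_sync("inside loop")


outside = dispatch_from_sync("plain sync")
inside = asyncio.run(main())
print(outside)  # hook ran (plain sync)
print(inside)   # None
```

If your application already runs inside an event loop (most async web frameworks and notebooks do), prefer the async entry point so the hook is always awaited.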