
claw-compactor

Claw Compactor v6.0 — 50%+ savings through rule-based compression, dictionary encoding, session observation compression, and progressive context loading.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 3,087
Hot score: 99
Updated: March 20, 2026
Overall rating: 4.0
Composite score: 4.0
Best-practice grade: C (62.8)

Install command

npx @skill-hub/cli install openclaw-skills-cut-your-tokens-97percent-savings-on-session-transcripts-via-observation-extraction

Repository

openclaw/skills

Skill path: skills/aeromomo/cut-your-tokens-97percent-savings-on-session-transcripts-via-observation-extraction

Claw Compactor v6.0 — 50%+ savings through rule-based compression, dictionary encoding, session observation compression, and progressive context loading.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install claw-compactor into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding claw-compactor to shared team environments
  • Use claw-compactor for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: claw-compactor
description: "Claw Compactor v6.0 — 50%+ savings through rule-based compression, dictionary encoding, session observation compression, and progressive context loading."
---

# 🦞 Claw Compactor

![Claw Compactor Banner](assets/banner.png)

*"Cut your tokens. Keep your facts."*

**Cut your AI agent's token spend in half.** One command compresses your entire workspace — memory files, session transcripts, sub-agent context — using 5 layered compression techniques. Deterministic. Mostly lossless. No LLM required.

## Features
- **5 compression layers** working in sequence for maximum savings
- **Zero LLM cost** — all compression is rule-based and deterministic
- **Lossless roundtrip** for dictionary, RLE, and rule-based compression
- **~97% savings** on session transcripts via observation extraction
- **Tiered summaries** (L0/L1/L2) for progressive context loading
- **CJK-aware** — full Chinese/Japanese/Korean support
- **One command** (`full`) runs everything in optimal order

## 5 Compression Layers

| # | Layer | Method | Savings | Lossless? |
|---|-------|--------|---------|-----------|
| 1 | Rule engine | Dedup lines, strip markdown filler, merge sections | 4-8% | ✅ |
| 2 | Dictionary encoding | Auto-learned codebook, `$XX` substitution | 4-5% | ✅ |
| 3 | Observation compression | Session JSONL → structured summaries | ~97% | ❌* |
| 4 | RLE patterns | Path shorthand (`$WS`), IP prefix, enum compaction | 1-2% | ✅ |
| 5 | Compressed Context Protocol | ultra/medium/light abbreviation | 20-60% | ❌* |

\*Lossy techniques preserve all facts and decisions; only verbose formatting is removed.
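As a rough illustration of layer 2, codebook substitution can be sketched as follows. This is a minimal sketch under assumed behavior, not the skill's actual implementation (which lives in `lib/dictionary.py`); the function names and the word-frequency heuristic here are illustrative:

```python
import re
from collections import Counter

def build_codebook(texts, min_freq=2, max_entries=100):
    """Learn a codebook mapping frequent long words to short $XX codes."""
    words = Counter(w for t in texts for w in re.findall(r"\b\w{8,}\b", t))
    frequent = [w for w, n in words.most_common(max_entries) if n >= min_freq]
    return {w: f"${i:02d}" for i, w in enumerate(frequent)}

def compress_text(text, codebook):
    """Substitute each learned word with its $XX code."""
    for word, code in codebook.items():
        text = text.replace(word, code)
    return text

def decompress_text(text, codebook):
    """Reverse the substitution; the roundtrip is exact (lossless)."""
    for word, code in codebook.items():
        text = text.replace(code, word)
    return text
```

Because codes are fixed-width (`$00` through `$99`), decompression has no prefix ambiguity and the roundtrip restores the original text exactly.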

## Quick Start

```bash
git clone https://github.com/aeromomo/claw-compactor.git
cd claw-compactor

# See how much you'd save (non-destructive)
python3 scripts/mem_compress.py /path/to/workspace benchmark

# Compress everything
python3 scripts/mem_compress.py /path/to/workspace full
```

**Requirements:** Python 3.9+. Optional: `pip install tiktoken` for exact token counts (falls back to heuristic).

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                      mem_compress.py                        │
│                   (unified entry point)                     │
└──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬────┘
       │      │      │      │      │      │      │      │
       ▼      ▼      ▼      ▼      ▼      ▼      ▼      ▼
  estimate compress  dict  dedup observe tiers  audit optimize
       └──────┴──────┴──┬───┴──────┴──────┴──────┴──────┘
                        ▼
                  ┌────────────────┐
                  │     lib/       │
                  │ tokens.py      │ ← tiktoken or heuristic
                  │ markdown.py    │ ← section parsing
                  │ dedup.py       │ ← shingle hashing
                  │ dictionary.py  │ ← codebook compression
                  │ rle.py         │ ← path/IP/enum encoding
                  │ tokenizer_     │
                  │   optimizer.py │ ← format optimization
                  │ config.py      │ ← JSON config
                  │ exceptions.py  │ ← error types
                  └────────────────┘
```

## Commands

All commands: `python3 scripts/mem_compress.py <workspace> <command> [options]`

| Command | Description | Typical Savings |
|---------|-------------|-----------------|
| `full` | Complete pipeline (all steps in order) | 50%+ combined |
| `benchmark` | Dry-run performance report | — |
| `compress` | Rule-based compression | 4-8% |
| `dict` | Dictionary encoding with auto-codebook | 4-5% |
| `observe` | Session transcript → observations | ~97% |
| `tiers` | Generate L0/L1/L2 summaries | 88-95% on sub-agent loads |
| `dedup` | Cross-file duplicate detection | varies |
| `estimate` | Token count report | — |
| `audit` | Workspace health check | — |
| `optimize` | Tokenizer-level format fixes | 1-3% |

### Global Options
- `--json` — Machine-readable JSON output
- `--dry-run` — Preview changes without writing
- `--since YYYY-MM-DD` — Filter sessions by date
- `--auto-merge` — Auto-merge duplicates (dedup)

## Real-World Savings

| Workspace State | Typical Savings | Notes |
|---|---|---|
| Session transcripts (observe) | **~97%** | Megabytes of JSONL → concise observation MD |
| Verbose/new workspace | **50-70%** | First run on unoptimized workspace |
| Regular maintenance | **10-20%** | Weekly runs on active workspace |
| Already-optimized | **3-12%** | Diminishing returns — workspace is clean |
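The `observe` step behind the ~97% figure can be pictured as a rule-based reduction of session JSONL into one-line facts. The sketch below is illustrative only; the real parser is `scripts/observation_compressor.py`, and the message field names (`role`, `name`, `result`) are assumptions:

```python
import json

def extract_observations(jsonl_lines):
    """Reduce raw session JSONL lines to one bullet per tool interaction."""
    obs = []
    for line in jsonl_lines:
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip malformed lines rather than fail the whole session
        if msg.get("role") == "tool" and msg.get("name"):
            # Keep the fact (which tool, what outcome), drop verbose payloads.
            obs.append(f"- {msg['name']}: {str(msg.get('result', ''))[:80]}")
    return "\n".join(obs)
```

Dropping full message payloads while keeping one line per interaction is where the large savings come from: megabytes of transcript collapse to a short observation file.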

## cacheRetention — Complementary Optimization

Before compression runs, enable **prompt caching** for a 90% discount on cached tokens:

```json
{
  "models": {
    "model-name": {
      "cacheRetention": "long"
    }
  }
}
```

Compression reduces token count, caching reduces cost-per-token. Together: 50% compression + 90% cache discount = **95% effective cost reduction**.
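A back-of-envelope check of that arithmetic (it assumes every remaining token is served from cache at the discounted rate):

```python
# Assumed rates from the text: 50% token compression, 90% cache discount.
compression = 0.50
cache_discount = 0.90

# Cost scales with (tokens kept) x (price per cached token).
effective_cost = (1 - compression) * (1 - cache_discount)   # 0.05
print(f"{1 - effective_cost:.0%} effective cost reduction")  # prints: 95% effective cost reduction
```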

## Heartbeat Automation

Run weekly or on heartbeat:

```markdown
## Memory Maintenance (weekly)
- python3 skills/claw-compactor/scripts/mem_compress.py <workspace> benchmark
- If savings > 5%: run full pipeline
- If pending transcripts: run observe
```

Cron example:
```
0 3 * * 0 cd /path/to/skills/claw-compactor && python3 scripts/mem_compress.py /path/to/workspace full
```

## Configuration

Optional `claw-compactor-config.json` in workspace root:

```json
{
  "chars_per_token": 4,
  "level0_max_tokens": 200,
  "level1_max_tokens": 500,
  "dedup_similarity_threshold": 0.6,
  "dedup_shingle_size": 3
}
```

All fields optional — sensible defaults are used when absent.
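A loader for this kind of optional config might look like the sketch below. The defaults mirror the example above; the actual loader is `lib/config.py` and may behave differently:

```python
import json
from pathlib import Path

# Defaults matching the documented example fields.
DEFAULTS = {
    "chars_per_token": 4,
    "level0_max_tokens": 200,
    "level1_max_tokens": 500,
    "dedup_similarity_threshold": 0.6,
    "dedup_shingle_size": 3,
}

def load_config(workspace):
    """Return DEFAULTS, overridden by any fields present in the config file."""
    path = Path(workspace) / "claw-compactor-config.json"
    cfg = dict(DEFAULTS)
    if path.exists():
        cfg.update(json.loads(path.read_text(encoding="utf-8")))
    return cfg
```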

## Artifacts

| File | Purpose |
|------|---------|
| `memory/.codebook.json` | Dictionary codebook (must travel with memory files) |
| `memory/.observed-sessions.json` | Tracks processed transcripts |
| `memory/observations/` | Compressed session summaries |
| `memory/MEMORY-L0.md` | Level 0 summary (~200 tokens) |

## FAQ

**Q: Will compression lose my data?**
A: Rule engine, dictionary, RLE, and tokenizer optimization are fully lossless. Observation compression and CCP are lossy but preserve all facts and decisions.

**Q: How does dictionary decompression work?**
A: `decompress_text(text, codebook)` expands all `$XX` codes back. The codebook JSON must be present.

**Q: Can I run individual steps?**
A: Yes. Every command is independent: `compress`, `dict`, `observe`, `tiers`, `dedup`, `optimize`.

**Q: What if tiktoken isn't installed?**
A: Falls back to a CJK-aware heuristic (chars÷4). Results are ~90% accurate.
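A CJK-aware chars÷4 fallback could look roughly like this. This is a sketch of the assumed behavior, not the code in `lib/tokens.py`:

```python
def estimate_tokens(text, chars_per_token=4):
    """chars/4 heuristic, counting each CJK character as ~1 token."""
    if not text:
        return 0
    cjk = sum(1 for ch in text
              if "\u4e00" <= ch <= "\u9fff"     # CJK unified ideographs
              or "\u3040" <= ch <= "\u30ff"     # Japanese kana
              or "\uac00" <= ch <= "\ud7af")    # Korean hangul
    return cjk + (len(text) - cjk) // chars_per_token
```

The split matters because CJK text tokenizes much more densely than ASCII: treating a Chinese character as a quarter-token would undercount it several-fold.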

**Q: Does it handle Chinese/Japanese/Unicode?**
A: Yes. Full CJK support including character-aware token estimation and Chinese punctuation normalization.

## Troubleshooting

- **`FileNotFoundError` on workspace:** Ensure path points to workspace root (contains `memory/` or `MEMORY.md`)
- **Dictionary decompression fails:** Check `memory/.codebook.json` exists and is valid JSON
- **Zero savings on `benchmark`:** Workspace is already optimized — nothing to do
- **`observe` finds no transcripts:** Check sessions directory for `.jsonl` files
- **Token count seems wrong:** Install tiktoken: `pip3 install tiktoken`

## Credits

- Inspired by [claude-mem](https://github.com/thedotmack/claude-mem) by thedotmack
- Built by Bot777 🤖 for [OpenClaw](https://openclaw.ai)

## License

MIT


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/mem_compress.py

```python
#!/usr/bin/env python3
"""Unified entry point for claw-compactor skill.

Usage:
    python3 mem_compress.py <workspace> <command> [options]

Commands:
    compress    Rule-based compression of memory files
    estimate    Token count estimation
    dedup       Cross-file duplicate detection
    tiers       Generate tiered summaries
    audit       Workspace memory health check
    observe     Compress session transcripts into observations
    dict        Dictionary-based compression
    optimize    Tokenizer-level format optimization
    full        Run complete pipeline (all steps in order)
    benchmark   Performance report with before/after stats"""

import argparse
import json
import os
import sys
from datetime import datetime, date
from pathlib import Path
from typing import Dict, Any, List, Optional

# Ensure scripts/ is on path for lib imports
sys.path.insert(0, str(Path(__file__).resolve().parent))

from lib.tokens import estimate_tokens, using_tiktoken
from lib.exceptions import FileNotFoundError_, MemCompressError


def _workspace_path(workspace: str) -> Path:
    """Validate and return workspace Path. Exits on error."""
    p = Path(workspace)
    if not p.exists():
        print(f"Error: workspace not found: {workspace}", file=sys.stderr)
        sys.exit(1)
    if not p.is_dir():
        print(f"Error: workspace is not a directory: {workspace}", file=sys.stderr)
        sys.exit(1)
    return p


def _count_tokens_in_workspace(workspace: Path) -> int:
    """Count total tokens in all .md files in workspace."""
    total = 0
    for f in sorted(workspace.glob("*.md")):
        total += estimate_tokens(f.read_text(encoding="utf-8", errors="replace"))
    mem_dir = workspace / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            total += estimate_tokens(f.read_text(encoding="utf-8", errors="replace"))
    return total


def _collect_md_files(workspace: Path) -> List[Path]:
    """Collect all .md files in workspace (root + memory/)."""
    files: List[Path] = []
    for f in sorted(workspace.glob("*.md")):
        files.append(f)
    mem_dir = workspace / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            if not f.name.startswith('.'):
                files.append(f)
    return files


# ── Command handlers ─────────────────────────────────────────────


def cmd_estimate(workspace: Path, args) -> int:
    """Estimate token counts for workspace files."""
    from estimate_tokens import scan_path, format_human
    files = _collect_md_files(workspace)
    if not files:
        print("No markdown files found.", file=sys.stderr)
        return 1
    results = scan_path(str(workspace), threshold=getattr(args, 'threshold', 0))
    if args.json:
        print(json.dumps({"files": results, "total_tokens": sum(r["tokens"] for r in results)}, indent=2))
    else:
        print(format_human(results))
    return 0


def cmd_compress(workspace: Path, args) -> int:
    """Run rule-based compression on workspace files."""
    from compress_memory import compress_file, _collect_files
    dry_run = getattr(args, 'dry_run', False)
    older_than = getattr(args, 'older_than', None)

    files = _collect_files(str(workspace), older_than=older_than)
    if not files:
        print("No files to compress.", file=sys.stderr)
        return 1

    results = []
    for f in files:
        r = compress_file(f, dry_run=dry_run, no_llm=True)
        r["rule_reduction_pct"] = round(
            (r["original_tokens"] - r["rule_compressed_tokens"]) / r["original_tokens"] * 100, 1
        ) if r["original_tokens"] > 0 else 0.0
        results.append(r)

    total_before = sum(r["original_tokens"] for r in results)
    total_after = sum(r["rule_compressed_tokens"] for r in results)
    total_saved = total_before - total_after

    if args.json:
        print(json.dumps(results, indent=2, ensure_ascii=False))
    else:
        for r in results:
            saved = r["original_tokens"] - r["rule_compressed_tokens"]
            print(f"{r['file']}: {r['original_tokens']} → {r['rule_compressed_tokens']} tokens (saved {saved})")
        print(f"\nTotal: {total_before} → {total_after} tokens (saved {total_saved})")
    return 0


def cmd_dedup(workspace: Path, args) -> int:
    """Find and report duplicate entries."""
    from dedup_memory import run_dedup, format_human
    threshold = getattr(args, 'threshold_val', 0.6)
    auto_merge = getattr(args, 'auto_merge', False)
    result = run_dedup(str(workspace), threshold=threshold, auto_merge=auto_merge)
    if args.json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    else:
        print(format_human(result))
    return 0


def cmd_tiers(workspace: Path, args) -> int:
    """Generate tiered summaries."""
    from generate_summary_tiers import generate_tiers, format_human, _find_memory_files
    files = _find_memory_files(str(workspace))
    if not files:
        print("No memory files found.", file=sys.stderr)
        return 1
    result = generate_tiers(files)
    if args.json:
        output = {
            "total_tokens": result["total_tokens"],
            "total_sections": result["total_sections"],
            "tiers": {
                k: {kk: vv for kk, vv in v.items() if kk != "sections"}
                for k, v in result["tiers"].items()
            },
        }
        print(json.dumps(output, indent=2))
    else:
        print(format_human(result))
    return 0


def cmd_audit(workspace: Path, args) -> int:
    """Audit workspace memory health."""
    from audit_memory import audit_workspace, format_report
    stale_days = getattr(args, 'stale_days', 14)
    result = audit_workspace(str(workspace), stale_days=stale_days)
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_report(result))
    return 0


def cmd_observe(workspace: Path, args) -> int:
    """Scan session transcripts and generate observations."""
    from observation_compressor import parse_session_jsonl, extract_tool_interactions, rule_extract_observations, format_observations_md

    sessions_dir = os.path.expanduser("~/.openclaw/sessions")
    if not os.path.isdir(sessions_dir):
        print(f"Sessions directory not found: {sessions_dir}", file=sys.stderr)
        return 1

    # Load tracker
    mem_dir = workspace / "memory"
    mem_dir.mkdir(exist_ok=True)
    tracker_path = mem_dir / ".observed-sessions.json"
    tracker: Dict[str, str] = {}
    if tracker_path.exists():
        try:
            tracker = json.loads(tracker_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            tracker = {}

    # Find session files
    session_files = sorted(Path(sessions_dir).glob("*.jsonl"))
    since = getattr(args, 'since', None)

    new_count = 0
    obs_dir = mem_dir / "observations"
    obs_dir.mkdir(exist_ok=True)

    for sf in session_files:
        if sf.name in tracker:
            continue

        # Apply --since filter
        if since:
            try:
                # Try to extract date from filename
                fname = sf.stem
                if fname < since:
                    continue
            except Exception:
                pass

        try:
            messages = parse_session_jsonl(sf)
            interactions = extract_tool_interactions(messages)
            if not interactions:
                tracker[sf.name] = datetime.now().isoformat()
                continue

            observations = rule_extract_observations(interactions)
            if observations:
                md = format_observations_md(observations)
                obs_file = obs_dir / f"{sf.stem}.md"
                obs_file.write_text(md, encoding="utf-8")
                new_count += 1

            tracker[sf.name] = datetime.now().isoformat()
        except Exception as e:
            print(f"Warning: failed to process {sf.name}: {e}", file=sys.stderr)

    # Save tracker
    tracker_path.write_text(json.dumps(tracker, indent=2), encoding="utf-8")

    if args.json:
        print(json.dumps({"processed": new_count, "total_tracked": len(tracker)}))
    else:
        print(f"Processed {new_count} new session(s), {len(tracker)} total tracked.")
    return 0


def cmd_dict(workspace: Path, args) -> int:
    """Dictionary-based compression."""
    from dictionary_compress import cmd_build, cmd_stats
    from lib.dictionary import save_codebook

    mem_dir = workspace / "memory"
    mem_dir.mkdir(exist_ok=True)
    cb_path = mem_dir / ".codebook.json"

    result = cmd_build(workspace, cb_path, min_freq=2)
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"Codebook: {result['codebook_entries']} entries from {result['files_scanned']} files")
        print(f"Saved to: {result['codebook_path']}")
    return 0


def cmd_optimize(workspace: Path, args) -> int:
    """Apply tokenizer-level format optimization."""
    from lib.tokenizer_optimizer import optimize_tokens, estimate_savings

    dry_run = getattr(args, 'dry_run', False)
    files = _collect_md_files(workspace)
    if not files:
        print("No files found.", file=sys.stderr)
        return 1

    total_before = 0
    total_after = 0
    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        optimized = optimize_tokens(text, aggressive=True)
        before = estimate_tokens(text)
        after = estimate_tokens(optimized)
        total_before += before
        total_after += after
        if not dry_run:
            f.write_text(optimized, encoding="utf-8")

    saved = total_before - total_after
    if args.json:
        print(json.dumps({
            "before": total_before,
            "after": total_after,
            "saved": saved,
            "files": len(files),
        }))
    else:
        print(f"Tokenizer optimization: {total_before} → {total_after} tokens (saved {saved})")
    return 0


def cmd_full(workspace: Path, args) -> int:
    """Run complete compression pipeline."""
    from compress_memory import compress_file, _collect_files, rule_compress
    from dictionary_compress import cmd_build
    from dedup_memory import run_dedup
    from generate_summary_tiers import generate_tiers, _find_memory_files

    # 1. Count initial tokens
    before_tokens = _count_tokens_in_workspace(workspace)
    print(f"Before: {before_tokens:,} tokens")

    # 2. Observe (scan session transcripts)
    try:
        observe_args = argparse.Namespace(json=False, since=getattr(args, 'since', None))
        cmd_observe(workspace, observe_args)
    except Exception as e:
        print(f"  observe: skipped ({e})")

    # 3. Compress (rule engine)
    files = _collect_files(str(workspace))
    for f in files:
        compress_file(f, dry_run=False, no_llm=True)
    print(f"  compress: processed {len(files)} files")

    # 4. Dict (dictionary compression)
    mem_dir = workspace / "memory"
    mem_dir.mkdir(exist_ok=True)
    cb_path = mem_dir / ".codebook.json"
    try:
        result = cmd_build(workspace, cb_path, min_freq=2)
        print(f"  dict: {result['codebook_entries']} entries")
    except Exception as e:
        print(f"  dict: skipped ({e})")

    # 5. Dedup (report only)
    try:
        dedup_result = run_dedup(str(workspace))
        print(f"  dedup: {dedup_result['duplicate_groups']} groups found")
    except Exception as e:
        print(f"  dedup: skipped ({e})")

    # 6. Tiers
    try:
        tier_files = _find_memory_files(str(workspace))
        if tier_files:
            tier_result = generate_tiers(tier_files)
            print(f"  tiers: {tier_result['total_sections']} sections analyzed")
    except Exception as e:
        print(f"  tiers: skipped ({e})")

    # 7. Final count
    after_tokens = _count_tokens_in_workspace(workspace)
    saved = before_tokens - after_tokens
    pct = (saved / before_tokens * 100) if before_tokens > 0 else 0
    print(f"After: {after_tokens:,} tokens")
    print(f"Tokens saved: {saved:,} ({pct:.0f}%)")
    return 0


def cmd_benchmark(workspace: Path, args) -> int:
    """Non-destructive performance benchmark."""
    from compress_memory import rule_compress
    from lib.dictionary import build_codebook, compress_text
    from lib.rle import compress as rle_compress
    from lib.tokenizer_optimizer import optimize_tokens

    files = _collect_md_files(workspace)
    if not files:
        if not args.json:
            print("No files found.", file=sys.stderr)
        return 1

    # Read all files
    texts = {}
    for f in files:
        texts[str(f)] = f.read_text(encoding="utf-8", errors="replace")
    combined = '\n'.join(texts.values())

    # Baseline
    baseline_tokens = estimate_tokens(combined)

    # Step 1: Rule engine
    rule_compressed = rule_compress(combined)
    rule_tokens = estimate_tokens(rule_compressed)

    # Step 2: Dictionary compress
    cb = build_codebook(list(texts.values()), min_freq=2)
    dict_compressed = compress_text(rule_compressed, cb)
    dict_tokens = estimate_tokens(dict_compressed)

    # Step 3: RLE
    ws_paths = [str(workspace)]
    rle_compressed = rle_compress(dict_compressed, ws_paths)
    rle_tokens = estimate_tokens(rle_compressed)

    # Step 4: Tokenizer optimize
    tok_optimized = optimize_tokens(rle_compressed, aggressive=True)
    tok_tokens = estimate_tokens(tok_optimized)

    steps = [
        {"name": "Rule Engine", "before": baseline_tokens, "after": rule_tokens},
        {"name": "Dictionary Compress", "before": rule_tokens, "after": dict_tokens},
        {"name": "RLE Patterns", "before": dict_tokens, "after": rle_tokens},
        {"name": "Tokenizer Optimize", "before": rle_tokens, "after": tok_tokens},
    ]
    for s in steps:
        s["saved"] = s["before"] - s["after"]
        s["pct"] = round((s["saved"] / s["before"] * 100), 1) if s["before"] > 0 else 0.0

    total_saved = baseline_tokens - tok_tokens
    total_pct = round((total_saved / baseline_tokens * 100), 1) if baseline_tokens > 0 else 0.0

    if args.json:
        print(json.dumps({
            "steps": steps,
            "total_before": baseline_tokens,
            "total_after": tok_tokens,
            "total_saved": total_saved,
            "total_pct": total_pct,
        }))
        return 0

    # Human report
    today = date.today().isoformat()
    print(f"=== claw-compactor Performance Report ===")
    print(f"Date: {today}")
    print(f"Engine: {'tiktoken' if using_tiktoken() else 'heuristic'}")
    print(f"Files: {len(files)}")
    print()
    print(f"{'Step':<22} | {'Before':>8} | {'After':>8} | {'Saved':>6} | {'%':>6}")
    print("-" * 58)
    for s in steps:
        print(f"{s['name']:<22} | {s['before']:>8,} | {s['after']:>8,} | {s['saved']:>6,} | {s['pct']:>5.1f}%")
    print("-" * 58)
    print(f"{'TOTAL (memory)':<22} | {baseline_tokens:>8,} | {tok_tokens:>8,} | {total_saved:>6,} | {total_pct:>5.1f}%")
    print()
    print(f"💰 Total savings: {total_saved:,} tokens ({total_pct:.1f}%)")
    print()

    # Session transcript info
    sessions_dir = os.path.expanduser("~/.openclaw/sessions")
    session_count = 0
    if os.path.isdir(sessions_dir):
        session_count = len(list(Path(sessions_dir).glob("*.jsonl")))
    print(f"Session Transcripts: {session_count} files found")
    print()

    # Recommendations
    print("Recommendations:")
    if total_pct < 5:
        print("  - Workspace is already well-optimized")
    else:
        if steps[0]["pct"] > 3:
            print("  - Run 'compress' to apply rule engine savings")
        if steps[1]["pct"] > 2:
            print("  - Run 'dict' to apply dictionary compression")
        if steps[3]["pct"] > 1:
            print("  - Run 'optimize' for tokenizer-level savings")
    if session_count > 0:
        print(f"  - Run 'observe' to compress {session_count} session transcript(s)")

    return 0


# ── Command map & parser ─────────────────────────────────────────

def cmd_install(workspace: Path, args) -> int:
    """Install heartbeat integration into the workspace HEARTBEAT.md."""
    script_path = Path(__file__).resolve()
    heartbeat_path = workspace / "HEARTBEAT.md"

    # Build the heartbeat entry with absolute paths
    entry = f"""
## Memory Compression (auto — claw-compactor)
- Run `python3 {script_path} {workspace} benchmark --json`
- If savings >5%: auto-run `python3 {script_path} {workspace} full`
- If unprocessed session transcripts exist: run observe
- Record results to `memory/heartbeat-state.json`
"""

    # Check if already installed
    if heartbeat_path.exists():
        existing = heartbeat_path.read_text(encoding="utf-8")
        if "claw-compactor" in existing:
            print("✅ Already installed in HEARTBEAT.md")
            return 0
        # Append to existing
        with open(heartbeat_path, "a", encoding="utf-8") as f:
            f.write(entry)
    else:
        # Create new HEARTBEAT.md
        with open(heartbeat_path, "w", encoding="utf-8") as f:
            f.write("# HEARTBEAT.md\n" + entry)

    print(f"✅ Installed claw-compactor heartbeat into {heartbeat_path}")
    print(f"   Script: {script_path}")
    print(f"   Workspace: {workspace}")
    return 0


COMMAND_MAP = {
    "compress": cmd_compress,
    "estimate": cmd_estimate,
    "dedup": cmd_dedup,
    "tiers": cmd_tiers,
    "audit": cmd_audit,
    "observe": cmd_observe,
    "dict": cmd_dict,
    "optimize": cmd_optimize,
    "full": cmd_full,
    "benchmark": cmd_benchmark,
    "install": cmd_install,
}


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser."""
    parser = argparse.ArgumentParser(
        description="claw-compactor: workspace memory compression toolkit"
    )
    parser.add_argument("workspace", help="Workspace directory path")

    sub = parser.add_subparsers(dest="command")
    sub.required = True

    # Add -v to all subparsers via parent
    _common = argparse.ArgumentParser(add_help=False)
    _common.add_argument("-v", "--verbose", action="store_true", help="Verbose output")

    # compress
    p = sub.add_parser("compress", help="Rule-based compression", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--older-than", type=int, default=None)

    # estimate
    p = sub.add_parser("estimate", help="Token estimation", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--threshold", type=int, default=0)

    # dedup
    p = sub.add_parser("dedup", help="Duplicate detection", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--auto-merge", action="store_true")
    p.add_argument("--threshold-val", type=float, default=0.6)

    # tiers
    p = sub.add_parser("tiers", help="Generate tiered summaries", parents=[_common])
    p.add_argument("--json", action="store_true")

    # audit
    p = sub.add_parser("audit", help="Workspace audit", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--stale-days", type=int, default=14)

    # observe
    p = sub.add_parser("observe", help="Compress session transcripts", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--since", type=str, default=None)

    # dict
    p = sub.add_parser("dict", help="Dictionary compression", parents=[_common])
    p.add_argument("--json", action="store_true")

    # optimize
    p = sub.add_parser("optimize", help="Tokenizer optimization", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--dry-run", action="store_true")

    # full
    p = sub.add_parser("full", help="Run complete pipeline", parents=[_common])
    p.add_argument("--json", action="store_true")
    p.add_argument("--since", type=str, default=None)

    # benchmark
    p = sub.add_parser("benchmark", help="Performance benchmark", parents=[_common])
    p.add_argument("--json", action="store_true")

    # install
    sub.add_parser("install", help="Install heartbeat auto-compression", parents=[_common])

    return parser


def main():
    parser = build_parser()
    args = parser.parse_args()

    if args.verbose:
        import logging
        logging.basicConfig(level=logging.DEBUG)

    workspace = _workspace_path(args.workspace)
    handler = COMMAND_MAP[args.command]
    sys.exit(handler(workspace, args))


if __name__ == "__main__":
    main()

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# 🦞 Claw Compactor

![Claw Compactor Banner](assets/banner.png)

*"Cut your tokens. Keep your facts."*

**Cut your AI agent's token spend in half.** One command compresses your entire workspace — memory files, session transcripts, sub-agent context — using 5 layered compression techniques. Deterministic. Mostly lossless. No LLM required.

## Features
- **5 compression layers** working in sequence for maximum savings
- **Zero LLM cost** — all compression is rule-based and deterministic
- **Lossless roundtrip** for dictionary, RLE, and rule-based compression
- **~97% savings** on session transcripts via observation extraction
- **Tiered summaries** (L0/L1/L2) for progressive context loading
- **CJK-aware** — full Chinese/Japanese/Korean support
- **One command** (`full`) runs everything in optimal order

## 5 Compression Layers

| # | Layer | Method | Savings | Lossless? |
|---|-------|--------|---------|-----------|
| 1 | Rule engine | Dedup lines, strip markdown filler, merge sections | 4-8% | ✅ |
| 2 | Dictionary encoding | Auto-learned codebook, `$XX` substitution | 4-5% | ✅ |
| 3 | Observation compression | Session JSONL → structured summaries | ~97% | ❌* |
| 4 | RLE patterns | Path shorthand (`$WS`), IP prefix, enum compaction | 1-2% | ✅ |
| 5 | Compressed Context Protocol | ultra/medium/light abbreviation | 20-60% | ❌* |

\*Lossy techniques preserve all facts and decisions; only verbose formatting is removed.

## Quick Start

```bash
git clone https://github.com/aeromomo/claw-compactor.git
cd claw-compactor

# See how much you'd save (non-destructive)
python3 scripts/mem_compress.py /path/to/workspace benchmark

# Compress everything
python3 scripts/mem_compress.py /path/to/workspace full
```

**Requirements:** Python 3.9+. Optional: `pip install tiktoken` for exact token counts (falls back to heuristic).

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                      mem_compress.py                        │
│                   (unified entry point)                     │
└──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬────┘
       │      │      │      │      │      │      │      │
       ▼      ▼      ▼      ▼      ▼      ▼      ▼      ▼
  estimate compress  dict  dedup observe tiers  audit optimize
       └──────┴──────┴──┬───┴──────┴──────┴──────┴──────┘
                        ▼
                  ┌────────────────┐
                  │     lib/       │
                  │ tokens.py      │ ← tiktoken or heuristic
                  │ markdown.py    │ ← section parsing
                  │ dedup.py       │ ← shingle hashing
                  │ dictionary.py  │ ← codebook compression
                  │ rle.py         │ ← path/IP/enum encoding
                  │ tokenizer_     │
                  │   optimizer.py │ ← format optimization
                  │ config.py      │ ← JSON config
                  │ exceptions.py  │ ← error types
                  └────────────────┘
```

## Commands

All commands: `python3 scripts/mem_compress.py <workspace> <command> [options]`

| Command | Description | Typical Savings |
|---------|-------------|-----------------|
| `full` | Complete pipeline (all steps in order) | 50%+ combined |
| `benchmark` | Dry-run performance report | — |
| `compress` | Rule-based compression | 4-8% |
| `dict` | Dictionary encoding with auto-codebook | 4-5% |
| `observe` | Session transcript → observations | ~97% |
| `tiers` | Generate L0/L1/L2 summaries | 88-95% on sub-agent loads |
| `dedup` | Cross-file duplicate detection | varies |
| `estimate` | Token count report | — |
| `audit` | Workspace health check | — |
| `optimize` | Tokenizer-level format fixes | 1-3% |

### Global Options
- `--json` — Machine-readable JSON output
- `--dry-run` — Preview changes without writing
- `--since YYYY-MM-DD` — Filter sessions by date
- `--auto-merge` — Auto-merge duplicates (dedup)

## Real-World Savings

| Workspace State | Typical Savings | Notes |
|---|---|---|
| Session transcripts (observe) | **~97%** | Megabytes of JSONL → concise observation MD |
| Verbose/new workspace | **50-70%** | First run on unoptimized workspace |
| Regular maintenance | **10-20%** | Weekly runs on active workspace |
| Already-optimized | **3-12%** | Diminishing returns — workspace is clean |

## cacheRetention — Complementary Optimization

Before compression runs, enable **prompt caching** for a 90% discount on cached tokens:

```json
{
  "models": {
    "model-name": {
      "cacheRetention": "long"
    }
  }
}
```

Compression reduces token count, caching reduces cost-per-token. Together: 50% compression + 90% cache discount = **95% effective cost reduction**.
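The arithmetic behind that claim can be checked directly (the 50% and 90% figures are the illustrative ones above, not measured rates):

```python
# Back-of-the-envelope: compression and caching multiply.
def effective_cost(tokens: int, price_per_token: float,
                   compression: float = 0.50, cache_discount: float = 0.90) -> float:
    """Cost after compressing the context and caching what remains."""
    remaining = tokens * (1 - compression)                       # tokens that survive compression
    return remaining * price_per_token * (1 - cache_discount)   # pay only the uncached fraction

baseline = 1_000_000 * 3e-6                    # 1M tokens at $3/M
optimized = effective_cost(1_000_000, 3e-6)
print(f"${baseline:.2f} -> ${optimized:.2f}")  # $3.00 -> $0.15, i.e. 95% cheaper
```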

## Heartbeat Automation

Run weekly or on heartbeat:

```markdown
## Memory Maintenance (weekly)
- python3 skills/claw-compactor/scripts/mem_compress.py <workspace> benchmark
- If savings > 5%: run full pipeline
- If pending transcripts: run observe
```

Cron example:
```
0 3 * * 0 cd /path/to/skills/claw-compactor && python3 scripts/mem_compress.py /path/to/workspace full
```

## Configuration

Optional `claw-compactor-config.json` in workspace root:

```json
{
  "chars_per_token": 4,
  "level0_max_tokens": 200,
  "level1_max_tokens": 500,
  "dedup_similarity_threshold": 0.6,
  "dedup_shingle_size": 3
}
```

All fields optional — sensible defaults are used when absent.
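The merge-with-defaults behaviour can be sketched roughly like this, using the documented field names (the actual loader lives in `lib/config.py` and may differ in detail):

```python
import json
from pathlib import Path

# Defaults mirror the documented example fields; treat them as illustrative.
DEFAULTS = {
    "chars_per_token": 4,
    "level0_max_tokens": 200,
    "level1_max_tokens": 500,
    "dedup_similarity_threshold": 0.6,
    "dedup_shingle_size": 3,
}

def load_config(workspace: str) -> dict:
    """Read claw-compactor-config.json if present, overlay it onto defaults."""
    cfg = dict(DEFAULTS)
    path = Path(workspace) / "claw-compactor-config.json"
    if path.is_file():
        cfg.update(json.loads(path.read_text(encoding="utf-8")))
    return cfg
```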

## Artifacts

| File | Purpose |
|------|---------|
| `memory/.codebook.json` | Dictionary codebook (must travel with memory files) |
| `memory/.observed-sessions.json` | Tracks processed transcripts |
| `memory/observations/` | Compressed session summaries |
| `memory/MEMORY-L0.md` | Level 0 summary (~200 tokens) |

## FAQ

**Q: Will compression lose my data?**
A: Rule engine, dictionary, RLE, and tokenizer optimization are fully lossless. Observation compression and CCP are lossy but preserve all facts and decisions.

**Q: How does dictionary decompression work?**
A: `decompress_text(text, codebook)` expands all `$XX` codes back. The codebook JSON must be present.

**Q: Can I run individual steps?**
A: Yes. Every command is independent: `compress`, `dict`, `observe`, `tiers`, `dedup`, `optimize`.

**Q: What if tiktoken isn't installed?**
A: Falls back to a CJK-aware heuristic (chars÷4). Results are ~90% accurate.

**Q: Does it handle Chinese/Japanese/Unicode?**
A: Yes. Full CJK support including character-aware token estimation and Chinese punctuation normalization.

## Troubleshooting

- **`FileNotFoundError` on workspace:** Ensure path points to workspace root (contains `memory/` or `MEMORY.md`)
- **Dictionary decompression fails:** Check `memory/.codebook.json` exists and is valid JSON
- **Zero savings on `benchmark`:** Workspace is already optimized — nothing to do
- **`observe` finds no transcripts:** Check sessions directory for `.jsonl` files
- **Token count seems wrong:** Install tiktoken: `pip3 install tiktoken`

## Credits

- Inspired by [claude-mem](https://github.com/thedotmack/claude-mem) by thedotmack
- Built by Bot777 🤖 for [OpenClaw](https://openclaw.ai)

## License

MIT

```

### _meta.json

```json
{
  "owner": "aeromomo",
  "slug": "cut-your-tokens-97percent-savings-on-session-transcripts-via-observation-extraction",
  "displayName": "claw-compactor",
  "latest": {
    "version": "6.0.0",
    "publishedAt": 1770684517861,
    "commit": "https://github.com/openclaw/skills/commit/1b5e91e3f7f86c2194b8f936daa45fc16c6d1efd"
  },
  "history": []
}

```

### references/README.md

```markdown
# References

Technical documentation for claw-compactor internals.

## Files
- **compression-techniques.md** — Deep dive into all 5 compression techniques
- **benchmarks.md** — Real-world performance measurements
- **architecture.md** — System architecture and module relationships
- **testing.md** — Test strategy and coverage goals
- **compression-prompts.md** — LLM prompt templates for observation compression

## Key Design Decisions

### Dictionary Encoding
The codebook uses `$XX` codes (uppercase alphanumeric, matching the `$A1`-style examples below) to avoid conflicts with:
- Shell variables (`$lower_case`)
- Markdown formatting (`**bold**`)
- Natural text (`$100`, `$USD`)

Code length starts at 3 chars (`$AA`) and grows to 4 (`$AAA`) after 676 entries.

### Workspace Path Shorthand
`$WS` replaces the full workspace path. This is the single highest-value substitution for most workspaces since the path appears in every file reference.

Example codebook:
```json
{
 "$A1": "example_user",
 "$A2": "10.0.1",
 "$A3": "workspace"
}
```

**Before:** `ssh [email protected]` / `ssh [email protected]`
**After:** `ssh deploy@$A2.2` / `ssh admin@$A2.3`

### Token Estimation
Two backends:
1. **tiktoken** (preferred) — exact `cl100k_base` counts (used as a consistent proxy; Claude's own tokenizer is not public)
2. **Heuristic fallback** — CJK-aware chars÷4 approximation, ~90% accurate

### Workspace paths
- `/home/user/workspace` → `$WS`

All path compression is fully reversible via `decompress_paths()`.

```

### references/architecture.md

```markdown
# Architecture

## System Overview
claw-compactor is a modular compression pipeline with a single entry point (`mem_compress.py`) that routes to specialized compressors, all sharing a common library layer.

```
mem_compress.py (553 LOC)
│  CLI argument parsing · command routing · pipeline orchestration · progress reporting
│
├── compress_memory.py          230 LOC
├── dictionary_compress.py      170 LOC
├── observation_compressor.py   346 LOC
├── dedup_memory.py             147 LOC
├── generate_summary_tiers.py   292 LOC
├── audit_memory.py             216 LOC
└── estimate_tokens.py          131 LOC
        │
        ▼
lib/
├── tokens.py                    68 LOC   Token estimation engine
├── markdown.py                 312 LOC   MD parsing & manipulation
├── dedup.py                    119 LOC   Shingle-hash dedup
├── dictionary.py               273 LOC   Codebook compression
├── rle.py                      165 LOC   Run-length encoding
├── tokenizer_optimizer.py      188 LOC   Format optimization
├── config.py                    81 LOC   JSON config loading
└── exceptions.py                24 LOC   Custom exception types

Total: 3,602 LOC
```

## Data Flow: Full Pipeline

**Memory branch** — `memory/*.md`, `MEMORY.md`, `TOOLS.md`, etc.:

1. `estimate_tokens` — baseline token count (read-only)
2. `compress_memory` — rule engine: dedup lines, strip redundancy, merge sections
3. `dictionary_compress` — build codebook, apply `$XX` codes (writes `memory/.codebook.json`)
4. `dedup_memory` — cross-file scan via shingle hashing
5. `generate_summary_tiers` — L0/L1/L2 token budgets (writes `memory/MEMORY-L0.md`, `memory/MEMORY-L1.md`)

**Transcript branch** — `.openclaw/sessions/*.jsonl` (raw transcripts), processed independently:

- `observation_compressor` — JSONL → XML → MD, ~97% compression (writes `memory/observations/`)

## Module Responsibilities

### Entry Point
**`mem_compress.py`** (553 LOC)
The unified CLI. Parses arguments, routes to the appropriate command handler, and orchestrates the full pipeline. Handles progress reporting, JSON output mode, and error formatting.

### Compressor Modules
**`compress_memory.py`** (230 LOC)
Two-phase memory compression. Phase 1: deterministic rule engine (dedup lines, strip markdown filler, merge similar sections). Phase 2: optional LLM prompt generation for semantic compression. Operates on `.md` files in the workspace.

**`dictionary_compress.py`** (170 LOC)
CLI wrapper around `lib/dictionary.py`. Scans workspace markdown files, builds/loads codebook, applies/reverses compression. Manages the `.codebook.json` artifact.

**`observation_compressor.py`** (346 LOC)
Parses OpenClaw `.jsonl` session transcripts, extracts tool call interactions, classifies them by type (feature, bugfix, decision, etc.), and generates structured observation summaries. The single biggest source of savings (~97%). Tracks processed sessions in `.observed-sessions.json`.

**`dedup_memory.py`** (147 LOC)
Cross-file near-duplicate detection. Uses shingle hashing (n-gram fingerprinting) with Jaccard similarity. Reports duplicates or optionally auto-merges them.

**`generate_summary_tiers.py`** (292 LOC)
Creates L0/L1/L2 summaries from MEMORY.md. Classifies sections by priority (decision > action > config > log > archive), then fills each tier within its token budget, highest-priority sections first.

**`estimate_tokens.py`** (131 LOC)
Token counting and compression potential scoring. Scans all markdown files, reports per-file and total token usage. Identifies files with high compression potential.

**`audit_memory.py`** (216 LOC)
Health checker. Reports staleness (files not updated recently), bloat (high token/info ratio), and compression opportunities. Suggests specific actions.

**`compressed_context.py`** (280 LOC)
Compressed Context Protocol. Three compression levels (ultra/medium/light) for context passing between models. Generates decompression instructions for the receiving model's system prompt.

### Library Layer
**`lib/tokens.py`** (68 LOC)
Token estimation. Uses tiktoken's `cl100k_base` encoding when available, falls back to a CJK-aware heuristic (Chinese characters count as 1.5 tokens, others as chars÷4). Single function: `estimate_tokens(text) → int`.

**`lib/markdown.py`** (312 LOC)
Markdown parsing utilities. Section extraction by header level, section merging, content normalization, Chinese punctuation handling, header classification by priority keywords.

**`lib/dedup.py`** (119 LOC)
Shingle-hash deduplication engine. Generates n-gram (shingle) sets from text, computes Jaccard similarity between shingle sets, and groups entries by approximate length to reduce comparison space. O(n×k) instead of O(n²).

**`lib/dictionary.py`** (273 LOC)
The codebook engine. Scans text for n-gram frequencies (1-4 words), scores candidates by `freq × (len(phrase) - len(code)) - codebook_overhead`, builds a codebook of `$XX` codes, and provides `compress_text`/`decompress_text` as perfect inverses.

**`lib/rle.py`** (165 LOC)
Run-length encoding for structured patterns. Path compression (`$WS` shorthand), IP prefix extraction (`$IP` codes), and enumeration detection. All with roundtrip decompression.

**`lib/tokenizer_optimizer.py`** (188 LOC)
Encoding-aware format transformations. Converts markdown tables to key:value notation (60-70% savings), normalizes Chinese fullwidth punctuation, strips bold/italic/backtick markers, minimizes whitespace and indentation, compacts bullet lists.

**`lib/config.py`** (81 LOC)
Configuration loader. Reads `claw-compactor-config.json` from workspace root, merges with sensible defaults. All settings optional.

**`lib/exceptions.py`** (24 LOC)
Custom exception hierarchy: `MemCompressError` (base), `FileNotFoundError_`, etc.

## Layer 0: cacheRetention (Before Compression)
Before any compression runs, **prompt caching** (`cacheRetention: "long"`) provides a 90% discount on cached prompt tokens with a 1-hour TTL. This is orthogonal to compression — it reduces cost on whatever tokens remain.

```
Cost reduction stack:
  Layer 0: cacheRetention: "long"  → 90% cost discount on cached tokens
  Layer 1: observe (transcripts)   → ~97% token reduction
  Layer 2: compress (rule engine)  → 4-8% token reduction
  Layer 3: dict (codebook)         → 4-5% token reduction
  Layer 4: optimize (tokenizer)    → 1-3% token reduction
```

Layers 1-4 reduce token count. Layer 0 reduces cost-per-token. They multiply.

## Heartbeat Integration Flow
```
 ┌─────────────────────────┐
 │ Heartbeat fires │
 │ (every ~30 min) │
 └────────────┬────────────┘
              │
              ▼
 ┌─────────────────────────┐
 │ Read HEARTBEAT.md │
 │ → memory maintenance? │
 └────────────┬────────────┘
              │ yes
              ▼
 ┌─────────────────────────┐
 │ Run: benchmark │
 │ (non-destructive) │
 └────────────┬────────────┘
              │
         ┌────┴────┐
         │ >5% ? │
         └────┬────┘
        yes │    │ no
            ▼    │
 ┌──────────────┐│
 │ Run: full ││
 │ pipeline ││
 └──────────────┘│
              │◀──┘
              ▼
 ┌─────────────────────────┐
 │ New transcripts? │
 │ (unprocessed JSONL) │
 └────────────┬────────────┘
        yes │    │ no
            ▼    ▼
 ┌──────────────┐ HEARTBEAT_OK
 │ Run: observe │
 └──────────────┘
```

**Trigger logic:** The agent checks `HEARTBEAT.md` for a memory maintenance entry. If present, it runs `benchmark` first (cheap read-only). Only if savings exceed 5% does it commit to the full pipeline. New unprocessed transcripts always trigger `observe` regardless of benchmark results.

## Design Decisions

### Why shingle hashing for deduplication?
Naive pairwise comparison is O(n²) — unacceptable for workspaces with hundreds of sections. Shingle hashing (n-gram fingerprinting) gives us:
- O(n × k) complexity where k is the number of shingles per entry
- 3-word shingles with MD5 fingerprints provide good collision resistance
- Jaccard similarity on shingle sets is a well-studied near-duplicate metric
- Bucketing by approximate length further reduces comparisons
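A stripped-down sketch of that scheme: 3-word shingles fingerprinted with MD5, compared by Jaccard similarity (length bucketing omitted):

```python
import hashlib

def shingles(text: str, k: int = 3) -> set:
    """MD5 fingerprints of overlapping k-word windows."""
    words = text.lower().split()
    return {
        hashlib.md5(" ".join(words[i:i + k]).encode()).hexdigest()
        for i in range(max(len(words) - k + 1, 1))
    }

def jaccard(a: set, b: set) -> float:
    """Set overlap: |A ∩ B| / |A ∪ B|."""
    return len(a & b) / len(a | b) if a | b else 0.0

s1 = shingles("the production server uses ssh key auth")
s2 = shingles("the production server uses ssh password auth")
print(round(jaccard(s1, s2), 2))  # 0.43: related, but below a 0.6 merge threshold
```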

### Why tiktoken with heuristic fallback?
tiktoken gives exact token counts but requires compilation (Rust dependency). Many environments don't have it installed. The fallback heuristic (chars÷4, CJK-aware) is ~90% accurate — good enough for compression decisions. No hard dependency means the skill works out of the box everywhere.

### Why `$XX` codes instead of longer variable names?
Two-character codes minimize per-occurrence overhead. The codebook scoring function accounts for this: a phrase is only worth encoding if `freq × (len(phrase) - len(code)) > codebook_overhead`. Short codes win because the overhead term (the codebook entry itself) is amortized across many occurrences.

### Why section-level priority scoring for tiers?
Not all memory content is equal. A decision about architecture is worth more context tokens than a log of which files were edited. Priority classification (decision > action > config > log > archive) ensures L0 summaries contain the most important information, even at ~200 tokens.
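The tier-filling step amounts to a greedy selection over priority-sorted sections. A hedged sketch (section parsing and the real priority scores omitted):

```python
# Assumed priority ranking from the docs: decision > action > config > log > archive.
PRIORITY = {"decision": 0, "action": 1, "config": 2, "log": 3, "archive": 4}

def fill_tier(sections: list, budget: int) -> list:
    """Pick highest-priority sections first until the token budget is spent.
    Each section is a (kind, text, token_count) tuple."""
    picked, used = [], 0
    for kind, text, tokens in sorted(sections, key=lambda s: PRIORITY[s[0]]):
        if used + tokens <= budget:
            picked.append(text)
            used += tokens
    return picked

sections = [
    ("log", "edited 12 files", 40),
    ("decision", "use PostgreSQL over MySQL", 60),
    ("config", "gateway 10.0.1.1", 30),
]
print(fill_tier(sections, budget=100))  # decision and config fit; the log does not
```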

### Why non-destructive by default?
Agents make mistakes. Every write operation is opt-in: `--dry-run` shows stats, `dedup` reports without modifying, `benchmark` never writes. This is critical for trust — users need to verify before committing to changes.

### Why XML format for observations (inspired by claude-mem)?
Structured XML (`<observation>`, `<type>`, `<title>`, `<facts>`) is:
1. Unambiguous to parse (unlike free-form markdown)
2. Token-efficient (tags are reusable tokens in cl100k_base)
3. Compatible with claude-mem's proven format
4. Easy to classify and search programmatically
```

### references/benchmarks.md

```markdown
# Performance Benchmarks

## Methodology
All benchmarks run on a production workspace with:
- 30 days of active daily use
- 15 memory files (MEMORY.md, TOOLS.md, AGENTS.md, SOUL.md, daily notes)
- 173 session transcripts (.jsonl files)
- Python 3.12, tiktoken installed

Token counts use tiktoken `cl100k_base` encoding (a consistent proxy; Claude's own tokenizer is not public). All measurements are deterministic — same input produces same output every run.

## Memory File Compression
Workspace: all `.md` files in root + `memory/`

| Technique | Tokens Before | Tokens After | Saved | Savings |
|---|---|---|---|---|
| Rule Engine | 11,855 | 11,398 | 457 | 3.9% |
| Dictionary Encoding | 11,398 | 10,891 | 507 | 4.4% |
| Tokenizer Optimization | 10,891 | 10,766 | 125 | 1.1% |
| RLE Patterns | 10,766 | 10,710 | 56 | 0.5% |
| **Total** | **11,855** | **10,710** | **1,145** | **9.7%** |

### Per-File Breakdown
| File | Before | After | Savings | Notes |
|---|---|---|---|---|
| TOOLS.md | 3,421 | 2,985 | 12.7% | High repetition (IPs, paths) |
| MEMORY.md | 4,102 | 3,810 | 7.1% | Mixed content |
| AGENTS.md | 2,156 | 2,044 | 5.2% | Mostly prose, less compressible |
| memory/2024-01-15.md | 892 | 831 | 6.8% | Daily notes |
| memory/2024-01-14.md | 734 | 690 | 6.0% | Daily notes |
| SOUL.md | 550 | 540 | 1.8% | Short, unique content |

**Observation:** Files with repetitive structured data (TOOLS.md) compress best. Short, unique prose (SOUL.md) compresses least.

## Session Transcript Compression
173 session transcripts:

- Total transcripts: 173 files
- Total raw size: ~4.5M tokens
- After observation compression: ~135K tokens
- Compression ratio: **97%**
- Average per session (before): ~26,000 tokens

### By Session Type
| Session Type | Avg Raw | Avg Compressed | Ratio |
|---|---|---|---|
| Long coding session (>100 tool calls) | 52,000 | 1,200 | 97.7% |
| Config/setup session | 18,000 | 520 | 97.1% |
| Research/browsing session | 31,000 | 890 | 97.1% |
| Short task (<10 tool calls) | 4,200 | 280 | 93.3% |

## Tiered Summary Savings
MEMORY.md (4,102 tokens) → tiered summaries:

| Tier | Token Budget | Actual | Savings vs Full |
|---|---|---|---|
| L0 (Ultra-compact) | 200 | 187 | 95.4% |
| L1 (Normal) | 500 | 478 | 88.4% |
| L2 (Full) | — | 4,102 | 0% |

**Impact on sub-agents:** A sub-agent loading L0 instead of full MEMORY.md saves 3,915 tokens per spawn. At 20 sub-agent spawns/day, that's 78,300 tokens/day saved.

## Independent Technique Contribution
Each technique measured independently (not cumulative):

| Technique | Savings on Memory Files | Notes |
|---|---|---|
| Rule engine alone | 3.9% | Dedup + strip + merge |
| Dictionary alone | 4.8% | Before rule engine (slightly higher) |
| Tokenizer optimize alone | 1.4% | Tables → key:value biggest win |
| RLE alone | 0.7% | Path-dependent |
| Combined | 9.7% | Less than sum (some overlap) |

## Token Cost Savings Estimate
Based on Anthropic's Claude pricing (as of 2024):

| Model | Price | Monthly Tokens* | Cost Before | Cost After (50% compression) | Saved |
|---|---|---|---|---|---|
| Claude Sonnet 4 | $3/M tokens | 15M tokens | $45.00 | $22.50 | **$22.50** |
| Claude Opus 4 | $15/M tokens | 15M tokens | $225.00 | $112.50 | **$112.50** |
| Claude Haiku 3.5 | $0.25/M tokens | 15M tokens | $3.75 | $1.88 | **$1.88** |

*Estimate for active daily use: 50 sessions × 10 context loads each × 30K avg tokens per load.

### Breakdown by Source
| Source | Raw Tokens | Reduction | Tokens Saved | Cost Saved (Sonnet) |
|---|---|---|---|---|
| Session transcripts | 4.5M | 97% | 4.365M | $13.10 |
| Memory file loads | 8.5M* | 10% | 850K | $2.55 |
| Sub-agent context | 2M* | 88% (L0) | 1.76M | $5.28 |
| **Total** | **15M** | **46.5%** | **6.975M** | **$20.93** |

*Estimated from session frequency × tokens per load.

## Execution Performance
Benchmark runtime (Apple Silicon, 64GB RAM):

| Command | Time | Notes |
|---|---|---|
| `estimate` | 0.3s | Token counting only |
| `compress` (rule engine) | 0.8s | 15 files |
| `dict` (build + compress) | 1.2s | N-gram scanning |
| `dedup` | 0.5s | Shingle computation |
| `observe` (1 session) | 0.1s | Rule-based extraction |
| `observe` (173 sessions) | 8.2s | Batch processing |
| `tiers` | 0.4s | Summary generation |
| `full` (complete pipeline) | 11.5s | All steps |
| `benchmark` (dry-run) | 2.1s | Read-only analysis |

All operations are I/O-bound, not CPU-bound. The bottleneck is reading/writing markdown files.

```

### references/compression-prompts.md

```markdown
# Compression Prompts
LLM prompts used by claw-compactor, adapted from claude-mem's observation compression approach.

## Design Rationale
Claude-mem captures tool observations as structured XML (`<observation>` → type, title, facts, narrative, concepts). Our prompts adapt this principle for flat markdown memory files — extracting and preserving the same categories of information while aggressively removing filler.

Key insight from claude-mem: **facts and decisions are the most token-efficient form of memory**. Narratives add context but cost 5-10× more tokens. Our compression targets facts first.

## Compression Prompt (used by compress_memory.py)
```
You are a memory compression specialist. Compress the following memory
content while preserving ALL factual information, decisions, and action items.

Rules:
- Remove filler words, redundant explanations, and verbose formatting
- Merge related items into concise bullet points
- Preserve dates, names, numbers, and technical details exactly
- Keep section structure but tighten headers
- Target: reduce to ~{target_pct}% of original size
- Output valid markdown

Content to compress:
---
{content}

Compressed version:
```

### Why this prompt works
- "ALL factual information" prevents lossy compression of key data
- "dates, names, numbers, technical details exactly" preserves identifiers (IPs, IDs, versions)
- "section structure" maintains navigability
- Explicit target percentage gives the model a concrete goal

## Tier Summary Prompts
Not currently LLM-generated — tiers use algorithmic section selection based on priority scores and token budgets. This is more deterministic and reproducible than LLM-based summarization.

If LLM-based tier generation is desired, use compress_memory.py's prompt with modified targets:
- Level 0: target_pct=5 with additional instruction "key-value pairs only"
- Level 1: target_pct=15 with additional instruction "organized sections"
```

### references/compression-techniques.md

```markdown
# Compression Techniques
claw-compactor applies 5 independent compression techniques in a layered pipeline. Each targets a different source of token waste and can run independently.

---

## 1. Rule-Based Compression
**Module:** `compress_memory.py` + `lib/markdown.py`
**Typical savings:** 4-8% on memory files
**Lossless:** Yes

The rule engine applies deterministic transformations that remove redundancy without losing any information.

### Rules Applied
| Rule | Description | Typical Impact |
|---|---|---|
| Exact dedup | Remove duplicate lines within a section | 1-3% |
| Near-dedup merge | Merge sections with >60% Jaccard similarity | 1-2% |
| Whitespace strip | Collapse excessive blank lines, trailing spaces | 0.5-1% |
| Empty section removal | Remove headers with no body content | 0.5% |
| Markdown filler | Strip unnecessary bold/italic/backtick markers | 0.5-1% |
| Chinese punctuation | Fullwidth `，。！` → halfwidth `,.!` (saves 1 token each) | 0-1% |

### Before / After
**Before:**
```markdown

## Remote Machines

### Production Server
- IP: 10.0.2.1, Internal: 10.0.1.2, User: deploy

### Production Server
- Internal IP: 10.0.2.1, IP: 10.0.1.2, SSH user: deploy
- SSH: `ssh -i ~/.ssh/server_key.pem [email protected]`

## Notes
```

**After:**

The duplicate "Production Server" section was merged (near-dedup), and the empty "Notes" section was removed.

## 2. Dictionary Encoding
**Module:** `dictionary_compress.py` + `lib/dictionary.py`
**Typical savings:** 4-5% on memory files
**Lossless:** Yes (perfect roundtrip)

### How It Works
1. **Scan** — Analyze all workspace markdown files for n-gram frequencies (1-4 words)
2. **Score** — For each candidate phrase: `score = freq × (len(phrase) - len(code)) - codebook_overhead`
3. **Build** — Select top-scoring phrases, assign `$A1`, `$A2`, ... codes
4. **Compress** — Replace all occurrences of phrases with their codes
5. **Store** — Save codebook to `memory/.codebook.json`
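The five steps can be sketched end to end. Scoring here is simplified to raw frequency and substitution is naive string replacement, whereas the real engine also subtracts codebook overhead and guards against overlapping phrases:

```python
from collections import Counter
import re

def build_codebook(text: str, max_entries: int = 5) -> dict:
    """Assign $A1, $A2, ... to the most frequent long tokens (crude candidates)."""
    words = re.findall(r"[A-Za-z_./-]{8,}", text)
    best = [w for w, _ in Counter(words).most_common(max_entries)]
    return {f"$A{i + 1}": w for i, w in enumerate(best)}

def compress_text(text: str, codebook: dict) -> str:
    for code, phrase in codebook.items():
        text = text.replace(phrase, code)
    return text

def decompress_text(text: str, codebook: dict) -> str:
    # Longest codes first so $A10 is not clobbered by $A1.
    for code in sorted(codebook, key=len, reverse=True):
        text = text.replace(code, codebook[code])
    return text

doc = "ssh example_user@host; home of example_user is /home/example_user"
book = build_codebook(doc)
assert decompress_text(compress_text(doc, book), book) == doc  # roundtrip holds
```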

### Codebook Format
```json
{
 "version": 1,
 "entries": {
 "$A1": "example_user",
 "$A2": "10.0.1",
 "$A3": "workspace",
 "$A4": "server_key.pem",
 "$A5": "my-secret-token-2024"
 }
}
```

**Before (TOOLS.md excerpt):**
- user: example_user
- SSH: ssh -i ~/.ssh/server_key.pem [email protected]
- IP: 10.0.1.1, Token: my-secret-token-2024, Workspace: /home/example_user/workspace

**After:**
- user: $A1
- SSH: ssh -i ~/.ssh/$A4 deploy@$A2.2
- IP: $A2.1, Token: $A5, Workspace: /home/$A1/$A3

### Roundtrip Guarantee
`decompress_text(compress_text(text, codebook), codebook) == text` — always. The compression and decompression functions are perfect inverses. This is verified by 50+ roundtrip tests covering edge cases (overlapping phrases, adjacent codes, Unicode, empty input).

### Collision Avoidance
Codes use the `$` prefix followed by uppercase alphanumeric characters. The codebook builder checks that no code is a substring of another code and that no code appears naturally in the source text.

## 3. Session Observation Compression
**Module:** `observation_compressor.py`
**Typical savings:** ~97% on session transcripts
**Lossless:** No (facts preserved, verbosity removed)

This is the single largest source of savings. Raw session transcripts contain verbose tool output — file contents, command results, API responses — most of which is never needed again.

### Pipeline
```
.jsonl transcript (26,000 tokens)
      │
      ▼
Parse messages                 → extract tool calls
Classify interactions          → [feature|bugfix|decision|discovery|config|...]
Rule-based extraction          → key facts, errors, decisions
Generate LLM prompt (optional) → structured XML
Format as markdown observation (~780 tokens)
```

### Observation XML Format
```xml
<observations>
 <observation>
 <type>config</type>
 <title>Network configured for multi-node setup</title>
 <facts>
 - Gateway: 10.0.1.1, Remote node: 10.0.1.2, Worker: 10.0.1.3
 </facts>
 <narrative>Set up mesh network connecting 3 nodes</narrative>
 </observation>
</observations>
```

**Before (raw session, 847 lines):**
```
{"role":"assistant","content":"Let me check the network..."}
{"role":"tool","name":"exec","content":"network status\n200 OK...\n"}
{"role":"assistant","content":"Good, the network is active. Let me check peers..."}
... (800+ more lines of tool output)
```

**After (observation, 12 lines):**

```
## 1. [config] Multi-Node Network Setup
**Facts:**
- Gateway: 10.0.1.1
- Remote node: 10.0.1.2
- All peers connected

**Result:** 3-node mesh network operational
```

## 4. RLE Pattern Compression
**Module:** `lib/rle.py`
**Typical savings:** 1-2%
**Lossless:** Yes (roundtrip supported)

Targets three categories of structured repetitive data:

### Path Compression
Long workspace paths are replaced with `$WS`:

Before: /home/user/workspace/skills/claw-compactor/scripts/lib/tokens.py
After: $WS/skills/claw-compactor/scripts/lib/tokens.py

Decompression: `decompress_paths(text, "/home/user/workspace")`
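The path shorthand and its inverse are a single substitution pair. A minimal sketch, assuming one fixed workspace root:

```python
WS_CODE = "$WS"

def compress_paths(text: str, workspace: str) -> str:
    """Replace the workspace root with the $WS shorthand."""
    return text.replace(workspace, WS_CODE)

def decompress_paths(text: str, workspace: str) -> str:
    """Exact inverse of compress_paths for a fixed root."""
    return text.replace(WS_CODE, workspace)

root = "/home/user/workspace"
line = f"{root}/skills/claw-compactor/scripts/lib/tokens.py"
short = compress_paths(line, root)
print(short)  # $WS/skills/claw-compactor/scripts/lib/tokens.py
assert decompress_paths(short, root) == line
```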

### IP Family Compression
When multiple IPs share a common prefix (≥2 occurrences), the prefix is extracted:

Before:
 - 10.0.1.1
 - 10.0.1.2
 - 10.0.1.3

After:
 $IP1=10.0.1.
 - $IP1.1
 - $IP1.2
 - $IP1.3

### Enumeration Compaction
Detects comma-separated uppercase lists and compacts them:

Before: The supported types are FEATURE, BUGFIX, DECISION, DISCOVERY, CONFIG, DEPLOYMENT, DATA, INVESTIGATION
After: Types: [FEATURE,BUGFIX,DECISION,DISCOVERY,CONFIG,DEPLOYMENT,DATA,INVESTIGATION]
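Detection can be approximated with a regular expression over runs of comma-separated uppercase tokens (a simplified stand-in for the real detector):

```python
import re

# 4+ uppercase tokens of 3+ letters, separated by commas.
ENUM_RE = re.compile(r"\b([A-Z]{3,}(?:,\s*[A-Z]{3,}){3,})\b")

def compact_enums(text: str) -> str:
    """Collapse detected uppercase enumerations into a bracketed list."""
    def repl(m):
        items = re.split(r",\s*", m.group(1))
        return "[" + ",".join(items) + "]"
    return ENUM_RE.sub(repl, text)

before = "Types are FEATURE, BUGFIX, DECISION, DISCOVERY, CONFIG"
print(compact_enums(before))  # Types are [FEATURE,BUGFIX,DECISION,DISCOVERY,CONFIG]
```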

## 5. Compressed Context Protocol (CCP)
**Module:** `compressed_context.py`
**Typical savings:** 20-60% depending on level
**Lossless:** No (designed for model consumption)

CCP is designed for a specific use case: compress context on a cheap model, then feed it to an expensive model. The receiving model gets decompression instructions in its system prompt.

### Three Levels

#### Ultra (40-60% compression)
Aggressive abbreviation + filler removal. The output looks telegraphic:

Before:

 John has approximately 15 years of experience in software development,
 with a focus on infrastructure and cloud architecture. He is the
 Chief Executive Officer of TechCorp, based in San Francisco.

After:

 John ~15y exp software dev, focus infra+cloud arch. CEO: TechCorp, loc:SF

Decompression instruction:
 "Compressed notation: key:val=attribute, loc:X=location,
 Ny+=N+ years, slash-separated=alternatives. Expand naturally."

#### Medium (20-35% compression)
Moderate abbreviation with key:value notation:

Before:

 The application server runs on port 8080 with a maximum of 256
 concurrent connections. The database connection pool is configured
 with 20 minimum and 50 maximum connections.

After:

 App server: port 8080, max 256 concurrent conns.
 DB pool: min 20, max 50 conns.

#### Light (10-20% compression)
Light condensation only — remains fully human-readable:

Before:

 We decided to use PostgreSQL instead of MySQL for the new project
 because it has better support for JSON columns and more advanced
 indexing capabilities that we need for our search functionality.

After:

 Decision: PostgreSQL over MySQL — better JSON column support
 and advanced indexing for search needs.

### Decompression Instructions
Each level generates a decompression instruction block to prepend to the receiving model's system prompt:

Ultra: "Compressed notation: key:val=attribute, loc:X=location, ..."
Medium: "Text uses abbreviated notation: key:value pairs, condensed lists, ..."
Light: "Text is lightly condensed. Read normally."

## Technique Comparison
| Technique | Savings | Lossless? | LLM Cost | Best For |
|---|---|---|---|---|
| Rule engine | 4-8% | ✅ | Zero | Memory files |
| Dictionary | 4-5% | ✅ | Zero | Repetitive workspaces |
| Observation | ~97% | ❌* | Zero or 1 LLM call | Session transcripts |
| RLE | 1-2% | ✅ | Zero | Path-heavy, IP-heavy docs |
| CCP | 20-60% | ❌ | Zero | Cross-model context passing |

*Observation compression preserves all facts and decisions; only verbose tool output is removed.

## Pipeline Interaction
The techniques are designed to compose:

1. **Rule engine first** — removes obvious waste before dictionary scoring
2. **Dictionary second** — works on cleaner text, better phrase detection
3. **RLE alongside dictionary** — different targets, no interference
4. **Observation runs independently** — operates on transcripts, not memory files
5. **CCP runs last or standalone** — can compress already-compressed output further

```

### references/testing.md

```markdown
# Testing

## Philosophy
claw-compactor follows a **trust-through-testing** approach. Every compression technique must prove:

1. **Correctness** — lossless techniques roundtrip perfectly; lossy techniques preserve all facts
2. **Safety** — edge cases (empty files, Unicode, malformed markdown) never crash
3. **Non-inflation** — compressed output is never larger than input
4. **Idempotency** — running compression twice produces the same result
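With a toy substitution codec standing in for a real module, the four guarantees reduce to four assertions:

```python
# Toy lossless codec; hypothetical, not a claw-compactor module.
def compress(text: str) -> str:
    return text.replace("configuration", "$C")

def decompress(text: str) -> str:
    return text.replace("$C", "configuration")

sample = "configuration for the app configuration"

assert decompress(compress(sample)) == sample          # correctness (roundtrip)
assert compress("") == ""                              # safety on empty input
assert len(compress(sample)) <= len(sample)            # non-inflation
assert compress(compress(sample)) == compress(sample)  # idempotency
```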

## Test Suite Overview
**810+ tests** across 30 test files, covering unit tests, integration tests, and real-workspace validation.

```
tests/
├── conftest.py # Shared fixtures
│
├── # Core module tests
├── test_compress_memory.py # Rule engine compression
├── test_compress_memory_comprehensive.py # Extended rule engine tests
├── test_dictionary.py # Dictionary encoding basics
├── test_dictionary_comprehensive.py # Codebook edge cases, roundtrip
├── test_observation_compressor.py # Observation pipeline
├── test_observation_comprehensive.py # Extended observation tests
├── test_compressed_context.py # CCP levels
├── test_dedup_memory.py # Shingle dedup
├── test_generate_summary_tiers.py # Tier generation
├── test_estimate_tokens.py # Token estimation
├── test_audit_memory.py # Audit checks
├── test_audit_comprehensive.py # Extended audit tests
├── # Library tests
├── test_lib_tokens.py # tiktoken + fallback
├── test_lib_dedup.py # Shingle hashing, Jaccard
├── test_lib_markdown.py # MD parsing, normalization
├── test_rle.py # RLE basics
├── test_rle_comprehensive.py # Path/IP/enum edge cases
├── test_tokenizer_optimizer.py # Format optimization
├── test_tokenizer_optimizer_comprehensive.py # Extended optimizer tests
├── test_config.py # Config loading
├── test_tokens.py # Token utilities
├── # Integration & validation
├── test_main_entry.py # mem_compress.py CLI routing
├── test_cli_commands.py # Subprocess CLI invocation
├── test_pipeline.py # Full pipeline integration
├── test_integration.py # End-to-end scenarios
├── test_roundtrip.py # Roundtrip guarantees
├── test_roundtrip_comprehensive.py # Extended roundtrip tests
├── test_performance.py # Performance regression
├── test_benchmark.py # Benchmark command
├── test_tiers_comprehensive.py # Tier edge cases
├── test_error_handling.py # Error paths
├── test_new_features.py # Recent feature tests
├── test_real_workspace.py # Real workspace validation
├── test_token_economics.py # Cost calculations
└── test_markdown_advanced.py # Advanced MD scenarios
```

## Coverage Matrix
| Module | Unit tests | CLI tests | Edge cases | Roundtrip |
|---|---|---|---|---|
| compress_memory | ✓ | ✓ | ✓ | ✓ |
| dictionary_compress | ✓ | ✓ | ✓ | ✓ (50+ cases) |
| observation_compressor | ✓ | ✓ | ✓ | N/A (lossy) |
| dedup_memory | ✓ | ✓ | ✓ | N/A |
| generate_summary_tiers | ✓ | ✓ | ✓ | N/A |
| estimate_tokens | ✓ | ✓ | ✓ | N/A |
| audit_memory | ✓ | ✓ | ✓ | N/A |
| compressed_context | ✓ | ✓ | ✓ | N/A (lossy) |
| lib/tokens | ✓ | — | ✓ | N/A |
| lib/markdown | ✓ | — | ✓ | N/A |
| lib/dedup | ✓ | — | ✓ | N/A |
| lib/dictionary | ✓ | — | ✓ | ✓ |
| lib/rle | ✓ | — | ✓ | ✓ |
| lib/tokenizer_optimizer | ✓ | — | ✓ | N/A |
| lib/config | ✓ | — | ✓ | N/A |
| mem_compress (CLI) | ✓ | ✓ | ✓ | N/A |

### Edge Cases Tested
- **Empty files** — all modules handle gracefully
- **Unicode/CJK** — Chinese headers, mixed en/zh, emoji, accented characters
- **Large files** — 100K+ characters, 2000+ sections
- **Malformed markdown** — unclosed code blocks, broken headers, missing spaces
- **Headers-only files** — no body content
- **Single-line files** — minimal content
- **Nonexistent paths** — proper errors and exit codes
- **Overlapping dictionary codes** — no collisions
- **Adjacent `$XX` codes** — correct boundary detection
- **Empty codebooks** — graceful no-op
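The adjacent-code case is easy to get wrong with greedy string replacement; a decoder that matches the exact `$XX` shape splits boundaries correctly. The codebook and code scheme here are illustrative:

```python
import re

# Toy illustration of $XX code boundary handling (hypothetical scheme).
CODEBOOK = {"$01": "database", "$02": "configuration"}

def decode(text: str) -> str:
    # \$\d{2} matches each two-digit code exactly, so adjacent codes like
    # "$01$02" split at their boundaries; unknown codes pass through unchanged
    return re.sub(r'\$\d{2}', lambda m: CODEBOOK.get(m.group(0), m.group(0)), text)
```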

## Running Tests
```bash
cd skills/claw-compactor

# Run all tests
PYTHONPATH=scripts python3 -m pytest tests/ -v

# Run a specific test file
PYTHONPATH=scripts python3 -m pytest tests/test_dictionary.py -v

# Run a specific test class
PYTHONPATH=scripts python3 -m pytest tests/test_roundtrip.py::TestDictionaryRoundtrip -v

# Run with coverage (requires pytest-cov)
PYTHONPATH=scripts python3 -m pytest tests/ --cov=lib --cov-report=term-missing

# Quick check (no verbose)
PYTHONPATH=scripts python3 -m pytest tests/ -q
```

**Expected output:**

```
810 passed in 31s
```

## Fixtures (conftest.py)
Shared test fixtures provide consistent test environments:

- `tmp_workspace`: Workspace with MEMORY.md + `memory/` containing 2 daily files
- `empty_file`: Empty `.md` file
- `unicode_file`: Chinese + Japanese + emoji + accented characters
- `large_file`: 2000 sections, 100K+ characters
- `broken_markdown`: Malformed headers, unclosed code blocks
- `headers_only`: Only header lines, no body text
- `single_line`: Single line of text
- `duplicate_content`: Two files with known overlapping sections

## Adding New Tests

### For a new compression technique
1. Create `tests/test_<technique>.py`
2. Include at minimum:
   - **Basic functionality** — happy path
   - **Empty input** — should return empty/no-op
   - **Unicode input** — CJK, emoji, mixed scripts
   - **Roundtrip** (if lossless) — `decompress(compress(x)) == x`
   - **Non-inflation** — `len(compress(x)) <= len(x)` in tokens
   - **Idempotency** — `compress(compress(x)) == compress(x)`

3. Add fixture if needed in `conftest.py`

### For a new edge case
1. Add to the most relevant existing test file
2. Use the `@pytest.mark.parametrize` decorator for variants
3. Document what the edge case covers in the test docstring
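For example, a parametrized edge-case test might look like this (the function under test is a stand-in, not a claw-compactor module):

```python
import pytest

# Hypothetical parametrized edge case: light cleanup never inflates input.
def light_cleanup(text: str) -> str:
    # collapse all whitespace runs to single spaces
    return ' '.join(text.split())

@pytest.mark.parametrize("text", ["", "  spaced   out  ", "# header only", "🦞 emoji"])
def test_cleanup_never_inflates(text):
    """Light cleanup output is never longer than its input."""
    assert len(light_cleanup(text)) <= len(text)
```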

### Test naming convention
```python
class TestModuleName:
    def test_basic_functionality(self):
        """Module handles the standard case."""

    def test_empty_input(self):
        """Module handles empty input gracefully."""

    def test_unicode_content(self):
        """Module handles CJK and emoji content."""

    def test_roundtrip_guarantee(self):
        """Compress then decompress returns original."""
```

## Continuous Validation
Tests should be run:
- Before any code change is committed
- After modifying any `lib/` module (shared dependencies)
- After updating compression rules or codebook logic
- As part of the `full` pipeline verification (post-packaging)
```

### scripts/audit_memory.py

```python
#!/usr/bin/env python3
"""Audit workspace memory files for token usage, staleness, and compression opportunities.

Scans all markdown files in a workspace and reports:
- Total token budget usage
- File age distribution
- Stale entries (not updated in N days)
- Compression suggestions

Usage:
    python3 audit_memory.py <workspace_path> [--stale-days 14] [--json]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens
from lib.markdown import parse_sections, compress_markdown_table, strip_emoji
from lib.exceptions import FileNotFoundError_

logger = logging.getLogger(__name__)

# Default memory token budgets
DEFAULT_BUDGETS = {
    "MEMORY.md": 2000,
    "TOOLS.md": 1500,
    "AGENTS.md": 2000,
    "daily_total": 5000,
    "workspace_total": 15000,
}


def _has_tables(text: str) -> bool:
    """Check if text contains markdown tables."""
    return '|' in text and '---' in text


def _has_emoji(text: str) -> bool:
    """Check if text contains emoji characters."""
    from lib.markdown import _EMOJI_RE
    return bool(_EMOJI_RE.search(text))


def _count_empty_sections(text: str) -> int:
    """Count sections with no meaningful body content."""
    from lib.markdown import parse_sections
    sections = parse_sections(text)
    return sum(1 for h, b, _ in sections if h and not b.strip())


def _file_age_days(path: Path) -> float:
    """Return the age of *path* in days since last modification."""
    mtime = path.stat().st_mtime
    return (time.time() - mtime) / 86400


def audit_file(
    path: Path,
    stale_days: int = 14,
) -> Dict[str, Any]:
    """Audit a single markdown file.

    Returns dict with name, tokens, is_stale, suggestions, etc.
    """
    text = path.read_text(encoding="utf-8", errors="replace")
    tokens = estimate_tokens(text)
    age = _file_age_days(path)
    is_stale = age > stale_days

    suggestions: List[str] = []

    # Check for tables that could be compressed
    if _has_tables(text):
        compressed = compress_markdown_table(text)
        if len(compressed) < len(text) * 0.9:
            suggestions.append("Table detected — compress_markdown_table could save tokens")

    # Check for emoji
    stripped = strip_emoji(text)
    if len(stripped) < len(text):
        suggestions.append("Contains emoji — strip_emoji could save tokens")

    # Check for empty sections
    sections = parse_sections(text)
    empty_count = sum(1 for h, b, _ in sections if h and not b.strip())
    if empty_count > 0:
        suggestions.append(f"{empty_count} empty section(s) — remove_empty_sections")

    # Check token budget
    budget = DEFAULT_BUDGETS.get(path.name, DEFAULT_BUDGETS["workspace_total"])
    if tokens > budget:
        suggestions.append(f"Over budget: {tokens:,} tokens (budget: {budget:,})")

    if is_stale:
        suggestions.append(f"Stale: not modified in {age:.0f} days")

    return {
        "path": str(path),
        "file": str(path),
        "name": path.name,
        "tokens": tokens,
        "age_days": round(age, 1),
        "is_stale": is_stale,
        "suggestions": suggestions,
        "sections": len(sections),
    }


def audit_workspace(
    workspace: str,
    stale_days: int = 14,
) -> Dict[str, Any]:
    """Audit all memory files in *workspace*.

    Raises FileNotFoundError_ if workspace doesn't exist.
    """
    p = Path(workspace)
    if not p.exists():
        raise FileNotFoundError_(f"Workspace not found: {workspace}")

    files: List[Path] = []
    for f in sorted(p.glob("*.md")):
        files.append(f)
    mem_dir = p / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            files.append(f)

    if not files:
        return {
            "total_files": 0,
            "total_tokens": 0,
            "files": [],
            "age_distribution": {},
            "suggestions": [],
        }

    file_results = [audit_file(f, stale_days=stale_days) for f in files]
    total_tokens = sum(r["tokens"] for r in file_results)

    # Age distribution
    age_bins = {"<7d": 0, "7-30d": 0, "30-90d": 0, ">90d": 0}
    for r in file_results:
        age = r["age_days"]
        if age < 7:
            age_bins["<7d"] += 1
        elif age < 30:
            age_bins["7-30d"] += 1
        elif age < 90:
            age_bins["30-90d"] += 1
        else:
            age_bins[">90d"] += 1

    # Aggregate suggestions
    all_suggestions = []
    for r in file_results:
        for s in r["suggestions"]:
            all_suggestions.append(f"[{r['name']}] {s}")

    return {
        "total_files": len(file_results),
        "total_tokens": total_tokens,
        "files": file_results,
        "age_distribution": age_bins,
        "suggestions": all_suggestions,
    }


def format_report(result: Dict[str, Any]) -> str:
    """Format audit result as a human-readable report."""
    lines = [
        "=== Memory Audit Report ===",
        f"Files: {result['total_files']}",
        f"Total tokens: {result['total_tokens']:,}",
        "",
        "Age distribution:",
    ]
    for bucket, count in result.get("age_distribution", {}).items():
        lines.append(f"  {bucket}: {count} files")

    if result.get("suggestions"):
        lines.append("\nSuggestions:")
        for s in result["suggestions"]:
            lines.append(f"  - {s}")
    else:
        lines.append("\nNo suggestions — workspace looks healthy.")

    return '\n'.join(lines)


def main():
    parser = argparse.ArgumentParser(description="Audit workspace memory files")
    parser.add_argument("workspace", help="Workspace directory")
    parser.add_argument("--stale-days", type=int, default=14, help="Days before stale")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    result = audit_workspace(args.workspace, stale_days=args.stale_days)
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_report(result))


if __name__ == "__main__":
    main()

```

### scripts/compress_memory.py

```python
#!/usr/bin/env python3
"""Compress memory files using rule-based preprocessing + LLM semantic compression.

Two-phase approach:
1. Rule engine: dedup lines, strip markdown redundancy, merge similar entries
2. LLM prompt: generate a prompt for semantic compression of remaining content

Usage:
    python3 compress_memory.py <path> [--dry-run] [--output FILE] [--older-than DAYS] [--no-llm]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens, using_tiktoken
from lib.markdown import (
    strip_markdown_redundancy, remove_duplicate_lines, parse_sections,
    normalize_chinese_punctuation, strip_emoji, remove_empty_sections,
    compress_markdown_table, merge_similar_bullets, merge_short_bullets,
)
from lib.dedup import find_duplicates, merge_duplicates
from lib.exceptions import FileNotFoundError_, MemCompressError

logger = logging.getLogger(__name__)

# LLM prompt template for semantic compression
COMPRESS_PROMPT = """You are a memory compression assistant. Compress the following text to approximately {target_pct}% of its current size while preserving ALL factual information, decisions, configurations, and actionable items.

Rules:
- Keep all names, IPs, paths, tokens, dates, and technical details EXACTLY
- Remove filler words, redundant explanations, and verbose phrasing
- Merge related items
- Use concise notation (key:value, abbreviations)
- Preserve markdown structure (headers, bullets)
- Output ONLY the compressed text, no commentary

Text to compress:
---
{content}
---

Compressed version:"""


def _file_age_days(path: Path) -> float:
    """Return file age in days based on mtime."""
    return (time.time() - path.stat().st_mtime) / 86400


def rule_compress(
    text: str,
    enable_emoji_strip: bool = True,
) -> str:
    """Apply all rule-based compression passes to *text*.

    Returns the compressed text. Never increases token count.
    """
    if not text:
        return ""

    result = text

    # 1. Normalize Chinese punctuation
    result = normalize_chinese_punctuation(result)

    # 2. Strip markdown redundancy (excess blanks, trailing whitespace)
    result = strip_markdown_redundancy(result)

    # 3. Remove duplicate lines
    result = remove_duplicate_lines(result)

    # 4. Remove empty sections
    result = remove_empty_sections(result)

    # 5. Compress markdown tables to key:value
    result = compress_markdown_table(result)

    # 6. Strip emoji if enabled
    if enable_emoji_strip:
        result = strip_emoji(result)

    # 7. Merge similar bullets
    result = merge_similar_bullets(result)

    # 8. Merge short bullets
    result = merge_short_bullets(result)

    # 9. Final cleanup
    result = strip_markdown_redundancy(result)

    return result


def generate_llm_prompt(content: str, target_pct: int = 50) -> str:
    """Generate an LLM prompt for semantic compression of *content*."""
    return COMPRESS_PROMPT.format(content=content, target_pct=target_pct)


def _collect_files(
    target: str,
    older_than: Optional[int] = None,
) -> List[Path]:
    """Collect markdown files from *target* (file or directory).

    If *older_than* is set, only include files older than N days.
    """
    path = Path(target)
    if not path.exists():
        raise FileNotFoundError_(f"Path not found: {target}")

    if path.is_file():
        if older_than is not None and _file_age_days(path) < older_than:
            return []
        return [path]

    # Directory: collect all .md files recursively
    files = sorted(path.rglob("*.md"))
    if older_than is not None:
        files = [f for f in files if _file_age_days(f) >= older_than]
    return files


def compress_file(
    path: Path,
    dry_run: bool = False,
    output: Optional[str] = None,
    no_llm: bool = False,
) -> Dict[str, Any]:
    """Compress a single file using rule-based compression.

    Args:
        path: File to compress.
        dry_run: If True, don't write changes.
        output: Optional output file path.
        no_llm: If True, skip LLM prompt generation.

    Returns a dict with compression statistics.
    """
    path = Path(path)
    original = path.read_text(encoding="utf-8")
    original_tokens = estimate_tokens(original)

    compressed = rule_compress(original)
    compressed_tokens = estimate_tokens(compressed)

    reduction_pct = ((original_tokens - compressed_tokens) / original_tokens * 100) if original_tokens else 0.0
    result: Dict[str, Any] = {
        "file": str(path),
        "original_tokens": original_tokens,
        "rule_compressed_tokens": compressed_tokens,
        "rule_reduction_pct": round(reduction_pct, 2),
        "dry_run": dry_run,
    }

    if not no_llm and compressed.strip():
        result["llm_prompt"] = generate_llm_prompt(compressed)

    if not dry_run:
        target = Path(output) if output else path
        target.write_text(compressed, encoding="utf-8")
        result["written_to"] = str(target)

    return result


def llm_compress_file(
    path: Path,
    target_pct: int = 40,
) -> Dict[str, Any]:
    """Generate an LLM compression prompt for a file and write it to a .prompt file.

    Returns stats dict with original_tokens, rule_compressed_tokens, prompt_file, etc.
    """
    text = path.read_text(encoding="utf-8", errors="replace")
    original_tokens = estimate_tokens(text)
    compressed = rule_compress(text)
    rule_tokens = estimate_tokens(compressed)
    prompt = generate_llm_prompt(compressed, target_pct)
    prompt_tokens = estimate_tokens(prompt)

    prompt_path = path.with_suffix(".prompt.md")
    prompt_path.write_text(prompt, encoding="utf-8")

    return {
        "file": str(path),
        "original_tokens": original_tokens,
        "rule_compressed_tokens": rule_tokens,
        "prompt_tokens": prompt_tokens,
        "prompt_file": str(prompt_path),
        "target_pct": target_pct,
        "instruction": f"Feed this prompt to an LLM for further {target_pct}% compression.",
    }


def main():
    parser = argparse.ArgumentParser(description="Compress memory files")
    parser.add_argument("path", help="File or directory to compress")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--output", help="Output file")
    parser.add_argument("--older-than", type=int, help="Only files older than N days")
    parser.add_argument("--no-llm", action="store_true", help="Skip LLM prompt")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    files = _collect_files(args.path, older_than=args.older_than)
    results = []
    for f in files:
        r = compress_file(f, dry_run=args.dry_run, output=args.output, no_llm=args.no_llm)
        results.append(r)

    if args.json:
        print(json.dumps(results, indent=2, ensure_ascii=False))
    else:
        for r in results:
            saved = r["original_tokens"] - r["rule_compressed_tokens"]
            print(f"{r['file']}: {r['original_tokens']} → {r['rule_compressed_tokens']} tokens (saved {saved})")


if __name__ == "__main__":
    main()

```

### scripts/compressed_context.py

```python
#!/usr/bin/env python3
"""Compressed Context Protocol -- compress text for expensive model consumption.

Generates ultra-compressed context + decompression instructions for system prompts.
Three compression levels: ultra, medium, light.

Usage:
    python3 compressed_context.py <file> [--level ultra|medium|light] [--output FILE]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens

logger = logging.getLogger(__name__)

# Decompression instructions to prepend to system prompt
DECOMPRESS_INSTRUCTIONS = {
    "ultra": (
        "Compressed notation: key:val=attribute, loc:X+Y=locations, "
        "Ny+=N+ years, slash-separated=alternatives. "
        "Expand naturally when responding."
    ),
    "medium": (
        "Text uses abbreviated notation: key:value pairs, "
        "condensed lists, minimal punctuation. Read as natural language."
    ),
    "light": (
        "Text is lightly condensed. Read normally."
    ),
}

# Common words to abbreviate in ultra mode
ULTRA_ABBREVS = {
    "experience": "exp",
    "management": "mgmt",
    "development": "dev",
    "approximately": "~",
    "application": "app",
    "applications": "apps",
    "configuration": "config",
    "information": "info",
    "environment": "env",
    "infrastructure": "infra",
    "architecture": "arch",
    "implementation": "impl",
    "performance": "perf",
    "operations": "ops",
    "production": "prod",
    "repository": "repo",
    "repositories": "repos",
    "documentation": "docs",
    "communication": "comms",
    "organization": "org",
    "technology": "tech",
    "technologies": "tech",
    "cryptocurrency": "crypto",
    "quantitative": "quant",
    "distributed": "dist",
    "international": "intl",
    "professional": "pro",
    "certificate": "cert",
    "authentication": "auth",
    "authorization": "authz",
    "database": "db",
    "kubernetes": "k8s",
    "continuous": "cont",
    "integration": "integ",
    "deployment": "deploy",
    "monitoring": "mon",
    "notification": "notif",
    "requirements": "reqs",
    "specification": "spec",
    "administrator": "admin",
    "description": "desc",
    "transaction": "tx",
    "transactions": "txs",
    "currently": "curr",
    "previously": "prev",
    "following": "foll",
    "including": "incl",
    "especially": "esp",
    "engineering": "eng",
    "university": "univ",
    "founded": "est",
    "established": "est",
    "headquarters": "HQ",
    "years of": "y+",
    "based in": "loc:",
    "located in": "loc:",
    "offices in": "offices:",
    "founder of": "founder:",
    "CEO of": "CEO:",
    "CTO of": "CTO:",
}

# Filler phrases to remove in ultra mode
ULTRA_FILLERS = [
    "In addition,", "Furthermore,", "Moreover,", "Additionally,",
    "It is worth noting that", "It should be noted that",
    "As a matter of fact,", "In fact,", "Actually,",
    "Basically,", "Essentially,", "In other words,",
    "That being said,", "Having said that,",
    "At the end of the day,", "When it comes to",
    "In terms of", "With regard to", "With respect to",
    "As mentioned earlier,", "As previously stated,",
    "It is important to note that", "Please note that",
    "In conclusion,", "To summarize,", "To sum up,",
    "extensive experience", "extensive experience in",
    "He has", "She has", "They have",
    "which is", "that is", "who is",
    "a wide range of", "a variety of",
]

# Medium-mode abbreviations (less aggressive)
MEDIUM_ABBREVS = {
    "configuration": "config",
    "application": "app",
    "environment": "env",
    "infrastructure": "infra",
    "implementation": "impl",
    "documentation": "docs",
    "database": "db",
    "kubernetes": "k8s",
}


def compress_ultra(text: str) -> str:
    """Apply ultra compression -- aggressive abbreviation and filler removal."""
    if not text:
        return ""

    result = text

    # Remove fillers
    for filler in ULTRA_FILLERS:
        result = result.replace(filler, "")

    # Apply abbreviations (case-insensitive for the word, preserve surrounding)
    for word, abbrev in ULTRA_ABBREVS.items():
        # Replace whole words
        result = re.sub(r'\b' + re.escape(word) + r'\b', abbrev, result, flags=re.IGNORECASE)

    # Remove articles and common short fillers
    result = re.sub(r'\b(?:the|a|an|is|are|was|were|has|have|had|been|being)\b\s*', '', result, flags=re.IGNORECASE)
    # Drop standalone "of" (aggressive; part of ultra's lossy trade-off)
    result = re.sub(r'\bof\b\s+', ' ', result)
    # Replace "and" with "+"
    result = re.sub(r'\band\b', '+', result)
    # Replace "with" with "w/"
    result = re.sub(r'\bwith\b', 'w/', result)
    # Replace "for" with "4"
    result = re.sub(r'\bfor\b', '4', result)
    # "in" is kept as-is (too short to abbreviate)

    # Clean up spacing
    result = re.sub(r'  +', ' ', result)
    result = re.sub(r'\n{3,}', '\n\n', result)
    result = re.sub(r'^\s+', '', result, flags=re.MULTILINE)

    return result.strip()


def compress_medium(text: str) -> str:
    """Apply medium compression -- moderate abbreviation."""
    if not text:
        return ""

    result = text

    # Apply medium abbreviations only
    for word, abbrev in MEDIUM_ABBREVS.items():
        result = re.sub(r'\b' + re.escape(word) + r'\b', abbrev, result, flags=re.IGNORECASE)

    # Remove some fillers
    for filler in ULTRA_FILLERS[:5]:  # Only the most common
        result = result.replace(filler, "")

    # Clean up
    result = re.sub(r'  +', ' ', result)
    result = re.sub(r'\n{3,}', '\n\n', result)

    return result.strip()


def compress_light(text: str) -> str:
    """Apply light compression -- just cleanup."""
    if not text:
        return ""

    result = text
    result = re.sub(r'  +', ' ', result)
    result = re.sub(r'\n{3,}', '\n\n', result)
    return result.strip()


def compress(text: str, level: str) -> Dict[str, str]:
    """Compress text at the specified level.

    Returns dict with compressed text, instructions, and level.
    Raises ValueError for invalid level.
    """
    if level not in DECOMPRESS_INSTRUCTIONS:
        raise ValueError(f"Invalid compression level: {level}. Use: ultra, medium, light")

    compressors = {
        "ultra": compress_ultra,
        "medium": compress_medium,
        "light": compress_light,
    }

    compressed = compressors[level](text)
    return {
        "compressed": compressed,
        "instructions": DECOMPRESS_INSTRUCTIONS[level],
        "level": level,
    }


def compress_with_stats(text: str, level: str) -> Dict[str, Any]:
    """Compress text and return statistics.

    Returns dict with compressed text, token counts, and reduction percentage.
    """
    result = compress(text, level)
    orig_tokens = estimate_tokens(text)
    comp_tokens = estimate_tokens(result["compressed"])
    inst_tokens = estimate_tokens(result["instructions"])

    # Net includes instruction overhead
    net_tokens = comp_tokens + inst_tokens
    reduction = ((orig_tokens - comp_tokens) / orig_tokens * 100) if orig_tokens > 0 else 0.0

    return {
        "compressed": result["compressed"],
        "instructions": result["instructions"],
        "level": level,
        "original_tokens": orig_tokens,
        "compressed_tokens": comp_tokens,
        "instruction_tokens": inst_tokens,
        "net_tokens": net_tokens,
        "reduction_pct": round(reduction, 1),
    }


def main():
    parser = argparse.ArgumentParser(description="Compressed Context Protocol")
    parser.add_argument("file", help="File to compress")
    parser.add_argument("--level", default="ultra", choices=["ultra", "medium", "light"])
    parser.add_argument("--output", help="Output file")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    text = Path(args.file).read_text(encoding="utf-8")
    stats = compress_with_stats(text, args.level)

    if args.output:
        Path(args.output).write_text(stats["compressed"], encoding="utf-8")

    if args.json:
        print(json.dumps(stats, indent=2))
    else:
        pct = stats["reduction_pct"]
        print(f"Level: {args.level}")
        print(f"Original: {stats['original_tokens']} tokens")
        print(f"Compressed: {stats['compressed_tokens']} tokens ({pct:.1f}% reduction)")
        print(f"Instructions: {stats['instruction_tokens']} tokens")


if __name__ == "__main__":
    main()

```

### scripts/dedup_memory.py

```python
#!/usr/bin/env python3
"""Find and merge near-duplicate entries across memory files.

Uses shingle hashing for efficient similarity detection without O(n^2) comparison.

Usage:
    python3 dedup_memory.py <path> [--json] [--auto-merge] [--threshold 0.6]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Any

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens
from lib.markdown import parse_sections, strip_markdown_redundancy
from lib.dedup import find_duplicates, merge_duplicates, SIMILARITY_THRESHOLD
from lib.exceptions import FileNotFoundError_

logger = logging.getLogger(__name__)


def _collect_entries(target: str) -> List[Dict[str, Any]]:
    """Collect bullet/paragraph entries from markdown files at *target*.

    Returns a list of dicts with 'text', 'source', and 'section' keys.
    """
    path = Path(target)
    if not path.exists():
        raise FileNotFoundError_(f"Path not found: {target}")

    files = [path] if path.is_file() else sorted(path.rglob("*.md"))
    entries: List[Dict[str, Any]] = []

    for f in files:
        text = f.read_text(encoding="utf-8")
        if not text.strip():
            continue
        sections = parse_sections(text)
        for header, body, level in sections:
            if not body.strip():
                continue
            # Split body into bullet lines or paragraphs
            for line in body.split('\n'):
                line = line.strip()
                if line and len(line) > 10:  # Skip very short lines
                    entries.append({
                        "text": line,
                        "source": str(f),
                        "section": header,
                    })

    return entries


def run_dedup(
    target: str,
    threshold: float = SIMILARITY_THRESHOLD,
    auto_merge: bool = False,
) -> Dict[str, Any]:
    """Run deduplication on *target* (file or directory).

    Returns a dict with statistics and duplicate groups.
    """
    entries = _collect_entries(target)
    texts = [e["text"] for e in entries]

    tokens_before = estimate_tokens('\n'.join(texts))
    groups = find_duplicates(texts, threshold=threshold)

    result: Dict[str, Any] = {
        "total_entries": len(entries),
        "duplicate_groups": groups,
        "duplicate_group_count": len(groups),
        "entries_removed": 0,
        "tokens_before": tokens_before,
    }

    if auto_merge and groups:
        merged = merge_duplicates(texts, groups)
        tokens_after = estimate_tokens('\n'.join(merged))
        result["entries_removed"] = len(texts) - len(merged)
        result["tokens_after"] = tokens_after
        result["tokens_saved"] = tokens_before - tokens_after

    if groups:
        result["groups"] = []
        for g in groups:
            group_entries = [entries[i] for i in g["indices"]]
            result["groups"].append({
                "similarity": g["similarity"],
                "entries": [e["text"][:100] for e in group_entries],
                "sources": list(set(e["source"] for e in group_entries)),
            })

    return result


def format_human(result: Dict[str, Any]) -> str:
    """Format dedup results as a human-readable report."""
    lines = ["# Deduplication Report", ""]
    lines.append(f"Total entries scanned: {result['total_entries']}")
    groups = result['duplicate_groups']
    num_groups = len(groups) if isinstance(groups, list) else groups
    lines.append(f"Duplicate groups found: {num_groups}")

    if not num_groups:
        lines.append("\nNo duplicates found.")
        return '\n'.join(lines)

    lines.append(f"Entries removed: {result.get('entries_removed', 0)}")
    if "tokens_saved" in result:
        lines.append(f"Tokens saved: {result['tokens_saved']}")

    if "groups" in result:
        lines.append("\n## Groups")
        for i, g in enumerate(result["groups"]):
            lines.append(f"\n### Group {i + 1} (similarity: {g['similarity']:.2f})")
            for entry in g["entries"]:
                lines.append(f"  - {entry}")

    return '\n'.join(lines)


def main():
    parser = argparse.ArgumentParser(description="Find near-duplicate memory entries")
    parser.add_argument("path", help="File or directory to scan")
    parser.add_argument("--json", action="store_true")
    parser.add_argument("--auto-merge", action="store_true")
    parser.add_argument("--threshold", type=float, default=SIMILARITY_THRESHOLD)
    args = parser.parse_args()

    result = run_dedup(args.path, threshold=args.threshold, auto_merge=args.auto_merge)

    if args.json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    else:
        print(format_human(result))


if __name__ == "__main__":
    main()

```

### scripts/dictionary_compress.py

```python
#!/usr/bin/env python3
"""Dictionary-based compression for workspace memory files.

Learns high-frequency phrases from the workspace, builds a codebook,
and applies lossless substitution compression.

Usage:
    python3 dictionary_compress.py <workspace> --build        # Scan + generate codebook
    python3 dictionary_compress.py <workspace> --compress     # Apply codebook
    python3 dictionary_compress.py <workspace> --decompress   # Expand codes back
    python3 dictionary_compress.py <workspace> --stats        # Show compression effect

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Any

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.dictionary import (
    build_codebook, compress_text, decompress_text,
    save_codebook, load_codebook, compression_stats,
)
from lib.tokens import estimate_tokens

logger = logging.getLogger(__name__)

DEFAULT_CODEBOOK_PATH = "memory/.codebook.json"


def _collect_md_files(workspace: Path) -> List[Path]:
    """Collect all markdown files in workspace."""
    files: List[Path] = []
    for name in ["MEMORY.md", "TOOLS.md", "AGENTS.md", "SOUL.md", "USER.md"]:
        p = workspace / name
        if p.exists():
            files.append(p)
    mem_dir = workspace / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            if not f.name.startswith('.'):
                files.append(f)
    return files


def _read_texts(files: List[Path]) -> List[str]:
    """Read all files into a list of strings."""
    return [f.read_text(encoding="utf-8", errors="replace") for f in files]


def cmd_build(
    workspace: Path,
    codebook_path: Path,
    min_freq: int = 3,
    max_entries: int = 200,
) -> Dict[str, Any]:
    """Scan workspace and build codebook."""
    files = _collect_md_files(workspace)
    texts = _read_texts(files)
    cb = build_codebook(texts, min_freq=min_freq, max_entries=max_entries)
    save_codebook(cb, codebook_path)
    return {
        "codebook_entries": len(cb),
        "codebook_path": str(codebook_path),
        "files_scanned": len(files),
    }


def cmd_compress(
    workspace: Path,
    codebook_path: Path,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Apply codebook compression to all workspace files."""
    cb = load_codebook(codebook_path)
    files = _collect_md_files(workspace)
    total_before = 0
    total_after = 0

    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        before = estimate_tokens(text)
        compressed = compress_text(text, cb)
        after = estimate_tokens(compressed)
        total_before += before
        total_after += after
        if not dry_run:
            f.write_text(compressed, encoding="utf-8")

    return {
        "files": len(files),
        "tokens_before": total_before,
        "tokens_after": total_after,
        "tokens_saved": total_before - total_after,
        "dry_run": dry_run,
    }


def cmd_decompress(
    workspace: Path,
    codebook_path: Path,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Expand codebook codes back to original phrases."""
    cb = load_codebook(codebook_path)
    files = _collect_md_files(workspace)

    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        decompressed = decompress_text(text, cb)
        if not dry_run:
            f.write_text(decompressed, encoding="utf-8")

    return {"files": len(files), "dry_run": dry_run}


def cmd_stats(
    workspace: Path,
    codebook_path: Path,
) -> Dict[str, Any]:
    """Show compression statistics."""
    cb = load_codebook(codebook_path)
    files = _collect_md_files(workspace)
    texts = _read_texts(files)
    combined = '\n'.join(texts)
    compressed = compress_text(combined, cb)
    stats = compression_stats(combined, compressed, cb)
    stats["files"] = len(files)
    return stats


def main():
    parser = argparse.ArgumentParser(description="Dictionary-based compression")
    parser.add_argument("workspace", help="Workspace directory")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--build", action="store_true", help="Build codebook")
    group.add_argument("--compress", action="store_true", help="Apply compression")
    group.add_argument("--decompress", action="store_true", help="Expand codes")
    group.add_argument("--stats", action="store_true", help="Show stats")
    parser.add_argument("--codebook", default=None, help="Codebook path")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    ws = Path(args.workspace)
    cb_path = Path(args.codebook) if args.codebook else ws / DEFAULT_CODEBOOK_PATH

    if args.build:
        result = cmd_build(ws, cb_path)
    elif args.compress:
        result = cmd_compress(ws, cb_path, dry_run=args.dry_run)
    elif args.decompress:
        result = cmd_decompress(ws, cb_path, dry_run=args.dry_run)
    else:
        result = cmd_stats(ws, cb_path)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        for k, v in result.items():
            print(f"{k}: {v}")


if __name__ == "__main__":
    main()

```
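
The compress/decompress pair used above is lossless because literal `$` characters are swapped for a sentinel before any codes are inserted. A self-contained sketch of that round trip, with a hand-written codebook rather than a learned one:

```python
SENTINEL = "\x00DLR\x00"  # stands in for literal '$' during compression

def compress(text, codebook):
    # Escape pre-existing '$' so inserted codes can't collide with source text.
    out = text.replace("$", SENTINEL)
    # Replace longest phrases first so shorter phrases can't clobber them.
    for code, phrase in sorted(codebook.items(), key=lambda kv: -len(kv[1])):
        out = out.replace(phrase.replace("$", SENTINEL), code)
    return out

def decompress(text, codebook):
    out = text
    # Longest codes first, so a $AAA code is expanded before $AA could match inside it.
    for code, phrase in sorted(codebook.items(), key=lambda kv: -len(kv[0])):
        out = out.replace(code, phrase)
    return out.replace(SENTINEL, "$")

cb = {"$AA": "the workspace memory", "$AB": "token budget"}
original = "Scan the workspace memory and report the token budget ($ amounts kept)."
packed = compress(original, cb)

assert decompress(packed, cb) == original  # lossless round trip
assert len(packed) < len(original)
```

Sorting by phrase length on compression and code length on decompression mirrors the ordering in `lib/dictionary.py`; without it, a phrase that is a prefix of another could be substituted first and break the longer match.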

### scripts/estimate_tokens.py

```python
#!/usr/bin/env python3
"""Estimate token counts for memory files in a workspace.

Scans markdown files, estimates token usage, and reports compression potential.

Usage:
    python3 estimate_tokens.py <path> [--json] [--threshold N]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Any

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens, using_tiktoken
from lib.markdown import strip_markdown_redundancy
from lib.exceptions import FileNotFoundError_

logger = logging.getLogger(__name__)

# Compression potential scoring
POTENTIAL_THRESHOLDS = {
    "high": 2000,
    "medium": 500,
    "low": 0,
}


def _score_potential(tokens: int, stripped_tokens: int) -> str:
    """Score compression potential based on token count and reducibility."""
    ratio = (tokens - stripped_tokens) / tokens if tokens > 0 else 0
    if tokens >= POTENTIAL_THRESHOLDS["high"] or ratio >= 0.15:
        return "high"
    if ratio > 0.05 or tokens >= POTENTIAL_THRESHOLDS["medium"]:
        return "medium"
    return "low"


def _collect_md_files(path: Path) -> List[Path]:
    """Collect markdown files from path (file or directory)."""
    if path.is_file():
        return [path]
    if not path.exists():
        raise FileNotFoundError_(f"Path not found: {path}")
    files = []
    # Root-level .md files
    for f in sorted(path.glob("*.md")):
        files.append(f)
    # memory/ subdirectory
    mem_dir = path / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            files.append(f)
    return files


def scan_path(path: str, threshold: int = 0) -> List[Dict[str, Any]]:
    """Scan *path* for markdown files and estimate token usage.

    Returns a list of dicts sorted by token count descending.
    Raises FileNotFoundError_ if path doesn't exist.
    """
    p = Path(path)
    # _collect_md_files handles both files and directories and raises
    # FileNotFoundError_ for missing paths.
    files = _collect_md_files(p)
    results: List[Dict[str, Any]] = []

    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        tokens = estimate_tokens(text)
        stripped = strip_markdown_redundancy(text)
        stripped_tokens = estimate_tokens(stripped)
        potential = _score_potential(tokens, stripped_tokens)

        if tokens >= threshold:
            results.append({
                "file": str(f),
                "name": f.name,
                "tokens": tokens,
                "stripped_tokens": stripped_tokens,
                "potential": potential,
                "size_bytes": len(text.encode("utf-8")),
            })

    results.sort(key=lambda r: r["tokens"], reverse=True)
    return results


def format_human(results: List[Dict[str, Any]]) -> str:
    """Format scan results as a human-readable report."""
    if not results:
        return "No files found or all below threshold."

    total = sum(r["tokens"] for r in results)
    lines = [
        "=== Token Estimation Report ===",
        f"Engine: {'tiktoken' if using_tiktoken() else 'heuristic'}",
        f"Files: {len(results)}",
        f"Total tokens: {total:,}",
        "",
    ]
    for r in results:
        lines.append(f"  {r['name']:30s} {r['tokens']:>8,} tokens  [{r['potential']}]")

    return '\n'.join(lines)


def main():
    parser = argparse.ArgumentParser(description="Estimate token usage in memory files")
    parser.add_argument("path", help="File or directory to scan")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--threshold", type=int, default=0, help="Min tokens to show")
    args = parser.parse_args()

    results = scan_path(args.path, threshold=args.threshold)
    if args.json:
        print(json.dumps({"files": results, "total_tokens": sum(r["tokens"] for r in results)}, indent=2))
    else:
        print(format_human(results))


if __name__ == "__main__":
    main()

```
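
The `lib.tokens` module itself is not reproduced in this listing; the report header above only reveals that it uses tiktoken when available and a "heuristic" engine otherwise. A plausible minimal version of that fallback, assuming the `chars_per_token: 4` default from `claw-compactor-config.json` (the real implementation may differ):

```python
# Hypothetical fallback estimator: ~4 characters per token, rounding up
# so any non-empty text counts as at least one token.
def estimate_tokens_heuristic(text, chars_per_token=4):
    return -(-len(text) // chars_per_token) if text else 0

assert estimate_tokens_heuristic("") == 0
assert estimate_tokens_heuristic("abcd") == 1   # 4 chars -> 1 token
assert estimate_tokens_heuristic("abcde") == 2  # 5 chars -> rounds up
```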

### scripts/generate_summary_tiers.py

```python
#!/usr/bin/env python3
"""Generate tiered summaries from MEMORY.md files.

Creates Level 0/1/2 summary templates with token budgets:
- Level 0 (Ultra-compact): ~200 tokens - key facts only
- Level 1 (Working memory): ~1000 tokens - active context
- Level 2 (Full context): ~3000 tokens - complete reference

Usage:
    python3 generate_summary_tiers.py <path> [--json] [--output-dir DIR]

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Any

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens
from lib.markdown import parse_sections
from lib.exceptions import FileNotFoundError_

logger = logging.getLogger(__name__)

# Tier definitions
TIERS = {
    0: {"name": "Ultra-compact", "budget": 200, "description": "Key facts and critical decisions only"},
    1: {"name": "Working memory", "budget": 1000, "description": "Active context for daily work"},
    2: {"name": "Full context", "budget": 3000, "description": "Complete reference with details"},
}

# Section priority for compression (higher = keep more)
SECTION_PRIORITIES = {
    "decision": 10,
    "critical": 10,
    "important": 9,
    "action": 8,
    "todo": 8,
    "config": 7,
    "setup": 7,
    "architecture": 7,
    "preference": 6,
    "convention": 6,
    "lesson": 5,
    "note": 4,
    "log": 3,
    "history": 2,
    "archive": 1,
}

DEFAULT_PRIORITY = 5


def _classify_section(header: str) -> int:
    """Classify a section header by priority.

    Returns a priority score (1-10). Higher = more important.
    """
    h = header.lower()
    for keyword, priority in SECTION_PRIORITIES.items():
        if keyword in h:
            return priority
    return DEFAULT_PRIORITY


def _find_memory_files(target: str) -> List[Path]:
    """Find memory files to process.

    Raises FileNotFoundError_ if target doesn't exist.
    """
    p = Path(target)
    if not p.exists():
        raise FileNotFoundError_(f"Path not found: {target}")

    if p.is_file():
        return [p]

    files = []
    # Prioritize MEMORY.md
    mem = p / "MEMORY.md"
    if mem.exists():
        files.append(mem)
    # Add other root .md files
    for f in sorted(p.glob("*.md")):
        if f.name != "MEMORY.md" and f not in files:
            files.append(f)
    # memory/ subdirectory
    mem_dir = p / "memory"
    if mem_dir.is_dir():
        for f in sorted(mem_dir.glob("*.md")):
            files.append(f)

    return files


def generate_tiers(files: List[Path]) -> Dict[str, Any]:
    """Generate tier analysis from memory files.

    Returns a dict with total_tokens, total_sections, and per-tier info.
    """
    # Collect all sections with priorities
    all_sections: List[Dict[str, Any]] = []
    total_tokens = 0

    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        tokens = estimate_tokens(text)
        total_tokens += tokens
        sections = parse_sections(text)
        for header, body, level in sections:
            sec_tokens = estimate_tokens(header + '\n' + body) if (header or body) else 0
            priority = _classify_section(header) if header else DEFAULT_PRIORITY
            all_sections.append({
                "header": header,
                "body": body,
                "level": level,
                "tokens": sec_tokens,
                "priority": priority,
                "file": str(f),
            })

    # Sort by priority descending, then by token count ascending
    all_sections.sort(key=lambda s: (-s["priority"], s["tokens"]))

    # Build tiers
    tiers: Dict[int, Dict[str, Any]] = {}
    for tier_level, tier_def in TIERS.items():
        budget = tier_def["budget"]
        selected: List[Dict[str, Any]] = []
        used = 0
        for sec in all_sections:
            if used + sec["tokens"] <= budget:
                selected.append(sec)
                used += sec["tokens"]
        tiers[tier_level] = {
            "name": tier_def["name"],
            "budget": budget,
            "description": tier_def["description"],
            "sections_included": len(selected),
            "tokens_used": used,
            "sections": selected,
        }

    return {
        "total_tokens": total_tokens,
        "total_sections": len(all_sections),
        "tiers": tiers,
    }


def format_tier_template(result: Dict[str, Any], level: int) -> str:
    """Format a tier as a markdown template."""
    tier = result["tiers"][level]
    lines = [
        f"# Level {level} — {tier['name']}",
        f"Budget: {tier['budget']} tokens | Used: {tier['tokens_used']}",
        f"Sections: {tier['sections_included']}",
        "",
    ]
    for sec in tier["sections"]:
        if sec["header"]:
            lines.append(f"## {sec['header']}")
        if sec["body"]:
            lines.append(sec["body"])
        lines.append("")

    return '\n'.join(lines)


def format_human(result: Dict[str, Any]) -> str:
    """Format tier analysis as a human-readable report."""
    lines = [
        "=== Summary Tier Analysis ===",
        f"Total tokens: {result['total_tokens']:,}",
        f"Total sections: {result['total_sections']}",
        "",
    ]
    for level in range(3):
        tier = result["tiers"][level]
        lines.append(f"Level {level} ({tier['name']}):")
        lines.append(f"  Budget: {tier['budget']} tokens")
        lines.append(f"  Used: {tier['tokens_used']} tokens")
        lines.append(f"  Sections: {tier['sections_included']}")
        lines.append("")

    return '\n'.join(lines)


def extract_key_facts(text: str) -> List[str]:
    """Extract key facts from markdown text.

    Identifies lines with key:value patterns, important markers, and
    critical information. Returns deduplicated list of fact strings.
    """
    if not text:
        return []

    facts: List[str] = []
    seen: set = set()

    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        # Skip headers
        if line.startswith('#'):
            continue
        # Strip bullet prefix
        clean = line.lstrip('- *+').strip()
        if not clean:
            continue

        # Key:value patterns, important markers
        is_fact = (
            ':' in clean
            or any(m in line for m in ['⚠️', 'Critical', 'Important', 'IMPORTANT', 'WARNING'])
            or any(c.isdigit() for c in clean)  # Contains numbers
        )

        if is_fact and clean not in seen:
            seen.add(clean)
            facts.append(clean)

    return facts


def generate_auto_summary(
    files: List[Path],
    budget: int = 200,
) -> str:
    """Generate an automatic summary from memory files within token budget.

    Extracts key facts and fills up to budget tokens.
    """
    all_facts: List[str] = []
    for f in files:
        text = f.read_text(encoding="utf-8", errors="replace")
        all_facts.extend(extract_key_facts(text))

    lines = ["# Auto Summary", ""]
    used_tokens = estimate_tokens('\n'.join(lines))

    for fact in all_facts:
        fact_line = f"- {fact}"
        fact_tokens = estimate_tokens(fact_line)
        if used_tokens + fact_tokens > budget:
            break
        lines.append(fact_line)
        used_tokens += fact_tokens

    return '\n'.join(lines)


def main():
    parser = argparse.ArgumentParser(description="Generate tiered summaries")
    parser.add_argument("path", help="File or directory")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--output-dir", help="Write tier files to this directory")
    args = parser.parse_args()

    files = _find_memory_files(args.path)
    result = generate_tiers(files)

    if args.json:
        # Make JSON-serializable (remove section bodies for brevity)
        output = {
            "total_tokens": result["total_tokens"],
            "total_sections": result["total_sections"],
            "tiers": {
                k: {kk: vv for kk, vv in v.items() if kk != "sections"}
                for k, v in result["tiers"].items()
            },
        }
        print(json.dumps(output, indent=2))
    else:
        print(format_human(result))

    if args.output_dir:
        out = Path(args.output_dir)
        out.mkdir(parents=True, exist_ok=True)
        for level in range(3):
            (out / f"MEMORY-L{level}.md").write_text(
                format_tier_template(result, level), encoding="utf-8"
            )


if __name__ == "__main__":
    main()

```
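
The greedy fill inside `generate_tiers` is worth seeing in isolation: sections are sorted by priority (descending) then token count (ascending), and the loop skips any section that would overflow the budget without stopping, so smaller lower-priority sections can still squeeze in. A self-contained sketch with made-up section data:

```python
sections = [
    {"header": "Critical decisions", "tokens": 120, "priority": 10},
    {"header": "Architecture", "tokens": 300, "priority": 7},
    {"header": "Preferences", "tokens": 200, "priority": 6},
    {"header": "Session log", "tokens": 900, "priority": 3},
]
budget = 400

# Highest priority first; among equals, smallest section first.
sections.sort(key=lambda s: (-s["priority"], s["tokens"]))

# No break on overflow: keep scanning for smaller sections that still fit.
selected, used = [], 0
for sec in sections:
    if used + sec["tokens"] <= budget:
        selected.append(sec["header"])
        used += sec["tokens"]

# Architecture (300) would overflow after Critical decisions (120),
# but Preferences (200) still fits.
assert selected == ["Critical decisions", "Preferences"]
assert used == 320
```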

### scripts/lib/__init__.py

```python
"""claw-compactor shared library.

Core utilities for token estimation, markdown parsing, deduplication,
dictionary encoding, run-length encoding, and format optimization.

Part of claw-compactor. License: MIT.
"""

```

### scripts/lib/config.py

```python
"""Configuration management for claw-compactor.

Loads settings from claw-compactor-config.json in the workspace root,
falling back to sensible defaults.

Part of claw-compactor. License: MIT.
"""

import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional

logger = logging.getLogger("claw-compactor.config")

DEFAULT_CONFIG: Dict[str, Any] = {
    "chars_per_token": 4,
    "level0_max_tokens": 200,
    "level1_max_tokens": 500,
    "dedup_similarity_threshold": 0.6,
    "dedup_shingle_size": 3,
    "dedup_max_results": 50,
    "dedup_min_line_length": 20,
    "compress_min_tokens": 50,
    "compress_target_ratio": 0.4,
    "date_format": "%Y-%m-%d",
    "memory_dir": "memory",
    "memory_file": "MEMORY.md",
    "summary_tiers_file": "memory/summary-tiers.md",
    "compressed_suffix": ".compressed.md",
    "log_level": "INFO",
}

CONFIG_FILENAME = "claw-compactor-config.json"


@dataclass
class MemCompressConfig:
    """Runtime configuration for claw-compactor."""

    chars_per_token: int = DEFAULT_CONFIG["chars_per_token"]
    level0_max_tokens: int = DEFAULT_CONFIG["level0_max_tokens"]
    level1_max_tokens: int = DEFAULT_CONFIG["level1_max_tokens"]
    dedup_similarity_threshold: float = DEFAULT_CONFIG["dedup_similarity_threshold"]
    dedup_shingle_size: int = DEFAULT_CONFIG["dedup_shingle_size"]
    dedup_max_results: int = DEFAULT_CONFIG["dedup_max_results"]
    dedup_min_line_length: int = DEFAULT_CONFIG["dedup_min_line_length"]
    compress_min_tokens: int = DEFAULT_CONFIG["compress_min_tokens"]
    compress_target_ratio: float = DEFAULT_CONFIG["compress_target_ratio"]
    date_format: str = DEFAULT_CONFIG["date_format"]
    memory_dir: str = DEFAULT_CONFIG["memory_dir"]
    memory_file: str = DEFAULT_CONFIG["memory_file"]
    summary_tiers_file: str = DEFAULT_CONFIG["summary_tiers_file"]
    compressed_suffix: str = DEFAULT_CONFIG["compressed_suffix"]
    log_level: str = DEFAULT_CONFIG["log_level"]


def load_config(workspace: Path) -> MemCompressConfig:
    """Load configuration from *workspace*/claw-compactor-config.json.

    Returns the default config if the file is missing, empty, or invalid;
    unknown keys are ignored so stale config files never raise.
    """
    config_path = workspace / CONFIG_FILENAME
    if not config_path.exists():
        return MemCompressConfig()
    try:
        text = config_path.read_text(encoding="utf-8").strip()
        if not text:
            return MemCompressConfig()
        data = json.loads(text)
        if not isinstance(data, dict):
            logger.warning("Config is not a JSON object, using defaults")
            return MemCompressConfig()
        # Filter to known fields only
        known = {f.name for f in MemCompressConfig.__dataclass_fields__.values()}
        filtered = {k: v for k, v in data.items() if k in known}
        return MemCompressConfig(**filtered)
    except (json.JSONDecodeError, TypeError, ValueError) as exc:
        logger.warning("Invalid config %s: %s — using defaults", config_path, exc)
        return MemCompressConfig()

```
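
The key defensive move in `load_config` is filtering the JSON down to known dataclass fields before constructing the object, so a stale or hand-edited config file with extra keys degrades gracefully instead of raising `TypeError`. A minimal sketch of that pattern with a reduced two-field config:

```python
import json
from dataclasses import dataclass, fields

@dataclass
class Cfg:
    chars_per_token: int = 4
    log_level: str = "INFO"

raw = json.loads('{"chars_per_token": 3, "typo_field": true}')
known = {f.name for f in fields(Cfg)}  # names the dataclass accepts
cfg = Cfg(**{k: v for k, v in raw.items() if k in known})

assert cfg.chars_per_token == 3  # known key applied
assert cfg.log_level == "INFO"   # typo_field dropped, default kept
```

Without the filter, `Cfg(**raw)` would raise `TypeError: __init__() got an unexpected keyword argument 'typo_field'`.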

### scripts/lib/dedup.py

```python
"""Deduplication engine using shingle hashing.

Uses n-gram (shingle) fingerprinting for efficient near-duplicate detection
without O(n^2) pairwise comparison. Groups entries by section, then compares
shingle sets using Jaccard similarity.

Part of claw-compactor. License: MIT.
"""

import hashlib
import logging
from typing import List, Dict, Any, Set, Tuple

logger = logging.getLogger(__name__)

# Configuration
SHINGLE_SIZE = 3  # n-gram size (words)
SIMILARITY_THRESHOLD = 0.6  # Jaccard similarity threshold for "duplicate"


def _shingles(text: str, k: int = SHINGLE_SIZE) -> Set[int]:
    """Generate a set of k-word shingle hashes from *text*.

    Each shingle is a hash of *k* consecutive words.
    Returns a set of integer hashes.
    """
    words = text.split()
    if not words:
        return {hash("")}
    if len(words) < k:
        return {hash(' '.join(words))}
    result: Set[int] = set()
    for i in range(len(words) - k + 1):
        shingle = ' '.join(words[i:i + k])
        result.add(hash(shingle))
    return result


def jaccard(a: Set[int], b: Set[int]) -> float:
    """Compute Jaccard similarity between two shingle sets.

    Returns 1.0 for identical sets, 0.0 for disjoint.
    If both are empty, returns 1.0.
    """
    if not a and not b:
        return 1.0
    if not a or not b:
        return 0.0
    intersection = len(a & b)
    union = len(a | b)
    return intersection / union if union else 0.0


def find_duplicates(
    entries: List[str],
    threshold: float = SIMILARITY_THRESHOLD,
    k: int = SHINGLE_SIZE,
) -> List[Dict[str, Any]]:
    """Find near-duplicate groups among *entries*.

    Returns a list of dicts, each with:
        - indices: list of indices that are near-duplicates
        - similarity: average Jaccard similarity within the group

    Uses O(n^2) pairwise comparison with shingle hashing.
    """
    if len(entries) < 2:
        return []

    shingle_sets = [_shingles(e, k) for e in entries]
    used: Set[int] = set()
    groups: List[Dict[str, Any]] = []

    for i in range(len(entries)):
        if i in used:
            continue
        group_indices = [i]
        total_sim = 0.0
        count = 0
        for j in range(i + 1, len(entries)):
            if j in used:
                continue
            sim = jaccard(shingle_sets[i], shingle_sets[j])
            if sim >= threshold:
                group_indices.append(j)
                total_sim += sim
                count += 1
        if len(group_indices) > 1:
            avg_sim = total_sim / count if count else threshold
            groups.append({
                "indices": group_indices,
                "similarity": round(avg_sim, 4),
            })
            used.update(group_indices)

    return groups


def merge_duplicates(
    entries: List[str],
    groups: List[Dict[str, Any]],
) -> List[str]:
    """Merge duplicate groups, keeping the longest entry in each group.

    Entries not in any group are passed through unchanged.
    """
    if not groups:
        return list(entries)

    removed: Set[int] = set()
    for g in groups:
        indices = g["indices"]
        # Keep the longest
        best = max(indices, key=lambda idx: len(entries[idx]))
        for idx in indices:
            if idx != best:
                removed.add(idx)

    return [e for i, e in enumerate(entries) if i not in removed]

```
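
A quick self-contained demonstration of the shingle/Jaccard idea above, using 3-word shingles and plain string shingles instead of integer hashes for readability:

```python
def shingles(text, k=3):
    # Set of k-word windows; short texts collapse to a single shingle.
    words = text.split()
    if len(words) < k:
        return {" ".join(words)}
    return {" ".join(words[i:i + k]) for i in range(len(words) - k + 1)}

def jaccard(a, b):
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)

a = shingles("deploy the api server on port 8080 behind nginx")
b = shingles("deploy the api server on port 8081 behind nginx")  # one word differs
c = shingles("water the office plants every friday")

assert jaccard(a, a) == 1.0
assert jaccard(a, c) == 0.0
assert jaccard(a, b) == 0.4  # 4 shared shingles out of 10 distinct
```

Note that changing a single word in a nine-word entry already drags similarity down to 0.4, below the 0.6 default threshold: every shingle containing the changed word is lost. That sensitivity on short entries is why `SIMILARITY_THRESHOLD` is exposed as `--threshold` on the CLI.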

### scripts/lib/dictionary.py

```python
"""Dictionary-based compression using auto-learned codebooks.

Scans workspace memory files, learns high-frequency n-grams, builds a
codebook mapping long phrases to short `$XX` codes, and applies/reverses
substitutions for lossless compression.

Part of claw-compactor. License: MIT.
"""

import json
import logging
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)

# Code format: $AA .. $ZZ (676 slots), then $AAA.. if needed
_CODE_RE = re.compile(r'\$[A-Z]{2,3}')
# Reserved: don't compress things that already look like codes
_RESERVED_RE = re.compile(r'\$[A-Z]{2,3}')

# Min occurrences for a phrase to be codebook-worthy
MIN_FREQ = 3
# Min raw length to be worth replacing (shorter than this → no savings)
MIN_PHRASE_LEN = 6
# Max codebook entries
MAX_CODEBOOK = 200

# IP address pattern
_IP_RE = re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')
# Absolute path pattern (Unix)
_PATH_RE = re.compile(r'(/[A-Za-z0-9_.~-]+){3,}')


def _generate_codes(n: int) -> List[str]:
    """Generate *n* unique short codes: $AA..$ZZ, then $AAA.. if needed."""
    codes: List[str] = []
    # 2-letter codes: $AA .. $ZZ (676)
    for i in range(26):
        for j in range(26):
            codes.append('$' + chr(65 + i) + chr(65 + j))
            if len(codes) >= n:
                return codes
    # 3-letter codes if needed
    for i in range(26):
        for j in range(26):
            for k in range(26):
                codes.append('$' + chr(65 + i) + chr(65 + j) + chr(65 + k))
                if len(codes) >= n:
                    return codes
    return codes


def _tokenize_ngrams(text: str, min_n: int = 2, max_n: int = 5) -> Counter:
    """Extract word n-grams from *text*, filtering by minimum length."""
    counter: Counter = Counter()
    if not text:
        return counter
    words = text.split()
    for n in range(min_n, max_n + 1):
        for i in range(len(words) - n + 1):
            gram = ' '.join(words[i:i + n])
            if len(gram) >= MIN_PHRASE_LEN:
                counter[gram] += 1
    return counter


def _extract_ip_prefixes(texts: List[str]) -> Dict[str, int]:
    """Find frequently occurring IP prefixes (3-octet) across *texts*."""
    counter: Counter = Counter()
    for text in texts:
        for ip in _IP_RE.findall(text):
            parts = ip.split('.')
            prefix = '.'.join(parts[:3]) + '.'
            counter[prefix] += 1
    return {prefix: count for prefix, count in counter.items() if count >= 2}


def _extract_path_prefixes(texts: List[str]) -> Dict[str, int]:
    """Find frequently occurring path prefixes (directory components) across *texts*."""
    all_paths: List[str] = []
    for text in texts:
        for m in _PATH_RE.finditer(text):
            all_paths.append(m.group())
    
    if len(all_paths) < 2:
        return {}
    
    # Extract directory prefixes at various depths
    counter: Counter = Counter()
    for path in all_paths:
        parts = path.split('/')
        # Generate prefixes of increasing length (at least 3 components)
        for depth in range(3, len(parts)):
            prefix = '/'.join(parts[:depth])
            counter[prefix] += 1
    
    return {prefix: count for prefix, count in counter.items() if count >= 2}


def build_codebook(
    texts: List[str],
    min_freq: int = MIN_FREQ,
    max_entries: int = MAX_CODEBOOK,
) -> Dict[str, str]:
    """Build a codebook from a list of text documents.

    Scans for high-frequency n-grams, IPs, and paths. Returns a dict
    mapping short codes ($XX) to the phrases they replace.
    """
    if not texts:
        return {}

    # Gather candidates: n-grams + IPs + paths
    combined = Counter()
    for text in texts:
        combined.update(_tokenize_ngrams(text))

    # Add IPs and paths
    ip_freqs = _extract_ip_prefixes(texts)
    for ip, count in ip_freqs.items():
        if len(ip) >= MIN_PHRASE_LEN:
            combined[ip] = max(combined.get(ip, 0), count)

    path_freqs = _extract_path_prefixes(texts)
    for path, count in path_freqs.items():
        if len(path) >= MIN_PHRASE_LEN:
            combined[path] = max(combined.get(path, 0), count)

    # Filter by min_freq and sort by savings potential (freq * len)
    candidates = [
        (phrase, count)
        for phrase, count in combined.items()
        if count >= min_freq and len(phrase) >= MIN_PHRASE_LEN
    ]
    candidates.sort(key=lambda x: x[1] * len(x[0]), reverse=True)

    # Take top entries, avoiding overlapping phrases
    codes = _generate_codes(min(len(candidates), max_entries))
    codebook: Dict[str, str] = {}
    used_phrases: Set[str] = set()

    for (phrase, _count), code in zip(candidates, codes):
        # Skip if this phrase is a substring of an already-selected phrase
        skip = False
        for existing in used_phrases:
            if phrase in existing or existing in phrase:
                skip = True
                break
        if skip:
            continue
        codebook[code] = phrase
        used_phrases.add(phrase)
        if len(codebook) >= max_entries:
            break

    return codebook


def _normalize_codebook(codebook: Dict[str, str]) -> Dict[str, str]:
    """Normalize codebook to {code: phrase} format.
    
    Accepts either {code: phrase} or {phrase: code} format.
    Detects format by checking if keys start with '$'.
    """
    if not codebook:
        return {}
    # Check first key to determine format
    first_key = next(iter(codebook))
    if first_key.startswith('$'):
        return codebook  # Already {code: phrase}
    else:
        # {phrase: code} -> {code: phrase}
        return {code: phrase for phrase, code in codebook.items()}


_DOLLAR_ESCAPE = "\x00DLR\x00"  # sentinel for literal '$' in source text


def compress_text(text: str, codebook: Dict[str, str]) -> str:
    """Apply codebook substitutions to *text*. Lossless.
    
    Accepts codebook in either {code: phrase} or {phrase: code} format.
    Pre-existing '$' characters are escaped so they survive roundtrip.
    """
    if not text or not codebook:
        return text
    normalized = _normalize_codebook(codebook)
    # Escape pre-existing '$' to avoid collisions with codes
    result = text.replace("$", _DOLLAR_ESCAPE)
    # Sort by phrase length descending to avoid partial matches
    for code, phrase in sorted(normalized.items(), key=lambda x: -len(x[1])):
        escaped_phrase = phrase.replace("$", _DOLLAR_ESCAPE)
        result = result.replace(escaped_phrase, code)
    return result


def decompress_text(text: str, codebook: Dict[str, str]) -> str:
    """Reverse codebook substitutions. Lossless.
    
    Accepts codebook in either {code: phrase} or {phrase: code} format.
    """
    if not text or not codebook:
        return text
    normalized = _normalize_codebook(codebook)
    result = text
    # Sort by code length descending to handle $AAA before $AA
    for code, phrase in sorted(normalized.items(), key=lambda x: -len(x[0])):
        result = result.replace(code, phrase)
    # Unescape literal '$' characters
    result = result.replace(_DOLLAR_ESCAPE, "$")
    return result


def save_codebook(codebook: Dict[str, str], path: Path) -> None:
    """Save codebook to a JSON file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {"version": 1, "entries": codebook}
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")


def load_codebook(path: Path) -> Dict[str, str]:
    """Load codebook from a JSON file."""
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Codebook not found: {path}")
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict) or "entries" not in data:
        raise ValueError(f"Invalid codebook format: {path}")
    return data["entries"]


def compression_stats(
    texts_or_original, codebook_or_compressed=None, codebook=None
) -> Dict[str, object]:
    """Calculate compression statistics.
    
    Can be called as:
      compression_stats(texts_dict, codebook) — where texts_dict maps filenames to content
      compression_stats(original_str, compressed_str, codebook)
    """
    if codebook is not None:
        # 3-arg form: (original, compressed, codebook)
        original = texts_or_original
        compressed = codebook_or_compressed
        orig_len = len(original)
        comp_len = len(compressed)
    elif isinstance(texts_or_original, dict) and isinstance(codebook_or_compressed, dict):
        # 2-arg form: (texts_dict, codebook)
        codebook = codebook_or_compressed
        all_text = '\n'.join(texts_or_original.values())
        original = all_text
        compressed = compress_text(all_text, codebook)
        orig_len = len(original)
        comp_len = len(compressed)
    else:
        return {"original_chars": 0, "compressed_chars": 0, "gross_reduction_pct": 0.0,
                "net_reduction_pct": 0.0, "codebook_entries": 0, "codes_used": 0}

    reduction = ((orig_len - comp_len) / orig_len * 100) if orig_len else 0.0

    # Count how many codes are actually used in the compressed text
    normalized = _normalize_codebook(codebook)
    codes_used = sum(1 for code in normalized if code in compressed)

    # Net reduction accounts for codebook overhead
    codebook_overhead = sum(len(k) + len(v) + 2 for k, v in normalized.items())  # code: phrase + separator
    net_saved = orig_len - comp_len - codebook_overhead
    net_reduction = (net_saved / orig_len * 100) if orig_len else 0.0

    return {
        "original_chars": orig_len,
        "compressed_chars": comp_len,
        "gross_reduction_pct": round(reduction, 2),
        "net_reduction_pct": round(net_reduction, 2),
        "codebook_entries": len(codebook),
        "codes_used": codes_used,
    }

```
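The dollar-escaping roundtrip above can be exercised end to end. The snippet below is a standalone sketch: it re-inlines the substitution logic of `compress_text` / `decompress_text` rather than importing the library module, and the codebook entries are invented for illustration. It shows that a literal `$` in the source text survives the roundtrip:

```python
# Standalone sketch of the codebook roundtrip (mirrors compress_text /
# decompress_text above; codebook entries are made up).
_DOLLAR_ESCAPE = "\x00DLR\x00"  # sentinel for literal '$'

def compress(text, codebook):
    # Escape pre-existing '$' so it cannot collide with $-prefixed codes.
    result = text.replace("$", _DOLLAR_ESCAPE)
    # Longest phrases first, so shorter phrases never clobber longer ones.
    for code, phrase in sorted(codebook.items(), key=lambda x: -len(x[1])):
        result = result.replace(phrase.replace("$", _DOLLAR_ESCAPE), code)
    return result

def decompress(text, codebook):
    result = text
    # Longest codes first ($AAA before $AA).
    for code, phrase in sorted(codebook.items(), key=lambda x: -len(x[0])):
        result = result.replace(code, phrase)
    return result.replace(_DOLLAR_ESCAPE, "$")

codebook = {"$AA": "session transcript", "$AB": "compression ratio"}
original = "The session transcript shows a $5 compression ratio gain."
packed = compress(original, codebook)
assert decompress(packed, codebook) == original
```

Note that `"$5"` never appears in the packed form: the literal dollar is held behind the sentinel until decompression, which is what makes code injection impossible even when the source text already contains `$`.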

### scripts/lib/exceptions.py

```python
"""Custom exceptions for claw-compactor.

Part of claw-compactor. License: MIT.
"""


class MemCompressError(Exception):
    """Base exception for claw-compactor operations."""
    pass


class FileNotFoundError_(MemCompressError):
    """Raised when a required file or directory is not found.

    Trailing underscore avoids shadowing the builtin FileNotFoundError.
    """
    pass


class ParseError(MemCompressError):
    """Raised when input cannot be parsed (malformed markdown, JSON, etc.)."""
    pass


class TokenEstimationError(MemCompressError):
    """Raised when token estimation fails."""
    pass

```

### scripts/lib/markdown.py

```python
"""Markdown parsing and manipulation utilities.

Part of claw-compactor. License: MIT.
"""

import re
import logging
from difflib import SequenceMatcher
from typing import List, Tuple, Dict, Optional

logger = logging.getLogger(__name__)

# Chinese punctuation -> English equivalents (saves tokens)
_ZH_PUNCT_MAP: Dict[str, str] = {
    '\uFF0C': ',', '\u3002': '.', '\uFF1B': ';', '\uFF1A': ':', '\uFF01': '!', '\uFF1F': '?',
    '\u201C': '"', '\u201D': '"', '\u2018': "'", '\u2019': "'",
    '\uFF08': '(', '\uFF09': ')', '\u3010': '[', '\u3011': ']',
    '\u3001': ',', '\u2026': '...', '\u2014\u2014': '--', '\uFF5E': '~',
}
_ZH_PUNCT_RE = re.compile('|'.join(re.escape(k) for k in _ZH_PUNCT_MAP))

# Emoji pattern (broad: emoticons, symbols, pictographs, etc.)
_EMOJI_RE = re.compile(
    '[\U0001F600-\U0001F64F'   # emoticons
    '\U0001F300-\U0001F5FF'    # symbols & pictographs
    '\U0001F680-\U0001F6FF'    # transport & map
    '\U0001F1E0-\U0001F1FF'    # flags
    '\U00002702-\U000027B0'    # dingbats
    '\U0001F900-\U0001F9FF'    # supplemental symbols
    '\U0001FA00-\U0001FA6F'    # chess symbols
    '\U0001FA70-\U0001FAFF'    # symbols extended-A
    '\U00002600-\U000026FF'    # misc symbols
    ']+', re.UNICODE
)

# Header regex
_HEADER_RE = re.compile(r'^(#{1,6})\s+(.*)', re.MULTILINE)

# Table separator line
_TABLE_SEP_RE = re.compile(r'^[\s|:\-]+$')


def parse_sections(text: str) -> List[Tuple[str, str, int]]:
    """Parse *text* into sections delimited by markdown headers.

    Returns a list of (header, body, level) tuples.
    A preamble (text before the first header) is returned with header=''.
    """
    if not text:
        return []

    sections: List[Tuple[str, str, int]] = []
    lines = text.split('\n')
    current_header = ''
    current_level = 0
    current_body_lines: List[str] = []

    for line in lines:
        m = _HEADER_RE.match(line)
        if m:
            # Save previous section
            body = '\n'.join(current_body_lines).strip()
            if current_header or body:
                sections.append((current_header, body, current_level))
            current_header = m.group(2).strip()
            current_level = len(m.group(1))
            current_body_lines = []
        else:
            current_body_lines.append(line)

    # Last section
    body = '\n'.join(current_body_lines).strip()
    if current_header or body:
        sections.append((current_header, body, current_level))

    return sections


def strip_markdown_redundancy(text: str) -> str:
    """Remove excessive blank lines and trailing whitespace."""
    if not text:
        return ""
    # Collapse 3+ consecutive blank lines to 2
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Strip trailing whitespace per line
    lines = [line.rstrip() for line in text.split('\n')]
    return '\n'.join(lines).strip()


def remove_duplicate_lines(text: str) -> str:
    """Remove exact duplicate non-blank lines, preserving order."""
    if not text:
        return ""
    seen = set()
    result = []
    for line in text.split('\n'):
        stripped = line.strip()
        if not stripped:
            # Preserve blank lines
            result.append(line)
            continue
        if stripped in seen:
            continue
        seen.add(stripped)
        result.append(line)
    return '\n'.join(result)


def normalize_chinese_punctuation(text: str) -> str:
    """Replace Chinese fullwidth punctuation with ASCII equivalents."""
    if not text:
        return ""
    # Handle the double-char em-dash first
    text = text.replace('\u2014\u2014', '--')
    return _ZH_PUNCT_RE.sub(lambda m: _ZH_PUNCT_MAP.get(m.group(), m.group()), text)


def strip_emoji(text: str) -> str:
    """Remove emoji characters from *text*."""
    if not text:
        return ""
    result = _EMOJI_RE.sub('', text)
    # Collapse multiple spaces left by emoji removal
    result = re.sub(r'  +', ' ', result)
    return result


def remove_empty_sections(text: str) -> str:
    """Remove markdown sections that have no meaningful body content."""
    if not text:
        return ""
    sections = parse_sections(text)
    if not sections:
        return text

    # Determine which sections have children (a deeper section follows)
    has_child = [False] * len(sections)
    for idx, (header, body, level) in enumerate(sections):
        if level > 0:
            # Look backwards for a parent
            for pidx in range(idx - 1, -1, -1):
                _, _, plevel = sections[pidx]
                if plevel > 0 and plevel < level:
                    has_child[pidx] = True
                    break

    result_lines: List[str] = []
    for idx, (header, body, level) in enumerate(sections):
        if not header and not body:
            continue
        if header and not body.strip() and not has_child[idx]:
            continue  # Empty section with no children
        if header:
            result_lines.append('#' * level + ' ' + header)
        if body.strip():
            result_lines.append(body)
        result_lines.append('')  # Blank line between sections

    return '\n'.join(result_lines).strip()


def compress_markdown_table(text: str) -> str:
    """Convert markdown tables to compact notation.

    2-column tables become ``- Key: Value`` lines.
    3-4 column tables become ``Col1, Header2=Val2, ...`` lines.
    Wide tables (5+ columns) keep their rows but drop the header row
    and separator line.
    """
    if not text:
        return ""

    lines = text.split('\n')
    result: List[str] = []
    i = 0

    while i < len(lines):
        line = lines[i]
        # Detect a table: line with | ... | followed by separator |---|
        if '|' in line and i + 1 < len(lines) and _TABLE_SEP_RE.match(lines[i + 1].strip()):
            # Parse header row
            headers = [c.strip() for c in line.strip().strip('|').split('|')]
            i += 2  # skip header + separator
            rows: List[List[str]] = []
            while i < len(lines) and '|' in lines[i] and lines[i].strip():
                cells = [c.strip() for c in lines[i].strip().strip('|').split('|')]
                rows.append(cells)
                i += 1

            if len(headers) >= 5:
                # Wide tables: preserve as-is but without header/separator
                for row in rows:
                    result.append('| ' + ' | '.join(row) + ' |')
            elif len(headers) == 2:
                # 2-column: key: value format
                for row in rows:
                    k = row[0] if len(row) > 0 else ''
                    v = row[1] if len(row) > 1 else ''
                    if k or v:
                        result.append(f"- {k}: {v}")
            else:
                # Multi-column: compact format using headers as labels
                for row in rows:
                    parts = []
                    for ci, cell in enumerate(row):
                        if ci == 0:
                            parts.append(cell)
                        elif ci < len(headers):
                            parts.append(f"{headers[ci]}={cell}")
                        else:
                            parts.append(cell)
                    result.append(', '.join(parts))
        else:
            result.append(line)
            i += 1

    return '\n'.join(result)


def merge_similar_bullets(text: str, threshold: float = 0.80) -> str:
    """Merge bullet lines with high similarity.

    Uses SequenceMatcher ratio. When two bullets exceed *threshold*,
    keep the longer one.
    """
    if not text:
        return ""

    lines = text.split('\n')
    bullet_re = re.compile(r'^(\s*[-*+]\s+)(.*)')
    result: List[str] = []
    bullets: List[Tuple[str, str, str]] = []  # (prefix, content, full_line)

    def flush_bullets():
        if not bullets:
            return
        kept = list(bullets)
        merged_out: List[bool] = [False] * len(kept)
        for i in range(len(kept)):
            if merged_out[i]:
                continue
            for j in range(i + 1, len(kept)):
                if merged_out[j]:
                    continue
                ratio = SequenceMatcher(None, kept[i][1], kept[j][1]).ratio()
                if ratio >= threshold:
                    # Keep the longer one
                    if len(kept[j][1]) > len(kept[i][1]):
                        merged_out[i] = True
                        break
                    else:
                        merged_out[j] = True
        for idx, (prefix, content, full_line) in enumerate(kept):
            if not merged_out[idx]:
                result.append(full_line)
        bullets.clear()

    for line in lines:
        m = bullet_re.match(line)
        if m:
            bullets.append((m.group(1), m.group(2), line))
        else:
            flush_bullets()
            result.append(line)

    flush_bullets()
    return '\n'.join(result)


def merge_short_bullets(text: str, max_words: int = 3, max_merge: int = 10) -> str:
    """Combine consecutive short bullet points into comma-separated form.

    Bullets with <= *max_words* words are candidates. Up to *max_merge*
    consecutive short bullets are joined into one line.
    """
    if not text:
        return ""

    bullet_re = re.compile(r'^(\s*[-*+]\s+)(.*)')
    lines = text.split('\n')
    result: List[str] = []
    short_bullets: List[str] = []
    prefix = '- '

    def flush_short():
        nonlocal prefix
        if not short_bullets:
            return
        if len(short_bullets) <= 2:
            for sb in short_bullets:
                result.append(prefix + sb)
        else:
            # Merge into one line
            result.append(prefix + ', '.join(short_bullets))
        short_bullets.clear()

    for line in lines:
        m = bullet_re.match(line)
        if m:
            content = m.group(2).strip()
            prefix = m.group(1)
            if len(content.split()) <= max_words:
                short_bullets.append(content)
                if len(short_bullets) >= max_merge:
                    flush_short()
            else:
                flush_short()
                result.append(line)
        else:
            flush_short()
            result.append(line)

    flush_short()
    return '\n'.join(result)

```
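The two-column branch of `compress_markdown_table` is the one that pays off most often in memory files. The sketch below isolates just that branch as a standalone function (same separator-detection regex as above; the table content is invented):

```python
import re

_TABLE_SEP_RE = re.compile(r'^[\s|:\-]+$')

def table_to_kv(text):
    # Sketch of compress_markdown_table's 2-column branch:
    # header + |---| separator are dropped, each row becomes "- key: value".
    # Assumes well-formed 2-column rows.
    lines = text.split('\n')
    out, i = [], 0
    while i < len(lines):
        line = lines[i]
        if '|' in line and i + 1 < len(lines) and _TABLE_SEP_RE.match(lines[i + 1].strip()):
            i += 2  # skip header row + separator line
            while i < len(lines) and '|' in lines[i] and lines[i].strip():
                cells = [c.strip() for c in lines[i].strip().strip('|').split('|')]
                out.append(f"- {cells[0]}: {cells[1]}")
                i += 1
        else:
            out.append(line)
            i += 1
    return '\n'.join(out)

table = "| Key | Value |\n|---|---|\n| host | 10.0.0.1 |\n| port | 8080 |"
print(table_to_kv(table))
# - host: 10.0.0.1
# - port: 8080
```

The pipe characters, separator row, and header all disappear; only the facts remain, which is exactly the trade the compressor is making.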

### scripts/lib/rle.py

```python
"""Run-Length Encoding for structured data patterns.

Detects and compresses structured repetitive patterns:
- IP address families → common prefix extraction
- File paths → $WS/ shorthand
- Enumeration lists → compact format
- Repeated section headers

Part of claw-compactor. License: MIT.
"""

import re
import logging
from collections import Counter
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Default workspace path to shorten
DEFAULT_WS_PATHS = [
    "/home/user/workspace",
]

# IP pattern
_IP_RE = re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')


def compress_paths(text: str, workspace_paths: Optional[List[str]] = None) -> str:
    """Replace long workspace paths with $WS shorthand."""
    if not text:
        return ""
    paths = workspace_paths or DEFAULT_WS_PATHS
    result = text
    for ws in sorted(paths, key=len, reverse=True):
        result = result.replace(ws, "$WS")
    return result


def decompress_paths(text: str, workspace_path: str) -> str:
    """Expand $WS back to the full workspace path."""
    if not text:
        return ""
    return text.replace("$WS", workspace_path)


def compress_ip_families(text: str, min_occurrences: int = 2) -> Tuple[str, Dict[str, str]]:
    """Group IPs by common prefix and compress families.

    Returns (compressed_text, prefix_map) where prefix_map maps
    $IPn labels to the common prefix.
    Only compresses families with min_occurrences+ IPs sharing a 3-octet prefix.
    """
    if not text:
        return "", {}

    ips = _IP_RE.findall(text)
    if not ips:
        return text, {}

    # Group by first 3 octets
    families: Dict[str, List[str]] = {}
    for ip in ips:
        parts = ip.split('.')
        prefix = '.'.join(parts[:3]) + '.'
        families.setdefault(prefix, []).append(ip)

    # Only compress families with min_occurrences+ members
    prefix_map: Dict[str, str] = {}
    result = text
    idx = 0
    for prefix, members in sorted(families.items(), key=lambda x: -len(x[1])):
        if len(members) < min_occurrences:
            continue
        label = f"$IP{idx}" if idx > 0 else "$IP"
        prefix_map[label] = prefix
        # Use a word-boundary regex rather than str.replace: a plain
        # replace on "1.2.3.4" would also corrupt e.g. "11.2.3.45".
        for ip in set(members):
            suffix = ip.split('.')[3]
            result = re.sub(r'\b' + re.escape(ip) + r'\b', f"{label}.{suffix}", result)
        idx += 1

    return result, prefix_map


def decompress_ip_families(text: str, prefix_map: Dict[str, str]) -> str:
    """Expand compressed IP references back to full IPs."""
    if not text or not prefix_map:
        return text
    result = text
    for label, prefix in prefix_map.items():
        # Match $IPn.suffix patterns
        pattern = re.compile(re.escape(label) + r'\.(\d{1,3})')
        result = pattern.sub(lambda m: prefix + m.group(1), result)
    return result


def compress_enumerations(text: str) -> str:
    """Compress comma-separated lists of ALL-CAPS short codes.

    Only compresses lists with 4+ items that are all uppercase short tokens.
    E.g. "BTC, ETH, SOL, BNB, DOGE" → "[BTC,ETH,SOL,BNB,DOGE]"
    """
    if not text:
        return ""

    # Match comma-separated uppercase tokens
    pattern = re.compile(r'((?:[A-Z][A-Z0-9]{1,6})(?:\s*,\s*(?:[A-Z][A-Z0-9]{1,6})){3,})')

    def _compact(m: re.Match) -> str:
        items = [s.strip() for s in m.group(0).split(',')]
        return '[' + ','.join(items) + ']'

    return pattern.sub(_compact, text)


def compress_repeated_headers(text: str) -> str:
    """Compress repeated identical section headers.

    When the same header text appears multiple times, keep only the first
    and merge contents.
    """
    if not text:
        return ""
    lines = text.split('\n')
    seen_headers: Dict[str, int] = {}
    result: List[str] = []
    i = 0
    while i < len(lines):
        line = lines[i]
        # Check if this is a header
        if line.startswith('#'):
            header_text = line.lstrip('#').strip()
            if header_text in seen_headers:
                # Skip this header, but keep its body content
                i += 1
                while i < len(lines) and not lines[i].startswith('#'):
                    if lines[i].strip():
                        result.append(lines[i])
                    i += 1
                continue
            else:
                seen_headers[header_text] = len(result)
        result.append(line)
        i += 1
    return '\n'.join(result)


def compress(text: str, workspace_paths: Optional[List[str]] = None) -> str:
    """Apply all RLE-style compressions to *text*."""
    if not text:
        return ""
    result = compress_paths(text, workspace_paths)
    result, _ = compress_ip_families(result)
    result = compress_enumerations(result)
    return result


def decompress(text: str, workspace_path: str, ip_prefix_map: Optional[Dict[str, str]] = None) -> str:
    """Reverse all RLE-style compressions."""
    if not text:
        return ""
    result = decompress_paths(text, workspace_path)
    if ip_prefix_map:
        result = decompress_ip_families(result, ip_prefix_map)
    return result

```
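The IP-family transform above compresses well whenever a transcript hammers the same subnet. A minimal standalone sketch of one family's roundtrip (single fixed prefix, sample addresses invented; the library version additionally discovers prefixes and handles multiple `$IPn` labels):

```python
import re

def compress_family(text, prefix, label="$IP"):
    # Rewrite IPs sharing a 3-octet prefix as $IP.<last-octet>.
    # Word boundaries keep "192.168.1.10" from matching inside longer IPs.
    pattern = re.compile(r'\b' + re.escape(prefix) + r'(\d{1,3})\b')
    return pattern.sub(lambda m: f"{label}.{m.group(1)}", text)

def expand_family(text, prefix, label="$IP"):
    # Inverse: $IP.<suffix> back to the full dotted quad.
    pattern = re.compile(re.escape(label) + r'\.(\d{1,3})')
    return pattern.sub(lambda m: prefix + m.group(1), text)

log = "ping 192.168.1.10 then ssh 192.168.1.22"
packed = compress_family(log, "192.168.1.")
assert packed == "ping $IP.10 then ssh $IP.22"
assert expand_family(packed, "192.168.1.") == log
```

Each member of the family shrinks from ~11 characters to ~6, and the prefix is stored once in the `prefix_map` instead of repeated per occurrence.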

### scripts/lib/tokenizer_optimizer.py

```python
"""Token-level format optimization.

Applies encoding-aware transformations that reduce token count while
preserving all semantic information. Each transformation targets
specific tokenizer inefficiencies in cl100k_base / o200k_base.

Key insight: the same information can be encoded in fewer tokens
by choosing formats the tokenizer handles more efficiently.

Part of claw-compactor. License: MIT.
"""

import re
import logging
from typing import List, Tuple

logger = logging.getLogger(__name__)

# Chinese full-width punctuation → half-width (each saves ~1 token)
_ZH_PUNCT_MAP = {
    '\uFF0C': ',', '\u3002': '.', '\uFF1B': ';', '\uFF1A': ':', '\uFF01': '!', '\uFF1F': '?',
    '\u201C': '"', '\u201D': '"', '\u2018': "'", '\u2019': "'",
    '\uFF08': '(', '\uFF09': ')', '\u3010': '[', '\u3011': ']',
    '\u3001': ',', '\u2026': '...', '\u2014\u2014': '--', '\uFF5E': '~',
}
_ZH_PUNCT_RE = re.compile('|'.join(re.escape(k) for k in _ZH_PUNCT_MAP))

# Bold/italic markdown decorators
_BOLD_RE = re.compile(r'\*\*(.+?)\*\*')
_ITALIC_RE = re.compile(r'(?<!\*)\*([^*]+?)\*(?!\*)')

# Inline code that's just a plain word (not actual code)
_TRIVIAL_CODE_RE = re.compile(r'`([a-zA-Z0-9_.-]+)`')

# Markdown table detection
_TABLE_SEP_RE = re.compile(r'^[\s|:\-]+$')

# Bullet patterns
_BULLET_RE = re.compile(r'^(\s*)([-*+])\s+', re.MULTILINE)

# Multiple spaces / excessive indentation
_MULTI_SPACE_RE = re.compile(r'  +')
_LEADING_SPACES_RE = re.compile(r'^( {4,})', re.MULTILINE)


def strip_bold_italic(text: str) -> str:
    """Remove **bold** and *italic* markdown decorators."""
    if not text:
        return ""
    text = _BOLD_RE.sub(r'\1', text)
    text = _ITALIC_RE.sub(r'\1', text)
    return text


def normalize_punctuation(text: str) -> str:
    """Replace Chinese fullwidth punctuation with ASCII equivalents."""
    if not text:
        return ""
    text = text.replace('\u2014\u2014', '--')  # double em-dash first (two-char key)
    return _ZH_PUNCT_RE.sub(lambda m: _ZH_PUNCT_MAP.get(m.group(), m.group()), text)


def strip_trivial_backticks(text: str) -> str:
    """Remove backticks around simple words (not real code).

    Keeps backticks when content contains spaces or special chars.
    """
    if not text:
        return ""
    return _TRIVIAL_CODE_RE.sub(r'\1', text)


def minimize_whitespace(text: str) -> str:
    """Reduce multiple spaces and excessive indentation."""
    if not text:
        return ""
    # Reduce multiple spaces to single
    text = _MULTI_SPACE_RE.sub(' ', text)
    # Cap leading indentation at 4 spaces
    text = _LEADING_SPACES_RE.sub('    ', text)
    # Collapse 3+ consecutive newlines to 2
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text


def compact_bullets(text: str) -> str:
    """Remove bullet prefixes from long consecutive bullet lists (3+).

    Short lists (1-2 items) keep their bullets.
    """
    if not text:
        return ""
    lines = text.split('\n')
    result: List[str] = []
    bullet_run: List[str] = []
    bullet_re = re.compile(r'^(\s*[-*+])\s+(.*)')

    def flush():
        if len(bullet_run) >= 3:
            # Strip bullet prefix
            for content in bullet_run:
                result.append(content)
        else:
            # Keep original bullets
            for content in bullet_run:
                result.append('- ' + content)
        bullet_run.clear()

    for line in lines:
        m = bullet_re.match(line)
        if m:
            bullet_run.append(m.group(2))
        else:
            flush()
            result.append(line)
    flush()
    return '\n'.join(result)


def compress_table_to_kv(text: str) -> str:
    """Convert markdown tables to compact key:value or compact format."""
    if not text:
        return ""

    lines = text.split('\n')
    result: List[str] = []
    i = 0

    while i < len(lines):
        line = lines[i]
        if '|' in line and i + 1 < len(lines) and _TABLE_SEP_RE.match(lines[i + 1].strip()):
            headers = [c.strip() for c in line.strip().strip('|').split('|')]
            i += 2
            rows: List[List[str]] = []
            while i < len(lines) and '|' in lines[i] and lines[i].strip():
                cells = [c.strip() for c in lines[i].strip().strip('|').split('|')]
                rows.append(cells)
                i += 1

            if len(headers) == 2:
                for row in rows:
                    k = row[0] if len(row) > 0 else ''
                    v = row[1] if len(row) > 1 else ''
                    if k or v:
                        result.append(f"{k}: {v}")
            else:
                for row in rows:
                    result.append(' | '.join(row))
        else:
            result.append(line)
            i += 1

    return '\n'.join(result)


def optimize_tokens(text: str, aggressive: bool = False) -> str:
    """Apply all token-saving optimizations.

    Args:
        text: Input text.
        aggressive: If True, apply more aggressive transformations
                    (strip bold/italic, compact bullets, strip backticks).
    """
    if not text:
        return ""
    result = normalize_punctuation(text)
    result = compress_table_to_kv(result)
    result = minimize_whitespace(result)
    if aggressive:
        result = strip_bold_italic(result)
        result = strip_trivial_backticks(result)
        result = compact_bullets(result)
    return result


def estimate_savings(original: str, optimized: str) -> dict:
    """Calculate token savings between original and optimized text."""
    from lib.tokens import estimate_tokens
    orig_tokens = estimate_tokens(original)
    opt_tokens = estimate_tokens(optimized)
    reduction = ((orig_tokens - opt_tokens) / orig_tokens * 100) if orig_tokens else 0.0
    return {
        "original_tokens": orig_tokens,
        "optimized_tokens": opt_tokens,
        "original_chars": len(original),
        "optimized_chars": len(optimized),
        "token_reduction_pct": round(reduction, 2),
    }

```
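The aggressive pass strips markdown decoration that costs tokens without carrying facts. A standalone sketch of the decorator-stripping portion, using the same regexes defined above (the sample string is invented):

```python
import re

_BOLD_RE = re.compile(r'\*\*(.+?)\*\*')
_ITALIC_RE = re.compile(r'(?<!\*)\*([^*]+?)\*(?!\*)')
_TRIVIAL_CODE_RE = re.compile(r'`([a-zA-Z0-9_.-]+)`')

def strip_decorators(text):
    # Drop **bold**, *italic*, and backticks around single plain words.
    # Backticks around real code (spaces, special chars) are preserved,
    # since _TRIVIAL_CODE_RE only matches simple word-like content.
    text = _BOLD_RE.sub(r'\1', text)
    text = _ITALIC_RE.sub(r'\1', text)
    return _TRIVIAL_CODE_RE.sub(r'\1', text)

s = "Run `make` with **verbose** output and a *clean* tree (`rm -rf build` stays quoted)."
print(strip_decorators(s))
# Run make with verbose output and a clean tree (`rm -rf build` stays quoted).
```

Each dropped `**` or backtick pair typically saves one or two tokens; the negative lookaround on the italic regex keeps it from eating one `*` of a bold pair.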

### scripts/lib/tokens.py

```python
"""Token estimation utilities.

Uses tiktoken when available, falls back to a CJK-aware heuristic.

For the heuristic:
- ASCII/Latin text: ~4 chars per token
- CJK characters: ~1.5 chars per token (tiktoken cl100k_base)

Part of claw-compactor. License: MIT.
"""

import re
import logging
from typing import Optional

logger = logging.getLogger(__name__)

_encoder = None
_tiktoken_available = False

try:
    import tiktoken
    _encoder = tiktoken.encoding_for_model("gpt-4")
    _tiktoken_available = True
    logger.debug("tiktoken available, using cl100k_base encoding")
except Exception:  # ImportError, or any failure while loading the encoding
    logger.debug("tiktoken unavailable, using CJK-aware heuristic")

CHARS_PER_TOKEN = 4  # fallback for ASCII text
CJK_CHARS_PER_TOKEN = 1.5  # CJK characters average ~1.5 chars/token

# CJK unified ideographs + common ranges
_CJK_RE = re.compile(r'[\u4e00-\u9fff\u3400-\u4dbf\u3000-\u303f\uff00-\uffef]')


def _heuristic_tokens(text: str) -> int:
    """Estimate tokens using CJK-aware heuristic.

    CJK characters are counted at ~1.5 chars/token, everything else at ~4.
    """
    if not text:
        return 0
    cjk_chars = len(_CJK_RE.findall(text))
    other_chars = len(text) - cjk_chars
    cjk_tokens = cjk_chars / CJK_CHARS_PER_TOKEN
    other_tokens = other_chars / CHARS_PER_TOKEN
    return max(1, int(cjk_tokens + other_tokens))


def estimate_tokens(text: str) -> int:
    """Estimate the number of tokens in *text*.

    Uses tiktoken (cl100k_base) when available, otherwise a CJK-aware
    heuristic.  Returns 0 for empty strings.
    Raises TypeError if *text* is None.
    """
    if text is None:
        raise TypeError("estimate_tokens() requires a string, got None")
    if not text:
        return 0
    if _tiktoken_available and _encoder is not None:
        return len(_encoder.encode(text))
    return _heuristic_tokens(text)


def using_tiktoken() -> bool:
    """Return True if tiktoken is being used for estimation."""
    return _tiktoken_available

```
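The fallback heuristic is simple enough to verify by hand. A standalone sketch with the same constants and CJK character class as above:

```python
import re

# CJK unified ideographs + common fullwidth/punctuation ranges.
_CJK_RE = re.compile(r'[\u4e00-\u9fff\u3400-\u4dbf\u3000-\u303f\uff00-\uffef]')

def heuristic_tokens(text, ascii_cpt=4, cjk_cpt=1.5):
    # CJK characters count at ~1.5 chars/token, everything else at ~4.
    if not text:
        return 0
    cjk = len(_CJK_RE.findall(text))
    other = len(text) - cjk
    return max(1, int(cjk / cjk_cpt + other / ascii_cpt))

assert heuristic_tokens("a" * 40) == 10     # 40 ASCII chars / 4
assert heuristic_tokens("\u4e2d" * 3) == 2  # 3 CJK chars / 1.5
assert heuristic_tokens("") == 0
```

This also illustrates why the punctuation normalization in `tokenizer_optimizer.py` matters: moving a character out of the fullwidth range shifts it from the ~1.5-chars/token bucket to the ~4-chars/token bucket.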

### scripts/observation_compressor.py

```python
#!/usr/bin/env python3
"""Compress OpenClaw session transcripts into structured observations.

Inspired by claude-mem: extract tool calls and results from session JSONL,
generate LLM prompts for compression into structured observations, achieving
97%+ compression on verbose tool output.

Usage:
    python3 observation_compressor.py <transcript.jsonl> [--output observations.md]
    python3 observation_compressor.py <session_dir/> --all [--output-dir DIR]
    python3 observation_compressor.py <transcript.jsonl> --stats

Part of claw-compactor. License: MIT.
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

sys.path.insert(0, str(Path(__file__).resolve().parent))
from lib.tokens import estimate_tokens
from lib.tokenizer_optimizer import optimize_tokens
from lib.exceptions import FileNotFoundError_, MemCompressError

logger = logging.getLogger(__name__)

# Observation types for classification
OBSERVATION_TYPES = [
    "feature",
    "bugfix",
    "decision",
    "discovery",
    "config",
    "deployment",
    "data",
    "investigation",
]

# LLM prompt for compressing a session segment
COMPRESS_PROMPT = """You are a session observation extractor. Compress the following session transcript segment into structured observations.

Rules:
- Extract ONLY facts: what was done, what was the result, what was decided
- Remove all tool output verbosity -- just capture the key information
- Each observation should be self-contained and useful for future reference
- Use the XML format below
- Multiple observations per segment are fine
- Skip trivial operations (cd, ls with no interesting output, etc)

Transcript segment:
---
{segment}
---

Output observations in this format:
```xml
<observations>
  <observation>
    <type>{types_hint}</type>
    <title>Brief descriptive title</title>
    <facts>
      - Key fact 1
      - Key fact 2
    </facts>
    <narrative>One sentence summary of what happened.</narrative>
  </observation>
</observations>
```"""


def parse_session_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Parse an OpenClaw session .jsonl file.

    Each line is a JSON object with type, message, etc.
    Returns list of parsed message dicts.
    Raises FileNotFoundError_ if file doesn't exist.
    """
    if not path.exists():
        raise FileNotFoundError_(f"Session file not found: {path}")

    text = path.read_text(encoding="utf-8", errors="replace").strip()
    if not text:
        return []

    messages: List[Dict[str, Any]] = []
    for line in text.split('\n'):
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            # Normalize: extract role from nested message if present
            if "message" in obj and isinstance(obj["message"], dict):
                msg = obj["message"]
                msg["_type"] = obj.get("type", "message")
                msg["_id"] = obj.get("id", "")
                msg["_timestamp"] = obj.get("timestamp", "")
                messages.append(msg)
            elif "role" in obj:
                # Flat message format (role/content at top level)
                messages.append(obj)
            elif "type" in obj:
                # Session start or metadata
                messages.append({"role": obj.get("type", "unknown"), "_type": obj["type"], **obj})
        except json.JSONDecodeError:
            logger.debug("Skipping malformed JSONL line: %s", line[:80])
            continue

    return messages


def extract_tool_interactions(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Extract tool call/result pairs from parsed messages.

    Returns list of interaction dicts with tool_name, input_summary, output_summary.
    """
    interactions: List[Dict[str, Any]] = []

    for msg in messages:
        content = msg.get("content", "")
        role = msg.get("role", "")

        if role == "assistant" and isinstance(content, list):
            # Capture assistant text once; it applies to every tool call in this turn
            assistant_text = ""
            for b2 in content:
                if isinstance(b2, dict) and b2.get("type") == "text":
                    assistant_text = b2.get("text", "")[:200]
            for block in content:
                if isinstance(block, dict) and block.get("type") == "toolCall":
                    interactions.append({
                        "tool_name": block.get("toolName", "unknown"),
                        "input_summary": json.dumps(block.get("input", {}))[:200],
                        "output_summary": "",
                        "output_size": 0,
                        "assistant_text": assistant_text,
                    })

        # OpenAI-style tool_calls format
        elif role == "assistant" and "tool_calls" in msg:
            for tc in msg["tool_calls"]:
                func = tc.get("function", {})
                interaction = {
                    "tool_name": func.get("name", "unknown"),
                    "input_summary": func.get("arguments", "")[:200],
                    "output_summary": "",
                    "output_size": 0,
                    "assistant_text": content[:200] if isinstance(content, str) else "",
                }
                interactions.append(interaction)

        elif role == "tool" and isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and block.get("type") == "toolResult":
                    result_text = str(block.get("result", ""))
                    # Attach to the earliest interaction still awaiting output, so
                    # multiple tool calls in one turn each receive their result
                    pending = next((it for it in interactions if not it["output_summary"]), None)
                    if pending is not None:
                        pending["output_summary"] = result_text[:500]
                        pending["output_size"] = len(result_text)

        elif role == "tool" and isinstance(content, str):
            pending = next((it for it in interactions if not it["output_summary"]), None)
            if pending is not None:
                pending["output_summary"] = content[:500]
                pending["output_size"] = len(content)

    return interactions


def generate_observation_prompt(segment: List[Dict[str, Any]]) -> str:
    """Generate an LLM prompt for compressing a session segment."""
    types_hint = '|'.join(OBSERVATION_TYPES)
    lines = []
    for interaction in segment:
        lines.append(f"Tool: {interaction.get('tool_name', 'unknown')}")
        lines.append(f"Input: {interaction.get('input_summary', '')}")
        output_size = interaction.get('output_size', len(interaction.get('output_summary', '')))
        lines.append(f"Output ({output_size} chars): {interaction.get('output_summary', '')[:200]}")
        lines.append("")
    segment_text = '\n'.join(lines)
    return COMPRESS_PROMPT.format(segment=segment_text, types_hint=types_hint)


def rule_extract_observations(
    interactions: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Extract observations using rule-based heuristics (no LLM needed).

    Groups interactions by tool and extracts key patterns.
    """
    if not interactions:
        return []

    observations: List[Dict[str, Any]] = []

    for interaction in interactions:
        tool = interaction["tool_name"]
        output = interaction.get("output_summary", "") or interaction.get("result", "") or ""
        assistant = interaction.get("assistant_text", "")

        # Classify
        obs_type = "discovery"
        if "error" in output.lower() or "fail" in output.lower():
            obs_type = "bugfix"
        elif tool in ("write", "edit"):
            obs_type = "feature"
        elif tool in ("exec",) and ("deploy" in output.lower() or "docker" in output.lower()):
            obs_type = "deployment"
        elif tool in ("exec",) and any(k in output.lower() for k in ("config", "setup", "install")):
            obs_type = "config"

        title = assistant[:80] if assistant else f"{tool} operation"
        facts = [f"Tool: {tool}"]
        if output:
            # Extract key facts from output
            output_lines = output.split('\n')
            for line in output_lines[:5]:
                line = line.strip()
                if line and len(line) > 5:
                    facts.append(line[:100])

        observations.append({
            "type": obs_type,
            "title": title,
            "facts": facts,
            "narrative": assistant[:200] if assistant else f"Ran {tool}",
        })

    return observations


def format_observations_xml(observations: List[Dict[str, Any]]) -> str:
    """Format observations as XML, escaping reserved characters in text fields."""
    from xml.sax.saxutils import escape

    lines = ["<observations>"]
    for obs in observations:
        lines.append("  <observation>")
        lines.append(f"    <type>{escape(obs['type'])}</type>")
        lines.append(f"    <title>{escape(obs.get('title', '') or obs.get('summary', ''))}</title>")
        lines.append("    <facts>")
        for fact in obs.get("facts", []):
            lines.append(f"      - {escape(fact)}")
        lines.append("    </facts>")
        lines.append(f"    <narrative>{escape(obs.get('narrative', ''))}</narrative>")
        lines.append("  </observation>")
    lines.append("</observations>")
    return '\n'.join(lines)


def format_observations_md(observations: List[Dict[str, Any]]) -> str:
    """Format observations as markdown."""
    lines = ["# Session Observations", ""]
    for i, obs in enumerate(observations, 1):
        lines.append(f"## {i}. [{obs['type']}] {obs.get('title', '') or obs.get('summary', '')}")
        lines.append("")
        if obs.get("facts"):
            lines.append("**Facts:**")
            for fact in obs["facts"]:
                lines.append(f"- {fact}")
            lines.append("")
        if obs.get("narrative"):
            lines.append(f"**Result:** {obs['narrative']}")
            lines.append("")
    return '\n'.join(lines)


def compress_session(
    path: Path,
    use_llm: bool = False,
) -> Dict[str, Any]:
    """Compress a single session transcript.

    Returns dict with observation count, tokens before/after, etc.
    """
    messages = parse_session_jsonl(path)
    if not messages:
        return {
            "file": str(path),
            "messages": 0,
            "interactions": 0,
            "observations": [],
            "observation_count": 0,
            "tokens_before": 0,
            "tokens_after": 0,
        }

    interactions = extract_tool_interactions(messages)
    observations = rule_extract_observations(interactions)

    # Estimate tokens
    raw_text = path.read_text(encoding="utf-8", errors="replace")
    tokens_before = estimate_tokens(raw_text)

    if observations:
        md = format_observations_md(observations)
        tokens_after = estimate_tokens(md)
    else:
        tokens_after = 0

    result: Dict[str, Any] = {
        "file": str(path),
        "messages": len(messages),
        "interactions": len(interactions),
        "observations": observations,
        "observation_count": len(observations),
        "tokens_before": tokens_before,
        "tokens_after": tokens_after,
    }

    if use_llm and interactions:
        result["llm_prompt"] = generate_observation_prompt(interactions)

    return result


def main():
    parser = argparse.ArgumentParser(description="Compress session transcripts")
    parser.add_argument("path", help="Session .jsonl file or directory")
    parser.add_argument("--output", help="Output file")
    parser.add_argument("--all", action="store_true", help="Process all sessions in directory")
    parser.add_argument("--stats", action="store_true", help="Show stats only")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    p = Path(args.path)
    if p.is_dir():
        if not args.all:
            parser.error(f"{p} is a directory; pass --all to process every session in it")
        files = sorted(p.glob("*.jsonl"))
    else:
        files = [p]

    results = [compress_session(f) for f in files]

    if args.output and not args.stats:
        # Write the compressed observations as markdown
        md_parts = [
            format_observations_md(r["observations"]) for r in results if r["observations"]
        ]
        Path(args.output).write_text('\n\n'.join(md_parts), encoding="utf-8")

    if args.json:
        print(json.dumps(results, indent=2))
    else:
        total_before = sum(r["tokens_before"] for r in results)
        total_after = sum(r["tokens_after"] for r in results)
        total_obs = sum(r["observation_count"] for r in results)
        pct = ((total_before - total_after) / total_before * 100) if total_before else 0
        print(f"Processed {len(results)} session(s)")
        print(f"Observations: {total_obs}")
        print(f"Tokens: {total_before:,} -> {total_after:,} ({pct:.1f}% savings)")


if __name__ == "__main__":
    main()

```
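
The keyword heuristic that `rule_extract_observations` uses to type each observation can be sketched in isolation. This is a minimal sketch assuming the same tool names (`write`, `edit`, `exec`) and keywords as the script above; `classify_observation` is an illustrative helper, not part of the packaged code:

```python
def classify_observation(tool: str, output: str) -> str:
    """Map a tool interaction to an observation type via keyword heuristics.

    Error indicators take priority, then the tool name, then keywords
    found in the tool's output; anything else is a plain discovery.
    """
    text = output.lower()
    if "error" in text or "fail" in text:
        return "bugfix"
    if tool in ("write", "edit"):
        return "feature"
    if tool == "exec" and ("deploy" in text or "docker" in text):
        return "deployment"
    if tool == "exec" and any(k in text for k in ("config", "setup", "install")):
        return "config"
    return "discovery"

print(classify_observation("exec", "docker build completed"))  # deployment
print(classify_observation("read", "42 lines read"))           # discovery
```

Because the error check runs first, a failed `write` is classified as a `bugfix` rather than a `feature`, which matches the precedence in the script above.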