
soul-guardian

Drift detection + baseline integrity guard for agent workspace files with automatic alerting support

Packaged view

This page reorganizes the original catalog entry to put fit, installability, and workflow context first. The original raw source appears below.

Stars
3,129
Hot score
99
Updated
March 20, 2026
Overall rating
C (4.0)
Composite score
4.0
Best-practice grade
C (62.8)

Install command

npx @skill-hub/cli install openclaw-skills-soul-guardian

Repository

openclaw/skills

Skill path: skills/davida-ps/soul-guardian


Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install soul-guardian into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding soul-guardian to shared team environments
  • Use soul-guardian for development workflows

Works across

Claude Code · Codex CLI · Gemini CLI · OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: soul-guardian
version: 0.0.2
description: Drift detection + baseline integrity guard for agent workspace files with automatic alerting support
homepage: https://clawsec.prompt.security
metadata: {"openclaw":{"emoji":"πŸ‘»","category":"security"}}
clawdis:
  emoji: "πŸ‘»"
  requires:
    bins: [python3]
---

# soul-guardian πŸ‘»

Protects your agent's core files (SOUL.md, AGENTS.md, etc.) from unauthorized changes with automatic detection, restoration, and **user alerting**.

## Quick Start (3 Steps)

### Step 1: Initialize baselines
```bash
cd ~/.openclaw/workspace
python3 skills/soul-guardian/scripts/soul_guardian.py init --actor setup --note "initial baseline"
```

### Step 2: Enable monitoring
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py enable-monitoring
```
This shows you how to add monitoring to your workflow.

### Step 3: Add to HEARTBEAT.md (Recommended)

Add this to your HEARTBEAT.md for automatic monitoring:

```markdown
## Soul Guardian Check
- Run `python3 skills/soul-guardian/scripts/soul_guardian.py check --actor heartbeat --output-format alert`
- If any output is produced, relay it to the user immediately as a security alert
```

That's it! Soul Guardian will now:
- βœ… Detect unauthorized changes to protected files
- βœ… Auto-restore SOUL.md and AGENTS.md to approved baseline
- βœ… Alert you when drift is detected and handled

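Drift detection itself boils down to comparing a file's sha256 against the approved baseline. A minimal sketch of that idea (the helper names are illustrative, not the tool's API):

```python
import hashlib
from pathlib import Path

def file_sha256(path: Path) -> str:
    """Hash the file's bytes; the guard stores this as the approved baseline."""
    return hashlib.sha256(path.read_bytes()).hexdigest()

def has_drifted(path: Path, approved_sha256: str) -> bool:
    """True when the file no longer matches its approved baseline."""
    return file_sha256(path) != approved_sha256
```

The real tool additionally snapshots the approved content so restore-mode files can be rewritten atomically, and records the event in the audit log.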
---

## What it protects (default policy)

| File | Mode | Action on drift |
|------|------|-----------------|
| SOUL.md | restore | Auto-restore + alert |
| AGENTS.md | restore | Auto-restore + alert |
| USER.md | alert | Alert only |
| TOOLS.md | alert | Alert only |
| IDENTITY.md | alert | Alert only |
| HEARTBEAT.md | alert | Alert only |
| MEMORY.md | alert | Alert only |
| memory/*.md | ignore | Ignored |

## Commands

### Check for drift (with alert output)
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py check --output-format alert
```
- Silent if no drift
- Outputs human-readable alert if drift detected
- Perfect for heartbeat integration

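Because the documented contract is silent + exit 0 on OK and alert text + exit 2 on drift, wiring the check into a heartbeat or cron wrapper is simple. A hypothetical POSIX-sh sketch (`relay_alert` is an illustrative name, not part of the skill):

```shell
# Relay the guard's output as a security alert when drift was reported.
# Assumes the documented contract: exit 0 + no output on OK, exit 2 + alert text on drift.
relay_alert() {
  status="$1"
  out="$2"
  if [ "$status" -eq 2 ] || [ -n "$out" ]; then
    printf 'SECURITY ALERT:\n%s\n' "$out"
  fi
}

# Example wiring (path as used throughout this skill):
# out=$(python3 skills/soul-guardian/scripts/soul_guardian.py check --output-format alert)
# relay_alert "$?" "$out"
```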
### Watch mode (continuous monitoring)
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py watch --interval 30
```
Runs continuously, checking every 30 seconds.

### Approve intentional changes
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py approve --file SOUL.md --actor user --note "intentional update"
```

### View status
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py status
```

### Verify audit log integrity
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py verify-audit
```

---

## Alert Format

When drift is detected, `--output-format alert` produces output like:

```
==================================================
🚨 SOUL GUARDIAN SECURITY ALERT
==================================================

πŸ“„ FILE: SOUL.md
   Mode: restore
   Status: βœ… RESTORED to approved baseline
   Expected hash: abc123def456...
   Found hash:    789xyz000111...
   Diff saved: /path/to/patches/drift.patch

==================================================
Review changes and investigate the source of drift.
If intentional, run: soul_guardian.py approve --file <path>
==================================================
```

This output is designed to be relayed directly to the user in TUI/chat.

---

## Security Model

**What it does:**
- Detects filesystem drift vs approved baseline (sha256)
- Produces unified diffs for review
- Maintains tamper-evident audit log with hash chaining
- Refuses to operate on symlinks
- Uses atomic writes for restores

**What it doesn't do:**
- Cannot prove WHO made a change (actor is best-effort metadata)
- Cannot protect if attacker controls both workspace AND state directory
- Is not a substitute for backups

**Recommendation:** Store state directory outside workspace for better resilience.

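The tamper-evident audit log works by chaining each entry to the hash of the previous one, so editing or deleting an earlier entry invalidates everything after it. A minimal sketch of the idea (the record fields here are illustrative, not the tool's exact schema; `GENESIS` matches `CHAIN_GENESIS` in `soul_guardian.py`):

```python
import hashlib
import json

GENESIS = "0" * 64  # chain anchor for the first entry

def append_entry(log: list, event: dict) -> None:
    """Append an event, chaining it to the previous entry's hash."""
    prev = log[-1]["entryHash"] if log else GENESIS
    body = {"prev": prev, **event}
    digest = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
    log.append({**body, "entryHash": digest})

def verify_chain(log: list) -> bool:
    """Recompute every hash; any edit to an earlier entry breaks the chain."""
    prev = GENESIS
    for entry in log:
        body = {k: v for k, v in entry.items() if k != "entryHash"}
        if body.get("prev") != prev:
            return False
        if hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest() != entry["entryHash"]:
            return False
        prev = entry["entryHash"]
    return True
```

The real tool persists entries to an append-only `audit.jsonl`; `verify-audit` performs the equivalent of `verify_chain` over that file.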
---

## Demo

Run the full demo flow to see soul-guardian in action:

```bash
bash skills/soul-guardian/scripts/demo.sh
```

This will:
1. Verify clean state (silent check)
2. Inject malicious content into SOUL.md
3. Run heartbeat check (produces alert)
4. Show SOUL.md was restored

---

## Troubleshooting

**"Not initialized" error:**
Run `init` first to set up baselines.

**Drift keeps happening:**
Check what's modifying your files. Review the audit log and patches.

**Want to approve a change:**
Run `approve --file <path>` after reviewing the change.


---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# soul-guardian

A small, dependency-free integrity guard for Clawdbot agent workspaces.

It helps you detect (and optionally auto-undo) unexpected edits to the workspace markdown files that an agent auto-loads (e.g., `SOUL.md`, `AGENTS.md`). It also records a **tamper-evident** audit trail of changes.

## Why this exists

In many Clawdbot setups, the agent reads certain markdown files every session (identity, instructions, memory, tools, etc.). If those files drift unexpectedly (accidental edits, bad merges, unwanted automation, etc.), you want:

- detection (sha256 mismatch)
- a diff/patch artifact for review
- a record of what happened (audit log)
- optionally: an automatic restore to a known-good baseline for critical files

## What it protects (default policy)

Default `policy.json` protects:

- **Auto-restore + alert:** `SOUL.md`, `AGENTS.md`
- **Alert-only:** `USER.md`, `TOOLS.md`, `IDENTITY.md`, `HEARTBEAT.md`, `MEMORY.md`
- **Ignored by default:** `memory/*.md` (daily notes)

You can customize this by editing the policy file in the guardian state directory.

## Security model (and limitations)

What it does well:
- Detects filesystem drift vs an approved baseline.
- Produces unified diffs (patch files) for review.
- Maintains an **append-only JSONL audit log** with **hash chaining** so log tampering is detectable.
- Refuses to operate on **symlinks** (reduces link attacks).
- Uses **atomic writes** for restores and baseline updates (`os.replace`).

What it does *not* do:
- It cannot prove *who* changed a file. `--actor` is best-effort metadata.
- It cannot protect you if an attacker can modify both the workspace and the guardian state directory.
- It is not a substitute for backups.

Recommendation (not enforced):
- Mirror/back up your guardian state directory (and/or workspace) using git and/or offsite backups.

## State directory

By default, state is stored inside the workspace:

- `memory/soul-guardian/`
  - `policy.json` (what to monitor)
  - `baselines.json` (approved sha256 per file)
  - `approved/<path>` (approved snapshots)
  - `audit.jsonl` (append-only log with hash chain)
  - `patches/*.patch` (unified diffs)
  - `quarantine/*` (copies of drifted files before restore)

For better resilience, you can move this **outside** the workspace (recommended).

## Install / usage

Run all commands from the agent workspace root.

### First run / Initialize baselines (recommended)

For resilience, create your guardian **state directory outside** the workspace first, then initialize baselines.

1) Onboard an external state dir (creates policy, copies any existing state, prints paths/snippets):

```bash
python3 skills/soul-guardian/scripts/onboard_state_dir.py --agent-id <agentId>
```

2) Initialize baselines **in that external state dir**:

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  init --actor sam --note "first baseline"
```

3) Run a check once (should be silent on OK; prints a single-line summary on drift):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  check --actor system --note "first check"
```

### Common commands

Status (summary):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  status
```

Check for drift (default: restores restore-mode files):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  check --actor system --note cron
```

Alert-only check (never restore):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  check --no-restore
```

Approve intentional edits (one file):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  approve --file SOUL.md --actor sam --note "intentional update"
```

Approve all policy targets (except ignored ones):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  approve --all --actor sam --note "bulk approve"
```

Restore (only restore-mode files):

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  restore --file SOUL.md --actor system --note "manual restore"
```

Verify audit log tamper-evidence:

```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  verify-audit
```

## Policy format (`policy.json`)

Example:

```json
{
  "version": 1,
  "workspaceRoot": "/path/to/workspace",
  "targets": [
    {"path": "SOUL.md", "mode": "restore"},
    {"path": "AGENTS.md", "mode": "restore"},
    {"path": "USER.md", "mode": "alert"},
    {"pattern": "memory/*.md", "mode": "ignore"}
  ]
}
```

- `mode`:
  - `restore`: drift triggers audit + patch + (by default) restore + quarantine copy
  - `alert`: drift triggers audit + patch, but does not restore
  - `ignore`: excluded

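Precedence is: an exact `path` entry wins over any `pattern` glob. A small sketch of that lookup, mirroring the behavior of `policy_mode_for_path` in `scripts/soul_guardian.py` (the example policy below is illustrative):

```python
import fnmatch

def mode_for(policy: dict, rel_path: str):
    """Exact path entries win; otherwise the first matching glob pattern applies."""
    targets = policy.get("targets", [])
    for ent in targets:
        if ent.get("path") == rel_path:
            return ent.get("mode")
    for ent in targets:
        pat = ent.get("pattern")
        if pat and fnmatch.fnmatch(rel_path, pat):
            return ent.get("mode")
    return None

policy = {
    "targets": [
        {"path": "SOUL.md", "mode": "restore"},
        {"path": "memory/NOTES.md", "mode": "alert"},
        {"pattern": "memory/*.md", "mode": "ignore"},
    ]
}
```

With this policy, `memory/NOTES.md` resolves to `alert` even though it also matches the `ignore` pattern, because the exact path entry takes precedence.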
## Onboarding: move state outside the workspace

Run the helper:

```bash
python3 skills/soul-guardian/scripts/onboard_state_dir.py
```

It will:
- create an external state dir (**recommended default:** `~/.clawdbot/soul-guardian/<agentId>/`)
- copy (or move with `--move`) existing state from `memory/soul-guardian/`
- write a default `policy.json` if missing
- print scheduling snippets

Notes:
- `<agentId>` should be **stable and unique per workspace** (don’t point multiple workspaces at the same state dir).
- WARNING: `--move` deletes the old in-workspace state dir after copying.
- The external state dir can contain **approved snapshots, patches, and quarantined copies** of sensitive prompt/instruction/memory files. Keep permissions restrictive (e.g., `chmod 700 <dir>`; `chmod go-rwx <dir>`).

Then include `--state-dir` in all commands (run from the workspace root), e.g.:

```bash
cd <workspace> && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir ~/.clawdbot/soul-guardian/<agentId> check
```

## Scheduling (cron)

### A) Clawdbot Gateway Cron (recommended)

This is the default pattern when you want drift notifications to flow through Clawdbot.

Note: even when there is **no drift**, Clawdbot cron runs typically show an **OK summary** in the main session.

Example (edit paths + schedule):

```bash
clawdbot cron add \
  --name "soul-guardian: check workspace" \
  --description "Run soul-guardian check; alert when drift detected." \
  --session isolated \
  --wake now \
  --cron "*/10 * * * *" \
  --tz UTC \
  --message "Run:\ncd '<workspace>'\npython3 skills/soul-guardian/scripts/soul_guardian.py --state-dir ~/.clawdbot/soul-guardian/<agentId> check --actor cron --note 'gateway-cron'\n\nIf the command prints a line starting with 'SOUL_GUARDIAN_DRIFT', treat it as an alert. If it prints nothing, reply HEARTBEAT_OK." \
  --post-prefix "[soul-guardian]" \
  --post-mode summary
```

### B) macOS launchd (optional, silent-on-OK)

If you want **system scheduling** without Clawdbot posting OK summaries, use `launchd`.

Because `soul_guardian.py check` prints **nothing** on OK and prints a single-line `SOUL_GUARDIAN_DRIFT ...` summary on drift, this tends to be silent unless something changed.

Generate + (optionally) install a LaunchAgent plist (run from the workspace root, or pass `--workspace-root`):

```bash
python3 skills/soul-guardian/scripts/install_launchd_plist.py \
  --state-dir ~/.clawdbot/soul-guardian/<agentId> \
  --interval-seconds 600 \
  --install
```

The generated plist includes `WorkingDirectory` set to your workspace root (recommended), so relative paths behave as expected.

The script writes drift output to log files under `<state-dir>/logs/`.
You can tail them with the commands it prints.

## Development / tests

A minimal test script is included:

```bash
python3 skills/soul-guardian/scripts/test_soul_guardian.py
```

It simulates a workspace in a temp directory and validates drift detection, approve/restore flow, and audit hash chain verification.

```

### _meta.json

```json
{
  "owner": "davida-ps",
  "slug": "soul-guardian",
  "displayName": "soul-guardian",
  "latest": {
    "version": "0.0.2",
    "publishedAt": 1770399192748,
    "commit": "https://github.com/openclaw/skills/commit/d1b9f12893708eec89496ad2fd2b892111cc1b41"
  },
  "history": []
}

```

### scripts/install_launchd_plist.py

```python
#!/usr/bin/env python3
"""Generate (and optionally install) a macOS launchd plist for soul-guardian.

Goal:
- Run `soul_guardian.py check` on an interval.
- Be *silent on OK* (soul_guardian.py prints nothing + exits 0 when no drift).
- Produce a single-line stdout alert on drift (exits 2 and prints SOUL_GUARDIAN_DRIFT ...).

This script is intentionally deterministic and dependency-free.

It does NOT attempt to deliver drift alerts to Telegram/Slack/etc.
Instead it:
- writes logs to the state dir (so drift output is preserved)
- relies on you to wire notifications however you prefer

If you want Clawdbot-side delivery, use Clawdbot Gateway Cron.
"""

from __future__ import annotations

import argparse
import os
from pathlib import Path
import plistlib
import subprocess
import sys


def agent_id_default(workspace_root: Path) -> str:
    return workspace_root.name


def default_external_state_dir(agent_id: str) -> Path:
    return Path("~/.clawdbot/soul-guardian").expanduser() / agent_id


def run_launchctl(args: list[str]) -> None:
    subprocess.run(["/bin/launchctl", *args], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--workspace-root",
        default=str(Path.cwd()),
        help="Workspace root (default: current working directory).",
    )
    ap.add_argument(
        "--agent-id",
        default=None,
        help="Agent/workspace identifier used in default label + state dir (default: workspace folder name).",
    )
    ap.add_argument(
        "--state-dir",
        default=None,
        help="External state directory (recommended). Default: ~/.clawdbot/soul-guardian/<agentId>/",
    )
    ap.add_argument(
        "--label",
        default=None,
        help="launchd label (default: com.clawdbot.soul-guardian.<agentId>)",
    )
    ap.add_argument(
        "--interval-seconds",
        type=int,
        default=600,
        help="Run interval in seconds (StartInterval). Default: 600 (10 minutes).",
    )
    ap.add_argument("--actor", default="cron", help="--actor passed to soul_guardian.py (default: cron).")
    ap.add_argument("--note", default="launchd", help="--note passed to soul_guardian.py (default: launchd).")
    ap.add_argument(
        "--out",
        default=None,
        help="Write plist to this path (default: ~/Library/LaunchAgents/<label>.plist)",
    )
    ap.add_argument("--force", action="store_true", help="Overwrite existing plist on disk.")
    ap.add_argument(
        "--install",
        action="store_true",
        help="Install+load the plist with launchctl (bootstrap). Without this flag we only write the plist.",
    )

    args = ap.parse_args(argv)

    workspace_root = Path(args.workspace_root).expanduser().resolve()
    agent_id = args.agent_id or agent_id_default(workspace_root)
    state_dir = Path(args.state_dir).expanduser().resolve() if args.state_dir else default_external_state_dir(agent_id)

    label = args.label or f"com.clawdbot.soul-guardian.{agent_id}"
    plist_path = Path(args.out).expanduser().resolve() if args.out else (Path("~/Library/LaunchAgents").expanduser() / f"{label}.plist")

    script_path = workspace_root / "skills" / "soul-guardian" / "scripts" / "soul_guardian.py"
    if not script_path.exists():
        raise SystemExit(f"soul_guardian.py not found at {script_path}; pass --workspace-root correctly")

    # Keep logs in the external state dir.
    log_dir = state_dir / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    stdout_log = log_dir / "launchd.stdout.log"
    stderr_log = log_dir / "launchd.stderr.log"

    program_args = [
        "/usr/bin/python3",
        str(script_path),
        "--state-dir",
        str(state_dir),
        "check",
        "--actor",
        str(args.actor),
        "--note",
        str(args.note),
    ]

    plist: dict[str, object] = {
        "Label": label,
        "ProgramArguments": program_args,
        "WorkingDirectory": str(workspace_root),
        "StartInterval": int(args.interval_seconds),
        "RunAtLoad": True,
        "StandardOutPath": str(stdout_log),
        "StandardErrorPath": str(stderr_log),
        # Avoid interactive UI dependencies; run in background.
        "ProcessType": "Background",
    }

    plist_path.parent.mkdir(parents=True, exist_ok=True)

    if plist_path.exists() and not args.force:
        raise SystemExit(f"Refusing to overwrite existing {plist_path}. Re-run with --force.")

    with plist_path.open("wb") as f:
        plistlib.dump(plist, f, fmt=plistlib.FMT_XML, sort_keys=True)

    print(f"Wrote plist: {plist_path}")
    print(f"State dir:  {state_dir}")
    print(f"Label:      {label}")

    uid = os.getuid()

    if args.install:
        # Best-effort: remove any existing job with same label, then bootstrap.
        run_launchctl(["bootout", f"gui/{uid}", label])
        run_launchctl(["bootout", f"gui/{uid}", str(plist_path)])
        res = subprocess.run(["/bin/launchctl", "bootstrap", f"gui/{uid}", str(plist_path)], text=True, capture_output=True)
        if res.returncode != 0:
            sys.stderr.write((res.stderr or res.stdout or "").strip() + "\n")
            sys.stderr.write("Failed to bootstrap. You can try manually:\n")
            sys.stderr.write(f"  launchctl bootstrap gui/{uid} {plist_path}\n")
            return 1

        subprocess.run(["/bin/launchctl", "enable", f"gui/{uid}/{label}"], check=False)
        subprocess.run(["/bin/launchctl", "kickstart", "-k", f"gui/{uid}/{label}"], check=False)
        print("Installed + started (launchctl bootstrap/enable/kickstart).")
    else:
        print("Not installed (dry write). To load it:")
        print(f"  launchctl bootstrap gui/{uid} {plist_path}")
        print(f"  launchctl enable gui/{uid}/{label}")
        print(f"  launchctl kickstart -k gui/{uid}/{label}")

    print("\nLogs:")
    print(f"  tail -n 200 -f {stdout_log}")
    print(f"  tail -n 200 -f {stderr_log}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

```

### scripts/onboard_state_dir.py

```python
#!/usr/bin/env python3
"""Onboard soul-guardian state directory outside the workspace.

Why:
- Keeping integrity state inside the workspace can be risky if the workspace is modified or wiped.
- Moving state to an external directory improves resilience and makes tampering harder.

What this script does:
- Creates an external state directory (default: ~/.clawdbot/soul-guardian/<agentId>/)
- Copies (or moves) existing in-workspace state from memory/soul-guardian/
- Writes a default policy.json if missing
- Prints recommended cron snippets (Clawdbot gateway cron and optional launchd)

This script does NOT modify your cron jobs automatically.
"""

from __future__ import annotations

import argparse
import os
from pathlib import Path
import shutil
import sys


WORKSPACE_ROOT = Path.cwd()
DEFAULT_IN_WORKSPACE_STATE = WORKSPACE_ROOT / "memory" / "soul-guardian"


def agent_id_default() -> str:
    # Best-effort: workspace folder name.
    return WORKSPACE_ROOT.name


def ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)


def copytree_overwrite(src: Path, dst: Path) -> None:
    # Copy directory contents into dst (merge).
    ensure_dir(dst)
    for root, dirs, files in os.walk(src):
        r = Path(root)
        rel = r.relative_to(src)
        target_root = dst / rel
        ensure_dir(target_root)
        for d in dirs:
            ensure_dir(target_root / d)
        for f in files:
            s = r / f
            t = target_root / f
            # Overwrite.
            shutil.copy2(s, t)


DEFAULT_POLICY_JSON = """{
  "version": 1,
  "workspaceRoot": "",
  "targets": [
    {"path": "SOUL.md", "mode": "restore"},
    {"path": "AGENTS.md", "mode": "restore"},
    {"path": "USER.md", "mode": "alert"},
    {"path": "TOOLS.md", "mode": "alert"},
    {"path": "IDENTITY.md", "mode": "alert"},
    {"path": "HEARTBEAT.md", "mode": "alert"},
    {"path": "MEMORY.md", "mode": "alert"},
    {"pattern": "memory/*.md", "mode": "ignore"}
  ]
}
"""


def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--agent-id", default=agent_id_default(), help="Identifier used for default external state path.")
    ap.add_argument(
        "--state-dir",
        default=None,
        help="External state directory to create/use (default: ~/.clawdbot/soul-guardian/<agentId>/).",
    )
    ap.add_argument("--move", action="store_true", help="Move instead of copy (WARNING: deletes the old in-workspace state dir).")
    ap.add_argument("--no-copy", action="store_true", help="Do not copy/move existing in-workspace state.")
    args = ap.parse_args(argv)

    if args.state_dir:
        external = Path(args.state_dir).expanduser()
    else:
        external = (Path("~/.clawdbot/soul-guardian").expanduser() / args.agent_id)

    ensure_dir(external)

    if not args.no_copy and DEFAULT_IN_WORKSPACE_STATE.exists():
        if args.move:
            # Move by copying then removing src (safer than rename across filesystems).
            copytree_overwrite(DEFAULT_IN_WORKSPACE_STATE, external)
            shutil.rmtree(DEFAULT_IN_WORKSPACE_STATE)
            action = "moved"
        else:
            copytree_overwrite(DEFAULT_IN_WORKSPACE_STATE, external)
            action = "copied"
        print(f"Existing state {action} from {DEFAULT_IN_WORKSPACE_STATE} -> {external}")
    else:
        print(f"Using external state dir: {external}")

    policy_path = external / "policy.json"
    if not policy_path.exists():
        txt = DEFAULT_POLICY_JSON.replace('"workspaceRoot": ""', f'"workspaceRoot": "{WORKSPACE_ROOT}"')
        policy_path.write_text(txt, encoding="utf-8")
        print(f"Wrote default policy: {policy_path}")
    else:
        print(f"Policy already exists: {policy_path}")

    print("\nNext steps")
    print("1) Initialize baselines in the external state dir:")
    print(
        f"   cd '{WORKSPACE_ROOT}' && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir '{external}' init --actor 'sam' --note 'onboard external state'\n"
    )

    print("2) Update your cron/check runner to include --state-dir.")
    print("\nClawdbot gateway cron (recommended; does not require system cron):")
    print("- In your cron spec, run something like:")
    print(
        f"  cd '{WORKSPACE_ROOT}' && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir '{external}' check --actor system --note cron"
    )

    print("\nOptional: system cron / launchd (macOS) example (NOT installed automatically):")
    label = f"com.clawdbot.soul-guardian.{args.agent_id}"
    print(f"- Launchd label: {label}")
    print(f"- WorkingDirectory (recommended): {WORKSPACE_ROOT}")
    print("- ProgramArguments (example):")
    print("  [\n"
          f"    '/usr/bin/python3',\n"
          f"    '{WORKSPACE_ROOT}/skills/soul-guardian/scripts/soul_guardian.py',\n"
          f"    '--state-dir', '{external}',\n"
          f"    'check', '--actor', 'system', '--note', 'launchd'\n"
          "  ]")

    print("\nNotes")
    print("- The external state dir can contain approved snapshots, patches, and quarantined copies of drifted prompt/instruction files; keep permissions restrictive (e.g., chmod 700; go-rwx).")
    if args.move:
        print("- WARNING: --move deletes the old in-workspace state dir after copying.")
    print("- Consider mirroring the external state dir via git or offsite backups (not enforced by this tool).")

    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

```

### scripts/soul_guardian.py

```python
#!/usr/bin/env python3
"""Workspace file integrity guard + audit (multi-file).

This is a hardened successor to the original SOUL.md-only guardian.

Key features:
- Multiple target files with per-file policy (restore | alert | ignore)
- Approved baselines stored per file (snapshot + sha256)
- Append-only audit log with hash chaining (tamper-evident)
- Optional auto-restore for restore-mode files (with quarantine copy)
- Refuses to operate on symlinks
- Atomic writes for baseline + restore operations (os.replace)

State directory (default, backward-compatible): memory/soul-guardian/

Subcommands:
- init            Initialize policy + baselines (first run)
- status          Print status JSON for all policy targets
- check           Check for drift; restore for restore-mode by default
- approve         Approve current contents as baseline (per file or all)
- restore         Restore restore-mode files to last approved baseline
- verify-audit    Validate audit log hash chain

Exit codes:
- 0: ok / no drift
- 2: drift detected (for check when any alert/restore drift happened)
- 1: error
"""

from __future__ import annotations

import argparse
import datetime as dt
import difflib
import fnmatch
import hashlib
import json
import os
from pathlib import Path
import shutil
import stat
import sys
from typing import Any, Iterable


WORKSPACE_ROOT = Path.cwd()
DEFAULT_STATE_DIR = WORKSPACE_ROOT / "memory" / "soul-guardian"

POLICY_FILE = "policy.json"
BASELINES_FILE = "baselines.json"
AUDIT_LOG_FILE = "audit.jsonl"
APPROVED_DIRNAME = "approved"
PATCH_DIRNAME = "patches"
QUAR_DIRNAME = "quarantine"

CHAIN_GENESIS = "0" * 64


def utc_now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def sha256_bytes(b: bytes) -> str:
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()


def sha256_text(s: str) -> str:
    return sha256_bytes(s.encode("utf-8"))


def read_bytes(path: Path) -> bytes:
    return path.read_bytes()


def read_text(path: Path) -> str:
    return path.read_text(encoding="utf-8", errors="replace")


def is_symlink(path: Path) -> bool:
    try:
        st = os.lstat(path)
    except FileNotFoundError:
        return False
    return stat.S_ISLNK(st.st_mode)


def ensure_dir(path: Path) -> None:
    path.mkdir(parents=True, exist_ok=True)


def atomic_write_bytes(path: Path, data: bytes) -> None:
    ensure_dir(path.parent)
    tmp = path.with_name(path.name + ".tmp")
    with tmp.open("wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)


def atomic_write_text(path: Path, text: str) -> None:
    atomic_write_bytes(path, text.encode("utf-8"))


def unified_diff_text(old: str, new: str, fromfile: str, tofile: str) -> str:
    old_lines = old.splitlines(keepends=True)
    new_lines = new.splitlines(keepends=True)
    diff = difflib.unified_diff(old_lines, new_lines, fromfile=fromfile, tofile=tofile)
    return "".join(diff)


def safe_patch_tag(tag: str) -> str:
    return ("".join(c for c in tag if c.isalnum() or c in ("-", "_"))[:40] or "patch")


def relpath_str(path: Path, root: Path) -> str:
    # Normalize to a stable forward-slash relative string.
    try:
        rel = path.relative_to(root)
    except Exception:
        rel = Path(os.path.relpath(path, root))
    return rel.as_posix()


class GuardianState:
    def __init__(self, state_dir: Path):
        self.state_dir = state_dir
        self.policy_path = state_dir / POLICY_FILE
        self.baselines_path = state_dir / BASELINES_FILE
        self.audit_path = state_dir / AUDIT_LOG_FILE
        self.approved_dir = state_dir / APPROVED_DIRNAME
        self.patch_dir = state_dir / PATCH_DIRNAME
        self.quarantine_dir = state_dir / QUAR_DIRNAME

    def ensure_dirs(self) -> None:
        ensure_dir(self.state_dir)
        ensure_dir(self.approved_dir)
        ensure_dir(self.patch_dir)
        ensure_dir(self.quarantine_dir)


def default_policy() -> dict[str, Any]:
    # Default protected set, per requirements.
    return {
        "version": 1,
        "workspaceRoot": str(WORKSPACE_ROOT),
        "targets": [
            {"path": "SOUL.md", "mode": "restore"},
            {"path": "AGENTS.md", "mode": "restore"},
            {"path": "USER.md", "mode": "alert"},
            {"path": "TOOLS.md", "mode": "alert"},
            {"path": "IDENTITY.md", "mode": "alert"},
            {"path": "HEARTBEAT.md", "mode": "alert"},
            {"path": "MEMORY.md", "mode": "alert"},
            # Ignore daily notes by default.
            {"pattern": "memory/*.md", "mode": "ignore"},
        ],
    }


def load_policy(state: GuardianState) -> dict[str, Any]:
    if not state.policy_path.exists():
        return default_policy()
    return json.loads(state.policy_path.read_text(encoding="utf-8"))


def save_policy(state: GuardianState, policy: dict[str, Any]) -> None:
    state.ensure_dirs()
    atomic_write_text(state.policy_path, json.dumps(policy, ensure_ascii=False, indent=2) + "\n")


def load_baselines(state: GuardianState) -> dict[str, Any]:
    """Load baselines.json.

    Backward-compat:
    - If baselines.json doesn't exist but legacy SOUL.md baseline exists
      (approved.sha256 + approved/SOUL.md), import it into the in-memory baselines.
      The caller will persist it on the next save.
    """

    if state.baselines_path.exists():
        return json.loads(state.baselines_path.read_text(encoding="utf-8"))

    baselines: dict[str, Any] = {"version": 1, "files": {}}

    legacy_sha = state.state_dir / "approved.sha256"
    legacy_snap = state.approved_dir / "SOUL.md"
    if legacy_sha.exists() and legacy_snap.exists():
        sha = legacy_sha.read_text(encoding="utf-8").strip()
        if sha:
            baselines["files"]["SOUL.md"] = {"sha256": sha, "approvedAt": "legacy"}

    return baselines


def save_baselines(state: GuardianState, baselines: dict[str, Any]) -> None:
    state.ensure_dirs()
    atomic_write_text(state.baselines_path, json.dumps(baselines, ensure_ascii=False, indent=2, sort_keys=True) + "\n")


def resolve_targets(policy: dict[str, Any], root: Path) -> list[dict[str, str]]:
    """Return list of effective targets to consider.

    For entries with {path, mode}: direct file.
    For entries with {pattern, mode}: expands via globbing relative to root.

    Note: We keep it simple and only expand within workspace root.
    """

    targets: list[dict[str, str]] = []
    entries = policy.get("targets", [])

    for ent in entries:
        mode = ent.get("mode")
        if mode not in ("restore", "alert", "ignore"):
            continue

        if "path" in ent:
            p = Path(ent["path"])
            targets.append({"path": p.as_posix(), "mode": mode})
            continue

        pat = ent.get("pattern")
        if not pat:
            continue

        # Expand pattern relative to root.
        # Using glob keeps it bounded to workspace.
        for match in root.glob(pat):
            if match.is_dir():
                continue
            rel = relpath_str(match, root)
            targets.append({"path": rel, "mode": mode})

    # De-dup by path keeping the last specified mode.
    dedup: dict[str, str] = {}
    for t in targets:
        dedup[t["path"]] = t["mode"]
    return [{"path": p, "mode": m} for p, m in sorted(dedup.items())]


def policy_mode_for_path(policy: dict[str, Any], rel_path: str) -> str | None:
    # Direct match has priority; then patterns.
    entries = policy.get("targets", [])

    for ent in entries:
        if ent.get("path") == rel_path:
            return ent.get("mode")

    for ent in entries:
        pat = ent.get("pattern")
        if not pat:
            continue
        if fnmatch.fnmatch(rel_path, pat):
            return ent.get("mode")

    return None


def approved_snapshot_path(state: GuardianState, rel_path: str) -> Path:
    # Preserve relative structure under approved/.
    return state.approved_dir / Path(rel_path)


def write_patch(state: GuardianState, patch_text: str, tag: str, rel_path: str) -> Path:
    state.ensure_dirs()
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path_tag = safe_patch_tag(tag)
    file_tag = safe_patch_tag(rel_path.replace("/", "_"))
    path = state.patch_dir / f"{ts}-{file_tag}-{path_tag}.patch"
    atomic_write_text(path, patch_text)
    return path


def _canonical_json(obj: Any) -> str:
    # Stable serialization for hashing.
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def _audit_needs_upgrade(state: GuardianState) -> bool:
    """Detect legacy audit logs that lack a chain field."""
    if not state.audit_path.exists():
        return False
    try:
        with state.audit_path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                rec = json.loads(line)
                return "chain" not in rec
    except Exception:
        # If unreadable, force rotation so we can proceed safely.
        return True
    return False


def _rotate_legacy_audit(state: GuardianState) -> None:
    if not state.audit_path.exists():
        return
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    legacy = state.state_dir / f"audit.legacy.{ts}.jsonl"
    os.replace(state.audit_path, legacy)


def _last_audit_hash(state: GuardianState) -> str:
    if not state.audit_path.exists():
        return CHAIN_GENESIS

    # Read last non-empty line without loading huge files.
    with state.audit_path.open("rb") as f:
        f.seek(0, os.SEEK_END)
        size = f.tell()
        if size == 0:
            return CHAIN_GENESIS
        block = 65536
        start = max(0, size - block)
        f.seek(start)
        data = f.read()

    lines = [ln for ln in data.splitlines() if ln.strip()]
    if not lines:
        return CHAIN_GENESIS

    last = lines[-1]
    try:
        rec = json.loads(last.decode("utf-8"))
        return rec.get("chain", {}).get("hash") or CHAIN_GENESIS
    except Exception:
        return CHAIN_GENESIS


def append_audit(state: GuardianState, entry: dict[str, Any]) -> None:
    """Append an audit entry with hash chaining.

    Each record includes: chain.prev, chain.hash
    chain.hash = sha256(prev_hash + "\n" + canonical_json(entry_without_chain))

    Backward-compat: if an existing audit.jsonl doesn't contain chain fields
    (legacy v1 logs), rotate it aside and start a new chained log.
    """

    state.ensure_dirs()

    if _audit_needs_upgrade(state):
        _rotate_legacy_audit(state)

    prev = _last_audit_hash(state)

    entry_wo_chain = dict(entry)
    entry_wo_chain.pop("chain", None)

    payload = prev + "\n" + _canonical_json(entry_wo_chain)
    cur = sha256_text(payload)

    record = dict(entry_wo_chain)
    record["chain"] = {"prev": prev, "hash": cur}

    with state.audit_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def refuse_symlink(path: Path) -> None:
    if is_symlink(path):
        raise RuntimeError(f"Refusing to operate on symlink: {path}")


def compute_file_sha(path: Path) -> str:
    return sha256_bytes(read_bytes(path))


def baseline_info_for(state: GuardianState, baselines: dict[str, Any], rel_path: str) -> dict[str, Any] | None:
    return (baselines.get("files") or {}).get(rel_path)


def set_baseline_for(state: GuardianState, baselines: dict[str, Any], rel_path: str, sha: str) -> None:
    baselines.setdefault("files", {})[rel_path] = {
        "sha256": sha,
        "approvedAt": utc_now_iso(),
    }


def init_cmd(state: GuardianState, actor: str, note: str, *, force_policy: bool = False) -> None:
    state.ensure_dirs()

    if force_policy or not state.policy_path.exists():
        save_policy(state, default_policy())

    policy = load_policy(state)
    baselines = load_baselines(state)

    targets = resolve_targets(policy, WORKSPACE_ROOT)

    initialized_any = False
    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        if mode == "ignore":
            continue

        abs_path = WORKSPACE_ROOT / relp
        if not abs_path.exists():
            continue
        refuse_symlink(abs_path)

        # If already has baseline, do not overwrite.
        if baseline_info_for(state, baselines, relp) is not None and approved_snapshot_path(state, relp).exists():
            continue

        sha = compute_file_sha(abs_path)

        # Snapshot.
        snap = approved_snapshot_path(state, relp)
        ensure_dir(snap.parent)
        atomic_write_bytes(snap, read_bytes(abs_path))

        set_baseline_for(state, baselines, relp, sha)
        initialized_any = True

        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "init",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            "approvedSha": sha,
            "workspace": str(WORKSPACE_ROOT),
            "stateDir": str(state.state_dir),
        })

    save_baselines(state, baselines)

    if initialized_any:
        print(f"Initialized baselines in {state.state_dir}")
    else:
        print("Already initialized (no new baselines created).")


def status_cmd(state: GuardianState) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)

    targets = resolve_targets(policy, WORKSPACE_ROOT)
    out: dict[str, Any] = {
        "workspace": str(WORKSPACE_ROOT),
        "stateDir": str(state.state_dir),
        "policyPath": str(state.policy_path),
        "baselinesPath": str(state.baselines_path),
        "auditLog": str(state.audit_path),
        "files": [],
    }

    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        abs_path = WORKSPACE_ROOT / relp

        baseline = baseline_info_for(state, baselines, relp)
        approved_sha = baseline.get("sha256") if baseline else None
        approved_snap = approved_snapshot_path(state, relp)

        current_sha = None
        if abs_path.exists() and not is_symlink(abs_path):
            try:
                current_sha = compute_file_sha(abs_path)
            except Exception:
                current_sha = None

        ok = (mode == "ignore") or (approved_sha is not None and current_sha == approved_sha)

        out["files"].append({
            "path": relp,
            "mode": mode,
            "exists": abs_path.exists(),
            "isSymlink": is_symlink(abs_path) if abs_path.exists() else False,
            "approvedSha": approved_sha,
            "currentSha": current_sha,
            "approvedSnapshot": str(approved_snap) if approved_snap.exists() else None,
            "ok": ok,
        })

    print(json.dumps(out, indent=2))


def detect_drift_for(state: GuardianState, baselines: dict[str, Any], relp: str) -> tuple[bool, dict[str, Any]]:
    abs_path = WORKSPACE_ROOT / relp

    if not abs_path.exists():
        return True, {"error": f"Missing {relp}"}

    refuse_symlink(abs_path)

    baseline = baseline_info_for(state, baselines, relp)
    if not baseline:
        return True, {"error": f"Not initialized for {relp} (missing baseline). Run init/approve."}

    approved_sha = baseline.get("sha256")
    approved_snap = approved_snapshot_path(state, relp)
    if not approved_snap.exists():
        return True, {"error": f"Not initialized for {relp} (missing approved snapshot)."}

    cur_bytes = read_bytes(abs_path)
    cur_sha = sha256_bytes(cur_bytes)

    if cur_sha == approved_sha:
        return False, {"approvedSha": approved_sha, "currentSha": cur_sha}

    old_text = read_text(approved_snap)
    new_text = read_text(abs_path)
    patch_text = unified_diff_text(old_text, new_text, f"approved/{relp}", relp)
    patch_path = write_patch(state, patch_text, tag="drift", rel_path=relp)

    return True, {
        "approvedSha": approved_sha,
        "currentSha": cur_sha,
        "patchPath": str(patch_path),
    }


def restore_one(state: GuardianState, relp: str, info: dict[str, Any]) -> dict[str, Any]:
    """Restore a single file to its approved snapshot.

    Returns: extra fields to include in audit.
    """

    abs_path = WORKSPACE_ROOT / relp
    refuse_symlink(abs_path)

    approved_snap = approved_snapshot_path(state, relp)
    if not approved_snap.exists():
        raise RuntimeError(f"Missing approved snapshot for {relp}")

    state.ensure_dirs()
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    quarantine_path = state.quarantine_dir / f"{safe_patch_tag(relp.replace('/', '_'))}.{ts}.quarantine"
    atomic_write_bytes(quarantine_path, read_bytes(abs_path))

    # Atomic restore.
    atomic_write_bytes(abs_path, read_bytes(approved_snap))

    return {"quarantinePath": str(quarantine_path), **info}


def format_alert_human(drifted: list[dict[str, Any]]) -> str:
    """Format drift results as human-readable alert for TUI notification."""
    lines = []
    lines.append("")
    lines.append("=" * 50)
    lines.append("🚨 SOUL GUARDIAN SECURITY ALERT")
    lines.append("=" * 50)
    lines.append("")
    
    for d in drifted:
        path = d.get("path", "unknown")
        mode = d.get("mode", "unknown")
        restored = d.get("restored", False)
        error = d.get("error")
        
        if error:
            lines.append(f"⚠️  ERROR: {path}")
            lines.append(f"   {error}")
        else:
            lines.append(f"πŸ“„ FILE: {path}")
            lines.append(f"   Mode: {mode}")
            if restored:
                lines.append(f"   Status: βœ… RESTORED to approved baseline")
                if d.get("quarantinePath"):
                    lines.append(f"   Quarantined: {d.get('quarantinePath')}")
            else:
                lines.append(f"   Status: ⚠️  DRIFT DETECTED (not auto-restored)")
            
            if d.get("approvedSha"):
                lines.append(f"   Expected hash: {d.get('approvedSha')[:16]}...")
            if d.get("currentSha"):
                lines.append(f"   Found hash:    {d.get('currentSha')[:16]}...")
            if d.get("patchPath"):
                lines.append(f"   Diff saved: {d.get('patchPath')}")
        lines.append("")
    
    lines.append("=" * 50)
    lines.append("Review changes and investigate the source of drift.")
    lines.append("If intentional, run: soul_guardian.py approve --file <path>")
    lines.append("=" * 50)
    lines.append("")
    
    return "\n".join(lines)


def check_cmd(state: GuardianState, actor: str, note: str, *, no_restore: bool = False, output_format: str = "json") -> int:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)

    targets = resolve_targets(policy, WORKSPACE_ROOT)

    drifted: list[dict[str, Any]] = []

    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        if mode == "ignore":
            continue

        drift, info = detect_drift_for(state, baselines, relp)
        if not drift:
            continue

        if "error" in info:
            append_audit(state, {
                "ts": utc_now_iso(),
                "event": "error",
                "actor": actor,
                "note": note,
                "path": relp,
                "mode": mode,
                "error": info["error"],
            })
            drifted.append({"path": relp, "mode": mode, "error": info["error"]})
            continue

        # Drift detected.
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "drift",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            **info,
        })

        rec: dict[str, Any] = {"path": relp, "mode": mode, **info}

        # Auto-restore for restore-mode unless disabled.
        if mode == "restore" and not no_restore:
            restored = restore_one(state, relp, info)
            append_audit(state, {
                "ts": utc_now_iso(),
                "event": "restore",
                "actor": actor,
                "note": note,
                "path": relp,
                "mode": mode,
                **restored,
            })
            rec["restored"] = True
            rec["quarantinePath"] = restored.get("quarantinePath")
        else:
            rec["restored"] = False

        drifted.append(rec)

    if not drifted:
        # No drift: stay silent regardless of output format (useful for cron/heartbeat runs).
        return 0

    # Output based on format
    if output_format == "alert":
        # Human-readable alert suitable for direct relay to user
        print(format_alert_human(drifted))
    else:
        # JSON format (default) - single-line summary suitable for cron parsing
        summary = {
            "event": "SOUL_GUARDIAN_DRIFT",
            "count": len(drifted),
            "files": [
                {
                    "path": d["path"],
                    "mode": d.get("mode"),
                    "restored": d.get("restored"),
                    "patch": d.get("patchPath"),
                    "error": d.get("error"),
                }
                for d in drifted
            ],
        }
        print("SOUL_GUARDIAN_DRIFT " + json.dumps(summary, ensure_ascii=False))

    # Drift occurred (even if restored).
    return 2


def watch_cmd(state: GuardianState, actor: str, note: str, *, interval: int = 30) -> int:
    """Continuous watch mode - checks periodically and alerts on drift."""
    import time
    
    print(f"πŸ‘» Soul Guardian watching... (interval: {interval}s)")
    print("Press Ctrl+C to stop.\n")
    
    try:
        while True:
            # check_cmd prints an alert only when drift is found; stay silent otherwise.
            check_cmd(state, actor, note, output_format="alert")
            # Sleep before next check.
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nπŸ‘» Soul Guardian stopped.")
        return 0


def enable_monitoring_cmd(state: GuardianState) -> None:
    """Output instructions and HEARTBEAT.md snippet for easy monitoring setup."""
    
    heartbeat_snippet = """
## Soul Guardian Check
- Run `cd {workspace} && python3 skills/soul-guardian/scripts/soul_guardian.py check --actor heartbeat --output-format alert`
- If any output is produced, relay it to the user immediately as a security alert
- This checks for unauthorized changes to SOUL.md, AGENTS.md, and other protected files
""".format(workspace=WORKSPACE_ROOT)

    print("""
╔══════════════════════════════════════════════════════════════╗
β•‘              SOUL GUARDIAN - ENABLE MONITORING               β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

To enable automatic drift detection and alerting, you have two options:

────────────────────────────────────────────────────────────────
OPTION 1: Heartbeat Integration (Recommended)
────────────────────────────────────────────────────────────────

Add the following to your HEARTBEAT.md file:
""")
    print(heartbeat_snippet)
    print("""
────────────────────────────────────────────────────────────────
OPTION 2: Watch Mode (Foreground)
────────────────────────────────────────────────────────────────

Run this in a terminal to continuously monitor:

    python3 skills/soul-guardian/scripts/soul_guardian.py watch --interval 30

────────────────────────────────────────────────────────────────
OPTION 3: Manual Check
────────────────────────────────────────────────────────────────

Run a one-time check with human-readable output:

    python3 skills/soul-guardian/scripts/soul_guardian.py check --output-format alert

────────────────────────────────────────────────────────────────

The guardian will:
βœ“ Detect unauthorized changes to protected files
βœ“ Auto-restore SOUL.md and AGENTS.md to approved baselines
βœ“ Alert you immediately when drift is detected
βœ“ Save diffs and quarantine modified files for review

""")
    print(f"State directory: {state.state_dir}")
    print(f"Workspace: {WORKSPACE_ROOT}")
    print()


def approve_cmd(state: GuardianState, actor: str, note: str, *, files: list[str] | None, all_files: bool = False) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)

    targets = resolve_targets(policy, WORKSPACE_ROOT)
    selectable = [t for t in targets if t["mode"] != "ignore"]

    if all_files:
        chosen = selectable
    elif files:
        # Resolve to relative posix.
        wanted = {Path(f).as_posix() for f in files}
        chosen = [t for t in selectable if t["path"] in wanted]
        missing = wanted - {t["path"] for t in chosen}
        if missing:
            raise RuntimeError(f"Unknown or ignored file(s): {', '.join(sorted(missing))}")
    else:
        # Backward-compat: if nothing specified, approve SOUL.md.
        chosen = [t for t in selectable if t["path"] == "SOUL.md"]
        if not chosen:
            raise RuntimeError("No files selected to approve.")

    for t in chosen:
        relp = t["path"]
        mode = t["mode"]
        abs_path = WORKSPACE_ROOT / relp
        if not abs_path.exists():
            raise FileNotFoundError(f"Missing {relp}")
        refuse_symlink(abs_path)

        prev = baseline_info_for(state, baselines, relp)
        prev_sha = prev.get("sha256") if prev else None
        prev_text = read_text(approved_snapshot_path(state, relp)) if approved_snapshot_path(state, relp).exists() else ""

        cur_bytes = read_bytes(abs_path)
        cur_sha = sha256_bytes(cur_bytes)
        cur_text = read_text(abs_path)

        patch_text = unified_diff_text(prev_text, cur_text, f"approved/{relp}", relp)
        patch_path = write_patch(state, patch_text, tag="approve", rel_path=relp)

        snap = approved_snapshot_path(state, relp)
        ensure_dir(snap.parent)
        atomic_write_bytes(snap, cur_bytes)

        set_baseline_for(state, baselines, relp, cur_sha)

        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "approve",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            "prevApprovedSha": prev_sha,
            "approvedSha": cur_sha,
            "patchPath": str(patch_path),
        })

        print(f"Approved {relp}: sha256={cur_sha} patch={patch_path}")

    save_baselines(state, baselines)


def restore_cmd(state: GuardianState, actor: str, note: str, *, files: list[str] | None, all_files: bool = False) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)

    targets = resolve_targets(policy, WORKSPACE_ROOT)
    restorable = [t for t in targets if t["mode"] == "restore"]

    if all_files:
        chosen = restorable
    elif files:
        wanted = {Path(f).as_posix() for f in files}
        chosen = [t for t in restorable if t["path"] in wanted]
        missing = wanted - {t["path"] for t in chosen}
        if missing:
            raise RuntimeError(f"Not restorable or unknown file(s): {', '.join(sorted(missing))}")
    else:
        # Backward-compat: default restore SOUL.md.
        chosen = [t for t in restorable if t["path"] == "SOUL.md"]
        if not chosen:
            raise RuntimeError("No files selected to restore.")

    restored_any = False
    for t in chosen:
        relp = t["path"]
        mode = t["mode"]

        drift, info = detect_drift_for(state, baselines, relp)
        if "error" in info:
            raise RuntimeError(info["error"])
        if not drift:
            print(f"No drift for {relp}; nothing to restore.")
            continue

        restored = restore_one(state, relp, info)
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "restore",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            **restored,
        })
        print(
            f"RESTORED {relp} approvedSha={info.get('approvedSha')} previousSha={info.get('currentSha')} "
            f"quarantine={restored.get('quarantinePath')} patch={info.get('patchPath')}"
        )
        restored_any = True

    if not restored_any:
        print("No restores performed.")


def verify_audit_cmd(state: GuardianState) -> None:
    state.ensure_dirs()
    if not state.audit_path.exists():
        print("No audit log present.")
        return

    if _audit_needs_upgrade(state):
        raise RuntimeError(
            "Audit log is legacy (missing hash chain). "
            "Run any command that writes audit (e.g., check) to rotate legacy log, then re-run verify-audit."
        )

    prev = CHAIN_GENESIS
    line_no = 0
    with state.audit_path.open("r", encoding="utf-8") as f:
        for line in f:
            line_no += 1
            line = line.strip()
            if not line:
                continue
            rec = json.loads(line)
            chain = rec.get("chain") or {}
            got_prev = chain.get("prev")
            got_hash = chain.get("hash")

            if got_prev != prev:
                raise RuntimeError(f"Audit chain broken at line {line_no}: prev mismatch (expected {prev}, got {got_prev})")

            rec_wo_chain = dict(rec)
            rec_wo_chain.pop("chain", None)
            payload = prev + "\n" + _canonical_json(rec_wo_chain)
            expect_hash = sha256_text(payload)

            if got_hash != expect_hash:
                raise RuntimeError(f"Audit chain broken at line {line_no}: hash mismatch")

            prev = got_hash

    print(f"OK: audit log hash chain verified ({line_no} lines)")


def parse_args(argv: list[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Soul Guardian - Workspace file integrity guard with alerting support.",
        epilog="For easy setup, run: soul_guardian.py enable-monitoring"
    )
    p.add_argument(
        "--state-dir",
        default=str(DEFAULT_STATE_DIR),
        help="State directory (default: memory/soul-guardian).",
    )

    sub = p.add_subparsers(dest="cmd", required=True)

    def add_common(sp: argparse.ArgumentParser) -> None:
        sp.add_argument("--actor", default="unknown", help="Who initiated the action (best-effort).")
        sp.add_argument("--note", default="", help="Freeform note (e.g., request context).")

    sp_init = sub.add_parser("init", help="Initialize policy + baselines.")
    add_common(sp_init)
    sp_init.add_argument("--force-policy", action="store_true", help="Overwrite policy.json with defaults.")

    sub.add_parser("status", help="Print status JSON.")

    sp_check = sub.add_parser("check", help="Check for drift; restore restore-mode by default.")
    add_common(sp_check)
    sp_check.add_argument("--no-restore", action="store_true", help="Never restore during check (alert-only run).")
    sp_check.add_argument("--output-format", choices=["json", "alert"], default="json",
                          help="Output format: json (machine-readable) or alert (human-readable for TUI).")

    sp_approve = sub.add_parser("approve", help="Approve current contents as baselines.")
    add_common(sp_approve)
    sp_approve.add_argument("--file", action="append", dest="files", help="Relative file path to approve (repeatable).")
    sp_approve.add_argument("--all", action="store_true", help="Approve all non-ignored policy targets.")

    sp_restore = sub.add_parser("restore", help="Restore restore-mode files to approved baselines.")
    add_common(sp_restore)
    sp_restore.add_argument("--file", action="append", dest="files", help="Relative file path to restore (repeatable).")
    sp_restore.add_argument("--all", action="store_true", help="Restore all restore-mode targets.")

    sub.add_parser("verify-audit", help="Verify audit log hash chain.")
    
    # New commands for easier monitoring setup
    sp_watch = sub.add_parser("watch", help="Continuous watch mode - monitors and alerts on drift.")
    add_common(sp_watch)
    sp_watch.add_argument("--interval", type=int, default=30, help="Check interval in seconds (default: 30).")
    
    sub.add_parser("enable-monitoring", help="Show instructions for enabling automatic monitoring and alerts.")

    return p.parse_args(argv)


def main(argv: list[str]) -> int:
    args = parse_args(argv)
    state = GuardianState(Path(args.state_dir).expanduser())

    try:
        if args.cmd == "init":
            init_cmd(state, args.actor, args.note, force_policy=bool(getattr(args, "force_policy", False)))
            return 0
        if args.cmd == "status":
            status_cmd(state)
            return 0
        if args.cmd == "check":
            return check_cmd(
                state, args.actor, args.note,
                no_restore=bool(getattr(args, "no_restore", False)),
                output_format=getattr(args, "output_format", "json")
            )
        if args.cmd == "approve":
            approve_cmd(state, args.actor, args.note, files=getattr(args, "files", None), all_files=bool(getattr(args, "all", False)))
            return 0
        if args.cmd == "restore":
            restore_cmd(state, args.actor, args.note, files=getattr(args, "files", None), all_files=bool(getattr(args, "all", False)))
            return 0
        if args.cmd == "verify-audit":
            verify_audit_cmd(state)
            return 0
        if args.cmd == "watch":
            return watch_cmd(state, args.actor, args.note, interval=getattr(args, "interval", 30))
        if args.cmd == "enable-monitoring":
            enable_monitoring_cmd(state)
            return 0

        raise RuntimeError(f"Unknown cmd: {args.cmd}")

    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

```
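The hash-chained audit log in `append_audit` / `verify_audit_cmd` can be illustrated as a standalone sketch. This mirrors the logic above (canonical JSON serialization, `sha256(prev + "\n" + body)`); the `CHAIN_GENESIS` value here is an assumption standing in for the constant defined elsewhere in the script, and `chain_append` / `chain_verify` are hypothetical helper names for illustration only.

```python
import hashlib
import json

# Assumed genesis sentinel; stands in for the script's CHAIN_GENESIS constant.
CHAIN_GENESIS = "0" * 64


def canonical_json(obj) -> str:
    # Stable serialization for hashing, matching _canonical_json above.
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def chain_append(log: list[dict], entry: dict) -> None:
    # Hash the new entry together with the previous record's hash.
    prev = log[-1]["chain"]["hash"] if log else CHAIN_GENESIS
    cur = hashlib.sha256((prev + "\n" + canonical_json(entry)).encode("utf-8")).hexdigest()
    log.append({**entry, "chain": {"prev": prev, "hash": cur}})


def chain_verify(log: list[dict]) -> bool:
    # Recompute each hash from the record body (chain field excluded) and
    # check both linkage and integrity, as verify_audit_cmd does.
    prev = CHAIN_GENESIS
    for rec in log:
        body = {k: v for k, v in rec.items() if k != "chain"}
        expect = hashlib.sha256((prev + "\n" + canonical_json(body)).encode("utf-8")).hexdigest()
        if rec["chain"]["prev"] != prev or rec["chain"]["hash"] != expect:
            return False
        prev = rec["chain"]["hash"]
    return True


log: list[dict] = []
chain_append(log, {"event": "init", "path": "SOUL.md"})
chain_append(log, {"event": "drift", "path": "SOUL.md"})
assert chain_verify(log)

# Tampering with any earlier record breaks every hash after it.
log[0]["event"] = "tampered"
assert not chain_verify(log)
```

Because each hash commits to the previous one, an attacker who edits or deletes a mid-log record cannot repair the chain without rewriting every subsequent record, which is exactly what the tamper test below exercises.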

### scripts/test_soul_guardian.py

```python
#!/usr/bin/env python3
"""Minimal tests for soul_guardian.py.

Run:
  python3 skills/soul-guardian/scripts/test_soul_guardian.py

This is a lightweight integration test using a temp workspace.
"""

from __future__ import annotations

import json
import os
from pathlib import Path
import subprocess
import tempfile


REPO_ROOT = Path(__file__).resolve().parents[3]  # .../clawd
SCRIPT = REPO_ROOT / "skills" / "soul-guardian" / "scripts" / "soul_guardian.py"


def run(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess:
    return subprocess.run(cmd, cwd=str(cwd), text=True, capture_output=True)


def must_ok(cp: subprocess.CompletedProcess) -> None:
    if cp.returncode != 0:
        raise AssertionError(f"Expected rc=0, got {cp.returncode}\nSTDOUT:\n{cp.stdout}\nSTDERR:\n{cp.stderr}")


def must_rc(cp: subprocess.CompletedProcess, rc: int) -> None:
    if cp.returncode != rc:
        raise AssertionError(f"Expected rc={rc}, got {cp.returncode}\nSTDOUT:\n{cp.stdout}\nSTDERR:\n{cp.stderr}")


def main() -> int:
    with tempfile.TemporaryDirectory() as td:
        ws = Path(td)
        state = ws / "state"

        # Create a fake workspace with the default protected files.
        (ws / "memory").mkdir(parents=True, exist_ok=True)
        (ws / "SOUL.md").write_text("hello soul\n", encoding="utf-8")
        (ws / "AGENTS.md").write_text("hello agents\n", encoding="utf-8")
        (ws / "USER.md").write_text("user v1\n", encoding="utf-8")
        (ws / "TOOLS.md").write_text("tools v1\n", encoding="utf-8")
        (ws / "IDENTITY.md").write_text("id v1\n", encoding="utf-8")
        (ws / "HEARTBEAT.md").write_text("hb v1\n", encoding="utf-8")
        (ws / "MEMORY.md").write_text("mem v1\n", encoding="utf-8")
        # Daily notes should be ignored by default.
        (ws / "memory" / "2026-01-01.md").write_text("daily\n", encoding="utf-8")

        # Init baselines.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "init", "--actor", "test"], cwd=ws)
        must_ok(cp)

        # No drift.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_ok(cp)

        # Drift restore-mode file: SOUL.md should be auto-restored by check.
        (ws / "SOUL.md").write_text("MALICIOUS\n", encoding="utf-8")
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check", "--actor", "cron"], cwd=ws)
        must_rc(cp, 2)
        assert (ws / "SOUL.md").read_text(encoding="utf-8") == "hello soul\n"

        # Drift alert-only file: USER.md should NOT be restored.
        (ws / "USER.md").write_text("user v2\n", encoding="utf-8")
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_rc(cp, 2)
        assert (ws / "USER.md").read_text(encoding="utf-8") == "user v2\n"

        # Approve USER.md change.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "approve", "--file", "USER.md", "--actor", "test"], cwd=ws)
        must_ok(cp)

        # Now check should be clean.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_ok(cp)

        # Verify audit chain ok.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "verify-audit"], cwd=ws)
        must_ok(cp)

        # Tamper with audit log and ensure verify fails.
        audit = state / "audit.jsonl"
        lines = audit.read_text(encoding="utf-8").splitlines()
        assert lines, "audit log empty"
        rec = json.loads(lines[-1])
        rec["note"] = "tampered"
        lines[-1] = json.dumps(rec, ensure_ascii=False)
        audit.write_text("\n".join(lines) + "\n", encoding="utf-8")

        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "verify-audit"], cwd=ws)
        if cp.returncode == 0:
            raise AssertionError("Expected verify-audit to fail after tamper")

        print("OK: soul-guardian minimal tests passed")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```
