soul-guardian
Drift detection + baseline integrity guard for agent workspace files with automatic alerting support
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-soul-guardian
Repository
Skill path: skills/davida-ps/soul-guardian
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install soul-guardian into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding soul-guardian to shared team environments
- Use soul-guardian for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: soul-guardian
version: 0.0.2
description: Drift detection + baseline integrity guard for agent workspace files with automatic alerting support
homepage: https://clawsec.prompt.security
metadata: {"openclaw":{"emoji":"👻","category":"security"}}
clawdis:
emoji: "👻"
requires:
bins: [python3]
---
# soul-guardian 👻
Protects your agent's core files (SOUL.md, AGENTS.md, etc.) from unauthorized changes with automatic detection, restoration, and **user alerting**.
## Quick Start (3 Steps)
### Step 1: Initialize baselines
```bash
cd ~/.openclaw/workspace
python3 skills/soul-guardian/scripts/soul_guardian.py init --actor setup --note "initial baseline"
```
### Step 2: Enable monitoring
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py enable-monitoring
```
This shows you how to add monitoring to your workflow.
### Step 3: Add to HEARTBEAT.md (Recommended)
Add this to your HEARTBEAT.md for automatic monitoring:
```markdown
## Soul Guardian Check
- Run `python3 skills/soul-guardian/scripts/soul_guardian.py check --actor heartbeat --output-format alert`
- If any output is produced, relay it to the user immediately as a security alert
```
That's it! Soul Guardian will now:
- ✅ Detect unauthorized changes to protected files
- ✅ Auto-restore SOUL.md and AGENTS.md to approved baseline
- ✅ Alert you when drift is detected and handled
---
## What it protects (default policy)
| File | Mode | Action on drift |
|------|------|-----------------|
| SOUL.md | restore | Auto-restore + alert |
| AGENTS.md | restore | Auto-restore + alert |
| USER.md | alert | Alert only |
| TOOLS.md | alert | Alert only |
| IDENTITY.md | alert | Alert only |
| HEARTBEAT.md | alert | Alert only |
| MEMORY.md | alert | Alert only |
| memory/*.md | ignore | Ignored |
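At its core, drift detection is a sha256 comparison between the live file and its approved snapshot. A minimal illustrative sketch of that idea (not the skill's actual code; file layout is hypothetical):

```python
import hashlib
from pathlib import Path

def sha256_file(path: Path) -> str:
    # Hash the file's raw bytes, matching the sha256 baseline scheme above.
    return hashlib.sha256(path.read_bytes()).hexdigest()

def has_drifted(workspace_file: Path, approved_snapshot: Path) -> bool:
    # Drift means the live file no longer matches its approved baseline.
    return sha256_file(workspace_file) != sha256_file(approved_snapshot)
```

On drift, the real tool additionally writes a unified diff and (for restore-mode files) copies the drifted file to quarantine before restoring.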
## Commands
### Check for drift (with alert output)
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py check --output-format alert
```
- Silent if no drift
- Outputs human-readable alert if drift detected
- Perfect for heartbeat integration
### Watch mode (continuous monitoring)
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py watch --interval 30
```
Runs continuously, checking every 30 seconds.
### Approve intentional changes
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py approve --file SOUL.md --actor user --note "intentional update"
```
### View status
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py status
```
### Verify audit log integrity
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py verify-audit
```
---
## Alert Format
When drift is detected, `--output-format alert` produces output like:
```
==================================================
🚨 SOUL GUARDIAN SECURITY ALERT
==================================================
📄 FILE: SOUL.md
Mode: restore
Status: ✅ RESTORED to approved baseline
Expected hash: abc123def456...
Found hash: 789xyz000111...
Diff saved: /path/to/patches/drift.patch
==================================================
Review changes and investigate the source of drift.
If intentional, run: soul_guardian.py approve --file <path>
==================================================
```
This output is designed to be relayed directly to the user in TUI/chat.
---
## Security Model
**What it does:**
- Detects filesystem drift vs approved baseline (sha256)
- Produces unified diffs for review
- Maintains tamper-evident audit log with hash chaining
- Refuses to operate on symlinks
- Uses atomic writes for restores
**What it doesn't do:**
- Cannot prove WHO made a change (actor is best-effort metadata)
- Cannot protect if attacker controls both workspace AND state directory
- Is not a substitute for backups
**Recommendation:** Store state directory outside workspace for better resilience.
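The hash chaining behind the tamper-evident audit log works by having each record store the previous record's hash, so rewriting any earlier entry breaks every later hash. A simplified sketch of the scheme (the real implementation lives in `soul_guardian.py`; this is illustrative only):

```python
import hashlib
import json

GENESIS = "0" * 64  # hash used before the first record exists

def chain_record(prev_hash: str, entry: dict) -> dict:
    # hash = sha256(prev_hash + "\n" + canonical JSON of the entry body).
    canonical = json.dumps(entry, ensure_ascii=False, sort_keys=True,
                           separators=(",", ":"))
    digest = hashlib.sha256((prev_hash + "\n" + canonical).encode("utf-8")).hexdigest()
    return {**entry, "chain": {"prev": prev_hash, "hash": digest}}

def verify_chain(records: list[dict]) -> bool:
    # Recompute every hash from genesis; any edit to a body or link fails.
    prev = GENESIS
    for rec in records:
        body = {k: v for k, v in rec.items() if k != "chain"}
        expected = chain_record(prev, body)["chain"]["hash"]
        if rec["chain"]["prev"] != prev or rec["chain"]["hash"] != expected:
            return False
        prev = rec["chain"]["hash"]
    return True
```

`verify-audit` performs exactly this kind of recomputation over `audit.jsonl`.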
---
## Demo
Run the full demo flow to see soul-guardian in action:
```bash
bash skills/soul-guardian/scripts/demo.sh
```
This will:
1. Verify clean state (silent check)
2. Inject malicious content into SOUL.md
3. Run heartbeat check (produces alert)
4. Show SOUL.md was restored
---
## Troubleshooting
**"Not initialized" error:**
Run `init` first to set up baselines.
**Drift keeps happening:**
Check what's modifying your files. Review the audit log and patches.
**Want to approve a change:**
Run `approve --file <path>` after reviewing the change.
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# soul-guardian
A small, dependency-free integrity guard for Clawdbot agent workspaces.
It helps you detect (and optionally auto-undo) unexpected edits to the workspace markdown files that an agent auto-loads (e.g., `SOUL.md`, `AGENTS.md`). It also records a **tamper-evident** audit trail of changes.
## Why this exists
In many Clawdbot setups, the agent reads certain markdown files every session (identity, instructions, memory, tools, etc.). If those files drift unexpectedly (accidental edits, bad merges, unwanted automation, etc.), you want:
- detection (sha256 mismatch)
- a diff/patch artifact for review
- a record of what happened (audit log)
- optionally: an automatic restore to a known-good baseline for critical files
## What it protects (default policy)
Default `policy.json` protects:
- **Auto-restore + alert:** `SOUL.md`, `AGENTS.md`
- **Alert-only:** `USER.md`, `TOOLS.md`, `IDENTITY.md`, `HEARTBEAT.md`, `MEMORY.md`
- **Ignored by default:** `memory/*.md` (daily notes)
You can customize this by editing the policy file in the guardian state directory.
## Security model (and limitations)
What it does well:
- Detects filesystem drift vs an approved baseline.
- Produces unified diffs (patch files) for review.
- Maintains an **append-only JSONL audit log** with **hash chaining** so log tampering is detectable.
- Refuses to operate on **symlinks** (reduces link attacks).
- Uses **atomic writes** for restores and baseline updates (`os.replace`).
What it does *not* do:
- It cannot prove *who* changed a file. `--actor` is best-effort metadata.
- It cannot protect you if an attacker can modify both the workspace and the guardian state directory.
- It is not a substitute for backups.
Recommendation (not enforced):
- Mirror/back up your guardian state directory (and/or workspace) using git and/or offsite backups.
## State directory
By default, state is stored inside the workspace:
- `memory/soul-guardian/`
- `policy.json` (what to monitor)
- `baselines.json` (approved sha256 per file)
- `approved/<path>` (approved snapshots)
- `audit.jsonl` (append-only log with hash chain)
- `patches/*.patch` (unified diffs)
- `quarantine/*` (copies of drifted files before restore)
For better resilience, you can move this **outside** the workspace (recommended).
## Install / usage
All commands below are run from the agent workspace root.
### First run / Initialize baselines (recommended)
For resilience, create your guardian **state directory outside** the workspace first, then initialize baselines.
1) Onboard an external state dir (creates policy, copies any existing state, prints paths/snippets):
```bash
python3 skills/soul-guardian/scripts/onboard_state_dir.py --agent-id <agentId>
```
2) Initialize baselines **in that external state dir**:
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
init --actor sam --note "first baseline"
```
3) Run a check once (should be silent on OK; prints a single-line summary on drift):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
check --actor system --note "first check"
```
### Common commands
Status (summary):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
status
```
Check for drift (default: restores restore-mode files):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
check --actor system --note cron
```
Alert-only check (never restore):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
check --no-restore
```
Approve intentional edits (one file):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
approve --file SOUL.md --actor sam --note "intentional update"
```
Approve all policy targets (except ignored ones):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
approve --all --actor sam --note "bulk approve"
```
Restore (only restore-mode files):
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
restore --file SOUL.md --actor system --note "manual restore"
```
Verify audit log tamper-evidence:
```bash
python3 skills/soul-guardian/scripts/soul_guardian.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
verify-audit
```
## Policy format (`policy.json`)
Example:
```json
{
"version": 1,
"workspaceRoot": "/path/to/workspace",
"targets": [
{"path": "SOUL.md", "mode": "restore"},
{"path": "AGENTS.md", "mode": "restore"},
{"path": "USER.md", "mode": "alert"},
{"pattern": "memory/*.md", "mode": "ignore"}
]
}
```
- `mode`:
- `restore`: drift triggers audit + patch + (by default) restore + quarantine copy
- `alert`: drift triggers audit + patch, but does not restore
- `ignore`: excluded
## Onboarding: move state outside the workspace
Run the helper:
```bash
python3 skills/soul-guardian/scripts/onboard_state_dir.py
```
It will:
- create an external state dir (**recommended default:** `~/.clawdbot/soul-guardian/<agentId>/`)
- copy (or move with `--move`) existing state from `memory/soul-guardian/`
- write a default `policy.json` if missing
- print scheduling snippets
Notes:
- `<agentId>` should be **stable and unique per workspace** (don't point multiple workspaces at the same state dir).
- WARNING: `--move` deletes the old in-workspace state dir after copying.
- The external state dir can contain **approved snapshots, patches, and quarantined copies** of sensitive prompt/instruction/memory files. Keep permissions restrictive (e.g., `chmod 700 <dir>`; `chmod go-rwx <dir>`).
Then include `--state-dir` in all commands (run from the workspace root), e.g.:
```bash
cd <workspace> && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir ~/.clawdbot/soul-guardian/<agentId> check
```
## Scheduling (cron)
### A) Clawdbot Gateway Cron (recommended)
This is the default pattern when you want drift notifications to flow through Clawdbot.
Note: even when there is **no drift**, Clawdbot cron runs typically show an **OK summary** in the main session.
Example (edit paths + schedule):
```bash
clawdbot cron add \
--name "soul-guardian: check workspace" \
--description "Run soul-guardian check; alert when drift detected." \
--session isolated \
--wake now \
--cron "*/10 * * * *" \
--tz UTC \
--message "Run:\ncd '<workspace>'\npython3 skills/soul-guardian/scripts/soul_guardian.py --state-dir ~/.clawdbot/soul-guardian/<agentId> check --actor cron --note 'gateway-cron'\n\nIf the command prints a line starting with 'SOUL_GUARDIAN_DRIFT', treat it as an alert. If it prints nothing, reply HEARTBEAT_OK." \
--post-prefix "[soul-guardian]" \
--post-mode summary
```
### B) macOS launchd (optional, silent-on-OK)
If you want **system scheduling** without Clawdbot posting OK summaries, use `launchd`.
Because `soul_guardian.py check` prints **nothing** on OK and prints a single-line `SOUL_GUARDIAN_DRIFT ...` summary on drift, this tends to be silent unless something changed.
Generate + (optionally) install a LaunchAgent plist (run from the workspace root, or pass `--workspace-root`):
```bash
python3 skills/soul-guardian/scripts/install_launchd_plist.py \
--state-dir ~/.clawdbot/soul-guardian/<agentId> \
--interval-seconds 600 \
--install
```
The generated plist includes `WorkingDirectory` set to your workspace root (recommended), so relative paths behave as expected.
The script writes drift output to log files under `<state-dir>/logs/`.
You can tail them with the commands it prints.
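Because `check` prints nothing and exits 0 when clean, and exits 2 with a single `SOUL_GUARDIAN_DRIFT ...` line on drift (per the script's documented exit codes), a notification hook only needs to inspect the exit code. A hypothetical wrapper sketch (command list and alert routing are yours to fill in):

```python
import subprocess

def run_check(cmd: list[str]) -> tuple[str, str]:
    # Documented exit codes: 0 = no drift, 2 = drift, anything else = error.
    res = subprocess.run(cmd, capture_output=True, text=True)
    if res.returncode == 0:
        return ("ok", "")
    if res.returncode == 2:
        return ("drift", res.stdout.strip())
    return ("error", res.stderr.strip())
```

A cron or launchd job could call this and forward the `drift` message to whatever notification channel you use.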
## Development / tests
A minimal test script is included:
```bash
python3 skills/soul-guardian/scripts/test_soul_guardian.py
```
It simulates a workspace in a temp directory and validates drift detection, approve/restore flow, and audit hash chain verification.
```
### _meta.json
```json
{
"owner": "davida-ps",
"slug": "soul-guardian",
"displayName": "soul-guardian",
"latest": {
"version": "0.0.2",
"publishedAt": 1770399192748,
"commit": "https://github.com/openclaw/skills/commit/d1b9f12893708eec89496ad2fd2b892111cc1b41"
},
"history": []
}
```
### scripts/install_launchd_plist.py
```python
#!/usr/bin/env python3
"""Generate (and optionally install) a macOS launchd plist for soul-guardian.
Goal:
- Run `soul_guardian.py check` on an interval.
- Be *silent on OK* (soul_guardian.py prints nothing + exits 0 when no drift).
- Produce a single-line stdout alert on drift (exits 2 and prints SOUL_GUARDIAN_DRIFT ...).
This script is intentionally deterministic and dependency-free.
It does NOT attempt to deliver drift alerts to Telegram/Slack/etc.
Instead it:
- writes logs to the state dir (so drift output is preserved)
- relies on you to wire notifications however you prefer
If you want Clawdbot-side delivery, use Clawdbot Gateway Cron.
"""
from __future__ import annotations
import argparse
import os
from pathlib import Path
import plistlib
import subprocess
import sys
def agent_id_default(workspace_root: Path) -> str:
return workspace_root.name
def default_external_state_dir(agent_id: str) -> Path:
return Path("~/.clawdbot/soul-guardian").expanduser() / agent_id
def run_launchctl(args: list[str]) -> None:
subprocess.run(["/bin/launchctl", *args], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser()
ap.add_argument(
"--workspace-root",
default=str(Path.cwd()),
help="Workspace root (default: current working directory).",
)
ap.add_argument(
"--agent-id",
default=None,
help="Agent/workspace identifier used in default label + state dir (default: workspace folder name).",
)
ap.add_argument(
"--state-dir",
default=None,
help="External state directory (recommended). Default: ~/.clawdbot/soul-guardian/<agentId>/",
)
ap.add_argument(
"--label",
default=None,
help="launchd label (default: com.clawdbot.soul-guardian.<agentId>)",
)
ap.add_argument(
"--interval-seconds",
type=int,
default=600,
help="Run interval in seconds (StartInterval). Default: 600 (10 minutes).",
)
ap.add_argument("--actor", default="cron", help="--actor passed to soul_guardian.py (default: cron).")
ap.add_argument("--note", default="launchd", help="--note passed to soul_guardian.py (default: launchd).")
ap.add_argument(
"--out",
default=None,
help="Write plist to this path (default: ~/Library/LaunchAgents/<label>.plist)",
)
ap.add_argument("--force", action="store_true", help="Overwrite existing plist on disk.")
ap.add_argument(
"--install",
action="store_true",
help="Install+load the plist with launchctl (bootstrap). Without this flag we only write the plist.",
)
args = ap.parse_args(argv)
workspace_root = Path(args.workspace_root).expanduser().resolve()
agent_id = args.agent_id or agent_id_default(workspace_root)
state_dir = Path(args.state_dir).expanduser().resolve() if args.state_dir else default_external_state_dir(agent_id)
label = args.label or f"com.clawdbot.soul-guardian.{agent_id}"
plist_path = Path(args.out).expanduser().resolve() if args.out else (Path("~/Library/LaunchAgents").expanduser() / f"{label}.plist")
script_path = workspace_root / "skills" / "soul-guardian" / "scripts" / "soul_guardian.py"
if not script_path.exists():
raise SystemExit(f"soul_guardian.py not found at {script_path}; pass --workspace-root correctly")
# Keep logs in the external state dir.
log_dir = state_dir / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
stdout_log = log_dir / "launchd.stdout.log"
stderr_log = log_dir / "launchd.stderr.log"
program_args = [
"/usr/bin/python3",
str(script_path),
"--state-dir",
str(state_dir),
"check",
"--actor",
str(args.actor),
"--note",
str(args.note),
]
plist: dict[str, object] = {
"Label": label,
"ProgramArguments": program_args,
"WorkingDirectory": str(workspace_root),
"StartInterval": int(args.interval_seconds),
"RunAtLoad": True,
"StandardOutPath": str(stdout_log),
"StandardErrorPath": str(stderr_log),
# Avoid interactive UI dependencies; run in background.
"ProcessType": "Background",
}
plist_path.parent.mkdir(parents=True, exist_ok=True)
if plist_path.exists() and not args.force:
raise SystemExit(f"Refusing to overwrite existing {plist_path}. Re-run with --force.")
with plist_path.open("wb") as f:
plistlib.dump(plist, f, fmt=plistlib.FMT_XML, sort_keys=True)
print(f"Wrote plist: {plist_path}")
print(f"State dir: {state_dir}")
print(f"Label: {label}")
uid = os.getuid()
if args.install:
# Best-effort: remove any existing job with same label, then bootstrap.
run_launchctl(["bootout", f"gui/{uid}", label])
run_launchctl(["bootout", f"gui/{uid}", str(plist_path)])
res = subprocess.run(["/bin/launchctl", "bootstrap", f"gui/{uid}", str(plist_path)], text=True, capture_output=True)
if res.returncode != 0:
sys.stderr.write((res.stderr or res.stdout or "").strip() + "\n")
sys.stderr.write("Failed to bootstrap. You can try manually:\n")
sys.stderr.write(f" launchctl bootstrap gui/{uid} {plist_path}\n")
return 1
subprocess.run(["/bin/launchctl", "enable", f"gui/{uid}/{label}"], check=False)
subprocess.run(["/bin/launchctl", "kickstart", "-k", f"gui/{uid}/{label}"], check=False)
print("Installed + started (launchctl bootstrap/enable/kickstart).")
else:
print("Not installed (dry write). To load it:")
print(f" launchctl bootstrap gui/{uid} {plist_path}")
print(f" launchctl enable gui/{uid}/{label}")
print(f" launchctl kickstart -k gui/{uid}/{label}")
print("\nLogs:")
print(f" tail -n 200 -f {stdout_log}")
print(f" tail -n 200 -f {stderr_log}")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
```
### scripts/onboard_state_dir.py
```python
#!/usr/bin/env python3
"""Onboard soul-guardian state directory outside the workspace.
Why:
- Keeping integrity state inside the workspace can be risky if the workspace is modified or wiped.
- Moving state to an external directory improves resilience and makes tampering harder.
What this script does:
- Creates an external state directory (default: ~/.clawdbot/soul-guardian/<agentId>/)
- Copies (or moves) existing in-workspace state from memory/soul-guardian/
- Writes a default policy.json if missing
- Prints recommended cron snippets (Clawdbot gateway cron and optional launchd)
This script does NOT modify your cron jobs automatically.
"""
from __future__ import annotations
import argparse
import os
from pathlib import Path
import shutil
import sys
WORKSPACE_ROOT = Path.cwd()
DEFAULT_IN_WORKSPACE_STATE = WORKSPACE_ROOT / "memory" / "soul-guardian"
def agent_id_default() -> str:
# Best-effort: workspace folder name.
return WORKSPACE_ROOT.name
def ensure_dir(p: Path) -> None:
p.mkdir(parents=True, exist_ok=True)
def copytree_overwrite(src: Path, dst: Path) -> None:
# Copy directory contents into dst (merge).
ensure_dir(dst)
for root, dirs, files in os.walk(src):
r = Path(root)
rel = r.relative_to(src)
target_root = dst / rel
ensure_dir(target_root)
for d in dirs:
ensure_dir(target_root / d)
for f in files:
s = r / f
t = target_root / f
# Overwrite.
shutil.copy2(s, t)
DEFAULT_POLICY_JSON = """{
"version": 1,
"workspaceRoot": "",
"targets": [
{"path": "SOUL.md", "mode": "restore"},
{"path": "AGENTS.md", "mode": "restore"},
{"path": "USER.md", "mode": "alert"},
{"path": "TOOLS.md", "mode": "alert"},
{"path": "IDENTITY.md", "mode": "alert"},
{"path": "HEARTBEAT.md", "mode": "alert"},
{"path": "MEMORY.md", "mode": "alert"},
{"pattern": "memory/*.md", "mode": "ignore"}
]
}
"""
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--agent-id", default=agent_id_default(), help="Identifier used for default external state path.")
ap.add_argument(
"--state-dir",
default=None,
help="External state directory to create/use (default: ~/.clawdbot/soul-guardian/<agentId>/).",
)
ap.add_argument("--move", action="store_true", help="Move instead of copy (WARNING: deletes the old in-workspace state dir).")
ap.add_argument("--no-copy", action="store_true", help="Do not copy/move existing in-workspace state.")
args = ap.parse_args(argv)
if args.state_dir:
external = Path(args.state_dir).expanduser()
else:
external = (Path("~/.clawdbot/soul-guardian").expanduser() / args.agent_id)
ensure_dir(external)
if not args.no_copy and DEFAULT_IN_WORKSPACE_STATE.exists():
if args.move:
# Move by copying then removing src (safer than rename across filesystems).
copytree_overwrite(DEFAULT_IN_WORKSPACE_STATE, external)
shutil.rmtree(DEFAULT_IN_WORKSPACE_STATE)
action = "moved"
else:
copytree_overwrite(DEFAULT_IN_WORKSPACE_STATE, external)
action = "copied"
print(f"Existing state {action} from {DEFAULT_IN_WORKSPACE_STATE} -> {external}")
else:
print(f"Using external state dir: {external}")
policy_path = external / "policy.json"
if not policy_path.exists():
txt = DEFAULT_POLICY_JSON.replace('"workspaceRoot": ""', f'"workspaceRoot": "{WORKSPACE_ROOT}"')
policy_path.write_text(txt, encoding="utf-8")
print(f"Wrote default policy: {policy_path}")
else:
print(f"Policy already exists: {policy_path}")
print("\nNext steps")
print("1) Initialize baselines in the external state dir:")
print(
f" cd '{WORKSPACE_ROOT}' && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir '{external}' init --actor 'sam' --note 'onboard external state'\n"
)
print("2) Update your cron/check runner to include --state-dir.")
print("\nClawdbot gateway cron (recommended; does not require system cron):")
print("- In your cron spec, run something like:")
print(
f" cd '{WORKSPACE_ROOT}' && python3 skills/soul-guardian/scripts/soul_guardian.py --state-dir '{external}' check --actor system --note cron"
)
print("\nOptional: system cron / launchd (macOS) example (NOT installed automatically):")
label = f"com.clawdbot.soul-guardian.{args.agent_id}"
print(f"- Launchd label: {label}")
print(f"- WorkingDirectory (recommended): {WORKSPACE_ROOT}")
print("- ProgramArguments (example):")
print(" [\n"
f" '/usr/bin/python3',\n"
f" '{WORKSPACE_ROOT}/skills/soul-guardian/scripts/soul_guardian.py',\n"
f" '--state-dir', '{external}',\n"
f" 'check', '--actor', 'system', '--note', 'launchd'\n"
" ]")
print("\nNotes")
print("- The external state dir can contain approved snapshots, patches, and quarantined copies of drifted prompt/instruction files; keep permissions restrictive (e.g., chmod 700; go-rwx).")
if args.move:
print("- WARNING: --move deletes the old in-workspace state dir after copying.")
print("- Consider mirroring the external state dir via git or offsite backups (not enforced by this tool).")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
```
### scripts/soul_guardian.py
```python
#!/usr/bin/env python3
"""Workspace file integrity guard + audit (multi-file).
This is a hardened successor to the original SOUL.md-only guardian.
Key features:
- Multiple target files with per-file policy (restore | alert | ignore)
- Approved baselines stored per file (snapshot + sha256)
- Append-only audit log with hash chaining (tamper-evident)
- Optional auto-restore for restore-mode files (with quarantine copy)
- Refuses to operate on symlinks
- Atomic writes for baseline + restore operations (os.replace)
State directory (default, backward-compatible): memory/soul-guardian/
Subcommands:
- init Initialize policy + baselines (first run)
- status Print status JSON for all policy targets
- check Check for drift; restore for restore-mode by default
- approve Approve current contents as baseline (per file or all)
- restore Restore restore-mode files to last approved baseline
- verify-audit Validate audit log hash chain
Exit codes:
- 0: ok / no drift
- 2: drift detected (for check when any alert/restore drift happened)
- 1: error
"""
from __future__ import annotations
import argparse
import datetime as dt
import difflib
import fnmatch
import hashlib
import json
import os
from pathlib import Path
import shutil
import stat
import sys
from typing import Any, Iterable
WORKSPACE_ROOT = Path.cwd()
DEFAULT_STATE_DIR = WORKSPACE_ROOT / "memory" / "soul-guardian"
POLICY_FILE = "policy.json"
BASELINES_FILE = "baselines.json"
AUDIT_LOG_FILE = "audit.jsonl"
APPROVED_DIRNAME = "approved"
PATCH_DIRNAME = "patches"
QUAR_DIRNAME = "quarantine"
CHAIN_GENESIS = "0" * 64
def utc_now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()
def sha256_bytes(b: bytes) -> str:
h = hashlib.sha256()
h.update(b)
return h.hexdigest()
def sha256_text(s: str) -> str:
return sha256_bytes(s.encode("utf-8"))
def read_bytes(path: Path) -> bytes:
return path.read_bytes()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8", errors="replace")
def is_symlink(path: Path) -> bool:
try:
st = os.lstat(path)
except FileNotFoundError:
return False
return stat.S_ISLNK(st.st_mode)
def ensure_dir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def atomic_write_bytes(path: Path, data: bytes) -> None:
ensure_dir(path.parent)
tmp = path.with_name(path.name + ".tmp")
with tmp.open("wb") as f:
f.write(data)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
def atomic_write_text(path: Path, text: str) -> None:
atomic_write_bytes(path, text.encode("utf-8"))
def unified_diff_text(old: str, new: str, fromfile: str, tofile: str) -> str:
old_lines = old.splitlines(keepends=True)
new_lines = new.splitlines(keepends=True)
diff = difflib.unified_diff(old_lines, new_lines, fromfile=fromfile, tofile=tofile)
return "".join(diff)
def safe_patch_tag(tag: str) -> str:
return ("".join(c for c in tag if c.isalnum() or c in ("-", "_"))[:40] or "patch")
def relpath_str(path: Path, root: Path) -> str:
# Normalize to a stable forward-slash relative string.
try:
rel = path.relative_to(root)
except Exception:
rel = Path(os.path.relpath(path, root))
return rel.as_posix()
class GuardianState:
def __init__(self, state_dir: Path):
self.state_dir = state_dir
self.policy_path = state_dir / POLICY_FILE
self.baselines_path = state_dir / BASELINES_FILE
self.audit_path = state_dir / AUDIT_LOG_FILE
self.approved_dir = state_dir / APPROVED_DIRNAME
self.patch_dir = state_dir / PATCH_DIRNAME
self.quarantine_dir = state_dir / QUAR_DIRNAME
def ensure_dirs(self) -> None:
ensure_dir(self.state_dir)
ensure_dir(self.approved_dir)
ensure_dir(self.patch_dir)
ensure_dir(self.quarantine_dir)
def default_policy() -> dict[str, Any]:
# Default protected set, per requirements.
return {
"version": 1,
"workspaceRoot": str(WORKSPACE_ROOT),
"targets": [
{"path": "SOUL.md", "mode": "restore"},
{"path": "AGENTS.md", "mode": "restore"},
{"path": "USER.md", "mode": "alert"},
{"path": "TOOLS.md", "mode": "alert"},
{"path": "IDENTITY.md", "mode": "alert"},
{"path": "HEARTBEAT.md", "mode": "alert"},
{"path": "MEMORY.md", "mode": "alert"},
# Ignore daily notes by default.
{"pattern": "memory/*.md", "mode": "ignore"},
],
}
def load_policy(state: GuardianState) -> dict[str, Any]:
if not state.policy_path.exists():
return default_policy()
return json.loads(state.policy_path.read_text(encoding="utf-8"))
def save_policy(state: GuardianState, policy: dict[str, Any]) -> None:
state.ensure_dirs()
atomic_write_text(state.policy_path, json.dumps(policy, ensure_ascii=False, indent=2) + "\n")
def load_baselines(state: GuardianState) -> dict[str, Any]:
"""Load baselines.json.
Backward-compat:
- If baselines.json doesn't exist but legacy SOUL.md baseline exists
(approved.sha256 + approved/SOUL.md), import it into the in-memory baselines.
The caller will persist it on the next save.
"""
if state.baselines_path.exists():
return json.loads(state.baselines_path.read_text(encoding="utf-8"))
baselines: dict[str, Any] = {"version": 1, "files": {}}
legacy_sha = state.state_dir / "approved.sha256"
legacy_snap = state.approved_dir / "SOUL.md"
if legacy_sha.exists() and legacy_snap.exists():
sha = legacy_sha.read_text(encoding="utf-8").strip()
if sha:
baselines["files"]["SOUL.md"] = {"sha256": sha, "approvedAt": "legacy"}
return baselines
def save_baselines(state: GuardianState, baselines: dict[str, Any]) -> None:
state.ensure_dirs()
atomic_write_text(state.baselines_path, json.dumps(baselines, ensure_ascii=False, indent=2, sort_keys=True) + "\n")
def resolve_targets(policy: dict[str, Any], root: Path) -> list[dict[str, str]]:
"""Return list of effective targets to consider.
For entries with {path, mode}: direct file.
For entries with {pattern, mode}: expands via globbing relative to root.
Note: We keep it simple and only expand within workspace root.
"""
targets: list[dict[str, str]] = []
entries = policy.get("targets", [])
for ent in entries:
mode = ent.get("mode")
if mode not in ("restore", "alert", "ignore"):
continue
if "path" in ent:
p = Path(ent["path"])
targets.append({"path": p.as_posix(), "mode": mode})
continue
pat = ent.get("pattern")
if not pat:
continue
# Expand pattern relative to root.
# Using glob keeps it bounded to workspace.
for match in root.glob(pat):
if match.is_dir():
continue
rel = relpath_str(match, root)
targets.append({"path": rel, "mode": mode})
# De-dup by path keeping the last specified mode.
dedup: dict[str, str] = {}
for t in targets:
dedup[t["path"]] = t["mode"]
return [{"path": p, "mode": m} for p, m in sorted(dedup.items())]
def policy_mode_for_path(policy: dict[str, Any], rel_path: str) -> str | None:
# Direct match has priority; then patterns.
entries = policy.get("targets", [])
for ent in entries:
if ent.get("path") == rel_path:
return ent.get("mode")
for ent in entries:
pat = ent.get("pattern")
if not pat:
continue
if fnmatch.fnmatch(rel_path, pat):
return ent.get("mode")
return None
def approved_snapshot_path(state: GuardianState, rel_path: str) -> Path:
# Preserve relative structure under approved/.
return state.approved_dir / Path(rel_path)
def write_patch(state: GuardianState, patch_text: str, tag: str, rel_path: str) -> Path:
state.ensure_dirs()
ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
path_tag = safe_patch_tag(tag)
file_tag = safe_patch_tag(rel_path.replace("/", "_"))
path = state.patch_dir / f"{ts}-{file_tag}-{path_tag}.patch"
atomic_write_text(path, patch_text)
return path
def _canonical_json(obj: Any) -> str:
# Stable serialization for hashing.
return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))

def _audit_needs_upgrade(state: GuardianState) -> bool:
    """Detect legacy audit logs that lack a chain field."""
    if not state.audit_path.exists():
        return False
    try:
        with state.audit_path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                rec = json.loads(line)
                return "chain" not in rec
    except Exception:
        # If unreadable, force rotation so we can proceed safely.
        return True
    return False


def _rotate_legacy_audit(state: GuardianState) -> None:
    if not state.audit_path.exists():
        return
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    legacy = state.state_dir / f"audit.legacy.{ts}.jsonl"
    os.replace(state.audit_path, legacy)


def _last_audit_hash(state: GuardianState) -> str:
    if not state.audit_path.exists():
        return CHAIN_GENESIS
    # Read the last non-empty line without loading huge files.
    with state.audit_path.open("rb") as f:
        f.seek(0, os.SEEK_END)
        size = f.tell()
        if size == 0:
            return CHAIN_GENESIS
        block = 65536
        start = max(0, size - block)
        f.seek(start)
        data = f.read()
    lines = [ln for ln in data.splitlines() if ln.strip()]
    if not lines:
        return CHAIN_GENESIS
    last = lines[-1]
    try:
        rec = json.loads(last.decode("utf-8"))
        return rec.get("chain", {}).get("hash") or CHAIN_GENESIS
    except Exception:
        return CHAIN_GENESIS

def append_audit(state: GuardianState, entry: dict[str, Any]) -> None:
    """Append an audit entry with hash chaining.

    Each record includes: chain.prev, chain.hash
        chain.hash = sha256(prev_hash + "\n" + canonical_json(entry_without_chain))

    Backward-compat: if an existing audit.jsonl doesn't contain chain fields
    (legacy v1 logs), rotate it aside and start a new chained log.
    """
    state.ensure_dirs()
    if _audit_needs_upgrade(state):
        _rotate_legacy_audit(state)
    prev = _last_audit_hash(state)
    entry_wo_chain = dict(entry)
    entry_wo_chain.pop("chain", None)
    payload = prev + "\n" + _canonical_json(entry_wo_chain)
    cur = sha256_text(payload)
    record = dict(entry_wo_chain)
    record["chain"] = {"prev": prev, "hash": cur}
    with state.audit_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def refuse_symlink(path: Path) -> None:
    if is_symlink(path):
        raise RuntimeError(f"Refusing to operate on symlink: {path}")


def compute_file_sha(path: Path) -> str:
    return sha256_bytes(read_bytes(path))


def baseline_info_for(state: GuardianState, baselines: dict[str, Any], rel_path: str) -> dict[str, Any] | None:
    return (baselines.get("files") or {}).get(rel_path)


def set_baseline_for(state: GuardianState, baselines: dict[str, Any], rel_path: str, sha: str) -> None:
    baselines.setdefault("files", {})[rel_path] = {
        "sha256": sha,
        "approvedAt": utc_now_iso(),
    }

def init_cmd(state: GuardianState, actor: str, note: str, *, force_policy: bool = False) -> None:
    state.ensure_dirs()
    if force_policy or not state.policy_path.exists():
        save_policy(state, default_policy())
    policy = load_policy(state)
    baselines = load_baselines(state)
    targets = resolve_targets(policy, WORKSPACE_ROOT)
    initialized_any = False
    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        if mode == "ignore":
            continue
        abs_path = WORKSPACE_ROOT / relp
        if not abs_path.exists():
            continue
        refuse_symlink(abs_path)
        # If the file already has a baseline, do not overwrite it.
        if baseline_info_for(state, baselines, relp) is not None and approved_snapshot_path(state, relp).exists():
            continue
        sha = compute_file_sha(abs_path)
        # Snapshot the approved contents.
        snap = approved_snapshot_path(state, relp)
        ensure_dir(snap.parent)
        atomic_write_bytes(snap, read_bytes(abs_path))
        set_baseline_for(state, baselines, relp, sha)
        initialized_any = True
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "init",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            "approvedSha": sha,
            "workspace": str(WORKSPACE_ROOT),
            "stateDir": str(state.state_dir),
        })
    save_baselines(state, baselines)
    if initialized_any:
        print(f"Initialized baselines in {state.state_dir}")
    else:
        print("Already initialized (no new baselines created).")

def status_cmd(state: GuardianState) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)
    targets = resolve_targets(policy, WORKSPACE_ROOT)
    out: dict[str, Any] = {
        "workspace": str(WORKSPACE_ROOT),
        "stateDir": str(state.state_dir),
        "policyPath": str(state.policy_path),
        "baselinesPath": str(state.baselines_path),
        "auditLog": str(state.audit_path),
        "files": [],
    }
    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        abs_path = WORKSPACE_ROOT / relp
        baseline = baseline_info_for(state, baselines, relp)
        approved_sha = baseline.get("sha256") if baseline else None
        approved_snap = approved_snapshot_path(state, relp)
        current_sha = None
        if abs_path.exists() and not is_symlink(abs_path):
            try:
                current_sha = compute_file_sha(abs_path)
            except Exception:
                current_sha = None
        ok = (mode == "ignore") or (approved_sha is not None and current_sha == approved_sha)
        out["files"].append({
            "path": relp,
            "mode": mode,
            "exists": abs_path.exists(),
            "isSymlink": is_symlink(abs_path) if abs_path.exists() else False,
            "approvedSha": approved_sha,
            "currentSha": current_sha,
            "approvedSnapshot": str(approved_snap) if approved_snap.exists() else None,
            "ok": ok,
        })
    print(json.dumps(out, indent=2))

def detect_drift_for(state: GuardianState, baselines: dict[str, Any], relp: str) -> tuple[bool, dict[str, Any]]:
    abs_path = WORKSPACE_ROOT / relp
    if not abs_path.exists():
        return True, {"error": f"Missing {relp}"}
    refuse_symlink(abs_path)
    baseline = baseline_info_for(state, baselines, relp)
    if not baseline:
        return True, {"error": f"Not initialized for {relp} (missing baseline). Run init/approve."}
    approved_sha = baseline.get("sha256")
    approved_snap = approved_snapshot_path(state, relp)
    if not approved_snap.exists():
        return True, {"error": f"Not initialized for {relp} (missing approved snapshot)."}
    cur_bytes = read_bytes(abs_path)
    cur_sha = sha256_bytes(cur_bytes)
    if cur_sha == approved_sha:
        return False, {"approvedSha": approved_sha, "currentSha": cur_sha}
    old_text = read_text(approved_snap)
    new_text = read_text(abs_path)
    patch_text = unified_diff_text(old_text, new_text, f"approved/{relp}", relp)
    patch_path = write_patch(state, patch_text, tag="drift", rel_path=relp)
    return True, {
        "approvedSha": approved_sha,
        "currentSha": cur_sha,
        "patchPath": str(patch_path),
    }


def restore_one(state: GuardianState, relp: str, info: dict[str, Any]) -> dict[str, Any]:
    """Restore a single file to its approved snapshot.

    Returns: extra fields to include in the audit entry.
    """
    abs_path = WORKSPACE_ROOT / relp
    refuse_symlink(abs_path)
    approved_snap = approved_snapshot_path(state, relp)
    if not approved_snap.exists():
        raise RuntimeError(f"Missing approved snapshot for {relp}")
    state.ensure_dirs()
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    quarantine_path = state.quarantine_dir / f"{safe_patch_tag(relp.replace('/', '_'))}.{ts}.quarantine"
    atomic_write_bytes(quarantine_path, read_bytes(abs_path))
    # Atomic restore.
    atomic_write_bytes(abs_path, read_bytes(approved_snap))
    return {"quarantinePath": str(quarantine_path), **info}

def format_alert_human(drifted: list[dict[str, Any]]) -> str:
    """Format drift results as a human-readable alert for TUI notification."""
    lines = []
    lines.append("")
    lines.append("=" * 50)
    lines.append("🚨 SOUL GUARDIAN SECURITY ALERT")
    lines.append("=" * 50)
    lines.append("")
    for d in drifted:
        path = d.get("path", "unknown")
        mode = d.get("mode", "unknown")
        restored = d.get("restored", False)
        error = d.get("error")
        if error:
            lines.append(f"⚠️ ERROR: {path}")
            lines.append(f"  {error}")
        else:
            lines.append(f"📄 FILE: {path}")
            lines.append(f"  Mode: {mode}")
            if restored:
                lines.append("  Status: ✅ RESTORED to approved baseline")
                if d.get("quarantinePath"):
                    lines.append(f"  Quarantined: {d.get('quarantinePath')}")
            else:
                lines.append("  Status: ⚠️ DRIFT DETECTED (not auto-restored)")
            if d.get("approvedSha"):
                lines.append(f"  Expected hash: {d.get('approvedSha')[:16]}...")
            if d.get("currentSha"):
                lines.append(f"  Found hash: {d.get('currentSha')[:16]}...")
            if d.get("patchPath"):
                lines.append(f"  Diff saved: {d.get('patchPath')}")
        lines.append("")
    lines.append("=" * 50)
    lines.append("Review changes and investigate the source of drift.")
    lines.append("If intentional, run: soul_guardian.py approve --file <path>")
    lines.append("=" * 50)
    lines.append("")
    return "\n".join(lines)

def check_cmd(state: GuardianState, actor: str, note: str, *, no_restore: bool = False, output_format: str = "json") -> int:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)
    targets = resolve_targets(policy, WORKSPACE_ROOT)
    drifted: list[dict[str, Any]] = []
    for t in targets:
        relp = t["path"]
        mode = t["mode"]
        if mode == "ignore":
            continue
        drift, info = detect_drift_for(state, baselines, relp)
        if not drift:
            continue
        if "error" in info:
            append_audit(state, {
                "ts": utc_now_iso(),
                "event": "error",
                "actor": actor,
                "note": note,
                "path": relp,
                "mode": mode,
                "error": info["error"],
            })
            drifted.append({"path": relp, "mode": mode, "error": info["error"]})
            continue
        # Drift detected.
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "drift",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            **info,
        })
        rec: dict[str, Any] = {"path": relp, "mode": mode, **info}
        # Auto-restore restore-mode targets unless disabled.
        if mode == "restore" and not no_restore:
            restored = restore_one(state, relp, info)
            append_audit(state, {
                "ts": utc_now_iso(),
                "event": "restore",
                "actor": actor,
                "note": note,
                "path": relp,
                "mode": mode,
                **restored,
            })
            rec["restored"] = True
            rec["quarantinePath"] = restored.get("quarantinePath")
        else:
            rec["restored"] = False
        drifted.append(rec)
    if not drifted:
        # Stay silent when everything is OK.
        return 0
    if output_format == "alert":
        # Human-readable alert suitable for direct relay to the user.
        print(format_alert_human(drifted))
    else:
        # JSON format (default): single-line summary suitable for cron parsing.
        summary = {
            "event": "SOUL_GUARDIAN_DRIFT",
            "count": len(drifted),
            "files": [
                {
                    "path": d["path"],
                    "mode": d.get("mode"),
                    "restored": d.get("restored"),
                    "patch": d.get("patchPath"),
                    "error": d.get("error"),
                }
                for d in drifted
            ],
        }
        print("SOUL_GUARDIAN_DRIFT " + json.dumps(summary, ensure_ascii=False))
    # Drift occurred (even if restored).
    return 2

def watch_cmd(state: GuardianState, actor: str, note: str, *, interval: int = 30) -> int:
    """Continuous watch mode: checks periodically and alerts on drift."""
    import time

    print(f"👻 Soul Guardian watching... (interval: {interval}s)")
    print("Press Ctrl+C to stop.\n")
    try:
        while True:
            # check_cmd prints an alert only when drift is found; no drift stays silent.
            check_cmd(state, actor, note, output_format="alert")
            # Sleep before the next check.
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\n👻 Soul Guardian stopped.")
        return 0

def enable_monitoring_cmd(state: GuardianState) -> None:
    """Output instructions and a HEARTBEAT.md snippet for easy monitoring setup."""
    heartbeat_snippet = """
## Soul Guardian Check
- Run `cd {workspace} && python3 skills/soul-guardian/scripts/soul_guardian.py check --actor heartbeat --output-format alert`
- If any output is produced, relay it to the user immediately as a security alert
- This checks for unauthorized changes to SOUL.md, AGENTS.md, and other protected files
""".format(workspace=WORKSPACE_ROOT)
    print("""
╔══════════════════════════════════════════════════════════════╗
║              SOUL GUARDIAN - ENABLE MONITORING               ║
╚══════════════════════════════════════════════════════════════╝

To enable automatic drift detection and alerting, you have three options:

────────────────────────────────────────────────────────────────
OPTION 1: Heartbeat Integration (Recommended)
────────────────────────────────────────────────────────────────
Add the following to your HEARTBEAT.md file:
""")
    print(heartbeat_snippet)
    print("""
────────────────────────────────────────────────────────────────
OPTION 2: Watch Mode (Foreground)
────────────────────────────────────────────────────────────────
Run this in a terminal to continuously monitor:

    python3 skills/soul-guardian/scripts/soul_guardian.py watch --interval 30

────────────────────────────────────────────────────────────────
OPTION 3: Manual Check
────────────────────────────────────────────────────────────────
Run a one-time check with human-readable output:

    python3 skills/soul-guardian/scripts/soul_guardian.py check --output-format alert

────────────────────────────────────────────────────────────────
The guardian will:
  ✓ Detect unauthorized changes to protected files
  ✓ Auto-restore SOUL.md and AGENTS.md to approved baselines
  ✓ Alert you immediately when drift is detected
  ✓ Save diffs and quarantine modified files for review
""")
    print(f"State directory: {state.state_dir}")
    print(f"Workspace: {WORKSPACE_ROOT}")
    print()

def approve_cmd(state: GuardianState, actor: str, note: str, *, files: list[str] | None, all_files: bool = False) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)
    targets = resolve_targets(policy, WORKSPACE_ROOT)
    selectable = [t for t in targets if t["mode"] != "ignore"]
    if all_files:
        chosen = selectable
    elif files:
        # Resolve to relative POSIX paths.
        wanted = {Path(f).as_posix() for f in files}
        chosen = [t for t in selectable if t["path"] in wanted]
        missing = wanted - {t["path"] for t in chosen}
        if missing:
            raise RuntimeError(f"Unknown or ignored file(s): {', '.join(sorted(missing))}")
    else:
        # Backward-compat: if nothing is specified, approve SOUL.md.
        chosen = [t for t in selectable if t["path"] == "SOUL.md"]
    if not chosen:
        raise RuntimeError("No files selected to approve.")
    for t in chosen:
        relp = t["path"]
        mode = t["mode"]
        abs_path = WORKSPACE_ROOT / relp
        if not abs_path.exists():
            raise FileNotFoundError(f"Missing {relp}")
        refuse_symlink(abs_path)
        prev = baseline_info_for(state, baselines, relp)
        prev_sha = prev.get("sha256") if prev else None
        prev_text = read_text(approved_snapshot_path(state, relp)) if approved_snapshot_path(state, relp).exists() else ""
        cur_bytes = read_bytes(abs_path)
        cur_sha = sha256_bytes(cur_bytes)
        cur_text = read_text(abs_path)
        patch_text = unified_diff_text(prev_text, cur_text, f"approved/{relp}", relp)
        patch_path = write_patch(state, patch_text, tag="approve", rel_path=relp)
        snap = approved_snapshot_path(state, relp)
        ensure_dir(snap.parent)
        atomic_write_bytes(snap, cur_bytes)
        set_baseline_for(state, baselines, relp, cur_sha)
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "approve",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            "prevApprovedSha": prev_sha,
            "approvedSha": cur_sha,
            "patchPath": str(patch_path),
        })
        print(f"Approved {relp}: sha256={cur_sha} patch={patch_path}")
    save_baselines(state, baselines)

def restore_cmd(state: GuardianState, actor: str, note: str, *, files: list[str] | None, all_files: bool = False) -> None:
    state.ensure_dirs()
    policy = load_policy(state)
    baselines = load_baselines(state)
    targets = resolve_targets(policy, WORKSPACE_ROOT)
    restorable = [t for t in targets if t["mode"] == "restore"]
    if all_files:
        chosen = restorable
    elif files:
        wanted = {Path(f).as_posix() for f in files}
        chosen = [t for t in restorable if t["path"] in wanted]
        missing = wanted - {t["path"] for t in chosen}
        if missing:
            raise RuntimeError(f"Not restorable or unknown file(s): {', '.join(sorted(missing))}")
    else:
        # Backward-compat: default to restoring SOUL.md.
        chosen = [t for t in restorable if t["path"] == "SOUL.md"]
    if not chosen:
        raise RuntimeError("No files selected to restore.")
    restored_any = False
    for t in chosen:
        relp = t["path"]
        mode = t["mode"]
        drift, info = detect_drift_for(state, baselines, relp)
        if "error" in info:
            raise RuntimeError(info["error"])
        if not drift:
            print(f"No drift for {relp}; nothing to restore.")
            continue
        restored = restore_one(state, relp, info)
        append_audit(state, {
            "ts": utc_now_iso(),
            "event": "restore",
            "actor": actor,
            "note": note,
            "path": relp,
            "mode": mode,
            **restored,
        })
        print(
            f"RESTORED {relp} approvedSha={info.get('approvedSha')} previousSha={info.get('currentSha')} "
            f"quarantine={restored.get('quarantinePath')} patch={info.get('patchPath')}"
        )
        restored_any = True
    if not restored_any:
        print("No restores performed.")

def verify_audit_cmd(state: GuardianState) -> None:
    state.ensure_dirs()
    if not state.audit_path.exists():
        print("No audit log present.")
        return
    if _audit_needs_upgrade(state):
        raise RuntimeError(
            "Audit log is legacy (missing hash chain). "
            "Run any command that writes audit entries (e.g., check) to rotate the legacy log, then re-run verify-audit."
        )
    prev = CHAIN_GENESIS
    line_no = 0
    with state.audit_path.open("r", encoding="utf-8") as f:
        for line in f:
            line_no += 1
            line = line.strip()
            if not line:
                continue
            rec = json.loads(line)
            chain = rec.get("chain") or {}
            got_prev = chain.get("prev")
            got_hash = chain.get("hash")
            if got_prev != prev:
                raise RuntimeError(f"Audit chain broken at line {line_no}: prev mismatch (expected {prev}, got {got_prev})")
            rec_wo_chain = dict(rec)
            rec_wo_chain.pop("chain", None)
            payload = prev + "\n" + _canonical_json(rec_wo_chain)
            expect_hash = sha256_text(payload)
            if got_hash != expect_hash:
                raise RuntimeError(f"Audit chain broken at line {line_no}: hash mismatch")
            prev = got_hash
    print(f"OK: audit log hash chain verified ({line_no} lines)")

def parse_args(argv: list[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Soul Guardian - Workspace file integrity guard with alerting support.",
        epilog="For easy setup, run: soul_guardian.py enable-monitoring",
    )
    p.add_argument(
        "--state-dir",
        default=str(DEFAULT_STATE_DIR),
        help="State directory (default: memory/soul-guardian).",
    )
    sub = p.add_subparsers(dest="cmd", required=True)

    def add_common(sp: argparse.ArgumentParser) -> None:
        sp.add_argument("--actor", default="unknown", help="Who initiated the action (best-effort).")
        sp.add_argument("--note", default="", help="Freeform note (e.g., request context).")

    sp_init = sub.add_parser("init", help="Initialize policy + baselines.")
    add_common(sp_init)
    sp_init.add_argument("--force-policy", action="store_true", help="Overwrite policy.json with defaults.")
    sub.add_parser("status", help="Print status JSON.")
    sp_check = sub.add_parser("check", help="Check for drift; restore restore-mode files by default.")
    add_common(sp_check)
    sp_check.add_argument("--no-restore", action="store_true", help="Never restore during check (alert-only run).")
    sp_check.add_argument("--output-format", choices=["json", "alert"], default="json",
                          help="Output format: json (machine-readable) or alert (human-readable for TUI).")
    sp_approve = sub.add_parser("approve", help="Approve current contents as baselines.")
    add_common(sp_approve)
    sp_approve.add_argument("--file", action="append", dest="files", help="Relative file path to approve (repeatable).")
    sp_approve.add_argument("--all", action="store_true", help="Approve all non-ignored policy targets.")
    sp_restore = sub.add_parser("restore", help="Restore restore-mode files to approved baselines.")
    add_common(sp_restore)
    sp_restore.add_argument("--file", action="append", dest="files", help="Relative file path to restore (repeatable).")
    sp_restore.add_argument("--all", action="store_true", help="Restore all restore-mode targets.")
    sub.add_parser("verify-audit", help="Verify the audit log hash chain.")
    # Commands for easier monitoring setup.
    sp_watch = sub.add_parser("watch", help="Continuous watch mode: monitors and alerts on drift.")
    add_common(sp_watch)
    sp_watch.add_argument("--interval", type=int, default=30, help="Check interval in seconds (default: 30).")
    sub.add_parser("enable-monitoring", help="Show instructions for enabling automatic monitoring and alerts.")
    return p.parse_args(argv)

def main(argv: list[str]) -> int:
    args = parse_args(argv)
    state = GuardianState(Path(args.state_dir).expanduser())
    try:
        if args.cmd == "init":
            init_cmd(state, args.actor, args.note, force_policy=bool(getattr(args, "force_policy", False)))
            return 0
        if args.cmd == "status":
            status_cmd(state)
            return 0
        if args.cmd == "check":
            return check_cmd(
                state, args.actor, args.note,
                no_restore=bool(getattr(args, "no_restore", False)),
                output_format=getattr(args, "output_format", "json"),
            )
        if args.cmd == "approve":
            approve_cmd(state, args.actor, args.note, files=getattr(args, "files", None), all_files=bool(getattr(args, "all", False)))
            return 0
        if args.cmd == "restore":
            restore_cmd(state, args.actor, args.note, files=getattr(args, "files", None), all_files=bool(getattr(args, "all", False)))
            return 0
        if args.cmd == "verify-audit":
            verify_audit_cmd(state)
            return 0
        if args.cmd == "watch":
            return watch_cmd(state, args.actor, args.note, interval=getattr(args, "interval", 30))
        if args.cmd == "enable-monitoring":
            enable_monitoring_cmd(state)
            return 0
        raise RuntimeError(f"Unknown cmd: {args.cmd}")
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
```
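The hash-chained audit format that `append_audit` writes and `verify_audit_cmd` checks can be exercised in isolation. A minimal standalone sketch of the same chaining rule; the `GENESIS` value and record fields here are illustrative assumptions, not the script's actual constants:

```python
import hashlib
import json

GENESIS = "0" * 64  # assumption: stands in for the script's CHAIN_GENESIS constant


def canonical(obj) -> str:
    # Stable serialization, matching _canonical_json above.
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def chain_hash(prev: str, entry: dict) -> str:
    # chain.hash = sha256(prev_hash + "\n" + canonical_json(entry_without_chain))
    payload = prev + "\n" + canonical(entry)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Build a two-record chain.
records = []
prev = GENESIS
for entry in ({"event": "init", "path": "SOUL.md"}, {"event": "drift", "path": "SOUL.md"}):
    h = chain_hash(prev, entry)
    records.append({**entry, "chain": {"prev": prev, "hash": h}})
    prev = h


def verify(records: list[dict]) -> bool:
    # Walk the chain, recomputing each hash from the previous link.
    prev = GENESIS
    for rec in records:
        body = {k: v for k, v in rec.items() if k != "chain"}
        if rec["chain"]["prev"] != prev or rec["chain"]["hash"] != chain_hash(prev, body):
            return False
        prev = rec["chain"]["hash"]
    return True


print(verify(records))      # True for an untampered chain
records[0]["path"] = "AGENTS.md"  # tamper with the first record
print(verify(records))      # False: the recomputed hash no longer matches
```

Any edit to a committed record, or any reordering, breaks the recomputed hash at that link, which is exactly what `verify-audit` reports as a chain break.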
### scripts/test_soul_guardian.py
```python
#!/usr/bin/env python3
"""Minimal tests for soul_guardian.py.

Run:
    python3 skills/soul-guardian/scripts/test_soul_guardian.py

This is a lightweight integration test using a temp workspace.
"""
from __future__ import annotations

import json
import subprocess
import tempfile
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[3]  # .../clawd
SCRIPT = REPO_ROOT / "skills" / "soul-guardian" / "scripts" / "soul_guardian.py"


def run(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess:
    return subprocess.run(cmd, cwd=str(cwd), text=True, capture_output=True)


def must_ok(cp: subprocess.CompletedProcess) -> None:
    if cp.returncode != 0:
        raise AssertionError(f"Expected rc=0, got {cp.returncode}\nSTDOUT:\n{cp.stdout}\nSTDERR:\n{cp.stderr}")


def must_rc(cp: subprocess.CompletedProcess, rc: int) -> None:
    if cp.returncode != rc:
        raise AssertionError(f"Expected rc={rc}, got {cp.returncode}\nSTDOUT:\n{cp.stdout}\nSTDERR:\n{cp.stderr}")

def main() -> int:
    with tempfile.TemporaryDirectory() as td:
        ws = Path(td)
        state = ws / "state"
        # Create a fake workspace with the default protected files.
        (ws / "memory").mkdir(parents=True, exist_ok=True)
        (ws / "SOUL.md").write_text("hello soul\n", encoding="utf-8")
        (ws / "AGENTS.md").write_text("hello agents\n", encoding="utf-8")
        (ws / "USER.md").write_text("user v1\n", encoding="utf-8")
        (ws / "TOOLS.md").write_text("tools v1\n", encoding="utf-8")
        (ws / "IDENTITY.md").write_text("id v1\n", encoding="utf-8")
        (ws / "HEARTBEAT.md").write_text("hb v1\n", encoding="utf-8")
        (ws / "MEMORY.md").write_text("mem v1\n", encoding="utf-8")
        # Daily notes should be ignored by default.
        (ws / "memory" / "2026-01-01.md").write_text("daily\n", encoding="utf-8")
        # Init baselines.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "init", "--actor", "test"], cwd=ws)
        must_ok(cp)
        # No drift.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_ok(cp)
        # Drift a restore-mode file: SOUL.md should be auto-restored by check.
        (ws / "SOUL.md").write_text("MALICIOUS\n", encoding="utf-8")
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check", "--actor", "cron"], cwd=ws)
        must_rc(cp, 2)
        assert (ws / "SOUL.md").read_text(encoding="utf-8") == "hello soul\n"
        # Drift an alert-only file: USER.md should NOT be restored.
        (ws / "USER.md").write_text("user v2\n", encoding="utf-8")
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_rc(cp, 2)
        assert (ws / "USER.md").read_text(encoding="utf-8") == "user v2\n"
        # Approve the USER.md change.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "approve", "--file", "USER.md", "--actor", "test"], cwd=ws)
        must_ok(cp)
        # Now check should be clean.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "check"], cwd=ws)
        must_ok(cp)
        # Verify the audit chain is intact.
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "verify-audit"], cwd=ws)
        must_ok(cp)
        # Tamper with the audit log and ensure verify fails.
        audit = state / "audit.jsonl"
        lines = audit.read_text(encoding="utf-8").splitlines()
        assert lines, "audit log empty"
        rec = json.loads(lines[-1])
        rec["note"] = "tampered"
        lines[-1] = json.dumps(rec, ensure_ascii=False)
        audit.write_text("\n".join(lines) + "\n", encoding="utf-8")
        cp = run(["python3", str(SCRIPT), "--state-dir", str(state), "verify-audit"], cwd=ws)
        if cp.returncode == 0:
            raise AssertionError("Expected verify-audit to fail after tamper")
    print("OK: soul-guardian minimal tests passed")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
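The single-line `SOUL_GUARDIAN_DRIFT` summary that `check` prints in the default json format is meant for cron or log-scraping consumers. A minimal parsing sketch; the sample line below is illustrative, not captured output:

```python
import json

# Example of the shape emitted by check_cmd's json output (values are made up).
line = (
    'SOUL_GUARDIAN_DRIFT {"event": "SOUL_GUARDIAN_DRIFT", "count": 1, '
    '"files": [{"path": "SOUL.md", "mode": "restore", "restored": true, '
    '"patch": null, "error": null}]}'
)

PREFIX = "SOUL_GUARDIAN_DRIFT "
if line.startswith(PREFIX):
    # Everything after the fixed prefix is one JSON object.
    summary = json.loads(line[len(PREFIX):])
    for f in summary["files"]:
        status = "restored" if f.get("restored") else "NEEDS REVIEW"
        print(f'{f["path"]}: {status}')  # prints "SOUL.md: restored"
```

Pairing the fixed text prefix with a machine-readable payload lets a cron job grep for drift cheaply and still recover structured details when it matches.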