Back to skills
SkillHub ClubShip Full StackFull Stack
openclaw-warden
Imported from https://github.com/openclaw/skills.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Stars
3,114
Hot score
99
Updated
March 20, 2026
Overall rating
C0.0
Composite score
0.0
Best-practice grade
F17.6
Install command
npx @skill-hub/cli install openclaw-skills-openclaw-warden
Repository
openclaw/skills
Skill path: skills/atlaspa/openclaw-warden
Imported from https://github.com/openclaw/skills.
Open repositoryBest for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install openclaw-warden into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding openclaw-warden to shared team environments
- Use openclaw-warden for development workflows
Works across
Claude CodeCodex CLIGemini CLIOpenCode
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: openclaw-warden
user-invocable: true
metadata: {"openclaw":{"emoji":"π‘οΈ","requires":{"bins":["python3"]},"os":["darwin","linux","win32"]}}
---
# OpenClaw Warden
Monitors your workspace files for unauthorized modifications and prompt injection attacks. Existing security tools scan *skills* before installation β this tool watches the *workspace itself* after installation, catching tampering that other tools miss.
## Why This Matters
Your agent reads SOUL.md, AGENTS.md, IDENTITY.md, USER.md, and memory files on every session startup and **trusts them implicitly**. A compromised skill, a malicious heartbeat payload, or an unauthorized process can modify these files to:
- Inject hidden instructions that alter agent behavior
- Embed data exfiltration URLs in markdown images
- Override identity and safety boundaries
- Plant persistent backdoors in memory files
This skill detects all of these.
## Commands
### Establish Baseline
Create or reset the integrity baseline. Run this after setting up your workspace or after reviewing and accepting all current file states.
```bash
python3 {baseDir}/scripts/integrity.py baseline --workspace /path/to/workspace
```
### Verify Integrity
Check all monitored files against the stored baseline. Reports modifications, deletions, and new untracked files.
```bash
python3 {baseDir}/scripts/integrity.py verify --workspace /path/to/workspace
```
### Scan for Injections
Scan workspace files for prompt injection patterns: hidden instructions, base64 payloads, Unicode tricks, markdown image exfiltration, HTML injection, and suspicious system prompt markers.
```bash
python3 {baseDir}/scripts/integrity.py scan --workspace /path/to/workspace
```
### Full Check (Verify + Scan)
Run both integrity verification and injection scanning in one pass.
```bash
python3 {baseDir}/scripts/integrity.py full --workspace /path/to/workspace
```
### Quick Status
One-line summary of workspace health.
```bash
python3 {baseDir}/scripts/integrity.py status --workspace /path/to/workspace
```
### Accept Changes
After reviewing a legitimate change, update the baseline for a specific file.
```bash
python3 {baseDir}/scripts/integrity.py accept SOUL.md --workspace /path/to/workspace
```
## Workspace Auto-Detection
If `--workspace` is omitted, the script tries:
1. `OPENCLAW_WORKSPACE` environment variable
2. Current directory (if AGENTS.md exists)
3. `~/.openclaw/workspace` (default)
## What Gets Monitored
| Category | Files | Alert Level on Change |
|----------|-------|-----------------------|
| **Critical** | SOUL.md, AGENTS.md, IDENTITY.md, USER.md, TOOLS.md, HEARTBEAT.md | WARNING |
| **Memory** | memory/*.md, MEMORY.md | INFO (expected to change) |
| **Config** | *.json in workspace root | WARNING |
| **Skills** | skills/*/SKILL.md | WARNING |
Injection patterns trigger **CRITICAL** alerts regardless of file category.
## Injection Patterns Detected
- **Instruction override:** "ignore previous instructions", "disregard above", "you are now", "new system prompt"
- **Base64 payloads:** Suspiciously long base64 strings outside code blocks
- **Unicode manipulation:** Zero-width characters, RTL overrides, homoglyphs
- **Markdown exfiltration:** Image tags with data-encoding URLs
- **HTML injection:** script tags, iframes, hidden elements
- **System prompt markers:** `<system>`, `[SYSTEM]`, `<<SYS>>` blocks
- **Shell injection:** `$(...)` outside code blocks
## Exit Codes
- `0` β Clean, no issues
- `1` β Modifications detected (review needed)
- `2` β Injection patterns detected (action needed)
## No External Dependencies
Python standard library only. No pip install. No network calls. Everything runs locally.
## Cross-Platform
Works with OpenClaw, Claude Code, Cursor, and any tool using the Agent Skills specification.
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# OpenClaw Warden
Free workspace integrity verification for [OpenClaw](https://github.com/openclaw/openclaw), [Claude Code](https://docs.anthropic.com/en/docs/claude-code), and any Agent Skills-compatible tool.
Detects unauthorized modifications to agent identity and memory files and scans for prompt injection patterns β the post-installation security layer that other tools miss.
## The Problem
AI agents read workspace files (`SOUL.md`, `AGENTS.md`, `IDENTITY.md`, memory files) on every session startup and **trust them implicitly**. Existing security tools scan *skills* before installation. Nothing monitors the *workspace itself* afterward.
A compromised skill, a malicious payload, or any process with file access can inject hidden instructions, embed exfiltration URLs, override safety boundaries, or plant persistent backdoors.
This skill detects all of these.
## Install
```bash
# Clone
git clone https://github.com/AtlasPA/openclaw-warden.git
# Copy to your workspace skills directory
cp -r openclaw-warden ~/.openclaw/workspace/skills/
```
## Usage
```bash
# Establish baseline
python3 scripts/integrity.py baseline
# Check for modifications + injections
python3 scripts/integrity.py full
# Quick health check
python3 scripts/integrity.py status
# Accept a legitimate change
python3 scripts/integrity.py accept SOUL.md
```
All commands accept `--workspace /path/to/workspace`. If omitted, auto-detects from `$OPENCLAW_WORKSPACE`, current directory, or `~/.openclaw/workspace`.
## What It Detects
### Integrity Violations
- Modified files (SHA-256 checksum comparison)
- Deleted files
- New untracked files
### Prompt Injection Patterns
- **Instruction override** β "ignore previous instructions", "you are now", "forget your instructions"
- **System prompt markers** β `<system>`, `[SYSTEM]`, `<<SYS>>`, `[INST]`
- **Markdown exfiltration** β Image tags with encoded data in URLs
- **Base64 payloads** β Large encoded blobs outside code blocks
- **Unicode manipulation** β Zero-width characters, RTL overrides, homoglyphs
- **HTML injection** β `<script>`, `<iframe>`, hidden elements
- **Shell injection** β `$(...)` subshell execution outside code blocks
### Smart Detection
- Respects markdown fenced code blocks (no false positives on documented examples)
- Skips its own files (which describe injection patterns)
- Distinguishes file categories: critical, memory, config, skills
## File Categories
| Category | Files | Alert Level |
|----------|-------|-------------|
| Critical | SOUL.md, AGENTS.md, IDENTITY.md, USER.md, TOOLS.md, HEARTBEAT.md | WARNING |
| Memory | memory/*.md, MEMORY.md | INFO |
| Config | *.json in workspace root | WARNING |
| Skills | skills/*/SKILL.md | WARNING |
Injection patterns always trigger **CRITICAL** regardless of category.
## Exit Codes
| Code | Meaning |
|------|---------|
| 0 | Clean |
| 1 | Modifications detected |
| 2 | Injection patterns detected |
|---------|------|-----|
| Baseline checksums | Yes | Yes |
| Integrity verification | Yes | Yes |
| Injection scanning | Yes | Yes |
| Snapshot restore | - | Yes |
| Git rollback | - | Yes |
| Skill quarantine | - | Yes |
| Automated protect | - | Yes |
| Session startup hook | - | Yes |
## Requirements
- Python 3.8+
- No external dependencies (stdlib only)
- Cross-platform: Windows, macOS, Linux
## License
MIT
```
### _meta.json
```json
{
"owner": "atlaspa",
"slug": "openclaw-warden",
"displayName": "Openclaw Warden",
"latest": {
"version": "1.0.3",
"publishedAt": 1770892012579,
"commit": "https://github.com/openclaw/skills/commit/8feb98d3b26493a21b1ca7b2aafdb933c4ea0e08"
},
"history": [
{
"version": "1.0.2",
"publishedAt": 1770313111579,
"commit": "https://github.com/clawdbot/skills/commit/9dfa4e134c7bd443dff57ca28c826976a33c4dbd"
}
]
}
```
### scripts/integrity.py
```python
#!/usr/bin/env python3
"""
OpenClaw Wardenβ Workspace Integrity Verification
Detects unauthorized modifications and prompt injection patterns
in agent workspace files. Includes countermeasures: snapshot restore,
skill quarantine, and git rollback.
Usage:
integrity.py baseline [--workspace PATH]
integrity.py verify [--workspace PATH]
integrity.py scan [--workspace PATH]
integrity.py full [--workspace PATH]
integrity.py status [--workspace PATH]
integrity.py accept FILE [--workspace PATH]
integrity.py restore FILE [--workspace PATH]
integrity.py quarantine SKILL [--workspace PATH]
integrity.py unquarantine SKILL [--workspace PATH]
integrity.py rollback FILE [--workspace PATH]
integrity.py protect [--workspace PATH]
"""
import hashlib
import io
import json
import os
import re
import sys
import difflib
from datetime import datetime, timezone
from pathlib import Path
# Ensure stdout can handle Unicode on Windows (cp1252 etc.)
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = io.TextIOWrapper(
sys.stdout.buffer, encoding="utf-8", errors="replace"
)
sys.stderr = io.TextIOWrapper(
sys.stderr.buffer, encoding="utf-8", errors="replace"
)
MANIFEST_VERSION = 1
INTEGRITY_DIR = ".integrity"
MANIFEST_FILE = "manifest.json"
SNAPSHOTS_DIR = "snapshots"
QUARANTINE_PREFIX = ".quarantined-"
# ---------------------------------------------------------------------------
# File categories and their default severity on unexpected change
# ---------------------------------------------------------------------------
CRITICAL_FILES = {
"SOUL.md", "AGENTS.md", "IDENTITY.md", "USER.md",
"TOOLS.md", "HEARTBEAT.md",
}
MEMORY_PATTERNS = ["memory/*.md", "MEMORY.md"]
CONFIG_PATTERNS = ["*.json"]
SKILL_PATTERNS = ["skills/*/SKILL.md"]
SEVERITY_WARNING = "WARNING"
SEVERITY_INFO = "INFO"
SEVERITY_CRITICAL = "CRITICAL"
# ---------------------------------------------------------------------------
# Injection detection patterns
# ---------------------------------------------------------------------------
INSTRUCTION_OVERRIDE_PATTERNS = [
r"(?i)ignore\s+(all\s+)?previous\s+instructions",
r"(?i)disregard\s+(all\s+)?(above|previous|prior)",
r"(?i)forget\s+(all\s+)?your\s+instructions",
r"(?i)you\s+are\s+now\s+(?!(?:ready|going|able))",
r"(?i)new\s+system\s+prompt",
r"(?i)override\s+(all\s+)?(previous|safety|existing)\s+(instructions|rules|guidelines)",
r"(?i)act\s+as\s+if\s+you\s+(have\s+)?no\s+(restrictions|rules|guidelines|limits)",
r"(?i)from\s+now\s+on[,\s]+you\s+(will|must|should)",
r"(?i)entering\s+(a\s+)?(new|special|developer|admin)\s+mode",
r"(?i)execute\s+the\s+following\s+(commands?|instructions?|code)\s*(:|without)",
]
SYSTEM_FULLMPT_MARKERS = [
r"<\s*system\s*>",
r"\[\s*SYSTEM\s*\]",
r"<<\s*SYS\s*>>",
r"<\s*\|im_start\|>\s*system",
r"\[INST\]",
r"###\s*(?:System|Assistant|Human)\s*:",
]
HTML_INJECTION_PATTERNS = [
r"<\s*script[\s>]",
r"<\s*iframe[\s>]",
r"<\s*object[\s>]",
r"<\s*embed[\s>]",
r"<\s*link\s[^>]*rel\s*=\s*[\"']?import",
r"style\s*=\s*[\"'][^\"']*display\s*:\s*none",
r"<\s*div[^>]*hidden",
r"<\s*img\s[^>]*onerror\s*=",
]
EXFIL_IMAGE_PATTERN = (
r"!\[[^\]]*\]\(\s*https?://[^)]*"
r"(?:[?&][a-zA-Z0-9_]+=(?:[A-Za-z0-9+/]{20,}={0,2}|[0-9a-fA-F]{20,}))"
)
BASE64_BLOB_PATTERN = r"(?<![A-Za-z0-9+/])[A-Za-z0-9+/]{60,}={0,2}(?![A-Za-z0-9+/])"
UNICODE_TRICKS = [
"\u200b", # zero-width space
"\u200c", # zero-width non-joiner
"\u200d", # zero-width joiner
"\u2060", # word joiner
"\u2062", # invisible times
"\u2063", # invisible separator
"\ufeff", # zero-width no-break space / BOM
"\u202a", # LTR embedding
"\u202b", # RTL embedding
"\u202c", # pop directional formatting
"\u202d", # LTR override
"\u202e", # RTL override
"\u2066", # LTR isolate
"\u2067", # RTL isolate
"\u2068", # first strong isolate
"\u2069", # pop directional isolate
"\u00ad", # soft hyphen (invisible in many renders)
]
SHELL_INJECTION_PATTERNS = [
r"\$\([^)]+\)", # $(command) subshell execution
]
# Inline backtick references (like `SOUL.md` or `python3 cmd`) are normal
# markdown formatting, not shell injection. Only flag multi-line backtick
# blocks that aren't fenced code blocks β but those are already handled
# by the code-block check. So we skip backtick patterns entirely to avoid
# false positives on standard markdown inline code.
# Security skills whose SKILL.md documents injection patterns they detect.
# These must be exempt from injection scanning to avoid false positives.
SECURITY_SCAN_EXEMPT = {
"openclaw-warden",
"openclaw-warden",
"openclaw-bastion",
"openclaw-bastion",
}
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def resolve_workspace(args: list[str]) -> Path:
"""Determine workspace path from args, env, or defaults."""
ws = None
for i, arg in enumerate(args):
if arg == "--workspace" and i + 1 < len(args):
ws = args[i + 1]
break
if ws is None:
ws = os.environ.get("OPENCLAW_WORKSPACE")
if ws is None:
cwd = Path.cwd()
if (cwd / "AGENTS.md").exists():
ws = str(cwd)
if ws is None:
ws = str(Path.home() / ".openclaw" / "workspace")
p = Path(ws)
if not p.is_dir():
print(f"ERROR: Workspace not found: {p}", file=sys.stderr)
sys.exit(1)
return p
def manifest_path(workspace: Path) -> Path:
return workspace / INTEGRITY_DIR / MANIFEST_FILE
def load_manifest(workspace: Path) -> dict | None:
mp = manifest_path(workspace)
if not mp.exists():
return None
with open(mp, "r", encoding="utf-8") as f:
return json.load(f)
def save_manifest(workspace: Path, manifest: dict):
mp = manifest_path(workspace)
mp.parent.mkdir(parents=True, exist_ok=True)
with open(mp, "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2)
def classify_file(rel_path: str) -> tuple[str, str]:
"""Return (category, severity_on_change) for a relative path."""
name = Path(rel_path).name
if name in CRITICAL_FILES:
return "critical", SEVERITY_WARNING
if rel_path == "MEMORY.md" or rel_path.startswith("memory/"):
if rel_path.endswith(".md"):
return "memory", SEVERITY_INFO
if rel_path.startswith("skills/") and name == "SKILL.md":
return "skills", SEVERITY_WARNING
if not "/" in rel_path and rel_path.endswith(".json"):
return "config", SEVERITY_WARNING
return "other", SEVERITY_INFO
def collect_monitored_files(workspace: Path) -> dict[str, Path]:
"""Collect all files that should be monitored, returning {rel_path: abs_path}."""
files = {}
# Critical files in workspace root
for name in CRITICAL_FILES:
p = workspace / name
if p.is_file():
files[name] = p
# MEMORY.md
p = workspace / "MEMORY.md"
if p.is_file():
files["MEMORY.md"] = p
# memory/*.md
mem_dir = workspace / "memory"
if mem_dir.is_dir():
for f in mem_dir.iterdir():
if f.is_file() and f.suffix == ".md":
rel = f.relative_to(workspace).as_posix()
files[rel] = f
# *.json in root (but not in .integrity/)
for f in workspace.iterdir():
if f.is_file() and f.suffix == ".json":
files[f.name] = f
# skills/*/SKILL.md
skills_dir = workspace / "skills"
if skills_dir.is_dir():
for skill in skills_dir.iterdir():
if skill.is_dir():
sm = skill / "SKILL.md"
if sm.is_file():
rel = sm.relative_to(workspace).as_posix()
files[rel] = sm
return files
def read_file_text(path: Path) -> str | None:
"""Read file as text, returning None if binary."""
try:
with open(path, "r", encoding="utf-8") as f:
return f.read()
except (UnicodeDecodeError, ValueError):
return None
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def snapshot_dir(workspace: Path) -> Path:
return workspace / INTEGRITY_DIR / SNAPSHOTS_DIR
def save_snapshot(workspace: Path, rel: str, abspath: Path):
"""Store a copy of a file for later restoration."""
dest = snapshot_dir(workspace) / rel
dest.parent.mkdir(parents=True, exist_ok=True)
import shutil
shutil.copy2(abspath, dest)
def get_snapshot_path(workspace: Path, rel: str) -> Path | None:
"""Get the snapshot path for a file, or None if no snapshot exists."""
p = snapshot_dir(workspace) / rel
return p if p.is_file() else None
def cmd_baseline(workspace: Path):
"""Establish or reset the integrity baseline."""
files = collect_monitored_files(workspace)
if not files:
print("No monitored files found in workspace.")
return
manifest = {
"version": MANIFEST_VERSION,
"created": now_iso(),
"updated": now_iso(),
"workspace": str(workspace),
"files": {},
}
snapshotted = 0
for rel, abspath in sorted(files.items()):
cat, _ = classify_file(rel)
stat = abspath.stat()
manifest["files"][rel] = {
"sha256": sha256_file(abspath),
"size": stat.st_size,
"modified": datetime.fromtimestamp(
stat.st_mtime, tz=timezone.utc
).isoformat(),
"category": cat,
}
# Snapshot critical and config files for restoration
if cat in ("critical", "config", "skills"):
save_snapshot(workspace, rel, abspath)
snapshotted += 1
save_manifest(workspace, manifest)
print(f"Baseline established: {len(manifest['files'])} files tracked, {snapshotted} snapshotted")
for rel in sorted(manifest["files"]):
cat = manifest["files"][rel]["category"]
has_snap = "S" if get_snapshot_path(workspace, rel) else " "
print(f" [{cat:8s}] [{has_snap}] {rel}")
def cmd_verify(workspace: Path) -> list[dict]:
"""Verify workspace files against baseline. Returns list of findings."""
manifest = load_manifest(workspace)
if manifest is None:
print("No baseline found. Run 'baseline' first.")
sys.exit(1)
findings = []
current_files = collect_monitored_files(workspace)
baseline_files = manifest.get("files", {})
# Check for modified and deleted files
for rel, info in sorted(baseline_files.items()):
if rel in current_files:
abspath = current_files[rel]
current_hash = sha256_file(abspath)
if current_hash != info["sha256"]:
cat, severity = classify_file(rel)
# Generate diff
diff_lines = []
old_text = None
new_text = read_file_text(abspath)
# We don't have the old content, but we can show the hash mismatch
finding = {
"type": "modified",
"file": rel,
"category": cat,
"severity": severity,
"old_hash": info["sha256"][:16] + "...",
"new_hash": current_hash[:16] + "...",
"old_size": info["size"],
"new_size": abspath.stat().st_size,
}
findings.append(finding)
else:
cat, severity = classify_file(rel)
findings.append({
"type": "deleted",
"file": rel,
"category": cat,
"severity": SEVERITY_WARNING if cat == "critical" else severity,
})
# Check for new untracked files
for rel in sorted(current_files):
if rel not in baseline_files:
cat, _ = classify_file(rel)
findings.append({
"type": "new",
"file": rel,
"category": cat,
"severity": SEVERITY_INFO,
})
return findings
def _is_inside_code_block(text: str, match_start: int) -> bool:
"""Check if a match position is inside a fenced code block."""
lines = text[:match_start].split("\n")
fence_count = 0
for line in lines:
stripped = line.strip()
if stripped.startswith("```") or stripped.startswith("~~~"):
fence_count += 1
return fence_count % 2 == 1
def scan_file_for_injections(path: Path, rel_path: str) -> list[dict]:
"""Scan a single file for prompt injection patterns."""
text = read_file_text(path)
if text is None:
return []
findings = []
def add_finding(pattern_type: str, detail: str, line_num: int, severity: str = SEVERITY_CRITICAL):
findings.append({
"type": "injection",
"file": rel_path,
"pattern_type": pattern_type,
"detail": detail,
"line": line_num,
"severity": severity,
})
def line_number_at(pos: int) -> int:
return text[:pos].count("\n") + 1
# Instruction override patterns
for pattern in INSTRUCTION_OVERRIDE_PATTERNS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"instruction_override",
f"Instruction override pattern: '{m.group()[:80]}'",
ln,
)
# System prompt markers
for pattern in SYSTEM_FULLMPT_MARKERS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"systemmpt_marker",
f"System prompt marker: '{m.group()[:60]}'",
ln,
)
# HTML injection
for pattern in HTML_INJECTION_PATTERNS:
for m in re.finditer(pattern, text, re.IGNORECASE):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"html_injection",
f"HTML injection: '{m.group()[:60]}'",
ln,
)
# Markdown image exfiltration
for m in re.finditer(EXFIL_IMAGE_PATTERN, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"exfil_image",
f"Suspicious image URL with encoded data: '{m.group()[:80]}'",
ln,
)
# Base64 blobs outside code blocks
for m in re.finditer(BASE64_BLOB_PATTERN, text):
if not _is_inside_code_block(text, m.start()):
blob = m.group()
# Skip if it looks like a normal hash (exactly 64 hex chars = sha256)
if len(blob) <= 64 and re.fullmatch(r"[0-9a-fA-F]+", blob):
continue
# Skip short blobs that might be normal encoded values
if len(blob) < 80:
continue
ln = line_number_at(m.start())
add_finding(
"base64_payload",
f"Large base64 blob ({len(blob)} chars): '{blob[:40]}...'",
ln,
SEVERITY_WARNING,
)
# Unicode tricks
for char in UNICODE_TRICKS:
idx = text.find(char)
while idx != -1:
ln = line_number_at(idx)
add_finding(
"unicode_trick",
f"Hidden Unicode character U+{ord(char):04X} ({repr(char)})",
ln,
SEVERITY_WARNING,
)
idx = text.find(char, idx + 1)
# Shell injection outside code blocks
for pattern in SHELL_INJECTION_PATTERNS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"shell_injection",
f"Possible shell injection: '{m.group()[:60]}'",
ln,
SEVERITY_WARNING,
)
return findings
def cmd_scan(workspace: Path) -> list[dict]:
"""Scan all workspace files for injection patterns."""
files = collect_monitored_files(workspace)
all_findings = []
for rel, abspath in sorted(files.items()):
# Skip security skills that document injection patterns β
# their SKILL.md will always trigger false positives
if rel.startswith("skills/"):
parts = rel.split("/")
if len(parts) >= 2 and parts[1] in SECURITY_SCAN_EXEMPT:
continue
# Skip quarantined skills β already neutralized
if QUARANTINE_PREFIX in rel:
continue
findings = scan_file_for_injections(abspath, rel)
all_findings.extend(findings)
return all_findings
def cmd_full(workspace: Path) -> tuple[list[dict], list[dict]]:
"""Run both verify and scan."""
verify_findings = cmd_verify(workspace)
scan_findings = cmd_scan(workspace)
return verify_findings, scan_findings
def cmd_status(workspace: Path):
"""Quick one-line workspace health summary."""
manifest = load_manifest(workspace)
if manifest is None:
print("STATUS: NO BASELINE β Run 'baseline' to initialize")
return
files = collect_monitored_files(workspace)
baseline_files = manifest.get("files", {})
modified = 0
deleted = 0
new = 0
for rel, info in baseline_files.items():
if rel in files:
if sha256_file(files[rel]) != info["sha256"]:
modified += 1
else:
deleted += 1
for rel in files:
if rel not in baseline_files:
new += 1
total = len(baseline_files)
updated = manifest.get("updated", "unknown")
if modified == 0 and deleted == 0 and new == 0:
print(f"STATUS: CLEAN β {total} files tracked, baseline from {updated}")
else:
parts = []
if modified:
parts.append(f"{modified} modified")
if deleted:
parts.append(f"{deleted} deleted")
if new:
parts.append(f"{new} new")
print(f"STATUS: CHANGED β {', '.join(parts)} ({total} tracked, baseline from {updated})")
def cmd_accept(workspace: Path, filepath: str):
"""Accept a changed file into the baseline."""
manifest = load_manifest(workspace)
if manifest is None:
print("No baseline found. Run 'baseline' first.")
sys.exit(1)
# Normalize the path
rel = filepath.replace("\\", "/")
abspath = workspace / rel
if not abspath.is_file():
print(f"File not found: {rel}")
sys.exit(1)
cat, _ = classify_file(rel)
stat = abspath.stat()
manifest["files"][rel] = {
"sha256": sha256_file(abspath),
"size": stat.st_size,
"modified": datetime.fromtimestamp(
stat.st_mtime, tz=timezone.utc
).isoformat(),
"category": cat,
}
manifest["updated"] = now_iso()
save_manifest(workspace, manifest)
print(f"Accepted: {rel} (category: {cat})")
# ---------------------------------------------------------------------------
# Countermeasures
# ---------------------------------------------------------------------------
def cmd_restore(workspace: Path, filepath: str):
"""Restore a file from its baseline snapshot."""
rel = filepath.replace("\\", "/")
snap = get_snapshot_path(workspace, rel)
if snap is None:
print(f"No snapshot found for: {rel}")
print("Only critical, config, and skill files are snapshotted.")
sys.exit(1)
dest = workspace / rel
import shutil
shutil.copy2(snap, dest)
print(f"Restored: {rel} (from baseline snapshot)")
# Verify the restore matched the baseline hash
manifest = load_manifest(workspace)
if manifest and rel in manifest.get("files", {}):
expected = manifest["files"][rel]["sha256"]
actual = sha256_file(dest)
if expected == actual:
print(f" Verified: hash matches baseline")
else:
print(f" WARNING: restored file hash does not match baseline")
print(f" Expected: {expected[:16]}...")
print(f" Got: {actual[:16]}...")
def cmd_rollback(workspace: Path, filepath: str):
"""Rollback a file to its last git-committed state."""
import subprocess
rel = filepath.replace("\\", "/")
abspath = workspace / rel
# Check if workspace is a git repo
git_dir = workspace / ".git"
if not git_dir.exists():
print(f"Workspace is not a git repository. Cannot rollback.")
print(f"Use 'restore' to restore from snapshot instead.")
sys.exit(1)
# Check if file is tracked by git
result = subprocess.run(
["git", "ls-files", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if not result.stdout.strip():
print(f"File is not tracked by git: {rel}")
sys.exit(1)
# Checkout from HEAD
result = subprocess.run(
["git", "checkout", "HEAD", "--", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if result.returncode != 0:
print(f"Git rollback failed: {result.stderr.strip()}")
sys.exit(1)
print(f"Rolled back: {rel} (to last git commit)")
print(f" Hash: {sha256_file(abspath)[:16]}...")
def cmd_quarantine(workspace: Path, skill_name: str):
"""Quarantine a skill by renaming its directory so OpenClaw won't load it."""
skills_dir = workspace / "skills"
skill_dir = skills_dir / skill_name
if not skill_dir.is_dir():
# Check if already quarantined
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
if quarantined.is_dir():
print(f"Skill '{skill_name}' is already quarantined.")
return
print(f"Skill not found: {skill_name}")
print(f"Available skills:")
if skills_dir.is_dir():
for d in sorted(skills_dir.iterdir()):
if d.is_dir():
prefix = "[Q] " if d.name.startswith(QUARANTINE_PREFIX) else " "
name = d.name.removeprefix(QUARANTINE_PREFIX) if d.name.startswith(QUARANTINE_PREFIX) else d.name
print(f" {prefix}{name}")
sys.exit(1)
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
skill_dir.rename(quarantined)
print(f"Quarantined: {skill_name}")
print(f" Moved: skills/{skill_name}/ -> skills/{QUARANTINE_PREFIX}{skill_name}/")
print(f" OpenClaw will not load this skill on next session.")
print(f" To restore: run 'unquarantine {skill_name}'")
def cmd_unquarantine(workspace: Path, skill_name: str):
"""Restore a quarantined skill."""
skills_dir = workspace / "skills"
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
if not quarantined.is_dir():
print(f"No quarantined skill found: {skill_name}")
sys.exit(1)
restored = skills_dir / skill_name
if restored.is_dir():
print(f"Cannot unquarantine: skills/{skill_name}/ already exists")
sys.exit(1)
quarantined.rename(restored)
print(f"Unquarantined: {skill_name}")
print(f" Moved: skills/{QUARANTINE_PREFIX}{skill_name}/ -> skills/{skill_name}/")
print(f" WARNING: Re-scan this skill before use.")
def cmdtect(workspace: Path):
"""Full scan + automatic countermeasures for critical threats.
Actions taken:
1. Run full integrity verification + injection scan
2. For critical injection findings: restore from snapshot if available
3. For modified critical files with injections: restore from snapshot
4. For skills containing injections: quarantine the skill
5. Report all actions taken
"""
print("=" * 60)
print("WORKSPACE FULLTECTION SWEEP")
print("=" * 60)
print(f"Timestamp: {now_iso()}")
print()
verify_findings, scan_findings = cmd_full(workspace)
actions_taken = []
if not verify_findings and not scan_findings:
print("No threats detected. Workspace is clean.")
print("=" * 60)
return
# Identify files with injection findings
injected_files = set()
for f in scan_findings:
if f.get("severity") == SEVERITY_CRITICAL:
injected_files.add(f["file"])
# Identify modified critical files
modified_critical = set()
for f in verify_findings:
if f["type"] == "modified" and f["category"] == "critical":
modified_critical.add(f["file"])
print()
print("-" * 40)
print("COUNTERMEASURES")
print("-" * 40)
# Restore critical files that were modified and have injections
files_to_restore = (modified_critical & injected_files) | modified_critical
for rel in sorted(files_to_restore):
snap = get_snapshot_path(workspace, rel)
if snap:
import shutil
dest = workspace / rel
shutil.copy2(snap, dest)
actions_taken.append(f"RESTORED: {rel} (from baseline snapshot)")
print(f" [RESTORE] {rel} <- baseline snapshot")
else:
# Try git rollback
git_dir = workspace / ".git"
if git_dir.exists():
import subprocess
result = subprocess.run(
["git", "checkout", "HEAD", "--", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if result.returncode == 0:
actions_taken.append(f"ROLLED BACK: {rel} (from git)")
print(f" [ROLLBACK] {rel} <- git HEAD")
else:
actions_taken.append(f"FAILED TO RESTORE: {rel}")
print(f" [FAILED] {rel} β no snapshot or git history")
else:
actions_taken.append(f"UNRESOLVED: {rel} β no snapshot available")
print(f" [MANUAL] {rel} β no snapshot, review manually")
# Quarantine skills with critical injection findings
quarantined_skills = set()
for rel in injected_files:
if rel.startswith("skills/") and "/SKILL.md" in rel:
# Extract skill name: skills/<name>/SKILL.md
parts = rel.split("/")
if len(parts) >= 2:
skill_name = parts[1]
if skill_name.startswith(QUARANTINE_PREFIX):
continue
if skill_name in SECURITY_SCAN_EXEMPT:
continue
if skill_name not in quarantined_skills:
skill_dir = workspace / "skills" / skill_name
quarantined_dir = workspace / "skills" / (QUARANTINE_PREFIX + skill_name)
if skill_dir.is_dir():
skill_dir.rename(quarantined_dir)
quarantined_skills.add(skill_name)
actions_taken.append(f"QUARANTINED: skill '{skill_name}'")
print(f" [QUARANTINE] skills/{skill_name}/")
# Handle injected memory/other files (can't auto-restore, flag for review)
for rel in sorted(injected_files):
if rel.startswith("skills/"):
continue # Handled above
if rel in files_to_restore:
continue # Already restored
actions_taken.append(f"FLAGGED: {rel} β contains injections, manual review needed")
print(f" [FLAG] {rel} β injection detected, review manually")
print()
if actions_taken:
print(f"ACTIONS TAKEN: {len(actions_taken)}")
for a in actions_taken:
print(f" - {a}")
else:
print("No automatic actions taken. Review findings above.")
print()
print("NEXT STEPS:")
if files_to_restore:
print(" - Restored files should be re-verified: run 'verify'")
if quarantined_skills:
print(" - Quarantined skills will not load on next session")
print(" - Investigate quarantined skills before unquarantining")
print(" - Run 'baseline' to update baseline after review")
print("=" * 60)
# ---------------------------------------------------------------------------
# Report formatting
# ---------------------------------------------------------------------------
def format_findings(verify_findings: list[dict], scan_findings: list[dict]) -> str:
"""Format findings into a structured text report."""
lines = []
lines.append("=" * 60)
lines.append("WORKSPACE INTEGRITY REPORT")
lines.append("=" * 60)
lines.append(f"Timestamp: {now_iso()}")
lines.append("")
total_issues = len(verify_findings) + len(scan_findings)
criticals = sum(1 for f in scan_findings if f.get("severity") == SEVERITY_CRITICAL)
warnings = sum(
1 for f in verify_findings + scan_findings
if f.get("severity") == SEVERITY_WARNING
)
infos = sum(
1 for f in verify_findings + scan_findings
if f.get("severity") == SEVERITY_INFO
)
if total_issues == 0:
lines.append("RESULT: CLEAN")
lines.append("No integrity violations or injection patterns detected.")
lines.append("=" * 60)
return "\n".join(lines)
lines.append(f"RESULT: {total_issues} ISSUE(S) FOUND")
if criticals:
lines.append(f" CRITICAL: {criticals}")
if warnings:
lines.append(f" WARNING: {warnings}")
if infos:
lines.append(f" INFO: {infos}")
lines.append("")
# Integrity findings
modified = [f for f in verify_findings if f["type"] == "modified"]
deleted = [f for f in verify_findings if f["type"] == "deleted"]
new = [f for f in verify_findings if f["type"] == "new"]
if modified:
lines.append("-" * 40)
lines.append("MODIFIED FILES")
lines.append("-" * 40)
for f in modified:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append(f" Hash: {f['old_hash']} -> {f['new_hash']}")
lines.append(f" Size: {f['old_size']} -> {f['new_size']} bytes")
lines.append("")
if deleted:
lines.append("-" * 40)
lines.append("DELETED FILES")
lines.append("-" * 40)
for f in deleted:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append("")
if new:
lines.append("-" * 40)
lines.append("NEW UNTRACKED FILES")
lines.append("-" * 40)
for f in new:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append("")
# Injection findings
if scan_findings:
lines.append("-" * 40)
lines.append("INJECTION SCAN RESULTS")
lines.append("-" * 40)
by_file: dict[str, list[dict]] = {}
for f in scan_findings:
by_file.setdefault(f["file"], []).append(f)
for fname, findings in sorted(by_file.items()):
lines.append(f" {fname}:")
for f in findings:
lines.append(
f" [{f['severity']:8s}] Line {f['line']}: "
f"{f['pattern_type']} β {f['detail']}"
)
lines.append("")
lines.append("=" * 60)
lines.append("RECOMMENDED ACTIONS:")
if criticals:
lines.append(" 1. CRITICAL issues require immediate investigation.")
lines.append(" Review flagged files for unauthorized modifications.")
lines.append(" Do NOT load these files into the agent until verified.")
if modified:
lines.append(" - Review modified files. If changes are legitimate,")
lines.append(" run 'accept <file>' to update the baseline.")
if new:
lines.append(" - Review new files. Run 'baseline' to include them,")
lines.append(" or investigate why they appeared.")
lines.append("=" * 60)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help", "help"):
print(__doc__.strip())
sys.exit(0)
command = args[0]
rest = args[1:]
if command == "baseline":
workspace = resolve_workspace(rest)
cmd_baseline(workspace)
elif command == "verify":
workspace = resolve_workspace(rest)
findings = cmd_verify(workspace)
report = format_findings(findings, [])
print(report)
if any(f["severity"] == SEVERITY_CRITICAL for f in findings):
sys.exit(2)
elif findings:
sys.exit(1)
elif command == "scan":
workspace = resolve_workspace(rest)
findings = cmd_scan(workspace)
report = format_findings([], findings)
print(report)
if any(f["severity"] == SEVERITY_CRITICAL for f in findings):
sys.exit(2)
elif findings:
sys.exit(1)
elif command == "full":
workspace = resolve_workspace(rest)
verify_findings, scan_findings = cmd_full(workspace)
report = format_findings(verify_findings, scan_findings)
print(report)
all_findings = verify_findings + scan_findings
if any(f["severity"] == SEVERITY_CRITICAL for f in all_findings):
sys.exit(2)
elif all_findings:
sys.exit(1)
elif command == "status":
workspace = resolve_workspace(rest)
cmd_status(workspace)
elif command == "accept":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py accept <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_accept(workspace, filepath)
elif command == "restore":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py restore <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_restore(workspace, filepath)
elif command == "rollback":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py rollback <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_rollback(workspace, filepath)
elif command == "quarantine":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py quarantine <skill-name> [--workspace PATH]")
sys.exit(1)
skill_name = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_quarantine(workspace, skill_name)
elif command == "unquarantine":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py unquarantine <skill-name> [--workspace PATH]")
sys.exit(1)
skill_name = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_unquarantine(workspace, skill_name)
elif command == "protect":
workspace = resolve_workspace(rest)
cmdtect(workspace)
else:
print(f"Unknown command: {command}")
print("Commands: baseline, verify, scan, full, status, accept,")
print(" restore, rollback, quarantine, unquarantine, protect")
sys.exit(1)
if __name__ == "__main__":
main()
```