cyber-security-engineer
Security engineering workflow for OpenClaw privilege governance and hardening. Use for least-privilege execution, approval-first privileged actions, idle timeout controls, port + egress monitoring, and ISO 27001/NIST-aligned compliance reporting with mitigations.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-cyber-security-engineer
Repository
Skill path: skills/fletcherfrimpong/cyber-security-engineer
Best for
Primary workflow: Run DevOps.
Technical facets: Full Stack, Security.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install cyber-security-engineer into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding cyber-security-engineer to shared team environments
- Use cyber-security-engineer for development workflows
Original source / Raw SKILL.md
---
name: cyber-security-engineer
description: Security engineering workflow for OpenClaw privilege governance and hardening. Use for least-privilege execution, approval-first privileged actions, idle timeout controls, port + egress monitoring, and ISO 27001/NIST-aligned compliance reporting with mitigations.
---
# Cyber Security Engineer
## Requirements
**Env vars (optional, but documented):**
- `OPENCLAW_REQUIRE_POLICY_FILES`
- `OPENCLAW_REQUIRE_SESSION_ID`
- `OPENCLAW_TASK_SESSION_ID`
- `OPENCLAW_APPROVAL_TOKEN`
- `OPENCLAW_UNTRUSTED_SOURCE`
- `OPENCLAW_VIOLATION_NOTIFY_CMD`
- `OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST`
**Tools:** `python3` and one of `lsof`, `ss`, or `netstat` for port/egress checks.
**Policy files (admin reviewed):**
- `~/.openclaw/security/approved_ports.json`
- `~/.openclaw/security/command-policy.json`
- `~/.openclaw/security/egress_allowlist.json`
- `~/.openclaw/security/prompt-policy.json`
Implement these controls in every security-sensitive task:
1. Keep default execution in normal (non-root) mode.
2. Request explicit user approval before any elevated command.
3. Scope elevation to the minimum command set required for the active task.
4. Drop elevated state immediately after the privileged command completes.
5. Expire elevated state after 30 idle minutes and require re-approval.
6. Monitor listening network ports and flag insecure or unapproved exposure.
7. Monitor outbound connections and flag destinations not in the egress allowlist.
8. If no approved baseline exists, generate one with `python3 scripts/generate_approved_ports.py`, then review and prune.
9. Benchmark controls against ISO 27001 and NIST and report violations with mitigations.
## Non-Goals (Web Browsing)
- Do not use web browsing / web search as part of this skill. Keep assessments and recommendations based on local host/OpenClaw state and the bundled references in this skill.
## Files To Use
- `references/least-privilege-policy.md`
- `references/port-monitoring-policy.md`
- `references/compliance-controls-map.json`
- `references/approved_ports.template.json`
- `references/command-policy.template.json`
- `references/prompt-policy.template.json`
- `references/egress-allowlist.template.json`
- `scripts/preflight_check.py`
- `scripts/root_session_guard.py`
- `scripts/audit_logger.py`
- `scripts/command_policy.py`
- `scripts/prompt_policy.py`
- `scripts/guarded_privileged_exec.py`
- `scripts/install-openclaw-runtime-hook.sh`
- `scripts/port_monitor.py`
- `scripts/generate_approved_ports.py`
- `scripts/egress_monitor.py`
- `scripts/notify_on_violation.py`
- `scripts/compliance_dashboard.py`
- `scripts/live_assessment.py`
## Behavior
- Never keep root/elevated access open between unrelated tasks.
- Never execute root commands without an explicit approval step in the current flow.
- Enforce command allow/deny policy when configured.
- Require confirmation when untrusted content sources are detected (`OPENCLAW_UNTRUSTED_SOURCE=1` + prompt policy).
- Enforce task session id scoping when configured (`OPENCLAW_REQUIRE_SESSION_ID=1`).
- If timeout is exceeded, force session expiration and approval renewal.
- Log privileged actions to `~/.openclaw/security/privileged-audit.jsonl` (best-effort).
- Flag listening ports not present in the approved baseline and recommend secure alternatives for insecure ports.
- Flag outbound destinations not present in the egress allowlist.
## Output Contract
When reporting status, include:
- The specific `check_id`(s) affected, `status`, `risk`, and concise evidence.
- Concrete mitigations (what to change, where) and any owners/due dates if present.
- For network findings: port, bind address, process/service, and why it is flagged (unapproved/insecure/public).
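
The output contract above can be sketched as a small record type. This is only an illustration: the `Finding` class, the `render` helper, and the example `status` value are assumptions and are not part of the skill's scripts; only the field names `check_id`, `status`, and `risk` come from the contract itself.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass
class Finding:
    """Hypothetical container for one reported check result."""

    check_id: str                 # e.g. an id from compliance-controls-map.json
    status: str                   # status vocabulary is an assumption here
    risk: str                     # e.g. "low", "medium", "high"
    evidence: str                 # concise evidence for the finding
    mitigations: List[str] = field(default_factory=list)
    owner: Optional[str] = None   # only filled in when an owner is known
    due_date: Optional[str] = None


def render(finding: Finding) -> str:
    """Serialize a finding as indented JSON for a status report."""
    return json.dumps(asdict(finding), indent=2)


example = Finding(
    check_id="open_ports_approved",
    status="violation",
    risk="medium",
    evidence="tcp/8080 bound to 0.0.0.0 by node, not in approved baseline",
    mitigations=["Bind the service to 127.0.0.1 or add a reviewed baseline entry."],
)
```

Calling `print(render(example))` would emit the finding as JSON, with `owner` and `due_date` left as `null` when they are not present.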
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/generate_approved_ports.py
```python
#!/usr/bin/env python3
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Tuple

DEFAULT_OUT = Path.home() / ".openclaw" / "security" / "approved_ports.json"


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Generate an approved listening-port baseline from current services."
    )
    p.add_argument(
        "--output",
        default=str(DEFAULT_OUT),
        help="Where to write the approved ports JSON baseline",
    )
    p.add_argument(
        "--skill-dir",
        default=str(Path(__file__).resolve().parent.parent),
        help="Path to cyber-security-engineer skill directory (for calling port_monitor.py)",
    )
    p.add_argument(
        "--include-command",
        action="store_true",
        default=True,
        help="Include the command name in each rule (recommended)",
    )
    return p.parse_args()


def run_port_monitor(skill_dir: Path) -> Dict[str, object]:
    port_monitor = skill_dir / "scripts" / "port_monitor.py"
    if not port_monitor.exists():
        raise FileNotFoundError(f"port_monitor.py not found at {port_monitor}")

    # Use an empty-but-valid approved file so we always get a full inventory.
    # (port_monitor expects JSON; passing /dev/null would cause JSON parse errors.)
    empty_approved = Path("/tmp/openclaw-approved-ports-empty.json")
    try:
        empty_approved.write_text("[]\n", encoding="utf-8")
    except Exception:
        # Best-effort fallback: if we can't write /tmp, omit approved-file entirely.
        empty_approved = None

    cmd = [
        sys.executable,
        str(port_monitor),
        "--json",
    ]
    if empty_approved is not None:
        cmd.extend(["--approved-file", str(empty_approved)])

    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "port_monitor failed")
    return json.loads(proc.stdout)


def normalize_listeners(listening: List[Dict[str, object]], include_command: bool) -> Tuple[List[Dict[str, object]], int]:
    rules: List[Dict[str, object]] = []
    seen = set()
    for entry in listening:
        try:
            port = int(entry.get("port"))
        except Exception:
            continue
        proto = str(entry.get("protocol") or "tcp").lower()
        cmd = str(entry.get("command") or "").strip()

        rule: Dict[str, object] = {"port": port, "protocol": proto}
        if include_command and cmd:
            rule["command"] = cmd

        # Note: keep extra fields for human review; port_monitor ignores unknown keys.
        if entry.get("host"):
            rule["host"] = entry.get("host")
        if entry.get("user"):
            rule["user"] = entry.get("user")
        if entry.get("pid") not in (None, "", "unknown"):
            rule["pid"] = entry.get("pid")

        dedupe = (rule.get("port"), rule.get("protocol"), rule.get("command"))
        if dedupe in seen:
            continue
        seen.add(dedupe)
        rules.append(rule)

    rules.sort(key=lambda r: (int(r.get("port", 0)), str(r.get("protocol", "")), str(r.get("command", ""))))
    return rules, len(seen)


def main() -> int:
    args = parse_args()
    out = Path(args.output).expanduser()
    skill_dir = Path(args.skill_dir).expanduser()

    report = run_port_monitor(skill_dir)
    listening = report.get("listening_services")
    if not isinstance(listening, list):
        raise SystemExit("Unexpected port_monitor output: listening_services missing or not a list")

    rules, _ = normalize_listeners(listening, include_command=bool(args.include_command))
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(rules, indent=2) + "\n", encoding="utf-8")
    print(json.dumps({"status": "ok", "output": str(out), "rules": len(rules)}, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
### references/least-privilege-policy.md
```markdown
# Least-Privilege Policy For OpenClaw
## Objective
Enforce default non-root execution and explicit approval-first elevation with immediate privilege drop after privileged operations.
## Required Controls
1. Set default execution context to non-root.
2. Require approval before every privileged command.
3. Limit privileged commands to a reviewed allowlist (per-task scope).
4. Log privileged requests, approvals, and execution outcomes.
5. Drop elevated state immediately after privileged command completion.
6. Force elevated timeout expiration after 30 idle minutes.
## Operational Checks
1. Verify privileged command is required for task outcome.
2. Execute privileged work through wrapper:
- `python3 scripts/guarded_privileged_exec.py --reason "required change" --use-sudo -- <command>`
3. If wrapper requests approval, require explicit user consent before command runs.
4. Execute only the approved privileged command(s):
- Approval is tied to the exact argv you approved.
- Any different privileged argv triggers a new approval and allowlist entry.
5. Confirm wrapper drops privilege immediately after completion.
## Control Acceptance Criteria
1. No privileged command runs without current-task user approval.
2. Elevated session is not retained after privileged work completes.
3. Idle elevated session at or above 30 minutes transitions to normal mode.
4. Session state file records UTC transition/action metadata.
## Suggested OpenClaw Config Direction
Configure OpenClaw with strict approval and allowlist behavior:
- Set execution ask behavior to always prompt.
- Set execution security mode to allowlist.
- Restrict elevated tool callers with explicit `allowFrom` entries.
- Keep sandboxing enabled for normal operations.
If exact keys differ by version, preserve intent: explicit approval + least privilege + short-lived elevation.
```
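
The policy's rule that approval is tied to the exact argv can be illustrated with a minimal sketch. The `is_approved` helper and the sample commands are hypothetical; the comparison mirrors the list-equality check used by `is_allowed` in `scripts/root_session_guard.py`, but this is not the wrapper's actual code.

```python
from typing import List


def is_approved(approved: List[List[str]], argv: List[str]) -> bool:
    """Return True only when argv matches an approved entry element by element."""
    if not argv:
        # An empty command is never considered approved.
        return False
    return any(entry == argv for entry in approved)


# Hypothetical approval recorded for one exact command line.
approved_argvs = [["systemctl", "restart", "nginx"]]

exact_match = is_approved(approved_argvs, ["systemctl", "restart", "nginx"])
extra_argument = is_approved(approved_argvs, ["systemctl", "restart", "nginx", "--no-block"])
different_target = is_approved(approved_argvs, ["systemctl", "restart", "sshd"])
```

Here only `exact_match` is true. Adding a flag or changing the target produces a different argv, so it would need its own approval.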
### references/port-monitoring-policy.md
````markdown
# Port Monitoring Policy For OpenClaw
## Objective
Continuously identify listening services, detect insecure ports/protocols, and flag ports not approved by baseline policy.
## Baseline File
Use `~/.openclaw/security/approved_ports.json` as a JSON array:
```json
[
  { "port": 22, "protocol": "tcp", "command": "sshd" },
  { "port": 443, "protocol": "tcp", "command": "nginx" }
]
```
`command` is optional but recommended for tighter control.
## Monitoring Command
Run:
`python3 scripts/port_monitor.py --json`
## Required Analyst Actions
1. Review all `unapproved-port` findings.
2. Review all `insecure-port` findings and propose secure alternatives.
3. Confirm whether `public-bind` findings are necessary exposure.
4. Ask for user approval before any root-level remediation steps.
## Insecure Port Guidance
Enforce upgrades where possible:
1. `80 -> 443` (HTTPS instead of HTTP)
2. `23 -> 22` (SSH instead of Telnet)
3. `21 -> 22/990` (SFTP/FTPS instead of FTP)
4. `110 -> 995` (POP3S)
5. `143 -> 993` (IMAPS)
6. `389 -> 636` (LDAPS)
## Acceptance Criteria
1. Every open listening port is either approved or explicitly flagged.
2. Every insecure port includes a replacement recommendation.
3. Every externally bound service (`*`, `0.0.0.0`, `::`) is justified or marked for restriction.
````
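
As an illustration of how the three finding types in this policy relate to one listener, here is a minimal sketch. `scripts/port_monitor.py` is not reproduced on this page, so the `classify` function and its matching rules (an omitted `command` in a baseline rule matches any command) are assumptions; the finding names, the insecure port table, and the public bind addresses are taken from the policy text.

```python
from typing import Dict, List

# Replacement recommendations from the "Insecure Port Guidance" list.
INSECURE_PORTS: Dict[int, str] = {
    80: "443 (HTTPS)",
    23: "22 (SSH)",
    21: "22/990 (SFTP/FTPS)",
    110: "995 (POP3S)",
    143: "993 (IMAPS)",
    389: "636 (LDAPS)",
}
PUBLIC_BINDS = {"*", "0.0.0.0", "::"}


def classify(listener: Dict[str, object], baseline: List[Dict[str, object]]) -> List[str]:
    """Return the finding labels that apply to one listening service."""
    port = int(listener["port"])
    proto = str(listener.get("protocol") or "tcp").lower()
    command = listener.get("command")

    # Assumed rule: port and protocol must match; command must match only if the rule sets one.
    approved = any(
        rule.get("port") == port
        and str(rule.get("protocol") or "tcp").lower() == proto
        and rule.get("command") in (None, command)
        for rule in baseline
    )

    findings: List[str] = []
    if not approved:
        findings.append("unapproved-port")
    if port in INSECURE_PORTS:
        findings.append("insecure-port")
    if listener.get("host") in PUBLIC_BINDS:
        findings.append("public-bind")
    return findings


baseline = [{"port": 22, "protocol": "tcp", "command": "sshd"}]
telnet = {"port": 23, "protocol": "tcp", "command": "telnetd", "host": "0.0.0.0"}
ssh_local = {"port": 22, "protocol": "tcp", "command": "sshd", "host": "127.0.0.1"}
```

With this baseline, `classify(telnet, baseline)` yields all three labels, while `classify(ssh_local, baseline)` yields none.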
### references/compliance-controls-map.json
```json
[
{
"check_id": "privilege_approval_required",
"title": "Approval required before elevated access",
"iso27001": ["A.5.15", "A.5.18"],
"nist": ["PR.AA-01", "PR.AA-05"],
"default_risk": "high",
"expected_state": "Every privileged action requires explicit user approval."
},
{
"check_id": "least_privilege_enforced",
"title": "Least privilege execution mode",
"iso27001": ["A.5.15", "A.5.18"],
"nist": ["PR.AA-01", "PR.PS-01"],
"default_risk": "high",
"expected_state": "Default mode is non-root and elevated rights are scoped and short-lived."
},
{
"check_id": "elevation_timeout_30m",
"title": "Elevated session idle timeout",
"iso27001": ["A.8.2", "A.8.15"],
"nist": ["PR.AA-03", "DE.CM-01"],
"default_risk": "medium",
"expected_state": "Elevated session expires after 30 minutes of inactivity."
},
{
"check_id": "audit_logging_privileged_actions",
"title": "Privileged action audit logging",
"iso27001": ["A.8.15", "A.8.16"],
"nist": ["DE.AE-03", "DE.CM-01"],
"default_risk": "medium",
"expected_state": "All elevated approvals, commands, and privilege drops are logged."
},
{
"check_id": "open_ports_approved",
"title": "Open ports baseline approval",
"iso27001": ["A.8.20", "A.8.21"],
"nist": ["PR.PS-02", "DE.CM-01"],
"default_risk": "medium",
"expected_state": "All listening ports are approved and business-justified."
},
{
"check_id": "insecure_ports_remediated",
"title": "Insecure ports remediated",
"iso27001": ["A.8.20", "A.8.21"],
"nist": ["PR.PS-02", "PR.DS-02"],
"default_risk": "high",
"expected_state": "Insecure legacy ports are closed or migrated to secure alternatives."
},
{
"check_id": "channel_allowlist_configured",
"title": "Channel allowlist configured",
"iso27001": ["A.5.15", "A.5.16"],
"nist": ["PR.AA-02"],
"default_risk": "high",
"expected_state": "Inbound channels restrict senders via allowlists."
},
{
"check_id": "group_mentions_required",
"title": "Group mention requirement",
"iso27001": ["A.5.16"],
"nist": ["PR.AA-04"],
"default_risk": "medium",
"expected_state": "Group chats require explicit mention before agent responds."
},
{
"check_id": "gateway_loopback_only",
"title": "Gateway bound to loopback",
"iso27001": ["A.8.9", "A.8.10"],
"nist": ["PR.IP-01"],
"default_risk": "high",
"expected_state": "Gateway runs local/loopback with token auth."
},
{
"check_id": "secrets_permissions_hardened",
"title": "Secrets and config permissions hardened",
"iso27001": ["A.8.11"],
"nist": ["PR.DS-01"],
"default_risk": "medium",
"expected_state": "OpenClaw config and secrets are not world/group readable."
},
{
"check_id": "runtime_privilege_hook_installed",
"title": "Runtime privileged execution hook installed",
"iso27001": ["A.5.15", "A.5.18"],
"nist": ["PR.AA-01"],
"default_risk": "high",
"expected_state": "Privileged commands are forced through approval guard."
},
{
"check_id": "alternate_privilege_paths_restricted",
"title": "Alternate privilege paths restricted",
"iso27001": ["A.5.15"],
"nist": ["PR.AA-05"],
"default_risk": "medium",
"expected_state": "su/doas or other escalation paths are restricted or guarded."
},
{
"check_id": "backup_configured",
"title": "Backup and recovery configured",
"iso27001": ["A.8.13"],
"nist": ["PR.IP-04"],
"default_risk": "low",
"expected_state": "Backups of OpenClaw config and audit logs exist."
},
{
"check_id": "update_hygiene",
"title": "Update hygiene",
"iso27001": ["A.8.8"],
"nist": ["ID.RA-01"],
"default_risk": "low",
"expected_state": "OpenClaw is updated regularly and version is tracked."
},
{
"check_id": "prompt_injection_controls",
"title": "Prompt injection controls",
"iso27001": ["A.5.15", "A.5.18"],
"nist": ["PR.AA-01"],
"default_risk": "high",
"expected_state": "Untrusted content sources require explicit confirmation before privileged execution."
},
{
"check_id": "command_policy_enforced",
"title": "Privileged command policy enforced",
"iso27001": ["A.5.15"],
"nist": ["PR.AA-05"],
"default_risk": "high",
"expected_state": "Privileged commands are filtered by allow/deny policy."
},
{
"check_id": "session_boundary_enforced",
"title": "Task session boundary enforced",
"iso27001": ["A.5.15"],
"nist": ["PR.AA-03"],
"default_risk": "medium",
"expected_state": "Privileged approvals are scoped to a task session id."
},
{
"check_id": "multi_factor_approval",
"title": "Multi-factor approval for privileged actions",
"iso27001": ["A.5.17"],
"nist": ["PR.AA-02"],
"default_risk": "medium",
"expected_state": "Privileged approvals require an additional approval token."
},
{
"check_id": "egress_allowlist_configured",
"title": "Outbound allowlist configured",
"iso27001": ["A.8.20"],
"nist": ["PR.PS-02"],
"default_risk": "medium",
"expected_state": "Outbound destinations are allowlisted."
},
{
"check_id": "egress_connections_approved",
"title": "Outbound connections approved",
"iso27001": ["A.8.20"],
"nist": ["DE.CM-01"],
"default_risk": "medium",
"expected_state": "No unapproved outbound connections are detected."
}
]
```
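
One way to use this map in a report is to invert it, so that each framework control lists the checks that cover it. The sketch below is an assumption about how a report might consume the file (`scripts/compliance_dashboard.py` is not shown here); `checks_by_control` is a hypothetical helper and `SAMPLE` copies two entries from the map above.

```python
from collections import defaultdict
from typing import Dict, List

# Two entries copied from compliance-controls-map.json (fields trimmed for brevity).
SAMPLE = [
    {
        "check_id": "open_ports_approved",
        "iso27001": ["A.8.20", "A.8.21"],
        "nist": ["PR.PS-02", "DE.CM-01"],
    },
    {
        "check_id": "egress_allowlist_configured",
        "iso27001": ["A.8.20"],
        "nist": ["PR.PS-02"],
    },
]


def checks_by_control(entries: List[dict], framework: str) -> Dict[str, List[str]]:
    """Group check ids under each control id of the given framework key."""
    index: Dict[str, List[str]] = defaultdict(list)
    for entry in entries:
        for control in entry.get(framework, []):
            index[control].append(entry["check_id"])
    return dict(index)


iso_index = checks_by_control(SAMPLE, "iso27001")
nist_index = checks_by_control(SAMPLE, "nist")
```

For the full file, the same function could be applied to the result of `json.load` on `references/compliance-controls-map.json`.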
### references/approved_ports.template.json
```json
[
{
"port": 18789,
"protocol": "tcp",
"command": "node",
"comment": "OpenClaw gateway (example). Remove if not applicable."
}
]
```
### references/command-policy.template.json
```json
{
"allow": [
"^openclaw\\b",
"^python3\\b"
],
"deny": [
"\\brm\\s+-rf\\b",
"\\bshutdown\\b",
"\\breboot\\b"
]
}
```
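
The following sketch shows one plausible way to evaluate a command against these patterns. The evaluation order is an assumption, since the body of `evaluate_command` in `scripts/command_policy.py` is not fully reproduced on this page: here deny patterns are checked first, and a non-empty allow list then acts as a strict allowlist. The `evaluate` function and the joining of argv with `shlex.join` are hypothetical.

```python
import re
import shlex
from typing import Dict, List

# Patterns copied from command-policy.template.json.
POLICY = {
    "allow": [r"^openclaw\b", r"^python3\b"],
    "deny": [r"\brm\s+-rf\b", r"\bshutdown\b", r"\breboot\b"],
}


def evaluate(argv: List[str], policy: Dict[str, List[str]] = POLICY) -> Dict[str, object]:
    """Assumed semantics: deny wins, then the command must match an allow pattern."""
    text = shlex.join(argv)  # one shell-quoted string to match patterns against

    for pattern in policy.get("deny", []):
        if re.search(pattern, text):
            return {"allowed": False, "reason": "deny", "pattern": pattern}

    allow = policy.get("allow", [])
    if not allow:
        # No allowlist configured: nothing further to enforce in this sketch.
        return {"allowed": True, "reason": "no-allowlist", "pattern": None}

    for pattern in allow:
        if re.search(pattern, text):
            return {"allowed": True, "reason": "allow", "pattern": pattern}
    return {"allowed": False, "reason": "not-allowlisted", "pattern": None}


ok = evaluate(["python3", "scripts/port_monitor.py", "--json"])
denied = evaluate(["openclaw", "exec", "rm", "-rf", "/tmp/x"])
unlisted = evaluate(["systemctl", "restart", "nginx"])
```

Checking deny patterns first matters for commands like the `denied` example, which begins with an allowed prefix but still contains a denied fragment.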
### references/prompt-policy.template.json
```json
{
"require_confirmation_for_untrusted": true
}
```
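
A minimal sketch of how this flag might combine with `OPENCLAW_UNTRUSTED_SOURCE`, as described under Behavior. `scripts/prompt_policy.py` is not shown on this page, so the `needs_confirmation` helper and its default of `False` for a missing key are assumptions.

```python
from typing import Mapping


def needs_confirmation(env: Mapping[str, str], policy: Mapping[str, object]) -> bool:
    """Require confirmation when the source is marked untrusted and the policy asks for it."""
    untrusted = env.get("OPENCLAW_UNTRUSTED_SOURCE") == "1"
    # Assumption: a missing policy key means confirmation is not required.
    required = bool(policy.get("require_confirmation_for_untrusted", False))
    return untrusted and required


policy = {"require_confirmation_for_untrusted": True}
flagged = needs_confirmation({"OPENCLAW_UNTRUSTED_SOURCE": "1"}, policy)
trusted = needs_confirmation({}, policy)
```

In a real run the first argument would be `os.environ` and the second would be the parsed `~/.openclaw/security/prompt-policy.json`.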
### references/egress-allowlist.template.json
```json
[
{
"protocol": "tcp",
"host_regex": "^(api\\.openai\\.com|api\\.telegram\\.org)$",
"port": 443
}
]
```
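
To show how an entry like this could be matched against an outbound connection, here is a hedged sketch. `scripts/egress_monitor.py` is not reproduced on this page, so `is_allowed_destination` and its rule that an omitted `port` matches any port are assumptions; the allowlist entry is copied from the template.

```python
import re
from typing import Dict, List

# Entry copied from egress-allowlist.template.json.
ALLOWLIST: List[Dict[str, object]] = [
    {
        "protocol": "tcp",
        "host_regex": r"^(api\.openai\.com|api\.telegram\.org)$",
        "port": 443,
    }
]


def is_allowed_destination(
    host: str, port: int, protocol: str, allowlist: List[Dict[str, object]] = ALLOWLIST
) -> bool:
    """Return True when some rule matches protocol, port, and host pattern."""
    for rule in allowlist:
        if str(rule.get("protocol") or "tcp").lower() != protocol.lower():
            continue
        # Assumption: a rule without a port applies to every port.
        if rule.get("port") not in (None, port):
            continue
        if re.search(str(rule["host_regex"]), host):
            return True
    return False


api_call = is_allowed_destination("api.openai.com", 443, "tcp")
lookalike = is_allowed_destination("api.openai.com.evil.example", 443, "tcp")
plain_http = is_allowed_destination("api.openai.com", 80, "tcp")
```

The `^` and `$` anchors in the template are what keep the look-alike hostname from matching, so custom entries should keep them.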
### scripts/preflight_check.py
```python
#!/usr/bin/env python3
"""
Preflight validation for the cyber-security-engineer skill.

This is intentionally conservative: it does not modify system state. It validates
that required files and tooling exist before enabling runtime hooks or enforcing
privileged execution.
"""
from __future__ import annotations

import json
import os
import shutil
import stat
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple


def _exists_any(names: List[str]) -> Tuple[bool, List[str]]:
    found = [n for n in names if shutil.which(n)]
    return (len(found) > 0), found


def _mode_str(path: Path) -> str:
    try:
        return oct(path.stat().st_mode & 0o777)
    except Exception:
        return "unknown"


def _is_hardened(path: Path) -> bool:
    try:
        mode = path.stat().st_mode & 0o777
        return (mode & 0o077) == 0
    except Exception:
        return False


def main() -> int:
    home = Path.home()
    oc = home / ".openclaw"
    sec = oc / "security"

    required_policy_files = [
        sec / "command-policy.json",
        sec / "approved_ports.json",
        sec / "egress_allowlist.json",
    ]
    optional_files = [
        oc / "openclaw.json",
        oc / "env",
        sec / "prompt-policy.json",
        sec / "root-session-state.json",
    ]

    ok_openclaw, found_openclaw = _exists_any(["openclaw"])
    ok_python, found_python = _exists_any(["python3", "python"])
    ok_ports, found_ports = _exists_any(["lsof", "ss", "netstat"])

    hook = oc / "bin" / "sudo"
    hook_ok = hook.exists() and os.access(str(hook), os.X_OK)

    missing_required = [str(p) for p in required_policy_files if not p.exists()]
    missing_optional = [str(p) for p in optional_files if not p.exists()]

    env_path = oc / "env"
    env_perms_ok = True
    env_perms_mode = "missing"
    if env_path.exists():
        env_perms_mode = _mode_str(env_path)
        env_perms_ok = _is_hardened(env_path)

    config_path = oc / "openclaw.json"
    config_perms_ok = True
    config_perms_mode = "missing"
    if config_path.exists():
        config_perms_mode = _mode_str(config_path)
        config_perms_ok = _is_hardened(config_path)

    require_policy = os.environ.get("OPENCLAW_REQUIRE_POLICY_FILES") == "1"

    critical_fail = []
    if not ok_openclaw:
        critical_fail.append("openclaw_not_found")
    if not ok_python:
        critical_fail.append("python_not_found")
    if not ok_ports:
        critical_fail.append("no_port_discovery_tool")
    if require_policy and missing_required:
        critical_fail.append("missing_required_policy_files")

    report: Dict[str, Any] = {
        "status": "ok" if not critical_fail else "fail",
        "critical": critical_fail,
        "require_policy_files": require_policy,
        "tools": {
            "openclaw": {"ok": ok_openclaw, "found": found_openclaw},
            "python": {"ok": ok_python, "found": found_python},
            "port_discovery": {"ok": ok_ports, "found": found_ports},
        },
        "paths": {
            "openclaw_dir": str(oc),
            "security_dir": str(sec),
            "runtime_hook": {"path": str(hook), "installed": hook_ok},
        },
        "policy_files": {
            "required": [str(p) for p in required_policy_files],
            "missing_required": missing_required,
        },
        "optional_files": {
            "missing_optional": missing_optional,
        },
        "permissions": {
            "env": {"path": str(env_path), "mode": env_perms_mode, "hardened": env_perms_ok},
            "openclaw_json": {"path": str(config_path), "mode": config_perms_mode, "hardened": config_perms_ok},
        },
        "next_steps": [
            "Run bootstrap script if OpenClaw is not configured.",
            "Generate and prune approved ports baseline via generate_approved_ports.py.",
            "Review and customize command-policy.json and egress_allowlist.json templates.",
            "Install runtime hook only after policy files are reviewed.",
        ],
    }

    sys.stdout.write(json.dumps(report, indent=2, sort_keys=True))
    sys.stdout.write("\n")
    return 0 if not critical_fail else 1


if __name__ == "__main__":
    raise SystemExit(main())
```
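
The `hardened` flag in the preflight report comes down to one bit mask: a file passes only when no group or other permission bits are set. The sketch below restates that test on plain mode integers so the arithmetic is easy to follow; `mode_is_hardened` is a hypothetical helper, not a function from the script.

```python
def mode_is_hardened(mode: int) -> bool:
    """True when the permission bits grant nothing to group or others."""
    permission_bits = mode & 0o777       # drop file-type bits from st_mode
    return (permission_bits & 0o077) == 0  # 0o077 selects the group and other bits


# Worked examples on common modes.
results = {
    "0o600": mode_is_hardened(0o600),  # owner read/write only
    "0o700": mode_is_hardened(0o700),  # owner read/write/execute only
    "0o640": mode_is_hardened(0o640),  # group can read
    "0o644": mode_is_hardened(0o644),  # group and others can read
}
```

Only the first two modes pass, which is consistent with the `secrets_permissions_hardened` check expecting config and secrets not to be group or world readable.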
### scripts/root_session_guard.py
```python
#!/usr/bin/env python3
"""Root session guard with idle-timeout and allowlisted argv scoping.

Review before use. No network calls are made from this script.
"""
import argparse
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional

DEFAULT_TIMEOUT_MINUTES = 30
STATE_PATH = Path.home() / ".openclaw" / "security" / "root-session-state.json"


def now_utc() -> datetime:
    return datetime.now(timezone.utc)


def to_iso(ts: datetime) -> str:
    return ts.isoformat().replace("+00:00", "Z")


def from_iso(value: Optional[str]) -> Optional[datetime]:
    if not value:
        return None
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


@dataclass
class AllowedCommand:
    argv: List[str]
    added_at_utc: str


@dataclass
class SessionState:
    privilege_mode: str
    last_elevated_activity_utc: Optional[str]
    last_normal_activity_utc: Optional[str]
    last_transition_utc: str
    last_action: str
    # When in elevated mode, restrict which privileged commands are allowed to run.
    allowed_commands: List[AllowedCommand] = field(default_factory=list)
    approved_reason: Optional[str] = None
    approved_session_id: Optional[str] = None

    def as_dict(self) -> Dict[str, Optional[str]]:
        return {
            "privilege_mode": self.privilege_mode,
            "last_elevated_activity_utc": self.last_elevated_activity_utc,
            "last_normal_activity_utc": self.last_normal_activity_utc,
            "last_transition_utc": self.last_transition_utc,
            "last_action": self.last_action,
            "approved_reason": self.approved_reason,
            "approved_session_id": self.approved_session_id,
            "allowed_commands": [
                {"argv": e.argv, "added_at_utc": e.added_at_utc}
                for e in self.allowed_commands
            ],
        }


def ensure_parent(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def default_state() -> SessionState:
    ts = to_iso(now_utc())
    return SessionState(
        privilege_mode="normal",
        last_elevated_activity_utc=None,
        last_normal_activity_utc=ts,
        last_transition_utc=ts,
        last_action="init-normal",
        allowed_commands=[],
        approved_reason=None,
        approved_session_id=None,
    )


def load_state(path: Path) -> SessionState:
    if not path.exists():
        state = default_state()
        save_state(path, state)
        return state

    with path.open("r", encoding="utf-8") as f:
        raw = json.load(f)

    allowed_raw = raw.get("allowed_commands") or []
    allowed: List[AllowedCommand] = []
    if isinstance(allowed_raw, list):
        for entry in allowed_raw:
            if not isinstance(entry, dict):
                continue
            argv = entry.get("argv")
            added_at_utc = entry.get("added_at_utc")
            if isinstance(argv, list) and all(isinstance(x, str) for x in argv) and isinstance(
                added_at_utc, str
            ):
                allowed.append(AllowedCommand(argv=argv, added_at_utc=added_at_utc))

    return SessionState(
        privilege_mode=raw.get("privilege_mode", "normal"),
        last_elevated_activity_utc=raw.get("last_elevated_activity_utc"),
        last_normal_activity_utc=raw.get("last_normal_activity_utc"),
        last_transition_utc=raw.get("last_transition_utc", to_iso(now_utc())),
        last_action=raw.get("last_action", "unknown"),
        allowed_commands=allowed,
        approved_reason=raw.get("approved_reason"),
        approved_session_id=raw.get("approved_session_id"),
    )


def save_state(path: Path, state: SessionState) -> None:
    ensure_parent(path)
    with path.open("w", encoding="utf-8") as f:
        json.dump(state.as_dict(), f, indent=2)
        f.write("\n")


def minutes_since(ts_iso: Optional[str]) -> Optional[float]:
    ts = from_iso(ts_iso)
    if ts is None:
        return None
    delta = now_utc() - ts
    return round(delta.total_seconds() / 60.0, 2)


def preflight(state: SessionState, timeout_minutes: int) -> int:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )

    action = "continue-normal"
    approval_required = True
    if state.privilege_mode == "elevated" and not timed_out:
        action = "elevated-active"
        # Approval is still required per-command unless the command is already allowlisted.
        approval_required = False
    elif timed_out:
        action = "drop-elevation"
        state.privilege_mode = "normal"
        state.allowed_commands = []
        state.approved_reason = None
        state.approved_session_id = None
        state.last_transition_utc = to_iso(now_utc())
        state.last_action = "timeout-drop"

    result = {
        "status": "REQUIRES_APPROVAL" if approval_required else "ELEVATED_AVAILABLE",
        "privilege_mode": state.privilege_mode,
        "idle_minutes_since_elevated": idle_mins,
        "timeout_minutes": timeout_minutes,
        "action": action,
        "allowed_commands_count": len(state.allowed_commands),
    }
    print(json.dumps(result, indent=2))
    return 2 if approval_required else 0


def is_allowed(state: SessionState, argv: List[str]) -> bool:
    if not argv:
        return False
    for entry in state.allowed_commands:
        if entry.argv == argv:
            return True
    return False


def authorize(state: SessionState, timeout_minutes: int, argv: List[str], session_id: Optional[str]) -> int:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )
    if timed_out:
        state.privilege_mode = "normal"
        state.allowed_commands = []
        state.approved_reason = None
        state.approved_session_id = None
        state.last_transition_utc = to_iso(now_utc())
        state.last_action = "timeout-drop"

    session_ok = session_id is None or state.approved_session_id in (None, session_id)
    allowed = state.privilege_mode == "elevated" and is_allowed(state, argv) and session_ok
    approval_required = not allowed

    result = {
        "status": "REQUIRES_APPROVAL" if approval_required else "AUTHORIZED",
        "privilege_mode": state.privilege_mode,
        "idle_minutes_since_elevated": idle_mins,
        "timeout_minutes": timeout_minutes,
        "allowed": allowed,
        "allowed_commands_count": len(state.allowed_commands),
        "session_id": session_id,
        "session_ok": session_ok,
    }
    print(json.dumps(result, indent=2))
    return 2 if approval_required else 0


def approve_command(state: SessionState, reason: str, argv: List[str], session_id: Optional[str]) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "elevated"
    state.last_elevated_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "approved-command"
    state.approved_reason = reason
    state.approved_session_id = session_id
    if argv and not is_allowed(state, argv):
        state.allowed_commands.append(AllowedCommand(argv=argv, added_at_utc=ts))


def mark_elevated_used(state: SessionState) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "elevated"
    state.last_elevated_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "elevated-used"


def mark_normal_used(state: SessionState) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "normal"
    state.last_normal_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "normal-used"


def drop(state: SessionState, reason: str) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "normal"
    state.allowed_commands = []
    state.approved_reason = None
    state.approved_session_id = None
    state.last_transition_utc = ts
    state.last_action = reason


def status(state: SessionState, timeout_minutes: int) -> None:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )
    result = state.as_dict()
    result["idle_minutes_since_elevated"] = idle_mins
    result["timeout_minutes"] = timeout_minutes
    result["timed_out"] = timed_out
    result["approval_required"] = state.privilege_mode != "elevated" or timed_out
    print(json.dumps(result, indent=2))


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="OpenClaw root/elevated session guard")
    parser.add_argument(
        "--state-file",
        default=str(STATE_PATH),
        help="Path to session state JSON file",
    )
    parser.add_argument(
        "--timeout-minutes",
        type=int,
        default=DEFAULT_TIMEOUT_MINUTES,
        help="Idle timeout for elevated mode",
    )

    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("preflight", help="Check timeout and approval requirement")

    authz = sub.add_parser(
        "authorize",
        help="Authorize a specific argv against the current elevated allowlist",
    )
    authz.add_argument(
        "--argv-json",
        required=True,
        help='Command argv as JSON array, e.g. ["launchctl","print","..."]',
    )
    authz.add_argument("--session-id", help="Task session id to scope approvals")

    approve = sub.add_parser(
        "approve",
        help="Approve a specific argv for elevated execution (adds to allowlist)",
    )
    approve.add_argument("--reason", required=True, help="Approval reason")
    approve.add_argument(
        "--argv-json",
        required=True,
        help='Command argv as JSON array, e.g. ["launchctl","print","..."]',
    )
    approve.add_argument("--session-id", help="Task session id to scope approvals")

    sub.add_parser("elevated-used", help="Mark elevated mode as used now")
    sub.add_parser("normal-used", help="Mark normal mode activity now")
    sub.add_parser("drop", help="Drop to normal mode")
    sub.add_parser("status", help="Print current state and timeout info")
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    state_file = Path(os.path.expanduser(args.state_file))
    state = load_state(state_file)

    if args.command == "preflight":
        code = preflight(state, args.timeout_minutes)
        save_state(state_file, state)
        return code

    if args.command == "authorize":
        argv = json.loads(args.argv_json)
        if not isinstance(argv, list) or not all(isinstance(x, str) for x in argv):
            print('{"status":"ERROR","error":"argv-json must be a JSON array of strings"}')
            return 2
        code = authorize(state, args.timeout_minutes, argv, args.session_id)
        save_state(state_file, state)
        return code

    if args.command == "approve":
        argv = json.loads(args.argv_json)
        if not isinstance(argv, list) or not all(isinstance(x, str) for x in argv):
            print('{"status":"ERROR","error":"argv-json must be a JSON array of strings"}')
            return 2
        approve_command(state, args.reason, argv, args.session_id)
        save_state(state_file, state)
        print('{"status":"OK","action":"approved-command"}')
        return 0

    if args.command == "elevated-used":
        mark_elevated_used(state)
        save_state(state_file, state)
        print('{"status":"OK","action":"elevated-used"}')
        return 0

    if args.command == "normal-used":
        mark_normal_used(state)
        save_state(state_file, state)
        print('{"status":"OK","action":"normal-used"}')
        return 0

    if args.command == "drop":
        drop(state, "manual-drop")
        save_state(state_file, state)
        print('{"status":"OK","action":"drop-elevation"}')
        return 0

    if args.command == "status":
        status(state, args.timeout_minutes)
        return 0

    return 1


if __name__ == "__main__":
    raise SystemExit(main())
```
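
The idle-timeout decision in the guard can be traced with fixed timestamps. The sketch below is a simplified restatement, not the script's code: `is_timed_out` is a hypothetical helper that takes the current time as a parameter so the arithmetic is reproducible, and it compares `timedelta` values directly, whereas the script rounds idle minutes to two decimals before comparing.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_timed_out(
    last_elevated_activity_utc: Optional[str],
    now: datetime,
    timeout_minutes: int = 30,
) -> bool:
    """True when the last elevated activity is at least timeout_minutes old."""
    if not last_elevated_activity_utc:
        # No elevated activity recorded, so there is nothing to expire.
        return False
    value = last_elevated_activity_utc
    if value.endswith("Z"):
        # datetime.fromisoformat on older Python versions does not accept a "Z" suffix.
        value = value[:-1] + "+00:00"
    idle = now - datetime.fromisoformat(value)
    return idle >= timedelta(minutes=timeout_minutes)


now = datetime(2024, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
exactly_30 = is_timed_out("2024-01-01T12:00:00Z", now)      # idle for 30 min 0 s
just_under_30 = is_timed_out("2024-01-01T12:00:01Z", now)   # idle for 29 min 59 s
```

An elevated session idle for exactly 30 minutes is treated as expired, which matches the policy wording "at or above 30 minutes".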
### scripts/audit_logger.py
```python
#!/usr/bin/env python3
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

AUDIT_LOG = Path.home() / ".openclaw" / "security" / "privileged-audit.jsonl"


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def append_audit(event: Dict[str, Any]) -> None:
    """
    Append an audit event to an on-disk JSONL timeline.

    Best-effort only: audit logging must never block privileged operations.
    """
    try:
        AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
        payload = {"ts_utc": _utc_now_iso(), **event}
        with AUDIT_LOG.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload, separators=(",", ":")) + "\n")
    except Exception:
        return
```
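The timeline written by `append_audit` is plain JSONL, so it can be reviewed with a few lines of Python. The following is a minimal sketch, assuming each line is one JSON object carrying an `action` key as in the events emitted by the scripts here. `summarize_audit` is a hypothetical helper and is not part of the skill.

```python
import json
from collections import Counter
from typing import Iterable


def summarize_audit(lines: Iterable[str]) -> Counter:
    """Count audit events by their "action" field, skipping malformed lines."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate a partially written trailing line
        if isinstance(event, dict):
            counts[str(event.get("action", "unknown"))] += 1
    return counts


# Illustrative in-memory sample; real input would be the lines of
# ~/.openclaw/security/privileged-audit.jsonl.
sample = [
    '{"ts_utc":"2024-01-01T00:00:00Z","action":"exec_start"}',
    '{"ts_utc":"2024-01-01T00:00:01Z","action":"exec_finish","returncode":0}',
    "not json",
]
for action, n in summarize_audit(sample).most_common():
    print(n, action)
```

Because logging is best-effort, a missing event in the timeline does not prove that an action did not happen.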
### scripts/command_policy.py
```python
#!/usr/bin/env python3
import json
import re
import shlex
from pathlib import Path
from typing import Dict, List, Optional

POLICY_PATH = Path.home() / ".openclaw" / "security" / "command-policy.json"


def _load_policy() -> Dict[str, object]:
    if not POLICY_PATH.exists():
        return {}
    try:
        raw = json.loads(POLICY_PATH.read_text(encoding="utf-8"))
        return raw if isinstance(raw, dict) else {}
    except Exception:
        return {}


def _compile_patterns(items: object) -> List[re.Pattern]:
    if not isinstance(items, list):
        return []
    out: List[re.Pattern] = []
    for s in items:
        if not isinstance(s, str) or not s.strip():
            continue
        try:
            out.append(re.compile(s))
        except re.error:
            continue
    return out


def _match_any(patterns: List[re.Pattern], text: str) -> Optional[str]:
    for p in patterns:
        if p.search(text):
            return p.pattern
    return None


def _load_exact_rules(items: object) -> List[List[str]]:
    if not isinstance(items, list):
        return []
    out: List[List[str]] = []
    for entry in items:
        if isinstance(entry, list) and all(isinstance(s, str) and s for s in entry):
            out.append(entry)
    return out


def _match_exact(rules: List[List[str]], argv: List[str]) -> bool:
    return any(rule == argv for rule in rules)


def evaluate_command(argv: List[str]) -> Dict[str, object]:
    """
    Evaluate the command argv against an optional allow/deny policy.

    Policy format (~/.openclaw/security/command-policy.json):
    {
      "allow": ["^/usr/bin/apt\\b", "^/usr/bin/brew\\b"],
      "deny": ["\\brm\\s+-rf\\b"],
      "allow_exact": [["/usr/bin/systemctl","restart","nginx"]],
      "deny_exact": [["/usr/bin/rm","-rf","/"]]
    }

    Behavior:
    - If policy file is missing: allow.
    - If deny_exact or deny regex matches: block.
    - If allow_exact is non-empty: require exact match.
    - Else if allow regex list is non-empty: require allow match.
    """
    cmd_str = shlex.join(argv) if argv else ""
    pol = _load_policy()
    allow = _compile_patterns(pol.get("allow"))
    deny = _compile_patterns(pol.get("deny"))
    allow_exact = _load_exact_rules(pol.get("allow_exact"))
    deny_exact = _load_exact_rules(pol.get("deny_exact"))

    if deny_exact and _match_exact(deny_exact, argv):
        return {"allowed": False, "reason": "deny_exact_match", "pattern": "exact"}

    deny_match = _match_any(deny, cmd_str)
    if deny_match:
        return {"allowed": False, "reason": "deny_match", "pattern": deny_match}

    if allow_exact:
        if not _match_exact(allow_exact, argv):
            return {"allowed": False, "reason": "not_in_allow_exact", "pattern": None}
        return {"allowed": True, "reason": "allow_exact_match", "pattern": "exact"}

    if allow:
        allow_match = _match_any(allow, cmd_str)
        if not allow_match:
            return {"allowed": False, "reason": "not_in_allowlist", "pattern": None}
        return {"allowed": True, "reason": "allow_match", "pattern": allow_match}

    return {"allowed": True, "reason": "no_policy_or_allow_empty", "pattern": None}
```
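The regex rules in `allow` and `deny` are searched against `shlex.join(argv)`, not against the raw argv list, which affects how patterns should be written. The sketch below illustrates this with the sample patterns from the docstring. `regex_rule_matches` is a hypothetical helper introduced only for this illustration.

```python
import re
import shlex
from typing import List


def regex_rule_matches(pattern: str, argv: List[str]) -> bool:
    """Search a policy regex against the shlex-joined argv, as evaluate_command does."""
    return re.search(pattern, shlex.join(argv)) is not None


# The sample deny rule still matches when a later argument needs shell quoting.
print(regex_rule_matches(r"\brm\s+-rf\b", ["/usr/bin/rm", "-rf", "/tmp/build dir"]))

# \b is also a boundary between "apt" and "-", so this allow rule admits apt-get too.
print(regex_rule_matches(r"^/usr/bin/apt\b", ["/usr/bin/apt-get", "install", "nginx"]))

# Requiring whitespace or end-of-string after the path limits the rule to apt itself.
print(regex_rule_matches(r"^/usr/bin/apt(\s|$)", ["/usr/bin/apt-get", "install", "nginx"]))
```

Where a command must be pinned precisely, `allow_exact` avoids these regex subtleties because it compares the full argv list for equality.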
### scripts/prompt_policy.py
```python
#!/usr/bin/env python3
import json
from pathlib import Path
from typing import Dict

POLICY_PATH = Path.home() / ".openclaw" / "security" / "prompt-policy.json"


def load_policy() -> Dict[str, object]:
    if not POLICY_PATH.exists():
        return {"require_confirmation_for_untrusted": False}
    try:
        raw = json.loads(POLICY_PATH.read_text(encoding="utf-8"))
        if not isinstance(raw, dict):
            return {"require_confirmation_for_untrusted": False}
        return {
            "require_confirmation_for_untrusted": bool(
                raw.get("require_confirmation_for_untrusted", False)
            )
        }
    except Exception:
        return {"require_confirmation_for_untrusted": False}
```
### scripts/guarded_privileged_exec.py
```python
#!/usr/bin/env python3
"""Guarded privileged execution with approval, policy checks, and audit logging.
Review before enabling. No network calls are made from this script.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import List, Optional
# Allow importing sibling modules when executed from arbitrary cwd.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from audit_logger import append_audit # noqa: E402
from command_policy import evaluate_command # noqa: E402
from prompt_policy import load_policy # noqa: E402
SAFE_ENV_VARS = {
"PATH": "/usr/bin:/bin:/usr/sbin:/sbin",
"LANG": os.environ.get("LANG", "C.UTF-8"),
"LC_ALL": os.environ.get("LC_ALL", "C.UTF-8"),
}
def run_guard(args, *guard_args):
cmd = [
sys.executable,
str(Path(__file__).with_name("root_session_guard.py")),
"--state-file",
args.state_file,
"--timeout-minutes",
str(args.timeout_minutes),
*guard_args,
]
return subprocess.run(cmd, capture_output=True, text=True)
def ask_for_approval(reason: str, command_argv: List[str]) -> bool:
append_audit({"action": "approval_requested", "reason": reason, "argv": command_argv})
print("Approval required for elevated execution.")
print(f"Reason: {reason}")
print("Command argv:")
print(json.dumps(command_argv, indent=2))
answer = input("Approve elevated access for this command? [y/N]: ").strip().lower()
approved = answer in {"y", "yes"}
append_audit({"action": "approval_decision", "reason": reason, "argv": command_argv, "approved": approved})
return approved
def _validate_binary(path: str) -> Optional[str]:
"""Validate a binary path. Returns error string if invalid, else None."""
if not path:
return "missing binary"
if not os.path.isabs(path):
return "binary path must be absolute"
real = os.path.realpath(path)
if not os.path.exists(real):
return "binary path does not exist"
try:
st = os.stat(real)
except Exception:
return "cannot stat binary"
# must be root owned and not group/other writable
if st.st_uid != 0:
return "binary is not root-owned"
if (st.st_mode & 0o022) != 0:
return "binary is group/other writable"
return None
def _validate_command_argv(argv: List[str]) -> Optional[str]:
if not argv:
return "empty argv"
if argv[0].startswith("-"):
return "command must not start with an option"
if not os.path.isabs(argv[0]):
return "command path must be absolute"
real = os.path.realpath(argv[0])
if not os.path.exists(real):
return "command path does not exist"
try:
st = os.stat(real)
except Exception:
return "cannot stat command"
if st.st_uid != 0:
return "command binary is not root-owned"
if (st.st_mode & 0o022) != 0:
return "command binary is group/other writable"
return None
def run_command(argv: List[str], use_sudo: bool, sudo_kill_cache: bool) -> int:
sudo_bin = os.environ.get("OPENCLAW_REAL_SUDO", "/usr/bin/sudo")
if use_sudo:
err = _validate_binary(sudo_bin)
if err:
append_audit({"action": "sudo_binary_invalid", "binary": sudo_bin, "error": err})
print(f"Invalid sudo binary: {err}", file=sys.stderr)
return 9
exec_argv = [sudo_bin, "--"] + argv if use_sudo else argv
print("Executing argv:")
print(json.dumps(exec_argv, indent=2))
if use_sudo and sudo_kill_cache:
# Best-effort: ensure sudo timestamp for this user is not reused implicitly.
subprocess.run([sudo_bin, "-k"], check=False, capture_output=True, text=True)
append_audit({"action": "exec_start", "argv": argv, "use_sudo": use_sudo})
result = subprocess.run(exec_argv, env=SAFE_ENV_VARS)
append_audit({"action": "exec_finish", "argv": argv, "use_sudo": use_sudo, "returncode": result.returncode})
return result.returncode
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Execute privileged commands with approval + idle-timeout guard",
)
parser.add_argument(
"--state-file",
default=str(Path.home() / ".openclaw" / "security" / "root-session-state.json"),
help="Path to root session state file",
)
parser.add_argument(
"--timeout-minutes",
type=int,
default=30,
help="Idle timeout for elevated mode / approval session",
)
parser.add_argument(
"--reason",
required=True,
help="Business/security reason for privileged command",
)
parser.add_argument(
"--use-sudo",
action="store_true",
help="Prefix command with sudo",
)
parser.add_argument(
"--sudo-kill-cache",
action="store_true",
default=False,
help="Run `sudo -k` before execution to reduce implicit sudo reuse",
)
parser.add_argument(
"--keep-session",
action="store_true",
default=False,
help="Keep elevated session after command (still restricted to allowlisted argv; expires on idle timeout)",
)
parser.add_argument(
"command",
nargs=argparse.REMAINDER,
help='Command to run as separate argv items after "--", e.g. -- /usr/bin/systemctl restart nginx',
)
return parser.parse_args()
def _get_task_session_id() -> Optional[str]:
session_id = os.environ.get("OPENCLAW_TASK_SESSION_ID") or None
require_session = os.environ.get("OPENCLAW_REQUIRE_SESSION_ID") == "1"
if require_session and not session_id:
return "__MISSING__"
return session_id
def main() -> int:
args = parse_args()
if not args.command:
print("No command supplied.", file=sys.stderr)
return 2
argv = args.command
if argv and argv[0] == "--":
argv = argv[1:]
if not argv:
print("No command supplied after -- delimiter.", file=sys.stderr)
return 2
if os.environ.get("OPENCLAW_REQUIRE_POLICY_FILES") == "1":
required = [
Path.home() / ".openclaw" / "security" / "command-policy.json",
Path.home() / ".openclaw" / "security" / "approved_ports.json",
Path.home() / ".openclaw" / "security" / "egress_allowlist.json",
]
missing = [str(p) for p in required if not p.exists()]
if missing:
append_audit({"action": "policy_files_missing", "missing": missing})
print("Missing required policy files. Refusing privileged execution.", file=sys.stderr)
return 7
# Validate command path + ownership
err = _validate_command_argv(argv)
if err:
append_audit({"action": "command_invalid", "argv": argv, "error": err})
print(f"Invalid command: {err}", file=sys.stderr)
return 8
policy_result = evaluate_command(argv)
if not policy_result.get("allowed", False):
append_audit(
{
"action": "policy_block",
"argv": argv,
"reason": policy_result.get("reason"),
"pattern": policy_result.get("pattern"),
}
)
print("Command blocked by policy.", file=sys.stderr)
return 3
session_id = _get_task_session_id()
if session_id == "__MISSING__":
append_audit({"action": "session_id_missing", "argv": argv})
print("Task session id is required but not provided.", file=sys.stderr)
return 6
argv_json = json.dumps(argv)
if session_id:
authz = run_guard(args, "authorize", "--argv-json", argv_json, "--session-id", session_id)
else:
authz = run_guard(args, "authorize", "--argv-json", argv_json)
if authz.returncode not in (0, 2):
sys.stderr.write(authz.stderr or authz.stdout)
return authz.returncode
needs_approval = authz.returncode == 2
prompt_policy = load_policy()
if prompt_policy.get("require_confirmation_for_untrusted") and os.environ.get("OPENCLAW_UNTRUSTED_SOURCE") == "1":
confirm = input("Untrusted content source detected. Proceed? [y/N]: ").strip().lower()
if confirm not in {"y", "yes"}:
append_audit({"action": "untrusted_source_block", "argv": argv})
return 5
append_audit({"action": "untrusted_source_confirmed", "argv": argv})
if needs_approval and not ask_for_approval(args.reason, argv):
print("User denied elevated access. Running in normal mode is required.")
run_guard(args, "normal-used")
append_audit({"action": "approval_denied", "reason": args.reason, "argv": argv})
return 1
if needs_approval:
token_required = os.environ.get("OPENCLAW_APPROVAL_TOKEN")
if token_required:
env_path = Path.home() / ".openclaw" / "env"
if env_path.exists() and (env_path.stat().st_mode & 0o077):
print("Warning: ~/.openclaw/env permissions are too open; tighten to 600.")
token = input("Enter approval token: ").strip()
if token != token_required:
append_audit({"action": "approval_token_failed", "argv": argv})
print("Invalid approval token.", file=sys.stderr)
return 4
append_audit({"action": "approval_token_ok", "argv": argv})
approve_args = ["approve", "--reason", args.reason, "--argv-json", argv_json]
if session_id:
approve_args += ["--session-id", session_id]
approve = run_guard(args, *approve_args)
if approve.returncode != 0:
sys.stderr.write(approve.stderr or approve.stdout)
return approve.returncode
append_audit({"action": "approval_granted", "reason": args.reason, "argv": argv, "session_id": session_id})
try:
if args.use_sudo:
run_guard(args, "elevated-used")
return run_command(argv, args.use_sudo, args.sudo_kill_cache)
finally:
if not args.keep_session:
run_guard(args, "drop")
append_audit({"action": "drop_elevation", "argv": argv, "reason": "post-command"})
if args.use_sudo and args.sudo_kill_cache:
sudo_bin = os.environ.get("OPENCLAW_REAL_SUDO", "/usr/bin/sudo")
subprocess.run([sudo_bin, "-k"], check=False, capture_output=True, text=True)
if __name__ == "__main__":
raise SystemExit(main())
```
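The exit statuses returned by `main()` and `run_command()` above can be collected into a lookup table for callers that wrap this script. This is a sketch derived from reading the source, not an interface the skill documents. `GUARD_EXIT_CODES` and `describe_exit` are hypothetical names.

```python
from typing import Dict

# Statuses produced by guarded_privileged_exec.py itself, as read from the source above.
GUARD_EXIT_CODES: Dict[int, str] = {
    1: "approval denied by the user",
    2: "no command supplied",
    3: "command blocked by command policy",
    4: "approval token mismatch",
    5: "untrusted-source confirmation declined",
    6: "task session id required but missing",
    7: "required policy files missing",
    8: "command path failed validation",
    9: "sudo binary failed validation",
}


def describe_exit(code: int) -> str:
    """Return a human-readable hint for an exit status of the guard script."""
    if code == 0:
        return "success"
    # The wrapped command's own status and root_session_guard.py failures are
    # passed through unchanged, so any nonzero code may also come from there.
    hint = GUARD_EXIT_CODES.get(code)
    if hint is None:
        return "status from the wrapped command or the session guard"
    return f"{hint} (or the same status from the wrapped command)"


print(describe_exit(3))
```

Since the codes overlap with those of the executed command, the audit log is the more reliable place to tell a policy block from a command failure.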
### scripts/install-openclaw-runtime-hook.sh
```bash
#!/usr/bin/env bash
set -euo pipefail
# Installs a sudo shim at ~/.openclaw/bin/sudo (matches live_assessment checks).
# For the shim to be used, OpenClaw's runtime PATH must include ~/.openclaw/bin
# before /usr/bin. This script attempts best-effort PATH injection for macOS
# LaunchAgent installs of the gateway.
log() {
printf '[cyber-security-engineer] %s\n' "$*"
}
get_uid_and_perms() {
local path="$1"
local uid perms
# GNU stat (Linux)
if uid="$(stat -c '%u' "${path}" 2>/dev/null)" && perms="$(stat -c '%a' "${path}" 2>/dev/null)"; then
printf '%s %s\n' "${uid}" "${perms}"
return 0
fi
# BSD stat (macOS)
if uid="$(stat -f '%u' "${path}" 2>/dev/null)" && perms="$(stat -f '%Lp' "${path}" 2>/dev/null)"; then
printf '%s %s\n' "${uid}" "${perms}"
return 0
fi
return 1
}
resolve_python() {
local py
py="$(command -v python3 || true)"
if [[ -z "${py}" ]]; then
log "python3 not found; cannot install shim."
exit 1
fi
echo "${py}"
}
validate_root_bin() {
local bin="$1"
if [[ -z "${bin}" || ! -x "${bin}" ]]; then
return 1
fi
# Require absolute path
if [[ "${bin}" != /* ]]; then
return 1
fi
# Resolve symlinks
local real
real="$(readlink -f "${bin}" 2>/dev/null || echo "${bin}")"
if [[ ! -x "${real}" ]]; then
return 1
fi
# Ensure owned by root and not writable by group/other
local st
st="$(get_uid_and_perms "${real}" || true)"
if [[ -z "${st}" ]]; then
return 1
fi
local uid perms
uid="$(echo "${st}" | awk '{print $1}')"
perms="$(echo "${st}" | awk '{print $2}')"
# uid must be 0
if [[ "${uid}" != "0" ]]; then
return 1
fi
# perms: no group/other write
local gow
gow=$(( 8#${perms} & 8#022 ))
if [[ "${gow}" -ne 0 ]]; then
return 1
fi
return 0
}
REAL_SUDO="$(command -v sudo || true)"
if [[ -z "${REAL_SUDO}" ]]; then
log "sudo not found; nothing to install."
exit 1
fi
if ! validate_root_bin "${REAL_SUDO}"; then
log "Refusing to install shim: sudo binary failed validation."
exit 1
fi
PYTHON3="$(resolve_python)"
OPENCLAW_DIR="${HOME}/.openclaw"
BIN_DIR="${OPENCLAW_DIR}/bin"
SKILL_DIR_DEFAULT="${OPENCLAW_DIR}/workspace/skills/cyber-security-engineer"
mkdir -p "${BIN_DIR}"
chmod 700 "${OPENCLAW_DIR}" "${BIN_DIR}" || true
WRAPPER="${BIN_DIR}/sudo"
cat > "${WRAPPER}" <<EOF
#!/usr/bin/env bash
set -euo pipefail
REAL_SUDO_OVERRIDE="\${OPENCLAW_REAL_SUDO:-${REAL_SUDO}}"
PYTHON3_OVERRIDE="\${OPENCLAW_PYTHON3:-${PYTHON3}}"
SKILL_DIR="\${OPENCLAW_CYBER_SKILL_DIR:-${SKILL_DIR_DEFAULT}}"
# Pass-through for sudo bookkeeping.
if [[ \$# -eq 0 ]]; then
exec "\${REAL_SUDO_OVERRIDE}"
fi
case "\${1:-}" in
-h|--help|-V|--version|-v|-l|-k)
exec "\${REAL_SUDO_OVERRIDE}" "\$@"
;;
esac
# Refuse non-interactive privilege escalation by default (safety).
if [[ ! -t 0 && "\${OPENCLAW_ALLOW_NONINTERACTIVE_SUDO:-0}" != "1" ]]; then
echo "[cyber-security-engineer] Refusing non-interactive sudo (set OPENCLAW_ALLOW_NONINTERACTIVE_SUDO=1 to override)." >&2
exit 2
fi
REASON="\${OPENCLAW_PRIV_REASON:-OpenClaw requested privileged execution}"
export OPENCLAW_REAL_SUDO="\${REAL_SUDO_OVERRIDE}"
export OPENCLAW_PYTHON3="\${PYTHON3_OVERRIDE}"
exec "\${PYTHON3_OVERRIDE}" "\${SKILL_DIR}/scripts/guarded_privileged_exec.py" \
--reason "\${REASON}" \
--use-sudo \
-- "\$@"
EOF
chmod 755 "${WRAPPER}"
log "Installed sudo shim: ${WRAPPER}"
if [[ "$(uname -s)" == "Darwin" ]]; then
PLIST="${HOME}/Library/LaunchAgents/ai.openclaw.gateway.plist"
if [[ -f "${PLIST}" ]] && command -v /usr/libexec/PlistBuddy >/dev/null 2>&1; then
# Ensure EnvironmentVariables exists and prepend ~/.openclaw/bin to PATH.
/usr/libexec/PlistBuddy -c "Add :EnvironmentVariables dict" "${PLIST}" 2>/dev/null || true
EXISTING_PATH="$(/usr/libexec/PlistBuddy -c "Print :EnvironmentVariables:PATH" "${PLIST}" 2>/dev/null || true)"
if [[ -z "${EXISTING_PATH}" ]]; then
NEW_PATH="${BIN_DIR}:/usr/bin:/bin:/usr/sbin:/sbin"
/usr/libexec/PlistBuddy -c "Add :EnvironmentVariables:PATH string ${NEW_PATH}" "${PLIST}" 2>/dev/null || \
/usr/libexec/PlistBuddy -c "Set :EnvironmentVariables:PATH ${NEW_PATH}" "${PLIST}" 2>/dev/null || true
log "Updated gateway LaunchAgent PATH to include ${BIN_DIR}"
else
case ":${EXISTING_PATH}:" in
*":${BIN_DIR}:"*) ;;
*)
NEW_PATH="${BIN_DIR}:${EXISTING_PATH}"
/usr/libexec/PlistBuddy -c "Set :EnvironmentVariables:PATH ${NEW_PATH}" "${PLIST}" 2>/dev/null || true
log "Prepended ${BIN_DIR} to gateway LaunchAgent PATH"
;;
esac
fi
else
log "macOS gateway plist not found or PlistBuddy unavailable; PATH must include ${BIN_DIR} manually."
fi
else
log "Non-macOS: ensure the OpenClaw gateway process PATH includes ${BIN_DIR} before /usr/bin."
fi
log "Restart the OpenClaw gateway to apply:"
log " openclaw gateway restart"
```
### scripts/port_monitor.py
```python
#!/usr/bin/env python3
import argparse
import json
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
INSECURE_PORT_RECOMMENDATIONS = {
20: "Use SFTP (22) or FTPS (990) instead of FTP data.",
21: "Use SFTP (22) or FTPS (990) instead of FTP control.",
23: "Use SSH (22) instead of Telnet.",
69: "Use HTTPS-based transfer APIs instead of TFTP.",
80: "Use HTTPS (443) instead of HTTP.",
110: "Use POP3S (995) instead of POP3.",
143: "Use IMAPS (993) instead of IMAP.",
389: "Use LDAPS (636) instead of LDAP.",
512: "Disable r-commands and use SSH (22).",
513: "Disable r-commands and use SSH (22).",
514: "Use TLS-secured syslog transport.",
}
DEFAULT_APPROVED_PATH = Path.home() / ".openclaw" / "security" / "approved_ports.json"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="OpenClaw port monitor and risk recommender")
parser.add_argument(
"--approved-file",
default=str(DEFAULT_APPROVED_PATH),
help="JSON file of approved listening ports/services",
)
parser.add_argument(
"--json",
action="store_true",
help="Print JSON output only",
)
return parser.parse_args()
def run_lsof() -> str:
cmd = ["lsof", "-nP", "-iTCP", "-sTCP:LISTEN"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run lsof")
return proc.stdout
def run_ss() -> str:
cmd = ["ss", "-ltnp"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run ss")
return proc.stdout
def run_netstat_windows() -> str:
cmd = ["netstat", "-ano", "-p", "tcp"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run netstat")
return proc.stdout
def parse_name(name: str) -> Tuple[Optional[str], Optional[int]]:
name = name.replace("(LISTEN)", "").strip()
# *:22, 127.0.0.1:631, [::1]:8080
if name.startswith("["):
match = re.search(r"\[(.*?)\]:(\d+)$", name)
else:
match = re.search(r"(.+):(\d+)$", name)
if not match:
return None, None
host = match.group(1)
port = int(match.group(2))
return host, port
def parse_lsof_output(text: str) -> List[Dict[str, object]]:
lines = [line for line in text.splitlines() if line.strip()]
if len(lines) <= 1:
return []
entries: List[Dict[str, object]] = []
seen = set()
for line in lines[1:]:
parts = line.split()
if len(parts) < 9:
continue
command = parts[0]
pid = parts[1]
user = parts[2]
name = parts[-2] if parts[-1] == "(LISTEN)" else parts[-1]
host, port = parse_name(name)
if port is None:
continue
entry = {
"command": command,
"pid": int(pid) if pid.isdigit() else pid,
"user": user,
"host": host,
"port": port,
"protocol": "tcp",
}
dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
if dedupe_key in seen:
continue
seen.add(dedupe_key)
entries.append(entry)
return entries
def parse_ss_output(text: str) -> List[Dict[str, object]]:
lines = [line for line in text.splitlines() if line.strip()]
entries: List[Dict[str, object]] = []
seen = set()
for line in lines[1:]:
if "LISTEN" not in line:
continue
parts = line.split()
if len(parts) < 5:
continue
local = parts[3]
host, port = parse_name(local)
if port is None:
continue
cmd_match = re.search(r'users:\(\("([^"]+)",pid=(\d+)', line)
command = cmd_match.group(1) if cmd_match else "unknown"
pid = int(cmd_match.group(2)) if cmd_match else "unknown"
entry = {
"command": command,
"pid": pid,
"user": None,
"host": host,
"port": port,
"protocol": "tcp",
}
dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
if dedupe_key in seen:
continue
seen.add(dedupe_key)
entries.append(entry)
return entries
def parse_netstat_windows_output(text: str) -> List[Dict[str, object]]:
entries: List[Dict[str, object]] = []
seen = set()
for line in text.splitlines():
if not line.strip().startswith("TCP"):
continue
parts = line.split()
if len(parts) < 5:
continue
local = parts[1]
state = parts[3]
pid = parts[4]
if state.upper() != "LISTENING":
continue
host, port = parse_name(local)
if port is None:
continue
entry = {
"command": "unknown",
"pid": int(pid) if pid.isdigit() else pid,
"user": None,
"host": host,
"port": port,
"protocol": "tcp",
}
dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
if dedupe_key in seen:
continue
seen.add(dedupe_key)
entries.append(entry)
return entries
def collect_entries() -> Tuple[List[Dict[str, object]], str]:
if shutil.which("lsof"):
return parse_lsof_output(run_lsof()), "lsof"
if sys.platform.startswith("linux") and shutil.which("ss"):
return parse_ss_output(run_ss()), "ss"
if sys.platform.startswith("win") and shutil.which("netstat"):
return parse_netstat_windows_output(run_netstat_windows()), "netstat"
raise RuntimeError("No supported port listing tool found (lsof/ss/netstat)")
def load_approved_ports(path: Path) -> List[Dict[str, object]]:
if not path.exists():
return []
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("Approved ports file must be a JSON array.")
normalized: List[Dict[str, object]] = []
for item in data:
if not isinstance(item, dict):
continue
if "port" not in item:
continue
normalized.append(item)
return normalized
def is_approved(entry: Dict[str, object], approved: List[Dict[str, object]]) -> bool:
for rule in approved:
if int(rule.get("port")) != int(entry["port"]):
continue
rule_proto = str(rule.get("protocol", "tcp")).lower()
if rule_proto != str(entry["protocol"]).lower():
continue
rule_cmd = rule.get("command")
if rule_cmd and str(rule_cmd).lower() != str(entry["command"]).lower():
continue
return True
return False
def build_findings(entries: List[Dict[str, object]], approved: List[Dict[str, object]]) -> List[Dict[str, object]]:
findings: List[Dict[str, object]] = []
for entry in entries:
port = int(entry["port"])
host = str(entry.get("host") or "")
if not is_approved(entry, approved):
findings.append(
{
"severity": "medium",
"type": "unapproved-port",
"port": port,
"command": entry["command"],
"message": "Listening port is not in approved baseline.",
"recommendation": "Approve it with business justification or close the service.",
}
)
if port in INSECURE_PORT_RECOMMENDATIONS:
findings.append(
{
"severity": "high",
"type": "insecure-port",
"port": port,
"command": entry["command"],
"message": "Insecure protocol/port detected.",
"recommendation": INSECURE_PORT_RECOMMENDATIONS[port],
}
)
if host in ("*", "0.0.0.0", "::"):
findings.append(
{
"severity": "medium",
"type": "public-bind",
"port": port,
"command": entry["command"],
"message": "Service listens on all interfaces.",
"recommendation": "Bind to localhost or a restricted interface unless external exposure is required.",
}
)
return findings
def output_text(report: Dict[str, object]) -> None:
print("Port Monitoring Report")
print(f"Listening services: {len(report['listening_services'])}")
print(f"Findings: {len(report['findings'])}")
if report["findings"]:
print("")
for finding in report["findings"]:
print(
f"- [{finding['severity'].upper()}] {finding['type']} "
f"port {finding['port']} ({finding['command']}): {finding['recommendation']}"
)
def main() -> int:
args = parse_args()
approved_path = Path(args.approved_file).expanduser()
try:
entries, tool_used = collect_entries()
approved = load_approved_ports(approved_path)
findings = build_findings(entries, approved)
except Exception as exc:
print(json.dumps({"status": "error", "error": str(exc)}))
return 1
report = {
"status": "ok",
"approved_file": str(approved_path),
"approved_rules_count": len(approved),
"tool": tool_used,
"listening_services": entries,
"findings": findings,
}
if args.json:
print(json.dumps(report, indent=2))
else:
output_text(report)
return 0
if __name__ == "__main__":
raise SystemExit(main())
```
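The approved-ports baseline read by `load_approved_ports` is a JSON array of objects with a required `port` and optional `protocol` and `command` keys. The sketch below builds a hypothetical baseline and restates the per-rule checks of `is_approved` in simplified form. The port numbers and command names are illustrative assumptions, and `rule_matches` is not part of the script.

```python
import json
from typing import Dict, List

# Hypothetical baseline; adjust ports and commands to the host being monitored.
approved_rules: List[Dict[str, object]] = [
    {"port": 22, "protocol": "tcp", "command": "sshd"},
    {"port": 443},  # protocol defaults to "tcp"; any command may hold the port
]


def rule_matches(rule: Dict[str, object], entry: Dict[str, object]) -> bool:
    """Simplified restatement of the checks is_approved applies to one rule."""
    if int(rule["port"]) != int(entry["port"]):
        return False
    if str(rule.get("protocol", "tcp")).lower() != str(entry["protocol"]).lower():
        return False
    rule_cmd = rule.get("command")
    if rule_cmd and str(rule_cmd).lower() != str(entry["command"]).lower():
        return False
    return True


listener = {"command": "nginx", "port": 443, "protocol": "tcp"}
print(any(rule_matches(r, listener) for r in approved_rules))

# Contents to save as ~/.openclaw/security/approved_ports.json
print(json.dumps(approved_rules, indent=2))
```

When `lsof` is the data source, its COMMAND column is truncated by default, so a `command` value in a rule may need to match the shortened name that `lsof` prints.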
### scripts/egress_monitor.py
```python
#!/usr/bin/env python3
import argparse
import json
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
DEFAULT_ALLOWLIST = Path.home() / ".openclaw" / "security" / "egress_allowlist.json"
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Outbound TCP connection monitor against allowlist")
p.add_argument("--allowlist", default=str(DEFAULT_ALLOWLIST), help="Allowlist JSON file")
p.add_argument("--json", action="store_true", help="JSON output")
return p.parse_args()
def run_lsof() -> str:
cmd = ["lsof", "-nP", "-iTCP", "-sTCP:ESTABLISHED"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run lsof")
return proc.stdout
def run_ss() -> str:
cmd = ["ss", "-tnp", "state", "established"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run ss")
return proc.stdout
def run_netstat_windows() -> str:
cmd = ["netstat", "-ano", "-p", "tcp"]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or "Failed to run netstat")
return proc.stdout
def _parse_host_port(s: str) -> Tuple[Optional[str], Optional[int]]:
s = s.strip()
# lsof: 1.2.3.4:55555->9.9.9.9:443 or [::1]:555->[...] etc.
if "->" in s:
s = s.split("->", 1)[1]
s = s.replace("(ESTABLISHED)", "").strip()
if s.startswith("["):
m = re.search(r"\[(.*?)\]:(\d+)$", s)
else:
m = re.search(r"(.+):(\d+)$", s)
if not m:
return None, None
return m.group(1), int(m.group(2))
def parse_lsof(text: str) -> List[Dict[str, object]]:
    lines = [ln for ln in text.splitlines() if ln.strip()]
    if len(lines) <= 1:
        return []
    out: List[Dict[str, object]] = []
    seen = set()
    for ln in lines[1:]:
        parts = ln.split()
        if len(parts) < 9:
            continue
        command = parts[0]
        pid = parts[1]
        user = parts[2]
        # lsof prints the TCP state as a separate trailing token, e.g.
        # "1.2.3.4:55555->9.9.9.9:443 (ESTABLISHED)", so NAME is the token before it.
        name = parts[-2] if parts[-1] == "(ESTABLISHED)" else parts[-1]
        rhost, rport = _parse_host_port(name)
        if rport is None:
            continue
        entry = {
            "command": command,
            "pid": int(pid) if pid.isdigit() else pid,
            "user": user,
            "remote_host": rhost,
            "remote_port": rport,
            "protocol": "tcp",
        }
        key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out
def parse_ss(text: str) -> List[Dict[str, object]]:
    out: List[Dict[str, object]] = []
    seen = set()
    for ln in text.splitlines():
        # Skip blank lines and the header row (with or without a State column).
        if not ln.strip() or ln.startswith(("State", "Recv-Q")):
            continue
        # With a State column:
        #   ESTAB 0 0 192.168.0.10:49820 1.2.3.4:443 users:(("proc",pid=123,fd=...))
        # `ss ... state established` omits the State column, which shifts the
        # peer address one field to the left, so detect the layout per line.
        parts = ln.split()
        peer_idx = 3 if parts[0].isdigit() else 4
        if len(parts) <= peer_idx:
            continue
        remote = parts[peer_idx]
        rhost, rport = _parse_host_port(remote)
        if rport is None:
            continue
        m = re.search(r'users:\(\("([^"]+)",pid=(\d+)', ln)
        command = m.group(1) if m else "unknown"
        pid = int(m.group(2)) if m else "unknown"
        entry = {
            "command": command,
            "pid": pid,
            "user": None,
            "remote_host": rhost,
            "remote_port": rport,
            "protocol": "tcp",
        }
        key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out
def parse_netstat_windows(text: str) -> List[Dict[str, object]]:
out: List[Dict[str, object]] = []
seen = set()
for ln in text.splitlines():
if not ln.strip().startswith("TCP"):
continue
parts = ln.split()
if len(parts) < 5:
continue
remote = parts[2]
state = parts[3]
pid = parts[4]
if state.upper() != "ESTABLISHED":
continue
rhost, rport = _parse_host_port(remote)
if rport is None:
continue
entry = {
"command": "unknown",
"pid": int(pid) if pid.isdigit() else pid,
"user": None,
"remote_host": rhost,
"remote_port": rport,
"protocol": "tcp",
}
key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
if key in seen:
continue
seen.add(key)
out.append(entry)
return out
def collect_connections() -> Tuple[List[Dict[str, object]], str]:
if shutil.which("lsof"):
return parse_lsof(run_lsof()), "lsof"
if sys.platform.startswith("linux") and shutil.which("ss"):
return parse_ss(run_ss()), "ss"
if sys.platform.startswith("win") and shutil.which("netstat"):
return parse_netstat_windows(run_netstat_windows()), "netstat"
raise RuntimeError("No supported connection tool found (lsof/ss/netstat)")
def load_allowlist(path: Path) -> List[Dict[str, object]]:
if not path.exists():
return []
raw = json.loads(path.read_text(encoding="utf-8"))
return raw if isinstance(raw, list) else []
def is_allowed(conn: Dict[str, object], rules: List[Dict[str, object]]) -> bool:
host = str(conn.get("remote_host") or "")
port = int(conn.get("remote_port") or 0)
proto = str(conn.get("protocol") or "tcp").lower()
cmd = str(conn.get("command") or "")
for rule in rules:
if not isinstance(rule, dict):
continue
if str(rule.get("protocol") or "tcp").lower() != proto:
continue
if "port" in rule and int(rule.get("port") or 0) != port:
continue
if "command" in rule and str(rule.get("command") or "").lower() != cmd.lower():
continue
host_re = rule.get("host_regex")
if host_re:
try:
if not re.search(str(host_re), host):
continue
except re.error:
continue
else:
host_exact = rule.get("host")
if host_exact and str(host_exact) != host:
continue
return True
return False
def main() -> int:
    args = parse_args()
    allow_path = Path(args.allowlist).expanduser()
    try:
        conns, tool = collect_connections()
        rules = load_allowlist(allow_path)
        findings = []
        for c in conns:
            if not is_allowed(c, rules):
                findings.append(
                    {
                        "type": "unapproved-egress",
                        "severity": "medium",
                        "remote_host": c.get("remote_host"),
                        "remote_port": c.get("remote_port"),
                        "command": c.get("command"),
                        "recommendation": "Add to egress allowlist with justification or terminate the connection.",
                    }
                )
        report = {
            "status": "ok",
            "allowlist_file": str(allow_path),
            "allowlist_rules_count": len(rules),
            "tool": tool,
            "connections": conns,
            "findings": findings,
        }
    except Exception as exc:
        report = {"status": "error", "error": str(exc)}
    if args.json:
        print(json.dumps(report, indent=2))
    elif report.get("status") != "ok":
        # Surface the failure instead of reporting zero connections.
        print(f"Egress monitor error: {report.get('error')}")
    else:
        print(f"Egress connections: {len(report.get('connections', []))}")
        print(f"Findings: {len(report.get('findings', []))}")
    return 0 if report.get("status") == "ok" else 1


if __name__ == "__main__":
    raise SystemExit(main())
```
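
The allowlist matching in `is_allowed` can be tried in isolation. The sketch below copies that logic into a standalone snippet and runs it against two rules; the hosts, ports, and command names in `EXAMPLE_RULES` are hypothetical and are not taken from any shipped allowlist.

```python
import re
from typing import Dict, List


def is_allowed(conn: Dict[str, object], rules: List[Dict[str, object]]) -> bool:
    """Standalone copy of the egress matching logic, for illustration only."""
    host = str(conn.get("remote_host") or "")
    port = int(conn.get("remote_port") or 0)
    proto = str(conn.get("protocol") or "tcp").lower()
    cmd = str(conn.get("command") or "")
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        if str(rule.get("protocol") or "tcp").lower() != proto:
            continue
        if "port" in rule and int(rule.get("port") or 0) != port:
            continue
        if "command" in rule and str(rule.get("command") or "").lower() != cmd.lower():
            continue
        host_re = rule.get("host_regex")
        if host_re:
            try:
                if not re.search(str(host_re), host):
                    continue
            except re.error:
                continue
        else:
            host_exact = rule.get("host")
            if host_exact and str(host_exact) != host:
                continue
        return True
    return False


# Hypothetical rules: one anchored regex rule and one exact-host rule.
EXAMPLE_RULES = [
    {"protocol": "tcp", "port": 443, "host_regex": r"(^|\.)example\.com$"},
    {"protocol": "tcp", "port": 22, "host": "203.0.113.10", "command": "ssh"},
]

approved = {"remote_host": "api.example.com", "remote_port": 443, "protocol": "tcp", "command": "curl"}
unapproved = {"remote_host": "198.51.100.7", "remote_port": 8080, "protocol": "tcp", "command": "nc"}

print(is_allowed(approved, EXAMPLE_RULES))
print(is_allowed(unapproved, EXAMPLE_RULES))
```

Because `host_regex` is evaluated with `re.search`, an unanchored pattern such as `example\.com` also matches `example.com.evil.net`. Anchoring the pattern, as in the first rule above, is one way to avoid that.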
### scripts/notify_on_violation.py
```python
#!/usr/bin/env python3
"""
Notify when new compliance violations/partials appear between runs.
This script compares the current compliance summary (as generated by
`compliance_dashboard.py render --output-summary ...`) against a persisted
snapshot and emits a message when new items appear.
If OPENCLAW_VIOLATION_NOTIFY_CMD is set, the message is sent to that command via
stdin, but only when the command is explicitly allowlisted via
OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST. Shell/interpreter launchers are refused.
Otherwise the message is printed to stdout.
"""
from __future__ import annotations
import argparse
import json
import os
import shlex
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
SHELL_LAUNCHERS = {
"sh",
"bash",
"zsh",
"fish",
"dash",
"ksh",
"csh",
"tcsh",
"pwsh",
"powershell",
"cmd",
"python",
"python3",
"perl",
"ruby",
"node",
"env",
"sudo",
"pkexec",
}
def _default_state_file() -> str:
base = os.path.join(os.path.expanduser("~"), ".openclaw", "security")
return os.path.join(base, "violation-notify-state.json")
def _load_json(path: str) -> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def _safe_mkdir_for(path: str) -> None:
d = os.path.dirname(path)
if d:
os.makedirs(d, exist_ok=True)
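def _extract_violation_state(summary: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """
    Return a mapping: check_id -> {status, risk, title}
    Only includes items whose status is violation or partial.
    """
    out: Dict[str, Dict[str, Any]] = {}
    if not isinstance(summary, dict):
        return out
    # `compliance_dashboard.py render` writes the list under summary["summary"]["violations"];
    # a top-level "violations" key is still accepted as a fallback.
    nested = summary.get("summary")
    source = nested if isinstance(nested, dict) and "violations" in nested else summary
    for item in source.get("violations", []) or []:
        if not isinstance(item, dict):
            continue
        check_id = str(item.get("check_id", "")).strip()
        if not check_id:
            continue
        status = str(item.get("status", "")).strip() or "violation"
        risk = str(item.get("risk", "")).strip() or "unknown"
        title = str(item.get("title", "")).strip()
        out[check_id] = {"status": status, "risk": risk, "title": title}
    return out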
def _severity_rank(status: str) -> int:
# Higher = more severe.
if status == "violation":
return 2
if status == "partial":
return 1
return 0
def _diff_new(prev: Dict[str, Dict[str, Any]], cur: Dict[str, Dict[str, Any]]) -> List[Tuple[str, Dict[str, Any]]]:
new_items: List[Tuple[str, Dict[str, Any]]] = []
for check_id, meta in cur.items():
if check_id not in prev:
new_items.append((check_id, meta))
continue
prev_status = str(prev.get(check_id, {}).get("status", ""))
cur_status = str(meta.get("status", ""))
if _severity_rank(cur_status) > _severity_rank(prev_status):
# Escalation (e.g., partial -> violation) counts as "new".
new_items.append((check_id, meta))
return sorted(new_items, key=lambda x: x[0])
def _format_message(new_items: List[Tuple[str, Dict[str, Any]]], summary_file: str) -> str:
lines = []
lines.append("OpenClaw Cyber Security Engineer: new compliance findings detected")
lines.append(f"Source: {summary_file}")
lines.append(f"Time (UTC): {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}")
lines.append("")
for check_id, meta in new_items:
status = meta.get("status", "")
risk = meta.get("risk", "")
title = meta.get("title", "")
if title:
lines.append(f"- {check_id}: {status} ({risk}) - {title}")
else:
lines.append(f"- {check_id}: {status} ({risk})")
return "\n".join(lines) + "\n"
def _parse_notify_command(raw: str) -> List[str]:
raw = raw.strip()
if not raw:
return []
try:
if raw.startswith("["):
parsed = json.loads(raw)
if isinstance(parsed, list) and all(isinstance(item, str) for item in parsed):
return parsed
return shlex.split(raw)
except Exception:
return []
def _is_allowed_notify_command(argv: List[str]) -> bool:
    """Allowlist supports exact argv lists in JSON.

    OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST can be:
    - JSON array of argv arrays: [["/usr/bin/logger","-t","openclaw"], ...]
    - Comma-separated list of absolute binaries (legacy)
    """
    allowlist_raw = os.environ.get("OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST", "").strip()
    if not allowlist_raw:
        return False
    # JSON allowlist of argv arrays
    if allowlist_raw.startswith("["):
        try:
            parsed = json.loads(allowlist_raw)
            if isinstance(parsed, list):
                for item in parsed:
                    if isinstance(item, list) and all(isinstance(s, str) for s in item):
                        if item == argv:
                            return True
            return False
        except Exception:
            return False
    # Legacy: comma-separated binaries
    allowlist = {item.strip() for item in allowlist_raw.split(",") if item.strip()}
    # bool() keeps the declared return type; `argv and ...` would return [] for empty argv.
    return bool(argv) and argv[0] in allowlist
def _validate_notify_argv(argv: List[str]) -> Optional[str]:
    if not argv:
        return "empty argv"
    if not os.path.isabs(argv[0]):
        return "notify command must use absolute path"
    real = os.path.realpath(argv[0])
    # Check the resolved target as well, so a symlink to a shell or interpreter is also refused.
    if os.path.basename(argv[0]) in SHELL_LAUNCHERS or os.path.basename(real) in SHELL_LAUNCHERS:
        return "notify command is a shell/interpreter launcher"
    if not os.path.exists(real):
        return "notify command does not exist"
    return None
def _send_notification(message: str) -> int:
cmd = os.environ.get("OPENCLAW_VIOLATION_NOTIFY_CMD", "").strip()
if not cmd:
sys.stdout.write(message)
return 0
# Best-effort: never crash the cycle due to a notifier.
try:
argv = _parse_notify_command(cmd)
if not argv:
sys.stderr.write("notify_on_violation: notifier configured but could not parse OPENCLAW_VIOLATION_NOTIFY_CMD safely\n")
return 0
err = _validate_notify_argv(argv)
if err:
sys.stderr.write(f"notify_on_violation: refused notifier command ({err})\n")
return 0
if not _is_allowed_notify_command(argv):
sys.stderr.write("notify_on_violation: refused notifier command (not in OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST)\n")
return 0
p = subprocess.run(argv, check=False, input=message, text=True, timeout=5)
return int(p.returncode or 0)
except Exception:
return 0
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument(
"--summary-file",
default="cyber-security-engineer/assessments/compliance-summary.json",
help="Path to compliance summary JSON.",
)
ap.add_argument(
"--state-file",
default=os.environ.get("OPENCLAW_VIOLATION_NOTIFY_STATE", _default_state_file()),
help="Path to persisted state JSON (default: ~/.openclaw/security/violation-notify-state.json).",
)
args = ap.parse_args()
try:
summary = _load_json(args.summary_file)
except FileNotFoundError:
# Nothing to do yet.
return 0
except Exception as e:
sys.stderr.write(f"notify_on_violation: failed to read summary: {e}\n")
return 0
cur_state = _extract_violation_state(summary)
prev_state: Dict[str, Dict[str, Any]] = {}
try:
prev_obj = _load_json(args.state_file)
prev_state = prev_obj.get("violations", {}) or {}
except FileNotFoundError:
prev_state = {}
except Exception:
prev_state = {}
new_items = _diff_new(prev_state, cur_state)
if new_items:
msg = _format_message(new_items, args.summary_file)
_send_notification(msg)
# Persist current state for next run.
try:
_safe_mkdir_for(args.state_file)
with open(args.state_file, "w", encoding="utf-8") as f:
json.dump({"violations": cur_state}, f, indent=2, sort_keys=True)
f.write("\n")
except Exception:
# Best-effort.
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
```
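
The notifier is only invoked when `OPENCLAW_VIOLATION_NOTIFY_CMD` parses to an argv list and that list passes the `OPENCLAW_VIOLATION_NOTIFY_ALLOWLIST` check. The following sketch mirrors those two steps with plain string arguments instead of environment variables; the helper names `parse_notify_command` and `is_allowlisted` are hypothetical, and the `/usr/bin/logger` argv is the example from the script's docstring.

```python
import json
import shlex
from typing import List


def parse_notify_command(raw: str) -> List[str]:
    """Accept either a JSON array of strings or a shell-style command string."""
    raw = raw.strip()
    if not raw:
        return []
    try:
        if raw.startswith("["):
            parsed = json.loads(raw)
            if isinstance(parsed, list) and all(isinstance(s, str) for s in parsed):
                return parsed
        return shlex.split(raw)
    except ValueError:
        # Covers malformed JSON and unbalanced shell quoting.
        return []


def is_allowlisted(argv: List[str], allowlist_raw: str) -> bool:
    """Exact-argv match for the JSON form, argv[0] match for the legacy form."""
    allowlist_raw = allowlist_raw.strip()
    if not allowlist_raw:
        return False
    if allowlist_raw.startswith("["):
        try:
            parsed = json.loads(allowlist_raw)
        except ValueError:
            return False
        return isinstance(parsed, list) and any(item == argv for item in parsed)
    binaries = {item.strip() for item in allowlist_raw.split(",") if item.strip()}
    return bool(argv) and argv[0] in binaries


notify_cmd = '["/usr/bin/logger", "-t", "openclaw"]'
allowlist = '[["/usr/bin/logger", "-t", "openclaw"]]'

argv = parse_notify_command(notify_cmd)
print(argv, is_allowlisted(argv, allowlist))
# A changed argument no longer matches the exact-argv allowlist.
print(is_allowlisted(["/usr/bin/logger", "-t", "other"], allowlist))
# The legacy comma-separated form matches on argv[0] only.
print(is_allowlisted(["/usr/bin/logger", "-t", "other"], "/usr/bin/logger"))
```

The JSON form is the stricter of the two, since it pins every argument rather than just the binary path.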
### scripts/compliance_dashboard.py
```python
#!/usr/bin/env python3
import argparse
import json
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List
VALID_STATUS = {"compliant", "partial", "violation", "not_assessed"}
VALID_RISK = {"low", "medium", "high", "critical"}
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def default_controls_path() -> Path:
return Path(__file__).resolve().parent.parent / "references" / "compliance-controls-map.json"
def load_json(path: Path):
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def write_json(path: Path, obj) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(obj, f, indent=2)
f.write("\n")
def init_assessment(controls: List[Dict[str, object]], system_name: str) -> Dict[str, object]:
checks = []
for ctrl in controls:
checks.append(
{
"check_id": ctrl["check_id"],
"status": "not_assessed",
"risk": ctrl.get("default_risk", "medium"),
"observed_state": "",
"evidence": "",
"gap": "",
"mitigation": "",
"owner": "",
"due_date": "",
}
)
return {
"metadata": {
"system": system_name,
"generated_at_utc": utc_now(),
"frameworks": ["ISO/IEC 27001:2022", "NIST CSF"],
},
"checks": checks,
}
def validate_assessment(assessment: Dict[str, object]) -> None:
checks = assessment.get("checks", [])
if not isinstance(checks, list):
raise ValueError("Assessment must include 'checks' list.")
for c in checks:
status = c.get("status", "")
risk = c.get("risk", "")
if status not in VALID_STATUS:
raise ValueError(f"Invalid status '{status}' for check {c.get('check_id')}")
if risk not in VALID_RISK:
raise ValueError(f"Invalid risk '{risk}' for check {c.get('check_id')}")
def merge_controls(controls: List[Dict[str, object]], assessment: Dict[str, object]) -> List[Dict[str, object]]:
by_id = {c["check_id"]: c for c in assessment.get("checks", [])}
merged = []
for ctrl in controls:
check = by_id.get(ctrl["check_id"], {})
merged.append(
{
"check_id": ctrl["check_id"],
"title": ctrl["title"],
"iso27001": ctrl.get("iso27001", []),
"nist": ctrl.get("nist", []),
"expected_state": ctrl.get("expected_state", ""),
"status": check.get("status", "not_assessed"),
"risk": check.get("risk", ctrl.get("default_risk", "medium")),
"observed_state": check.get("observed_state", ""),
"evidence": check.get("evidence", ""),
"gap": check.get("gap", ""),
"mitigation": check.get("mitigation", ""),
"owner": check.get("owner", ""),
"due_date": check.get("due_date", ""),
}
)
return merged
def summarize(merged: List[Dict[str, object]]) -> Dict[str, object]:
status_counts = Counter(item["status"] for item in merged)
risk_counts = Counter(item["risk"] for item in merged)
violations = [m for m in merged if m["status"] in ("violation", "partial")]
mitigations = [m for m in merged if m["mitigation"]]
return {
"status_counts": dict(status_counts),
"risk_counts": dict(risk_counts),
"violations": violations,
"mitigations": mitigations,
}
def render_html(system_name: str, merged: List[Dict[str, object]], summary: Dict[str, object]) -> str:
    # Assessment fields are free text, so escape them before embedding in HTML.
    # Imported locally to keep this function self-contained.
    from html import escape

    def esc(value: object) -> str:
        return escape(str(value))

    def fmt_list(items: List[str]) -> str:
        return esc(", ".join(items)) if items else "-"

    system_name = esc(system_name)
    controls_rows = "\n".join(
        f"<tr><td>{esc(m['check_id'])}</td><td>{esc(m['title'])}</td><td>{fmt_list(m['iso27001'])}</td><td>{fmt_list(m['nist'])}</td><td>{esc(m['status'])}</td><td>{esc(m['risk'])}</td></tr>"
        for m in merged
    )
    violation_rows = "\n".join(
        f"<tr><td>{esc(m['check_id'])}</td><td>{esc(m['status'])}</td><td>{esc(m['risk'])}</td><td>{esc(m['gap'] or '-')}</td><td>{esc(m['evidence'] or '-')}</td></tr>"
        for m in summary["violations"]
    ) or "<tr><td colspan='5'>No violations or partial findings.</td></tr>"
    mitigation_rows = "\n".join(
        f"<tr><td>{esc(m['check_id'])}</td><td>{esc(m['mitigation'])}</td><td>{esc(m['owner'] or '-')}</td><td>{esc(m['due_date'] or '-')}</td></tr>"
        for m in summary["mitigations"]
    ) or "<tr><td colspan='4'>No mitigation actions recorded.</td></tr>"
    status_counts = summary["status_counts"]
    risk_counts = summary["risk_counts"]
    return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Compliance Dashboard - {system_name}</title>
<style>
:root {{
--bg: #f2f5f9;
--panel: #ffffff;
--text: #1f2937;
--muted: #5f6b7a;
--accent: #005f73;
--warn: #b45309;
--bad: #b91c1c;
--border: #d9e2ec;
}}
body {{
margin: 0;
font-family: "IBM Plex Sans", "Segoe UI", sans-serif;
color: var(--text);
background: linear-gradient(180deg, #eaf2f8, var(--bg));
}}
.wrap {{ max-width: 1100px; margin: 0 auto; padding: 24px; }}
.panel {{
background: var(--panel);
border: 1px solid var(--border);
border-radius: 12px;
padding: 16px;
margin-bottom: 16px;
box-shadow: 0 6px 16px rgba(5, 23, 41, 0.06);
}}
h1, h2 {{ margin: 0 0 10px 0; }}
h1 {{ font-size: 28px; color: var(--accent); }}
h2 {{ font-size: 18px; }}
p {{ margin: 8px 0; color: var(--muted); }}
table {{ width: 100%; border-collapse: collapse; font-size: 14px; }}
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid var(--border); vertical-align: top; }}
th {{ background: #f8fafc; }}
.kpis {{ display: grid; grid-template-columns: repeat(4, minmax(120px, 1fr)); gap: 12px; }}
.kpi {{ border: 1px solid var(--border); border-radius: 10px; padding: 10px; background: #fcfdff; }}
.kpi strong {{ display: block; font-size: 20px; margin-top: 6px; }}
.risk-high {{ color: var(--bad); }}
.risk-medium {{ color: var(--warn); }}
</style>
</head>
<body>
<div class="wrap">
<div class="panel">
<h1>ISO 27001 / NIST Compliance Dashboard</h1>
<p>System: {system_name}</p>
<p>Generated: {utc_now()}</p>
</div>
<div class="panel">
<h2>Risk and Status Overview</h2>
<div class="kpis">
<div class="kpi"><span>Violations</span><strong>{status_counts.get("violation", 0)}</strong></div>
<div class="kpi"><span>Partial</span><strong>{status_counts.get("partial", 0)}</strong></div>
<div class="kpi"><span>Compliant</span><strong>{status_counts.get("compliant", 0)}</strong></div>
<div class="kpi"><span>Not Assessed</span><strong>{status_counts.get("not_assessed", 0)}</strong></div>
</div>
<p class="risk-high">High/Critical risk findings: {risk_counts.get("high", 0) + risk_counts.get("critical", 0)}</p>
<p class="risk-medium">Medium risk findings: {risk_counts.get("medium", 0)}</p>
</div>
<div class="panel">
<h2>Controls Map View</h2>
<table>
<thead>
<tr><th>Check ID</th><th>Control</th><th>ISO 27001</th><th>NIST</th><th>Status</th><th>Risk</th></tr>
</thead>
<tbody>
{controls_rows}
</tbody>
</table>
</div>
<div class="panel">
<h2>Violation View</h2>
<table>
<thead>
<tr><th>Check ID</th><th>Status</th><th>Risk</th><th>Gap</th><th>Evidence</th></tr>
</thead>
<tbody>
{violation_rows}
</tbody>
</table>
</div>
<div class="panel">
<h2>Mitigation View</h2>
<table>
<thead>
<tr><th>Check ID</th><th>Mitigation</th><th>Owner</th><th>Due Date</th></tr>
</thead>
<tbody>
{mitigation_rows}
</tbody>
</table>
</div>
</div>
</body>
</html>
"""
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="ISO 27001/NIST dashboard scaffold for OpenClaw")
parser.add_argument(
"--controls-file",
default=str(default_controls_path()),
help="Controls map JSON file",
)
sub = parser.add_subparsers(dest="command", required=True)
init_cmd = sub.add_parser("init-assessment", help="Create scaffold assessment JSON")
init_cmd.add_argument("--system", default="OpenClaw", help="System name")
init_cmd.add_argument(
"--output",
default="cyber-security-engineer/assessments/openclaw-assessment.json",
help="Output path for assessment scaffold",
)
render_cmd = sub.add_parser("render", help="Render dashboard from controls+assessment")
render_cmd.add_argument(
"--assessment-file",
default="cyber-security-engineer/assessments/openclaw-assessment.json",
help="Assessment input JSON path",
)
render_cmd.add_argument(
"--output-html",
default="cyber-security-engineer/assessments/compliance-dashboard.html",
help="Output HTML dashboard path",
)
render_cmd.add_argument(
"--output-summary",
default="cyber-security-engineer/assessments/compliance-summary.json",
help="Output summary JSON path",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
controls = load_json(Path(args.controls_file).expanduser())
if not isinstance(controls, list):
raise ValueError("Controls file must be a JSON array.")
if args.command == "init-assessment":
scaffold = init_assessment(controls, args.system)
out = Path(args.output)
write_json(out, scaffold)
print(json.dumps({"status": "ok", "created": str(out), "checks": len(scaffold["checks"])}))
return 0
if args.command == "render":
assessment_path = Path(args.assessment_file)
assessment = load_json(assessment_path)
validate_assessment(assessment)
merged = merge_controls(controls, assessment)
summary = summarize(merged)
system_name = assessment.get("metadata", {}).get("system", "OpenClaw")
html = render_html(system_name, merged, summary)
out_html = Path(args.output_html)
out_html.parent.mkdir(parents=True, exist_ok=True)
out_html.write_text(html, encoding="utf-8")
out_summary = Path(args.output_summary)
write_json(
out_summary,
{
"generated_at_utc": utc_now(),
"system": system_name,
"summary": summary,
"controls": merged,
},
)
print(
json.dumps(
{
"status": "ok",
"output_html": str(out_html),
"output_summary": str(out_summary),
"violations_or_partial": len(summary["violations"]),
}
)
)
return 0
return 1
if __name__ == "__main__":
raise SystemExit(main())
```
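
The summary JSON written by `render` wraps the output of `summarize` in an outer object, so the violation list sits under the `summary` key rather than at the top level. The sketch below rebuilds that shape from a few hand-written entries; the check IDs come from this skill, while the statuses, mitigation text, and timestamp are invented for illustration.

```python
from collections import Counter
from typing import Dict, List


def summarize(merged: List[Dict[str, object]]) -> Dict[str, object]:
    """Same counting logic as compliance_dashboard.py, copied for illustration."""
    return {
        "status_counts": dict(Counter(item["status"] for item in merged)),
        "risk_counts": dict(Counter(item["risk"] for item in merged)),
        "violations": [m for m in merged if m["status"] in ("violation", "partial")],
        "mitigations": [m for m in merged if m["mitigation"]],
    }


# Hypothetical merged controls; only the fields used by summarize() are shown.
merged = [
    {"check_id": "gateway_loopback_only", "status": "violation", "risk": "high",
     "mitigation": "Set gateway.mode=local, gateway.bind=loopback, auth.mode=token."},
    {"check_id": "backup_configured", "status": "partial", "risk": "low", "mitigation": ""},
    {"check_id": "elevation_timeout_30m", "status": "compliant", "risk": "medium", "mitigation": ""},
]

# Shape of the file written via --output-summary.
summary_file = {
    "generated_at_utc": "2026-01-01T00:00:00Z",  # placeholder timestamp
    "system": "OpenClaw",
    "summary": summarize(merged),
    "controls": merged,
}

flagged = [m["check_id"] for m in summary_file["summary"]["violations"]]
print(flagged)
print(summary_file["summary"]["status_counts"])
```

Any consumer of this file, such as a notifier that diffs findings between runs, needs to read `summary_file["summary"]["violations"]`; a lookup of a top-level `violations` key finds nothing.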
### scripts/live_assessment.py
```python
#!/usr/bin/env python3
"""Collect runtime signals and update the compliance assessment JSON.
Review before use. No external network calls are made by this script.
"""
import argparse
import json
import os
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
AUDIT_LOG = Path.home() / ".openclaw" / "security" / "privileged-audit.jsonl"
OPENCLAW_DIR = Path.home() / ".openclaw"
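def run_cmd(cmd: List[str]) -> str:
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # Missing or non-executable binary: treat as "no output" rather than crashing.
        return ""
    if proc.returncode != 0:
        return (proc.stdout or "") + "\n" + (proc.stderr or "")
    return proc.stdout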
def resolve_openclaw_bin() -> str:
override = Path.home() / ".openclaw" / "openclaw-bin-path.txt"
if override.exists():
candidate = override.read_text(encoding="utf-8").strip()
if candidate and Path(candidate).exists():
return candidate
for candidate in (
shutil.which("openclaw"),
"/opt/homebrew/bin/openclaw",
"/usr/local/bin/openclaw",
str(Path.home() / ".npm-global" / "bin" / "openclaw"),
):
if candidate and Path(candidate).exists():
return candidate
return "openclaw"
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def load_json(path: Path):
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def write_json(path: Path, data) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
f.write("\n")
def load_openclaw_config() -> Optional[Dict[str, object]]:
cfg_path = Path.home() / ".openclaw" / "openclaw.json"
if not cfg_path.exists():
return None
try:
return json.loads(cfg_path.read_text(encoding="utf-8"))
except Exception:
return None
def load_env_flags() -> Dict[str, str]:
    env_path = Path.home() / ".openclaw" / "env"
    if not env_path.exists():
        return {}
    flags: Dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export ") :]
        if "=" in line:
            key, value = line.split("=", 1)
            # Strip surrounding double or single quotes so KEY='1' and KEY="1" both read as 1.
            flags[key.strip()] = value.strip().strip("\"'")
    return flags
def load_json_file(path: Path) -> Optional[object]:
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def permissions_hardened(path: Path) -> bool:
try:
mode = path.stat().st_mode & 0o777
return (mode & 0o077) == 0
except Exception:
return False
def find_channel_allowlists(cfg: Dict[str, object]) -> bool:
channels = cfg.get("channels") if isinstance(cfg, dict) else None
if not isinstance(channels, dict):
return False
for channel_cfg in channels.values():
if isinstance(channel_cfg, dict):
allow_from = channel_cfg.get("allowFrom")
if isinstance(allow_from, list) and len(allow_from) > 0:
return True
return False
def group_mentions_required(cfg: Dict[str, object]) -> bool:
channels = cfg.get("channels") if isinstance(cfg, dict) else None
if isinstance(channels, dict):
for channel_cfg in channels.values():
groups = channel_cfg.get("groups") if isinstance(channel_cfg, dict) else None
if isinstance(groups, dict):
for group_cfg in groups.values():
if isinstance(group_cfg, dict) and group_cfg.get("requireMention") is True:
return True
messages = cfg.get("messages") if isinstance(cfg, dict) else None
if isinstance(messages, dict):
group_chat = messages.get("groupChat")
if isinstance(group_chat, dict) and group_chat.get("mentionPatterns"):
return True
return False
def gateway_loopback_configured(cfg: Optional[Dict[str, object]], cfg_text: str) -> bool:
    if isinstance(cfg, dict):
        gateway = cfg.get("gateway")
        if not isinstance(gateway, dict):
            # Guard against a missing, null, or non-object "gateway" entry.
            gateway = {}
        mode = gateway.get("mode")
        bind = gateway.get("bind")
        auth = cfg.get("auth") or gateway.get("auth")
        auth_mode = auth.get("mode") if isinstance(auth, dict) else None
        return mode == "local" and bind == "loopback" and auth_mode == "token"
    # Fallback when the config could not be parsed as JSON: plain substring checks.
    return '"mode": "local"' in cfg_text and '"bind": "loopback"' in cfg_text and '"mode": "token"' in cfg_text
def runtime_hook_installed() -> bool:
hook = Path.home() / ".openclaw" / "bin" / "sudo"
return hook.exists() and os.access(hook, os.X_OK)
def alt_privilege_paths_present() -> bool:
return bool(shutil.which("su") or shutil.which("doas"))
def audit_log_present() -> bool:
return AUDIT_LOG.exists() and AUDIT_LOG.stat().st_size > 0
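def backup_configured() -> bool:
    for candidate in (OPENCLAW_DIR / "backups", OPENCLAW_DIR / "backup"):
        try:
            # is_dir() avoids NotADirectoryError when the path is a regular file.
            if candidate.is_dir() and any(candidate.iterdir()):
                return True
        except OSError:
            continue
    return False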
def collect_runtime_signals(port_monitor_script: Path) -> Dict[str, object]:
openclaw_bin = resolve_openclaw_bin()
openclaw_config_text = run_cmd(["cat", str(Path.home() / ".openclaw" / "openclaw.json")])
doctor_text = run_cmd([openclaw_bin, "doctor"])
gateway_status_text = run_cmd([openclaw_bin, "gateway", "status"])
version_text = run_cmd([openclaw_bin, "--version"])
port_report_raw = run_cmd(["python3", str(port_monitor_script), "--json"])
egress_report_raw = run_cmd(
["python3", str(Path(__file__).resolve().parent / "egress_monitor.py"), "--json"]
)
try:
port_report = json.loads(port_report_raw)
except Exception:
port_report = {"status": "error", "findings": [], "listening_services": []}
try:
egress_report = json.loads(egress_report_raw)
except Exception:
egress_report = {"status": "error", "findings": [], "connections": []}
return {
"openclaw_config_text": openclaw_config_text,
"openclaw_config_json": load_openclaw_config(),
"doctor_text": doctor_text,
"gateway_status_text": gateway_status_text,
"version_text": version_text,
"port_report": port_report,
"egress_report": egress_report,
"env_flags": load_env_flags(),
"command_policy": load_json_file(Path.home() / ".openclaw" / "security" / "command-policy.json"),
"prompt_policy": load_json_file(Path.home() / ".openclaw" / "security" / "prompt-policy.json"),
"egress_allowlist": load_json_file(Path.home() / ".openclaw" / "security" / "egress_allowlist.json"),
}
def set_check(
checks_by_id: Dict[str, Dict[str, object]],
check_id: str,
status: str,
risk: str,
observed_state: str,
evidence: str,
gap: str,
mitigation: str,
owner: str,
due_date: str,
) -> None:
c = checks_by_id.setdefault(check_id, {"check_id": check_id})
c["status"] = status
c["risk"] = risk
c["observed_state"] = observed_state
c["evidence"] = evidence
c["gap"] = gap
c["mitigation"] = mitigation
c["owner"] = owner
c["due_date"] = due_date
def build_assessment(assessment: Dict[str, object], signals: Dict[str, object]) -> Dict[str, object]:
checks = assessment.get("checks", [])
checks_by_id = {c["check_id"]: c for c in checks if isinstance(c, dict) and c.get("check_id")}
cfg_text = str(signals.get("openclaw_config_text") or "")
cfg_json = signals.get("openclaw_config_json")
doctor = str(signals.get("doctor_text") or "")
version_text = str(signals.get("version_text") or "").strip()
port_report = signals.get("port_report") or {}
findings = port_report.get("findings", []) if isinstance(port_report, dict) else []
insecure = [f for f in findings if isinstance(f, dict) and f.get("type") == "insecure-port"]
unapproved_ports = [f for f in findings if isinstance(f, dict) and f.get("type") == "unapproved-port"]
approval_enforced = runtime_hook_installed()
set_check(
checks_by_id,
"privilege_approval_required",
"compliant" if approval_enforced else "violation",
"high",
"Runtime privileged execution hook is installed." if approval_enforced else "Runtime privileged execution hook not detected.",
"Checked for ~/.openclaw/bin/sudo wrapper installed by cyber-security-engineer.",
"Approval-first execution is not enforced for privileged actions.",
"Run cyber-security-engineer/scripts/install-openclaw-runtime-hook.sh and restart OpenClaw gateway.",
"Security Engineering",
"2026-03-15",
)
least_priv_ok = gateway_loopback_configured(cfg_json, cfg_text)
writable_issue = "not writable" in doctor.lower()
least_status = "compliant" if least_priv_ok and not writable_issue else "partial"
set_check(
checks_by_id,
"least_privilege_enforced",
least_status,
"high",
"Gateway local/loopback+token controls are present and doctor reported no 'not writable' warnings." if least_status == "compliant" else "Gateway local/loopback+token controls are incomplete, or doctor reported 'not writable' warnings.",
"openclaw.json checked for local mode, loopback bind, and token auth; doctor output checked for integrity warnings.",
"Least-privilege posture is not complete while writable/integrity warnings remain.",
"Fix state dir ownership/permissions and enforce command allowlist/approval defaults.",
"Platform Security",
"2026-03-07",
)
set_check(
checks_by_id,
"elevation_timeout_30m",
"compliant",
"medium",
"30-minute timeout logic exists in root_session_guard.py.",
"Timeout guard script is installed with preflight drop logic.",
"No gap identified for timeout script presence.",
"Ensure all privileged paths route through guarded_privileged_exec.py.",
"Security Engineering",
"2026-03-15",
)
audit_ok = audit_log_present()
set_check(
checks_by_id,
"audit_logging_privileged_actions",
"compliant" if audit_ok else "partial",
"medium",
"Privileged audit log is populated." if audit_ok else "Privileged audit log not yet populated.",
f"Audit log path: {AUDIT_LOG}",
"Audit trail is missing or empty.",
"Run privileged tasks via guarded_privileged_exec.py to populate audit logs.",
"SecOps",
"2026-03-22",
)
set_check(
checks_by_id,
"open_ports_approved",
"partial" if unapproved_ports else "compliant",
"medium",
f"Unapproved listening ports: {len(unapproved_ports)}",
"port_monitor.py report evaluated vs approved baseline.",
"Listening ports are not fully approved.",
"Generate and prune approved_ports.json; close unnecessary services.",
"Network Security",
"2026-03-25",
)
set_check(
checks_by_id,
"insecure_ports_remediated",
"violation" if insecure else "compliant",
"high",
f"Insecure ports detected: {len(insecure)}",
"port_monitor.py findings evaluated for insecure-port entries.",
"Insecure ports are still in use.",
"Migrate to secure alternatives or close the ports.",
"Network Security",
"2026-04-01",
)
allowlist_ok = isinstance(cfg_json, dict) and find_channel_allowlists(cfg_json)
set_check(
checks_by_id,
"channel_allowlist_configured",
"compliant" if allowlist_ok else "violation",
"high",
"Channel allowlists are configured." if allowlist_ok else "Channel allowlists not detected.",
"Checked channels.*.allowFrom in openclaw.json.",
"Inbound channel allowlists are missing, allowing unsolicited access.",
"Set channels.<channel>.allowFrom to trusted sender IDs.",
"Platform Security",
"2026-03-05",
)
mention_ok = isinstance(cfg_json, dict) and group_mentions_required(cfg_json)
set_check(
checks_by_id,
"group_mentions_required",
"compliant" if mention_ok else "partial",
"medium",
"Group chats require explicit mentions." if mention_ok else "Group mention requirement not detected.",
"Checked channels.*.groups.requireMention or messages.groupChat.mentionPatterns.",
"Group chats can trigger the agent without explicit mention.",
"Set requireMention true for group configs or define mentionPatterns.",
"Platform Security",
"2026-03-05",
)
loopback_ok = gateway_loopback_configured(cfg_json, cfg_text)
set_check(
checks_by_id,
"gateway_loopback_only",
"compliant" if loopback_ok else "violation",
"high",
"Gateway is configured for local/loopback with token auth." if loopback_ok else "Gateway exposure settings are weak.",
"Checked gateway.mode, gateway.bind, and auth.mode in openclaw.json.",
"Gateway may be exposed publicly or without token auth.",
"Set gateway.mode=local, gateway.bind=loopback, auth.mode=token.",
"Platform Security",
"2026-03-07",
)
secrets_ok = permissions_hardened(OPENCLAW_DIR) and permissions_hardened(OPENCLAW_DIR / "openclaw.json")
set_check(
checks_by_id,
"secrets_permissions_hardened",
"compliant" if secrets_ok else "partial",
"medium",
"OpenClaw config directory and file permissions are hardened." if secrets_ok else "OpenClaw permissions are too open or unknown.",
"Checked ~/.openclaw and ~/.openclaw/openclaw.json permissions.",
"Secrets and configs may be readable by other users.",
"chmod 700 ~/.openclaw; chmod 600 ~/.openclaw/openclaw.json",
"SecOps",
"2026-03-10",
)
hook_ok = runtime_hook_installed()
set_check(
checks_by_id,
"runtime_privilege_hook_installed",
"compliant" if hook_ok else "violation",
"high",
"Runtime privileged execution hook installed." if hook_ok else "Runtime privileged execution hook missing.",
"Checked ~/.openclaw/bin/sudo wrapper.",
"Privileged actions can bypass approval enforcement.",
"Install runtime hook and restart OpenClaw gateway.",
"Security Engineering",
"2026-03-01",
)
alt_paths = alt_privilege_paths_present()
alt_ok = (not alt_paths) or hook_ok
set_check(
checks_by_id,
"alternate_privilege_paths_restricted",
"compliant" if alt_ok else "partial",
"medium",
"Alternate privilege tools are absent, or the runtime hook is installed." if alt_ok else "Alternate privilege tools may bypass guard.",
"Checked for presence of su/doas binaries.",
"Privilege escalation paths could bypass approval enforcement.",
"Restrict su/doas usage or wrap via policy controls.",
"Security Engineering",
"2026-03-20",
)
backup_ok = backup_configured()
set_check(
checks_by_id,
"backup_configured",
"compliant" if backup_ok else "partial",
"low",
"OpenClaw backups found." if backup_ok else "OpenClaw backups not detected.",
"Checked ~/.openclaw/backups or ~/.openclaw/backup.",
"Recovery posture may be weak without backups.",
"Configure backup of ~/.openclaw and audit logs.",
"Platform Security",
"2026-03-30",
)
update_ok = bool(version_text)
    set_check(
        checks_by_id,
        "update_hygiene",
        "partial" if update_ok else "violation",
        "low",
        f"OpenClaw version: {version_text or 'unknown'}",
        "openclaw --version output captured.",
        "Update cadence not validated.",
        "Review release notes and update regularly.",
        "Platform Security",
        "2026-04-15",
    )
    prompt_policy = signals.get("prompt_policy") or {}
    prompt_ok = isinstance(prompt_policy, dict) and bool(prompt_policy.get("require_confirmation_for_untrusted"))
    set_check(
        checks_by_id,
        "prompt_injection_controls",
        "compliant" if prompt_ok else "partial",
        "high",
        "Untrusted content requires explicit confirmation." if prompt_ok else "Untrusted content confirmation policy not enabled.",
        "Checked ~/.openclaw/security/prompt-policy.json.",
        "Untrusted content sources may trigger privileged actions without explicit confirmation.",
        "Enable prompt-policy.json require_confirmation_for_untrusted.",
        "Security Engineering",
        "2026-03-12",
    )
    cmd_policy = signals.get("command_policy") or {}
    deny_rules = cmd_policy.get("deny") if isinstance(cmd_policy, dict) else []
    allow_rules = cmd_policy.get("allow") if isinstance(cmd_policy, dict) else []
    cmd_ok = bool((isinstance(deny_rules, list) and deny_rules) or (isinstance(allow_rules, list) and allow_rules))
    set_check(
        checks_by_id,
        "command_policy_enforced",
        "compliant" if cmd_ok else "partial",
        "high",
        "Command policy allow/deny rules configured." if cmd_ok else "Command policy rules not configured.",
        "Checked ~/.openclaw/security/command-policy.json.",
        "Privileged commands are not filtered by policy.",
        "Define deny/allow rules in command-policy.json.",
        "Security Engineering",
        "2026-03-12",
    )
    env_flags = signals.get("env_flags") or {}
    session_required = isinstance(env_flags, dict) and env_flags.get("OPENCLAW_REQUIRE_SESSION_ID") == "1"
    set_check(
        checks_by_id,
        "session_boundary_enforced",
        "compliant" if session_required else "partial",
        "medium",
        "Task session id enforcement enabled." if session_required else "Task session id enforcement not enabled.",
        "Checked ~/.openclaw/env for OPENCLAW_REQUIRE_SESSION_ID.",
        "Approvals may carry across tasks without explicit session scoping.",
        "Set OPENCLAW_REQUIRE_SESSION_ID=1 and provide OPENCLAW_TASK_SESSION_ID per task.",
        "Security Engineering",
        "2026-03-18",
    )
    mfa_enabled = isinstance(env_flags, dict) and bool(env_flags.get("OPENCLAW_APPROVAL_TOKEN"))
    set_check(
        checks_by_id,
        "multi_factor_approval",
        "compliant" if mfa_enabled else "partial",
        "medium",
        "Approval token required for privileged actions." if mfa_enabled else "Approval token not configured.",
        "Checked ~/.openclaw/env for OPENCLAW_APPROVAL_TOKEN.",
        "Privileged approvals rely on single-step confirmation.",
        "Set OPENCLAW_APPROVAL_TOKEN and require entry for approvals.",
        "Security Engineering",
        "2026-03-20",
    )
    egress_allowlist = signals.get("egress_allowlist") or []
    egress_ok = isinstance(egress_allowlist, list) and len(egress_allowlist) > 0
    set_check(
        checks_by_id,
        "egress_allowlist_configured",
        "compliant" if egress_ok else "partial",
        "medium",
        "Egress allowlist configured." if egress_ok else "Egress allowlist not configured.",
        "Checked ~/.openclaw/security/egress_allowlist.json.",
        "Outbound connections are not constrained by allowlist.",
        "Define allowed outbound destinations.",
        "Network Security",
        "2026-03-22",
    )
    egress_report = signals.get("egress_report") or {}
    egress_findings = egress_report.get("findings", []) if isinstance(egress_report, dict) else []
    egress_clean = egress_ok and len(egress_findings) == 0
    set_check(
        checks_by_id,
        "egress_connections_approved",
        "compliant" if egress_clean else "partial",
        "medium",
        f"Unapproved egress findings: {len(egress_findings)}",
        "egress_monitor.py live output evaluated against allowlist.",
        "Unapproved outbound connections detected." if egress_ok else "Allowlist missing; cannot validate egress.",
        "Approve or block outbound destinations.",
        "Network Security",
        "2026-03-25",
    )
    assessment["checks"] = sorted(checks_by_id.values(), key=lambda c: str(c.get("check_id", "")))
    assessment.setdefault("metadata", {})
    assessment["metadata"]["generated_at_utc"] = utc_now()
    return assessment


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Generate live compliance assessment from current machine state")
    p.add_argument("--assessment-file", required=True, help="Path to assessment JSON to update")
    p.add_argument(
        "--port-monitor-script",
        default=str(Path(__file__).resolve().parent / "port_monitor.py"),
        help="Path to port monitor script",
    )
    return p.parse_args()


def main() -> int:
    args = parse_args()
    assessment_path = Path(args.assessment_file)
    assessment = load_json(assessment_path)
    signals = collect_runtime_signals(Path(args.port_monitor_script))
    updated = build_assessment(assessment, signals)
    write_json(assessment_path, updated)
    print(
        json.dumps(
            {
                "status": "ok",
                "assessment_file": str(assessment_path),
                "generated_at_utc": updated["metadata"]["generated_at_utc"],
            }
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
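The policy checks in `build_assessment` reduce to small predicates over the collected signals. A minimal standalone sketch of two of them follows, assuming the same signal shapes as the script above. The helper names `command_policy_configured` and `egress_status` are hypothetical and not part of the skill, and the sample inputs are invented for illustration.

```python
from typing import Any


def command_policy_configured(policy: Any) -> bool:
    """Mirror the cmd_ok test: at least one non-empty allow or deny list."""
    if not isinstance(policy, dict):
        return False
    deny = policy.get("deny")
    allow = policy.get("allow")
    return bool((isinstance(deny, list) and deny) or (isinstance(allow, list) and allow))


def egress_status(allowlist: Any, report: Any) -> str:
    """Mirror the egress_connections_approved test.

    Compliant only when an allowlist exists and the monitor reported no findings.
    """
    allowlist_ok = isinstance(allowlist, list) and len(allowlist) > 0
    findings = report.get("findings", []) if isinstance(report, dict) else []
    return "compliant" if allowlist_ok and len(findings) == 0 else "partial"


if __name__ == "__main__":
    # Hypothetical sample signals, not taken from a real machine.
    print(command_policy_configured({"deny": ["rm -rf /"]}))
    print(command_policy_configured({"deny": [], "allow": []}))
    print(egress_status(["api.example.com:443"], {"findings": []}))
    print(egress_status([], {"findings": []}))
```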
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
  "owner": "fletcherfrimpong",
  "slug": "cyber-security-engineer",
  "displayName": "Cyber Security Engineer",
  "latest": {
    "version": "0.1.5",
    "publishedAt": 1772750899847,
    "commit": "https://github.com/openclaw/skills/commit/23362409eedc0c1cbea6120460cb7b091fa45ba2"
  },
  "history": [
    {
      "version": "0.1.4",
      "publishedAt": 1771154770584,
      "commit": "https://github.com/openclaw/skills/commit/6105e8a6bf5d1ce806e2e426aaef8f6547e6ad72"
    },
    {
      "version": "0.1.3",
      "publishedAt": 1771153959700,
      "commit": "https://github.com/openclaw/skills/commit/eead7b23c638c0881b88c4ba9b10d1ad50fc91b5"
    }
  ]
}
```
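The `publishedAt` values look like Unix epoch timestamps in milliseconds. That reading is an assumption, since the file does not state the unit. Under it, a short sketch converts them to UTC ISO 8601 strings; the helper name `published_at_iso` is hypothetical.

```python
from datetime import datetime, timezone


def published_at_iso(published_at_ms: int) -> str:
    """Convert an epoch-milliseconds value to a UTC ISO 8601 string."""
    return datetime.fromtimestamp(published_at_ms / 1000, tz=timezone.utc).isoformat()


if __name__ == "__main__":
    # Version and publishedAt pairs copied from _meta.json.
    releases = [
        ("0.1.5", 1772750899847),
        ("0.1.4", 1771154770584),
        ("0.1.3", 1771153959700),
    ]
    for version, ts in releases:
        print(version, published_at_iso(ts))
```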
### scripts/auto_invoke_cycle.sh
```bash
#!/usr/bin/env bash
set -euo pipefail
SKILL_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ASSESS_DIR="${SKILL_DIR}/assessments"
ASSESS_FILE="${ASSESS_DIR}/openclaw-assessment.json"
HTML_OUT="${ASSESS_DIR}/compliance-dashboard.html"
SUMMARY_OUT="${ASSESS_DIR}/compliance-summary.json"
PORT_OUT="${ASSESS_DIR}/port-monitor-latest.json"
EGRESS_OUT="${ASSESS_DIR}/egress-monitor-latest.json"
LOG_DIR="${HOME}/.openclaw/logs"
RUN_LOG="${LOG_DIR}/cyber-security-engineer-auto.log"
mkdir -p "${LOG_DIR}" "${ASSESS_DIR}"
{
  echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] Starting auto security cycle"
  if [[ ! -f "${ASSESS_FILE}" ]]; then
    python3 "${SKILL_DIR}/scripts/compliance_dashboard.py" init-assessment --system "OpenClaw" --output "${ASSESS_FILE}"
  fi
  python3 "${SKILL_DIR}/scripts/live_assessment.py" --assessment-file "${ASSESS_FILE}"
  python3 "${SKILL_DIR}/scripts/compliance_dashboard.py" render \
    --assessment-file "${ASSESS_FILE}" \
    --output-html "${HTML_OUT}" \
    --output-summary "${SUMMARY_OUT}"
  # Optional: notify when new violations/partials appear between cycles.
  python3 "${SKILL_DIR}/scripts/notify_on_violation.py" --summary-file "${SUMMARY_OUT}" || true
  python3 "${SKILL_DIR}/scripts/port_monitor.py" --json > "${PORT_OUT}"
  python3 "${SKILL_DIR}/scripts/egress_monitor.py" --json > "${EGRESS_OUT}"
  echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] Completed auto security cycle"
} >> "${RUN_LOG}" 2>&1
```
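The cycle script treats most steps as required (`set -e` aborts on the first failure) and the notifier as optional (`|| true`). A minimal Python sketch of that sequencing pattern follows. The `run_steps` helper and the `Step` alias are hypothetical names, and the demo commands are placeholders rather than the skill's real scripts.

```python
import subprocess
import sys
from typing import List, Sequence, Tuple

Step = Tuple[Sequence[str], bool]  # (argv, optional)


def run_steps(steps: List[Step]) -> List[int]:
    """Run each command in order and return the exit codes.

    A non-zero exit from a required step raises CalledProcessError (like `set -e`);
    a non-zero exit from an optional step is recorded and ignored (like `|| true`).
    """
    codes = []
    for argv, optional in steps:
        result = subprocess.run(list(argv), check=False)
        codes.append(result.returncode)
        if result.returncode != 0 and not optional:
            raise subprocess.CalledProcessError(result.returncode, list(argv))
    return codes


if __name__ == "__main__":
    ok = [sys.executable, "-c", "raise SystemExit(0)"]
    failing = [sys.executable, "-c", "raise SystemExit(3)"]
    # The failing step is marked optional, so the sequence still completes.
    print(run_steps([(ok, False), (failing, True), (ok, False)]))
```

Marking a step optional keeps a failing notifier from blocking the monitor snapshots that follow it, which matches the ordering in the shell script.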