Back to skills
SkillHub ClubRun DevOpsFull StackSecurity

cyber-security-engineer

Security engineering workflow for OpenClaw privilege governance and hardening. Use for least-privilege execution, approval-first privileged actions, idle timeout controls, port + egress monitoring, and ISO 27001/NIST-aligned compliance reporting with mitigations.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,019
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
B73.6

Install command

npx @skill-hub/cli install openclaw-skills-fletcher-cyber-security-engineer

Repository

openclaw/skills

Skill path: skills/fletcherfrimpong/fletcher-cyber-security-engineer

Security engineering workflow for OpenClaw privilege governance and hardening. Use for least-privilege execution, approval-first privileged actions, idle timeout controls, port + egress monitoring, and ISO 27001/NIST-aligned compliance reporting with mitigations.

Open repository

Best for

Primary workflow: Run DevOps.

Technical facets: Full Stack, Security.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install cyber-security-engineer into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding cyber-security-engineer to shared team environments
  • Use cyber-security-engineer for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: cyber-security-engineer
description: Security engineering workflow for OpenClaw privilege governance and hardening. Use for least-privilege execution, approval-first privileged actions, idle timeout controls, port + egress monitoring, and ISO 27001/NIST-aligned compliance reporting with mitigations.
---

# Cyber Security Engineer

Implement these controls in every security-sensitive task:

1. Keep default execution in normal (non-root) mode.
2. Request explicit user approval before any elevated command.
3. Scope elevation to the minimum command set required for the active task.
4. Drop elevated state immediately after the privileged command completes.
5. Expire elevated state after 30 idle minutes and require re-approval.
6. Monitor listening network ports and flag insecure or unapproved exposure.
7. Monitor outbound connections and flag destinations not in the egress allowlist.
8. If no approved baseline exists, generate one and require user review/pruning.
9. Benchmark controls against ISO 27001 and NIST and report violations with mitigations.

## Non-Goals (Web Browsing)

- Do not use web browsing / web search as part of this skill. Keep assessments and recommendations based on local host/OpenClaw state and the bundled references in this skill.

## Files To Use

- `references/least-privilege-policy.md`
- `references/port-monitoring-policy.md`
- `references/compliance-controls-map.json`
- `references/approved_ports.template.json`
- `references/command-policy.template.json`
- `references/prompt-policy.template.json`
- `references/egress-allowlist.template.json`
- `scripts/preflight_check.py`
- `scripts/root_session_guard.py`
- `scripts/audit_logger.py`
- `scripts/command_policy.py`
- `scripts/prompt_policy.py`
- `scripts/guarded_privileged_exec.py`
- `scripts/install-openclaw-runtime-hook.sh`
- `scripts/port_monitor.py`
- `scripts/generate_approved_ports.py`
- `scripts/egress_monitor.py`
- `scripts/notify_on_violation.py`
- `scripts/compliance_dashboard.py`
- `scripts/live_assessment.py`

## Behavior

- Never keep root/elevated access open between unrelated tasks.
- Never execute root commands without an explicit approval step in the current flow.
- Enforce command allow/deny policy when configured.
- Require confirmation when untrusted content sources are detected (`OPENCLAW_UNTRUSTED_SOURCE=1` + prompt policy).
- Enforce task session id scoping when configured (`OPENCLAW_REQUIRE_SESSION_ID=1`).
- If timeout is exceeded, force session expiration and approval renewal.
- Log privileged actions to `~/.openclaw/security/privileged-audit.jsonl` (best-effort).
- Flag listening ports not present in the approved baseline and recommend secure alternatives for insecure ports.
- Flag outbound destinations not present in the egress allowlist.

## Output Contract

When reporting status, include:

- The specific `check_id`(s) affected, `status`, `risk`, and concise evidence.
- Concrete mitigations (what to change, where) and any owners/due dates if present.
- For network findings: port, bind address, process/service, and why it is flagged (unapproved/insecure/public).



---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/least-privilege-policy.md

```markdown
# Least-Privilege Policy For OpenClaw

## Objective

Enforce default non-root execution and explicit approval-first elevation with immediate privilege drop after privileged operations.

## Required Controls

1. Set default execution context to non-root.
2. Require approval before every privileged command.
3. Limit privileged commands to a reviewed allowlist (per-task scope).
4. Log privileged requests, approvals, and execution outcomes.
5. Drop elevated state immediately after privileged command completion.
6. Force elevated timeout expiration after 30 idle minutes.

## Operational Checks

1. Verify privileged command is required for task outcome.
2. Execute privileged work through wrapper:
   - `python3 scripts/guarded_privileged_exec.py --reason "required change" --use-sudo -- <command>`
3. If wrapper requests approval, require explicit user consent before command runs.
4. Execute only the approved privileged command(s):
   - Approval is tied to the exact argv you approved.
   - Any different privileged argv triggers a new approval and allowlist entry.
5. Confirm wrapper drops privilege immediately after completion.

## Control Acceptance Criteria

1. No privileged command runs without current-task user approval.
2. Elevated session is not retained after privileged work completes.
3. Idle elevated session at or above 30 minutes transitions to normal mode.
4. Session state file records UTC transition/action metadata.

## Suggested OpenClaw Config Direction

Configure OpenClaw with strict approval and allowlist behavior:

- Set execution ask behavior to always prompt.
- Set execution security mode to allowlist.
- Restrict elevated tool callers with explicit `allowFrom` entries.
- Keep sandboxing enabled for normal operations.

If exact keys differ by version, preserve intent: explicit approval + least privilege + short-lived elevation.

```

### references/port-monitoring-policy.md

```markdown
# Port Monitoring Policy For OpenClaw

## Objective

Continuously identify listening services, detect insecure ports/protocols, and flag ports not approved by baseline policy.

## Baseline File

Use `~/.openclaw/security/approved_ports.json` as a JSON array:

```json
[
  { "port": 22, "protocol": "tcp", "command": "sshd" },
  { "port": 443, "protocol": "tcp", "command": "nginx" }
]
```

`command` is optional but recommended for tighter control.

## Monitoring Command

Run:

`python3 scripts/port_monitor.py --json`

## Required Analyst Actions

1. Review all `unapproved-port` findings.
2. Review all `insecure-port` findings and propose secure alternatives.
3. Confirm whether `public-bind` findings are necessary exposure.
4. Ask for user approval before any root-level remediation steps.

## Insecure Port Guidance

Enforce upgrades where possible:

1. `80 -> 443` (HTTPS instead of HTTP)
2. `23 -> 22` (SSH instead of Telnet)
3. `21 -> 22/990` (SFTP/FTPS instead of FTP)
4. `110 -> 995` (POP3S)
5. `143 -> 993` (IMAPS)
6. `389 -> 636` (LDAPS)

## Acceptance Criteria

1. Every open listening port is either approved or explicitly flagged.
2. Every insecure port includes a replacement recommendation.
3. Every externally bound service (`*`, `0.0.0.0`, `::`) is justified or marked for restriction.

```

### references/compliance-controls-map.json

```json
[
  {
    "check_id": "privilege_approval_required",
    "title": "Approval required before elevated access",
    "iso27001": ["A.5.15", "A.5.18"],
    "nist": ["PR.AA-01", "PR.AA-05"],
    "default_risk": "high",
    "expected_state": "Every privileged action requires explicit user approval."
  },
  {
    "check_id": "least_privilege_enforced",
    "title": "Least privilege execution mode",
    "iso27001": ["A.5.15", "A.5.18"],
    "nist": ["PR.AA-01", "PR.PS-01"],
    "default_risk": "high",
    "expected_state": "Default mode is non-root and elevated rights are scoped and short-lived."
  },
  {
    "check_id": "elevation_timeout_30m",
    "title": "Elevated session idle timeout",
    "iso27001": ["A.8.2", "A.8.15"],
    "nist": ["PR.AA-03", "DE.CM-01"],
    "default_risk": "medium",
    "expected_state": "Elevated session expires after 30 minutes of inactivity."
  },
  {
    "check_id": "audit_logging_privileged_actions",
    "title": "Privileged action audit logging",
    "iso27001": ["A.8.15", "A.8.16"],
    "nist": ["DE.AE-03", "DE.CM-01"],
    "default_risk": "medium",
    "expected_state": "All elevated approvals, commands, and privilege drops are logged."
  },
  {
    "check_id": "open_ports_approved",
    "title": "Open ports baseline approval",
    "iso27001": ["A.8.20", "A.8.21"],
    "nist": ["PR.PS-02", "DE.CM-01"],
    "default_risk": "medium",
    "expected_state": "All listening ports are approved and business-justified."
  },
  {
    "check_id": "insecure_ports_remediated",
    "title": "Insecure ports remediated",
    "iso27001": ["A.8.20", "A.8.21"],
    "nist": ["PR.PS-02", "PR.DS-02"],
    "default_risk": "high",
    "expected_state": "Insecure legacy ports are closed or migrated to secure alternatives."
  },
  {
    "check_id": "channel_allowlist_configured",
    "title": "Channel allowlist configured",
    "iso27001": ["A.5.15", "A.5.16"],
    "nist": ["PR.AA-02"],
    "default_risk": "high",
    "expected_state": "Inbound channels restrict senders via allowlists."
  },
  {
    "check_id": "group_mentions_required",
    "title": "Group mention requirement",
    "iso27001": ["A.5.16"],
    "nist": ["PR.AA-04"],
    "default_risk": "medium",
    "expected_state": "Group chats require explicit mention before agent responds."
  },
  {
    "check_id": "gateway_loopback_only",
    "title": "Gateway bound to loopback",
    "iso27001": ["A.8.9", "A.8.10"],
    "nist": ["PR.IP-01"],
    "default_risk": "high",
    "expected_state": "Gateway runs local/loopback with token auth."
  },
  {
    "check_id": "secrets_permissions_hardened",
    "title": "Secrets and config permissions hardened",
    "iso27001": ["A.8.11"],
    "nist": ["PR.DS-01"],
    "default_risk": "medium",
    "expected_state": "OpenClaw config and secrets are not world/group readable."
  },
  {
    "check_id": "runtime_privilege_hook_installed",
    "title": "Runtime privileged execution hook installed",
    "iso27001": ["A.5.15", "A.5.18"],
    "nist": ["PR.AA-01"],
    "default_risk": "high",
    "expected_state": "Privileged commands are forced through approval guard."
  },
  {
    "check_id": "alternate_privilege_paths_restricted",
    "title": "Alternate privilege paths restricted",
    "iso27001": ["A.5.15"],
    "nist": ["PR.AA-05"],
    "default_risk": "medium",
    "expected_state": "su/doas or other escalation paths are restricted or guarded."
  },
  {
    "check_id": "backup_configured",
    "title": "Backup and recovery configured",
    "iso27001": ["A.8.13"],
    "nist": ["PR.IP-04"],
    "default_risk": "low",
    "expected_state": "Backups of OpenClaw config and audit logs exist."
  },
  {
    "check_id": "update_hygiene",
    "title": "Update hygiene",
    "iso27001": ["A.8.8"],
    "nist": ["ID.RA-01"],
    "default_risk": "low",
    "expected_state": "OpenClaw is updated regularly and version is tracked."
  },
  {
    "check_id": "prompt_injection_controls",
    "title": "Prompt injection controls",
    "iso27001": ["A.5.15", "A.5.18"],
    "nist": ["PR.AA-01"],
    "default_risk": "high",
    "expected_state": "Untrusted content sources require explicit confirmation before privileged execution."
  },
  {
    "check_id": "command_policy_enforced",
    "title": "Privileged command policy enforced",
    "iso27001": ["A.5.15"],
    "nist": ["PR.AA-05"],
    "default_risk": "high",
    "expected_state": "Privileged commands are filtered by allow/deny policy."
  },
  {
    "check_id": "session_boundary_enforced",
    "title": "Task session boundary enforced",
    "iso27001": ["A.5.15"],
    "nist": ["PR.AA-03"],
    "default_risk": "medium",
    "expected_state": "Privileged approvals are scoped to a task session id."
  },
  {
    "check_id": "multi_factor_approval",
    "title": "Multi-factor approval for privileged actions",
    "iso27001": ["A.5.17"],
    "nist": ["PR.AA-02"],
    "default_risk": "medium",
    "expected_state": "Privileged approvals require an additional approval token."
  },
  {
    "check_id": "egress_allowlist_configured",
    "title": "Outbound allowlist configured",
    "iso27001": ["A.8.20"],
    "nist": ["PR.PS-02"],
    "default_risk": "medium",
    "expected_state": "Outbound destinations are allowlisted."
  },
  {
    "check_id": "egress_connections_approved",
    "title": "Outbound connections approved",
    "iso27001": ["A.8.20"],
    "nist": ["DE.CM-01"],
    "default_risk": "medium",
    "expected_state": "No unapproved outbound connections are detected."
  }
]

```

### references/approved_ports.template.json

```json
[
  {
    "port": 18789,
    "protocol": "tcp",
    "command": "node",
    "comment": "OpenClaw gateway (example). Remove if not applicable."
  }
]


```

### references/command-policy.template.json

```json
{
  "allow": [
    "^openclaw\\b",
    "^python3\\b"
  ],
  "deny": [
    "\\brm\\s+-rf\\b",
    "\\bshutdown\\b",
    "\\breboot\\b"
  ]
}


```

### references/prompt-policy.template.json

```json
{
  "require_confirmation_for_untrusted": true
}


```

### references/egress-allowlist.template.json

```json
[
  {
    "protocol": "tcp",
    "host_regex": "^(api\\.openai\\.com|api\\.telegram\\.org)$",
    "port": 443
  }
]


```

### scripts/preflight_check.py

```python
#!/usr/bin/env python3
"""
Preflight validation for the cyber-security-engineer skill.

This is intentionally conservative: it does not modify system state. It validates
that required files and tooling exist before enabling runtime hooks or enforcing
privileged execution.
"""

from __future__ import annotations

import json
import os
import shutil
import stat
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple


def _exists_any(names: List[str]) -> Tuple[bool, List[str]]:
    found = [n for n in names if shutil.which(n)]
    return (len(found) > 0), found


def _mode_str(path: Path) -> str:
    try:
        return oct(path.stat().st_mode & 0o777)
    except Exception:
        return "unknown"


def _is_hardened(path: Path) -> bool:
    try:
        mode = path.stat().st_mode & 0o777
        return (mode & 0o077) == 0
    except Exception:
        return False


def main() -> int:
    home = Path.home()
    oc = home / ".openclaw"
    sec = oc / "security"

    required_policy_files = [
        sec / "command-policy.json",
        sec / "approved_ports.json",
        sec / "egress_allowlist.json",
    ]

    optional_files = [
        oc / "openclaw.json",
        oc / "env",
        sec / "prompt-policy.json",
        sec / "root-session-state.json",
    ]

    ok_openclaw, found_openclaw = _exists_any(["openclaw"])
    ok_python, found_python = _exists_any(["python3", "python"])
    ok_ports, found_ports = _exists_any(["lsof", "ss", "netstat"])

    hook = oc / "bin" / "sudo"
    hook_ok = hook.exists() and os.access(str(hook), os.X_OK)

    missing_required = [str(p) for p in required_policy_files if not p.exists()]
    missing_optional = [str(p) for p in optional_files if not p.exists()]

    env_path = oc / "env"
    env_perms_ok = True
    env_perms_mode = "missing"
    if env_path.exists():
        env_perms_mode = _mode_str(env_path)
        env_perms_ok = _is_hardened(env_path)

    config_path = oc / "openclaw.json"
    config_perms_ok = True
    config_perms_mode = "missing"
    if config_path.exists():
        config_perms_mode = _mode_str(config_path)
        config_perms_ok = _is_hardened(config_path)

    require_policy = os.environ.get("OPENCLAW_REQUIRE_POLICY_FILES") == "1"
    critical_fail = []
    if not ok_openclaw:
        critical_fail.append("openclaw_not_found")
    if not ok_python:
        critical_fail.append("python_not_found")
    if not ok_ports:
        critical_fail.append("no_port_discovery_tool")
    if require_policy and missing_required:
        critical_fail.append("missing_required_policy_files")

    report: Dict[str, Any] = {
        "status": "ok" if not critical_fail else "fail",
        "critical": critical_fail,
        "require_policy_files": require_policy,
        "tools": {
            "openclaw": {"ok": ok_openclaw, "found": found_openclaw},
            "python": {"ok": ok_python, "found": found_python},
            "port_discovery": {"ok": ok_ports, "found": found_ports},
        },
        "paths": {
            "openclaw_dir": str(oc),
            "security_dir": str(sec),
            "runtime_hook": {"path": str(hook), "installed": hook_ok},
        },
        "policy_files": {
            "required": [str(p) for p in required_policy_files],
            "missing_required": missing_required,
        },
        "optional_files": {
            "missing_optional": missing_optional,
        },
        "permissions": {
            "env": {"path": str(env_path), "mode": env_perms_mode, "hardened": env_perms_ok},
            "openclaw_json": {"path": str(config_path), "mode": config_perms_mode, "hardened": config_perms_ok},
        },
        "next_steps": [
            "Run bootstrap script if OpenClaw is not configured.",
            "Generate and prune approved ports baseline via generate_approved_ports.py.",
            "Review and customize command-policy.json and egress_allowlist.json templates.",
            "Install runtime hook only after policy files are reviewed.",
        ],
    }

    sys.stdout.write(json.dumps(report, indent=2, sort_keys=True))
    sys.stdout.write("\n")

    return 0 if not critical_fail else 1


if __name__ == "__main__":
    raise SystemExit(main())


```

### scripts/root_session_guard.py

```python
#!/usr/bin/env python3
"""Root session guard with idle-timeout and allowlisted argv scoping.
Review before use. No network calls are made from this script.
"""
import argparse
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional


DEFAULT_TIMEOUT_MINUTES = 30
STATE_PATH = Path.home() / ".openclaw" / "security" / "root-session-state.json"


def now_utc() -> datetime:
    return datetime.now(timezone.utc)


def to_iso(ts: datetime) -> str:
    return ts.isoformat().replace("+00:00", "Z")


def from_iso(value: Optional[str]) -> Optional[datetime]:
    if not value:
        return None
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


@dataclass
class AllowedCommand:
    argv: List[str]
    added_at_utc: str


@dataclass
class SessionState:
    privilege_mode: str
    last_elevated_activity_utc: Optional[str]
    last_normal_activity_utc: Optional[str]
    last_transition_utc: str
    last_action: str
    # When in elevated mode, restrict which privileged commands are allowed to run.
    allowed_commands: List[AllowedCommand] = field(default_factory=list)
    approved_reason: Optional[str] = None
    approved_session_id: Optional[str] = None

    def as_dict(self) -> Dict[str, Optional[str]]:
        return {
            "privilege_mode": self.privilege_mode,
            "last_elevated_activity_utc": self.last_elevated_activity_utc,
            "last_normal_activity_utc": self.last_normal_activity_utc,
            "last_transition_utc": self.last_transition_utc,
            "last_action": self.last_action,
            "approved_reason": self.approved_reason,
            "approved_session_id": self.approved_session_id,
            "allowed_commands": [
                {"argv": e.argv, "added_at_utc": e.added_at_utc}
                for e in self.allowed_commands
            ],
        }


def ensure_parent(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def default_state() -> SessionState:
    ts = to_iso(now_utc())
    return SessionState(
        privilege_mode="normal",
        last_elevated_activity_utc=None,
        last_normal_activity_utc=ts,
        last_transition_utc=ts,
        last_action="init-normal",
        allowed_commands=[],
        approved_reason=None,
        approved_session_id=None,
    )


def load_state(path: Path) -> SessionState:
    if not path.exists():
        state = default_state()
        save_state(path, state)
        return state
    with path.open("r", encoding="utf-8") as f:
        raw = json.load(f)
    allowed_raw = raw.get("allowed_commands") or []
    allowed: List[AllowedCommand] = []
    if isinstance(allowed_raw, list):
        for entry in allowed_raw:
            if not isinstance(entry, dict):
                continue
            argv = entry.get("argv")
            added_at_utc = entry.get("added_at_utc")
            if isinstance(argv, list) and all(isinstance(x, str) for x in argv) and isinstance(
                added_at_utc, str
            ):
                allowed.append(AllowedCommand(argv=argv, added_at_utc=added_at_utc))
    return SessionState(
        privilege_mode=raw.get("privilege_mode", "normal"),
        last_elevated_activity_utc=raw.get("last_elevated_activity_utc"),
        last_normal_activity_utc=raw.get("last_normal_activity_utc"),
        last_transition_utc=raw.get("last_transition_utc", to_iso(now_utc())),
        last_action=raw.get("last_action", "unknown"),
        allowed_commands=allowed,
        approved_reason=raw.get("approved_reason"),
        approved_session_id=raw.get("approved_session_id"),
    )


def save_state(path: Path, state: SessionState) -> None:
    ensure_parent(path)
    with path.open("w", encoding="utf-8") as f:
        json.dump(state.as_dict(), f, indent=2)
        f.write("\n")


def minutes_since(ts_iso: Optional[str]) -> Optional[float]:
    ts = from_iso(ts_iso)
    if ts is None:
        return None
    delta = now_utc() - ts
    return round(delta.total_seconds() / 60.0, 2)


def preflight(state: SessionState, timeout_minutes: int) -> int:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )

    action = "continue-normal"
    approval_required = True
    if state.privilege_mode == "elevated" and not timed_out:
        action = "elevated-active"
        # Approval is still required per-command unless the command is already allowlisted.
        approval_required = False
    elif timed_out:
        action = "drop-elevation"
        state.privilege_mode = "normal"
        state.allowed_commands = []
        state.approved_reason = None
        state.approved_session_id = None
        state.last_transition_utc = to_iso(now_utc())
        state.last_action = "timeout-drop"

    result = {
        "status": "REQUIRES_APPROVAL" if approval_required else "ELEVATED_AVAILABLE",
        "privilege_mode": state.privilege_mode,
        "idle_minutes_since_elevated": idle_mins,
        "timeout_minutes": timeout_minutes,
        "action": action,
        "allowed_commands_count": len(state.allowed_commands),
    }
    print(json.dumps(result, indent=2))
    return 2 if approval_required else 0


def is_allowed(state: SessionState, argv: List[str]) -> bool:
    if not argv:
        return False
    for entry in state.allowed_commands:
        if entry.argv == argv:
            return True
    return False


def authorize(state: SessionState, timeout_minutes: int, argv: List[str], session_id: Optional[str]) -> int:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )
    if timed_out:
        state.privilege_mode = "normal"
        state.allowed_commands = []
        state.approved_reason = None
        state.approved_session_id = None
        state.last_transition_utc = to_iso(now_utc())
        state.last_action = "timeout-drop"

    session_ok = session_id is None or state.approved_session_id in (None, session_id)
    allowed = state.privilege_mode == "elevated" and is_allowed(state, argv) and session_ok
    approval_required = not allowed

    result = {
        "status": "REQUIRES_APPROVAL" if approval_required else "AUTHORIZED",
        "privilege_mode": state.privilege_mode,
        "idle_minutes_since_elevated": idle_mins,
        "timeout_minutes": timeout_minutes,
        "allowed": allowed,
        "allowed_commands_count": len(state.allowed_commands),
        "session_id": session_id,
        "session_ok": session_ok,
    }
    print(json.dumps(result, indent=2))
    return 2 if approval_required else 0


def approve_command(state: SessionState, reason: str, argv: List[str], session_id: Optional[str]) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "elevated"
    state.last_elevated_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "approved-command"
    state.approved_reason = reason
    state.approved_session_id = session_id
    if argv and not is_allowed(state, argv):
        state.allowed_commands.append(AllowedCommand(argv=argv, added_at_utc=ts))


def mark_elevated_used(state: SessionState) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "elevated"
    state.last_elevated_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "elevated-used"


def mark_normal_used(state: SessionState) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "normal"
    state.last_normal_activity_utc = ts
    state.last_transition_utc = ts
    state.last_action = "normal-used"


def drop(state: SessionState, reason: str) -> None:
    ts = to_iso(now_utc())
    state.privilege_mode = "normal"
    state.allowed_commands = []
    state.approved_reason = None
    state.approved_session_id = None
    state.last_transition_utc = ts
    state.last_action = reason


def status(state: SessionState, timeout_minutes: int) -> None:
    idle_mins = minutes_since(state.last_elevated_activity_utc)
    timed_out = (
        state.privilege_mode == "elevated"
        and idle_mins is not None
        and idle_mins >= timeout_minutes
    )
    result = state.as_dict()
    result["idle_minutes_since_elevated"] = idle_mins
    result["timeout_minutes"] = timeout_minutes
    result["timed_out"] = timed_out
    result["approval_required"] = state.privilege_mode != "elevated" or timed_out
    print(json.dumps(result, indent=2))


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="OpenClaw root/elevated session guard")
    parser.add_argument(
        "--state-file",
        default=str(STATE_PATH),
        help="Path to session state JSON file",
    )
    parser.add_argument(
        "--timeout-minutes",
        type=int,
        default=DEFAULT_TIMEOUT_MINUTES,
        help="Idle timeout for elevated mode",
    )

    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("preflight", help="Check timeout and approval requirement")
    authz = sub.add_parser(
        "authorize",
        help="Authorize a specific argv against the current elevated allowlist",
    )
    authz.add_argument(
        "--argv-json",
        required=True,
        help='Command argv as JSON array, e.g. ["launchctl","print","..."]',
    )
    authz.add_argument("--session-id", help="Task session id to scope approvals")
    approve = sub.add_parser(
        "approve",
        help="Approve a specific argv for elevated execution (adds to allowlist)",
    )
    approve.add_argument("--reason", required=True, help="Approval reason")
    approve.add_argument(
        "--argv-json",
        required=True,
        help='Command argv as JSON array, e.g. ["launchctl","print","..."]',
    )
    approve.add_argument("--session-id", help="Task session id to scope approvals")
    sub.add_parser("elevated-used", help="Mark elevated mode as used now")
    sub.add_parser("normal-used", help="Mark normal mode activity now")
    sub.add_parser("drop", help="Drop to normal mode")
    sub.add_parser("status", help="Print current state and timeout info")
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    state_file = Path(os.path.expanduser(args.state_file))
    state = load_state(state_file)

    if args.command == "preflight":
        code = preflight(state, args.timeout_minutes)
        save_state(state_file, state)
        return code
    if args.command == "authorize":
        argv = json.loads(args.argv_json)
        if not isinstance(argv, list) or not all(isinstance(x, str) for x in argv):
            print('{"status":"ERROR","error":"argv-json must be a JSON array of strings"}')
            return 2
        code = authorize(state, args.timeout_minutes, argv, args.session_id)
        save_state(state_file, state)
        return code
    if args.command == "approve":
        argv = json.loads(args.argv_json)
        if not isinstance(argv, list) or not all(isinstance(x, str) for x in argv):
            print('{"status":"ERROR","error":"argv-json must be a JSON array of strings"}')
            return 2
        approve_command(state, args.reason, argv, args.session_id)
        save_state(state_file, state)
        print('{"status":"OK","action":"approved-command"}')
        return 0
    if args.command == "elevated-used":
        mark_elevated_used(state)
        save_state(state_file, state)
        print('{"status":"OK","action":"elevated-used"}')
        return 0
    if args.command == "normal-used":
        mark_normal_used(state)
        save_state(state_file, state)
        print('{"status":"OK","action":"normal-used"}')
        return 0
    if args.command == "drop":
        drop(state, "manual-drop")
        save_state(state_file, state)
        print('{"status":"OK","action":"drop-elevation"}')
        return 0
    if args.command == "status":
        status(state, args.timeout_minutes)
        return 0
    return 1


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/audit_logger.py

```python
#!/usr/bin/env python3
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


AUDIT_LOG = Path.home() / ".openclaw" / "security" / "privileged-audit.jsonl"


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def append_audit(event: Dict[str, Any]) -> None:
    """
    Append an audit event to an on-disk JSONL timeline.

    Best-effort only: audit logging must never block privileged operations.
    """
    try:
        AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
        payload = {"ts_utc": _utc_now_iso(), **event}
        with AUDIT_LOG.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload, separators=(",", ":")) + "\n")
    except Exception:
        return


```

### scripts/command_policy.py

```python
#!/usr/bin/env python3
import json
import re
import shlex
from pathlib import Path
from typing import Dict, List, Optional, Tuple


POLICY_PATH = Path.home() / ".openclaw" / "security" / "command-policy.json"


def _load_policy() -> Dict[str, object]:
    if not POLICY_PATH.exists():
        return {}
    try:
        raw = json.loads(POLICY_PATH.read_text(encoding="utf-8"))
        return raw if isinstance(raw, dict) else {}
    except Exception:
        return {}


def _compile_patterns(items: object) -> List[re.Pattern]:
    if not isinstance(items, list):
        return []
    out: List[re.Pattern] = []
    for s in items:
        if not isinstance(s, str) or not s.strip():
            continue
        try:
            out.append(re.compile(s))
        except re.error:
            continue
    return out


def _match_any(patterns: List[re.Pattern], text: str) -> Optional[str]:
    for p in patterns:
        if p.search(text):
            return p.pattern
    return None


def evaluate_command(argv: List[str]) -> Dict[str, object]:
    """
    Evaluate the command argv against an optional allow/deny policy.

    Policy format (~/.openclaw/security/command-policy.json):
      {
        "allow": ["^apt\\b", "^brew\\b"],
        "deny":  ["\\brm\\s+-rf\\b"]
      }

    Behavior:
    - If policy file is missing: allow.
    - If deny matches: block.
    - If allow list is non-empty: require allow match.
    """
    cmd_str = shlex.join(argv) if argv else ""
    pol = _load_policy()
    allow = _compile_patterns(pol.get("allow"))
    deny = _compile_patterns(pol.get("deny"))

    deny_match = _match_any(deny, cmd_str)
    if deny_match:
        return {"allowed": False, "reason": "deny_match", "pattern": deny_match}

    if allow:
        allow_match = _match_any(allow, cmd_str)
        if not allow_match:
            return {"allowed": False, "reason": "not_in_allowlist", "pattern": None}
        return {"allowed": True, "reason": "allow_match", "pattern": allow_match}

    return {"allowed": True, "reason": "no_policy_or_allow_empty", "pattern": None}


```

### scripts/prompt_policy.py

```python
#!/usr/bin/env python3
import json
from pathlib import Path
from typing import Dict


POLICY_PATH = Path.home() / ".openclaw" / "security" / "prompt-policy.json"


def load_policy() -> Dict[str, object]:
    if not POLICY_PATH.exists():
        return {"require_confirmation_for_untrusted": False}
    try:
        raw = json.loads(POLICY_PATH.read_text(encoding="utf-8"))
        if not isinstance(raw, dict):
            return {"require_confirmation_for_untrusted": False}
        return {
            "require_confirmation_for_untrusted": bool(
                raw.get("require_confirmation_for_untrusted", False)
            )
        }
    except Exception:
        return {"require_confirmation_for_untrusted": False}


```

### scripts/guarded_privileged_exec.py

```python
#!/usr/bin/env python3
"""Guarded privileged execution with approval, policy checks, and audit logging.
Review before enabling. No network calls are made from this script.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import List, Optional

# Allow importing sibling modules when executed from arbitrary cwd.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from audit_logger import append_audit  # noqa: E402
from command_policy import evaluate_command  # noqa: E402
from prompt_policy import load_policy  # noqa: E402


def run_guard(args, *guard_args):
    cmd = [
        sys.executable,
        str(Path(__file__).with_name("root_session_guard.py")),
        "--state-file",
        args.state_file,
        "--timeout-minutes",
        str(args.timeout_minutes),
        *guard_args,
    ]
    return subprocess.run(cmd, capture_output=True, text=True)


def ask_for_approval(reason: str, command_argv: List[str]) -> bool:
    append_audit({"action": "approval_requested", "reason": reason, "argv": command_argv})
    print("Approval required for elevated execution.")
    print(f"Reason: {reason}")
    print("Command argv:")
    print(json.dumps(command_argv, indent=2))
    answer = input("Approve elevated access for this command? [y/N]: ").strip().lower()
    approved = answer in {"y", "yes"}
    append_audit({"action": "approval_decision", "reason": reason, "argv": command_argv, "approved": approved})
    return approved


def run_command(argv: List[str], use_sudo: bool, sudo_kill_cache: bool) -> int:
    sudo_bin = os.environ.get("OPENCLAW_REAL_SUDO", "sudo")
    exec_argv = [sudo_bin, "--"] + argv if use_sudo else argv
    print("Executing argv:")
    print(json.dumps(exec_argv, indent=2))
    if use_sudo and sudo_kill_cache:
        # Best-effort: ensure sudo timestamp for this user is not reused implicitly.
        subprocess.run([sudo_bin, "-k"], check=False, capture_output=True, text=True)
    append_audit({"action": "exec_start", "argv": argv, "use_sudo": use_sudo})
    result = subprocess.run(exec_argv)
    append_audit({"action": "exec_finish", "argv": argv, "use_sudo": use_sudo, "returncode": result.returncode})
    return result.returncode


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Execute privileged commands with approval + idle-timeout guard",
    )
    parser.add_argument(
        "--state-file",
        default=str(Path.home() / ".openclaw" / "security" / "root-session-state.json"),
        help="Path to root session state file",
    )
    parser.add_argument(
        "--timeout-minutes",
        type=int,
        default=30,
        help="Idle timeout for elevated mode / approval session",
    )
    parser.add_argument(
        "--reason",
        required=True,
        help="Business/security reason for privileged command",
    )
    parser.add_argument(
        "--use-sudo",
        action="store_true",
        help="Prefix command with sudo",
    )
    parser.add_argument(
        "--sudo-kill-cache",
        action="store_true",
        default=False,
        help="Run `sudo -k` before execution to reduce implicit sudo reuse",
    )
    parser.add_argument(
        "--keep-session",
        action="store_true",
        default=False,
        help="Keep elevated session after command (still restricted to allowlisted argv; expires on idle timeout)",
    )
    parser.add_argument(
        "command",
        nargs=argparse.REMAINDER,
        help='Command to run, e.g. -- "systemctl restart nginx"',
    )
    return parser.parse_args()


def _get_task_session_id() -> Optional[str]:
    session_id = os.environ.get("OPENCLAW_TASK_SESSION_ID") or None
    require_session = os.environ.get("OPENCLAW_REQUIRE_SESSION_ID") == "1"
    if require_session and not session_id:
        return "__MISSING__"
    return session_id


def main() -> int:
    args = parse_args()
    if not args.command:
        print("No command supplied.", file=sys.stderr)
        return 2

    argv = args.command
    if argv and argv[0] == "--":
        argv = argv[1:]
    if not argv:
        print("No command supplied after -- delimiter.", file=sys.stderr)
        return 2

    if os.environ.get("OPENCLAW_REQUIRE_POLICY_FILES") == "1":
        required = [
            Path.home() / ".openclaw" / "security" / "command-policy.json",
            Path.home() / ".openclaw" / "security" / "approved_ports.json",
            Path.home() / ".openclaw" / "security" / "egress_allowlist.json",
        ]
        missing = [str(p) for p in required if not p.exists()]
        if missing:
            append_audit({"action": "policy_files_missing", "missing": missing})
            print("Missing required policy files. Refusing privileged execution.", file=sys.stderr)
            return 7

    policy_result = evaluate_command(argv)
    if not policy_result.get("allowed", False):
        append_audit(
            {
                "action": "policy_block",
                "argv": argv,
                "reason": policy_result.get("reason"),
                "pattern": policy_result.get("pattern"),
            }
        )
        print("Command blocked by policy.", file=sys.stderr)
        return 3

    session_id = _get_task_session_id()
    if session_id == "__MISSING__":
        append_audit({"action": "session_id_missing", "argv": argv})
        print("Task session id is required but not provided.", file=sys.stderr)
        return 6

    argv_json = json.dumps(argv)
    if session_id:
        authz = run_guard(args, "authorize", "--argv-json", argv_json, "--session-id", session_id)
    else:
        authz = run_guard(args, "authorize", "--argv-json", argv_json)
    if authz.returncode not in (0, 2):
        sys.stderr.write(authz.stderr or authz.stdout)
        return authz.returncode

    needs_approval = authz.returncode == 2
    prompt_policy = load_policy()
    if prompt_policy.get("require_confirmation_for_untrusted") and os.environ.get("OPENCLAW_UNTRUSTED_SOURCE") == "1":
        confirm = input("Untrusted content source detected. Proceed? [y/N]: ").strip().lower()
        if confirm not in {"y", "yes"}:
            append_audit({"action": "untrusted_source_block", "argv": argv})
            return 5
        append_audit({"action": "untrusted_source_confirmed", "argv": argv})

    if needs_approval and not ask_for_approval(args.reason, argv):
        print("User denied elevated access. Running in normal mode is required.")
        run_guard(args, "normal-used")
        append_audit({"action": "approval_denied", "reason": args.reason, "argv": argv})
        return 1

    if needs_approval:
        token_required = os.environ.get("OPENCLAW_APPROVAL_TOKEN")
        if token_required:
            env_path = Path.home() / ".openclaw" / "env"
            if env_path.exists() and (env_path.stat().st_mode & 0o077):
                print("Warning: ~/.openclaw/env permissions are too open; tighten to 600.")
            token = input("Enter approval token: ").strip()
            if token != token_required:
                append_audit({"action": "approval_token_failed", "argv": argv})
                print("Invalid approval token.", file=sys.stderr)
                return 4
            append_audit({"action": "approval_token_ok", "argv": argv})

        approve_args = ["approve", "--reason", args.reason, "--argv-json", argv_json]
        if session_id:
            approve_args += ["--session-id", session_id]
        approve = run_guard(args, *approve_args)
        if approve.returncode != 0:
            sys.stderr.write(approve.stderr or approve.stdout)
            return approve.returncode
        append_audit({"action": "approval_granted", "reason": args.reason, "argv": argv, "session_id": session_id})

    try:
        if args.use_sudo:
            run_guard(args, "elevated-used")
        return run_command(argv, args.use_sudo, args.sudo_kill_cache)
    finally:
        if not args.keep_session:
            run_guard(args, "drop")
            append_audit({"action": "drop_elevation", "argv": argv, "reason": "post-command"})
        if args.use_sudo and args.sudo_kill_cache:
            sudo_bin = os.environ.get("OPENCLAW_REAL_SUDO", "sudo")
            subprocess.run([sudo_bin, "-k"], check=False, capture_output=True, text=True)


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/install-openclaw-runtime-hook.sh

```bash
#!/usr/bin/env bash
set -euo pipefail

# Installs a sudo shim at ~/.openclaw/bin/sudo (matches live_assessment checks).
# For the shim to be used, OpenClaw's runtime PATH must include ~/.openclaw/bin
# before /usr/bin. This script attempts best-effort PATH injection for macOS
# LaunchAgent installs of the gateway.

log() {
  printf '[cyber-security-engineer] %s\n' "$*"
}

REAL_SUDO="$(command -v sudo || true)"
if [[ -z "${REAL_SUDO}" ]]; then
  log "sudo not found; nothing to install."
  exit 1
fi

OPENCLAW_DIR="${HOME}/.openclaw"
BIN_DIR="${OPENCLAW_DIR}/bin"
SKILL_DIR_DEFAULT="${OPENCLAW_DIR}/workspace/skills/cyber-security-engineer"

mkdir -p "${BIN_DIR}"
chmod 700 "${OPENCLAW_DIR}" "${BIN_DIR}" || true

WRAPPER="${BIN_DIR}/sudo"
cat > "${WRAPPER}" <<EOF
#!/usr/bin/env bash
set -euo pipefail

REAL_SUDO="\${OPENCLAW_REAL_SUDO:-${REAL_SUDO}}"
SKILL_DIR="\${OPENCLAW_CYBER_SKILL_DIR:-${SKILL_DIR_DEFAULT}}"

# Pass-through for sudo bookkeeping.
if [[ \$# -eq 0 ]]; then
  exec "\${REAL_SUDO}"
fi
case "\${1:-}" in
  -h|--help|-V|--version|-v|-l|-k)
    exec "\${REAL_SUDO}" "\$@"
    ;;
esac

# Refuse non-interactive privilege escalation by default (safety).
if [[ ! -t 0 && "\${OPENCLAW_ALLOW_NONINTERACTIVE_SUDO:-0}" != "1" ]]; then
  echo "[cyber-security-engineer] Refusing non-interactive sudo (set OPENCLAW_ALLOW_NONINTERACTIVE_SUDO=1 to override)." >&2
  exit 2
fi

REASON="\${OPENCLAW_PRIV_REASON:-OpenClaw requested privileged execution}"
export OPENCLAW_REAL_SUDO="\${REAL_SUDO}"
exec python3 "\${SKILL_DIR}/scripts/guarded_privileged_exec.py" \\
  --reason "\${REASON}" \\
  --use-sudo \\
  -- "\$@"
EOF
chmod 755 "${WRAPPER}"

log "Installed sudo shim: ${WRAPPER}"

if [[ "$(uname -s)" == "Darwin" ]]; then
  PLIST="${HOME}/Library/LaunchAgents/ai.openclaw.gateway.plist"
  if [[ -f "${PLIST}" ]] && command -v /usr/libexec/PlistBuddy >/dev/null 2>&1; then
    # Ensure EnvironmentVariables exists and prepend ~/.openclaw/bin to PATH.
    /usr/libexec/PlistBuddy -c "Add :EnvironmentVariables dict" "${PLIST}" 2>/dev/null || true
    EXISTING_PATH="$(/usr/libexec/PlistBuddy -c "Print :EnvironmentVariables:PATH" "${PLIST}" 2>/dev/null || true)"
    if [[ -z "${EXISTING_PATH}" ]]; then
      NEW_PATH="${BIN_DIR}:/usr/bin:/bin:/usr/sbin:/sbin"
      /usr/libexec/PlistBuddy -c "Add :EnvironmentVariables:PATH string ${NEW_PATH}" "${PLIST}" 2>/dev/null || \
        /usr/libexec/PlistBuddy -c "Set :EnvironmentVariables:PATH ${NEW_PATH}" "${PLIST}" 2>/dev/null || true
      log "Updated gateway LaunchAgent PATH to include ${BIN_DIR}"
    else
      case ":${EXISTING_PATH}:" in
        *":${BIN_DIR}:"*) ;;
        *)
          NEW_PATH="${BIN_DIR}:${EXISTING_PATH}"
          /usr/libexec/PlistBuddy -c "Set :EnvironmentVariables:PATH ${NEW_PATH}" "${PLIST}" 2>/dev/null || true
          log "Prepended ${BIN_DIR} to gateway LaunchAgent PATH"
          ;;
      esac
    fi
  else
    log "macOS gateway plist not found or PlistBuddy unavailable; PATH must include ${BIN_DIR} manually."
  fi
else
  log "Non-macOS: ensure the OpenClaw gateway process PATH includes ${BIN_DIR} before /usr/bin."
fi

log "Restart the OpenClaw gateway to apply:"
log "  openclaw gateway restart"


```

### scripts/port_monitor.py

```python
#!/usr/bin/env python3
import argparse
import json
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple


INSECURE_PORT_RECOMMENDATIONS = {
    20: "Use SFTP (22) or FTPS (990) instead of FTP data.",
    21: "Use SFTP (22) or FTPS (990) instead of FTP control.",
    23: "Use SSH (22) instead of Telnet.",
    69: "Use HTTPS-based transfer APIs instead of TFTP.",
    80: "Use HTTPS (443) instead of HTTP.",
    110: "Use POP3S (995) instead of POP3.",
    143: "Use IMAPS (993) instead of IMAP.",
    389: "Use LDAPS (636) instead of LDAP.",
    512: "Disable r-commands and use SSH (22).",
    513: "Disable r-commands and use SSH (22).",
    514: "Use TLS-secured syslog transport.",
}

DEFAULT_APPROVED_PATH = Path.home() / ".openclaw" / "security" / "approved_ports.json"


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="OpenClaw port monitor and risk recommender")
    parser.add_argument(
        "--approved-file",
        default=str(DEFAULT_APPROVED_PATH),
        help="JSON file of approved listening ports/services",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Print JSON output only",
    )
    return parser.parse_args()


def run_lsof() -> str:
    cmd = ["lsof", "-nP", "-iTCP", "-sTCP:LISTEN"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run lsof")
    return proc.stdout


def run_ss() -> str:
    cmd = ["ss", "-ltnp"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run ss")
    return proc.stdout


def run_netstat_windows() -> str:
    cmd = ["netstat", "-ano", "-p", "tcp"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run netstat")
    return proc.stdout


def parse_name(name: str) -> Tuple[Optional[str], Optional[int]]:
    name = name.replace("(LISTEN)", "").strip()
    # *:22, 127.0.0.1:631, [::1]:8080
    if name.startswith("["):
        match = re.search(r"\[(.*?)\]:(\d+)$", name)
    else:
        match = re.search(r"(.+):(\d+)$", name)
    if not match:
        return None, None
    host = match.group(1)
    port = int(match.group(2))
    return host, port


def parse_lsof_output(text: str) -> List[Dict[str, object]]:
    lines = [line for line in text.splitlines() if line.strip()]
    if len(lines) <= 1:
        return []
    entries: List[Dict[str, object]] = []
    seen = set()
    for line in lines[1:]:
        parts = line.split()
        if len(parts) < 9:
            continue
        command = parts[0]
        pid = parts[1]
        user = parts[2]
        name = parts[-2] if parts[-1] == "(LISTEN)" else parts[-1]
        host, port = parse_name(name)
        if port is None:
            continue
        entry = {
            "command": command,
            "pid": int(pid) if pid.isdigit() else pid,
            "user": user,
            "host": host,
            "port": port,
            "protocol": "tcp",
        }
        dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)
        entries.append(entry)
    return entries


def parse_ss_output(text: str) -> List[Dict[str, object]]:
    lines = [line for line in text.splitlines() if line.strip()]
    entries: List[Dict[str, object]] = []
    seen = set()
    for line in lines[1:]:
        if "LISTEN" not in line:
            continue
        parts = line.split()
        if len(parts) < 5:
            continue
        local = parts[3]
        host, port = parse_name(local)
        if port is None:
            continue
        cmd_match = re.search(r'users:\(\("([^"]+)",pid=(\d+)', line)
        command = cmd_match.group(1) if cmd_match else "unknown"
        pid = int(cmd_match.group(2)) if cmd_match else "unknown"
        entry = {
            "command": command,
            "pid": pid,
            "user": None,
            "host": host,
            "port": port,
            "protocol": "tcp",
        }
        dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)
        entries.append(entry)
    return entries


def parse_netstat_windows_output(text: str) -> List[Dict[str, object]]:
    entries: List[Dict[str, object]] = []
    seen = set()
    for line in text.splitlines():
        if not line.strip().startswith("TCP"):
            continue
        parts = line.split()
        if len(parts) < 5:
            continue
        local = parts[1]
        state = parts[3]
        pid = parts[4]
        if state.upper() != "LISTENING":
            continue
        host, port = parse_name(local)
        if port is None:
            continue
        entry = {
            "command": "unknown",
            "pid": int(pid) if pid.isdigit() else pid,
            "user": None,
            "host": host,
            "port": port,
            "protocol": "tcp",
        }
        dedupe_key = (entry["command"], entry["pid"], entry["host"], entry["port"], entry["protocol"])
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)
        entries.append(entry)
    return entries


def collect_entries() -> Tuple[List[Dict[str, object]], str]:
    if shutil.which("lsof"):
        return parse_lsof_output(run_lsof()), "lsof"
    if sys.platform.startswith("linux") and shutil.which("ss"):
        return parse_ss_output(run_ss()), "ss"
    if sys.platform.startswith("win") and shutil.which("netstat"):
        return parse_netstat_windows_output(run_netstat_windows()), "netstat"
    raise RuntimeError("No supported port listing tool found (lsof/ss/netstat)")


def load_approved_ports(path: Path) -> List[Dict[str, object]]:
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError("Approved ports file must be a JSON array.")
    normalized: List[Dict[str, object]] = []
    for item in data:
        if not isinstance(item, dict):
            continue
        if "port" not in item:
            continue
        normalized.append(item)
    return normalized


def is_approved(entry: Dict[str, object], approved: List[Dict[str, object]]) -> bool:
    for rule in approved:
        if int(rule.get("port")) != int(entry["port"]):
            continue
        rule_proto = str(rule.get("protocol", "tcp")).lower()
        if rule_proto != str(entry["protocol"]).lower():
            continue
        rule_cmd = rule.get("command")
        if rule_cmd and str(rule_cmd).lower() != str(entry["command"]).lower():
            continue
        return True
    return False


def build_findings(entries: List[Dict[str, object]], approved: List[Dict[str, object]]) -> List[Dict[str, object]]:
    findings: List[Dict[str, object]] = []
    for entry in entries:
        port = int(entry["port"])
        host = str(entry.get("host") or "")

        if not is_approved(entry, approved):
            findings.append(
                {
                    "severity": "medium",
                    "type": "unapproved-port",
                    "port": port,
                    "command": entry["command"],
                    "message": "Listening port is not in approved baseline.",
                    "recommendation": "Approve it with business justification or close the service.",
                }
            )

        if port in INSECURE_PORT_RECOMMENDATIONS:
            findings.append(
                {
                    "severity": "high",
                    "type": "insecure-port",
                    "port": port,
                    "command": entry["command"],
                    "message": "Insecure protocol/port detected.",
                    "recommendation": INSECURE_PORT_RECOMMENDATIONS[port],
                }
            )

        if host in ("*", "0.0.0.0", "::"):
            findings.append(
                {
                    "severity": "medium",
                    "type": "public-bind",
                    "port": port,
                    "command": entry["command"],
                    "message": "Service listens on all interfaces.",
                    "recommendation": "Bind to localhost or a restricted interface unless external exposure is required.",
                }
            )
    return findings


def output_text(report: Dict[str, object]) -> None:
    print("Port Monitoring Report")
    print(f"Listening services: {len(report['listening_services'])}")
    print(f"Findings: {len(report['findings'])}")
    if report["findings"]:
        print("")
        for finding in report["findings"]:
            print(
                f"- [{finding['severity'].upper()}] {finding['type']} "
                f"port {finding['port']} ({finding['command']}): {finding['recommendation']}"
            )


def main() -> int:
    args = parse_args()
    approved_path = Path(args.approved_file).expanduser()
    try:
        entries, tool_used = collect_entries()
        approved = load_approved_ports(approved_path)
        findings = build_findings(entries, approved)
    except Exception as exc:
        print(json.dumps({"status": "error", "error": str(exc)}))
        return 1

    report = {
        "status": "ok",
        "approved_file": str(approved_path),
        "approved_rules_count": len(approved),
        "tool": tool_used,
        "listening_services": entries,
        "findings": findings,
    }

    if args.json:
        print(json.dumps(report, indent=2))
    else:
        output_text(report)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/generate_approved_ports.py

```python
#!/usr/bin/env python3
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Tuple


DEFAULT_OUT = Path.home() / ".openclaw" / "security" / "approved_ports.json"


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Generate an approved listening-port baseline from current services."
    )
    p.add_argument(
        "--output",
        default=str(DEFAULT_OUT),
        help="Where to write the approved ports JSON baseline",
    )
    p.add_argument(
        "--skill-dir",
        default=str(Path(__file__).resolve().parent.parent),
        help="Path to cyber-security-engineer skill directory (for calling port_monitor.py)",
    )
    p.add_argument(
        "--include-command",
        action="store_true",
        default=True,
        help="Include the command name in each rule (recommended)",
    )
    return p.parse_args()


def run_port_monitor(skill_dir: Path) -> Dict[str, object]:
    port_monitor = skill_dir / "scripts" / "port_monitor.py"
    if not port_monitor.exists():
        raise FileNotFoundError(f"port_monitor.py not found at {port_monitor}")
    # Use an empty-but-valid approved file so we always get a full inventory.
    # (port_monitor expects JSON; passing /dev/null would cause JSON parse errors.)
    empty_approved = Path("/tmp/openclaw-approved-ports-empty.json")
    try:
        empty_approved.write_text("[]\n", encoding="utf-8")
    except Exception:
        # Best-effort fallback: if we can't write /tmp, omit approved-file entirely.
        empty_approved = None
    cmd = [
        sys.executable,
        str(port_monitor),
        "--json",
    ]
    if empty_approved is not None:
        cmd.extend(["--approved-file", str(empty_approved)])
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "port_monitor failed")
    return json.loads(proc.stdout)


def normalize_listeners(listening: List[Dict[str, object]], include_command: bool) -> Tuple[List[Dict[str, object]], int]:
    rules: List[Dict[str, object]] = []
    seen = set()
    for entry in listening:
        try:
            port = int(entry.get("port"))
        except Exception:
            continue
        proto = str(entry.get("protocol") or "tcp").lower()
        cmd = str(entry.get("command") or "").strip()

        rule: Dict[str, object] = {"port": port, "protocol": proto}
        if include_command and cmd:
            rule["command"] = cmd
        # Note: keep extra fields for human review; port_monitor ignores unknown keys.
        if entry.get("host"):
            rule["host"] = entry.get("host")
        if entry.get("user"):
            rule["user"] = entry.get("user")
        if entry.get("pid") not in (None, "", "unknown"):
            rule["pid"] = entry.get("pid")

        dedupe = (rule.get("port"), rule.get("protocol"), rule.get("command"))
        if dedupe in seen:
            continue
        seen.add(dedupe)
        rules.append(rule)

    rules.sort(key=lambda r: (int(r.get("port", 0)), str(r.get("protocol", "")), str(r.get("command", ""))))
    return rules, len(seen)


def main() -> int:
    args = parse_args()
    out = Path(args.output).expanduser()
    skill_dir = Path(args.skill_dir).expanduser()

    report = run_port_monitor(skill_dir)
    listening = report.get("listening_services")
    if not isinstance(listening, list):
        raise SystemExit("Unexpected port_monitor output: listening_services missing or not a list")

    rules, _ = normalize_listeners(listening, include_command=bool(args.include_command))
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(rules, indent=2) + "\n", encoding="utf-8")

    print(json.dumps({"status": "ok", "output": str(out), "rules": len(rules)}, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/egress_monitor.py

```python
#!/usr/bin/env python3
import argparse
import json
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple


DEFAULT_ALLOWLIST = Path.home() / ".openclaw" / "security" / "egress_allowlist.json"


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Outbound TCP connection monitor against allowlist")
    p.add_argument("--allowlist", default=str(DEFAULT_ALLOWLIST), help="Allowlist JSON file")
    p.add_argument("--json", action="store_true", help="JSON output")
    return p.parse_args()


def run_lsof() -> str:
    cmd = ["lsof", "-nP", "-iTCP", "-sTCP:ESTABLISHED"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run lsof")
    return proc.stdout


def run_ss() -> str:
    cmd = ["ss", "-tnp", "state", "established"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run ss")
    return proc.stdout


def run_netstat_windows() -> str:
    cmd = ["netstat", "-ano", "-p", "tcp"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "Failed to run netstat")
    return proc.stdout


def _parse_host_port(s: str) -> Tuple[Optional[str], Optional[int]]:
    s = s.strip()
    # lsof: 1.2.3.4:55555->9.9.9.9:443 or [::1]:555->[...] etc.
    if "->" in s:
        s = s.split("->", 1)[1]
    s = s.replace("(ESTABLISHED)", "").strip()
    if s.startswith("["):
        m = re.search(r"\[(.*?)\]:(\d+)$", s)
    else:
        m = re.search(r"(.+):(\d+)$", s)
    if not m:
        return None, None
    return m.group(1), int(m.group(2))


def parse_lsof(text: str) -> List[Dict[str, object]]:
    lines = [ln for ln in text.splitlines() if ln.strip()]
    if len(lines) <= 1:
        return []
    out: List[Dict[str, object]] = []
    seen = set()
    for ln in lines[1:]:
        parts = ln.split()
        if len(parts) < 9:
            continue
        command = parts[0]
        pid = parts[1]
        user = parts[2]
        name = parts[-1]
        rhost, rport = _parse_host_port(name)
        if rport is None:
            continue
        entry = {
            "command": command,
            "pid": int(pid) if pid.isdigit() else pid,
            "user": user,
            "remote_host": rhost,
            "remote_port": rport,
            "protocol": "tcp",
        }
        key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out


def parse_ss(text: str) -> List[Dict[str, object]]:
    out: List[Dict[str, object]] = []
    seen = set()
    for ln in text.splitlines():
        if not ln.strip() or ln.startswith("State"):
            continue
        # ESTAB 0 0 192.168.0.10:49820 1.2.3.4:443 users:(("proc",pid=123,fd=...))
        parts = ln.split()
        if len(parts) < 5:
            continue
        remote = parts[4]
        rhost, rport = _parse_host_port(remote)
        if rport is None:
            continue
        m = re.search(r'users:\\(\\(\"([^\"]+)\",pid=(\\d+)', ln)
        command = m.group(1) if m else "unknown"
        pid = int(m.group(2)) if m else "unknown"
        entry = {
            "command": command,
            "pid": pid,
            "user": None,
            "remote_host": rhost,
            "remote_port": rport,
            "protocol": "tcp",
        }
        key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out


def parse_netstat_windows(text: str) -> List[Dict[str, object]]:
    out: List[Dict[str, object]] = []
    seen = set()
    for ln in text.splitlines():
        if not ln.strip().startswith("TCP"):
            continue
        parts = ln.split()
        if len(parts) < 5:
            continue
        remote = parts[2]
        state = parts[3]
        pid = parts[4]
        if state.upper() != "ESTABLISHED":
            continue
        rhost, rport = _parse_host_port(remote)
        if rport is None:
            continue
        entry = {
            "command": "unknown",
            "pid": int(pid) if pid.isdigit() else pid,
            "user": None,
            "remote_host": rhost,
            "remote_port": rport,
            "protocol": "tcp",
        }
        key = (entry["command"], entry["pid"], entry["remote_host"], entry["remote_port"])
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out


def collect_connections() -> Tuple[List[Dict[str, object]], str]:
    if shutil.which("lsof"):
        return parse_lsof(run_lsof()), "lsof"
    if sys.platform.startswith("linux") and shutil.which("ss"):
        return parse_ss(run_ss()), "ss"
    if sys.platform.startswith("win") and shutil.which("netstat"):
        return parse_netstat_windows(run_netstat_windows()), "netstat"
    raise RuntimeError("No supported connection tool found (lsof/ss/netstat)")


def load_allowlist(path: Path) -> List[Dict[str, object]]:
    if not path.exists():
        return []
    raw = json.loads(path.read_text(encoding="utf-8"))
    return raw if isinstance(raw, list) else []


def is_allowed(conn: Dict[str, object], rules: List[Dict[str, object]]) -> bool:
    host = str(conn.get("remote_host") or "")
    port = int(conn.get("remote_port") or 0)
    proto = str(conn.get("protocol") or "tcp").lower()
    cmd = str(conn.get("command") or "")
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        if str(rule.get("protocol") or "tcp").lower() != proto:
            continue
        if "port" in rule and int(rule.get("port") or 0) != port:
            continue
        if "command" in rule and str(rule.get("command") or "").lower() != cmd.lower():
            continue
        host_re = rule.get("host_regex")
        if host_re:
            try:
                if not re.search(str(host_re), host):
                    continue
            except re.error:
                continue
        else:
            host_exact = rule.get("host")
            if host_exact and str(host_exact) != host:
                continue
        return True
    return False


def main() -> int:
    args = parse_args()
    allow_path = Path(args.allowlist).expanduser()
    try:
        conns, tool = collect_connections()
        rules = load_allowlist(allow_path)
        findings = []
        for c in conns:
            if not is_allowed(c, rules):
                findings.append(
                    {
                        "type": "unapproved-egress",
                        "severity": "medium",
                        "remote_host": c.get("remote_host"),
                        "remote_port": c.get("remote_port"),
                        "command": c.get("command"),
                        "recommendation": "Add to egress allowlist with justification or terminate the connection.",
                    }
                )
        report = {
            "status": "ok",
            "allowlist_file": str(allow_path),
            "allowlist_rules_count": len(rules),
            "tool": tool,
            "connections": conns,
            "findings": findings,
        }
    except Exception as exc:
        report = {"status": "error", "error": str(exc)}

    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(f"Egress connections: {len(report.get('connections', []))}")
        print(f"Findings: {len(report.get('findings', []))}")
    return 0 if report.get("status") == "ok" else 1


if __name__ == "__main__":
    raise SystemExit(main())


```

### scripts/notify_on_violation.py

```python
#!/usr/bin/env python3
"""
Notify when new compliance violations/partials appear between runs.

This script compares the current compliance summary (as generated by
`compliance_dashboard.py render --output-summary ...`) against a persisted
snapshot and emits a message when new items appear.

If OPENCLAW_VIOLATION_NOTIFY_CMD is set, the message is sent to that command via
stdin (shell=True). Otherwise the message is printed to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
from typing import Any, Dict, List, Tuple


def _default_state_file() -> str:
    base = os.path.join(os.path.expanduser("~"), ".openclaw", "security")
    return os.path.join(base, "violation-notify-state.json")


def _load_json(path: str) -> Dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def _safe_mkdir_for(path: str) -> None:
    d = os.path.dirname(path)
    if d:
        os.makedirs(d, exist_ok=True)


def _extract_violation_state(summary: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """
    Return a mapping: check_id -> {status, risk, title}
    Only includes items whose status is violation or partial.
    """
    out: Dict[str, Dict[str, Any]] = {}
    for item in summary.get("violations", []) or []:
        try:
            check_id = str(item.get("check_id", "")).strip()
        except Exception:
            check_id = ""
        if not check_id:
            continue
        status = str(item.get("status", "")).strip() or "violation"
        risk = str(item.get("risk", "")).strip() or "unknown"
        title = str(item.get("title", "")).strip()
        out[check_id] = {"status": status, "risk": risk, "title": title}
    return out


def _severity_rank(status: str) -> int:
    # Higher = more severe.
    if status == "violation":
        return 2
    if status == "partial":
        return 1
    return 0


def _diff_new(prev: Dict[str, Dict[str, Any]], cur: Dict[str, Dict[str, Any]]) -> List[Tuple[str, Dict[str, Any]]]:
    new_items: List[Tuple[str, Dict[str, Any]]] = []
    for check_id, meta in cur.items():
        if check_id not in prev:
            new_items.append((check_id, meta))
            continue
        prev_status = str(prev.get(check_id, {}).get("status", ""))
        cur_status = str(meta.get("status", ""))
        if _severity_rank(cur_status) > _severity_rank(prev_status):
            # Escalation (e.g., partial -> violation) counts as "new".
            new_items.append((check_id, meta))
    return sorted(new_items, key=lambda x: x[0])


def _format_message(new_items: List[Tuple[str, Dict[str, Any]]], summary_file: str) -> str:
    lines = []
    lines.append("OpenClaw Cyber Security Engineer: new compliance findings detected")
    lines.append(f"Source: {summary_file}")
    lines.append(f"Time (UTC): {time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}")
    lines.append("")
    for check_id, meta in new_items:
        status = meta.get("status", "")
        risk = meta.get("risk", "")
        title = meta.get("title", "")
        if title:
            lines.append(f"- {check_id}: {status} ({risk}) - {title}")
        else:
            lines.append(f"- {check_id}: {status} ({risk})")
    return "\n".join(lines) + "\n"


def _send_notification(message: str) -> int:
    cmd = os.environ.get("OPENCLAW_VIOLATION_NOTIFY_CMD", "").strip()
    if not cmd:
        sys.stdout.write(message)
        return 0

    # Best-effort: never crash the cycle due to a notifier.
    try:
        p = subprocess.run(cmd, shell=True, input=message, text=True)
        return int(p.returncode)
    except Exception:
        return 1


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--summary-file",
        default="cyber-security-engineer/assessments/compliance-summary.json",
        help="Path to compliance summary JSON.",
    )
    ap.add_argument(
        "--state-file",
        default=os.environ.get("OPENCLAW_VIOLATION_NOTIFY_STATE", _default_state_file()),
        help="Path to persisted state JSON (default: ~/.openclaw/security/violation-notify-state.json).",
    )
    args = ap.parse_args()

    try:
        summary = _load_json(args.summary_file)
    except FileNotFoundError:
        # Nothing to do yet.
        return 0
    except Exception as e:
        sys.stderr.write(f"notify_on_violation: failed to read summary: {e}\n")
        return 0

    cur_state = _extract_violation_state(summary)

    prev_state: Dict[str, Dict[str, Any]] = {}
    try:
        prev_obj = _load_json(args.state_file)
        prev_state = prev_obj.get("violations", {}) or {}
    except FileNotFoundError:
        prev_state = {}
    except Exception:
        prev_state = {}

    new_items = _diff_new(prev_state, cur_state)
    if new_items:
        msg = _format_message(new_items, args.summary_file)
        _send_notification(msg)

    # Persist current state for next run.
    try:
        _safe_mkdir_for(args.state_file)
        with open(args.state_file, "w", encoding="utf-8") as f:
            json.dump({"violations": cur_state}, f, indent=2, sort_keys=True)
            f.write("\n")
    except Exception:
        # Best-effort.
        pass

    return 0


if __name__ == "__main__":
    raise SystemExit(main())


```

### scripts/compliance_dashboard.py

```python
#!/usr/bin/env python3
import argparse
import json
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List


VALID_STATUS = {"compliant", "partial", "violation", "not_assessed"}
VALID_RISK = {"low", "medium", "high", "critical"}


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def default_controls_path() -> Path:
    return Path(__file__).resolve().parent.parent / "references" / "compliance-controls-map.json"


def load_json(path: Path):
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def write_json(path: Path, obj) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)
        f.write("\n")


def init_assessment(controls: List[Dict[str, object]], system_name: str) -> Dict[str, object]:
    checks = []
    for ctrl in controls:
        checks.append(
            {
                "check_id": ctrl["check_id"],
                "status": "not_assessed",
                "risk": ctrl.get("default_risk", "medium"),
                "observed_state": "",
                "evidence": "",
                "gap": "",
                "mitigation": "",
                "owner": "",
                "due_date": "",
            }
        )
    return {
        "metadata": {
            "system": system_name,
            "generated_at_utc": utc_now(),
            "frameworks": ["ISO/IEC 27001:2022", "NIST CSF"],
        },
        "checks": checks,
    }


def validate_assessment(assessment: Dict[str, object]) -> None:
    checks = assessment.get("checks", [])
    if not isinstance(checks, list):
        raise ValueError("Assessment must include 'checks' list.")
    for c in checks:
        status = c.get("status", "")
        risk = c.get("risk", "")
        if status not in VALID_STATUS:
            raise ValueError(f"Invalid status '{status}' for check {c.get('check_id')}")
        if risk not in VALID_RISK:
            raise ValueError(f"Invalid risk '{risk}' for check {c.get('check_id')}")


def merge_controls(controls: List[Dict[str, object]], assessment: Dict[str, object]) -> List[Dict[str, object]]:
    by_id = {c["check_id"]: c for c in assessment.get("checks", [])}
    merged = []
    for ctrl in controls:
        check = by_id.get(ctrl["check_id"], {})
        merged.append(
            {
                "check_id": ctrl["check_id"],
                "title": ctrl["title"],
                "iso27001": ctrl.get("iso27001", []),
                "nist": ctrl.get("nist", []),
                "expected_state": ctrl.get("expected_state", ""),
                "status": check.get("status", "not_assessed"),
                "risk": check.get("risk", ctrl.get("default_risk", "medium")),
                "observed_state": check.get("observed_state", ""),
                "evidence": check.get("evidence", ""),
                "gap": check.get("gap", ""),
                "mitigation": check.get("mitigation", ""),
                "owner": check.get("owner", ""),
                "due_date": check.get("due_date", ""),
            }
        )
    return merged


def summarize(merged: List[Dict[str, object]]) -> Dict[str, object]:
    status_counts = Counter(item["status"] for item in merged)
    risk_counts = Counter(item["risk"] for item in merged)
    violations = [m for m in merged if m["status"] in ("violation", "partial")]
    mitigations = [m for m in merged if m["mitigation"]]
    return {
        "status_counts": dict(status_counts),
        "risk_counts": dict(risk_counts),
        "violations": violations,
        "mitigations": mitigations,
    }


def render_html(system_name: str, merged: List[Dict[str, object]], summary: Dict[str, object]) -> str:
    def fmt_list(items: List[str]) -> str:
        return ", ".join(items) if items else "-"

    controls_rows = "\n".join(
        f"<tr><td>{m['check_id']}</td><td>{m['title']}</td><td>{fmt_list(m['iso27001'])}</td><td>{fmt_list(m['nist'])}</td><td>{m['status']}</td><td>{m['risk']}</td></tr>"
        for m in merged
    )
    violation_rows = "\n".join(
        f"<tr><td>{m['check_id']}</td><td>{m['status']}</td><td>{m['risk']}</td><td>{m['gap'] or '-'}</td><td>{m['evidence'] or '-'}</td></tr>"
        for m in summary["violations"]
    ) or "<tr><td colspan='5'>No violations or partial findings.</td></tr>"
    mitigation_rows = "\n".join(
        f"<tr><td>{m['check_id']}</td><td>{m['mitigation']}</td><td>{m['owner'] or '-'}</td><td>{m['due_date'] or '-'}</td></tr>"
        for m in summary["mitigations"]
    ) or "<tr><td colspan='4'>No mitigation actions recorded.</td></tr>"

    status_counts = summary["status_counts"]
    risk_counts = summary["risk_counts"]

    return f"""<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <title>Compliance Dashboard - {system_name}</title>
  <style>
    :root {{
      --bg: #f2f5f9;
      --panel: #ffffff;
      --text: #1f2937;
      --muted: #5f6b7a;
      --accent: #005f73;
      --warn: #b45309;
      --bad: #b91c1c;
      --border: #d9e2ec;
    }}
    body {{
      margin: 0;
      font-family: "IBM Plex Sans", "Segoe UI", sans-serif;
      color: var(--text);
      background: linear-gradient(180deg, #eaf2f8, var(--bg));
    }}
    .wrap {{ max-width: 1100px; margin: 0 auto; padding: 24px; }}
    .panel {{
      background: var(--panel);
      border: 1px solid var(--border);
      border-radius: 12px;
      padding: 16px;
      margin-bottom: 16px;
      box-shadow: 0 6px 16px rgba(5, 23, 41, 0.06);
    }}
    h1, h2 {{ margin: 0 0 10px 0; }}
    h1 {{ font-size: 28px; color: var(--accent); }}
    h2 {{ font-size: 18px; }}
    p {{ margin: 8px 0; color: var(--muted); }}
    table {{ width: 100%; border-collapse: collapse; font-size: 14px; }}
    th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid var(--border); vertical-align: top; }}
    th {{ background: #f8fafc; }}
    .kpis {{ display: grid; grid-template-columns: repeat(4, minmax(120px, 1fr)); gap: 12px; }}
    .kpi {{ border: 1px solid var(--border); border-radius: 10px; padding: 10px; background: #fcfdff; }}
    .kpi strong {{ display: block; font-size: 20px; margin-top: 6px; }}
    .risk-high {{ color: var(--bad); }}
    .risk-medium {{ color: var(--warn); }}
  </style>
</head>
<body>
  <div class="wrap">
    <div class="panel">
      <h1>ISO 27001 / NIST Compliance Dashboard</h1>
      <p>System: {system_name}</p>
      <p>Generated: {utc_now()}</p>
    </div>

    <div class="panel">
      <h2>Risk and Status Overview</h2>
      <div class="kpis">
        <div class="kpi"><span>Violations</span><strong>{status_counts.get("violation", 0)}</strong></div>
        <div class="kpi"><span>Partial</span><strong>{status_counts.get("partial", 0)}</strong></div>
        <div class="kpi"><span>Compliant</span><strong>{status_counts.get("compliant", 0)}</strong></div>
        <div class="kpi"><span>Not Assessed</span><strong>{status_counts.get("not_assessed", 0)}</strong></div>
      </div>
      <p class="risk-high">High/Critical risk findings: {risk_counts.get("high", 0) + risk_counts.get("critical", 0)}</p>
      <p class="risk-medium">Medium risk findings: {risk_counts.get("medium", 0)}</p>
    </div>

    <div class="panel">
      <h2>Controls Map View</h2>
      <table>
        <thead>
          <tr><th>Check ID</th><th>Control</th><th>ISO 27001</th><th>NIST</th><th>Status</th><th>Risk</th></tr>
        </thead>
        <tbody>
          {controls_rows}
        </tbody>
      </table>
    </div>

    <div class="panel">
      <h2>Violation View</h2>
      <table>
        <thead>
          <tr><th>Check ID</th><th>Status</th><th>Risk</th><th>Gap</th><th>Evidence</th></tr>
        </thead>
        <tbody>
          {violation_rows}
        </tbody>
      </table>
    </div>

    <div class="panel">
      <h2>Mitigation View</h2>
      <table>
        <thead>
          <tr><th>Check ID</th><th>Mitigation</th><th>Owner</th><th>Due Date</th></tr>
        </thead>
        <tbody>
          {mitigation_rows}
        </tbody>
      </table>
    </div>
  </div>
</body>
</html>
"""


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="ISO 27001/NIST dashboard scaffold for OpenClaw")
    parser.add_argument(
        "--controls-file",
        default=str(default_controls_path()),
        help="Controls map JSON file",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    init_cmd = sub.add_parser("init-assessment", help="Create scaffold assessment JSON")
    init_cmd.add_argument("--system", default="OpenClaw", help="System name")
    init_cmd.add_argument(
        "--output",
        default="cyber-security-engineer/assessments/openclaw-assessment.json",
        help="Output path for assessment scaffold",
    )

    render_cmd = sub.add_parser("render", help="Render dashboard from controls+assessment")
    render_cmd.add_argument(
        "--assessment-file",
        default="cyber-security-engineer/assessments/openclaw-assessment.json",
        help="Assessment input JSON path",
    )
    render_cmd.add_argument(
        "--output-html",
        default="cyber-security-engineer/assessments/compliance-dashboard.html",
        help="Output HTML dashboard path",
    )
    render_cmd.add_argument(
        "--output-summary",
        default="cyber-security-engineer/assessments/compliance-summary.json",
        help="Output summary JSON path",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    controls = load_json(Path(args.controls_file).expanduser())
    if not isinstance(controls, list):
        raise ValueError("Controls file must be a JSON array.")

    if args.command == "init-assessment":
        scaffold = init_assessment(controls, args.system)
        out = Path(args.output)
        write_json(out, scaffold)
        print(json.dumps({"status": "ok", "created": str(out), "checks": len(scaffold["checks"])}))
        return 0

    if args.command == "render":
        assessment_path = Path(args.assessment_file)
        assessment = load_json(assessment_path)
        validate_assessment(assessment)

        merged = merge_controls(controls, assessment)
        summary = summarize(merged)
        system_name = assessment.get("metadata", {}).get("system", "OpenClaw")

        html = render_html(system_name, merged, summary)
        out_html = Path(args.output_html)
        out_html.parent.mkdir(parents=True, exist_ok=True)
        out_html.write_text(html, encoding="utf-8")

        out_summary = Path(args.output_summary)
        write_json(
            out_summary,
            {
                "generated_at_utc": utc_now(),
                "system": system_name,
                "summary": summary,
                "controls": merged,
            },
        )
        print(
            json.dumps(
                {
                    "status": "ok",
                    "output_html": str(out_html),
                    "output_summary": str(out_summary),
                    "violations_or_partial": len(summary["violations"]),
                }
            )
        )
        return 0

    return 1


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/live_assessment.py

```python
#!/usr/bin/env python3
"""Collect runtime signals and update the compliance assessment JSON.

Review before use. No external network calls are made by this script.
"""
import argparse
import json
import os
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional


AUDIT_LOG = Path.home() / ".openclaw" / "security" / "privileged-audit.jsonl"
OPENCLAW_DIR = Path.home() / ".openclaw"


def run_cmd(cmd: List[str]) -> str:
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        return (proc.stdout or "") + "\n" + (proc.stderr or "")
    return proc.stdout


def resolve_openclaw_bin() -> str:
    override = Path.home() / ".openclaw" / "openclaw-bin-path.txt"
    if override.exists():
        candidate = override.read_text(encoding="utf-8").strip()
        if candidate and Path(candidate).exists():
            return candidate
    for candidate in (
        shutil.which("openclaw"),
        "/opt/homebrew/bin/openclaw",
        "/usr/local/bin/openclaw",
        str(Path.home() / ".npm-global" / "bin" / "openclaw"),
    ):
        if candidate and Path(candidate).exists():
            return candidate
    return "openclaw"


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def load_json(path: Path):
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def write_json(path: Path, data) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")


def load_openclaw_config() -> Optional[Dict[str, object]]:
    cfg_path = Path.home() / ".openclaw" / "openclaw.json"
    if not cfg_path.exists():
        return None
    try:
        return json.loads(cfg_path.read_text(encoding="utf-8"))
    except Exception:
        return None


def load_env_flags() -> Dict[str, str]:
    env_path = Path.home() / ".openclaw" / "env"
    if not env_path.exists():
        return {}
    flags: Dict[str, str] = {}
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export ") :]
        if "=" in line:
            key, value = line.split("=", 1)
            flags[key.strip()] = value.strip().strip('"')
    return flags


def load_json_file(path: Path) -> Optional[object]:
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None


def permissions_hardened(path: Path) -> bool:
    try:
        mode = path.stat().st_mode & 0o777
        return (mode & 0o077) == 0
    except Exception:
        return False


def find_channel_allowlists(cfg: Dict[str, object]) -> bool:
    channels = cfg.get("channels") if isinstance(cfg, dict) else None
    if not isinstance(channels, dict):
        return False
    for channel_cfg in channels.values():
        if isinstance(channel_cfg, dict):
            allow_from = channel_cfg.get("allowFrom")
            if isinstance(allow_from, list) and len(allow_from) > 0:
                return True
    return False


def group_mentions_required(cfg: Dict[str, object]) -> bool:
    channels = cfg.get("channels") if isinstance(cfg, dict) else None
    if isinstance(channels, dict):
        for channel_cfg in channels.values():
            groups = channel_cfg.get("groups") if isinstance(channel_cfg, dict) else None
            if isinstance(groups, dict):
                for group_cfg in groups.values():
                    if isinstance(group_cfg, dict) and group_cfg.get("requireMention") is True:
                        return True
    messages = cfg.get("messages") if isinstance(cfg, dict) else None
    if isinstance(messages, dict):
        group_chat = messages.get("groupChat")
        if isinstance(group_chat, dict) and group_chat.get("mentionPatterns"):
            return True
    return False


def gateway_loopback_configured(cfg: Optional[Dict[str, object]], cfg_text: str) -> bool:
    if isinstance(cfg, dict):
        gateway = cfg.get("gateway", {})
        mode = gateway.get("mode") if isinstance(gateway, dict) else None
        bind = gateway.get("bind") if isinstance(gateway, dict) else None
        auth = cfg.get("auth") or gateway.get("auth")
        auth_mode = auth.get("mode") if isinstance(auth, dict) else None
        return mode == "local" and bind == "loopback" and auth_mode == "token"
    return '"mode": "local"' in cfg_text and '"bind": "loopback"' in cfg_text and '"mode": "token"' in cfg_text


def runtime_hook_installed() -> bool:
    hook = Path.home() / ".openclaw" / "bin" / "sudo"
    return hook.exists() and os.access(hook, os.X_OK)


def alt_privilege_paths_present() -> bool:
    return bool(shutil.which("su") or shutil.which("doas"))


def audit_log_present() -> bool:
    return AUDIT_LOG.exists() and AUDIT_LOG.stat().st_size > 0


def backup_configured() -> bool:
    for candidate in (OPENCLAW_DIR / "backups", OPENCLAW_DIR / "backup"):
        if candidate.exists() and any(candidate.iterdir()):
            return True
    return False


def collect_runtime_signals(port_monitor_script: Path) -> Dict[str, object]:
    openclaw_bin = resolve_openclaw_bin()
    openclaw_config_text = run_cmd(["cat", str(Path.home() / ".openclaw" / "openclaw.json")])
    doctor_text = run_cmd([openclaw_bin, "doctor"])
    gateway_status_text = run_cmd([openclaw_bin, "gateway", "status"])
    version_text = run_cmd([openclaw_bin, "--version"])

    port_report_raw = run_cmd(["python3", str(port_monitor_script), "--json"])
    egress_report_raw = run_cmd(
        ["python3", str(Path(__file__).resolve().parent / "egress_monitor.py"), "--json"]
    )

    try:
        port_report = json.loads(port_report_raw)
    except Exception:
        port_report = {"status": "error", "findings": [], "listening_services": []}

    try:
        egress_report = json.loads(egress_report_raw)
    except Exception:
        egress_report = {"status": "error", "findings": [], "connections": []}

    return {
        "openclaw_config_text": openclaw_config_text,
        "openclaw_config_json": load_openclaw_config(),
        "doctor_text": doctor_text,
        "gateway_status_text": gateway_status_text,
        "version_text": version_text,
        "port_report": port_report,
        "egress_report": egress_report,
        "env_flags": load_env_flags(),
        "command_policy": load_json_file(Path.home() / ".openclaw" / "security" / "command-policy.json"),
        "prompt_policy": load_json_file(Path.home() / ".openclaw" / "security" / "prompt-policy.json"),
        "egress_allowlist": load_json_file(Path.home() / ".openclaw" / "security" / "egress_allowlist.json"),
    }


def set_check(
    checks_by_id: Dict[str, Dict[str, object]],
    check_id: str,
    status: str,
    risk: str,
    observed_state: str,
    evidence: str,
    gap: str,
    mitigation: str,
    owner: str,
    due_date: str,
) -> None:
    c = checks_by_id.setdefault(check_id, {"check_id": check_id})
    c["status"] = status
    c["risk"] = risk
    c["observed_state"] = observed_state
    c["evidence"] = evidence
    c["gap"] = gap
    c["mitigation"] = mitigation
    c["owner"] = owner
    c["due_date"] = due_date


def build_assessment(assessment: Dict[str, object], signals: Dict[str, object]) -> Dict[str, object]:
    checks = assessment.get("checks", [])
    checks_by_id = {c["check_id"]: c for c in checks if isinstance(c, dict) and c.get("check_id")}

    cfg_text = str(signals.get("openclaw_config_text") or "")
    cfg_json = signals.get("openclaw_config_json")
    doctor = str(signals.get("doctor_text") or "")
    version_text = str(signals.get("version_text") or "").strip()
    port_report = signals.get("port_report") or {}
    findings = port_report.get("findings", []) if isinstance(port_report, dict) else []
    insecure = [f for f in findings if isinstance(f, dict) and f.get("type") == "insecure-port"]
    unapproved_ports = [f for f in findings if isinstance(f, dict) and f.get("type") == "unapproved-port"]

    approval_enforced = runtime_hook_installed()
    set_check(
        checks_by_id,
        "privilege_approval_required",
        "compliant" if approval_enforced else "violation",
        "high",
        "Runtime privileged execution hook is installed." if approval_enforced else "Runtime privileged execution hook not detected.",
        "Checked for ~/.openclaw/bin/sudo wrapper installed by cyber-security-engineer.",
        "Approval-first execution is not enforced for privileged actions.",
        "Run cyber-security-engineer/scripts/install-openclaw-runtime-hook.sh and restart OpenClaw gateway.",
        "Security Engineering",
        "2026-03-15",
    )

    least_priv_ok = gateway_loopback_configured(cfg_json, cfg_text)
    writable_issue = "not writable" in doctor.lower()
    least_status = "compliant" if least_priv_ok and not writable_issue else "partial"
    set_check(
        checks_by_id,
        "least_privilege_enforced",
        least_status,
        "high",
        "Gateway local/loopback+token controls are present; state integrity warnings may still appear.",
        "openclaw.json has local mode, loopback bind, and token auth; doctor output checked for integrity warnings.",
        "Least-privilege posture is not complete while writable/integrity warnings remain.",
        "Fix state dir ownership/permissions and enforce command allowlist/approval defaults.",
        "Platform Security",
        "2026-03-07",
    )

    set_check(
        checks_by_id,
        "elevation_timeout_30m",
        "compliant",
        "medium",
        "30-minute timeout logic exists in root_session_guard.py.",
        "Timeout guard script is installed with preflight drop logic.",
        "No gap identified for timeout script presence.",
        "Ensure all privileged paths route through guarded_privileged_exec.py.",
        "Security Engineering",
        "2026-03-15",
    )

    audit_ok = audit_log_present()
    set_check(
        checks_by_id,
        "audit_logging_privileged_actions",
        "compliant" if audit_ok else "partial",
        "medium",
        "Privileged audit log is populated." if audit_ok else "Privileged audit log not yet populated.",
        f"Audit log path: {AUDIT_LOG}",
        "Audit trail is missing or empty.",
        "Run privileged tasks via guarded_privileged_exec.py to populate audit logs.",
        "SecOps",
        "2026-03-22",
    )

    set_check(
        checks_by_id,
        "open_ports_approved",
        "partial" if unapproved_ports else "compliant",
        "medium",
        f"Unapproved listening ports: {len(unapproved_ports)}",
        "port_monitor.py report evaluated vs approved baseline.",
        "Listening ports are not fully approved.",
        "Generate and prune approved_ports.json; close unnecessary services.",
        "Network Security",
        "2026-03-25",
    )

    set_check(
        checks_by_id,
        "insecure_ports_remediated",
        "violation" if insecure else "compliant",
        "high",
        f"Insecure ports detected: {len(insecure)}",
        "port_monitor.py findings evaluated for insecure-port entries.",
        "Insecure ports are still in use.",
        "Migrate to secure alternatives or close the ports.",
        "Network Security",
        "2026-04-01",
    )

    allowlist_ok = isinstance(cfg_json, dict) and find_channel_allowlists(cfg_json)
    set_check(
        checks_by_id,
        "channel_allowlist_configured",
        "compliant" if allowlist_ok else "violation",
        "high",
        "Channel allowlists are configured." if allowlist_ok else "Channel allowlists not detected.",
        "Checked channels.*.allowFrom in openclaw.json.",
        "Inbound channel allowlists are missing, allowing unsolicited access.",
        "Set channels.<channel>.allowFrom to trusted sender IDs.",
        "Platform Security",
        "2026-03-05",
    )

    mention_ok = isinstance(cfg_json, dict) and group_mentions_required(cfg_json)
    set_check(
        checks_by_id,
        "group_mentions_required",
        "compliant" if mention_ok else "partial",
        "medium",
        "Group chats require explicit mentions." if mention_ok else "Group mention requirement not detected.",
        "Checked channels.*.groups.requireMention or messages.groupChat.mentionPatterns.",
        "Group chats can trigger the agent without explicit mention.",
        "Set requireMention true for group configs or define mentionPatterns.",
        "Platform Security",
        "2026-03-05",
    )

    loopback_ok = gateway_loopback_configured(cfg_json, cfg_text)
    set_check(
        checks_by_id,
        "gateway_loopback_only",
        "compliant" if loopback_ok else "violation",
        "high",
        "Gateway is configured for local/loopback with token auth." if loopback_ok else "Gateway exposure settings are weak.",
        "Checked gateway.mode, gateway.bind, and auth.mode in openclaw.json.",
        "Gateway may be exposed publicly or without token auth.",
        "Set gateway.mode=local, gateway.bind=loopback, auth.mode=token.",
        "Platform Security",
        "2026-03-07",
    )

    secrets_ok = permissions_hardened(OPENCLAW_DIR) and permissions_hardened(OPENCLAW_DIR / "openclaw.json")
    set_check(
        checks_by_id,
        "secrets_permissions_hardened",
        "compliant" if secrets_ok else "partial",
        "medium",
        "OpenClaw config directory and file permissions are hardened." if secrets_ok else "OpenClaw permissions are too open or unknown.",
        "Checked ~/.openclaw and ~/.openclaw/openclaw.json permissions.",
        "Secrets and configs may be readable by other users.",
        "chmod 700 ~/.openclaw; chmod 600 ~/.openclaw/openclaw.json",
        "SecOps",
        "2026-03-10",
    )

    hook_ok = runtime_hook_installed()
    set_check(
        checks_by_id,
        "runtime_privilege_hook_installed",
        "compliant" if hook_ok else "violation",
        "high",
        "Runtime privileged execution hook installed." if hook_ok else "Runtime privileged execution hook missing.",
        "Checked ~/.openclaw/bin/sudo wrapper.",
        "Privileged actions can bypass approval enforcement.",
        "Install runtime hook and restart OpenClaw gateway.",
        "Security Engineering",
        "2026-03-01",
    )

    alt_paths = alt_privilege_paths_present()
    alt_ok = (not alt_paths) or hook_ok
    set_check(
        checks_by_id,
        "alternate_privilege_paths_restricted",
        "compliant" if alt_ok else "partial",
        "medium",
        "Alternate privilege tools are present but guarded." if alt_ok else "Alternate privilege tools may bypass guard.",
        "Checked for presence of su/doas binaries.",
        "Privilege escalation paths could bypass approval enforcement.",
        "Restrict su/doas usage or wrap via policy controls.",
        "Security Engineering",
        "2026-03-20",
    )

    backup_ok = backup_configured()
    set_check(
        checks_by_id,
        "backup_configured",
        "compliant" if backup_ok else "partial",
        "low",
        "OpenClaw backups found." if backup_ok else "OpenClaw backups not detected.",
        "Checked ~/.openclaw/backups or ~/.openclaw/backup.",
        "Recovery posture may be weak without backups.",
        "Configure backup of ~/.openclaw and audit logs.",
        "Platform Security",
        "2026-03-30",
    )

    update_ok = bool(version_text)
    set_check(
        checks_by_id,
        "update_hygiene",
        "partial" if update_ok else "violation",
        "low",
        f"OpenClaw version: {version_text or 'unknown'}",
        "openclaw --version output captured.",
        "Update cadence not validated.",
        "Review release notes and update regularly.",
        "Platform Security",
        "2026-04-15",
    )

    prompt_policy = signals.get("prompt_policy") or {}
    prompt_ok = isinstance(prompt_policy, dict) and bool(prompt_policy.get("require_confirmation_for_untrusted"))
    set_check(
        checks_by_id,
        "prompt_injection_controls",
        "compliant" if prompt_ok else "partial",
        "high",
        "Untrusted content requires explicit confirmation." if prompt_ok else "Untrusted content confirmation policy not enabled.",
        "Checked ~/.openclaw/security/prompt-policy.json.",
        "Untrusted content sources may trigger privileged actions without explicit confirmation.",
        "Enable prompt-policy.json require_confirmation_for_untrusted.",
        "Security Engineering",
        "2026-03-12",
    )

    cmd_policy = signals.get("command_policy") or {}
    deny_rules = cmd_policy.get("deny") if isinstance(cmd_policy, dict) else []
    allow_rules = cmd_policy.get("allow") if isinstance(cmd_policy, dict) else []
    cmd_ok = bool((isinstance(deny_rules, list) and deny_rules) or (isinstance(allow_rules, list) and allow_rules))
    set_check(
        checks_by_id,
        "command_policy_enforced",
        "compliant" if cmd_ok else "partial",
        "high",
        "Command policy allow/deny rules configured." if cmd_ok else "Command policy rules not configured.",
        "Checked ~/.openclaw/security/command-policy.json.",
        "Privileged commands are not filtered by policy.",
        "Define deny/allow rules in command-policy.json.",
        "Security Engineering",
        "2026-03-12",
    )

    env_flags = signals.get("env_flags") or {}
    session_required = isinstance(env_flags, dict) and env_flags.get("OPENCLAW_REQUIRE_SESSION_ID") == "1"
    set_check(
        checks_by_id,
        "session_boundary_enforced",
        "compliant" if session_required else "partial",
        "medium",
        "Task session id enforcement enabled." if session_required else "Task session id enforcement not enabled.",
        "Checked ~/.openclaw/env for OPENCLAW_REQUIRE_SESSION_ID.",
        "Approvals may carry across tasks without explicit session scoping.",
        "Set OPENCLAW_REQUIRE_SESSION_ID=1 and provide OPENCLAW_TASK_SESSION_ID per task.",
        "Security Engineering",
        "2026-03-18",
    )

    mfa_enabled = isinstance(env_flags, dict) and bool(env_flags.get("OPENCLAW_APPROVAL_TOKEN"))
    set_check(
        checks_by_id,
        "multi_factor_approval",
        "compliant" if mfa_enabled else "partial",
        "medium",
        "Approval token required for privileged actions." if mfa_enabled else "Approval token not configured.",
        "Checked ~/.openclaw/env for OPENCLAW_APPROVAL_TOKEN.",
        "Privileged approvals rely on single-step confirmation.",
        "Set OPENCLAW_APPROVAL_TOKEN and require entry for approvals.",
        "Security Engineering",
        "2026-03-20",
    )

    egress_allowlist = signals.get("egress_allowlist") or []
    egress_ok = isinstance(egress_allowlist, list) and len(egress_allowlist) > 0
    set_check(
        checks_by_id,
        "egress_allowlist_configured",
        "compliant" if egress_ok else "partial",
        "medium",
        "Egress allowlist configured." if egress_ok else "Egress allowlist not configured.",
        "Checked ~/.openclaw/security/egress_allowlist.json.",
        "Outbound connections are not constrained by allowlist.",
        "Define allowed outbound destinations.",
        "Network Security",
        "2026-03-22",
    )

    egress_report = signals.get("egress_report") or {}
    egress_findings = egress_report.get("findings", []) if isinstance(egress_report, dict) else []
    egress_clean = egress_ok and len(egress_findings) == 0
    set_check(
        checks_by_id,
        "egress_connections_approved",
        "compliant" if egress_clean else "partial",
        "medium",
        f"Unapproved egress findings: {len(egress_findings)}",
        "egress_monitor.py live output evaluated against allowlist.",
        "Unapproved outbound connections detected." if egress_ok else "Allowlist missing; cannot validate egress.",
        "Approve or block outbound destinations.",
        "Network Security",
        "2026-03-25",
    )

    assessment["checks"] = sorted(checks_by_id.values(), key=lambda c: str(c.get("check_id", "")))
    assessment.setdefault("metadata", {})
    assessment["metadata"]["generated_at_utc"] = utc_now()
    return assessment


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Generate live compliance assessment from current machine state")
    p.add_argument("--assessment-file", required=True, help="Path to assessment JSON to update")
    p.add_argument(
        "--port-monitor-script",
        default=str(Path(__file__).resolve().parent / "port_monitor.py"),
        help="Path to port monitor script",
    )
    return p.parse_args()


def main() -> int:
    args = parse_args()
    assessment_path = Path(args.assessment_file)
    assessment = load_json(assessment_path)
    signals = collect_runtime_signals(Path(args.port_monitor_script))
    updated = build_assessment(assessment, signals)
    write_json(assessment_path, updated)
    print(
        json.dumps(
            {
                "status": "ok",
                "assessment_file": str(assessment_path),
                "generated_at_utc": updated["metadata"]["generated_at_utc"],
            }
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "fletcherfrimpong",
  "slug": "fletcher-cyber-security-engineer",
  "displayName": "Fletcher Cyber Security Engineer",
  "latest": {
    "version": "0.1.2",
    "publishedAt": 1771116071318,
    "commit": "https://github.com/openclaw/skills/commit/386e0bda9e6f46214a0ed13afeaddf06caa11d19"
  },
  "history": []
}

```

### scripts/auto_invoke_cycle.sh

```bash
#!/usr/bin/env bash
set -euo pipefail

SKILL_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
ASSESS_DIR="${SKILL_DIR}/assessments"
ASSESS_FILE="${ASSESS_DIR}/openclaw-assessment.json"
HTML_OUT="${ASSESS_DIR}/compliance-dashboard.html"
SUMMARY_OUT="${ASSESS_DIR}/compliance-summary.json"
PORT_OUT="${ASSESS_DIR}/port-monitor-latest.json"
EGRESS_OUT="${ASSESS_DIR}/egress-monitor-latest.json"
LOG_DIR="${HOME}/.openclaw/logs"
RUN_LOG="${LOG_DIR}/cyber-security-engineer-auto.log"

mkdir -p "${LOG_DIR}" "${ASSESS_DIR}"

{
  echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] Starting auto security cycle"

  if [[ ! -f "${ASSESS_FILE}" ]]; then
    python3 "${SKILL_DIR}/scripts/compliance_dashboard.py" init-assessment --system "OpenClaw" --output "${ASSESS_FILE}"
  fi

  python3 "${SKILL_DIR}/scripts/live_assessment.py" --assessment-file "${ASSESS_FILE}"
  python3 "${SKILL_DIR}/scripts/compliance_dashboard.py" render \
    --assessment-file "${ASSESS_FILE}" \
    --output-html "${HTML_OUT}" \
    --output-summary "${SUMMARY_OUT}"
  # Optional: notify when new violations/partials appear between cycles.
  python3 "${SKILL_DIR}/scripts/notify_on_violation.py" --summary-file "${SUMMARY_OUT}" || true
  python3 "${SKILL_DIR}/scripts/port_monitor.py" --json > "${PORT_OUT}"
  python3 "${SKILL_DIR}/scripts/egress_monitor.py" --json > "${EGRESS_OUT}"

  echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] Completed auto security cycle"
} >> "${RUN_LOG}" 2>&1

```

cyber-security-engineer | SkillHub