Back to skills
SkillHub ClubShip Full StackFull Stack
openclaw-egress
Imported from https://github.com/openclaw/skills.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Stars
3,111
Hot score
99
Updated
March 20, 2026
Overall rating
C0.0
Composite score
0.0
Best-practice grade
F17.6
Install command
npx @skill-hub/cli install openclaw-skills-openclaw-egress
Repository
openclaw/skills
Skill path: skills/atlaspa/openclaw-egress
Imported from https://github.com/openclaw/skills.
Open repositoryBest for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install openclaw-egress into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding openclaw-egress to shared team environments
- Use openclaw-egress for development workflows
Works across
Claude CodeCodex CLIGemini CLIOpenCode
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: openclaw-egress
user-invocable: true
metadata: {"openclaw":{"emoji":"π","requires":{"bins":["python3"]},"os":["darwin","linux","win32"]}}
---
# OpenClaw Egress
Network DLP for agent workspaces. Scans skills and files for outbound URLs, data exfiltration endpoints, and network function calls.
## The Problem
Skills can phone home. A compromised skill can POST your workspace contents, API keys, or conversation history to an external server. Nothing monitors what URLs your skills connect to or what data they could send.
## Commands
### Full Scan
Scan workspace for all outbound network risks.
```bash
python3 {baseDir}/scripts/egress.py scan --workspace /path/to/workspace
```
### Skills-Only Scan
```bash
python3 {baseDir}/scripts/egress.py scan --skills-only --workspace /path/to/workspace
```
### Domain Map
List all external domains referenced in workspace.
```bash
python3 {baseDir}/scripts/egress.py domains --workspace /path/to/workspace
```
### Quick Status
```bash
python3 {baseDir}/scripts/egress.py status --workspace /path/to/workspace
```
## What It Detects
| Risk | Pattern |
|------|---------|
| **CRITICAL** | Base64/hex payloads in URLs, pastebin/sharing services, request catchers, dynamic DNS |
| **HIGH** | Network function calls (requests, urllib, curl, wget, fetch), webhook/callback URLs |
| **WARNING** | Suspicious TLDs (.xyz, .tk, .ml), URL shorteners, IP address endpoints |
| **INFO** | Any external URL not on the safe domain list |
## Exit Codes
- `0` β Clean
- `1` β Network calls detected (review needed)
- `2` β Exfiltration risk detected (action needed)
## No External Dependencies
Python standard library only. No pip install. No network calls. Everything runs locally.
## Cross-Platform
Works with OpenClaw, Claude Code, Cursor, and any tool using the Agent Skills specification.
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# OpenClaw Egress
Network data loss prevention for [OpenClaw](https://github.com/openclaw/openclaw), [Claude Code](https://docs.anthropic.com/en/docs/claude-code), and any Agent Skills-compatible tool.
Maps every external connection your skills could make. Flags exfiltration endpoints, suspicious domains, and network function calls.
## Install
```bash
git clone https://github.com/AtlasPA/openclaw-egress.git
cp -r openclaw-egress ~/.openclaw/workspace/skills/
```
## Usage
```bash
# Full network scan
python3 scripts/egress.py scan
# Skills-only scan
python3 scripts/egress.py scan --skills-only
# List all external domains
python3 scripts/egress.py domains
# Quick status
python3 scripts/egress.py status
```
## What It Detects
- **Data exfiltration** β Base64/hex payloads in URL parameters
- **Sharing services** β Pastebin, transfer.sh, 0x0.st, file.io
- **Request catchers** β ngrok, requestbin, pipedream, beeceptor
- **Dynamic DNS** β duckdns, no-ip, dynu, freedns
- **URL shorteners** β bit.ly, tinyurl, t.co, goo.gl
- **IP endpoints** β Direct IP address connections
- **Suspicious TLDs** β .xyz, .tk, .ml, .ga, .cf, .top
- **Network code** β urllib, requests, httpx, aiohttp, curl, wget, fetch
- **Webhook callbacks** β /webhook, /callback, /hook, /beacon endpoints
|---------|------|-----|
| URL detection & classification | Yes | Yes |
| Network code analysis | Yes | Yes |
| Domain mapping | Yes | Yes |
| **Block exfil payloads** | - | Yes |
| **Quarantine calling skill** | - | Yes |
| **URL allowlist enforcement** | - | Yes |
| **Real-time egress monitoring** | - | Yes |
## Requirements
- Python 3.8+
- No external dependencies (stdlib only)
- Cross-platform: Windows, macOS, Linux
## License
MIT
```
### _meta.json
```json
{
"owner": "atlaspa",
"slug": "openclaw-egress",
"displayName": "Openclaw Egress",
"latest": {
"version": "1.0.2",
"publishedAt": 1770891978690,
"commit": "https://github.com/openclaw/skills/commit/ce4d1a05fdd666d2eea161c9aff1cbcb665419f7"
},
"history": [
{
"version": "1.0.1",
"publishedAt": 1770313128103,
"commit": "https://github.com/openclaw/skills/commit/cd024ba27367ece79db1173c9f3f6d2053613317"
}
]
}
```
### scripts/egress.py
```python
#!/usr/bin/env python3
"""OpenClaw Egressβ Full network DLP suite for agent workspaces.
Detect outbound URLs, data exfiltration patterns, and suspicious network
calls, then automatically block connections, quarantine compromised skills,
and enforce domain allowlists.
Philosophy: alert -> subvert -> quarantine -> defend
Free = alert. Pro = subvert + quarantine + defend.
"""
import argparse, io, json, os, re, shutil, sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
# -- Windows Unicode stdout --------------------------------------------------
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
# -- Constants ---------------------------------------------------------------
QUARANTINE_PREFIX = ".quarantined-"
BLOCK_COMMENT = "# [BLOCKED by openclaw-egress]"
ALLOWLIST_FILE = ".egress-allowlist.json"
URL_PATTERN = re.compile(r'https?://[^\s"\'<>\]\)}{,`]+', re.IGNORECASE)
EXFIL_PATTERNS = [
("Base64 in URL", re.compile(r"https?://[^\s]*[?&][^=]*=(?:[A-Za-z0-9+/]{40,}={0,2})")),
("Hex payload in URL", re.compile(r"https?://[^\s]*[?&][^=]*=(?:[0-9a-f]{32,})", re.IGNORECASE)),
("IP address endpoint", re.compile(r"https?://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")),
("Webhook/callback URL", re.compile(r"https?://[^\s]*/(?:webhook|callback|hook|notify|ping|beacon)[^\s]*", re.IGNORECASE)),
("Pastebin/sharing service", re.compile(r"https?://(?:pastebin\.com|hastebin\.com|paste\.ee|dpaste\.org|ix\.io|sprunge\.us|0x0\.st|transfer\.sh|file\.io)[^\s]*", re.IGNORECASE)),
("Request catcher", re.compile(r"https?://(?:[^\s]*\.ngrok\.|requestbin|pipedream|beeceptor|hookbin|requestcatcher)[^\s]*", re.IGNORECASE)),
("Dynamic DNS", re.compile(r"https?://[^\s]*\.(?:duckdns\.org|no-ip\.com|dynu\.com|freedns)[^\s]*", re.IGNORECASE)),
("URL shortener", re.compile(r"https?://(?:bit\.ly|tinyurl|t\.co|goo\.gl|is\.gd|v\.gd|rb\.gy|shorturl)[^\s]*", re.IGNORECASE)),
]
NETWORK_CODE_PATTERNS = [
("urllib.request", re.compile(r"\burllib\.request\.urlopen\b")),
("urllib.request.Request", re.compile(r"\burllib\.request\.Request\b")),
("requests.get/post", re.compile(r"\brequests\.(?:get|post|put|patch|delete|head)\b")),
("httpx call", re.compile(r"\bhttpx\.(?:get|post|put|patch|delete|head|Client|AsyncClient)\b")),
("aiohttp session", re.compile(r"\baiohttp\.ClientSession\b")),
("socket connection", re.compile(r"\bsocket\.(?:socket|create_connection|connect)\b")),
("http.client", re.compile(r"\bhttp\.client\.HTTP(?:S)?Connection\b")),
("fetch/XMLHttpRequest", re.compile(r"\bfetch\s*\(|XMLHttpRequest\b")),
("curl command", re.compile(r"\bcurl\s+-")),
("wget command", re.compile(r"\bwget\s+")),
]
SAFE_DOMAINS = {
"github.com", "raw.githubusercontent.com",
"docs.anthropic.com", "api.anthropic.com",
"openclaw.com", "clawhub.ai", "clawhub.com",
"python.org", "pypi.org", "nodejs.org", "npmjs.com",
"stackoverflow.com", "developer.mozilla.org",
"wikipedia.org", "example.com",
}
SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv",
".integrity", ".quarantine", ".snapshots"}
SELF_SKILL_DIRS = {"openclaw-egress", "openclaw-egress"}
CODE_SUFFIXES = {".py", ".js", ".ts", ".sh", ".bash"}
# -- Helpers -----------------------------------------------------------------
def resolve_workspace(ws_arg):
if ws_arg:
return Path(ws_arg).resolve()
env = os.environ.get("OPENCLAW_WORKSPACE")
if env:
return Path(env).resolve()
cwd = Path.cwd()
if (cwd / "AGENTS.md").exists():
return cwd
default = Path.home() / ".openclaw" / "workspace"
return default if default.exists() else cwd
def is_binary(path):
try:
with open(path, "rb") as f:
return b"\x00" in f.read(8192)
except (OSError, PermissionError):
return True
def in_code_block(lines, line_idx):
fence = 0
for i in range(line_idx):
if lines[i].strip().startswith("```"):
fence += 1
return fence % 2 == 1
def is_safe_url(url, allowlist=None):
try:
domain = (urlparse(url).hostname or "")
all_safe = SAFE_DOMAINS | (allowlist or set())
return any(domain == s or domain.endswith("." + s) for s in all_safe)
except Exception:
return False
def classify_url(url, allowlist=None):
for name, pat in EXFIL_PATTERNS:
if pat.search(url):
return "CRITICAL", name
try:
domain = (urlparse(url).hostname or "")
if is_safe_url(url, allowlist):
return "SAFE", "Known safe domain"
for tld in (".xyz", ".tk", ".ml", ".ga", ".cf", ".gq", ".top", ".buzz"):
if domain.endswith(tld):
return "WARNING", f"Suspicious TLD ({tld})"
return "INFO", "External endpoint"
except Exception:
return "WARNING", "Unparseable URL"
def now_iso():
return datetime.now(timezone.utc).isoformat()
def _comment_char(suffix):
return "// " if suffix in (".js", ".ts") else "# "
def _print_skills(skills_dir):
if not skills_dir.is_dir():
return
print("Available skills:")
for d in sorted(skills_dir.iterdir()):
if not d.is_dir():
continue
if d.name.startswith(QUARANTINE_PREFIX):
print(f" [Q] {d.name[len(QUARANTINE_PREFIX):]}")
elif not d.name.startswith("."):
print(f" {d.name}")
# -- Allowlist ---------------------------------------------------------------
def _al_path(ws):
return ws / ALLOWLIST_FILE
def load_allowlist(ws):
p = _al_path(ws)
if not p.exists():
return set()
try:
with open(p, "r", encoding="utf-8") as f:
return set(json.load(f).get("domains", []))
except (json.JSONDecodeError, OSError):
return set()
def save_allowlist(ws, domains):
with open(_al_path(ws), "w", encoding="utf-8") as f:
json.dump({"version": 1, "updated": now_iso(), "domains": sorted(domains)}, f, indent=2)
# -- Scanning ----------------------------------------------------------------
def scan_file_urls(fpath, ws, allowlist=None):
findings = []
rel = str(fpath.relative_to(ws))
try:
content = fpath.read_text(encoding="utf-8", errors="ignore")
except (OSError, PermissionError):
return findings
lines = content.split("\n")
is_md = fpath.suffix in (".md", ".markdown")
for ln, line in enumerate(lines, 1):
if is_md and in_code_block(lines, ln - 1):
continue
for m in URL_PATTERN.finditer(line):
url = m.group(0).rstrip(".,;:)")
risk, reason = classify_url(url, allowlist)
if risk != "SAFE":
findings.append({"file": rel, "line": ln, "url": url[:100], "risk": risk, "reason": reason})
return findings
def scan_file_network(fpath, ws):
findings = []
rel = str(fpath.relative_to(ws))
if fpath.suffix not in CODE_SUFFIXES:
return findings
try:
content = fpath.read_text(encoding="utf-8", errors="ignore")
except (OSError, PermissionError):
return findings
for ln, line in enumerate(content.split("\n"), 1):
s = line.strip()
if s.startswith("#") or s.startswith("//") or BLOCK_COMMENT in line:
continue
for name, pat in NETWORK_CODE_PATTERNS:
if pat.search(line):
findings.append({"file": rel, "line": ln, "url": "", "risk": "HIGH", "reason": f"Network call: {name}"})
return findings
def collect_files(ws, skills_only=False):
files, root = [], (ws / "skills") if skills_only else ws
if not root.exists():
return files
for dirpath, dirs, fnames in os.walk(root):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith(QUARANTINE_PREFIX) and not d.startswith(".quarantine")]
parts = Path(dirpath).relative_to(ws).parts
if len(parts) >= 2 and parts[0] == "skills" and parts[1] in SELF_SKILL_DIRS:
continue
for fn in fnames:
fp = Path(dirpath) / fn
if not is_binary(fp):
files.append(fp)
return files
def collect_skill_files(ws, skill):
sd = ws / "skills" / skill
if not sd.is_dir():
return []
files = []
for dirpath, dirs, fnames in os.walk(sd):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fn in fnames:
fp = Path(dirpath) / fn
if not is_binary(fp):
files.append(fp)
return files
def dedup(findings):
seen, out = set(), []
for f in findings:
k = (f["file"], f["line"], f["reason"])
if k not in seen:
seen.add(k)
out.append(f)
return out
def scan_skill(ws, skill, allowlist=None):
results = []
for fp in collect_skill_files(ws, skill):
results.extend(scan_file_urls(fp, ws, allowlist))
results.extend(scan_file_network(fp, ws))
return dedup(results)
# -- Commands: Detection (free-equivalent) -----------------------------------
def cmd_scan(ws, skills_only=False):
al = load_allowlist(ws)
print("=" * 60)
print("OPENCLAW EGRESS FULL β NETWORK DLP SCAN")
print("=" * 60)
print(f"Workspace: {ws}")
print(f"Timestamp: {now_iso()}")
print(f"Scope: {'skills only' if skills_only else 'full workspace'}")
if al:
print(f"Custom allowlist: {len(al)} domain(s)")
print()
files = collect_files(ws, skills_only)
print(f"Scanning {len(files)} files...\n")
raw = []
for fp in files:
raw.extend(scan_file_urls(fp, ws, al))
raw.extend(scan_file_network(fp, ws))
findings = dedup(raw)
crit = [f for f in findings if f["risk"] == "CRITICAL"]
high = [f for f in findings if f["risk"] == "HIGH"]
warn = [f for f in findings if f["risk"] == "WARNING"]
info = [f for f in findings if f["risk"] == "INFO"]
print("-" * 40); print("RESULTS"); print("-" * 40)
if not findings:
print("[CLEAN] No outbound network risks detected.")
return 0
order = {"CRITICAL": 0, "HIGH": 1, "WARNING": 2, "INFO": 3}
for f in sorted(findings, key=lambda x: order.get(x["risk"], 4)):
print(f" [{f['risk']}] {f['file']}:{f['line']}")
print(f" {f['reason']}")
if f["url"]:
print(f" URL: {f['url']}")
print()
print("-" * 40); print("SUMMARY"); print("-" * 40)
print(f" Critical: {len(crit)}")
print(f" High: {len(high)}")
print(f" Warnings: {len(warn)}")
print(f" Info: {len(info)}")
print(f" Total: {len(findings)}\n")
domains = set()
for f in findings:
if f["url"]:
try:
h = urlparse(f["url"]).hostname
if h: domains.add(h)
except Exception:
pass
if domains:
print(" External domains found:")
for d in sorted(domains):
print(f" - {d}")
print()
if crit:
print("ACTION REQUIRED: Data exfiltration risk detected.")
print(" Run 'protect' for automated countermeasures.")
print(" Run 'block <skill>' to neutralize network calls.")
print(" Run 'quarantine <skill>' to disable a compromised skill.")
return 2
if high:
print("REVIEW NEEDED: Network calls detected in skills.")
print(" Run 'block <skill>' to neutralize network calls.")
return 1
return 0
def cmd_domains(ws):
al = load_allowlist(ws)
domains = {}
for fp in collect_files(ws):
for f in scan_file_urls(fp, ws, al):
if not f["url"]:
continue
try:
h = urlparse(f["url"]).hostname
if h and not is_safe_url(f["url"], al):
rec = domains.setdefault(h, {"count": 0, "files": set(), "risk": "INFO"})
rec["count"] += 1
rec["files"].add(f["file"])
if f["risk"] in ("CRITICAL", "HIGH"):
rec["risk"] = f["risk"]
except Exception:
pass
if not domains:
print("[CLEAN] No external domains found.")
return 0
print("=" * 60); print("EXTERNAL DOMAINS"); print("=" * 60); print()
for d in sorted(domains):
r = domains[d]
print(f" [{r['risk']}] {d} ({r['count']} reference(s))")
for fn in sorted(r["files"]):
print(f" - {fn}")
print()
return 0
def cmd_status(ws):
al = load_allowlist(ws)
crit = high = 0
for fp in collect_files(ws, skills_only=True):
for f in scan_file_urls(fp, ws, al):
if f["risk"] == "CRITICAL": crit += 1
elif f["risk"] == "HIGH": high += 1
for f in scan_file_network(fp, ws):
if f["risk"] == "HIGH": high += 1
qcount = 0
sd = ws / "skills"
if sd.is_dir():
qcount = sum(1 for d in sd.iterdir() if d.is_dir() and d.name.startswith(QUARANTINE_PREFIX))
parts = []
if crit: parts.append(f"{crit} exfiltration risk(s)")
if high: parts.append(f"{high} network call(s)")
if qcount: parts.append(f"{qcount} quarantined skill(s)")
if crit:
print(f"[CRITICAL] {', '.join(parts)}"); return 2
if high:
print(f"[WARNING] {', '.join(parts)}"); return 1
msg = "[CLEAN] No outbound network risks"
if qcount: msg += f" ({qcount} quarantined)"
print(msg); return 0
# -- Commands: Pro Countermeasures -------------------------------------------
def _block_lines(abs_path, line_indices):
"""Comment out specific lines in a code file. Returns count blocked."""
try:
content = abs_path.read_text(encoding="utf-8", errors="ignore")
except (OSError, PermissionError):
return 0
lines = content.split("\n")
to_block = {i for i in line_indices if 0 <= i < len(lines) and BLOCK_COMMENT not in lines[i]}
if not to_block:
return 0
shutil.copy2(abs_path, abs_path.with_suffix(abs_path.suffix + ".bak"))
cc = _comment_char(abs_path.suffix)
for idx in to_block:
orig = lines[idx]; stripped = orig.lstrip()
indent = orig[:len(orig) - len(stripped)]
if stripped.startswith("#") or stripped.startswith("//"):
lines[idx] = f"{orig} {BLOCK_COMMENT}"
else:
lines[idx] = f"{indent}{cc}{stripped} {BLOCK_COMMENT}"
abs_path.write_text("\n".join(lines), encoding="utf-8")
return len(to_block)
def cmd_block(ws, skill_name):
sd = ws / "skills"
skill_dir = sd / skill_name
if not skill_dir.is_dir():
if (sd / (QUARANTINE_PREFIX + skill_name)).is_dir():
print(f"Skill '{skill_name}' is quarantined. Unquarantine first."); sys.exit(1)
print(f"Skill not found: {skill_name}"); _print_skills(sd); sys.exit(1)
if skill_name in SELF_SKILL_DIRS:
print(f"Cannot block self: {skill_name}"); sys.exit(1)
actionable = [f for f in scan_skill(ws, skill_name, load_allowlist(ws)) if f["risk"] in ("CRITICAL", "HIGH")]
if not actionable:
print(f"No CRITICAL or HIGH findings in '{skill_name}'. Nothing to block."); return 0
by_file = {}
for f in actionable:
by_file.setdefault(f["file"], []).append(f)
total = files_mod = 0
print("=" * 60); print(f"BLOCKING NETWORK CALLS IN: {skill_name}"); print("=" * 60); print()
for rel, ffindings in sorted(by_file.items()):
ap = ws / rel
if not ap.is_file():
continue
if ap.suffix not in CODE_SUFFIXES:
for ff in ffindings:
if ff["url"]:
print(f" [FLAGGED] {rel}:{ff['line']} β {ff['reason']} (non-code, manual review)")
continue
indices = {ff["line"] - 1 for ff in ffindings}
cnt = _block_lines(ap, indices)
if cnt:
total += cnt; files_mod += 1
print(f" [BLOCKED] {rel}: {cnt} line(s) neutralized (backup: {ap.suffix}.bak)")
print(f"\nTotal: {total} line(s) blocked across {files_mod} file(s)")
if total:
print("Backups created with .bak extension.\n")
return 0
def cmd_quarantine(ws, skill_name):
sd = ws / "skills"; src = sd / skill_name
if not src.is_dir():
if (sd / (QUARANTINE_PREFIX + skill_name)).is_dir():
print(f"Skill '{skill_name}' is already quarantined."); return 0
print(f"Skill not found: {skill_name}"); _print_skills(sd); sys.exit(1)
if skill_name in SELF_SKILL_DIRS:
print(f"Cannot quarantine self: {skill_name}"); sys.exit(1)
dst = sd / (QUARANTINE_PREFIX + skill_name)
src.rename(dst)
print(f"Quarantined: {skill_name}")
print(f" Moved: skills/{skill_name}/ -> skills/{QUARANTINE_PREFIX}{skill_name}/")
print(f" To restore: run 'unquarantine {skill_name}'")
return 0
def cmd_unquarantine(ws, skill_name):
sd = ws / "skills"; src = sd / (QUARANTINE_PREFIX + skill_name)
if not src.is_dir():
print(f"No quarantined skill found: {skill_name}"); _print_skills(sd); sys.exit(1)
dst = sd / skill_name
if dst.is_dir():
print(f"Cannot unquarantine: skills/{skill_name}/ already exists"); sys.exit(1)
src.rename(dst)
print(f"Unquarantined: {skill_name}")
print(f" Moved: skills/{QUARANTINE_PREFIX}{skill_name}/ -> skills/{skill_name}/")
print(f" WARNING: Re-scan this skill before use.")
return 0
def cmd_allowlist(ws, add=None, remove=None, show=False):
cur = load_allowlist(ws)
if add:
d = add.lower().strip()
if d in SAFE_DOMAINS:
print(f"'{d}' is already a built-in safe domain."); return 0
if d in cur:
print(f"'{d}' is already on the custom allowlist."); return 0
cur.add(d); save_allowlist(ws, cur)
print(f"Added: {d} (total custom: {len(cur)})"); return 0
if remove:
d = remove.lower().strip()
if d not in cur:
print(f"'{d}' is not on the custom allowlist.")
if d in SAFE_DOMAINS: print(" (Built-in safe domain, cannot be removed.)")
return 0
cur.discard(d); save_allowlist(ws, cur)
print(f"Removed: {d} (total custom: {len(cur)})"); return 0
# --show (default)
print("=" * 60); print("DOMAIN ALLOWLIST"); print("=" * 60); print()
print("Built-in safe domains:")
for d in sorted(SAFE_DOMAINS): print(f" - {d}")
print()
if cur:
print(f"Custom allowlist ({len(cur)} domain(s)):")
for d in sorted(cur): print(f" + {d}")
else:
print("Custom allowlist: (empty)")
print(f"\nAllowlist file: {_al_path(ws)}\n"); return 0
def cmdtect(ws):
al = load_allowlist(ws)
print("=" * 60); print("OPENCLAW EGRESS FULL β FULLTECTION SWEEP"); print("=" * 60)
print(f"Workspace: {ws}"); print(f"Timestamp: {now_iso()}")
if al: print(f"Custom allowlist: {len(al)} domain(s)")
print()
sd = ws / "skills"
if not sd.is_dir():
print("No skills directory found."); return 0
active = [d.name for d in sorted(sd.iterdir())
if d.is_dir() and not d.name.startswith(QUARANTINE_PREFIX)
and not d.name.startswith(".") and d.name not in SELF_SKILL_DIRS]
if not active:
print("No active skills to scan."); return 0
print(f"Scanning {len(active)} active skill(s)...\n")
actions, q_list, b_list, b_total = [], [], [], 0
for skill in active:
findings = scan_skill(ws, skill, al)
crit = [f for f in findings if f["risk"] == "CRITICAL"]
high = [f for f in findings if f["risk"] == "HIGH"]
if not crit and not high:
continue
if crit:
src = sd / skill; dst = sd / (QUARANTINE_PREFIX + skill)
if src.is_dir():
src.rename(dst); q_list.append(skill)
actions.append(f"QUARANTINED: {skill} ({len(crit)} critical)")
print(f" [QUARANTINE] {skill} β {len(crit)} CRITICAL finding(s)")
for f in crit:
print(f" {f['file']}:{f['line']}: {f['reason']}")
if f["url"]: print(f" URL: {f['url']}")
continue
# HIGH only -> block
by_file = {}
for f in high: by_file.setdefault(f["file"], []).append(f)
skill_cnt = 0
for rel, ffs in by_file.items():
ap = ws / rel
if not ap.is_file() or ap.suffix not in CODE_SUFFIXES:
continue
cnt = _block_lines(ap, {ff["line"] - 1 for ff in ffs})
skill_cnt += cnt
if skill_cnt:
b_total += skill_cnt; b_list.append(skill)
actions.append(f"BLOCKED: {skill} ({skill_cnt} line(s))")
print(f" [BLOCK] {skill} β {skill_cnt} HIGH line(s) neutralized")
print(); print("-" * 40); print("FULLTECTION SWEEP RESULTS"); print("-" * 40)
if not actions:
print("[CLEAN] No threats found. All skills are safe."); return 0
print(f" Actions taken: {len(actions)}")
if q_list:
print(f" Skills quarantined: {len(q_list)}")
for s in q_list: print(f" - {s}")
if b_list:
print(f" Skills with blocked calls: {len(b_list)}")
for s in b_list: print(f" - {s}")
print(f" Total lines blocked: {b_total}")
print("\nNEXT STEPS:")
if q_list:
print(" - Quarantined skills will not load on next session.")
print(" - Use 'unquarantine <skill>' to restore after review.")
if b_list:
print(" - Blocked lines are commented out with .bak backups.")
print(" - Review blocked code and remove backups when satisfied.")
print(" - Run 'scan --skills-only' to verify the workspace is clean.")
print("=" * 60)
return 2 if q_list else 1
# -- Main --------------------------------------------------------------------
def main():
p = argparse.ArgumentParser(description="OpenClaw Egressβ Full Network DLP Suite")
sub = p.add_subparsers(dest="command", help="Command to run")
s = sub.add_parser("scan", help="Full egress scan")
s.add_argument("--skills-only", action="store_true", help="Only scan skills directory")
s.add_argument("--workspace", "-w", help="Workspace path")
for name in ("domains", "status"):
sp = sub.add_parser(name)
sp.add_argument("--workspace", "-w", help="Workspace path")
for name in ("block", "quarantine", "unquarantine"):
sp = sub.add_parser(name)
sp.add_argument("skill", help="Skill name")
sp.add_argument("--workspace", "-w", help="Workspace path")
al = sub.add_parser("allowlist", help="Manage domain allowlist")
al.add_argument("--add", metavar="DOMAIN", help="Add domain")
al.add_argument("--remove", metavar="DOMAIN", help="Remove domain")
al.add_argument("--show", action="store_true", help="Show allowlist")
al.add_argument("--workspace", "-w", help="Workspace path")
sp = sub.add_parser("protect", help="Full automated protection sweep")
sp.add_argument("--workspace", "-w", help="Workspace path")
args = p.parse_args()
if not args.command:
p.print_help(); sys.exit(1)
ws = resolve_workspace(getattr(args, "workspace", None))
if not ws.exists():
print(f"Workspace not found: {ws}"); sys.exit(1)
dispatch = {
"scan": lambda: cmd_scan(ws, args.skills_only),
"domains": lambda: cmd_domains(ws),
"status": lambda: cmd_status(ws),
"block": lambda: cmd_block(ws, args.skill),
"quarantine": lambda: cmd_quarantine(ws, args.skill),
"unquarantine": lambda: cmd_unquarantine(ws, args.skill),
"allowlist": lambda: cmd_allowlist(ws, args.add, args.remove, args.show),
"protect": lambda: cmdtect(ws),
}
sys.exit(dispatch[args.command]())
if __name__ == "__main__":
main()
```