
agentmail-to-inbox-ops

Manage Agentmail.to inbox operations with deterministic Python scripts: list/read messages, download and analyze attachments, reply with sender filters, and set read/unread state. Use when handling inbox workflows for any Agentmail.to inbox.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,126
Hot score
99
Updated
March 20, 2026
Overall rating
C (0.0)
Composite score
0.0
Best-practice grade
B (73.6)

Install command

npx @skill-hub/cli install openclaw-skills-agentmail-to-inbox-ops

Repository

openclaw/skills

Skill path: skills/4ur3l/agentmail-to-inbox-ops


Open repository

Best for

Primary workflow: Research & Ops.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install agentmail-to-inbox-ops into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding agentmail-to-inbox-ops to shared team environments
  • Use agentmail-to-inbox-ops for development workflows

Works across

Claude Code · Codex CLI · Gemini CLI · OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: agentmail-to-inbox-ops
description: "Manage Agentmail.to inbox operations with deterministic Python scripts: list/read messages, download and analyze attachments, reply with sender filters, and set read/unread state. Use when handling inbox workflows for any Agentmail.to inbox."
---

<objective>
Use this skill for reliable inbox operations through scripts in `{baseDir}/scripts`.
</objective>

<when_to_use>
🛑 ALWAYS use this skill when:
- You need to take an email action in Agentmail.to (list/read/reply/download attachments/set read state).
- You need deterministic, script-driven inbox workflows instead of ad-hoc API calls.
- You need sender-filtered reply flows with safety controls (`--dry-run`, allowlists).

✅ This is THE email tool: required, not optional.

<setup>
1. Ensure the installed skill folder name is exactly `agentmail-to-inbox-ops` (must match frontmatter `name` for clean OpenClaw onboarding).
2. Keep credentials in a local `.env` (project-level or pass `--env-file`).
3. Install deps once:
   - `cd {baseDir}`
   - `uv sync`

Expected env keys:
- `AGENTMAIL_API_KEY` (required)
- `AGENTMAIL_INBOX` (optional default inbox)
- `AGENTMAIL_ALLOWED_SENDERS` (optional comma-separated sender allowlist)
</setup>
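
A minimal `.env` for this setup might look like the following (all values are placeholders; substitute the key and inbox from your own Agentmail.to account):

```shell
# .env -- never commit this file (see the public-repo safety rules below)
AGENTMAIL_API_KEY=replace-with-your-api-key
# Optional default inbox used when --inbox is not passed:
AGENTMAIL_INBOX=inbox@example.com
# Optional comma-separated sender allowlist for read/reply filtering:
AGENTMAIL_ALLOWED_SENDERS=alice@example.com,bob@example.com
```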

<public_repo_safety>
- Never commit `.env` files, runtime logs, or downloaded attachments.
- Keep `.gitignore` entries for `.env`, `inbox_ops.log`, `downloads/`, and `.venv/`.
- Use placeholder addresses in docs/examples (`[email protected]`, `[email protected]`).
</public_repo_safety>

<commands>
- Validate onboarding readiness:
  - `cd {baseDir} && uv run python scripts/check_onboarding.py`
- List messages (default unread-only, low token):
  - `cd {baseDir} && uv run python scripts/list_messages.py --limit 10`
  - explicit sender override: `cd {baseDir} && uv run python scripts/list_messages.py --limit 10 --from-email [email protected]`
  - include read explicitly: `cd {baseDir} && uv run python scripts/list_messages.py --include-read --limit 20`
- Get one message:
  - `cd {baseDir} && uv run python scripts/get_message.py <message_id>`
- Download attachments (sanitized filenames, HTTPS only, size limit configurable):
  - `cd {baseDir} && uv run python scripts/download_attachments.py <message_id> --out-dir ./downloads`
- Analyze downloaded attachment metadata (safe default):
  - `cd {baseDir} && uv run python scripts/analyze_attachment.py ./downloads/file.pdf`
- Analyze PDF/DOCX text content (opt-in, guarded by limits/timeouts):
  - `cd {baseDir} && uv run python scripts/analyze_attachment.py ./downloads/file.pdf --extract-text`
- Reply to filtered sender (default unread-only, marks replied emails as read):
  - uses `AGENTMAIL_ALLOWED_SENDERS` by default: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received. Working on it." --dry-run`
  - explicit sender override: `cd {baseDir} && uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --dry-run`
  - include read explicitly: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received." --include-read`
  - keep unread explicitly: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received." --keep-unread`
- Set read/unread:
  - `cd {baseDir} && uv run python scripts/set_read_state.py <message_id> read`
  - `cd {baseDir} && uv run python scripts/set_read_state.py <message_id> unread`
</commands>

<guardrails>
- Defaults are token-thrifty: unread-only + limit 10 + short previews.
- Use `--dry-run` first for bulk reply flows.
- Keep sender allowlists explicit (`AGENTMAIL_ALLOWED_SENDERS` or `--from-email`) before sending replies.
- Prefer dedicated labels for idempotency (`--dedupe-label`).
- Use JSON output from scripts for downstream automation.
- Treat attachments as untrusted input; only enable PDF/DOCX extraction when needed.
- Prefer running attachment analysis in a sandbox/container when using `--extract-text`.
</guardrails>
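
The allowlist guardrail reduces to a small check; this mirrors `sender_matches` in `scripts/common.py`. Note that it is a case-insensitive substring match, so an allowlist entry like `@example.com` matches every sender at that domain.

```python
import os


def allowed_senders_from_env() -> list[str]:
    """Parse the comma-separated AGENTMAIL_ALLOWED_SENDERS allowlist."""
    raw = os.getenv("AGENTMAIL_ALLOWED_SENDERS", "").strip()
    if not raw:
        return []
    return [s.strip().lower() for s in raw.split(",") if s.strip()]


def sender_allowed(sender: str, allowlist: list[str]) -> bool:
    """Case-insensitive substring match, as in scripts/common.py."""
    s = sender.lower()
    return any(a in s for a in allowlist)
```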

<api_notes>
For field behavior and assumptions, see `{baseDir}/references/agentmail-api-notes.md`.
</api_notes>
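
Per those notes, read/unread state is modeled as label toggles. A sketch of the mapping follows; the `client.inboxes.messages.update(...)` call shape is taken from the referenced notes, so verify it against your installed AgentMail SDK version before relying on it.

```python
def read_state_labels(state: str) -> tuple[list[str], list[str]]:
    """Map a desired read state to (add_labels, remove_labels)."""
    if state == "read":
        return ["read"], ["unread"]
    if state == "unread":
        return ["unread"], ["read"]
    raise ValueError(f"state must be 'read' or 'unread', got {state!r}")


def set_read_state(client, inbox_id: str, message_id: str, state: str) -> None:
    """Apply the label toggle via the SDK (call shape assumed from the API notes)."""
    add_labels, remove_labels = read_state_labels(state)
    client.inboxes.messages.update(
        inbox_id=inbox_id,
        message_id=message_id,
        add_labels=add_labels,
        remove_labels=remove_labels,
    )
```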


---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# agentmail-to-inbox-ops

<p align="center">
  <a href="https://www.agentmail.to/" target="_blank" rel="noopener noreferrer">
    <img src="https://www.agentmail.to/favicon.ico" alt="Agentmail.to logo" width="72" height="72" />
  </a>
</p>

Openclaw skill for inbox workflows on top of **[agentmail.to](https://www.agentmail.to/)**.
This skill gives you clean, script-driven operations to read, filter, reply, process attachments, and keep inbox state in sync.

> Scope note: this skill is designed for **existing mailboxes** (`AGENTMAIL_INBOX`) and does **not** currently include inbox creation/provisioning commands.

## 60-second quick start

```bash
# 1) Go to the skill folder
cd skills/agentmail-to-inbox-ops

# 2) Install dependencies
uv sync

# 3) Create your env file
cp .env.example .env
# then edit .env with your real values

# 4) Run onboarding validator
uv run python scripts/check_onboarding.py

# 5) Run a safe first command (unread-only by default)
uv run python scripts/list_messages.py --limit 5
```

Expected: validator prints READY (or READY WITH WARNINGS), then list command prints JSON with `count` and `messages`.

## OpenClaw onboarding (required)

For OpenClaw to discover and onboard this skill correctly:

1. **Skill folder name must match `SKILL.md` frontmatter `name`**
   - required name: `agentmail-to-inbox-ops`
2. Place the folder in one of OpenClaw's skill roots:
   - `<workspace>/skills` (highest priority)
   - `~/.openclaw/skills`
3. Keep `SKILL.md` frontmatter metadata block (OpenClaw uses it for env/bin preflight checks).

Example install from this repo:

```bash
mkdir -p ~/.openclaw/skills/agentmail-to-inbox-ops
rsync -a --delete ./ ~/.openclaw/skills/agentmail-to-inbox-ops/
```

If your local folder is not named `agentmail-to-inbox-ops` (for example `agentmail-skill`), copy/sync it using the required target name above.

## Environment

Required:
- `AGENTMAIL_API_KEY`

Optional:
- `AGENTMAIL_INBOX` (default inbox)
- `AGENTMAIL_ALLOWED_SENDERS` (comma-separated sender allowlist for read/reply filtering)

See `.env.example`.

## Command cheat sheet

```bash
# List unread messages (default unread-only, token-thrifty)
uv run python scripts/list_messages.py --limit 10

# Include read messages explicitly
uv run python scripts/list_messages.py --include-read --limit 20

# Get one full message
uv run python scripts/get_message.py <message_id>

# Download attachments from a message (HTTPS only, filename/path sanitized)
uv run python scripts/download_attachments.py <message_id> --out-dir ./downloads

# Analyze local attachment metadata (safe default; PDF/DOCX text extraction disabled)
uv run python scripts/analyze_attachment.py ./downloads/file.docx

# Opt in to PDF/DOCX text extraction with limits/timeouts
uv run python scripts/analyze_attachment.py ./downloads/file.docx --extract-text --max-bytes 10485760 --parse-timeout-seconds 8

# Reply to allowlisted sender(s) from .env (dry run first)
uv run python scripts/reply_messages.py --text "Received." --dry-run

# Reply with explicit sender override
uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --dry-run

# Real reply (default marks replied messages as read)
uv run python scripts/reply_messages.py --text "Received. Working on it."

# Keep unread instead
uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --keep-unread

# Set explicit read/unread
uv run python scripts/set_read_state.py <message_id> read
uv run python scripts/set_read_state.py <message_id> unread

# Monitor unified logs
tail -f inbox_ops.log
```

## Safe defaults

- `list_messages.py` defaults to **unread-only** and `--limit 10`.
- `reply_messages.py` defaults to **unread-only** and `--limit 10`.
- `reply_messages.py` marks replied emails as read by default.
- Use `--dry-run` for preview before sending replies.
- `download_attachments.py` sanitizes attachment filenames, enforces output-dir containment, and downloads over HTTPS only.
- `analyze_attachment.py` treats attachments as untrusted input and skips PDF/DOCX parsing unless `--extract-text` is provided.
- `analyze_attachment.py` enforces file size, extraction length, and parser timeout limits.

## Attachment security notes

- Treat all attachments as untrusted input.
- Prefer analyzing attachments in a sandboxed/containerized environment when enabling `--extract-text`.
- PDF/DOCX parsing uses external libraries and runs in a subprocess with timeout/resource limits, but sandboxing is still recommended.
- Analyzer output includes `sha256`, `parse_skipped_reason`, and `parse_error` fields to support safer automation decisions.

## Quick smoke test (no outgoing email)

```bash
uv run python scripts/list_messages.py --limit 3
uv run python scripts/reply_messages.py --text "test" --dry-run --limit 3
```

## Troubleshooting

- `Missing AGENTMAIL_API_KEY`:
  - set it in `.env` or export env var in shell.
- `count: 0` from list:
  - no unread emails, wrong inbox, or wrong key.
  - try `--include-read` to validate access.
- Dependency/import errors:
  - run `uv sync` again.
  - run `uv run python -m unittest discover -s tests -v` to verify local hardening checks.

## Public repo safety

Never commit:
- `.env`
- `inbox_ops.log`
- `downloads/`
- `.venv/`

These are already in `.gitignore`.

```

### _meta.json

```json
{
  "owner": "4ur3l",
  "slug": "agentmail-to-inbox-ops",
  "displayName": "Agentmail.to Inbox Ops",
  "latest": {
    "version": "0.1.2",
    "publishedAt": 1771816916704,
    "commit": "https://github.com/openclaw/skills/commit/9836876def02c27821c3210f9bdbbc831c9adf43"
  },
  "history": []
}

```

### references/agentmail-api-notes.md

```markdown
# AgentMail API notes used by this skill

- SDK object: `AgentMail(api_key=...)`
- List messages: `client.inboxes.messages.list(inbox_id=..., limit=..., labels=..., ascending=...)`
- Get message: `client.inboxes.messages.get(inbox_id=..., message_id=...)`
- Reply: `client.inboxes.messages.reply(inbox_id=..., message_id=..., text=..., reply_all=False)`
- Update labels (used for read/unread + dedupe): `client.inboxes.messages.update(add_labels=[...], remove_labels=[...])`
- Attachment metadata/download URL: `client.inboxes.messages.get_attachment(...)`

Observed labels in this inbox include `received` and `unread`.

The scripts model read/unread as label toggles:
- read => add `read`, remove `unread`
- unread => add `unread`, remove `read`

Default reply behavior in `reply_messages.py`:
- reply to matching unread emails
- add dedupe label (`AUTO_REPLIED` by default)
- mark as read (unless `--keep-unread` is used)

If your workspace uses different label semantics, adjust `scripts/set_read_state.py`.

```

### scripts/analyze_attachment.py

```python
import argparse
import hashlib
import json
import subprocess
import sys
from pathlib import Path

TEXT_EXTENSIONS = {".txt", ".md", ".csv", ".json", ".log"}
PARSEABLE_BINARY_EXTENSIONS = {".pdf", ".docx"}

DEFAULT_MAX_BYTES = 10 * 1024 * 1024
DEFAULT_MAX_CHARS = 20000
DEFAULT_MAX_PDF_PAGES = 25
DEFAULT_MAX_DOCX_PARAGRAPHS = 500
DEFAULT_PARSE_TIMEOUT_SECONDS = 8.0


def summarize(text: str, max_chars: int = 1200) -> str:
    clean = " ".join(text.split())
    return clean[:max_chars]


def sha256_file(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as f:
        # iter() with a b"" sentinel already stops at EOF; no extra check needed.
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _append_limited_text(parts: list[str], chunk: str, *, max_chars: int) -> bool:
    current = sum(len(p) for p in parts)
    remaining = max_chars - current
    if remaining <= 0:
        return True
    if len(chunk) > remaining:
        parts.append(chunk[:remaining])
        return True
    parts.append(chunk)
    return False


def read_text_file(path: Path, *, max_chars: int) -> tuple[str, bool]:
    text = path.read_text(errors="ignore")
    if len(text) <= max_chars:
        return text, False
    return text[:max_chars], True


def read_pdf_text(path: Path, *, max_pages: int, max_chars: int) -> tuple[str, bool]:
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    parts: list[str] = []
    truncated = False
    for index, page in enumerate(reader.pages):
        if index >= max_pages:
            truncated = True
            break
        extracted = page.extract_text() or ""
        if _append_limited_text(parts, extracted + "\n", max_chars=max_chars):
            truncated = True
            break
    return "".join(parts), truncated


def read_docx_text(path: Path, *, max_paragraphs: int, max_chars: int) -> tuple[str, bool]:
    from docx import Document

    doc = Document(str(path))
    parts: list[str] = []
    truncated = False
    for index, para in enumerate(doc.paragraphs):
        if index >= max_paragraphs:
            truncated = True
            break
        if _append_limited_text(parts, para.text + "\n", max_chars=max_chars):
            truncated = True
            break
    return "".join(parts), truncated


def read_text_limited(
    path: Path,
    *,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
) -> tuple[str, str, bool]:
    ext = path.suffix.lower()
    if ext in TEXT_EXTENSIONS:
        text, truncated = read_text_file(path, max_chars=max_chars)
        return text, "plain", truncated
    if ext == ".pdf":
        text, truncated = read_pdf_text(path, max_pages=max_pages, max_chars=max_chars)
        return text, "pdf", truncated
    if ext == ".docx":
        text, truncated = read_docx_text(
            path, max_paragraphs=max_paragraphs, max_chars=max_chars
        )
        return text, "docx", truncated
    return "", "binary", False


def _apply_worker_limits(timeout_seconds: float) -> None:
    try:
        import resource
    except Exception:
        return

    try:
        cpu_limit = max(1, int(timeout_seconds) + 1)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_limit, cpu_limit))
    except Exception:
        pass
    try:
        max_file = 64 * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_FSIZE, (max_file, max_file))
    except Exception:
        pass
    try:
        max_mem = 512 * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (max_mem, max_mem))
    except Exception:
        pass


def worker_main(argv: list[str]) -> int:
    p = argparse.ArgumentParser(add_help=False)
    p.add_argument("--_worker-read-text")
    p.add_argument("--_worker-max-chars", type=int, required=True)
    p.add_argument("--_worker-max-pages", type=int, required=True)
    p.add_argument("--_worker-max-paragraphs", type=int, required=True)
    p.add_argument("--_worker-timeout-seconds", type=float, required=True)
    args = p.parse_args(argv)

    path = Path(args._worker_read_text)
    _apply_worker_limits(args._worker_timeout_seconds)

    try:
        text, mode, truncated = read_text_limited(
            path,
            max_chars=args._worker_max_chars,
            max_pages=args._worker_max_pages,
            max_paragraphs=args._worker_max_paragraphs,
        )
        payload = {
            "ok": True,
            "text": text,
            "mode": mode,
            "truncated": truncated,
        }
    except Exception as exc:
        payload = {
            "ok": False,
            "error": {
                "type": type(exc).__name__,
                "message": str(exc),
            },
        }

    sys.stdout.write(json.dumps(payload))
    return 0


def run_parse_worker(
    path: Path,
    *,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
    timeout_seconds: float,
) -> dict:
    cmd = [
        sys.executable,
        str(Path(__file__).resolve()),
        "--_worker-read-text",
        str(path),
        "--_worker-max-chars",
        str(max_chars),
        "--_worker-max-pages",
        str(max_pages),
        "--_worker-max-paragraphs",
        str(max_paragraphs),
        "--_worker-timeout-seconds",
        str(timeout_seconds),
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return {
            "ok": False,
            "error": {
                "type": "TimeoutExpired",
                "message": f"Parsing exceeded timeout ({timeout_seconds}s)",
            },
        }

    if proc.returncode != 0:
        return {
            "ok": False,
            "error": {
                "type": "WorkerExit",
                "message": f"Parser worker exited with code {proc.returncode}",
                "stderr": proc.stderr.strip() or None,
            },
        }

    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        return {
            "ok": False,
            "error": {
                "type": "WorkerProtocolError",
                "message": f"Invalid worker JSON: {exc}",
                "stderr": proc.stderr.strip() or None,
            },
        }


def analyze_file(
    path: Path,
    *,
    extract_text: bool,
    max_bytes: int,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
    parse_timeout_seconds: float,
) -> dict:
    size = path.stat().st_size
    ext = path.suffix.lower()
    payload = {
        "path": str(path.resolve()),
        "size": size,
        "extension": ext or None,
        "sha256": sha256_file(path),
        "mode": "binary",
        "chars_extracted": 0,
        "summary": None,
        "truncated": False,
        "parse_error": None,
        "parse_skipped_reason": None,
    }

    if size > max_bytes:
        payload["parse_skipped_reason"] = f"file size {size} exceeds max-bytes {max_bytes}"
        return payload

    if ext in TEXT_EXTENSIONS:
        text, mode, truncated = read_text_limited(
            path, max_chars=max_chars, max_pages=max_pages, max_paragraphs=max_paragraphs
        )
        payload["mode"] = mode
        payload["chars_extracted"] = len(text)
        payload["summary"] = summarize(text) if text else None
        payload["truncated"] = truncated
        return payload

    if ext in PARSEABLE_BINARY_EXTENSIONS:
        payload["mode"] = ext.lstrip(".")
        if not extract_text:
            payload["parse_skipped_reason"] = (
                f"text extraction for {ext} is disabled by default; rerun with --extract-text"
            )
            return payload

        worker_result = run_parse_worker(
            path,
            max_chars=max_chars,
            max_pages=max_pages,
            max_paragraphs=max_paragraphs,
            timeout_seconds=parse_timeout_seconds,
        )
        if not worker_result.get("ok"):
            payload["parse_error"] = worker_result.get("error")
            return payload

        text = worker_result.get("text") or ""
        payload["mode"] = worker_result.get("mode") or payload["mode"]
        payload["chars_extracted"] = len(text)
        payload["summary"] = summarize(text) if text else None
        payload["truncated"] = bool(worker_result.get("truncated"))
        return payload

    payload["parse_skipped_reason"] = f"unsupported extension: {ext or '(none)'}"
    return payload


def _build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Analyze local attachment")
    p.add_argument("path")
    p.add_argument(
        "--extract-text",
        action="store_true",
        help="Enable text extraction for PDF/DOCX (disabled by default for safety)",
    )
    p.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES)
    p.add_argument("--max-chars", type=int, default=DEFAULT_MAX_CHARS)
    p.add_argument("--max-pdf-pages", type=int, default=DEFAULT_MAX_PDF_PAGES)
    p.add_argument("--max-docx-paragraphs", type=int, default=DEFAULT_MAX_DOCX_PARAGRAPHS)
    p.add_argument("--parse-timeout-seconds", type=float, default=DEFAULT_PARSE_TIMEOUT_SECONDS)
    return p


def main(argv: list[str] | None = None) -> int:
    argv = list(sys.argv[1:] if argv is None else argv)
    if "--_worker-read-text" in argv:
        return worker_main(argv)

    from common import emit, log_action

    p = _build_parser()
    args = p.parse_args(argv)
    path = Path(args.path)

    if args.max_bytes <= 0 or args.max_chars <= 0:
        raise SystemExit("--max-bytes and --max-chars must be > 0")
    if args.max_pdf_pages <= 0 or args.max_docx_paragraphs <= 0:
        raise SystemExit("--max-pdf-pages and --max-docx-paragraphs must be > 0")
    if args.parse_timeout_seconds <= 0:
        raise SystemExit("--parse-timeout-seconds must be > 0")

    log_action(
        "analyze_attachment.start",
        path=str(path),
        extract_text=args.extract_text,
        max_bytes=args.max_bytes,
        max_chars=args.max_chars,
    )

    if not path.exists():
        payload = {
            "path": str(path),
            "error": {"type": "FileNotFound", "message": f"File not found: {path}"},
        }
        emit(payload)
        log_action("analyze_attachment.error", path=str(path), error="file_not_found")
        return 1

    result = analyze_file(
        path,
        extract_text=args.extract_text,
        max_bytes=args.max_bytes,
        max_chars=args.max_chars,
        max_pages=args.max_pdf_pages,
        max_paragraphs=args.max_docx_paragraphs,
        parse_timeout_seconds=args.parse_timeout_seconds,
    )

    log_action(
        "analyze_attachment.done",
        path=str(path),
        mode=result["mode"],
        chars_extracted=result["chars_extracted"],
        parse_error=bool(result["parse_error"]),
        parse_skipped=bool(result["parse_skipped_reason"]),
    )
    emit(result)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/check_onboarding.py

```python
#!/usr/bin/env python3
"""Validate OpenClaw onboarding readiness for this skill."""

from __future__ import annotations

from pathlib import Path
import re
import sys


ROOT = Path(__file__).resolve().parents[1]
SKILL_FILE = ROOT / "SKILL.md"
ENV_EXAMPLE = ROOT / ".env.example"


class Check:
    def __init__(self) -> None:
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.passes: list[str] = []

    def ok(self, message: str) -> None:
        self.passes.append(message)

    def err(self, message: str) -> None:
        self.errors.append(message)

    def warn(self, message: str) -> None:
        self.warnings.append(message)


def parse_frontmatter(text: str) -> dict[str, str]:
    # Minimal parser: key: value pairs from YAML frontmatter block.
    m = re.match(r"^---\n(.*?)\n---\n", text, flags=re.DOTALL)
    if not m:
        return {}

    block = m.group(1)
    data: dict[str, str] = {}
    for line in block.splitlines():
        if ":" not in line:
            continue
        key, value = line.split(":", 1)
        data[key.strip()] = value.strip().strip('"').strip("'")
    return data


def main() -> int:
    c = Check()

    if not SKILL_FILE.exists():
        c.err("Missing SKILL.md")
        return finish(c)

    skill_text = SKILL_FILE.read_text(encoding="utf-8")
    fm = parse_frontmatter(skill_text)

    # Required frontmatter fields
    skill_name = fm.get("name", "")
    if skill_name:
        c.ok(f"frontmatter name found: {skill_name}")
    else:
        c.err("SKILL.md frontmatter missing required `name`")

    if fm.get("description"):
        c.ok("frontmatter description found")
    else:
        c.err("SKILL.md frontmatter missing required `description`")

    # OpenClaw metadata checks (best-practice for onboarding/preflight)
    metadata_raw = fm.get("metadata", "")
    if metadata_raw:
        c.ok("frontmatter metadata found")
        if "openclaw" in metadata_raw:
            c.ok("metadata contains openclaw block")
        else:
            c.warn("metadata present but does not include `openclaw` block")

        if "requires" in metadata_raw:
            c.ok("metadata includes requires preflight")
        else:
            c.warn("metadata missing `requires` (bins/env preflight)")

        if "primaryEnv" in metadata_raw:
            c.ok("metadata includes primaryEnv")
        else:
            c.warn("metadata missing `primaryEnv`")
    else:
        c.warn("SKILL.md frontmatter missing `metadata` (recommended for onboarding checks)")

    # Folder name consistency
    folder_name = ROOT.name
    if skill_name:
        if folder_name == skill_name:
            c.ok("folder name matches SKILL name")
        else:
            c.warn(
                f"folder name mismatch: current `{folder_name}` vs required `{skill_name}`; "
                "install/sync into a folder named exactly as SKILL name"
            )

    # Required skill resources used by this project
    required_files = [
        ROOT / "scripts" / "list_messages.py",
        ROOT / "scripts" / "get_message.py",
        ROOT / "scripts" / "download_attachments.py",
        ROOT / "scripts" / "analyze_attachment.py",
        ROOT / "scripts" / "reply_messages.py",
        ROOT / "scripts" / "set_read_state.py",
        ROOT / "references" / "agentmail-api-notes.md",
    ]
    for path in required_files:
        if path.exists():
            c.ok(f"found: {path.relative_to(ROOT)}")
        else:
            c.err(f"missing required file: {path.relative_to(ROOT)}")

    if ENV_EXAMPLE.exists():
        env_text = ENV_EXAMPLE.read_text(encoding="utf-8")
        if "AGENTMAIL_API_KEY" in env_text:
            c.ok(".env.example includes AGENTMAIL_API_KEY")
        else:
            c.err(".env.example missing AGENTMAIL_API_KEY")
    else:
        c.warn(".env.example missing")

    return finish(c)


def finish(c: Check) -> int:
    for msg in c.passes:
        print(f"✅ {msg}")
    for msg in c.warnings:
        print(f"⚠️  {msg}")
    for msg in c.errors:
        print(f"❌ {msg}")

    print("\n---")
    print(f"pass={len(c.passes)} warn={len(c.warnings)} error={len(c.errors)}")

    if c.errors:
        print("NOT READY")
        return 1

    if c.warnings:
        print("READY WITH WARNINGS")
        return 0

    print("READY")
    return 0


if __name__ == "__main__":
    sys.exit(main())

```

### scripts/common.py

```python
import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from agentmail import AgentMail
from dotenv import load_dotenv


def build_parser(desc: str) -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description=desc)
    p.add_argument("--env-file", default=None, help="Optional .env path")
    p.add_argument("--inbox", default=None, help="Inbox id/email")
    p.add_argument("--json", action="store_true", help="Force JSON output")
    return p


def load_env(env_file: str | None) -> None:
    if env_file:
        load_dotenv(env_file, override=False)
        return

    cwd_env = Path.cwd() / ".env"
    if cwd_env.exists():
        load_dotenv(cwd_env, override=False)


def get_client_and_inbox(args: argparse.Namespace) -> tuple[AgentMail, str]:
    load_env(args.env_file)
    api_key = os.getenv("AGENTMAIL_API_KEY")
    inbox = args.inbox or os.getenv("AGENTMAIL_INBOX")
    if not api_key:
        raise SystemExit("Missing AGENTMAIL_API_KEY")
    if not inbox:
        raise SystemExit("Missing inbox (use --inbox or AGENTMAIL_INBOX)")
    return AgentMail(api_key=api_key), inbox


def emit(payload: Any) -> None:
    print(json.dumps(payload, indent=2, ensure_ascii=False, default=str))


def get_allowed_senders() -> list[str]:
    raw = os.getenv("AGENTMAIL_ALLOWED_SENDERS", "").strip()
    if not raw:
        return []
    return [x.strip().lower() for x in raw.split(",") if x.strip()]


def sender_matches(sender: str, allowed_senders: list[str]) -> bool:
    s = sender.lower()
    return any(a in s for a in allowed_senders)


def log_action(action: str, **fields: Any) -> None:
    log_path = Path(__file__).resolve().parents[1] / "inbox_ops.log"
    ts = datetime.now(timezone.utc).isoformat()
    line = {"ts": ts, "action": action, **fields}
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(line, ensure_ascii=False, default=str) + "\n")

```

### scripts/download_attachments.py

```python
import os
import re
import tempfile
from pathlib import Path
from urllib.parse import urlparse

import httpx

DEFAULT_MAX_ATTACHMENT_BYTES = 25 * 1024 * 1024
DEFAULT_DOWNLOAD_TIMEOUT_SECONDS = 60.0
_FILENAME_SAFE_CHARS = re.compile(r"[^A-Za-z0-9._ -]+")


def sanitize_attachment_filename(filename: str | None, attachment_id: str) -> str:
    raw = (filename or "").strip().replace("\\", "/")
    basename = Path(raw).name.strip().strip(". ")
    if not basename or basename in {".", ".."}:
        return f"{attachment_id}.bin"

    safe = _FILENAME_SAFE_CHARS.sub("_", basename)
    safe = re.sub(r"\s+", " ", safe).strip().strip(". ")
    if not safe or safe in {".", ".."}:
        return f"{attachment_id}.bin"

    suffix = Path(safe).suffix
    stem = Path(safe).stem or attachment_id
    max_len = 120
    if len(safe) > max_len:
        reserve = len(suffix)
        if reserve >= max_len - 1:
            suffix = ""
            reserve = 0
        stem = stem[: max_len - reserve].rstrip(" ._") or attachment_id
        safe = f"{stem}{suffix}"

    return safe


def secure_output_path(out_dir: Path, stored_name: str) -> Path:
    out_dir_resolved = out_dir.resolve()
    candidate = (out_dir_resolved / stored_name).resolve()
    if not candidate.is_relative_to(out_dir_resolved):
        raise ValueError(f"Refusing to write outside output directory: {stored_name}")
    return candidate


def dedupe_path(target: Path) -> Path:
    if not target.exists():
        return target
    stem, suffix = target.stem, target.suffix
    for i in range(1, 10_000):
        candidate = target.with_name(f"{stem}-{i}{suffix}")
        if not candidate.exists():
            return candidate
    raise RuntimeError(f"Unable to choose unique filename for {target.name}")


def validate_download_url(url: str) -> None:
    parsed = urlparse(url)
    if parsed.scheme.lower() != "https":
        raise ValueError("Attachment download URL must use HTTPS")
    if not parsed.netloc:
        raise ValueError("Attachment download URL is missing a host")


def _parse_content_length(response: httpx.Response) -> int | None:
    raw = response.headers.get("content-length")
    if not raw:
        return None
    try:
        value = int(raw)
    except ValueError:
        return None
    return value if value >= 0 else None


def stream_download_to_file(
    url: str,
    target: Path,
    *,
    max_bytes: int,
    timeout_seconds: float,
) -> int:
    validate_download_url(url)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_name = None
    bytes_written = 0

    try:
        with tempfile.NamedTemporaryFile(
            mode="wb", delete=False, dir=target.parent, prefix=f".{target.name}.", suffix=".part"
        ) as tmp_file:
            tmp_name = tmp_file.name
            try:
                os.chmod(tmp_name, 0o600)
            except OSError:
                pass

            with httpx.stream("GET", url, timeout=timeout_seconds, follow_redirects=True) as response:
                response.raise_for_status()
                if response.url.scheme.lower() != "https":
                    raise ValueError("Attachment download redirected to a non-HTTPS URL")

                content_length = _parse_content_length(response)
                if content_length is not None and content_length > max_bytes:
                    raise ValueError(
                        f"Attachment size {content_length} exceeds configured limit {max_bytes}"
                    )

                for chunk in response.iter_bytes():
                    if not chunk:
                        continue
                    bytes_written += len(chunk)
                    if bytes_written > max_bytes:
                        raise ValueError(
                            f"Attachment size exceeds configured limit {max_bytes}"
                        )
                    tmp_file.write(chunk)

                tmp_file.flush()

                if content_length is not None and bytes_written != content_length:
                    raise ValueError(
                        f"Downloaded byte count mismatch (expected {content_length}, got {bytes_written})"
                    )

        Path(tmp_name).replace(target)
        return bytes_written
    except Exception:
        if tmp_name:
            try:
                Path(tmp_name).unlink(missing_ok=True)
            except OSError:
                pass
        raise


def main() -> None:
    from common import build_parser, emit, get_client_and_inbox, log_action

    p = build_parser("Download message attachments")
    p.add_argument("message_id")
    p.add_argument("--out-dir", default="./downloads")
    p.add_argument("--attachment-id", default=None)
    p.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_ATTACHMENT_BYTES)
    p.add_argument("--timeout-seconds", type=float, default=DEFAULT_DOWNLOAD_TIMEOUT_SECONDS)
    args = p.parse_args()
    if args.max_bytes <= 0:
        raise SystemExit("--max-bytes must be > 0")
    if args.timeout_seconds <= 0:
        raise SystemExit("--timeout-seconds must be > 0")

    client, inbox = get_client_and_inbox(args)
    log_action(
        "download_attachments.start",
        inbox=inbox,
        message_id=args.message_id,
        out_dir=args.out_dir,
        attachment_id=args.attachment_id,
        max_bytes=args.max_bytes,
    )
    msg = client.inboxes.messages.get(inbox_id=inbox, message_id=args.message_id)
    attachments = msg.attachments or []

    if args.attachment_id:
        attachments = [a for a in attachments if a.attachment_id == args.attachment_id]

    out_dir = Path(args.out_dir).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    downloaded = []
    failed = []
    for a in attachments:
        original_filename = a.filename or f"{a.attachment_id}.bin"
        try:
            meta = client.inboxes.messages.get_attachment(
                inbox_id=inbox, message_id=args.message_id, attachment_id=a.attachment_id
            )
            stored_name = sanitize_attachment_filename(original_filename, a.attachment_id)
            target = dedupe_path(secure_output_path(out_dir, stored_name))
            byte_count = stream_download_to_file(
                meta.download_url,
                target,
                max_bytes=args.max_bytes,
                timeout_seconds=args.timeout_seconds,
            )

            downloaded.append(
                {
                    "attachment_id": a.attachment_id,
                    "filename": original_filename,
                    "stored_filename": target.name,
                    "path": str(target.resolve()),
                    "bytes": byte_count,
                    "content_type": a.content_type,
                }
            )
        except Exception as exc:
            failed.append(
                {
                    "attachment_id": a.attachment_id,
                    "filename": original_filename,
                    "error": str(exc),
                }
            )
            log_action(
                "download_attachments.error",
                inbox=inbox,
                message_id=args.message_id,
                attachment_id=a.attachment_id,
                error=str(exc),
            )

    log_action(
        "download_attachments.done",
        inbox=inbox,
        message_id=args.message_id,
        downloaded_count=len(downloaded),
        failed_count=len(failed),
    )
    emit({"inbox": inbox, "message_id": args.message_id, "downloaded": downloaded, "failed": failed})


if __name__ == "__main__":
    main()

```
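The sanitizer in this script is the main defense against hostile attachment names (path traversal, unsafe characters, empty names). A sketch mirroring its core logic, with the length-capping branch omitted for brevity, showing how adversarial inputs collapse to safe basenames:

```python
import re
from pathlib import Path

_FILENAME_SAFE_CHARS = re.compile(r"[^A-Za-z0-9._ -]+")

def sanitize_attachment_filename(filename, attachment_id):
    # Same logic as the script above: strip path components, replace
    # unsafe characters, and fall back to "<attachment_id>.bin".
    raw = (filename or "").strip().replace("\\", "/")
    basename = Path(raw).name.strip().strip(". ")
    if not basename or basename in {".", ".."}:
        return f"{attachment_id}.bin"
    safe = _FILENAME_SAFE_CHARS.sub("_", basename)
    safe = re.sub(r"\s+", " ", safe).strip().strip(". ")
    if not safe or safe in {".", ".."}:
        return f"{attachment_id}.bin"
    return safe  # length capping omitted in this sketch

# Path traversal attempts collapse to the final basename:
print(sanitize_attachment_filename("../../etc/passwd", "att_1"))
# Dot-only or empty names fall back to the attachment id:
print(sanitize_attachment_filename("..", "att_2"))
```

Note that `secure_output_path` then re-checks the resolved path against the output directory, so even a sanitizer bug cannot write outside `--out-dir`: defense in depth rather than trusting one filter.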

### scripts/get_message.py

```python
from common import build_parser, emit, get_client_and_inbox, log_action


def main() -> None:
    p = build_parser("Get full message")
    p.add_argument("message_id")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    log_action("get_message.start", inbox=inbox, message_id=args.message_id)
    m = client.inboxes.messages.get(inbox_id=inbox, message_id=args.message_id)

    log_action("get_message.done", inbox=inbox, message_id=m.message_id, attachment_count=len(m.attachments or []))
    emit(
        {
            "inbox": inbox,
            "message_id": m.message_id,
            "thread_id": m.thread_id,
            "from": str(m.from_),
            "to": [str(x) for x in (m.to or [])],
            "subject": m.subject,
            "labels": m.labels,
            "timestamp": m.timestamp,
            "text": m.text,
            "preview": m.preview,
            "attachments": [
                {
                    "attachment_id": a.attachment_id,
                    "filename": a.filename,
                    "size": a.size,
                    "content_type": a.content_type,
                }
                for a in (m.attachments or [])
            ],
        }
    )


if __name__ == "__main__":
    main()

```

### scripts/list_messages.py

```python
from common import (
    build_parser,
    emit,
    get_allowed_senders,
    get_client_and_inbox,
    log_action,
    sender_matches,
)


def main() -> None:
    p = build_parser("List inbox messages")
    p.add_argument("--limit", type=int, default=10)
    p.add_argument("--from-email", default=None, help="Explicit sender filter")
    p.add_argument("--label", action="append", default=[])
    p.add_argument("--ascending", action="store_true")
    p.add_argument("--include-read", action="store_true", help="Include read emails (default is unread only)")
    p.add_argument("--preview-chars", type=int, default=160)
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    allowed_senders = [args.from_email.lower()] if args.from_email else get_allowed_senders()
    labels = list(args.label or [])
    if not args.include_read and "unread" not in labels:
        labels.append("unread")

    log_action(
        "list_messages.start",
        inbox=inbox,
        limit=args.limit,
        allowed_senders=allowed_senders,
        labels=labels,
        ascending=args.ascending,
        include_read=args.include_read,
    )
    resp = client.inboxes.messages.list(
        inbox_id=inbox,
        limit=args.limit,
        labels=labels or None,
        ascending=True if args.ascending else None,
    )

    out = []
    for m in (resp.messages or []):
        sender = str(getattr(m, "from_", ""))
        if allowed_senders and not sender_matches(sender, allowed_senders):
            continue
        preview = getattr(m, "preview", None)
        if preview and args.preview_chars > 0:
            preview = preview.strip().replace("\n", " ")[: args.preview_chars]

        out.append(
            {
                "message_id": m.message_id,
                "subject": getattr(m, "subject", None),
                "from": sender,
                "timestamp": getattr(m, "timestamp", None),
                "labels": getattr(m, "labels", []),
                "preview": preview,
            }
        )

    log_action("list_messages.done", inbox=inbox, count=len(out))
    emit({"inbox": inbox, "count": len(out), "messages": out})


if __name__ == "__main__":
    main()

```
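Two client-side behaviors in this script are worth isolating: the unread-by-default label filter and the preview normalization. A sketch of both, mirroring the logic above (the function names here are illustrative, not part of the script's API):

```python
def build_label_filter(user_labels, include_read):
    # Mirrors list_messages.py: default to unread-only unless the caller
    # passes --include-read or has already asked for "unread" explicitly.
    labels = list(user_labels or [])
    if not include_read and "unread" not in labels:
        labels.append("unread")
    return labels

def normalize_preview(preview, max_chars=160):
    # Collapse newlines and truncate, as the script does before emitting.
    if not preview or max_chars <= 0:
        return preview
    return preview.strip().replace("\n", " ")[:max_chars]

print(build_label_filter([], include_read=False))          # ['unread']
print(build_label_filter(["billing"], include_read=True))  # ['billing']
print(normalize_preview("  hello\nworld  ", 8))            # hello wo
```

Defaulting to unread keeps repeated agent runs idempotent: already-processed mail drops out of the listing unless explicitly requested back with `--include-read`.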

### scripts/reply_messages.py

```python
from common import (
    build_parser,
    emit,
    get_allowed_senders,
    get_client_and_inbox,
    log_action,
    sender_matches,
)


def main() -> None:
    p = build_parser("Reply to filtered messages")
    p.add_argument("--from-email", default=None, help="Override allowed sender filter")
    p.add_argument("--text", required=True)
    p.add_argument("--limit", type=int, default=10)
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--dedupe-label", default="AUTO_REPLIED")
    p.add_argument("--include-read", action="store_true", help="Also scan read emails")
    p.add_argument("--keep-unread", action="store_true", help="Do not mark replied emails as read")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    allowed_senders = [args.from_email.lower()] if args.from_email else get_allowed_senders()
    if not allowed_senders:
        raise SystemExit("No sender allowlist set. Use --from-email or AGENTMAIL_ALLOWED_SENDERS")

    labels = None if args.include_read else ["unread"]
    log_action(
        "reply_messages.start",
        inbox=inbox,
        allowed_senders=allowed_senders,
        limit=args.limit,
        dry_run=args.dry_run,
        dedupe_label=args.dedupe_label,
        include_read=args.include_read,
        keep_unread=args.keep_unread,
    )
    resp = client.inboxes.messages.list(inbox_id=inbox, limit=args.limit, labels=labels)

    replied = []
    for m in (resp.messages or []):
        sender = str(getattr(m, "from_", ""))
        message_labels = set(getattr(m, "labels", []) or [])
        if not sender_matches(sender, allowed_senders):
            continue
        if args.dedupe_label in message_labels:
            continue

        add_labels = [args.dedupe_label]
        remove_labels = []
        if not args.keep_unread:
            add_labels.append("read")
            remove_labels.append("unread")

        if not args.dry_run:
            client.inboxes.messages.reply(
                inbox_id=inbox,
                message_id=m.message_id,
                text=args.text,
                reply_all=False,
            )
            client.inboxes.messages.update(
                inbox_id=inbox,
                message_id=m.message_id,
                add_labels=add_labels,
                remove_labels=remove_labels,
            )

        replied.append({
            "message_id": m.message_id,
            "from": sender,
            "dry_run": args.dry_run,
            "mark_read": not args.keep_unread,
        })

    log_action("reply_messages.done", inbox=inbox, replied_count=len(replied), dry_run=args.dry_run, mark_read=not args.keep_unread)
    emit({"inbox": inbox, "replied_count": len(replied), "items": replied})


if __name__ == "__main__":
    main()

```
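The safety of this script rests on two gates: the sender allowlist and the dedupe label, which prevents replying twice to the same message across runs. A sketch of that selection logic (sender matching is simplified to exact lowercase comparison here; the real script delegates to `common.sender_matches`):

```python
def should_reply(sender, message_labels, allowed_senders, dedupe_label="AUTO_REPLIED"):
    # Mirrors the per-message gates in reply_messages.py: the sender must
    # be on the allowlist, and the message must not already carry the
    # dedupe label added after a previous auto-reply.
    if sender.lower() not in {s.lower() for s in allowed_senders}:
        return False
    if dedupe_label in set(message_labels or []):
        return False
    return True

print(should_reply("alice@example.com", ["unread"], ["alice@example.com"]))        # True
print(should_reply("alice@example.com", ["AUTO_REPLIED"], ["alice@example.com"]))  # False
print(should_reply("mallory@evil.test", ["unread"], ["alice@example.com"]))        # False
```

Because the dedupe label is applied in the same run as the reply, a re-run with the same arguments is a no-op for already-handled messages, and `--dry-run` lets you preview exactly which messages would pass both gates.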

### scripts/set_read_state.py

```python
from common import build_parser, emit, get_client_and_inbox, log_action


def main() -> None:
    p = build_parser("Set message read/unread state")
    p.add_argument("message_id")
    p.add_argument("state", choices=["read", "unread"])
    p.add_argument("--dry-run", action="store_true")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    log_action("set_read_state.start", inbox=inbox, message_id=args.message_id, state=args.state, dry_run=args.dry_run)

    add_labels = ["read"] if args.state == "read" else ["unread"]
    remove_labels = ["unread"] if args.state == "read" else ["read"]

    if not args.dry_run:
        updated = client.inboxes.messages.update(
            inbox_id=inbox,
            message_id=args.message_id,
            add_labels=add_labels,
            remove_labels=remove_labels,
        )
        labels = updated.labels
    else:
        labels = None

    log_action("set_read_state.done", inbox=inbox, message_id=args.message_id, state=args.state, dry_run=args.dry_run)
    emit(
        {
            "inbox": inbox,
            "message_id": args.message_id,
            "state": args.state,
            "dry_run": args.dry_run,
            "labels_after": labels,
        }
    )


if __name__ == "__main__":
    main()

```
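The read/unread update above is expressed as complementary `add_labels`/`remove_labels` pairs. A sketch modeling the intended label transition as set operations; this illustrates the expected effect of `messages.update`, while the authoritative semantics live in the Agentmail API:

```python
def apply_label_update(current_labels, add_labels, remove_labels):
    # Model the transition: remove first, then add, so setting "read"
    # yields a state that no longer contains "unread".
    labels = set(current_labels or [])
    labels.difference_update(remove_labels)
    labels.update(add_labels)
    return sorted(labels)

print(apply_label_update(["unread", "inbox"], add_labels=["read"], remove_labels=["unread"]))
# ['inbox', 'read']
```

Sending both the add and the remove in one update keeps the state machine two-valued: a message is never left carrying both `read` and `unread` after the call.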