agentmail-to-inbox-ops
Manage Agentmail.to inbox operations with deterministic Python scripts: list/read messages, download and analyze attachments, reply with sender filters, and set read/unread state. Use when handling inbox workflows for any Agentmail.to inbox.
Install command
npx @skill-hub/cli install openclaw-skills-agentmail-to-inbox-ops
Repository
Skill path: skills/4ur3l/agentmail-to-inbox-ops
Best for
Primary workflow: Research & Ops.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install agentmail-to-inbox-ops into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding agentmail-to-inbox-ops to shared team environments
- Use agentmail-to-inbox-ops for development workflows
Original source / Raw SKILL.md
---
name: agentmail-to-inbox-ops
description: "Manage Agentmail.to inbox operations with deterministic Python scripts: list/read messages, download and analyze attachments, reply with sender filters, and set read/unread state. Use when handling inbox workflows for any Agentmail.to inbox."
---
<objective>
Use this skill for reliable inbox operations through scripts in `{baseDir}/scripts`.
</objective>
<when_to_use>
ALWAYS use this skill when:
- Needs to take an email action in Agentmail.to (list/read/reply/download attachments/set read state).
- Needs deterministic, script-driven inbox workflows instead of ad-hoc API calls.
- Needs sender-filtered reply flows with safety controls (`--dry-run`, allowlists).
✅ This is THE email tool - not optional, but required.
</when_to_use>
<setup>
1. Ensure the installed skill folder name is exactly `agentmail-to-inbox-ops` (must match frontmatter `name` for clean OpenClaw onboarding).
2. Keep credentials in a local `.env` (project-level or pass `--env-file`).
3. Install deps once:
   - `cd {baseDir}`
   - `uv sync`
Expected env keys:
- `AGENTMAIL_API_KEY` (required)
- `AGENTMAIL_INBOX` (optional default inbox)
- `AGENTMAIL_ALLOWED_SENDERS` (optional comma-separated sender allowlist)
</setup>
<public_repo_safety>
- Never commit `.env` files, runtime logs, or downloaded attachments.
- Keep `.gitignore` entries for `.env`, `inbox_ops.log`, `downloads/`, and `.venv/`.
- Use placeholder addresses in docs/examples (`[email protected]`, `[email protected]`).
</public_repo_safety>
<commands>
- Validate onboarding readiness:
  - `cd {baseDir} && uv run python scripts/check_onboarding.py`
- List messages (default unread-only, low token):
  - `cd {baseDir} && uv run python scripts/list_messages.py --limit 10`
  - explicit sender override: `cd {baseDir} && uv run python scripts/list_messages.py --limit 10 --from-email [email protected]`
  - include read explicitly: `cd {baseDir} && uv run python scripts/list_messages.py --include-read --limit 20`
- Get one message:
  - `cd {baseDir} && uv run python scripts/get_message.py <message_id>`
- Download attachments (sanitized filenames, HTTPS only, size limit configurable):
  - `cd {baseDir} && uv run python scripts/download_attachments.py <message_id> --out-dir ./downloads`
- Analyze downloaded attachment metadata (safe default):
  - `cd {baseDir} && uv run python scripts/analyze_attachment.py ./downloads/file.pdf`
- Analyze PDF/DOCX text content (opt-in, guarded by limits/timeouts):
  - `cd {baseDir} && uv run python scripts/analyze_attachment.py ./downloads/file.pdf --extract-text`
- Reply to filtered sender (default unread-only, marks replied emails as read):
  - uses `AGENTMAIL_ALLOWED_SENDERS` by default: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received. Working on it." --dry-run`
  - explicit sender override: `cd {baseDir} && uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --dry-run`
  - include read explicitly: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received." --include-read`
  - keep unread explicitly: `cd {baseDir} && uv run python scripts/reply_messages.py --text "Received." --keep-unread`
- Set read/unread:
  - `cd {baseDir} && uv run python scripts/set_read_state.py <message_id> read`
  - `cd {baseDir} && uv run python scripts/set_read_state.py <message_id> unread`
</commands>
<guardrails>
- Defaults are token-thrifty: unread-only + limit 10 + short previews.
- Use `--dry-run` first for bulk reply flows.
- Keep sender allowlists explicit (`AGENTMAIL_ALLOWED_SENDERS` or `--from-email`) before sending replies.
- Prefer dedicated labels for idempotency (`--dedupe-label`).
- Use JSON output from scripts for downstream automation.
- Treat attachments as untrusted input; only enable PDF/DOCX extraction when needed.
- Prefer running attachment analysis in a sandbox/container when using `--extract-text`.
</guardrails>
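The guardrail about using JSON output for downstream automation could look roughly like the sketch below. It assumes only what the README states about `list_messages.py`: that it prints a JSON object with `count` and `messages`. The helper names `parse_list_output` and `list_unread` are hypothetical, and the fields inside each message are not assumed.

```python
import json
import subprocess
from pathlib import Path


def parse_list_output(stdout: str) -> list[dict]:
    """Parse list_messages.py output and return its `messages` list.

    Assumes the top-level shape described in the README (`count`, `messages`).
    """
    payload = json.loads(stdout)
    messages = payload.get("messages")
    if not isinstance(messages, list):
        raise ValueError("expected a `messages` list in script output")
    return messages


def list_unread(base_dir: Path, limit: int = 10) -> list[dict]:
    """Run the list script from the skill folder and parse its JSON output."""
    proc = subprocess.run(
        ["uv", "run", "python", "scripts/list_messages.py", "--limit", str(limit)],
        cwd=base_dir,
        capture_output=True,
        text=True,
        check=True,  # raise if the script exits non-zero
    )
    return parse_list_output(proc.stdout)
```

Keeping the parsing step separate from the subprocess call makes it possible to test the JSON handling without network access or credentials.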
<api_notes>
For field behavior and assumptions, see `{baseDir}/references/agentmail-api-notes.md`.
</api_notes>
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# agentmail-to-inbox-ops
<p align="center">
<a href="https://www.agentmail.to/" target="_blank" rel="noopener noreferrer">
<img src="https://www.agentmail.to/favicon.ico" alt="Agentmail.to logo" width="72" height="72" />
</a>
</p>
Openclaw skill for inbox workflows on top of **[agentmail.to](https://www.agentmail.to/)**.
This skill gives you clean, script-driven operations to read, filter, reply, process attachments, and keep inbox state in sync.
> Scope note: this skill is designed for **existing mailboxes** (`AGENTMAIL_INBOX`) and does **not** currently include inbox creation/provisioning commands.
## 60-second quick start
```bash
# 1) Go to the skill folder
cd skills/agentmail-to-inbox-ops
# 2) Install dependencies
uv sync
# 3) Create your env file
cp .env.example .env
# then edit .env with your real values
# 4) Run onboarding validator
uv run python scripts/check_onboarding.py
# 5) Run a safe first command (unread-only by default)
uv run python scripts/list_messages.py --limit 5
```
Expected: validator prints READY (or READY WITH WARNINGS), then list command prints JSON with `count` and `messages`.
## OpenClaw onboarding (required)
For OpenClaw to discover and onboard this skill correctly:
1. **Skill folder name must match `SKILL.md` frontmatter `name`**
   - required name: `agentmail-to-inbox-ops`
2. Place the folder in one of OpenClaw's skill roots:
   - `<workspace>/skills` (highest priority)
   - `~/.openclaw/skills`
3. Keep `SKILL.md` frontmatter metadata block (OpenClaw uses it for env/bin preflight checks).
Example install from this repo:
```bash
mkdir -p ~/.openclaw/skills/agentmail-to-inbox-ops
rsync -a --delete ./ ~/.openclaw/skills/agentmail-to-inbox-ops/
```
If your local folder is not named `agentmail-to-inbox-ops` (for example `agentmail-skill`), copy/sync it using the required target name above.
## Environment
Required:
- `AGENTMAIL_API_KEY`
Optional:
- `AGENTMAIL_INBOX` (default inbox)
- `AGENTMAIL_ALLOWED_SENDERS` (comma-separated sender allowlist for read/reply filtering)
See `.env.example`.
## Command cheat sheet
```bash
# List unread messages (default unread-only, token-thrifty)
uv run python scripts/list_messages.py --limit 10
# Include read messages explicitly
uv run python scripts/list_messages.py --include-read --limit 20
# Get one full message
uv run python scripts/get_message.py <message_id>
# Download attachments from a message (HTTPS only, filename/path sanitized)
uv run python scripts/download_attachments.py <message_id> --out-dir ./downloads
# Analyze local attachment metadata (safe default; PDF/DOCX text extraction disabled)
uv run python scripts/analyze_attachment.py ./downloads/file.docx
# Opt in to PDF/DOCX text extraction with limits/timeouts
uv run python scripts/analyze_attachment.py ./downloads/file.docx --extract-text --max-bytes 10485760 --parse-timeout-seconds 8
# Reply to allowlisted sender(s) from .env (dry run first)
uv run python scripts/reply_messages.py --text "Received." --dry-run
# Reply with explicit sender override
uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --dry-run
# Real reply (default marks replied messages as read)
uv run python scripts/reply_messages.py --text "Received. Working on it."
# Keep unread instead
uv run python scripts/reply_messages.py --from-email [email protected] --text "Received." --keep-unread
# Set explicit read/unread
uv run python scripts/set_read_state.py <message_id> read
uv run python scripts/set_read_state.py <message_id> unread
# Monitor unified logs
tail -f inbox_ops.log
```
## Safe defaults
- `list_messages.py` defaults to **unread-only** and `--limit 10`.
- `reply_messages.py` defaults to **unread-only** and `--limit 10`.
- `reply_messages.py` marks replied emails as read by default.
- Use `--dry-run` for preview before sending replies.
- `download_attachments.py` sanitizes attachment filenames, enforces output-dir containment, and downloads over HTTPS only.
- `analyze_attachment.py` treats attachments as untrusted input and skips PDF/DOCX parsing unless `--extract-text` is provided.
- `analyze_attachment.py` enforces file size, extraction length, and parser timeout limits.
## Attachment security notes
- Treat all attachments as untrusted input.
- Prefer analyzing attachments in a sandboxed/containerized environment when enabling `--extract-text`.
- PDF/DOCX parsing uses external libraries and runs in a subprocess with timeout/resource limits, but sandboxing is still recommended.
- Analyzer output includes `sha256`, `parse_skipped_reason`, and `parse_error` fields to support safer automation decisions.
## Quick smoke test (no outgoing email)
```bash
uv run python scripts/list_messages.py --limit 3
uv run python scripts/reply_messages.py --text "test" --dry-run --limit 3
```
## Troubleshooting
- `Missing AGENTMAIL_API_KEY`:
  - set it in `.env` or export env var in shell.
- `count: 0` from list:
  - no unread emails, wrong inbox, or wrong key.
  - try `--include-read` to validate access.
- Dependency/import errors:
  - run `uv sync` again.
  - run `uv run python -m unittest discover -s tests -v` to verify local hardening checks.
## Public repo safety
Never commit:
- `.env`
- `inbox_ops.log`
- `downloads/`
- `.venv/`
These are already in `.gitignore`.
```
### _meta.json
```json
{
  "owner": "4ur3l",
  "slug": "agentmail-to-inbox-ops",
  "displayName": "Agentmail.to Inbox Ops",
  "latest": {
    "version": "0.1.2",
    "publishedAt": 1771816916704,
    "commit": "https://github.com/openclaw/skills/commit/9836876def02c27821c3210f9bdbbc831c9adf43"
  },
  "history": []
}
```
### references/agentmail-api-notes.md
```markdown
# AgentMail API notes used by this skill
- SDK object: `AgentMail(api_key=...)`
- List messages: `client.inboxes.messages.list(inbox_id=..., limit=..., labels=..., ascending=...)`
- Get message: `client.inboxes.messages.get(inbox_id=..., message_id=...)`
- Reply: `client.inboxes.messages.reply(inbox_id=..., message_id=..., text=..., reply_all=False)`
- Update labels (used for read/unread + dedupe): `client.inboxes.messages.update(add_labels=[...], remove_labels=[...])`
- Attachment metadata/download URL: `client.inboxes.messages.get_attachment(...)`
Observed labels in this inbox include `received` and `unread`.
The scripts model read/unread as label toggles:
- read => add `read`, remove `unread`
- unread => add `unread`, remove `read`
Default reply behavior in `reply_messages.py`:
- reply to matching unread emails
- add dedupe label (`AUTO_REPLIED` by default)
- mark as read (unless `--keep-unread` is used)
If your workspace uses different label semantics, adjust `scripts/set_read_state.py`.
```
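The label-toggle model described in these notes can be sketched as a small pure function. This is an illustration only: `scripts/set_read_state.py` is not shown here, so the helper name `label_changes` is hypothetical. The returned keys mirror the `add_labels` / `remove_labels` arguments listed for `client.inboxes.messages.update`.

```python
def label_changes(state: str) -> dict[str, list[str]]:
    """Map a target read state to the label edits described in the API notes."""
    if state == "read":
        return {"add_labels": ["read"], "remove_labels": ["unread"]}
    if state == "unread":
        return {"add_labels": ["unread"], "remove_labels": ["read"]}
    raise ValueError(f"unknown state: {state!r} (expected 'read' or 'unread')")
```

If a workspace uses different label names, this mapping is the only piece that would need to change.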
### scripts/analyze_attachment.py
```python
import argparse
import hashlib
import json
import subprocess
import sys
from pathlib import Path

TEXT_EXTENSIONS = {".txt", ".md", ".csv", ".json", ".log"}
PARSEABLE_BINARY_EXTENSIONS = {".pdf", ".docx"}
DEFAULT_MAX_BYTES = 10 * 1024 * 1024
DEFAULT_MAX_CHARS = 20000
DEFAULT_MAX_PDF_PAGES = 25
DEFAULT_MAX_DOCX_PARAGRAPHS = 500
DEFAULT_PARSE_TIMEOUT_SECONDS = 8.0


def summarize(text: str, max_chars: int = 1200) -> str:
    clean = " ".join(text.split())
    return clean[:max_chars]


def sha256_file(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def _append_limited_text(parts: list[str], chunk: str, *, max_chars: int) -> bool:
    current = sum(len(p) for p in parts)
    remaining = max_chars - current
    if remaining <= 0:
        return True
    if len(chunk) > remaining:
        parts.append(chunk[:remaining])
        return True
    parts.append(chunk)
    return False


def read_text_file(path: Path, *, max_chars: int) -> tuple[str, bool]:
    text = path.read_text(errors="ignore")
    if len(text) <= max_chars:
        return text, False
    return text[:max_chars], True
def read_pdf_text(path: Path, *, max_pages: int, max_chars: int) -> tuple[str, bool]:
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    parts: list[str] = []
    truncated = False
    for index, page in enumerate(reader.pages):
        if index >= max_pages:
            truncated = True
            break
        extracted = page.extract_text() or ""
        if _append_limited_text(parts, extracted + "\n", max_chars=max_chars):
            truncated = True
            break
    return "".join(parts), truncated


def read_docx_text(path: Path, *, max_paragraphs: int, max_chars: int) -> tuple[str, bool]:
    from docx import Document

    doc = Document(str(path))
    parts: list[str] = []
    truncated = False
    for index, para in enumerate(doc.paragraphs):
        if index >= max_paragraphs:
            truncated = True
            break
        if _append_limited_text(parts, para.text + "\n", max_chars=max_chars):
            truncated = True
            break
    return "".join(parts), truncated


def read_text_limited(
    path: Path,
    *,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
) -> tuple[str, str, bool]:
    ext = path.suffix.lower()
    if ext in TEXT_EXTENSIONS:
        text, truncated = read_text_file(path, max_chars=max_chars)
        return text, "plain", truncated
    if ext == ".pdf":
        text, truncated = read_pdf_text(path, max_pages=max_pages, max_chars=max_chars)
        return text, "pdf", truncated
    if ext == ".docx":
        text, truncated = read_docx_text(
            path, max_paragraphs=max_paragraphs, max_chars=max_chars
        )
        return text, "docx", truncated
    return "", "binary", False
def _apply_worker_limits(timeout_seconds: float) -> None:
    # Best-effort resource limits; `resource` is unavailable on some platforms.
    try:
        import resource
    except Exception:
        return
    try:
        cpu_limit = max(1, int(timeout_seconds) + 1)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_limit, cpu_limit))
    except Exception:
        pass
    try:
        max_file = 64 * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_FSIZE, (max_file, max_file))
    except Exception:
        pass
    try:
        max_mem = 512 * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (max_mem, max_mem))
    except Exception:
        pass


def worker_main(argv: list[str]) -> int:
    p = argparse.ArgumentParser(add_help=False)
    p.add_argument("--_worker-read-text")
    p.add_argument("--_worker-max-chars", type=int, required=True)
    p.add_argument("--_worker-max-pages", type=int, required=True)
    p.add_argument("--_worker-max-paragraphs", type=int, required=True)
    p.add_argument("--_worker-timeout-seconds", type=float, required=True)
    args = p.parse_args(argv)
    path = Path(args._worker_read_text)
    _apply_worker_limits(args._worker_timeout_seconds)
    try:
        text, mode, truncated = read_text_limited(
            path,
            max_chars=args._worker_max_chars,
            max_pages=args._worker_max_pages,
            max_paragraphs=args._worker_max_paragraphs,
        )
        payload = {
            "ok": True,
            "text": text,
            "mode": mode,
            "truncated": truncated,
        }
    except Exception as exc:
        payload = {
            "ok": False,
            "error": {
                "type": type(exc).__name__,
                "message": str(exc),
            },
        }
    sys.stdout.write(json.dumps(payload))
    return 0
def run_parse_worker(
    path: Path,
    *,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
    timeout_seconds: float,
) -> dict:
    # Re-invoke this script as a subprocess so parsing runs under a timeout.
    cmd = [
        sys.executable,
        str(Path(__file__).resolve()),
        "--_worker-read-text",
        str(path),
        "--_worker-max-chars",
        str(max_chars),
        "--_worker-max-pages",
        str(max_pages),
        "--_worker-max-paragraphs",
        str(max_paragraphs),
        "--_worker-timeout-seconds",
        str(timeout_seconds),
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return {
            "ok": False,
            "error": {
                "type": "TimeoutExpired",
                "message": f"Parsing exceeded timeout ({timeout_seconds}s)",
            },
        }
    if proc.returncode != 0:
        return {
            "ok": False,
            "error": {
                "type": "WorkerExit",
                "message": f"Parser worker exited with code {proc.returncode}",
                "stderr": proc.stderr.strip() or None,
            },
        }
    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        return {
            "ok": False,
            "error": {
                "type": "WorkerProtocolError",
                "message": f"Invalid worker JSON: {exc}",
                "stderr": proc.stderr.strip() or None,
            },
        }
def analyze_file(
    path: Path,
    *,
    extract_text: bool,
    max_bytes: int,
    max_chars: int,
    max_pages: int,
    max_paragraphs: int,
    parse_timeout_seconds: float,
) -> dict:
    size = path.stat().st_size
    ext = path.suffix.lower()
    payload = {
        "path": str(path.resolve()),
        "size": size,
        "extension": ext or None,
        "sha256": sha256_file(path),
        "mode": "binary",
        "chars_extracted": 0,
        "summary": None,
        "truncated": False,
        "parse_error": None,
        "parse_skipped_reason": None,
    }
    if size > max_bytes:
        payload["parse_skipped_reason"] = f"file size {size} exceeds max-bytes {max_bytes}"
        return payload
    if ext in TEXT_EXTENSIONS:
        text, mode, truncated = read_text_limited(
            path, max_chars=max_chars, max_pages=max_pages, max_paragraphs=max_paragraphs
        )
        payload["mode"] = mode
        payload["chars_extracted"] = len(text)
        payload["summary"] = summarize(text) if text else None
        payload["truncated"] = truncated
        return payload
    if ext in PARSEABLE_BINARY_EXTENSIONS:
        payload["mode"] = ext.lstrip(".")
        if not extract_text:
            payload["parse_skipped_reason"] = (
                f"text extraction for {ext} is disabled by default; rerun with --extract-text"
            )
            return payload
        worker_result = run_parse_worker(
            path,
            max_chars=max_chars,
            max_pages=max_pages,
            max_paragraphs=max_paragraphs,
            timeout_seconds=parse_timeout_seconds,
        )
        if not worker_result.get("ok"):
            payload["parse_error"] = worker_result.get("error")
            return payload
        text = worker_result.get("text") or ""
        payload["mode"] = worker_result.get("mode") or payload["mode"]
        payload["chars_extracted"] = len(text)
        payload["summary"] = summarize(text) if text else None
        payload["truncated"] = bool(worker_result.get("truncated"))
        return payload
    payload["parse_skipped_reason"] = f"unsupported extension: {ext or '(none)'}"
    return payload
def _build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="Analyze local attachment")
    p.add_argument("path")
    p.add_argument(
        "--extract-text",
        action="store_true",
        help="Enable text extraction for PDF/DOCX (disabled by default for safety)",
    )
    p.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES)
    p.add_argument("--max-chars", type=int, default=DEFAULT_MAX_CHARS)
    p.add_argument("--max-pdf-pages", type=int, default=DEFAULT_MAX_PDF_PAGES)
    p.add_argument("--max-docx-paragraphs", type=int, default=DEFAULT_MAX_DOCX_PARAGRAPHS)
    p.add_argument("--parse-timeout-seconds", type=float, default=DEFAULT_PARSE_TIMEOUT_SECONDS)
    return p


def main(argv: list[str] | None = None) -> int:
    argv = list(sys.argv[1:] if argv is None else argv)
    # Internal worker mode: dispatched before the public CLI parser runs.
    if "--_worker-read-text" in argv:
        return worker_main(argv)

    from common import emit, log_action

    p = _build_parser()
    args = p.parse_args(argv)
    path = Path(args.path)
    if args.max_bytes <= 0 or args.max_chars <= 0:
        raise SystemExit("--max-bytes and --max-chars must be > 0")
    if args.max_pdf_pages <= 0 or args.max_docx_paragraphs <= 0:
        raise SystemExit("--max-pdf-pages and --max-docx-paragraphs must be > 0")
    if args.parse_timeout_seconds <= 0:
        raise SystemExit("--parse-timeout-seconds must be > 0")
    log_action(
        "analyze_attachment.start",
        path=str(path),
        extract_text=args.extract_text,
        max_bytes=args.max_bytes,
        max_chars=args.max_chars,
    )
    if not path.exists():
        payload = {
            "path": str(path),
            "error": {"type": "FileNotFound", "message": f"File not found: {path}"},
        }
        emit(payload)
        log_action("analyze_attachment.error", path=str(path), error="file_not_found")
        return 1
    result = analyze_file(
        path,
        extract_text=args.extract_text,
        max_bytes=args.max_bytes,
        max_chars=args.max_chars,
        max_pages=args.max_pdf_pages,
        max_paragraphs=args.max_docx_paragraphs,
        parse_timeout_seconds=args.parse_timeout_seconds,
    )
    log_action(
        "analyze_attachment.done",
        path=str(path),
        mode=result["mode"],
        chars_extracted=result["chars_extracted"],
        parse_error=bool(result["parse_error"]),
        parse_skipped=bool(result["parse_skipped_reason"]),
    )
    emit(result)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
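The analyzer's output fields (`parse_error`, `parse_skipped_reason`, `truncated`, and the top-level `error` emitted for a missing file) lend themselves to a simple triage step in downstream automation. The sketch below is one possible policy, not part of the skill. The function name `triage` and the four status strings are assumptions chosen for illustration.

```python
def triage(result: dict) -> str:
    """Classify an analyze_attachment.py JSON payload for automation decisions."""
    # A missing file yields a top-level `error`; a parser failure sets `parse_error`.
    if result.get("error") or result.get("parse_error"):
        return "failed"
    # Oversized, unsupported, or opt-in-only files carry a skip reason.
    if result.get("parse_skipped_reason"):
        return "skipped"
    # Text was extracted but cut off at a configured limit.
    if result.get("truncated"):
        return "partial"
    return "ok"
```

A caller might, for example, refuse to act on `"failed"` or `"skipped"` payloads and only summarize `"ok"` ones.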
### scripts/check_onboarding.py
```python
#!/usr/bin/env python3
"""Validate OpenClaw onboarding readiness for this skill."""
from __future__ import annotations

from pathlib import Path
import re
import sys

ROOT = Path(__file__).resolve().parents[1]
SKILL_FILE = ROOT / "SKILL.md"
ENV_EXAMPLE = ROOT / ".env.example"


class Check:
    def __init__(self) -> None:
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.passes: list[str] = []

    def ok(self, message: str) -> None:
        self.passes.append(message)

    def err(self, message: str) -> None:
        self.errors.append(message)

    def warn(self, message: str) -> None:
        self.warnings.append(message)


def parse_frontmatter(text: str) -> dict[str, str]:
    # Minimal parser: key: value pairs from YAML frontmatter block.
    m = re.match(r"^---\n(.*?)\n---\n", text, flags=re.DOTALL)
    if not m:
        return {}
    block = m.group(1)
    data: dict[str, str] = {}
    for line in block.splitlines():
        if ":" not in line:
            continue
        key, value = line.split(":", 1)
        data[key.strip()] = value.strip().strip('"').strip("'")
    return data
def main() -> int:
    c = Check()
    if not SKILL_FILE.exists():
        c.err("Missing SKILL.md")
        return finish(c)
    skill_text = SKILL_FILE.read_text(encoding="utf-8")
    fm = parse_frontmatter(skill_text)

    # Required frontmatter fields
    skill_name = fm.get("name", "")
    if skill_name:
        c.ok(f"frontmatter name found: {skill_name}")
    else:
        c.err("SKILL.md frontmatter missing required `name`")
    if fm.get("description"):
        c.ok("frontmatter description found")
    else:
        c.err("SKILL.md frontmatter missing required `description`")

    # OpenClaw metadata checks (best-practice for onboarding/preflight)
    metadata_raw = fm.get("metadata", "")
    if metadata_raw:
        c.ok("frontmatter metadata found")
        if "openclaw" in metadata_raw:
            c.ok("metadata contains openclaw block")
        else:
            c.warn("metadata present but does not include `openclaw` block")
        if "requires" in metadata_raw:
            c.ok("metadata includes requires preflight")
        else:
            c.warn("metadata missing `requires` (bins/env preflight)")
        if "primaryEnv" in metadata_raw:
            c.ok("metadata includes primaryEnv")
        else:
            c.warn("metadata missing `primaryEnv`")
    else:
        c.warn("SKILL.md frontmatter missing `metadata` (recommended for onboarding checks)")

    # Folder name consistency
    folder_name = ROOT.name
    if skill_name:
        if folder_name == skill_name:
            c.ok("folder name matches SKILL name")
        else:
            c.warn(
                f"folder name mismatch: current `{folder_name}` vs required `{skill_name}`; "
                "install/sync into a folder named exactly as SKILL name"
            )

    # Required skill resources used by this project
    required_files = [
        ROOT / "scripts" / "list_messages.py",
        ROOT / "scripts" / "get_message.py",
        ROOT / "scripts" / "download_attachments.py",
        ROOT / "scripts" / "analyze_attachment.py",
        ROOT / "scripts" / "reply_messages.py",
        ROOT / "scripts" / "set_read_state.py",
        ROOT / "references" / "agentmail-api-notes.md",
    ]
    for path in required_files:
        if path.exists():
            c.ok(f"found: {path.relative_to(ROOT)}")
        else:
            c.err(f"missing required file: {path.relative_to(ROOT)}")

    if ENV_EXAMPLE.exists():
        env_text = ENV_EXAMPLE.read_text(encoding="utf-8")
        if "AGENTMAIL_API_KEY" in env_text:
            c.ok(".env.example includes AGENTMAIL_API_KEY")
        else:
            c.err(".env.example missing AGENTMAIL_API_KEY")
    else:
        c.warn(".env.example missing")
    return finish(c)
def finish(c: Check) -> int:
    for msg in c.passes:
        print(f"✅ {msg}")
    for msg in c.warnings:
        print(f"⚠️ {msg}")
    for msg in c.errors:
        print(f"❌ {msg}")
    print("\n---")
    print(f"pass={len(c.passes)} warn={len(c.warnings)} error={len(c.errors)}")
    if c.errors:
        print("NOT READY")
        return 1
    if c.warnings:
        print("READY WITH WARNINGS")
        return 0
    print("READY")
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
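Because `parse_frontmatter` splits each line on the first colon only, values that contain colons survive intact, but a nested YAML `metadata:` block is read as an empty string (which the validator would then report as missing metadata). The sketch below reproduces the same parsing logic in a standalone form to show both behaviors. The sample frontmatter strings are made up for illustration.

```python
import re


def parse_frontmatter(text: str) -> dict[str, str]:
    """Same line-based approach as check_onboarding.py: one `key: value` per line."""
    m = re.match(r"^---\n(.*?)\n---\n", text, flags=re.DOTALL)
    if not m:
        return {}
    data: dict[str, str] = {}
    for line in m.group(1).splitlines():
        if ":" not in line:
            continue
        key, value = line.split(":", 1)  # split on the first colon only
        data[key.strip()] = value.strip().strip('"').strip("'")
    return data


# A quoted description containing a colon keeps its full value.
flat = '---\nname: agentmail-to-inbox-ops\ndescription: "Manage inbox: list/read"\n---\nbody\n'

# A nested (multi-line) metadata block leaves `metadata` empty.
nested = "---\nname: x\nmetadata:\n  openclaw:\n    primaryEnv: AGENTMAIL_API_KEY\n---\n"

flat_fm = parse_frontmatter(flat)
nested_fm = parse_frontmatter(nested)
```

Under this parser, keeping `metadata` on a single line (for example as inline JSON) is what lets the `openclaw`, `requires`, and `primaryEnv` substring checks see its content.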
### scripts/common.py
```python
import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from agentmail import AgentMail
from dotenv import load_dotenv


def build_parser(desc: str) -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description=desc)
    p.add_argument("--env-file", default=None, help="Optional .env path")
    p.add_argument("--inbox", default=None, help="Inbox id/email")
    p.add_argument("--json", action="store_true", help="Force JSON output")
    return p


def load_env(env_file: str | None) -> None:
    # override=False: variables already set in the environment take precedence.
    if env_file:
        load_dotenv(env_file, override=False)
        return
    cwd_env = Path.cwd() / ".env"
    if cwd_env.exists():
        load_dotenv(cwd_env, override=False)


def get_client_and_inbox(args: argparse.Namespace) -> tuple[AgentMail, str]:
    load_env(args.env_file)
    api_key = os.getenv("AGENTMAIL_API_KEY")
    inbox = args.inbox or os.getenv("AGENTMAIL_INBOX")
    if not api_key:
        raise SystemExit("Missing AGENTMAIL_API_KEY")
    if not inbox:
        raise SystemExit("Missing inbox (use --inbox or AGENTMAIL_INBOX)")
    return AgentMail(api_key=api_key), inbox


def emit(payload: Any) -> None:
    print(json.dumps(payload, indent=2, ensure_ascii=False, default=str))


def get_allowed_senders() -> list[str]:
    raw = os.getenv("AGENTMAIL_ALLOWED_SENDERS", "").strip()
    if not raw:
        return []
    return [x.strip().lower() for x in raw.split(",") if x.strip()]


def sender_matches(sender: str, allowed_senders: list[str]) -> bool:
    # Substring match: an allowlist entry matches anywhere in the sender string.
    s = sender.lower()
    return any(a in s for a in allowed_senders)


def log_action(action: str, **fields: Any) -> None:
    log_path = Path(__file__).resolve().parents[1] / "inbox_ops.log"
    ts = datetime.now(timezone.utc).isoformat()
    line = {"ts": ts, "action": action, **fields}
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(line, ensure_ascii=False, default=str) + "\n")
```
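`sender_matches` uses substring containment, so an allowlist entry also matches any sender string that merely contains it. Where that is too permissive, a stricter variant could compare the parsed address exactly. The sketch below is a hypothetical alternative, not the skill's behavior: the name `sender_matches_exact` is made up, the addresses are placeholders, and it assumes the sender arrives as either a bare address or a `Name <address>` string.

```python
from email.utils import parseaddr


def sender_matches_exact(sender: str, allowed_senders: list[str]) -> bool:
    """Return True only if the sender's parsed address equals an allowlist entry."""
    _, address = parseaddr(sender)  # handles "Name <addr>" and bare "addr"
    return address.lower() in {a.lower() for a in allowed_senders}


allowed = ["ops@example.com"]

# Substring containment would accept this look-alike domain:
lookalike = "Mallory <ops@example.com.evil.test>"
substring_result = any(a in lookalike.lower() for a in allowed)

# Exact comparison on the parsed address rejects it.
exact_result = sender_matches_exact(lookalike, allowed)
```

The trade-off is that exact matching no longer supports domain-wide entries such as `@example.com`; supporting those would need an explicit suffix rule.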
### scripts/download_attachments.py
```python
import os
import re
import tempfile
from pathlib import Path
from urllib.parse import urlparse

import httpx

DEFAULT_MAX_ATTACHMENT_BYTES = 25 * 1024 * 1024
DEFAULT_DOWNLOAD_TIMEOUT_SECONDS = 60.0
_FILENAME_SAFE_CHARS = re.compile(r"[^A-Za-z0-9._ -]+")


def sanitize_attachment_filename(filename: str | None, attachment_id: str) -> str:
    raw = (filename or "").strip().replace("\\", "/")
    basename = Path(raw).name.strip().strip(". ")
    if not basename or basename in {".", ".."}:
        return f"{attachment_id}.bin"
    safe = _FILENAME_SAFE_CHARS.sub("_", basename)
    safe = re.sub(r"\s+", " ", safe).strip().strip(". ")
    if not safe or safe in {".", ".."}:
        return f"{attachment_id}.bin"
    suffix = Path(safe).suffix
    stem = Path(safe).stem or attachment_id
    max_len = 120
    if len(safe) > max_len:
        reserve = len(suffix)
        if reserve >= max_len - 1:
            suffix = ""
            reserve = 0
        stem = stem[: max_len - reserve].rstrip(" ._") or attachment_id
        safe = f"{stem}{suffix}"
    return safe


def secure_output_path(out_dir: Path, stored_name: str) -> Path:
    out_dir_resolved = out_dir.resolve()
    candidate = (out_dir_resolved / stored_name).resolve()
    if not candidate.is_relative_to(out_dir_resolved):
        raise ValueError(f"Refusing to write outside output directory: {stored_name}")
    return candidate


def dedupe_path(target: Path) -> Path:
    if not target.exists():
        return target
    stem, suffix = target.stem, target.suffix
    for i in range(1, 10_000):
        candidate = target.with_name(f"{stem}-{i}{suffix}")
        if not candidate.exists():
            return candidate
    raise RuntimeError(f"Unable to choose unique filename for {target.name}")


def validate_download_url(url: str) -> None:
    parsed = urlparse(url)
    if parsed.scheme.lower() != "https":
        raise ValueError("Attachment download URL must use HTTPS")
    if not parsed.netloc:
        raise ValueError("Attachment download URL is missing a host")


def _parse_content_length(response: httpx.Response) -> int | None:
    raw = response.headers.get("content-length")
    if not raw:
        return None
    try:
        value = int(raw)
    except ValueError:
        return None
    return value if value >= 0 else None
def stream_download_to_file(
    url: str,
    target: Path,
    *,
    max_bytes: int,
    timeout_seconds: float,
) -> int:
    validate_download_url(url)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_name = None
    bytes_written = 0
    try:
        # Stream into a temp file next to the target, then rename on success.
        with tempfile.NamedTemporaryFile(
            mode="wb", delete=False, dir=target.parent, prefix=f".{target.name}.", suffix=".part"
        ) as tmp_file:
            tmp_name = tmp_file.name
            try:
                os.chmod(tmp_name, 0o600)
            except OSError:
                pass
            with httpx.stream("GET", url, timeout=timeout_seconds, follow_redirects=True) as response:
                response.raise_for_status()
                if response.url.scheme.lower() != "https":
                    raise ValueError("Attachment download redirected to a non-HTTPS URL")
                content_length = _parse_content_length(response)
                if content_length is not None and content_length > max_bytes:
                    raise ValueError(
                        f"Attachment size {content_length} exceeds configured limit {max_bytes}"
                    )
                for chunk in response.iter_bytes():
                    if not chunk:
                        continue
                    bytes_written += len(chunk)
                    if bytes_written > max_bytes:
                        raise ValueError(
                            f"Attachment size exceeds configured limit {max_bytes}"
                        )
                    tmp_file.write(chunk)
            tmp_file.flush()
        if content_length is not None and bytes_written != content_length:
            raise ValueError(
                f"Downloaded byte count mismatch (expected {content_length}, got {bytes_written})"
            )
        Path(tmp_name).replace(target)
        return bytes_written
    except Exception:
        # Remove the partial download before re-raising.
        if tmp_name:
            try:
                Path(tmp_name).unlink(missing_ok=True)
            except OSError:
                pass
        raise
def main() -> None:
    from common import build_parser, emit, get_client_and_inbox, log_action

    p = build_parser("Download message attachments")
    p.add_argument("message_id")
    p.add_argument("--out-dir", default="./downloads")
    p.add_argument("--attachment-id", default=None)
    p.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_ATTACHMENT_BYTES)
    p.add_argument("--timeout-seconds", type=float, default=DEFAULT_DOWNLOAD_TIMEOUT_SECONDS)
    args = p.parse_args()
    if args.max_bytes <= 0:
        raise SystemExit("--max-bytes must be > 0")
    if args.timeout_seconds <= 0:
        raise SystemExit("--timeout-seconds must be > 0")
    client, inbox = get_client_and_inbox(args)
    log_action(
        "download_attachments.start",
        inbox=inbox,
        message_id=args.message_id,
        out_dir=args.out_dir,
        attachment_id=args.attachment_id,
        max_bytes=args.max_bytes,
    )
    msg = client.inboxes.messages.get(inbox_id=inbox, message_id=args.message_id)
    attachments = msg.attachments or []
    if args.attachment_id:
        attachments = [a for a in attachments if a.attachment_id == args.attachment_id]
    out_dir = Path(args.out_dir).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    downloaded = []
    failed = []
    for a in attachments:
        original_filename = a.filename or f"{a.attachment_id}.bin"
        try:
            meta = client.inboxes.messages.get_attachment(
                inbox_id=inbox, message_id=args.message_id, attachment_id=a.attachment_id
            )
            stored_name = sanitize_attachment_filename(original_filename, a.attachment_id)
            target = dedupe_path(secure_output_path(out_dir, stored_name))
            byte_count = stream_download_to_file(
                meta.download_url,
                target,
                max_bytes=args.max_bytes,
                timeout_seconds=args.timeout_seconds,
            )
            downloaded.append(
                {
                    "attachment_id": a.attachment_id,
                    "filename": original_filename,
                    "stored_filename": target.name,
                    "path": str(target.resolve()),
                    "bytes": byte_count,
                    "content_type": a.content_type,
                }
            )
        except Exception as exc:
            # Record the failure and continue with the remaining attachments.
            failed.append(
                {
                    "attachment_id": a.attachment_id,
                    "filename": original_filename,
                    "error": str(exc),
                }
            )
            log_action(
                "download_attachments.error",
                inbox=inbox,
                message_id=args.message_id,
                attachment_id=a.attachment_id,
                error=str(exc),
            )
    log_action(
        "download_attachments.done",
        inbox=inbox,
        message_id=args.message_id,
        downloaded_count=len(downloaded),
        failed_count=len(failed),
    )
    emit({"inbox": inbox, "message_id": args.message_id, "downloaded": downloaded, "failed": failed})


if __name__ == "__main__":
    main()
```
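The tail of `stream_download_to_file` above follows a write-to-temp-then-rename pattern: check the byte count, move the temp file into place with `Path.replace`, and remove the temp file on any failure. The following is a minimal standalone sketch of that pattern, assuming chunks come from an in-memory iterable rather than an HTTP response. `atomic_write` and its parameters are hypothetical names used for illustration, not part of the skill's scripts.

```python
import tempfile
from pathlib import Path
from typing import Iterable, Optional


def atomic_write(
    target: Path, chunks: Iterable[bytes], expected_length: Optional[int] = None
) -> int:
    """Write chunks to a temp file next to target, then rename it into place."""
    tmp_name = None
    try:
        # Create the temp file in the target directory so the final rename
        # stays on a single filesystem.
        with tempfile.NamedTemporaryFile(dir=target.parent, delete=False) as tmp:
            tmp_name = tmp.name
            bytes_written = 0
            for chunk in chunks:
                tmp.write(chunk)
                bytes_written += len(chunk)
        if expected_length is not None and bytes_written != expected_length:
            raise ValueError(
                f"Byte count mismatch (expected {expected_length}, got {bytes_written})"
            )
        Path(tmp_name).replace(target)
        return bytes_written
    except Exception:
        # Never leave a partial file behind, then re-raise for the caller.
        if tmp_name:
            try:
                Path(tmp_name).unlink(missing_ok=True)
            except OSError:
                pass
        raise
```

Because the rename happens only after the size check passes, a reader of the output directory sees either the complete file or nothing.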
### scripts/get_message.py
```python
from common import build_parser, emit, get_client_and_inbox, log_action


def main() -> None:
    p = build_parser("Get full message")
    p.add_argument("message_id")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    log_action("get_message.start", inbox=inbox, message_id=args.message_id)

    m = client.inboxes.messages.get(inbox_id=inbox, message_id=args.message_id)
    log_action(
        "get_message.done",
        inbox=inbox,
        message_id=m.message_id,
        attachment_count=len(m.attachments or []),
    )
    emit(
        {
            "inbox": inbox,
            "message_id": m.message_id,
            "thread_id": m.thread_id,
            "from": str(m.from_),
            "to": [str(x) for x in (m.to or [])],
            "subject": m.subject,
            "labels": m.labels,
            "timestamp": m.timestamp,
            "text": m.text,
            "preview": m.preview,
            "attachments": [
                {
                    "attachment_id": a.attachment_id,
                    "filename": a.filename,
                    "size": a.size,
                    "content_type": a.content_type,
                }
                for a in (m.attachments or [])
            ],
        }
    )


if __name__ == "__main__":
    main()
```
### scripts/list_messages.py
```python
from common import (
    build_parser,
    emit,
    get_allowed_senders,
    get_client_and_inbox,
    log_action,
    sender_matches,
)


def main() -> None:
    p = build_parser("List inbox messages")
    p.add_argument("--limit", type=int, default=10)
    p.add_argument("--from-email", default=None, help="Explicit sender filter")
    p.add_argument("--label", action="append", default=[])
    p.add_argument("--ascending", action="store_true")
    p.add_argument("--include-read", action="store_true", help="Include read emails (default is unread only)")
    p.add_argument("--preview-chars", type=int, default=160)
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    allowed_senders = [args.from_email.lower()] if args.from_email else get_allowed_senders()

    labels = list(args.label or [])
    if not args.include_read and "unread" not in labels:
        labels.append("unread")

    log_action(
        "list_messages.start",
        inbox=inbox,
        limit=args.limit,
        allowed_senders=allowed_senders,
        labels=labels,
        ascending=args.ascending,
        include_read=args.include_read,
    )

    resp = client.inboxes.messages.list(
        inbox_id=inbox,
        limit=args.limit,
        labels=labels or None,
        ascending=True if args.ascending else None,
    )

    out = []
    for m in (resp.messages or []):
        sender = str(getattr(m, "from_", ""))
        if allowed_senders and not sender_matches(sender, allowed_senders):
            continue
        preview = getattr(m, "preview", None)
        if preview and args.preview_chars > 0:
            preview = preview.strip().replace("\n", " ")[: args.preview_chars]
        out.append(
            {
                "message_id": m.message_id,
                "subject": getattr(m, "subject", None),
                "from": sender,
                "timestamp": getattr(m, "timestamp", None),
                "labels": getattr(m, "labels", []),
                "preview": preview,
            }
        )

    log_action("list_messages.done", inbox=inbox, count=len(out))
    emit({"inbox": inbox, "count": len(out), "messages": out})


if __name__ == "__main__":
    main()
```
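Two small rules in `list_messages.py` are easy to miss when reading the script: the `unread` label is added unless `--include-read` is passed, and previews are flattened to one line before truncation. The sketch below restates both as pure functions so they can be checked in isolation. `effective_labels` and `shorten_preview` are hypothetical helper names that do not exist in the skill.

```python
from typing import List, Optional


def effective_labels(labels: List[str], include_read: bool) -> List[str]:
    """Return the label filter sent to the API, defaulting to unread only."""
    result = list(labels)
    if not include_read and "unread" not in result:
        result.append("unread")
    return result


def shorten_preview(preview: Optional[str], preview_chars: int) -> Optional[str]:
    """Collapse newlines and cut the preview to at most preview_chars."""
    if preview and preview_chars > 0:
        return preview.strip().replace("\n", " ")[:preview_chars]
    # Empty or missing previews, and non-positive limits, pass through unchanged.
    return preview
```

Note that the script applies the sender filter client-side, after the API has already applied `--limit`, so the reported `count` can be smaller than the requested limit.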
### scripts/reply_messages.py
```python
from common import (
    build_parser,
    emit,
    get_allowed_senders,
    get_client_and_inbox,
    log_action,
    sender_matches,
)


def main() -> None:
    p = build_parser("Reply to filtered messages")
    p.add_argument("--from-email", default=None, help="Override allowed sender filter")
    p.add_argument("--text", required=True)
    p.add_argument("--limit", type=int, default=10)
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--dedupe-label", default="AUTO_REPLIED")
    p.add_argument("--include-read", action="store_true", help="Also scan read emails")
    p.add_argument("--keep-unread", action="store_true", help="Do not mark replied emails as read")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    allowed_senders = [args.from_email.lower()] if args.from_email else get_allowed_senders()
    # Replying without an allowlist is refused outright.
    if not allowed_senders:
        raise SystemExit("No sender allowlist set. Use --from-email or AGENTMAIL_ALLOWED_SENDERS")

    labels = None if args.include_read else ["unread"]
    log_action(
        "reply_messages.start",
        inbox=inbox,
        allowed_senders=allowed_senders,
        limit=args.limit,
        dry_run=args.dry_run,
        dedupe_label=args.dedupe_label,
        include_read=args.include_read,
        keep_unread=args.keep_unread,
    )

    resp = client.inboxes.messages.list(inbox_id=inbox, limit=args.limit, labels=labels)

    replied = []
    for m in (resp.messages or []):
        sender = str(getattr(m, "from_", ""))
        # Separate name so the list filter above is not shadowed.
        message_labels = set(getattr(m, "labels", []) or [])
        if not sender_matches(sender, allowed_senders):
            continue
        # Skip messages that already carry the dedupe label.
        if args.dedupe_label in message_labels:
            continue

        add_labels = [args.dedupe_label]
        remove_labels = []
        if not args.keep_unread:
            add_labels.append("read")
            remove_labels.append("unread")

        if not args.dry_run:
            client.inboxes.messages.reply(
                inbox_id=inbox,
                message_id=m.message_id,
                text=args.text,
                reply_all=False,
            )
            client.inboxes.messages.update(
                inbox_id=inbox,
                message_id=m.message_id,
                add_labels=add_labels,
                remove_labels=remove_labels,
            )

        replied.append({
            "message_id": m.message_id,
            "from": sender,
            "dry_run": args.dry_run,
            "mark_read": not args.keep_unread,
        })

    log_action(
        "reply_messages.done",
        inbox=inbox,
        replied_count=len(replied),
        dry_run=args.dry_run,
        mark_read=not args.keep_unread,
    )
    emit({"inbox": inbox, "replied_count": len(replied), "items": replied})


if __name__ == "__main__":
    main()
```
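The dedupe step in `reply_messages.py` is what keeps a rerun from answering the same message twice: a message is skipped once it carries the dedupe label, and each reply adds that label. A minimal sketch of that decision, assuming labels arrive as plain strings; `needs_reply` and `label_changes` are hypothetical names, and the sender check is left out because `sender_matches` lives in `common.py`.

```python
from typing import Iterable, List, Optional, Tuple


def needs_reply(message_labels: Optional[Iterable[str]], dedupe_label: str = "AUTO_REPLIED") -> bool:
    """A message still needs a reply if it lacks the dedupe label."""
    return dedupe_label not in set(message_labels or [])


def label_changes(keep_unread: bool, dedupe_label: str = "AUTO_REPLIED") -> Tuple[List[str], List[str]]:
    """Return (add_labels, remove_labels) to apply after a reply is sent."""
    add_labels = [dedupe_label]
    remove_labels: List[str] = []
    if not keep_unread:
        # Default behaviour: a replied message is also marked as read.
        add_labels.append("read")
        remove_labels.append("unread")
    return add_labels, remove_labels
```

In the script the reply is sent before the labels are updated, so if the update call fails the message keeps no dedupe label and a later run may reply again. Running with `--dry-run` first shows which messages would be selected.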
### scripts/set_read_state.py
```python
from common import build_parser, emit, get_client_and_inbox, log_action


def main() -> None:
    p = build_parser("Set message read/unread state")
    p.add_argument("message_id")
    p.add_argument("state", choices=["read", "unread"])
    p.add_argument("--dry-run", action="store_true")
    args = p.parse_args()

    client, inbox = get_client_and_inbox(args)
    log_action(
        "set_read_state.start",
        inbox=inbox,
        message_id=args.message_id,
        state=args.state,
        dry_run=args.dry_run,
    )

    add_labels = ["read"] if args.state == "read" else ["unread"]
    remove_labels = ["unread"] if args.state == "read" else ["read"]

    if not args.dry_run:
        updated = client.inboxes.messages.update(
            inbox_id=inbox,
            message_id=args.message_id,
            add_labels=add_labels,
            remove_labels=remove_labels,
        )
        labels = updated.labels
    else:
        labels = None

    log_action(
        "set_read_state.done",
        inbox=inbox,
        message_id=args.message_id,
        state=args.state,
        dry_run=args.dry_run,
    )
    emit(
        {
            "inbox": inbox,
            "message_id": args.message_id,
            "state": args.state,
            "dry_run": args.dry_run,
            "labels_after": labels,
        }
    )


if __name__ == "__main__":
    main()
```
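`set_read_state.py` treats read state as a pair of mutually exclusive labels: setting one state adds its label and removes the opposite one. The mapping can be sketched as a single function; `read_state_labels` is a hypothetical name used only for illustration.

```python
from typing import List, Tuple


def read_state_labels(state: str) -> Tuple[List[str], List[str]]:
    """Return (add_labels, remove_labels) for the requested read state."""
    if state not in ("read", "unread"):
        raise ValueError(f"state must be 'read' or 'unread', got {state!r}")
    opposite = "unread" if state == "read" else "read"
    return [state], [opposite]
```

With `--dry-run` the script skips the update call entirely, which is why `labels_after` is `null` in that case.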