claw-mail
Multi-account email management skill for IMAP/SMTP. Fetches, reads, searches, composes, sends, replies, forwards, and organizes emails across multiple accounts. Features IMAP Outbox for reliable delivery, secure credential storage via 1Password and macOS Keychain, TLS 1.2+ with hardened ciphers, OAuth2 authentication, IMAP IDLE push monitoring, connection pooling, S/MIME signing, calendar invitations, mail merge, conversation threading, webhook rule actions, and configurable dated-folder archival.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-claw-mail
Repository
Skill path: skills/borgcube/claw-mail
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack, Integration.
Target audience: everyone.
License: MIT.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install claw-mail into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding claw-mail to shared team environments
- Use claw-mail for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: claw-mail
description: >
  Multi-account email management skill for IMAP/SMTP. Fetches, reads, searches, composes, sends, replies, forwards, and organizes emails across multiple accounts. Features IMAP Outbox for reliable delivery, secure credential storage via 1Password and macOS Keychain, TLS 1.2+ with hardened ciphers, OAuth2 authentication, IMAP IDLE push monitoring, connection pooling, S/MIME signing, calendar invitations, mail merge, conversation threading, webhook rule actions, and configurable dated-folder archival.
license: MIT
metadata:
  author: openclaw
  version: "0.7.0"
compatibility: >
  Requires Python 3.11+ and PyYAML. Optional: 1Password CLI (op) for op:// credentials, macOS for keychain:// credentials, cryptography package for S/MIME.
allowed-tools: Bash(python3 *) Read Write
---

# clawMail Skill

You are an email management agent with multi-account IMAP/SMTP support. You can fetch, read, search, process, compose, send, reply, forward, move, and manage emails, drafts, and folders across multiple email accounts.

## Multi-Account Model

- **Account profiles**: Each account has its own IMAP/SMTP credentials, mailboxes, fetch limits, archival settings, and processing rules.
- **Default account**: One account is designated as the default. Any script invoked without `--account` uses the default automatically.
- **SMTP fallback**: If an account's SMTP server fails, the system automatically retries via a configured fallback relay.
- **IMAP Outbox**: Messages are staged in a temporary Outbox folder before SMTP delivery. If SMTP fails, the message stays in Outbox for retry by the heartbeat.
- **Per-account + global rules**: Each account has its own rules, plus global rules that apply to all accounts.
- **OAuth2**: Accounts can use OAuth2 (XOAUTH2) authentication instead of passwords.
- **Dated-folder archival**: `archive_mail.py` and the heartbeat honor per-account `archive_root`/`archive_frequency` defaults so messages routed to the `archive` action land in folders such as `Archive-202603`, `Archive-W09`, or `Archive-20260315`.

## Security

- **TLS 1.2+**: All IMAP and SMTP connections enforce TLS 1.2 or higher.
- **Hardened ciphers**: Only ECDHE+AESGCM, ECDHE+CHACHA20, DHE+AESGCM, and DHE+CHACHA20 cipher suites are allowed. Weak ciphers (MD5, RC4, 3DES, DSS) are explicitly blocked.
- **Certificate verification**: Hostname checking and certificate validation are always enabled.
- **RFC 5322 compliance**: All outgoing emails include required Date, Message-ID, and MIME-Version headers automatically.
- **Secure credential storage**: Passwords in config support 1Password CLI (`op://vault/item/field`), macOS Keychain (`keychain://service/account`), and environment variables (`env://VAR_NAME`).

## Available Scripts

All scripts are in the `scripts/` directory. Run with `python3 scripts/<name>.py` from the skill root. Every script accepts `--account <name>` to target a specific account.
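The TLS policy listed under Security (TLS 1.2 or higher, a restricted cipher list, certificate and hostname checks) can be approximated with Python's standard `ssl` module. The sketch below is an illustration only, not the code in `scripts/lib/imap_client.py` or `scripts/lib/smtp_client.py`; the helper name `make_tls_context` and the exact OpenSSL cipher string are assumptions.

```python
import ssl

# Assumed OpenSSL cipher string mirroring the policy described under Security.
# It only governs TLS 1.2 suites; TLS 1.3 suites are configured separately by OpenSSL.
HARDENED_CIPHERS = (
    "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20"
    ":!aNULL:!MD5:!RC4:!3DES:!DSS"
)


def make_tls_context() -> ssl.SSLContext:
    """Build a client-side context (hypothetical helper, not part of claw-mail)."""
    # create_default_context() enables certificate validation and hostname checking.
    ctx = ssl.create_default_context()
    # Refuse anything older than TLS 1.2.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # Restrict the negotiable TLS 1.2 cipher suites.
    ctx.set_ciphers(HARDENED_CIPHERS)
    return ctx
```

A context built this way could be passed to `imaplib.IMAP4_SSL(host, 993, ssl_context=ctx)` or to `smtplib.SMTP.starttls(context=ctx)`.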
### Core Scripts

| Script | Purpose |
|--------|---------|
| `scripts/fetch_mail.py` | Fetch emails from an IMAP folder |
| `scripts/read_mail.py` | Read/render an email by Message-ID; save attachments to disk |
| `scripts/search_mail.py` | Search emails by subject, sender, body, date, flags |
| `scripts/send_mail.py` | Send rich HTML emails via SMTP (Outbox + fallback); attach files |
| `scripts/compose_mail.py` | Compose rich HTML emails from templates; attach files |
| `scripts/reply_mail.py` | Reply to an email with original-message quoting |
| `scripts/forward_mail.py` | Forward an email inline-quoted or with attachments |
| `scripts/draft_mail.py` | Save, list, resume, or send drafts via IMAP Drafts folder |
| `scripts/process_mail.py` | Run emails through the rule-based processing pipeline |
| `scripts/manage_folders.py` | List, create, delete, rename, and move IMAP folders |
| `scripts/move_mail.py` | Move emails between IMAP folders (batch support) |
| `scripts/heartbeat.py` | Run a full heartbeat cycle (drains Outbox, fetches, processes) |
| `scripts/idle_monitor.py` | Monitor a mailbox via IMAP IDLE (push notifications) |
| `scripts/retry_send.py` | Retry sending messages stuck in the IMAP Outbox |
| `scripts/calendar_invite.py` | Compose and send iCalendar meeting invitations |
| `scripts/mail_merge.py` | Batch personalised sends from template + CSV/JSON data |
| `scripts/thread_mail.py` | Group messages into conversation threads |
| `scripts/archive_mail.py` | Auto-archive old messages into dated folders (daily/weekly/monthly/yearly) |

### Library Modules

| Module | Purpose |
|--------|---------|
| `scripts/lib/imap_client.py` | IMAP client with IDLE, search, folder management, TLS 1.2+ |
| `scripts/lib/smtp_client.py` | SMTP client with TLS 1.2+, RFC 5322, OAuth2, MIME building |
| `scripts/lib/composer.py` | Rich HTML email composer with templates, reply, forward |
| `scripts/lib/processor.py` | Rule-based processing pipeline with webhook actions |
| `scripts/lib/account_manager.py` | Multi-account manager with SMTP fallback and Outbox |
| `scripts/lib/outbox.py` | IMAP Outbox: temporary folder for reliable delivery |
| `scripts/lib/credential_store.py` | Secure credential storage (1Password, Keychain, env) |
| `scripts/lib/pool.py` | Connection pool for IMAP/SMTP reuse |
| `scripts/lib/send_queue.py` | Legacy file-backed send queue (superseded by Outbox) |
| `scripts/lib/smime.py` | S/MIME signing and encryption |
| `scripts/lib/oauth2.py` | OAuth2 (XOAUTH2) token management |
| `scripts/lib/models.py` | Data models (EmailMessage, EmailAddress, etc.) |

### Reference Documents

| Reference | When to read |
|-----------|-------------|
| `references/REFERENCE.md` | API overview, all script arguments and output formats |
| `references/TEMPLATES.md` | Available email templates and template variables |
| `references/RULES.md` | How to configure processing rules |
| `ROADMAP.md` | Feature roadmap and progress tracker |

## Quick Start

### Fetching Mail

```bash
python3 scripts/fetch_mail.py --config config.yaml
python3 scripts/fetch_mail.py --account personal --unread-only --format cli --config config.yaml
```

### Sending Rich Emails

Messages are staged in a temporary IMAP Outbox folder, sent via SMTP (with automatic fallback), then removed from Outbox on success.

```bash
python3 scripts/send_mail.py \
  --to "[email protected]" \
  --subject "Weekly Report" \
  --body "<p>Here are this week's results.</p>" \
  --template default \
  --attach report.pdf \
  --config config.yaml
```

### Replying and Forwarding

```bash
python3 scripts/reply_mail.py --message-id "<[email protected]>" --body "Thanks!" --config config.yaml
python3 scripts/forward_mail.py --message-id "<[email protected]>" --to "[email protected]" --config config.yaml
```

### Searching Emails

```bash
python3 scripts/search_mail.py --subject "invoice" --unseen --config config.yaml
python3 scripts/search_mail.py --criteria '(FROM "[email protected]" SINCE 01-Jan-2026)' --config config.yaml
```

### Working with Drafts

```bash
python3 scripts/draft_mail.py --action save --to "[email protected]" --subject "WIP" --body "..." --config config.yaml
python3 scripts/draft_mail.py --action list --format cli --config config.yaml
python3 scripts/draft_mail.py --action send --message-id "<[email protected]>" --config config.yaml
```

### Outbox & Send Retry

```bash
python3 scripts/retry_send.py --config config.yaml
python3 scripts/retry_send.py --config config.yaml --list
```

### Heartbeat Cycle

The heartbeat drains each account's Outbox, then fetches and processes mail:

```bash
python3 scripts/heartbeat.py --config config.yaml
python3 scripts/heartbeat.py --config config.yaml --account work
```

### Archiving Old Messages

```bash
python3 scripts/archive_mail.py --config config.yaml --days 90 --frequency monthly
python3 scripts/archive_mail.py --config config.yaml --days 30 --frequency daily --archive-root "Old Mail" --dry-run --format cli
```

Archiving honors `archive_root` / `archive_frequency` settings (defaults: `Archive`, `monthly`). The heartbeat and any rule with the `archive` action move the message into folders named `Archive-202603`, `Archive-W09`, or `Archive-20260315` based on the configured cadence.
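To make the dated-folder naming concrete, here is a minimal sketch that maps a date and a cadence to a folder name in the style shown above. It is not taken from `archive_mail.py`: the function name `archive_folder_name` is hypothetical, the use of ISO week numbers for the weekly form is an assumption, and the yearly form (`Archive-2026`) is a guess, since the document shows no yearly example.

```python
from datetime import date


def archive_folder_name(day: date, frequency: str = "monthly",
                        root: str = "Archive") -> str:
    """Return a dated archive folder name (illustrative helper, not claw-mail's)."""
    if frequency == "daily":
        suffix = day.strftime("%Y%m%d")            # e.g. 20260315
    elif frequency == "weekly":
        # Assumption: ISO 8601 week number, zero-padded to two digits.
        suffix = f"W{day.isocalendar()[1]:02d}"    # e.g. W09
    elif frequency == "monthly":
        suffix = day.strftime("%Y%m")              # e.g. 202603
    elif frequency == "yearly":
        # Assumption: the source does not show the yearly format.
        suffix = day.strftime("%Y")
    else:
        raise ValueError(f"Unknown archive frequency: {frequency!r}")
    return f"{root}-{suffix}"


print(archive_folder_name(date(2026, 3, 15)))            # Archive-202603
print(archive_folder_name(date(2026, 3, 15), "daily"))   # Archive-20260315
print(archive_folder_name(date(2026, 3, 1), "weekly"))   # Archive-W09
```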
### Calendar Invitations

```bash
python3 scripts/calendar_invite.py \
  --to "[email protected]" --subject "Standup" \
  --start "2026-03-01T09:00:00" --end "2026-03-01T09:30:00" \
  --location "Zoom" --config config.yaml
```

### Mail Merge

```bash
python3 scripts/mail_merge.py \
  --data contacts.csv --subject "Hello {{name}}" \
  --body "<p>Dear {{name}}, your code is {{code}}.</p>" \
  --to-field email --config config.yaml
```

## Configuration

Create a `config.yaml` from `assets/config.example.yaml`:

```yaml
default_account: work

accounts:
  work:
    label: "Work"
    sender_address: "[email protected]"
    sender_name: "Alice Smith"
    imap:
      host: imap.company.com
      port: 993
      username: "[email protected]"
      password: "op://Work/IMAP/password"  # 1Password CLI
      ssl: true
    smtp:
      host: smtp.company.com
      port: 587
      username: "[email protected]"
      password: "op://Work/SMTP/password"  # 1Password CLI
      tls: true
    mailboxes: [INBOX, Projects]
    fetch_limit: 50
    rules:
      - name: flag_urgent
        sender_pattern: "boss@company\\.com"
        actions: [flag, tag]
        tag: urgent

  personal:
    label: "Personal"
    sender_address: "[email protected]"
    imap:
      host: imap.gmail.com
      password: "keychain://imap.gmail.com/[email protected]"  # macOS Keychain
    smtp:
      host: smtp.gmail.com
      password: "keychain://smtp.gmail.com/[email protected]"  # macOS Keychain
```

You can also define `archive_root` (e.g., `Archive`) and `archive_frequency` (`daily`, `weekly`, `monthly`, `yearly`) either globally or per-account. These defaults drive both the `archive_mail.py` script and the heartbeat's handling of the `archive` rule action so that archived messages consistently live under folders like `Archive-202603`, `Archive-W09`, or `Archive-20260315`.
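One way to picture how global and per-account archive settings might combine is the lookup below. It is a sketch under assumptions rather than the skill's actual logic: it assumes global keys sit at the top level of the parsed config, that a per-account value takes precedence over a global one, and that the built-in defaults are `Archive` and `monthly`. The function name `resolve_archive_settings` is hypothetical.

```python
from typing import Any

# Built-in defaults as stated in this document.
DEFAULTS = {"archive_root": "Archive", "archive_frequency": "monthly"}
VALID_FREQUENCIES = {"daily", "weekly", "monthly", "yearly"}


def resolve_archive_settings(config: dict[str, Any], account: str) -> dict[str, str]:
    """Pick archive settings for one account (illustrative helper only).

    Assumed precedence: per-account value, then global value, then default.
    """
    account_cfg = config.get("accounts", {}).get(account, {})
    settings = {
        key: account_cfg.get(key) or config.get(key) or default
        for key, default in DEFAULTS.items()
    }
    if settings["archive_frequency"] not in VALID_FREQUENCIES:
        raise ValueError(f"Invalid archive_frequency: {settings['archive_frequency']!r}")
    return settings


# Example: a parsed config with a global cadence and one per-account root.
config = {
    "archive_frequency": "weekly",
    "accounts": {
        "work": {"archive_root": "Work-Archive"},
        "personal": {},
    },
}
print(resolve_archive_settings(config, "work"))
print(resolve_archive_settings(config, "personal"))
```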
### Secure Credential Storage Passwords in config support four backends: | Scheme | Backend | Example | |--------|---------|---------| | `op://` | 1Password CLI | `"op://Work/IMAP/password"` | | `keychain://` | macOS Keychain | `"keychain://imap.gmail.com/alice"` | | `env://` | Environment variable | `"env://GMAIL_APP_PASSWORD"` | | *(plain text)* | Literal value | `"my-password"` (logs a warning) | ### OAuth2 Authentication (Gmail, Outlook 365) For providers that require OAuth2, set `auth: oauth2` on the IMAP/SMTP block: ```yaml imap: host: imap.gmail.com username: "[email protected]" auth: oauth2 oauth2: client_id: "your-client-id" client_secret: "your-client-secret" refresh_token: "your-refresh-token" token_uri: "https://oauth2.googleapis.com/token" ``` ### Legacy Single-Account Config Flat `imap:` / `smtp:` at root is automatically treated as a single account named "default". --- ## Referenced Files > The following files are referenced in this skill and included for context. ### scripts/fetch_mail.py ```python #!/usr/bin/env python3 """Fetch emails from an IMAP server. Usage: python3 scripts/fetch_mail.py --config config.yaml python3 scripts/fetch_mail.py --config config.yaml --account work python3 scripts/fetch_mail.py --config config.yaml --format cli python3 scripts/fetch_mail.py --imap-host imap.example.com --imap-user me --imap-pass secret Output: JSON (default) or CLI table to stdout. 
""" from __future__ import annotations import argparse import json import os import sys sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.defaults import resolve_config_path from lib.imap_client import IMAPClient from lib.models import EmailMessage def main() -> None: parser = argparse.ArgumentParser(description="Fetch emails via IMAP") parser.add_argument("--account", default="", help="Account name (default: default account)") parser.add_argument("--folder", default="INBOX", help="Folder to fetch from") parser.add_argument("--limit", type=int, default=0, help="Max messages (0 = use config)") parser.add_argument("--unread-only", action="store_true", help="Only unread messages") parser.add_argument("--mark-read", action="store_true", help="Mark fetched messages as read") parser.add_argument("--format", choices=["json", "cli"], default="json", help="Output format (default: json)") # Direct IMAP options (bypass AccountManager) parser.add_argument("--imap-host", default="", help="IMAP server") parser.add_argument("--imap-port", type=int, default=993, help="IMAP port") parser.add_argument("--imap-user", default="", help="IMAP username") parser.add_argument("--imap-pass", default="", help="IMAP password") parser.add_argument("--imap-no-ssl", action="store_true", help="Disable SSL") # Config / stdin parser.add_argument("--config", default="", help="YAML config file") parser.add_argument("--from-stdin", action="store_true", help="Read JSON messages from stdin (skip IMAP)") args = parser.parse_args() args.config = resolve_config_path(args.config) # Read from stdin if piped in if args.from_stdin: data = json.load(sys.stdin) if isinstance(data, dict) and "messages" in data: msg_dicts = data["messages"] elif isinstance(data, list): msg_dicts = data else: _error("Expected JSON array or object with 'messages' key") return messages = [EmailMessage.from_dict(d) for d in msg_dicts] 
_output(messages, args.folder, args.format, args.account) return # Direct IMAP mode (no config file) if args.imap_host: client = IMAPClient( host=args.imap_host, port=args.imap_port, username=args.imap_user, password=credential_store.resolve(args.imap_pass), use_ssl=not args.imap_no_ssl, ) limit = args.limit or 50 try: client.connect() if args.unread_only: messages = client.fetch_unread( mailbox=args.folder, limit=limit, mark_seen=args.mark_read, ) else: messages = client.fetch_all(mailbox=args.folder, limit=limit) finally: client.disconnect() _output(messages, args.folder, args.format, "") return # Multi-account mode via config if not args.config: _error("Either --config or --imap-host is required.") mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account limit = args.limit or mgr.get_fetch_limit(acct_name) client = mgr.get_imap_client(acct_name) try: client.connect() if args.unread_only: messages = client.fetch_unread( mailbox=args.folder, limit=limit, mark_seen=args.mark_read, ) else: messages = client.fetch_all(mailbox=args.folder, limit=limit) finally: client.disconnect() _output(messages, args.folder, args.format, acct_name) def _output(messages: list[EmailMessage], folder: str, fmt: str, account: str) -> None: if fmt == "cli": label = f"{account}:{folder}" if account else folder if not messages: print(f" No messages in {label}") return print(f" {label} ({len(messages)} messages)") print(f" {'':->60}") for m in messages: sender = str(m.sender) if m.sender else "(unknown)" if len(sender) > 25: sender = sender[:22] + "..." 
date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else "" subj = m.subject[:40] if m.subject else "(no subject)" read = " " if m.is_read else "*" att = "+" if m.has_attachments else " " print(f" {read}{att} {date:16s} {sender:25s} {subj}") print() else: output = { "account": account, "folder": folder, "count": len(messages), "messages": [m.to_dict() for m in messages], } json.dump(output, sys.stdout, indent=2, default=str) print() def _load_manager(path: str) -> AccountManager: try: return AccountManager.from_yaml(path) except Exception as exc: _error(f"Failed to load config: {exc}") raise # unreachable, _error exits def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main() ``` ### scripts/send_mail.py ```python #!/usr/bin/env python3 """Send rich HTML emails via SMTP. Usage: python3 scripts/send_mail.py --config config.yaml [options] python3 scripts/send_mail.py --config config.yaml --account personal [options] python3 scripts/compose_mail.py ... | python3 scripts/send_mail.py --from-stdin --config config.yaml Output: JSON result to stdout. 
""" from __future__ import annotations import argparse import json import mimetypes import os import sys sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.defaults import resolve_config_path from lib.models import EmailAttachment, EmailMessage from lib.smtp_client import SMTPClient from lib.composer import EmailComposer def main() -> None: parser = argparse.ArgumentParser(description="Send rich HTML emails via SMTP") # Account parser.add_argument("--account", default="", help="Account name (default: default account)") # Message options parser.add_argument("--to", action="append", default=[], help="Recipient (repeatable)") parser.add_argument("--cc", action="append", default=[], help="CC recipient") parser.add_argument("--bcc", action="append", default=[], help="BCC recipient") parser.add_argument("--subject", default="", help="Email subject") parser.add_argument("--body", default="", help="Email body (HTML or plain)") parser.add_argument("--sender", default="", help="Sender address") # Template options parser.add_argument("--template", default="default", choices=["default", "minimal", "digest"], help="Email template") parser.add_argument("--greeting", default="", help="Greeting line") parser.add_argument("--sign-off", default="", help="Sign-off line") parser.add_argument("--header-text", default="", help="Header text") parser.add_argument("--header-color", default="#2d3748", help="Header bg color") parser.add_argument("--footer-text", default="", help="Footer text") parser.add_argument("--action-url", default="", help="CTA button URL") parser.add_argument("--action-text", default="View Details", help="CTA button text") parser.add_argument("--action-color", default="#4299e1", help="CTA button color") # Reply threading parser.add_argument("--reply-to", default="", help="In-Reply-To message ID") # Attachments parser.add_argument("--attach", action="append", default=[], help="File path 
to attach (repeatable)") # Direct SMTP options (bypass AccountManager) parser.add_argument("--smtp-host", default="", help="SMTP hostname") parser.add_argument("--smtp-port", type=int, default=587, help="SMTP port") parser.add_argument("--smtp-user", default="", help="SMTP username") parser.add_argument("--smtp-pass", default="", help="SMTP password") parser.add_argument("--smtp-no-tls", action="store_true", help="Disable SMTP TLS") # Pipe input parser.add_argument("--from-stdin", action="store_true", help="Read composed message JSON from stdin") # Config parser.add_argument("--config", default="", help="YAML config file") args = parser.parse_args() args.config = resolve_config_path(args.config) # Build or read message if args.from_stdin: try: data = json.load(sys.stdin) message = EmailMessage.from_dict(data) except Exception as exc: _error(f"Failed to parse stdin message: {exc}") return else: if not args.to: _error("At least one --to recipient is required") if not args.subject: _error("--subject is required") # Resolve sender from account config if not explicit sender_addr = args.sender if not sender_addr and args.config: try: mgr = AccountManager.from_yaml(args.config) acct_name = args.account or mgr.default_account sender = mgr.get_sender(acct_name) if sender: sender_addr = str(sender) except Exception: pass composer = EmailComposer() message = composer.compose( to=args.to, subject=args.subject, body=args.body or "", template=args.template, sender=sender_addr or None, cc=args.cc or None, bcc=args.bcc or None, reply_to=args.reply_to, greeting=args.greeting, sign_off=args.sign_off, header_text=args.header_text, header_color=args.header_color, footer_text=args.footer_text, action_url=args.action_url, action_text=args.action_text, action_color=args.action_color, ) # Attach files from disk for filepath in args.attach: if not os.path.isfile(filepath): _error(f"Attachment not found: {filepath}") content_type, _ = mimetypes.guess_type(filepath) if not content_type: 
content_type = "application/octet-stream" with open(filepath, "rb") as fh: data = fh.read() message.attachments.append(EmailAttachment( filename=os.path.basename(filepath), content_type=content_type, data=data, )) # Direct SMTP mode if args.smtp_host: client = SMTPClient( host=args.smtp_host, port=args.smtp_port, username=args.smtp_user, password=credential_store.resolve(args.smtp_pass), use_tls=not args.smtp_no_tls, ) success = client.send(message) result = { "success": success, "transport": "smtp", "subject": message.subject, "recipients": [str(r) for r in message.recipients], } if not success: result["error"] = "Failed to send email" json.dump(result, sys.stdout, indent=2) print() sys.exit(1) json.dump(result, sys.stdout, indent=2) print() return # Multi-account mode with Outbox + SMTP fallback if not args.config: _error("Either --config or --smtp-host is required.") mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account # Set sender from account if not already set on message if not message.sender: sender = mgr.get_sender(acct_name) if sender: message.sender = sender # Use the Outbox pattern: stage → send → cleanup send_result = mgr.send_via_outbox(message, acct_name) result = { "success": send_result["success"], "transport": "smtp", "account": send_result["account"], "fallback_used": send_result.get("fallback_used", False), "staged_in_outbox": send_result.get("staged", False), "subject": message.subject, "recipients": [str(r) for r in message.recipients], } stage_error = send_result.get("stage_error") if stage_error: result["stage_error"] = stage_error if "error" not in result or not result["error"]: result["error"] = stage_error if not send_result["success"]: result["error"] = send_result.get("error", "Failed to send email") if send_result.get("staged"): result["note"] = "Message is staged in Outbox for retry" json.dump(result, sys.stdout, indent=2) print() sys.exit(1) json.dump(result, sys.stdout, indent=2) print() def 
_load_manager(path: str) -> AccountManager: try: return AccountManager.from_yaml(path) except Exception as exc: _error(f"Failed to load config: {exc}") raise def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main() ``` ### scripts/reply_mail.py ```python #!/usr/bin/env python3 """Reply to an existing email with original quoting. Usage: python3 scripts/reply_mail.py --message-id "<[email protected]>" --body "Thanks!" --config config.yaml python3 scripts/reply_mail.py --from-stdin --body "Got it." --config config.yaml python3 scripts/reply_mail.py --from-stdin --body "Noted" --no-quote --config config.yaml Output: JSON result to stdout. """ from __future__ import annotations import argparse import json import os import sys sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.composer import EmailComposer from lib.defaults import resolve_config_path from lib.imap_client import IMAPClient from lib.models import EmailMessage from lib.smtp_client import SMTPClient def main() -> None: parser = argparse.ArgumentParser(description="Reply to an email with original quoting") parser.add_argument("--account", default="", help="Account name") parser.add_argument("--message-id", default="", help="Message-ID to reply to") parser.add_argument("--folder", default="INBOX", help="Folder to find message in") parser.add_argument("--body", required=True, help="Reply body (HTML or plain)") parser.add_argument("--sender", default="", help="Sender address override") parser.add_argument("--reply-all", action="store_true", help="Reply to all recipients (CC original recipients)") parser.add_argument("--no-quote", action="store_true", help="Do not include the original message as a quote") # Template options parser.add_argument("--template", default="minimal", choices=["default", "minimal", "digest"]) parser.add_argument("--greeting", 
default="") parser.add_argument("--sign-off", default="") # IMAP / SMTP options parser.add_argument("--imap-host", default="") parser.add_argument("--imap-port", type=int, default=993) parser.add_argument("--imap-user", default="") parser.add_argument("--imap-pass", default="") parser.add_argument("--imap-no-ssl", action="store_true") parser.add_argument("--smtp-host", default="") parser.add_argument("--smtp-port", type=int, default=587) parser.add_argument("--smtp-user", default="") parser.add_argument("--smtp-pass", default="") parser.add_argument("--smtp-no-tls", action="store_true") parser.add_argument("--from-stdin", action="store_true", help="Read original message JSON from stdin") parser.add_argument("--config", default="", help="YAML config file") args = parser.parse_args() args.config = resolve_config_path(args.config) # Get the original message if args.from_stdin: try: data = json.load(sys.stdin) if isinstance(data, dict) and "messages" in data: if not data["messages"]: _error("No messages in input") return original = EmailMessage.from_dict(data["messages"][0]) elif isinstance(data, dict): original = EmailMessage.from_dict(data) else: _error("Expected JSON message object") return except Exception as exc: _error(f"Failed to parse stdin: {exc}") return elif args.imap_host: if not args.message_id: _error("--message-id is required when using IMAP") return client = IMAPClient( host=args.imap_host, port=args.imap_port, username=args.imap_user, password=credential_store.resolve(args.imap_pass), use_ssl=not args.imap_no_ssl, ) try: client.connect() original = client.fetch_by_id(args.message_id, mailbox=args.folder) finally: client.disconnect() if original is None: _error(f"Message not found: {args.message_id}") return elif args.config: if not args.message_id: _error("--message-id is required when using config") return mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account client = mgr.get_imap_client(acct_name) try: client.connect() 
original = client.fetch_by_id(args.message_id, mailbox=args.folder) finally: client.disconnect() if original is None: _error(f"Message not found: {args.message_id}") return else: _error("--from-stdin, --imap-host, or --config is required") return # Resolve sender sender = args.sender if not sender and args.config: try: mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account addr = mgr.get_sender(acct_name) if addr: sender = str(addr) except Exception: pass # Compose reply composer = EmailComposer() reply = composer.compose_reply( original=original, body=args.body, sender=sender or None, template=args.template, quote_original=not args.no_quote, greeting=args.greeting, sign_off=args.sign_off, ) # Reply-all: add original recipients as CC (minus the sender) if args.reply_all: sender_addr = sender.split("<")[-1].rstrip(">").strip() if sender else "" for recip in original.recipients: if recip.address != sender_addr: reply.cc.append(recip) for cc_recip in original.cc: if cc_recip.address != sender_addr: reply.cc.append(cc_recip) # Send if args.smtp_host: smtp = SMTPClient( host=args.smtp_host, port=args.smtp_port, username=args.smtp_user, password=credential_store.resolve(args.smtp_pass), use_tls=not args.smtp_no_tls, ) success = smtp.send(reply) result = { "success": success, "transport": "smtp", "subject": reply.subject, "recipients": [str(r) for r in reply.recipients], "in_reply_to": reply.in_reply_to, } if not success: result["error"] = "Failed to send reply" elif args.config: mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account if not reply.sender: addr = mgr.get_sender(acct_name) if addr: reply.sender = addr send_result = mgr.send_with_fallback(reply, acct_name) result = { "success": send_result["success"], "transport": "smtp", "account": send_result["account"], "fallback_used": send_result["fallback_used"], "subject": reply.subject, "recipients": [str(r) for r in reply.recipients], "in_reply_to": 
reply.in_reply_to, } if not send_result["success"]: result["error"] = send_result.get("error", "Failed to send") else: # No send mechanism — output composed reply for piping json.dump(reply.to_dict(), sys.stdout, indent=2, default=str) print() return json.dump(result, sys.stdout, indent=2) print() if not result.get("success"): sys.exit(1) def _load_manager(path: str) -> AccountManager: try: return AccountManager.from_yaml(path) except Exception as exc: _error(f"Failed to load config: {exc}") raise def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main() ``` ### scripts/forward_mail.py ```python #!/usr/bin/env python3 """Forward an existing email to new recipients. Usage: python3 scripts/forward_mail.py --message-id "<[email protected]>" --to "[email protected]" --config config.yaml python3 scripts/forward_mail.py --from-stdin --to "[email protected]" --config config.yaml python3 scripts/forward_mail.py --from-stdin --to "[email protected]" --body "FYI" --config config.yaml Output: JSON result to stdout. 
""" from __future__ import annotations import argparse import json import os import sys sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.composer import EmailComposer from lib.defaults import resolve_config_path from lib.imap_client import IMAPClient from lib.models import EmailMessage from lib.smtp_client import SMTPClient def main() -> None: parser = argparse.ArgumentParser(description="Forward an email to new recipients") parser.add_argument("--account", default="", help="Account name") parser.add_argument("--message-id", default="", help="Message-ID to forward") parser.add_argument("--folder", default="INBOX", help="Folder to find message in") parser.add_argument("--to", action="append", default=[], required=True, help="Forward recipient (repeatable)") parser.add_argument("--body", default="", help="Additional message body") parser.add_argument("--sender", default="", help="Sender address override") parser.add_argument("--include-attachments", action="store_true", help="Include original attachments in the forward") # IMAP / SMTP options parser.add_argument("--imap-host", default="") parser.add_argument("--imap-port", type=int, default=993) parser.add_argument("--imap-user", default="") parser.add_argument("--imap-pass", default="") parser.add_argument("--imap-no-ssl", action="store_true") parser.add_argument("--smtp-host", default="") parser.add_argument("--smtp-port", type=int, default=587) parser.add_argument("--smtp-user", default="") parser.add_argument("--smtp-pass", default="") parser.add_argument("--smtp-no-tls", action="store_true") parser.add_argument("--from-stdin", action="store_true", help="Read original message JSON from stdin") parser.add_argument("--config", default="", help="YAML config file") args = parser.parse_args() args.config = resolve_config_path(args.config) # Get the original message if args.from_stdin: try: data = json.load(sys.stdin) if 
isinstance(data, dict) and "messages" in data:
                if not data["messages"]:
                    _error("No messages in input")
                    return
                original = EmailMessage.from_dict(data["messages"][0])
            elif isinstance(data, dict):
                original = EmailMessage.from_dict(data)
            else:
                _error("Expected JSON message object")
                return
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif args.imap_host:
        if not args.message_id:
            _error("--message-id is required when using IMAP")
            return
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    elif args.config:
        if not args.message_id:
            _error("--message-id is required when using config")
            return
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    else:
        _error("--from-stdin, --imap-host, or --config is required")
        return

    # Resolve sender
    sender = args.sender
    if not sender and args.config:
        try:
            mgr = _load_manager(args.config)
            acct_name = args.account or mgr.default_account
            addr = mgr.get_sender(acct_name)
            if addr:
                sender = str(addr)
        except Exception:
            pass

    # Compose forward
    composer = EmailComposer()
    fwd_message = composer.compose_forward(
        original=original,
        to=args.to,
        body=args.body,
        sender=sender or None,
        attach_original=args.include_attachments,
    )

    # Send
    if args.smtp_host:
        smtp = SMTPClient(
            host=args.smtp_host,
            port=args.smtp_port,
            username=args.smtp_user,
            password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
        success = smtp.send(fwd_message)
        result = {
"success": success, "transport": "smtp", "subject": fwd_message.subject, "recipients": [str(r) for r in fwd_message.recipients], "original_message_id": original.message_id, } if not success: result["error"] = "Failed to send forward" elif args.config: mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account if not fwd_message.sender: addr = mgr.get_sender(acct_name) if addr: fwd_message.sender = addr send_result = mgr.send_with_fallback(fwd_message, acct_name) result = { "success": send_result["success"], "transport": "smtp", "account": send_result["account"], "fallback_used": send_result["fallback_used"], "subject": fwd_message.subject, "recipients": [str(r) for r in fwd_message.recipients], "original_message_id": original.message_id, } if not send_result["success"]: result["error"] = send_result.get("error", "Failed to send") else: # No send mechanism — output composed message for piping json.dump(fwd_message.to_dict(), sys.stdout, indent=2, default=str) print() return json.dump(result, sys.stdout, indent=2) print() if not result.get("success"): sys.exit(1) def _load_manager(path: str) -> AccountManager: try: return AccountManager.from_yaml(path) except Exception as exc: _error(f"Failed to load config: {exc}") raise def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main() ``` ### scripts/search_mail.py ```python #!/usr/bin/env python3 """Search emails on an IMAP server using flexible criteria. Usage: python3 scripts/search_mail.py --config config.yaml --subject "invoice" python3 scripts/search_mail.py --config config.yaml --from "[email protected]" --unseen python3 scripts/search_mail.py --config config.yaml --since 2025-01-01 --before 2025-06-01 python3 scripts/search_mail.py --config config.yaml --criteria '(OR (FROM "a") (FROM "b"))' Output: JSON (default) or CLI table to stdout. 
""" from __future__ import annotations import argparse import json import os import sys from datetime import datetime sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.defaults import resolve_config_path from lib.imap_client import IMAPClient from lib.models import EmailMessage def _build_criteria(args) -> str: """Build an IMAP SEARCH criteria string from CLI flags.""" if args.criteria: return args.criteria parts: list[str] = [] if args.unseen: parts.append("UNSEEN") if args.flagged: parts.append("FLAGGED") if args.subject: parts.append(f'SUBJECT "{args.subject}"') if getattr(args, "from_addr", None): parts.append(f'FROM "{args.from_addr}"') if args.to: parts.append(f'TO "{args.to}"') if args.body: parts.append(f'BODY "{args.body}"') if args.since: try: dt = datetime.strptime(args.since, "%Y-%m-%d") parts.append(f'SINCE {dt.strftime("%d-%b-%Y")}') except ValueError: parts.append(f'SINCE {args.since}') if args.before: try: dt = datetime.strptime(args.before, "%Y-%m-%d") parts.append(f'BEFORE {dt.strftime("%d-%b-%Y")}') except ValueError: parts.append(f'BEFORE {args.before}') if args.text: parts.append(f'TEXT "{args.text}"') if not parts: parts.append("ALL") return "(" + " ".join(parts) + ")" if len(parts) > 1 else parts[0] def main() -> None: parser = argparse.ArgumentParser(description="Search emails via IMAP SEARCH") parser.add_argument("--account", default="", help="Account name") parser.add_argument("--folder", default="INBOX", help="Folder to search") parser.add_argument("--limit", type=int, default=50, help="Max results") parser.add_argument("--format", choices=["json", "cli"], default="json") # Search criteria flags parser.add_argument("--subject", default="", help="Search in subject") parser.add_argument("--from", dest="from_addr", default="", help="Search by sender") parser.add_argument("--to", default="", help="Search by recipient") parser.add_argument("--body", 
                        default="", help="Search in body text")
    parser.add_argument("--text", default="", help="Full-text search (subject + body)")
    parser.add_argument("--since", default="", help="Messages since date (YYYY-MM-DD)")
    parser.add_argument("--before", default="", help="Messages before date (YYYY-MM-DD)")
    parser.add_argument("--unseen", action="store_true", help="Unread messages only")
    parser.add_argument("--flagged", action="store_true", help="Flagged messages only")
    parser.add_argument("--criteria", default="",
                        help="Raw IMAP SEARCH criteria (overrides other flags)")

    # IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    criteria = _build_criteria(args)

    # Build client
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Either --config or --imap-host is required.")
        return

    try:
        client.connect()
        messages = client.search(
            mailbox=args.folder,
            criteria=criteria,
            limit=args.limit,
        )
    finally:
        client.disconnect()

    _output(messages, args.folder, args.format, args.account, criteria)


def _output(
    messages: list[EmailMessage],
    folder: str,
    fmt: str,
    account: str,
    criteria: str,
) -> None:
    if fmt == "cli":
        label = f"{account}:{folder}" if account else folder
        print(f" Search: {criteria}")
        if not messages:
            print(f" No results in {label}")
            return
        print(f" {label} ({len(messages)} results)")
        print(f" {'':->60}")
        for m in messages:
            sender = str(m.sender) \
                if m.sender else "(unknown)"
            if len(sender) > 25:
                sender = sender[:22] + "..."
            date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else ""
            subj = m.subject[:40] if m.subject else "(no subject)"
            read = " " if m.is_read else "*"
            att = "+" if m.has_attachments else " "
            print(f" {read}{att} {date:16s} {sender:25s} {subj}")
        print()
    else:
        output = {
            "account": account,
            "folder": folder,
            "criteria": criteria,
            "count": len(messages),
            "messages": [m.to_dict() for m in messages],
        }
        json.dump(output, sys.stdout, indent=2, default=str)
        print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/draft_mail.py

```python
#!/usr/bin/env python3
"""Save a composed email to the IMAP Drafts folder, or resume a draft.

Usage:
    # Save a new draft
    python3 scripts/compose_mail.py ... | python3 scripts/draft_mail.py --action save --config config.yaml
    python3 scripts/draft_mail.py --action save --to "[email protected]" --subject "WIP" --body "..." --config config.yaml

    # List drafts
    python3 scripts/draft_mail.py --action list --config config.yaml

    # Resume (fetch) a draft by Message-ID
    python3 scripts/draft_mail.py --action resume --message-id "<[email protected]>" --config config.yaml

    # Send a draft (fetch + send + delete from Drafts)
    python3 scripts/draft_mail.py --action send --message-id "<[email protected]>" --config config.yaml

Output: JSON result to stdout.
""" from __future__ import annotations import argparse import json import os import sys sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.composer import EmailComposer from lib.defaults import resolve_config_path from lib.imap_client import IMAPClient from lib.models import EmailMessage from lib.smtp_client import SMTPClient, build_mime def main() -> None: parser = argparse.ArgumentParser(description="Save, list, resume, or send drafts") parser.add_argument("--account", default="", help="Account name") parser.add_argument("--action", required=True, choices=["save", "list", "resume", "send"], help="Action to perform") parser.add_argument("--drafts-folder", default="Drafts", help="IMAP drafts folder name (default: Drafts)") parser.add_argument("--message-id", default="", help="Message-ID for resume/send actions") parser.add_argument("--limit", type=int, default=20, help="Max drafts to list") parser.add_argument("--format", choices=["json", "cli"], default="json") # Compose options (for --action save without stdin) parser.add_argument("--to", action="append", default=[]) parser.add_argument("--cc", action="append", default=[]) parser.add_argument("--subject", default="") parser.add_argument("--body", default="") parser.add_argument("--sender", default="") parser.add_argument("--template", default="minimal", choices=["default", "minimal", "digest"]) parser.add_argument("--from-stdin", action="store_true", help="Read composed message JSON from stdin") parser.add_argument("--config", default="", help="YAML config file") # Direct IMAP options parser.add_argument("--imap-host", default="") parser.add_argument("--imap-port", type=int, default=993) parser.add_argument("--imap-user", default="") parser.add_argument("--imap-pass", default="") parser.add_argument("--imap-no-ssl", action="store_true") args = parser.parse_args() args.config = resolve_config_path(args.config) if args.action == 
"save": _do_save(args) elif args.action == "list": _do_list(args) elif args.action == "resume": _do_resume(args) elif args.action == "send": _do_send(args) def _get_client(args) -> IMAPClient: if args.imap_host: return IMAPClient( host=args.imap_host, port=args.imap_port, username=args.imap_user, password=credential_store.resolve(args.imap_pass), use_ssl=not args.imap_no_ssl, ) if not args.config: _error("Either --config or --imap-host is required.") mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account return mgr.get_imap_client(acct_name) def _do_save(args) -> None: """Save a composed message to the Drafts folder.""" if args.from_stdin or not sys.stdin.isatty(): try: data = json.load(sys.stdin) message = EmailMessage.from_dict(data) except Exception as exc: _error(f"Failed to parse stdin: {exc}") return elif args.to and args.subject: sender = args.sender if not sender: try: mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account addr = mgr.get_sender(acct_name) if addr: sender = str(addr) except Exception: pass composer = EmailComposer() message = composer.compose( to=args.to, subject=args.subject, body=args.body, template=args.template, sender=sender or None, cc=args.cc or None, ) else: _error("Provide --to and --subject, or pipe composed JSON via stdin") return # Build MIME and append to Drafts mime_msg = build_mime(message) raw_bytes = mime_msg.as_bytes() client = _get_client(args) try: client.connect() ok = client.append_message(raw_bytes, mailbox=args.drafts_folder) finally: client.disconnect() result = { "success": ok, "action": "save", "folder": args.drafts_folder, "subject": message.subject, } json.dump(result, sys.stdout, indent=2) print() if not ok: sys.exit(1) def _do_list(args) -> None: """List messages in the Drafts folder.""" client = _get_client(args) try: client.connect() messages = client.fetch_all(mailbox=args.drafts_folder, limit=args.limit) finally: client.disconnect() if args.format == 
"cli": if not messages: print(f" No drafts in {args.drafts_folder}") return print(f" {args.drafts_folder} ({len(messages)} drafts)") print(f" {'':->60}") for m in messages: to_str = ", ".join(str(r) for r in m.recipients[:2]) if m.recipients else "(no recipient)" if len(to_str) > 25: to_str = to_str[:22] + "..." date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else "" subj = m.subject[:40] if m.subject else "(no subject)" print(f" {date:16s} {to_str:25s} {subj}") print() else: output = { "folder": args.drafts_folder, "count": len(messages), "drafts": [m.to_dict() for m in messages], } json.dump(output, sys.stdout, indent=2, default=str) print() def _do_resume(args) -> None: """Fetch a draft by Message-ID and output as JSON for editing.""" if not args.message_id: _error("--message-id is required for resume") return client = _get_client(args) try: client.connect() message = client.fetch_by_id(args.message_id, mailbox=args.drafts_folder) finally: client.disconnect() if message is None: _error(f"Draft not found: {args.message_id}") return json.dump(message.to_dict(), sys.stdout, indent=2, default=str) print() def _do_send(args) -> None: """Fetch a draft, send it via SMTP, and delete from Drafts.""" if not args.message_id: _error("--message-id is required for send") return client = _get_client(args) try: client.connect() message = client.fetch_by_id(args.message_id, mailbox=args.drafts_folder) finally: client.disconnect() if message is None: _error(f"Draft not found: {args.message_id}") return # Send mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account if not message.sender: addr = mgr.get_sender(acct_name) if addr: message.sender = addr send_result = mgr.send_with_fallback(message, acct_name) if not send_result["success"]: result = { "success": False, "action": "send", "error": send_result.get("error", "Send failed"), } json.dump(result, sys.stdout, indent=2) print() sys.exit(1) # Delete draft after successful send client = 
        _get_client(args)
    try:
        client.connect()
        client.delete_message(args.message_id, mailbox=args.drafts_folder)
    finally:
        client.disconnect()

    result = {
        "success": True,
        "action": "send",
        "subject": message.subject,
        "recipients": [str(r) for r in message.recipients],
        "draft_deleted": True,
    }
    json.dump(result, sys.stdout, indent=2)
    print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/retry_send.py

```python
#!/usr/bin/env python3
"""Retry sending messages sitting in the IMAP Outbox folder.

The Outbox is a temporary IMAP folder that holds messages while they are
being sent. If a send fails, the message remains in the Outbox. This script
drains the Outbox by re-attempting delivery for each message. After all
messages are sent, the Outbox folder is removed.

Usage:
    python3 scripts/retry_send.py --config config.yaml
    python3 scripts/retry_send.py --config config.yaml --account work
    python3 scripts/retry_send.py --config config.yaml --list
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.outbox import Outbox


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Retry sending messages from the IMAP Outbox",
    )
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument(
        "--account", default="",
        help="Process only this account (default: all accounts)",
    )
    parser.add_argument(
        "--list", action="store_true", dest="list_outbox",
        help="List pending messages without sending",
    )
    parser.add_argument("--format", choices=["json", "cli"], default="json")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.config:
        _error("--config is required (or place config.yaml in skill root)")
        return

    try:
        mgr = AccountManager.from_yaml(args.config)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        return

    if args.account:
        account_names = [args.account]
    else:
        account_names = mgr.list_accounts()

    if args.list_outbox:
        _do_list(mgr, account_names, args.format)
    else:
        _do_drain(mgr, account_names, args.format)


def _do_list(mgr: AccountManager, accounts: list[str], fmt: str) -> None:
    """List messages currently in each account's Outbox."""
    all_pending: list[dict] = []

    for acct_name in accounts:
        try:
            imap_client = mgr.get_imap_client(acct_name)
            imap_client.connect()
            try:
                outbox = Outbox(imap_client)
                messages = outbox.list_pending()
                for msg in messages:
                    all_pending.append({
                        "account": acct_name,
                        "message_id": msg.message_id,
                        "subject": msg.subject,
                        "recipients": [str(r) for r in msg.recipients],
                        "date": msg.date.isoformat() if msg.date else "",
                    })
            finally:
                imap_client.disconnect()
        except \
                Exception as exc:
            if fmt == "cli":
                print(f" [{acct_name}] Error checking Outbox: {exc}")

    if fmt == "cli":
        if not all_pending:
            print("No messages pending in any Outbox.")
        else:
            print(f" {len(all_pending)} message(s) pending in Outbox:")
            print(f" {'':->60}")
            for p in all_pending:
                recip = ", ".join(p["recipients"][:2])
                if len(recip) > 25:
                    recip = recip[:22] + "..."
                print(
                    f" [{p['account']}] {p['subject']!r:30s} to={recip}"
                )
            print()
    else:
        json.dump(
            {"pending_count": len(all_pending), "messages": all_pending},
            sys.stdout, indent=2,
        )
        print()


def _do_drain(mgr: AccountManager, accounts: list[str], fmt: str) -> None:
    """Drain the Outbox for each account, sending pending messages."""
    total_attempted = 0
    total_sent = 0
    total_failed = 0
    all_errors: list[dict] = []
    account_results: list[dict] = []

    for acct_name in accounts:
        try:
            result = mgr.drain_outbox(acct_name)
            account_results.append(result)
            total_attempted += result.get("attempted", 0)
            total_sent += result.get("sent", 0)
            total_failed += result.get("failed", 0)
            all_errors.extend(result.get("errors", []))
        except Exception as exc:
            all_errors.append({
                "account": acct_name,
                "error": str(exc),
            })

    if fmt == "cli":
        if total_attempted == 0:
            print("No messages pending in any Outbox.")
        else:
            print(
                f"Outbox drain: {total_attempted} attempted, "
                f"{total_sent} sent, {total_failed} failed"
            )
        for err in all_errors:
            acct = err.get("account", "?")
            subj = err.get("subject", "")
            emsg = err.get("error", "")
            print(f" ERROR [{acct}] {subj!r}: {emsg}")
    else:
        json.dump(
            {
                "attempted": total_attempted,
                "sent": total_sent,
                "failed": total_failed,
                "errors": all_errors,
                "accounts": account_results,
            },
            sys.stdout, indent=2,
        )
        print()

    sys.exit(1 if total_failed > 0 else 0)


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/heartbeat.py

```python
#!/usr/bin/env python3
"""Run a full email heartbeat cycle: fetch -> process -> execute actions.

Supports multiple accounts — iterates over all accounts (or a specific one)
and processes each account's mailboxes with per-account + global rules.

Usage:
    python3 scripts/heartbeat.py --config config.yaml
    python3 scripts/heartbeat.py --config config.yaml --account work
    python3 scripts/heartbeat.py --config config.yaml --state-file /tmp/email_state.json

Output: JSON cycle report to stdout.
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime

logger = logging.getLogger("clawMail.heartbeat")

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailAddress, EmailMessage
from lib.imap_client import IMAPClient
from lib.smtp_client import SMTPClient
from lib.composer import EmailComposer
from lib.processor import EmailProcessor


def main() -> None:
    parser = argparse.ArgumentParser(description="Run email heartbeat cycle")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="",
                        help="Process only this account (default: all accounts)")
    parser.add_argument("--state-file", default="",
                        help="Shared state file for multi-agent coordination")
    parser.add_argument("--format", choices=["json", "summary"], default="json")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.config:
        _error("--config is required (or place config.yaml in skill root)")
        return

    try:
        mgr = AccountManager.from_yaml(args.config)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        return

    start = time.monotonic()
    now = datetime.now()
    errors: list[str] = []
    all_messages: list[EmailMessage] = []
    account_reports: list[dict] = []
    dedup_skipped = 0

    # Load previously-seen Message-IDs for deduplication
    seen_ids: set[str] = set()
    if args.state_file and os.path.exists(args.state_file):
        try:
            with open(args.state_file) as f:
                prev_state = json.load(f)
            seen_ids = set(prev_state.get("email_seen_ids", []))
        except Exception:
            pass

    # Determine which accounts to process
    if args.account:
        account_names = [args.account]
    else:
        account_names = mgr.list_accounts()

    # ── Drain Outbox for each account before fetching ──────────
    outbox_results: list[dict] = []
    for acct_name in account_names:
        try:
            drain = mgr.drain_outbox(acct_name)
            if drain.get("attempted", 0) > 0:
                outbox_results.append(drain)
            if drain.get("failed", 0) > 0:
                for err in drain.get("errors", []):
                    errors.append(
                        f"[{acct_name}] Outbox send failed: "
                        f"{err.get('subject', '?')!r} — {err.get('error', '')}"
                    )
        except Exception as exc:
            errors.append(f"[{acct_name}] Outbox drain error: {exc}")

    # ── Fetch and process mail ───────────────────────────────────
    for acct_name in account_names:
        acct_messages: list[EmailMessage] = []
        mailboxes = mgr.get_mailboxes(acct_name)
        fetch_limit = mgr.get_fetch_limit(acct_name)

        # Fetch from each mailbox
        for mailbox in mailboxes:
            try:
                messages = _fetch(mgr, acct_name, mailbox, fetch_limit)
                acct_messages.extend(messages)
            except Exception as exc:
                errors.append(f"[{acct_name}] Fetch {mailbox}: {exc}")

        # Deduplicate: skip messages we've already processed
        if args.state_file and seen_ids:
            before = len(acct_messages)
            acct_messages = [
                m for m in acct_messages
                if m.message_id and m.message_id not in seen_ids
            ]
            dedup_skipped += before - len(acct_messages)

        # Process through per-account + global rules
        rules_config = mgr.get_rules(acct_name)
        if rules_config:
            processor = EmailProcessor.from_config(rules_config)
            results = processor.process_batch(acct_messages)
        else:
            results = []

        # Execute actions
        actions_executed = 0
        for result in results:
            try:
                actions_executed += _execute_actions(result, mgr, acct_name)
            except Exception as exc:
                errors.append(
                    f"[{acct_name}] Actions for '{result.message.subject}': {exc}"
                )

        # Record newly processed IDs
        for m in acct_messages:
            if m.message_id:
                seen_ids.add(m.message_id)

        account_reports.append({
"account": acct_name, "mailboxes_checked": mailboxes, "messages_fetched": len(acct_messages), "rules_matched": sum(len(r.matched_rules) for r in results), "actions_executed": actions_executed, }) all_messages.extend(acct_messages) duration = time.monotonic() - start # Build report report = { "timestamp": now.isoformat(), "accounts_processed": account_names, "total_messages_fetched": len(all_messages), "dedup_skipped": dedup_skipped, "outbox_drained": outbox_results, "duration_seconds": round(duration, 3), "errors": errors, "accounts": account_reports, "messages": [ { "account": m.account, "message_id": m.message_id, "subject": m.subject, "sender": str(m.sender) if m.sender else "", "date": m.date.isoformat() if m.date else "", } for m in all_messages ], } # Write shared state if args.state_file: state = {} if os.path.exists(args.state_file): try: with open(args.state_file) as f: state = json.load(f) except Exception: pass state["email_last_heartbeat"] = now.isoformat() state["email_unread_count"] = len(all_messages) state["email_accounts"] = { ar["account"]: { "fetched": ar["messages_fetched"], "rules_matched": ar["rules_matched"], "actions": ar["actions_executed"], } for ar in account_reports } state["email_messages"] = report["messages"] state["email_errors"] = errors # Persist seen Message-IDs for deduplication (cap at 10,000) seen_list = list(seen_ids) if len(seen_list) > 10000: seen_list = seen_list[-10000:] state["email_seen_ids"] = seen_list with open(args.state_file, "w") as f: json.dump(state, f, indent=2) if args.format == "summary": summary = { "status": "error" if errors else "ok", "accounts": len(account_names), "fetched": len(all_messages), "duration": f"{duration:.1f}s", } if errors: summary["errors"] = errors json.dump(summary, sys.stdout, indent=2) else: json.dump(report, sys.stdout, indent=2, default=str) print() sys.exit(1 if errors else 0) def _fetch( mgr: AccountManager, acct_name: str, mailbox: str, limit: int, ) -> list[EmailMessage]: client 
        = mgr.get_imap_client(acct_name)
    try:
        client.connect()
        return client.fetch_unread(mailbox=mailbox, limit=limit)
    finally:
        client.disconnect()


def _execute_actions(result, mgr: AccountManager, acct_name: str) -> int:
    count = 0
    message = result.message

    if result.should_mark_read:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            client.mark_read(message.message_id, message.mailbox)
            count += 1
        finally:
            client.disconnect()

    if result.should_flag:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            client.flag_message(message.message_id, message.mailbox)
            count += 1
        finally:
            client.disconnect()

    if result.move_to:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            moved = client.move_message(
                message.message_id, message.mailbox, result.move_to,
            )
            if moved:
                count += 1
        finally:
            client.disconnect()

    if result.forward_to:
        composer = EmailComposer()
        sender = mgr.get_sender(acct_name)
        for addr in result.forward_to:
            fwd = composer.compose(
                to=addr,
                subject=f"Fwd: {message.subject}",
                body=message.body or message.body_plain,
                template="minimal",
                sender=str(sender) if sender else "",
            )
            mgr.send_with_fallback(fwd, acct_name)
            count += 1

    if result.reply_body:
        composer = EmailComposer()
        sender = mgr.get_sender(acct_name)
        reply = composer.compose_reply(
            original=message,
            body=result.reply_body,
            sender=str(sender) if sender else "",
        )
        mgr.send_with_fallback(reply, acct_name)
        count += 1

    if result.should_archive:
        archive_root = mgr.get_archive_root(acct_name)
        frequency = mgr.get_archive_frequency(acct_name)
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            if _archive_message(client, message, archive_root, frequency):
                count += 1
        finally:
            client.disconnect()

    return count


def _archive_message(
    client: IMAPClient,
    message: EmailMessage,
    archive_root: str,
    frequency: str,
) -> bool:
    if not message.message_id:
        logger.warning("Skipping archive: missing Message-ID for %s", message.subject)
        return False
    target_folder = _archive_folder_name(message.date,
                                         archive_root, frequency)
    existing = {f["name"] for f in client.list_folders()}
    if target_folder not in existing:
        if not client.create_folder(target_folder):
            logger.warning("Failed to create archive folder %s", target_folder)
            return False
        existing.add(target_folder)
    moved = client.move_message(message.message_id, message.mailbox, target_folder)
    if not moved:
        logger.warning(
            "Failed to archive message %s into %s", message.message_id, target_folder
        )
    return moved


def _archive_folder_name(
    date: datetime | None,
    root: str,
    frequency: str,
) -> str:
    base = root.strip() or "Archive"
    freq = frequency if frequency in {"daily", "weekly", "monthly", "yearly"} else "monthly"
    dt = date or datetime.now()
    if freq == "daily":
        suffix = dt.strftime("%Y%m%d")
    elif freq == "weekly":
        iso_year, iso_week, _ = dt.isocalendar()
        suffix = f"{iso_year}W{iso_week:02d}"
    elif freq == "yearly":
        suffix = dt.strftime("%Y")
    else:
        suffix = dt.strftime("%Y%m")
    return f"{base}-{suffix}"


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/archive_mail.py

```python
#!/usr/bin/env python3
"""Archive old emails into dated folders (default monthly).

Scans a mailbox for messages older than N days and moves them to folders
like ``Archive-YYYYMM`` (or ``Archive-YYYYMMDD``/``Archive-Wxxxx``) depending
on the configurable frequency, without creating empty folders.

Usage:
    python3 scripts/archive_mail.py --config config.yaml --days 90
    python3 scripts/archive_mail.py --config config.yaml --account work --folder INBOX --days 60
    python3 scripts/archive_mail.py --config config.yaml --days 30 --archive-root "Old Mail" --dry-run
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime, timedelta

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage  # referenced by the type annotations in main()


def main() -> None:
    parser = argparse.ArgumentParser(description="Archive old emails to dated folders")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Source folder to scan")
    parser.add_argument("--days", type=int, required=True,
                        help="Archive messages older than N days")
    parser.add_argument("--archive-root", default="Archive",
                        help="Archive folder prefix (default: Archive)")
    parser.add_argument("--frequency",
                        choices=["daily", "weekly", "monthly", "yearly"],
                        default="monthly",
                        help="Frequency for archive folders (default: monthly)")
    parser.add_argument("--create-folders", action="store_true", default=True,
                        help="Create archive folders if they don't exist (default)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be archived without moving")
    parser.add_argument("--limit", type=int, default=500, help="Max messages to scan")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    cutoff = datetime.now() - timedelta(days=args.days)

    # Build client
    client: IMAPClient | None = None
    acct_name = args.account

    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        acct_name = acct_name or "direct"
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Provide --config or --imap-host")
        return

    try:
        client.connect()

        # Fetch messages using BEFORE date criteria
        date_str = cutoff.strftime("%d-%b-%Y")
        messages = client.search(
            mailbox=args.folder,
            criteria=f"(BEFORE {date_str})",
            limit=args.limit,
        )

        if not messages:
            result = {
                "account": acct_name,
                "folder": args.folder,
                "cutoff_date": cutoff.isoformat(),
                "scanned": 0,
                "archived": 0,
                "dry_run": args.dry_run,
                "details": [],
            }
            _output(result, args.format)
            return

        frequency = args.frequency
        groups: dict[str, list[EmailMessage]] = {}
        for msg in messages:
            folder_name = _archive_folder_name(
                msg.date, args.archive_root, frequency,
            )
            groups.setdefault(folder_name, []).append(msg)

        created_folders: set[str] = set()
        archived = 0
        move_details: list[dict] = []
        existing_folders = {f["name"] for f in client.list_folders()}

        for folder_name, msgs in sorted(groups.items()):
            if not msgs:
                continue

            if args.dry_run:
                for msg in msgs:
                    move_details.append({
                        "message_id": msg.message_id,
                        "subject": msg.subject,
                        "date": msg.date.isoformat() if msg.date else "",
                        "target": folder_name,
                        "status": "dry_run",
                    })
                    archived += 1
                continue

            if args.create_folders and folder_name not in existing_folders:
                ok = client.create_folder(folder_name)
                if ok:
                    existing_folders.add(folder_name)
                    created_folders.add(folder_name)
                else:
                    for msg in msgs:
                        move_details.append({
                            "message_id":
msg.message_id, "subject": msg.subject, "date": msg.date.isoformat() if msg.date else "", "target": folder_name, "status": "error", "error": f"Failed to create folder {folder_name}", }) continue msg_ids = [m.message_id for m in msgs if m.message_id] if not msg_ids: for msg in msgs: move_details.append({ "message_id": msg.message_id, "subject": msg.subject, "date": msg.date.isoformat() if msg.date else "", "target": folder_name, "status": "failed", "error": "missing Message-ID", }) continue results = client.move_messages_batch(msg_ids, args.folder, folder_name) for msg in msgs: mid = msg.message_id ok = results.get(mid, False) move_details.append({ "message_id": mid, "subject": msg.subject, "date": msg.date.isoformat() if msg.date else "", "target": folder_name, "status": "moved" if ok else "failed", }) if ok: archived += 1 result = { "account": acct_name, "folder": args.folder, "cutoff_date": cutoff.isoformat(), "cutoff_days": args.days, "frequency": frequency, "scanned": len(messages), "archived": archived, "dry_run": args.dry_run, "created_folders": sorted(created_folders), "details": move_details, } _output(result, args.format) finally: client.disconnect() def _output(result: dict, fmt: str) -> None: if fmt == "cli": dr = " (DRY RUN)" if result.get("dry_run") else "" freq = result.get("frequency", "monthly") print(f"Archive[{freq}]{dr}: {result['archived']}/{result['scanned']} " f"messages from {result['folder']}") print(f" Cutoff: {result.get('cutoff_days', '?')} days " f"({result['cutoff_date'][:10]})") created = result.get("created_folders", []) if created: print(f" Created: {', '.join(created)}") else: json.dump(result, sys.stdout, indent=2) print() def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) def _archive_folder_name( date: datetime | None, root: str, frequency: str, ) -> str: """Return the folder name for an archive period.""" base = root.strip() or "Archive" dt = date or datetime.now() if frequency 
== "daily": suffix = dt.strftime("%Y%m%d") elif frequency == "weekly": iso_year, iso_week, _ = dt.isocalendar() suffix = f"{iso_year}W{iso_week:02d}" elif frequency == "yearly": suffix = dt.strftime("%Y") else: suffix = dt.strftime("%Y%m") return f"{base}-{suffix}" if __name__ == "__main__": main() ``` ### scripts/calendar_invite.py ```python #!/usr/bin/env python3 """Compose and send calendar invitations (iCalendar VEVENT). Creates a text/calendar MIME part and sends it as an email. Recipients' mail clients will display it as a meeting invitation. Usage: python3 scripts/calendar_invite.py \\ --to "[email protected]" \\ --subject "Team Standup" \\ --start "2026-03-01T09:00:00" \\ --end "2026-03-01T09:30:00" \\ --location "Zoom" \\ --description "Daily standup meeting" \\ --config config.yaml # Recurring weekly meeting python3 scripts/calendar_invite.py \\ --to "[email protected]" \\ --subject "Weekly Sync" \\ --start "2026-03-02T14:00:00" \\ --end "2026-03-02T15:00:00" \\ --rrule "FREQ=WEEKLY;COUNT=10" \\ --config config.yaml """ from __future__ import annotations import argparse import json import os import sys import uuid from datetime import datetime from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText sys.path.insert(0, os.path.dirname(__file__)) from lib import credential_store from lib.account_manager import AccountManager from lib.defaults import resolve_config_path from lib.models import EmailAddress, EmailMessage def build_vcalendar( organizer: str, attendees: list[str], subject: str, start: datetime, end: datetime, location: str = "", description: str = "", rrule: str = "", uid: str = "", method: str = "REQUEST", ) -> str: """Build an iCalendar VCALENDAR string for a VEVENT.""" uid = uid or f"{uuid.uuid4()}@clawMail" now = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") dtstart = start.strftime("%Y%m%dT%H%M%S") dtend = end.strftime("%Y%m%dT%H%M%S") lines = [ "BEGIN:VCALENDAR", "VERSION:2.0", "PRODID:-//clawMail//calendar//EN", 
        f"METHOD:{method}",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{now}",
        f"DTSTART:{dtstart}",
        f"DTEND:{dtend}",
        f"SUMMARY:{subject}",
        f"ORGANIZER;CN={organizer}:mailto:{organizer}",
    ]
    for attendee in attendees:
        lines.append(
            f"ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=NEEDS-ACTION;"
            f"RSVP=TRUE:mailto:{attendee}"
        )
    if location:
        lines.append(f"LOCATION:{location}")
    if description:
        # Escape TEXT special characters per RFC 5545 (backslash, newline,
        # comma, semicolon). Long lines are not folded here.
        desc_escaped = (
            description.replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace(",", "\\,")
            .replace(";", "\\;")
        )
        lines.append(f"DESCRIPTION:{desc_escaped}")
    if rrule:
        lines.append(f"RRULE:{rrule}")
    lines.extend([
        "STATUS:CONFIRMED",
        "SEQUENCE:0",
        "END:VEVENT",
        "END:VCALENDAR",
    ])
    return "\r\n".join(lines)


def main() -> None:
    parser = argparse.ArgumentParser(description="Send calendar invitation")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--to", action="append", default=[], help="Attendee address")
    parser.add_argument("--cc", action="append", default=[], help="CC address")
    parser.add_argument("--subject", required=True, help="Event subject/title")
    parser.add_argument("--start", required=True,
                        help="Start datetime (ISO 8601: YYYY-MM-DDTHH:MM:SS)")
    parser.add_argument("--end", required=True,
                        help="End datetime (ISO 8601: YYYY-MM-DDTHH:MM:SS)")
    parser.add_argument("--location", default="", help="Event location")
    parser.add_argument("--description", default="", help="Event description")
    parser.add_argument("--rrule", default="",
                        help="Recurrence rule (e.g. FREQ=WEEKLY;COUNT=10)")
    parser.add_argument("--uid", default="", help="Unique event ID (auto-generated)")
    parser.add_argument("--method", default="REQUEST",
                        choices=["REQUEST", "CANCEL", "REPLY"],
                        help="Calendar method")
    parser.add_argument("--body", default="", help="Additional email body text")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--format", choices=["json", "cli"], default="json")
    # Direct SMTP flags
    parser.add_argument("--smtp-host", default="")
    parser.add_argument("--smtp-port", type=int, default=587)
    parser.add_argument("--smtp-user", default="")
    parser.add_argument("--smtp-pass", default="")
    parser.add_argument("--smtp-no-tls", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.to:
        _error("At least one --to recipient is required")
        return

    # Parse datetimes
    try:
        start_dt = datetime.fromisoformat(args.start)
        end_dt = datetime.fromisoformat(args.end)
    except ValueError as exc:
        _error(f"Invalid datetime: {exc}")
        return

    # Determine sender
    sender_addr = args.sender
    mgr = None
    acct_name = args.account

    if args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        if not sender_addr:
            s = mgr.get_sender(acct_name)
            if s:
                sender_addr = s.address

    if not sender_addr:
        sender_addr = args.smtp_user or "[email protected]"

    # Build iCalendar
    vcal = build_vcalendar(
        organizer=sender_addr,
        attendees=args.to,
        subject=args.subject,
        start=start_dt,
        end=end_dt,
        location=args.location,
        description=args.description,
        rrule=args.rrule,
        uid=args.uid,
        method=args.method,
    )

    # Build MIME message with text/calendar part
    mime_msg = MIMEMultipart("mixed")
    mime_msg["From"] = sender_addr
    mime_msg["To"] = ", ".join(args.to)
    if args.cc:
        mime_msg["Cc"] = ", ".join(args.cc)
    mime_msg["Subject"] = args.subject

    # Alternative part: plain text + calendar
    alt = MIMEMultipart("alternative")
    body_text = args.body or f"You are invited to: {args.subject}"
    alt.attach(MIMEText(body_text, "plain", "utf-8"))
    cal_part = MIMEText(vcal, "calendar", "utf-8")
    cal_part.set_param("method", args.method)
    alt.attach(cal_part)
    mime_msg.attach(alt)

    # Send or output
    all_recipients = args.to + args.cc

    if args.smtp_host:
        from lib.smtp_client import SMTPClient
        client = SMTPClient(
            host=args.smtp_host,
            port=args.smtp_port,
            username=args.smtp_user,
            password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
        # Build an EmailMessage for sending
        em = EmailMessage(
            subject=args.subject,
            sender=EmailAddress.parse(sender_addr),
            recipients=[EmailAddress.parse(a) for a in args.to],
            cc=[EmailAddress.parse(a) for a in args.cc],
            body_html=body_text,
            body_plain=body_text,
        )
        ok = client.send(em)
        result = {
            "success": ok,
            "method": args.method,
            "subject": args.subject,
            "attendees": args.to,
        }
    elif mgr:
        # Send through the account manager (with SMTP fallback).
        # Note: as written, the EmailMessage carries only body_text;
        # mime_msg built above is not passed along.
        em = EmailMessage(
            subject=args.subject,
            sender=EmailAddress.parse(sender_addr),
            recipients=[EmailAddress.parse(a) for a in args.to],
            cc=[EmailAddress.parse(a) for a in args.cc],
            body_html=body_text,
            body_plain=body_text,
        )
        send_result = mgr.send_with_fallback(em, acct_name)
        result = {
            "success": send_result["success"],
            "method": args.method,
            "subject": args.subject,
            "attendees": args.to,
            "fallback_used": send_result.get("fallback_used", False),
        }
    else:
        # No send method — just output the iCalendar
        result = {
            "subject": args.subject,
            "method": args.method,
            "attendees": args.to,
            "vcalendar": vcal,
        }

    if args.format == "cli":
        if result.get("success") is True:
            print(f"Calendar invitation sent: {args.subject}")
            print(f" Method: {args.method}")
            print(f" Start: {args.start}")
            print(f" End: {args.end}")
            print(f" Attendees: {', '.join(args.to)}")
        elif "vcalendar" in result:
            print(vcal)
        else:
            print(f"Failed to send invitation: {result}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/mail_merge.py

```python
#!/usr/bin/env python3
"""Mail merge — batch personalised sends from a template + data source.

Reads a CSV or JSON data file where each row represents one recipient.
For each row, the template subject/body placeholders are filled in and
the message is sent via SMTP.

Placeholder syntax: ``{{field_name}}`` in subject and body.

Usage:
    # CSV data source
    python3 scripts/mail_merge.py \\
        --data contacts.csv \\
        --subject "Hello {{name}}" \\
        --body "<p>Dear {{name}}, your code is {{code}}.</p>" \\
        --to-field email \\
        --config config.yaml

    # JSON data source
    python3 scripts/mail_merge.py \\
        --data contacts.json \\
        --subject "Invoice #{{invoice_id}}" \\
        --body "<p>Amount: {{amount}}</p>" \\
        --to-field email \\
        --config config.yaml

    # Dry run (compose without sending)
    python3 scripts/mail_merge.py \\
        --data contacts.csv \\
        --subject "Hi {{name}}" \\
        --body "Test" \\
        --to-field email \\
        --dry-run
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import re
import sys
import time

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.composer import EmailComposer
from lib.defaults import resolve_config_path
from lib.models import EmailAddress


def _load_data(path: str) -> list[dict[str, str]]:
    """Load rows from a CSV or JSON file."""
    ext = os.path.splitext(path)[1].lower()
    if ext == ".json":
        with open(path) as f:
            data = json.load(f)
        if isinstance(data, list):
            return data
        raise ValueError("JSON data must be an array of objects")
    else:
        # Default: CSV
        with open(path, newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            return list(reader)


def _fill_template(template: str, row: dict[str, str]) -> str:
    """Replace ``{{key}}`` placeholders with values from *row*."""
    def replacer(match):
        key = match.group(1).strip()
        # re.sub needs a str back; JSON rows may hold numbers or booleans.
        return str(row.get(key, match.group(0)))
    return re.sub(r"\{\{(\w+)\}\}", replacer, template)


def main() -> None:
    parser = argparse.ArgumentParser(description="Mail merge — batch personalised sends")
    parser.add_argument("--data", required=True,
                        help="CSV or JSON file with recipient data")
    parser.add_argument("--subject", required=True,
                        help="Subject template (use {{field}} for placeholders)")
    parser.add_argument("--body", required=True,
                        help="Body template (HTML, use {{field}} for placeholders)")
    parser.add_argument("--to-field", required=True,
                        help="Column/key name containing recipient email address")
    parser.add_argument("--name-field", default="",
                        help="Column/key for recipient display name (optional)")
    parser.add_argument("--template", default="default",
                        choices=["default", "minimal", "digest"],
                        help="Email template")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--greeting", default="", help="Greeting template")
    parser.add_argument("--sign-off", default="", help="Sign-off template")
    parser.add_argument("--delay", type=float, default=0.5,
                        help="Delay in seconds between sends (default: 0.5)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Compose messages without sending")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--attach", action="append", default=[],
                        help="File to attach (same for all recipients)")
    parser.add_argument("--format", choices=["json", "cli"], default="json")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Load data
    try:
        rows = _load_data(args.data)
    except Exception as exc:
        _error(f"Failed to load data: {exc}")
        return

    if not rows:
        _error("Data file is empty")
        return

    # Validate to-field exists
    if args.to_field not in rows[0]:
        _error(f"Field '{args.to_field}' not found in data. "
               f"Available: {', '.join(rows[0].keys())}")
        return

    # Setup
    mgr = None
    acct_name = args.account
    if args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account

    sender = args.sender
    if not sender and mgr:
        s = mgr.get_sender(acct_name)
        if s:
            sender = str(s)

    composer = EmailComposer()

    # Load attachments once
    from lib.models import EmailAttachment
    import mimetypes
    attachments = []
    for fpath in args.attach:
        if not os.path.isfile(fpath):
            _error(f"Attachment not found: {fpath}")
            return
        ct = mimetypes.guess_type(fpath)[0] or "application/octet-stream"
        with open(fpath, "rb") as f:
            data = f.read()
        attachments.append(EmailAttachment(
            filename=os.path.basename(fpath),
            content_type=ct,
            data=data,
        ))

    results = []
    sent = 0
    failed = 0

    for i, row in enumerate(rows):
        to_addr = row.get(args.to_field, "").strip()
        if not to_addr:
            results.append({"row": i, "error": f"Empty {args.to_field}"})
            failed += 1
            continue

        subject = _fill_template(args.subject, row)
        body = _fill_template(args.body, row)
        greeting = _fill_template(args.greeting, row) if args.greeting else ""
        sign_off = _fill_template(args.sign_off, row) if args.sign_off else ""

        message = composer.compose(
            to=to_addr,
            subject=subject,
            body=body,
            template=args.template,
            sender=sender,
            greeting=greeting,
            sign_off=sign_off,
        )
        message.attachments = list(attachments)

        row_result: dict = {
            "row": i,
            "to": to_addr,
            "subject": subject,
        }

        if args.dry_run:
            row_result["status"] = "dry_run"
            results.append(row_result)
            sent += 1
        elif mgr:
            send_result = mgr.send_with_fallback(message, acct_name)
            if send_result["success"]:
                row_result["status"] = "sent"
                row_result["fallback_used"] = send_result.get("fallback_used", False)
                sent += 1
            else:
                row_result["status"] = "failed"
                row_result["error"] = send_result.get("error", "unknown")
                failed += 1
            results.append(row_result)
            # Throttle between sends, but not after the last row
            if args.delay and i < len(rows) - 1:
                time.sleep(args.delay)
        else:
            row_result["status"] = "no_config"
            row_result["message"] = message.to_dict()
            results.append(row_result)
            sent += 1

    report = {
        "total": len(rows),
        "sent": sent,
        "failed": failed,
        "dry_run": args.dry_run,
        "results": results,
    }

    if args.format == "cli":
        print(f"Mail merge: {sent}/{len(rows)} sent, {failed} failed"
              + (" (dry run)" if args.dry_run else ""))
        for r in results:
            status = r.get("status", "?")
            to = r.get("to", "?")
            subj = r.get("subject", "?")
            err = r.get("error", "")
            print(f" [{status}] {to}: {subj}" + (f" ({err})" if err else ""))
    else:
        json.dump(report, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/read_mail.py

```python
#!/usr/bin/env python3
"""Read and render a specific email by Message-ID.

Usage:
    python3 scripts/read_mail.py --message-id "<[email protected]>" --config config.yaml
    python3 scripts/read_mail.py --message-id "<[email protected]>" --account work --format cli
    echo '{"message_id":"<id>", ...}' | python3 scripts/read_mail.py --from-stdin --format cli

Output: JSON message object (default) or CLI-rendered email.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage


def main() -> None:
    parser = argparse.ArgumentParser(description="Read/render an email by Message-ID")
    parser.add_argument("--account", default="",
                        help="Account name (default: default account)")
    parser.add_argument("--message-id", default="", help="Message-ID header value")
    parser.add_argument("--folder", default="INBOX", help="Folder to search in")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")
    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    # Attachments
    parser.add_argument("--save-attachments", default="",
                        help="Directory to save attachments to")
    # Config / stdin
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read message JSON from stdin instead of IMAP")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if args.from_stdin:
        data = json.load(sys.stdin)
        if isinstance(data, dict) and "messages" in data:
            if not data["messages"]:
                _error("No messages in input")
                return
            message = EmailMessage.from_dict(data["messages"][0])
        elif isinstance(data, dict):
            message = EmailMessage.from_dict(data)
        else:
            _error("Expected JSON message object")
            return
    elif args.imap_host:
        if not args.message_id:
            _error("--message-id is required (or use --from-stdin)")
            return
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            message = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if message is None:
            _error(f"Message not found: {args.message_id}")
            return
    else:
        if not args.message_id:
            _error("--message-id is required (or use --from-stdin)")
            return
        if not args.config:
            _error("--config or --imap-host is required")
            return
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            message = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if message is None:
            _error(f"Message not found: {args.message_id}")
            return

    # Save attachments if requested
    if args.save_attachments:
        _save_attachments(message, args.save_attachments)

    _output(message, args.format)


def _save_attachments(message: EmailMessage, dest_dir: str) -> None:
    """Save all attachments from a message to the given directory."""
    if not message.attachments:
        return
    os.makedirs(dest_dir, exist_ok=True)
    for att in message.attachments:
        safe_name = os.path.basename(att.filename) or "attachment"
        dest_path = os.path.join(dest_dir, safe_name)
        # Avoid overwriting: add suffix if file exists
        base, ext = os.path.splitext(dest_path)
        counter = 1
        while os.path.exists(dest_path):
            dest_path = f"{base}_{counter}{ext}"
            counter += 1
        with open(dest_path, "wb") as f:
            f.write(att.data)
        print(f" Saved: {dest_path} ({att.size} bytes)", file=sys.stderr)


def _output(message: EmailMessage, fmt: str) -> None:
    if fmt == "cli":
        _render_cli(message)
    else:
        json.dump(message.to_dict(), sys.stdout, indent=2, default=str)
        print()


def _render_cli(msg: EmailMessage) -> None:
    """Render an email for terminal display."""
    w = 72
    print("=" * w)
    if msg.account:
        print(f" Account: {msg.account}")
    print(f" From: {msg.sender or '(unknown)'}")
    if msg.recipients:
        print(f" To: {', '.join(str(r) for r in msg.recipients)}")
    if msg.cc:
        print(f" Cc: {', '.join(str(r) for r in msg.cc)}")
    print(f" Subject: {msg.subject}")
    print(f" Date: {msg.date.strftime('%Y-%m-%d %H:%M %Z') if msg.date else '(unknown)'}")
    if msg.message_id:
        print(f" ID: {msg.message_id}")
    if msg.has_attachments:
        att_names = ", ".join(a.filename for a in msg.attachments)
        print(f" Attach: {att_names}")
    print("-" * w)

    # Prefer plain text for CLI; fall back to stripped HTML
    body = msg.body_plain
    if not body and msg.body_html:
        body = re.sub(r"<head[\s>].*?</head>", "", msg.body_html,
                      flags=re.DOTALL | re.IGNORECASE)
        body = re.sub(r"<style[\s>].*?</style>", "", body,
                      flags=re.DOTALL | re.IGNORECASE)
        body = re.sub(r"<br\s*/?>", "\n", body, flags=re.IGNORECASE)
        body = re.sub(r"</p>", "\n\n", body, flags=re.IGNORECASE)
        body = re.sub(r"<[^>]+>", "", body)
        body = re.sub(r"\n{3,}", "\n\n", body)

    if body:
        for line in body.strip().splitlines():
            print(f" {line}")
    else:
        print(" (empty body)")
    print("=" * w)
    print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/compose_mail.py

```python
#!/usr/bin/env python3
"""Compose rich HTML emails and output as JSON (for piping to send_mail.py).

Usage:
    python3 scripts/compose_mail.py [options]
    python3 scripts/compose_mail.py ... | python3 scripts/send_mail.py --from-stdin

Output: JSON message object to stdout.
"""

from __future__ import annotations

import argparse
import json
import mimetypes
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.composer import EmailComposer
from lib.models import EmailAttachment


def main() -> None:
    parser = argparse.ArgumentParser(description="Compose rich HTML emails")
    parser.add_argument("--to", action="append", default=[],
                        help="Recipient (repeatable)")
    parser.add_argument("--cc", action="append", default=[], help="CC recipient")
    parser.add_argument("--bcc", action="append", default=[], help="BCC recipient")
    parser.add_argument("--subject", required=True, help="Email subject")
    parser.add_argument("--body", default="", help="Email body (HTML or plain)")
    parser.add_argument("--sender", default="", help="Sender address")
    parser.add_argument("--template", default="default",
                        choices=["default", "minimal", "digest"],
                        help="Email template")
    # Template variables
    parser.add_argument("--greeting", default="", help="Greeting line")
    parser.add_argument("--sign-off", default="", help="Sign-off line")
    parser.add_argument("--header-text", default="", help="Header text")
    parser.add_argument("--header-color", default="#2d3748", help="Header bg color")
    parser.add_argument("--footer-text", default="", help="Footer text")
    parser.add_argument("--action-url", default="", help="CTA button URL")
    parser.add_argument("--action-text", default="View Details", help="CTA button text")
    parser.add_argument("--action-color", default="#4299e1", help="CTA button color")
    # Digest-specific
    parser.add_argument("--items", default="",
                        help="JSON array of row dicts for digest")
    parser.add_argument("--columns", default="", help="JSON array of column names")
    parser.add_argument("--summary", default="", help="Summary text for digest")
    # Reply
    parser.add_argument("--reply-to", default="", help="In-Reply-To message ID")
    # Attachments
    parser.add_argument("--attach", action="append", default=[],
                        help="File path to attach (repeatable)")
    args = parser.parse_args()

    if not args.to:
        json.dump({"error": "At least one --to recipient is required"}, sys.stderr)
        print(file=sys.stderr)
        sys.exit(1)

    items = None
    columns = None
    if args.items:
        try:
            items = json.loads(args.items)
        except json.JSONDecodeError as exc:
            json.dump({"error": f"Invalid --items JSON: {exc}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)
    if args.columns:
        try:
            columns = json.loads(args.columns)
        except json.JSONDecodeError as exc:
            json.dump({"error": f"Invalid --columns JSON: {exc}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)

    composer = EmailComposer()
    message = composer.compose(
        to=args.to,
        subject=args.subject,
        body=args.body,
        template=args.template,
        sender=args.sender or None,
        cc=args.cc or None,
        bcc=args.bcc or None,
        reply_to=args.reply_to,
        greeting=args.greeting,
        sign_off=args.sign_off,
        header_text=args.header_text,
        header_color=args.header_color,
        footer_text=args.footer_text,
        action_url=args.action_url,
        action_text=args.action_text,
        action_color=args.action_color,
        items=items,
        columns=columns,
        summary=args.summary,
    )

    # Attach files from disk
    for filepath in args.attach:
        if not os.path.isfile(filepath):
            json.dump({"error": f"Attachment not found: {filepath}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)
        content_type, _ = mimetypes.guess_type(filepath)
        if not content_type:
            content_type = "application/octet-stream"
        with open(filepath, "rb") as fh:
            data = fh.read()
        message.attachments.append(EmailAttachment(
            filename=os.path.basename(filepath),
            content_type=content_type,
            data=data,
        ))

    json.dump(message.to_dict(), sys.stdout, indent=2, default=str)
    print()


if __name__ == "__main__":
    main()
```

### scripts/process_mail.py

```python
#!/usr/bin/env python3
"""Process emails through the rule-based pipeline.
Usage:
    python3 scripts/fetch_mail.py --config c.yaml | python3 scripts/process_mail.py --rules-file rules.yaml
    python3 scripts/process_mail.py --input messages.json --rules '[...]'
    python3 scripts/process_mail.py --input messages.json --config config.yaml --account work --format cli

Output: JSON (default) or CLI summary to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailMessage
from lib.processor import EmailProcessor


def main() -> None:
    parser = argparse.ArgumentParser(description="Process emails through rule pipeline")
    parser.add_argument("--account", default="",
                        help="Account name (loads per-account + global rules from config)")
    parser.add_argument("--input", default="",
                        help="JSON file with messages (or reads stdin)")
    parser.add_argument("--rules", default="", help="JSON string of rules array")
    parser.add_argument("--rules-file", default="", help="YAML/JSON file with rules")
    parser.add_argument("--config", default="",
                        help="YAML config file (reads rules from 'rules' key or per-account)")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Load messages
    if args.input:
        with open(args.input) as f:
            data = json.load(f)
    else:
        data = json.load(sys.stdin)

    if isinstance(data, dict) and "messages" in data:
        msg_dicts = data["messages"]
    elif isinstance(data, list):
        msg_dicts = data
    else:
        _error("Expected JSON array or object with 'messages' key")
        return

    messages = [EmailMessage.from_dict(d) for d in msg_dicts]

    # Load rules
    rules_config: list[dict] = []
    if args.rules:
        try:
            rules_config = json.loads(args.rules)
        except json.JSONDecodeError as exc:
            _error(f"Invalid --rules JSON: {exc}")
    elif args.rules_file:
        try:
            with open(args.rules_file) as f:
                if args.rules_file.endswith((".yaml", ".yml")):
                    import yaml
                    loaded = yaml.safe_load(f) or {}
                else:
                    loaded = json.load(f)
            if isinstance(loaded, list):
                rules_config = loaded
            elif isinstance(loaded, dict):
                rules_config = loaded.get("rules", loaded.get("processing_rules", []))
        except Exception as exc:
            _error(f"Failed to load rules file: {exc}")
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
            if args.account:
                rules_config = mgr.get_rules(args.account)
            else:
                rules_config = mgr.get_rules()
        except Exception as exc:
            _error(f"Failed to load config: {exc}")

    if not rules_config:
        _error("No processing rules provided. Use --rules, --rules-file, or --config.")

    # Process
    processor = EmailProcessor.from_config(rules_config)
    results = processor.process_batch(messages)

    if args.format == "cli":
        matched = sum(len(r.matched_rules) for r in results)
        print(f" Processed {len(results)} messages, {matched} rules matched")
        print()
        for r in results:
            subj = r.message.subject[:50] or "(no subject)"
            acct = f"[{r.message.account}] " if r.message.account else ""
            if r.matched_rules:
                rules_str = ", ".join(r.matched_rules)
                actions_str = ", ".join(r.actions_taken) if r.actions_taken else ""
                print(f" {acct}{subj}")
                print(f" Rules: {rules_str}")
                if actions_str:
                    print(f" Actions: {actions_str}")
                if r.tags:
                    print(f" Tags: {', '.join(r.tags)}")
                if r.move_to:
                    print(f" Move to: {r.move_to}")
            else:
                print(f" {acct}{subj} (no match)")
        print()
    else:
        output = {
            "messages_processed": len(results),
            "rules_matched": sum(len(r.matched_rules) for r in results),
            "results": [r.to_dict() for r in results],
        }
        json.dump(output, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/manage_folders.py

```python
#!/usr/bin/env python3
"""List, create, delete, rename, and move IMAP folders.
Usage:
    python3 scripts/manage_folders.py --action list --config config.yaml
    python3 scripts/manage_folders.py --action list --account work --format cli --config config.yaml
    python3 scripts/manage_folders.py --action create --folder Projects --config config.yaml
    python3 scripts/manage_folders.py --action move --folder Projects --new-parent Archive --config config.yaml

Output: JSON (default) or CLI table to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Manage IMAP folders")
    parser.add_argument("--account", default="",
                        help="Account name (default: default account)")
    parser.add_argument("--action", required=True,
                        choices=["list", "create", "delete", "rename", "move"],
                        help="Action to perform")
    parser.add_argument("--folder", default="",
                        help="Folder name (for create/delete/rename/move)")
    parser.add_argument("--new-name", default="", help="New name (for rename)")
    parser.add_argument("--new-parent", default="",
                        help="New parent folder (for move)")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")
    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Build client
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("IMAP host required. Use --imap-host or --config.")
        return

    try:
        client.connect()

        if args.action == "list":
            _list_folders(client, args.format, args.account)
        elif args.action == "create":
            if not args.folder:
                _error("--folder is required for create")
            _create_folder(client, args.folder, args.format)
        elif args.action == "delete":
            if not args.folder:
                _error("--folder is required for delete")
            _delete_folder(client, args.folder, args.format)
        elif args.action == "rename":
            if not args.folder or not args.new_name:
                _error("--folder and --new-name are required for rename")
            _rename_folder(client, args.folder, args.new_name, args.format)
        elif args.action == "move":
            if not args.folder or not args.new_parent:
                _error("--folder and --new-parent are required for move")
            _move_folder(client, args.folder, args.new_parent, args.format)
    finally:
        client.disconnect()


def _list_folders(client: IMAPClient, fmt: str, account: str) -> None:
    folders = client.list_folders()
    enriched = []
    for f in folders:
        try:
            status = client.folder_status(f["name"])
            f.update(status)
        except Exception:
            f.update({"messages": 0, "unseen": 0, "recent": 0})
        enriched.append(f)

    if fmt == "cli":
        if not enriched:
            print(" No folders found")
            return
        label = f" [{account}] " if account else " "
        print(f"{label}{'Folder':<30s} {'Total':>6s} {'Unread':>6s} {'Recent':>6s}")
        print(f" {'':-<30s} {'':->6s} {'':->6s} {'':->6s}")
        for f in enriched:
            print(f" {f['name']:<30s} {f.get('messages', 0):>6d}"
                  f" {f.get('unseen', 0):>6d} {f.get('recent', 0):>6d}")
        print()
    else:
        output = {"account": account, "folders": enriched}
        json.dump(output, sys.stdout, indent=2)
        print()


def _create_folder(client: IMAPClient, folder: str, fmt: str) -> None:
    ok = client.create_folder(folder)
    result = {"action": "create", "folder": folder, "success": ok}
    if fmt == "cli":
        status = "created" if ok else "FAILED"
        print(f" Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _delete_folder(client: IMAPClient, folder: str, fmt: str) -> None:
    ok = client.delete_folder(folder)
    result = {"action": "delete", "folder": folder, "success": ok}
    if fmt == "cli":
        status = "deleted" if ok else "FAILED"
        print(f" Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _rename_folder(client: IMAPClient, old: str, new: str, fmt: str) -> None:
    ok = client.rename_folder(old, new)
    result = {"action": "rename", "old_name": old, "new_name": new, "success": ok}
    if fmt == "cli":
        status = f"renamed to '{new}'" if ok else "FAILED"
        print(f" Folder '{old}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _move_folder(client: IMAPClient, folder: str, new_parent: str, fmt: str) -> None:
    ok = client.move_folder(folder, new_parent)
    result = {"action": "move", "folder": folder, "new_parent": new_parent, "success": ok}
    if fmt == "cli":
        status = f"moved under '{new_parent}'" if ok else "FAILED"
        print(f" Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/move_mail.py

```python
#!/usr/bin/env python3
"""Move emails between IMAP folders.

Usage:
    python3 scripts/move_mail.py --message-id "<[email protected]>" --from INBOX --to Archive --config config.yaml
    python3 scripts/move_mail.py --account work --from-stdin --to Projects --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Move emails between IMAP folders")
    parser.add_argument("--account", default="",
                        help="Account name (default: default account)")
    parser.add_argument("--message-id", action="append", default=[],
                        help="Message-ID to move (repeatable)")
    parser.add_argument("--from", dest="from_folder", default="INBOX",
                        help="Source folder (default: INBOX)")
    parser.add_argument("--to", dest="to_folder", required=True,
                        help="Destination folder")
    parser.add_argument("--create-folder", action="store_true",
                        help="Create destination folder if it doesn't exist")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")
    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")
    # Stdin input
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read message IDs from stdin JSON")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Collect message IDs
    msg_ids = list(args.message_id)
    if args.from_stdin:
        data = json.load(sys.stdin)
        if isinstance(data, dict) and "messages" in data:
            for m in data["messages"]:
                mid = m.get("message_id", "")
                if mid:
                    msg_ids.append(mid)
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, str):
                    msg_ids.append(item)
                elif isinstance(item, dict):
                    mid = item.get("message_id", "")
                    if mid:
                        msg_ids.append(mid)

    if not msg_ids:
        _error("No 
message IDs provided. Use --message-id or --from-stdin.") # Build client if args.imap_host: client = IMAPClient( host=args.imap_host, port=args.imap_port, username=args.imap_user, password=credential_store.resolve(args.imap_pass), use_ssl=not args.imap_no_ssl, ) elif args.config: mgr = _load_manager(args.config) acct_name = args.account or mgr.default_account client = mgr.get_imap_client(acct_name) else: _error("IMAP host required. Use --imap-host or --config.") return results = [] try: client.connect() if args.create_folder: folders = [f["name"] for f in client.list_folders()] if args.to_folder not in folders: client.create_folder(args.to_folder) # Use batch move for efficiency batch_results = client.move_messages_batch( msg_ids, args.from_folder, args.to_folder, ) for mid, ok in batch_results.items(): results.append({"message_id": mid, "success": ok}) finally: client.disconnect() moved = sum(1 for r in results if r["success"]) failed = len(results) - moved if args.format == "cli": print(f" Moved {moved}/{len(results)} messages" f" from '{args.from_folder}' to '{args.to_folder}'") if failed: for r in results: if not r["success"]: print(f" FAILED: {r['message_id']}") print() else: output = { "account": args.account, "from": args.from_folder, "to": args.to_folder, "moved": moved, "failed": failed, "results": results, } json.dump(output, sys.stdout, indent=2) print() if failed: sys.exit(1) def _load_manager(path: str) -> AccountManager: try: return AccountManager.from_yaml(path) except Exception as exc: _error(f"Failed to load config: {exc}") raise def _error(msg: str) -> None: json.dump({"error": msg}, sys.stderr) print(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main() ``` ### scripts/idle_monitor.py ```python #!/usr/bin/env python3 """Monitor an IMAP mailbox for new messages using IDLE (push). Uses IMAP IDLE (RFC 2177) to wait for new-mail events instead of polling. When a new message arrives the script fetches and prints it, then re-enters IDLE mode. 

Usage:
    python3 scripts/idle_monitor.py --config config.yaml
    python3 scripts/idle_monitor.py --config config.yaml --account work --folder INBOX
    python3 scripts/idle_monitor.py --config config.yaml --timeout 60 --max-events 10
"""

from __future__ import annotations

import argparse
import json
import os
import signal
import sys
import time

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient

_running = True


def _handle_signal(signum, frame):
    global _running
    _running = False


def main() -> None:
    parser = argparse.ArgumentParser(description="Monitor mailbox via IMAP IDLE")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Folder to monitor")
    parser.add_argument(
        "--timeout", type=int, default=29 * 60,
        help="IDLE timeout in seconds (default: 29 min per RFC 2177)",
    )
    parser.add_argument(
        "--poll-interval", type=float, default=30.0,
        help="Seconds between idle_check polls (default: 30)",
    )
    parser.add_argument(
        "--max-events", type=int, default=0,
        help="Stop after N new-mail events (0 = run forever)",
    )
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    # Build client
    client: IMAPClient | None = None
    acct_name = args.account

    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        acct_name = acct_name or "direct"
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Either --config or --imap-host is required.")
        return

    event_count = 0
    try:
        client.connect()
        _log(f"Connected to {client.host}, monitoring {args.folder}", args.format)

        while _running:
            try:
                client.idle_start(args.folder, timeout=args.timeout)
                _log("IDLE started, waiting for events...", args.format)

                idle_start = time.monotonic()
                while _running:
                    responses = client.idle_check(timeout=args.poll_interval)

                    new_exists = any(
                        resp == b"EXISTS" for _, resp in responses
                    )
                    if new_exists:
                        client.idle_done()
                        event_count += 1

                        # Fetch the newest unread messages
                        messages = client.fetch_unread(
                            mailbox=args.folder, limit=10,
                        )

                        event = {
                            "event": "new_mail",
                            "account": acct_name,
                            "folder": args.folder,
                            "event_number": event_count,
                            "new_messages": len(messages),
                            "messages": [
                                {
                                    "message_id": m.message_id,
                                    "subject": m.subject,
                                    "sender": str(m.sender) if m.sender else "",
                                    "date": m.date.isoformat() if m.date else "",
                                }
                                for m in messages
                            ],
                        }

                        if args.format == "cli":
                            print(f"\n--- New mail ({len(messages)} message(s)) ---")
                            for m in messages:
                                sender = str(m.sender) if m.sender else "(unknown)"
                                print(f" From: {sender}")
                                print(f" Subject: {m.subject}")
                                print()
                        else:
                            json.dump(event, sys.stdout)
                            print()
                        sys.stdout.flush()

                        if args.max_events and event_count >= args.max_events:
                            _log(f"Reached max events ({args.max_events})", args.format)
                            return

                        # Re-enter IDLE
                        break

                    # Re-issue IDLE every timeout seconds
                    elapsed = time.monotonic() - idle_start
                    if elapsed >= args.timeout - 60:
                        client.idle_done()
                        _log("Re-issuing IDLE (timeout approaching)", args.format)
                        break

            except (OSError, RuntimeError) as exc:
                _log(f"IDLE error: {exc}, reconnecting in 5s...", args.format)
                try:
                    client.disconnect()
                except Exception:
                    pass
                time.sleep(5)
                client.connect()

    except Exception as exc:
        _error(f"Fatal: {exc}")
    finally:
        _log("Stopping IDLE monitor", args.format)
        try:
            client.disconnect()
        except Exception:
            pass


def _log(msg: str, fmt: str) -> None:
    if fmt == "cli":
        print(f"[idle] {msg}", file=sys.stderr)
    else:
        json.dump({"log": msg}, sys.stderr)
        print(file=sys.stderr)


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/thread_mail.py

```python
#!/usr/bin/env python3
"""Group emails into conversation threads by References / In-Reply-To.

Fetches messages from a folder and groups them into threads using the
Message-ID, In-Reply-To, and References headers (RFC 5256 / JWZ algorithm
simplified).

Usage:
    python3 scripts/thread_mail.py --config config.yaml
    python3 scripts/thread_mail.py --config config.yaml --account work --folder INBOX
    python3 scripts/thread_mail.py --config config.yaml --limit 200 --format cli
    python3 scripts/thread_mail.py --from-stdin < messages.json
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import defaultdict

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailMessage


def build_threads(messages: list[EmailMessage]) -> list[dict]:
    """Group messages into conversation threads.

    Returns a list of thread dicts, each containing:
    - ``thread_id``: The Message-ID of the thread root
    - ``subject``: Subject of the root message
    - ``message_count``: Number of messages in the thread
    - ``participants``: Unique sender addresses
    - ``latest_date``: ISO date of the most recent message
    - ``messages``: List of message summaries in chronological order
    """
    # Index messages by Message-ID
    by_id: dict[str, EmailMessage] = {}
    for msg in messages:
        if msg.message_id:
            by_id[msg.message_id] = msg

    # Build parent map: message_id -> parent_message_id
    parent_of: dict[str, str] = {}
    children_of: dict[str, list[str]] = defaultdict(list)

    for msg in messages:
        mid = msg.message_id
        if not mid:
            continue

        parent = None
        if msg.in_reply_to and msg.in_reply_to in by_id:
            parent = msg.in_reply_to
        elif msg.references:
            # Walk references from most recent to oldest
            for ref in reversed(msg.references):
                if ref in by_id:
                    parent = ref
                    break

        if parent and parent != mid:
            parent_of[mid] = parent
            children_of[parent].append(mid)

    # Find roots (messages with no parent)
    roots = set()
    for msg in messages:
        mid = msg.message_id
        if not mid:
            continue
        # Walk up to root
        current = mid
        visited = set()
        while current in parent_of and current not in visited:
            visited.add(current)
            current = parent_of[current]
        roots.add(current)

    # Build thread for each root
    threads = []
    seen_in_thread = set()

    for root_id in roots:
        if root_id in seen_in_thread:
            continue

        # Collect all messages in this thread via BFS
        thread_ids = []
        queue = [root_id]
        while queue:
            current = queue.pop(0)
            if current in seen_in_thread:
                continue
            seen_in_thread.add(current)
            if current in by_id:
                thread_ids.append(current)
            queue.extend(children_of.get(current, []))

        if not thread_ids:
            continue

        thread_msgs = [by_id[mid] for mid in thread_ids if mid in by_id]

        # Sort by date. A single string-valued key keeps the comparison
        # well-defined when some messages have no date (they sort first);
        # mixing datetime and str keys would raise TypeError.
        def sort_key(m):
            if m.date:
                return m.date.isoformat()
            return ""

        thread_msgs.sort(key=sort_key)

        participants = set()
        for m in thread_msgs:
            if m.sender:
                participants.add(m.sender.address)

        latest = max(
            (m.date for m in thread_msgs if m.date),
            default=None,
        )

        root_msg = by_id.get(root_id)
        subject = root_msg.subject if root_msg else thread_msgs[0].subject if thread_msgs else ""

        threads.append({
            "thread_id": root_id,
            "subject": subject,
            "message_count": len(thread_msgs),
            "participants": sorted(participants),
            "latest_date": latest.isoformat() if latest else "",
            "messages": [
                {
                    "message_id": m.message_id,
                    "subject": m.subject,
                    "sender": str(m.sender) if m.sender else "",
                    "date": m.date.isoformat() if m.date else "",
                    "in_reply_to": m.in_reply_to,
                }
                for m in thread_msgs
            ],
        })

    # Sort threads by latest_date descending
    threads.sort(key=lambda t: t.get("latest_date", ""), reverse=True)
    return threads


def main() -> None:
    parser = argparse.ArgumentParser(description="Thread emails by conversation")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Folder to thread")
    parser.add_argument("--limit", type=int, default=100, help="Max messages to fetch")
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read messages JSON from stdin")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    messages: list[EmailMessage] = []

    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
            raw_msgs = data if isinstance(data, list) else data.get("messages", [])
            messages = [EmailMessage.from_dict(m) for m in raw_msgs]
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif args.imap_host:
        from lib.imap_client import IMAPClient
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            messages = client.fetch_all(mailbox=args.folder, limit=args.limit)
        finally:
            client.disconnect()
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            messages = client.fetch_all(mailbox=args.folder, limit=args.limit)
        finally:
            client.disconnect()
    else:
        _error("Provide --config, --imap-host, or --from-stdin")
        return

    threads = build_threads(messages)

    if args.format == "cli":
        print(f"Found {len(threads)} conversation thread(s) "
              f"from {len(messages)} message(s):\n")
        for t in threads:
            count = t["message_count"]
            subj = t["subject"] or "(no subject)"
            parts = ", ".join(t["participants"][:3])
            if len(t["participants"]) > 3:
                parts += f" +{len(t['participants']) - 3} more"
            print(f" [{count} msg] {subj}")
            print(f" Participants: {parts}")
            print(f" Latest: {t['latest_date']}")
            print()
    else:
        result = {
            "total_messages": len(messages),
            "thread_count": len(threads),
            "threads": threads,
        }
        json.dump(result, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
```

### scripts/lib/imap_client.py

```python
"""IMAP client for server-side email retrieval and folder management."""

from __future__ import annotations

import base64
import email
import email.header
import email.utils
import imaplib
import logging
import re
import ssl
import time
from datetime import datetime
from typing import Any

from .models import EmailAddress, EmailAttachment, EmailMessage, EmailPriority

logger = logging.getLogger("clawMail.imap")

# Charsets that Python doesn't recognise but appear in real-world headers.
_CHARSET_FALLBACKS = {
    "unknown-8bit": "latin-1",
    "x-unknown": "latin-1",
    "default": "latin-1",
    "iso-8859-8-i": "iso-8859-8",
}

_MAX_SELECT_RETRIES = 3
_SELECT_RETRY_DELAY = 1.0  # seconds


# ── Mailbox name helpers ────────────────────────────────────────────

def _quote_mailbox(name: str) -> str:
    """IMAP-quote a mailbox name for use with imaplib.

    Python's ``imaplib`` does **not** quote mailbox arguments, so a name
    like ``Needs Review`` is sent as two separate tokens. This helper
    wraps the name in double-quotes and escapes embedded backslashes and
    double-quote characters per RFC 3501 §9 (quoted-string grammar).
    """
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _decode_mutf7(data: str) -> str:
    """Decode an IMAP modified-UTF-7 mailbox name (RFC 3501 §5.1.3).

    Rules:
    * Printable US-ASCII (0x20-0x7e) except ``&`` pass through literally.
    * ``&-`` decodes to a literal ``&``.
    * ``&<base64>-`` decodes the base64 segment (using ``,`` instead of
      ``/``) as UTF-16BE code-points.
    """
    result: list[str] = []
    i = 0
    while i < len(data):
        if data[i] == "&":
            end = data.find("-", i + 1)
            if end == -1:
                # Malformed: treat the rest as literal
                result.append(data[i:])
                break
            if end == i + 1:
                # &- is literal &
                result.append("&")
            else:
                b64_segment = data[i + 1:end].replace(",", "/")
                # Pad to a multiple of 4
                b64_segment += "=" * ((4 - len(b64_segment) % 4) % 4)
                try:
                    raw = base64.b64decode(b64_segment)
                    result.append(raw.decode("utf-16-be"))
                except Exception:
                    # Malformed: keep the original text
                    result.append(data[i:end + 1])
            i = end + 1
        else:
            result.append(data[i])
            i += 1
    return "".join(result)


def _encode_mutf7(text: str) -> str:
    """Encode a Unicode string to IMAP modified-UTF-7.

    The inverse of ``_decode_mutf7``.
    """
    result: list[str] = []
    non_ascii: list[str] = []

    def _flush_non_ascii() -> None:
        if not non_ascii:
            return
        raw = "".join(non_ascii).encode("utf-16-be")
        b64 = base64.b64encode(raw).decode("ascii").rstrip("=")
        result.append("&" + b64.replace("/", ",") + "-")
        non_ascii.clear()

    for ch in text:
        if ch == "&":
            _flush_non_ascii()
            result.append("&-")
        elif 0x20 <= ord(ch) <= 0x7E:
            _flush_non_ascii()
            result.append(ch)
        else:
            non_ascii.append(ch)
    _flush_non_ascii()
    return "".join(result)


class IMAPClient:
    """IMAP client for fetching emails and managing folders."""

    def __init__(
        self,
        host: str,
        port: int = 993,
        username: str = "",
        password: str = "",
        use_ssl: bool = True,
        timeout: int = 30,
        oauth2_manager: Any | None = None,
    ) -> None:
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self._oauth2 = oauth2_manager
        self._connection: imaplib.IMAP4 | imaplib.IMAP4_SSL | None = None
        self._account: str = ""  # set by AccountManager
        self._last_append_error: str = ""
        self._chunk_size = 16 * 1024

    @staticmethod
    def _create_secure_context() -> ssl.SSLContext:
        """Create a hardened SSL context with secure ciphers only.

        Enforces TLS 1.2+, disables weak ciphers, and enables
        certificate verification and hostname checking.
        """
        ctx = ssl.create_default_context()
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.set_ciphers(
            "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20"
            ":!aNULL:!MD5:!DSS:!RC4:!3DES"
        )
        # Enforce certificate verification (default, but be explicit)
        ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        return ctx

    def connect(self) -> None:
        if self.use_ssl:
            ctx = self._create_secure_context()
            self._connection = imaplib.IMAP4_SSL(
                self.host, self.port, ssl_context=ctx,
                timeout=self.timeout,
            )
        else:
            self._connection = imaplib.IMAP4(self.host, self.port)
            self._connection.socket().settimeout(self.timeout)

        if self._oauth2:
            from .oauth2 import build_xoauth2_string
            auth_string = build_xoauth2_string(
                self.username, self._oauth2.access_token,
            )
            self._connection.authenticate(
                "XOAUTH2", lambda _x: auth_string.encode("ascii"),
            )
        else:
            self._connection.login(self.username, self.password)

    def disconnect(self) -> None:
        conn = self._connection
        self._connection = None
        if conn is None:
            return
        try:
            conn.logout()
        except Exception:
            # logout failed (SSL corruption, connection drop, etc.)
            # Force-close the underlying socket directly.
            try:
                conn.shutdown()
            except Exception:
                pass
            try:
                sock = conn.socket()
                if sock:
                    sock.close()
            except Exception:
                pass

    @property
    def conn(self) -> imaplib.IMAP4 | imaplib.IMAP4_SSL:
        if self._connection is None:
            raise RuntimeError("Not connected. Call connect() first.")
        return self._connection

    # ── Folder management ────────────────────────────────────────────

    def list_folders(self) -> list[dict[str, str]]:
        """List all folders with attributes.

        Returns list of dicts: {"name": ..., "delimiter": ..., "flags": ...}.
        Folder names are decoded from IMAP modified-UTF-7.
        """
        try:
            status, data = self.conn.list()
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("LIST failed: %s", exc)
            return []
        if status != "OK":
            return []

        folders = []
        # Regex handles both quoted and unquoted folder names:
        #   (\Flags) "delim" "Folder With Spaces"
        #   (\Flags) "delim" PlainFolder
        # The quoted-name branch uses a non-greedy match inside quotes
        # that allows escaped characters (\\ and \").
        _LIST_RE = re.compile(
            r'\(([^)]*)\)\s+'             # flags
            r'(?:"([^"]+)"|NIL)\s+'       # delimiter (quoted or NIL)
            r'(?:"((?:[^"\\]|\\.)*)"|'    # quoted folder name (with escapes)
            r'(\S+))'                     # OR unquoted atom folder name
        )
        for item in data:
            if isinstance(item, bytes):
                decoded = item.decode("utf-8", errors="replace")
                m = _LIST_RE.match(decoded)
                if m:
                    flags = m.group(1)
                    delimiter = m.group(2) or ""
                    # Group 3 = quoted name, Group 4 = unquoted name
                    raw_name = m.group(3) if m.group(3) is not None else (m.group(4) or "")
                    # Un-escape any backslash-escaped characters in quoted names
                    raw_name = raw_name.replace('\\"', '"').replace("\\\\", "\\")
                    # Decode IMAP modified-UTF-7
                    name = _decode_mutf7(raw_name)
                    folders.append({
                        "name": name,
                        "delimiter": delimiter,
                        "flags": flags,
                    })
        return folders

    def folder_status(self, folder: str) -> dict[str, int]:
        """Get message counts for a folder.

        Returns: {"messages": N, "unseen": N, "recent": N}.
        """
        try:
            status, data = self.conn.status(_quote_mailbox(folder), "(MESSAGES UNSEEN RECENT)")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("STATUS %s failed: %s", folder, exc)
            return {"messages": 0, "unseen": 0, "recent": 0}
        if status != "OK":
            return {"messages": 0, "unseen": 0, "recent": 0}

        raw = data[0].decode("utf-8", errors="replace") if isinstance(data[0], bytes) else data[0]
        result: dict[str, int] = {"messages": 0, "unseen": 0, "recent": 0}
        for key in ("MESSAGES", "UNSEEN", "RECENT"):
            m = re.search(rf"{key}\s+(\d+)", raw)
            if m:
                result[key.lower()] = int(m.group(1))
        return result

    def create_folder(self, folder: str) -> bool:
        """Create a new folder/mailbox."""
        quoted = _quote_mailbox(folder)
        try:
            status, _ = self.conn.create(quoted)
            if status == "OK":
                self.conn.subscribe(quoted)
                return True
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("CREATE %s failed: %s", folder, exc)
        return False

    def delete_folder(self, folder: str) -> bool:
        """Delete a folder/mailbox."""
        quoted = _quote_mailbox(folder)
        try:
            self.conn.unsubscribe(quoted)
            status, _ = self.conn.delete(quoted)
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("DELETE %s failed: %s", folder, exc)
            return False

    def rename_folder(self, old_name: str, new_name: str) -> bool:
        """Rename a folder."""
        try:
            status, _ = self.conn.rename(_quote_mailbox(old_name), _quote_mailbox(new_name))
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("RENAME %s -> %s failed: %s", old_name, new_name, exc)
            return False

    def move_folder(self, folder: str, new_parent: str) -> bool:
        """Move a folder under a new parent using the folder's delimiter.

        Example: move_folder("Projects", "Archive") renames
        "Projects" to "Archive/Projects" (delimiter-aware).
        """
        folders = self.list_folders()
        delimiter = "/"
        for f in folders:
            if f["name"] == folder:
                delimiter = f.get("delimiter", "/")
                break

        base_name = folder.rsplit(delimiter, 1)[-1]
        new_name = f"{new_parent}{delimiter}{base_name}"
        return self.rename_folder(folder, new_name)

    # ── Message retrieval ────────────────────────────────────────────

    def _select_robust(self, mailbox: str, readonly: bool = True) -> None:
        """Select a mailbox with retry logic for transient errors."""
        last_exc: Exception | None = None
        for attempt in range(_MAX_SELECT_RETRIES):
            try:
                status, data = self.conn.select(_quote_mailbox(mailbox), readonly=readonly)
                if status == "OK":
                    return
                raise imaplib.IMAP4.error(
                    f"SELECT failed: {data}"
                )
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                last_exc = exc
                logger.warning(
                    "SELECT %s attempt %d/%d failed: %s",
                    mailbox, attempt + 1, _MAX_SELECT_RETRIES, exc,
                )
                if attempt < _MAX_SELECT_RETRIES - 1:
                    time.sleep(_SELECT_RETRY_DELAY * (attempt + 1))
                    # Reconnect if the connection was dropped
                    try:
                        self.conn.noop()
                    except Exception:
                        self.disconnect()
                        self.connect()
        raise RuntimeError(
            f"Failed to SELECT '{mailbox}' after {_MAX_SELECT_RETRIES} attempts: {last_exc}"
        )

    def fetch_unread(
        self,
        mailbox: str = "INBOX",
        limit: int = 50,
        mark_seen: bool = False,
    ) -> list[EmailMessage]:
        self._select_robust(mailbox, readonly=not mark_seen)
        try:
            status, msg_ids = self.conn.search(None, "UNSEEN")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH UNSEEN in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()[:limit] if limit else msg_ids[0].split()
        return self._fetch_ids(id_list, mailbox, mark_seen)

    def fetch_all(
        self,
        mailbox: str = "INBOX",
        limit: int = 50,
    ) -> list[EmailMessage]:
        """Fetch all messages from a mailbox (most recent first)."""
        self._select_robust(mailbox, readonly=True)
        try:
            status, msg_ids = self.conn.search(None, "ALL")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH ALL in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()
        if limit:
            id_list = id_list[-limit:]
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    def fetch_since(
        self,
        since: datetime,
        mailbox: str = "INBOX",
        limit: int = 100,
    ) -> list[EmailMessage]:
        self._select_robust(mailbox, readonly=True)
        date_str = since.strftime("%d-%b-%Y")
        try:
            status, msg_ids = self.conn.search(None, f"(SINCE {date_str})")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH SINCE in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()[:limit] if limit else msg_ids[0].split()
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    def fetch_by_id(self, message_id: str, mailbox: str = "INBOX") -> EmailMessage | None:
        """Fetch a single message by its Message-ID header value."""
        self._select_robust(mailbox, readonly=True)
        search_id = message_id if message_id.startswith("<") else f"<{message_id}>"
        try:
            status, data = self.conn.search(None, f'(HEADER Message-ID "{search_id}")')
            if status != "OK" or not data[0]:
                return None
            seq_num = data[0].split()[0]
            status, msg_data = self.conn.fetch(seq_num, "(BODY.PEEK[])")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("FETCH by Message-ID %s failed: %s", message_id, exc)
            return None
        if status != "OK" or not msg_data[0]:
            return None
        raw = msg_data[0][1] if isinstance(msg_data[0], tuple) else msg_data[0]
        if isinstance(raw, bytes):
            return self._parse_email(raw, mailbox)
        return None

    def search(
        self,
        mailbox: str = "INBOX",
        criteria: str = "ALL",
        limit: int = 50,
    ) -> list[EmailMessage]:
        """Search messages using an IMAP SEARCH criteria string.

        The *criteria* argument is passed directly to the IMAP SEARCH command.

        Examples:
            "UNSEEN"
            '(FROM "[email protected]")'
            '(SUBJECT "invoice" SINCE 01-Jan-2025)'
            '(OR (FROM "[email protected]") (FROM "[email protected]"))'
        """
        self._select_robust(mailbox, readonly=True)
        try:
            status, msg_ids = self.conn.search(None, criteria)
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH %s in %s failed: %s", criteria, mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()
        if limit:
            id_list = id_list[-limit:]
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    # ── Message actions ──────────────────────────────────────────────

    def mark_read(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "+FLAGS", "\\Seen")

    def mark_unread(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "-FLAGS", "\\Seen")

    def flag_message(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "+FLAGS", "\\Flagged")

    def unflag_message(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "-FLAGS", "\\Flagged")

    def set_custom_flag(self, message_id: str, flag: str, mailbox: str = "INBOX") -> bool:
        """Set a custom keyword flag (e.g. '$Important', '$Processed')."""
        return self._set_flag(message_id, mailbox, "+FLAGS", flag)

    def remove_custom_flag(self, message_id: str, flag: str, mailbox: str = "INBOX") -> bool:
        """Remove a custom keyword flag."""
        return self._set_flag(message_id, mailbox, "-FLAGS", flag)

    def set_flags_batch(
        self,
        message_ids: list[str],
        flag: str,
        mailbox: str = "INBOX",
    ) -> dict[str, bool]:
        """Set a flag on multiple messages in one mailbox selection.

        Returns dict of {message_id: success}.
        """
        self._select_robust(mailbox, readonly=False)
        results: dict[str, bool] = {}
        for mid in message_ids:
            seq_num = self._resolve_seq(mid)
            if seq_num is None:
                results[mid] = False
                continue
            try:
                status, _ = self.conn.store(seq_num, "+FLAGS", flag)
                results[mid] = status == "OK"
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("STORE +FLAGS %s on %s failed: %s", flag, mid, exc)
                results[mid] = False
        return results

    def move_message(
        self,
        msg_uid: str,
        from_mailbox: str,
        to_mailbox: str,
    ) -> bool:
        """Move a message between mailboxes via IMAP COPY + DELETE."""
        self._select_robust(from_mailbox, readonly=False)
        seq_num = self._resolve_seq(msg_uid)
        if seq_num is None:
            return False
        try:
            status, _ = self.conn.copy(seq_num, _quote_mailbox(to_mailbox))
            if status != "OK":
                return False
            self.conn.store(seq_num, "+FLAGS", "\\Deleted")
            self.conn.expunge()
            return True
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("MOVE %s -> %s failed: %s", from_mailbox, to_mailbox, exc)
            return False

    def move_messages_batch(
        self,
        message_ids: list[str],
        from_mailbox: str,
        to_mailbox: str,
    ) -> dict[str, bool]:
        """Move multiple messages, minimising SELECT calls.

        Returns dict of {message_id: success}.
        """
        self._select_robust(from_mailbox, readonly=False)
        results: dict[str, bool] = {}
        moved_seqs = []
        for mid in message_ids:
            seq_num = self._resolve_seq(mid)
            if seq_num is None:
                results[mid] = False
                continue
            try:
                status, _ = self.conn.copy(seq_num, _quote_mailbox(to_mailbox))
                if status == "OK":
                    self.conn.store(seq_num, "+FLAGS", "\\Deleted")
                    results[mid] = True
                    moved_seqs.append(seq_num)
                else:
                    results[mid] = False
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("MOVE batch %s -> %s failed: %s", mid, to_mailbox, exc)
                results[mid] = False
        if moved_seqs:
            try:
                self.conn.expunge()
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("EXPUNGE after batch move failed: %s", exc)
        return results

    # ── IMAP IDLE ──────────────────────────────────────────────────

    def idle_start(self, mailbox: str = "INBOX", timeout: int = 29 * 60) -> None:
        """Enter IMAP IDLE mode on *mailbox*.

        The server will push EXISTS/EXPUNGE notifications while idle.
        Call :meth:`idle_check` to poll for responses, and
        :meth:`idle_done` to leave IDLE mode.

        *timeout* is a safety upper-bound in seconds (RFC 2177
        recommends re-issuing IDLE every 29 minutes).
        """
        self._select_robust(mailbox, readonly=True)
        tag = self.conn._new_tag()
        self._idle_tag = tag
        self.conn.send(tag + b" IDLE\r\n")
        # Wait for the continuation response '+ ...'
        resp = self.conn.readline()
        if not resp.startswith(b"+"):
            raise RuntimeError(f"IDLE rejected: {resp!r}")
        self.conn.sock.settimeout(timeout)

    def idle_check(self, timeout: float = 30.0) -> list[tuple[bytes, bytes]]:
        """Poll for untagged responses while in IDLE mode.

        Returns a list of ``(seq_num, response)`` tuples, e.g.
        ``[(b'3', b'EXISTS')]``. Returns an empty list on timeout.
        """
        old_timeout = self.conn.sock.gettimeout()
        self.conn.sock.settimeout(timeout)
        responses: list[tuple[bytes, bytes]] = []
        try:
            while True:
                line = self.conn.readline()
                if not line:
                    break
                if line.startswith(b"* "):
                    parts = line[2:].strip().split(None, 1)
                    if len(parts) == 2:
                        responses.append((parts[0], parts[1]))
                    else:
                        responses.append((b"", parts[0] if parts else b""))
                    # Don't block indefinitely; check if more data is ready
                    self.conn.sock.settimeout(0.5)
                else:
                    break
        except (TimeoutError, OSError):
            pass
        finally:
            self.conn.sock.settimeout(old_timeout)
        return responses

    def idle_done(self) -> None:
        """Leave IDLE mode by sending ``DONE``."""
        self.conn.send(b"DONE\r\n")
        # Read the tagged response to complete the IDLE command
        while True:
            line = self.conn.readline()
            if not line:
                break
            tag = getattr(self, "_idle_tag", None)
            if tag and line.startswith(tag):
                break

    def append_message(
        self,
        raw_message: bytes,
        mailbox: str = "Drafts",
        flags: str = "\\Draft",
    ) -> bool:
        """Append a raw RFC822 message to a mailbox (e.g. Drafts).

        Returns True on success.
""" logger.debug( "IMAP APPEND raw_message length=%d mailbox=%r flags=%r", len(raw_message), mailbox, flags, ) orig_send = self.conn.send def _chunked_send(data): if isinstance(data, str): data = data.encode("utf-8") if len(data) <= self._chunk_size: return orig_send(data) for i in range(0, len(data), self._chunk_size): orig_send(data[i:i + self._chunk_size]) try: self.conn.send = _chunked_send status, _ = self.conn.append( _quote_mailbox(mailbox), flags, None, raw_message, ) if status != "OK": self._last_append_error = f"APPEND status {status}" logger.debug("APPEND response status=%s", status) else: self._last_append_error = "" return status == "OK" except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc: self._last_append_error = str(exc) logger.warning("APPEND to %s failed: %s", mailbox, exc) return False finally: self.conn.send = orig_send @property def last_append_error(self) -> str: return self._last_append_error def delete_message(self, message_id: str, mailbox: str = "INBOX") -> bool: self._select_robust(mailbox, readonly=False) seq_num = self._resolve_seq(message_id) if seq_num is None: return False try: self.conn.store(seq_num, "+FLAGS", "\\Deleted") self.conn.expunge() return True except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc: logger.warning("DELETE message %s failed: %s", message_id, exc) return False # ── Internal helpers ───────────────────────────────────────────── def _resolve_seq(self, msg_uid: str) -> bytes | None: """Resolve a Message-ID or sequence number to an IMAP sequence number.""" if msg_uid.startswith("<"): try: status, data = self.conn.search(None, f'(HEADER Message-ID "{msg_uid}")') except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc: logger.warning("SEARCH for Message-ID %s failed: %s", msg_uid, exc) return None if status != "OK" or not data[0]: return None return data[0].split()[0] return msg_uid.encode() if isinstance(msg_uid, str) else msg_uid def _set_flag(self, message_id: str, mailbox: 
str, op: str, flag: str) -> bool: self._select_robust(mailbox, readonly=False) seq_num = self._resolve_seq(message_id) if seq_num is None: return False try: status, _ = self.conn.store(seq_num, op, flag) return status == "OK" except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc: logger.warning("STORE %s %s failed: %s", op, flag, exc) return False def _fetch_ids( self, id_list: list[bytes], mailbox: str, mark_seen: bool, ) -> list[EmailMessage]: messages = [] for msg_id in id_list: fetch_flag = "(RFC822)" if mark_seen else "(BODY.PEEK[])" try: status, msg_data = self.conn.fetch(msg_id, fetch_flag) except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc: logger.warning("Failed to fetch message %s: %s", msg_id, exc) continue if status != "OK" or not msg_data[0]: continue raw = msg_data[0][1] if isinstance(msg_data[0], tuple) else msg_data[0] if isinstance(raw, bytes): parsed = self._parse_email(raw, mailbox) if parsed: messages.append(parsed) return messages def _parse_email(self, raw: bytes, mailbox: str) -> EmailMessage | None: try: msg = email.message_from_bytes(raw) except Exception: return None subject = self._decode_header(msg.get("Subject", "")) sender = EmailAddress.parse(self._decode_header(msg.get("From", ""))) recipients = self._parse_address_list(msg.get("To", "")) cc = self._parse_address_list(msg.get("Cc", "")) date = None date_str = msg.get("Date", "") if date_str: try: date = email.utils.parsedate_to_datetime(date_str) except Exception: pass body_plain = "" body_html = "" attachments: list[EmailAttachment] = [] if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() disp = str(part.get("Content-Disposition", "")) if "attachment" in disp: payload = part.get_payload(decode=True) if payload: attachments.append(EmailAttachment( filename=part.get_filename() or "attachment", content_type=ct, data=payload, content_id=part.get("Content-ID"), )) elif ct == "text/plain" and not body_plain: payload = 
part.get_payload(decode=True) if payload: body_plain = payload.decode("utf-8", errors="replace") elif ct == "text/html" and not body_html: payload = part.get_payload(decode=True) if payload: body_html = payload.decode("utf-8", errors="replace") else: payload = msg.get_payload(decode=True) if payload: content = payload.decode("utf-8", errors="replace") if msg.get_content_type() == "text/html": body_html = content else: body_plain = content priority = EmailPriority.NORMAL xp = msg.get("X-Priority", "3") if xp.startswith(("1", "2")): priority = EmailPriority.HIGH elif xp.startswith(("4", "5")): priority = EmailPriority.LOW return EmailMessage( message_id=msg.get("Message-ID", ""), subject=subject, sender=sender, recipients=recipients, cc=cc, body_plain=body_plain, body_html=body_html, attachments=attachments, date=date, in_reply_to=msg.get("In-Reply-To", ""), references=msg.get("References", "").split(), priority=priority, mailbox=mailbox, account=self._account, headers=dict(msg.items()), ) def _decode_header(self, raw: str) -> str: """Decode an RFC 2047 encoded header, with safe fallbacks.""" try: parts = email.header.decode_header(raw) except Exception: return raw decoded = [] for data, charset in parts: if isinstance(data, bytes): enc = "utf-8" if charset: try: enc = _CHARSET_FALLBACKS.get(charset.lower(), charset) except Exception: enc = "latin-1" try: decoded.append(data.decode(enc, errors="replace")) except (LookupError, UnicodeDecodeError, TypeError): # Unknown / invalid charset — latin-1 never raises decoded.append(data.decode("latin-1", errors="replace")) else: decoded.append(str(data) if not isinstance(data, str) else data) return " ".join(decoded) def _parse_address_list(self, raw: str) -> list[EmailAddress]: if not raw: return [] decoded = self._decode_header(raw) return [ EmailAddress.parse(a.strip()) for a in decoded.split(",") if a.strip() ] ``` ### scripts/lib/smtp_client.py ```python """SMTP client for sending emails.""" from __future__ import 
annotations import logging import re import smtplib import socket import ssl import uuid from datetime import datetime from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formatdate from .models import EmailMessage, EmailPriority logger = logging.getLogger("clawMail.smtp") def build_mime(message: EmailMessage) -> MIMEMultipart: """Build a MIME message from an EmailMessage (without sending). Useful for constructing RFC822 bytes to save as a draft. """ return SMTPClient._build_mime_static(message) class SMTPClient: """SMTP client for sending emails via a mail server.""" def __init__( self, host: str, port: int = 587, username: str = "", password: str = "", use_tls: bool = True, oauth2_manager: object | None = None, ) -> None: self.host = host self.port = port self.username = username self.password = password self.use_tls = use_tls self._oauth2 = oauth2_manager @staticmethod def _create_secure_context() -> ssl.SSLContext: """Create a hardened SSL context with secure ciphers only. Enforces TLS 1.2+, disables weak ciphers, and enables certificate verification and hostname checking. 
""" ctx = ssl.create_default_context() ctx.minimum_version = ssl.TLSVersion.TLSv1_2 ctx.set_ciphers( "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20" ":!aNULL:!MD5:!DSS:!RC4:!3DES" ) # Enforce certificate verification (default, but be explicit) ctx.check_hostname = True ctx.verify_mode = ssl.CERT_REQUIRED return ctx def send(self, message: EmailMessage) -> bool: mime_msg = self._build_mime_static(message) try: if self.use_tls: server = smtplib.SMTP(self.host, self.port) server.ehlo() ctx = self._create_secure_context() server.starttls(context=ctx) server.ehlo() else: server = smtplib.SMTP(self.host, self.port) if self._oauth2: from .oauth2 import build_xoauth2_string auth_string = build_xoauth2_string( self.username, self._oauth2.access_token, ) server.docmd("AUTH", "XOAUTH2 " + auth_string) elif self.username: server.login(self.username, self.password) all_recips = ( [r.address for r in message.recipients] + [r.address for r in message.cc] + [r.address for r in message.bcc] ) sender_addr = message.sender.address if message.sender else self.username original_send = server.send chunk_size = 16 * 1024 def _chunked_send(data): if isinstance(data, str): data = data.encode("ascii") if len(data) <= chunk_size: return original_send(data) for i in range(0, len(data), chunk_size): original_send(data[i:i + chunk_size]) server.send = _chunked_send server.sendmail(sender_addr, all_recips, mime_msg.as_string()) server.send = original_send server.quit() return True except (smtplib.SMTPException, OSError) as exc: logger.error("SMTP send failed: %s", exc) return False @staticmethod def _build_mime_static(message: EmailMessage) -> MIMEMultipart: if message.attachments: mime_msg = MIMEMultipart("mixed") alt_part = MIMEMultipart("alternative") mime_msg.attach(alt_part) else: mime_msg = MIMEMultipart("alternative") alt_part = mime_msg if message.sender: mime_msg["From"] = str(message.sender) if message.recipients: mime_msg["To"] = ", ".join(str(r) for r in message.recipients) if 
message.cc: mime_msg["Cc"] = ", ".join(str(r) for r in message.cc) mime_msg["Subject"] = message.subject # RFC 5322: Date header is REQUIRED if message.date: mime_msg["Date"] = formatdate(timeval=message.date.timestamp(), localtime=True) else: mime_msg["Date"] = formatdate(localtime=True) # RFC 5322: Message-ID header is REQUIRED (unique identifier for this message) if message.message_id: mime_msg["Message-ID"] = message.message_id else: # Generate a unique Message-ID if not provided # Format: <timestamp-uuid@hostname> per RFC 5322 try: hostname = socket.getfqdn() except Exception: # Fallback to sender domain if FQDN is unavailable hostname = ( message.sender.address.split("@")[1] if message.sender and "@" in str(message.sender) else "localhost" ) msg_id = f"{int(datetime.utcnow().timestamp() * 1000)}.{uuid.uuid4().hex}@{hostname}" message.message_id = f"<{msg_id}>" mime_msg["Message-ID"] = message.message_id if message.in_reply_to: mime_msg["In-Reply-To"] = message.in_reply_to mime_msg["References"] = " ".join(message.references) if message.priority == EmailPriority.HIGH: mime_msg["X-Priority"] = "1" mime_msg["Importance"] = "high" elif message.priority == EmailPriority.LOW: mime_msg["X-Priority"] = "5" mime_msg["Importance"] = "low" # RFC 5322 compliance: Add MIME-Version header mime_msg["MIME-Version"] = "1.0" # Helpful for deliverability mime_msg["User-Agent"] = "clawMail/0.6.0" plain_text = message.body_plain if not plain_text and message.body_html: plain_text = re.sub(r"<[^>]+>", "", message.body_html) plain_text = re.sub(r"\n\s*\n", "\n\n", plain_text).strip() if plain_text: alt_part.attach(MIMEText(plain_text, "plain", "utf-8")) if message.body_html: alt_part.attach(MIMEText(message.body_html, "html", "utf-8")) for att in message.attachments: maintype, subtype = att.content_type.split("/", 1) part = MIMEBase(maintype, subtype) part.set_payload(att.data) encoders.encode_base64(part) part.add_header("Content-Disposition", "attachment", filename=att.filename) 
            if att.content_id:
                part.add_header("Content-ID", f"<{att.content_id}>")
            mime_msg.attach(part)

        return mime_msg
```

---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "borgcube",
  "slug": "claw-mail",
  "displayName": "claw-mail",
  "latest": {
    "version": "1.0.0",
    "publishedAt": 1772190999869,
    "commit": "https://github.com/openclaw/skills/commit/ec4600fddbaeff9d0697f874f5a46abd94db48c9"
  },
  "history": []
}
```

### scripts/lib/__init__.py

```python
"""Shared library for clawMail skill scripts."""
```

### scripts/lib/defaults.py

```python
"""Default paths and configuration helpers.

Provides the default config file path so that ``--config`` can be optional
across all scripts. When ``--config`` is not supplied the scripts look for
``config.yaml`` in the skill root directory (one level above ``scripts/``).
"""

from __future__ import annotations

import os

_THIS_DIR = os.path.dirname(os.path.abspath(__file__))  # .../scripts/lib/
SCRIPTS_DIR = os.path.dirname(_THIS_DIR)                # .../scripts/
SKILL_ROOT = os.path.dirname(SCRIPTS_DIR)               # .../claw-mail/
DEFAULT_CONFIG_PATH = os.path.join(SKILL_ROOT, "config.yaml")


def resolve_config_path(explicit_path: str = "") -> str:
    """Return *explicit_path* if given, else the default config if it exists.

    Returns an empty string when neither is available so callers can still
    require ``--config`` or ``--imap-host`` as before.
    """
    if explicit_path:
        return explicit_path
    if os.path.isfile(DEFAULT_CONFIG_PATH):
        return DEFAULT_CONFIG_PATH
    return ""
```
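
The resolution order in `defaults.py` (explicit path, then the default file if present, then an empty string) can be sketched from a caller's side as follows. This is an illustrative sketch, not one of the skill's scripts: `pick_config`, `build_parser`, and the `default_path` parameter are hypothetical names introduced here so the example runs without the real skill directory, and the local `resolve_config_path` only mirrors the logic shown above.

```python
import argparse
import os
import tempfile


def resolve_config_path(explicit_path: str = "", default_path: str = "config.yaml") -> str:
    """Mirror of the resolution order in defaults.py.

    default_path is a hypothetical parameter standing in for DEFAULT_CONFIG_PATH.
    """
    if explicit_path:
        return explicit_path
    if os.path.isfile(default_path):
        return default_path
    return ""


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical CLI; only --config and --imap-host are named in the source docstring.
    parser = argparse.ArgumentParser(description="Hypothetical clawMail script")
    parser.add_argument("--config", default="", help="Path to config.yaml (optional)")
    parser.add_argument("--imap-host", default="", help="IMAP host when no config is used")
    return parser


def pick_config(argv: list[str], default_path: str) -> str:
    """Return the config path to load, or '' when --imap-host is used instead."""
    args = build_parser().parse_args(argv)
    path = resolve_config_path(args.config, default_path)
    if not path and not args.imap_host:
        # Neither a config file nor direct connection settings were supplied.
        raise SystemExit("error: supply --config or --imap-host")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        default = os.path.join(tmp, "config.yaml")
        # No default file yet: falls through to the --imap-host path.
        print(repr(pick_config(["--imap-host", "imap.example.com"], default)))
        # Once the default file exists, --config becomes optional.
        open(default, "w").close()
        print(pick_config([], default) == default)
        # An explicit --config always wins, even if the file is missing.
        print(pick_config(["--config", "/etc/other.yaml"], default))
```

Because an explicit path is returned without an existence check, a mistyped `--config` only surfaces when the file is opened; a caller that wants an earlier error would need its own `os.path.isfile` check.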