claw-mail

Multi-account email management skill for IMAP/SMTP. Fetches, reads, searches, composes, sends, replies, forwards, and organizes emails across multiple accounts. Features IMAP Outbox for reliable delivery, secure credential storage via 1Password and macOS Keychain, TLS 1.2+ with hardened ciphers, OAuth2 authentication, IMAP IDLE push monitoring, connection pooling, S/MIME signing, calendar invitations, mail merge, conversation threading, webhook rule actions, and configurable dated-folder archival.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 3,126.

Hot score: 99.

Updated: March 20, 2026.

Overall rating: C (0.0).

Composite score: 0.0.

Best-practice grade: C (62.8).

Install command

npx @skill-hub/cli install openclaw-skills-claw-mail

Repository

openclaw/skills

Skill path: skills/borgcube/claw-mail

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack, Integration.

Target audience: everyone.

License: MIT.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install claw-mail into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding claw-mail to shared team environments
  • Use claw-mail for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: claw-mail
description: >
  Multi-account email management skill for IMAP/SMTP. Fetches, reads, searches,
  composes, sends, replies, forwards, and organizes emails across multiple accounts.
  Features IMAP Outbox for reliable delivery, secure credential storage via 1Password
  and macOS Keychain, TLS 1.2+ with hardened ciphers, OAuth2 authentication,
  IMAP IDLE push monitoring, connection pooling, S/MIME signing, calendar invitations,
  mail merge, conversation threading, webhook rule actions, and configurable
  dated-folder archival.
license: MIT
metadata:
  author: openclaw
  version: "0.7.0"
compatibility: >
  Requires Python 3.11+ and PyYAML. Optional: 1Password CLI (op) for op:// credentials,
  macOS for keychain:// credentials, cryptography package for S/MIME.
allowed-tools: Bash(python3 *) Read Write
---

# clawMail Skill

You are an email management agent with multi-account IMAP/SMTP support. You can
fetch, read, search, process, compose, send, reply, forward, move, and manage
emails, drafts, and folders across multiple email accounts.

## Multi-Account Model

- **Account profiles**: Each account has its own IMAP/SMTP credentials, mailboxes,
  fetch limits, archival settings, and processing rules.
- **Default account**: One account is designated as the default. Any script invoked
  without `--account` uses the default automatically.
- **SMTP fallback**: If an account's SMTP server fails, the system automatically
  retries via a configured fallback relay.
- **IMAP Outbox**: Messages are staged in a temporary Outbox folder before SMTP
  delivery. If SMTP fails, the message stays in Outbox for retry by the heartbeat.
- **Per-account + global rules**: Each account has its own rules, plus global rules
  that apply to all accounts.
- **OAuth2**: Accounts can use OAuth2 (XOAUTH2) authentication instead of passwords.
- **Dated-folder archival**: `archive_mail.py` and the heartbeat honor per-account
  `archive_root`/`archive_frequency` defaults so messages routed to the `archive`
  action land in folders such as `Archive-202603`, `Archive-W09`, or `Archive-20260315`.
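
The Outbox and SMTP-fallback bullets above describe a stage, send, clean-up sequence. A minimal in-memory sketch of that flow follows. It assumes hypothetical `primary` and `fallback` send callables and uses a plain dict in place of the IMAP Outbox folder; none of these names come from the skill's library, and the real `AccountManager` may order or report things differently.

```python
from typing import Callable, Dict, Optional

SendFn = Callable[[str], bool]  # hypothetical transport: returns True on success


def send_via_outbox_sketch(
    outbox: Dict[str, str],
    message_id: str,
    raw_message: str,
    primary: SendFn,
    fallback: Optional[SendFn] = None,
) -> dict:
    """Stage a message, try primary then fallback, and clean up on success."""
    outbox[message_id] = raw_message      # 1. stage before any SMTP attempt
    fallback_used = False
    success = primary(raw_message)        # 2. try the account's own SMTP server
    if not success and fallback is not None:
        fallback_used = True
        success = fallback(raw_message)   # 3. retry via the fallback relay
    if success:
        del outbox[message_id]            # 4. remove from Outbox once delivered
    # On failure the message stays staged so a later retry can pick it up.
    return {
        "success": success,
        "fallback_used": fallback_used,
        "still_staged": message_id in outbox,
    }
```

Staging before the first send attempt is what makes the retry possible: a failed delivery leaves a copy that the heartbeat or `retry_send.py` can find later.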

## Security

- **TLS 1.2+**: All IMAP and SMTP connections enforce TLS 1.2 or higher.
- **Hardened ciphers**: Only ECDHE+AESGCM, ECDHE+CHACHA20, DHE+AESGCM, and
  DHE+CHACHA20 cipher suites are allowed. Weak ciphers (MD5, RC4, 3DES, DSS)
  are explicitly blocked.
- **Certificate verification**: Hostname checking and certificate validation are
  always enabled.
- **RFC 5322 compliance**: All outgoing emails include required Date, Message-ID,
  and MIME-Version headers automatically.
- **Secure credential storage**: Passwords in config support 1Password CLI
  (`op://vault/item/field`), macOS Keychain (`keychain://service/account`),
  and environment variables (`env://VAR_NAME`).
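
The TLS settings listed above map onto Python's standard `ssl` module roughly as follows. This is a sketch under assumptions: the cipher string is reconstructed from the suites named in the list, and the exact string used by the skill's clients is not shown in this document.

```python
import ssl

# Assumed cipher string mirroring the suites listed above.
HARDENED_CIPHERS = (
    "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:"
    "!aNULL:!MD5:!RC4:!3DES:!DSS"
)


def make_hardened_context() -> ssl.SSLContext:
    """Build a client-side TLS context with TLS 1.2+ and a restricted cipher list."""
    ctx = ssl.create_default_context()            # cert + hostname checks enabled
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2  # refuse TLS 1.0 and 1.1
    ctx.set_ciphers(HARDENED_CIPHERS)             # restricts TLS 1.2 suites
    return ctx
```

A context like this can be passed to `imaplib.IMAP4_SSL(host, 993, ssl_context=ctx)` or to `smtplib.SMTP.starttls(context=ctx)`. Note that `set_ciphers` does not change the TLS 1.3 suites, which OpenSSL configures separately.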

## Available Scripts

All scripts are in the `scripts/` directory. Run with
`python3 scripts/<name>.py` from the skill root. Every script accepts
`--account <name>` to target a specific account.

### Core Scripts

| Script | Purpose |
|--------|---------|
| `scripts/fetch_mail.py` | Fetch emails from an IMAP folder |
| `scripts/read_mail.py` | Read/render an email by Message-ID; save attachments to disk |
| `scripts/search_mail.py` | Search emails by subject, sender, body, date, flags |
| `scripts/send_mail.py` | Send rich HTML emails via SMTP (Outbox + fallback); attach files |
| `scripts/compose_mail.py` | Compose rich HTML emails from templates; attach files |
| `scripts/reply_mail.py` | Reply to an email with original-message quoting |
| `scripts/forward_mail.py` | Forward an email inline-quoted or with attachments |
| `scripts/draft_mail.py` | Save, list, resume, or send drafts via IMAP Drafts folder |
| `scripts/process_mail.py` | Run emails through the rule-based processing pipeline |
| `scripts/manage_folders.py` | List, create, delete, rename, and move IMAP folders |
| `scripts/move_mail.py` | Move emails between IMAP folders (batch support) |
| `scripts/heartbeat.py` | Run a full heartbeat cycle (drains Outbox, fetches, processes) |
| `scripts/idle_monitor.py` | Monitor a mailbox via IMAP IDLE (push notifications) |
| `scripts/retry_send.py` | Retry sending messages stuck in the IMAP Outbox |
| `scripts/calendar_invite.py` | Compose and send iCalendar meeting invitations |
| `scripts/mail_merge.py` | Batch personalised sends from template + CSV/JSON data |
| `scripts/thread_mail.py` | Group messages into conversation threads |
| `scripts/archive_mail.py` | Auto-archive old messages into dated folders (daily/weekly/monthly/yearly) |

### Library Modules

| Module | Purpose |
|--------|---------|
| `scripts/lib/imap_client.py` | IMAP client with IDLE, search, folder management, TLS 1.2+ |
| `scripts/lib/smtp_client.py` | SMTP client with TLS 1.2+, RFC 5322, OAuth2, MIME building |
| `scripts/lib/composer.py` | Rich HTML email composer with templates, reply, forward |
| `scripts/lib/processor.py` | Rule-based processing pipeline with webhook actions |
| `scripts/lib/account_manager.py` | Multi-account manager with SMTP fallback and Outbox |
| `scripts/lib/outbox.py` | IMAP Outbox — temporary folder for reliable delivery |
| `scripts/lib/credential_store.py` | Secure credential storage (1Password, Keychain, env) |
| `scripts/lib/pool.py` | Connection pool for IMAP/SMTP reuse |
| `scripts/lib/send_queue.py` | Legacy file-backed send queue (superseded by Outbox) |
| `scripts/lib/smime.py` | S/MIME signing and encryption |
| `scripts/lib/oauth2.py` | OAuth2 (XOAUTH2) token management |
| `scripts/lib/models.py` | Data models (EmailMessage, EmailAddress, etc.) |

### Reference Documents

| Reference | When to read |
|-----------|-------------|
| `references/REFERENCE.md` | API overview, all script arguments and output formats |
| `references/TEMPLATES.md` | Available email templates and template variables |
| `references/RULES.md` | How to configure processing rules |
| `ROADMAP.md` | Feature roadmap and progress tracker |

## Quick Start

### Fetching Mail

```bash
python3 scripts/fetch_mail.py --config config.yaml

python3 scripts/fetch_mail.py --account personal --unread-only --format cli --config config.yaml
```
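
With the default JSON format, `fetch_mail.py` prints an object with `account`, `folder`, `count`, and `messages` keys. A downstream consumer could unpack that envelope as sketched below, mirroring the two shapes the script's own `--from-stdin` mode accepts. The function name is hypothetical, and the fields inside each message dict are not assumed here.

```python
import json


def extract_messages(payload: str) -> list:
    """Accept either a JSON array or an object with a 'messages' key."""
    data = json.loads(payload)
    if isinstance(data, dict) and "messages" in data:
        return data["messages"]
    if isinstance(data, list):
        return data
    raise ValueError("Expected JSON array or object with 'messages' key")
```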

### Sending Rich Emails

Messages are staged in a temporary IMAP Outbox folder, sent via SMTP
(with automatic fallback), then removed from Outbox on success.

```bash
python3 scripts/send_mail.py \
  --to "[email protected]" \
  --subject "Weekly Report" \
  --body "<p>Here are this week's results.</p>" \
  --template default \
  --attach report.pdf \
  --config config.yaml
```

### Replying and Forwarding

```bash
python3 scripts/reply_mail.py --message-id "<[email protected]>" --body "Thanks!" --config config.yaml

python3 scripts/forward_mail.py --message-id "<[email protected]>" --to "[email protected]" --config config.yaml
```

### Searching Emails

```bash
python3 scripts/search_mail.py --subject "invoice" --unseen --config config.yaml

python3 scripts/search_mail.py --criteria '(FROM "[email protected]" SINCE 01-Jan-2026)' --config config.yaml
```
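
The `--criteria` value is a raw IMAP SEARCH expression, so dates must use the `DD-Mon-YYYY` form shown above. A small helper for assembling such an expression might look like this; the function names and the example address are illustrative and not part of the skill.

```python
from datetime import date
from typing import Optional

_MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def imap_date(d: date) -> str:
    """Format a date as IMAP expects (e.g. 01-Jan-2026), independent of locale."""
    return f"{d.day:02d}-{_MONTHS[d.month - 1]}-{d.year}"


def build_criteria(sender: Optional[str] = None, since: Optional[date] = None,
                   unseen: bool = False) -> str:
    """Assemble a parenthesised IMAP SEARCH expression from optional filters."""
    parts = []
    if sender:
        # Escape backslashes and quotes, as IMAP quoted strings require.
        escaped = sender.replace("\\", "\\\\").replace('"', '\\"')
        parts.append(f'FROM "{escaped}"')
    if since:
        parts.append(f"SINCE {imap_date(since)}")
    if unseen:
        parts.append("UNSEEN")
    return "(" + " ".join(parts or ["ALL"]) + ")"
```

The month names are hard-coded rather than taken from `strftime("%b")` because that directive follows the process locale, while IMAP requires English abbreviations.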

### Working with Drafts

```bash
python3 scripts/draft_mail.py --action save --to "[email protected]" --subject "WIP" --body "..." --config config.yaml
python3 scripts/draft_mail.py --action list --format cli --config config.yaml
python3 scripts/draft_mail.py --action send --message-id "<[email protected]>" --config config.yaml
```

### Outbox & Send Retry

```bash
python3 scripts/retry_send.py --config config.yaml
python3 scripts/retry_send.py --config config.yaml --list
```

### Heartbeat Cycle

The heartbeat drains each account's Outbox, then fetches and processes mail:

```bash
python3 scripts/heartbeat.py --config config.yaml
python3 scripts/heartbeat.py --config config.yaml --account work
```

### Archiving Old Messages

```bash
python3 scripts/archive_mail.py --config config.yaml --days 90 --frequency monthly
python3 scripts/archive_mail.py --config config.yaml --days 30 --frequency daily --archive-root "Old Mail" --dry-run --format cli
```

Archiving honors the `archive_root` / `archive_frequency` settings (defaults: `Archive`, `monthly`). The heartbeat and any rule with the `archive` action move messages into folders named according to the configured cadence, for example `Archive-202603` (monthly), `Archive-W09` (weekly), or `Archive-20260315` (daily).
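
The folder names can be derived from a date and a frequency roughly as follows. This is a sketch that reproduces the example names in this document. The use of the ISO week number for `weekly` and a bare year for `yearly` are assumptions, since the document gives no rule for the former and no example for the latter.

```python
from datetime import date


def archive_folder_name(root: str, frequency: str, when: date) -> str:
    """Return a dated folder name such as Archive-202603 for the given cadence."""
    if frequency == "daily":
        suffix = when.strftime("%Y%m%d")
    elif frequency == "weekly":
        # Assumption: zero-padded ISO week number, without the year.
        suffix = f"W{when.isocalendar()[1]:02d}"
    elif frequency == "monthly":
        suffix = when.strftime("%Y%m")
    elif frequency == "yearly":
        # Assumption: no yearly example appears in the document.
        suffix = when.strftime("%Y")
    else:
        raise ValueError(f"unknown archive frequency: {frequency!r}")
    return f"{root}-{suffix}"
```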

### Calendar Invitations

```bash
python3 scripts/calendar_invite.py \
  --to "[email protected]" --subject "Standup" \
  --start "2026-03-01T09:00:00" --end "2026-03-01T09:30:00" \
  --location "Zoom" --config config.yaml
```
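
For orientation, an invitation of this kind is carried as an iCalendar `VEVENT` with `METHOD:REQUEST`. The sketch below builds a minimal payload by hand. It is not the skill's implementation: the `PRODID`, the UID domain, and the function name are placeholders, and text escaping and time zones are left out.

```python
import uuid
from datetime import datetime, timezone


def build_invite(summary: str, start: datetime, end: datetime,
                 organizer: str, attendee: str, location: str = "") -> str:
    """Return a minimal iCalendar REQUEST with one VEVENT, using CRLF line ends."""
    fmt = "%Y%m%dT%H%M%S"
    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//example//invite-sketch//EN",   # placeholder product id
        "METHOD:REQUEST",
        "BEGIN:VEVENT",
        f"UID:{uuid.uuid4()}@example.invalid",    # placeholder UID domain
        f"DTSTAMP:{datetime.now(timezone.utc).strftime(fmt)}Z",
        f"DTSTART:{start.strftime(fmt)}",         # floating local time
        f"DTEND:{end.strftime(fmt)}",
        f"SUMMARY:{summary}",
        f"LOCATION:{location}",
        f"ORGANIZER:mailto:{organizer}",
        f"ATTENDEE;RSVP=TRUE:mailto:{attendee}",
        "END:VEVENT",
        "END:VCALENDAR",
    ]
    return "\r\n".join(lines) + "\r\n"
```

The `--start` and `--end` values in the command above are ISO 8601 strings, so `datetime.fromisoformat("2026-03-01T09:00:00")` yields suitable inputs for this helper.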

### Mail Merge

```bash
python3 scripts/mail_merge.py \
  --data contacts.csv --subject "Hello {{name}}" \
  --body "<p>Dear {{name}}, your code is {{code}}.</p>" \
  --to-field email --config config.yaml
```
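
The `{{name}}` placeholders are filled per data row. A minimal sketch of that substitution is shown below, assuming simple word-character field names and leaving unknown placeholders untouched; the actual behaviour of `mail_merge.py` for missing fields is not documented here.

```python
import csv
import io
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, row: dict) -> str:
    """Replace each {{field}} with the row's value; unknown fields are left as-is."""
    return _PLACEHOLDER.sub(
        lambda m: str(row.get(m.group(1), m.group(0))), template
    )


def merge(csv_text: str, subject: str, body: str, to_field: str = "email") -> list:
    """Produce one (recipient, subject, body) tuple per CSV row."""
    rows = csv.DictReader(io.StringIO(csv_text))
    return [(row[to_field], render(subject, row), render(body, row)) for row in rows]
```

Values are inserted verbatim in this sketch; when the body is HTML and the data is untrusted, passing each value through `html.escape` first would be the safer choice.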

## Configuration

Create a `config.yaml` from `assets/config.example.yaml`:

```yaml
default_account: work

accounts:
  work:
    label: "Work"
    sender_address: "[email protected]"
    sender_name: "Alice Smith"
    imap:
      host: imap.company.com
      port: 993
      username: "[email protected]"
      password: "op://Work/IMAP/password"          # 1Password CLI
      ssl: true
    smtp:
      host: smtp.company.com
      port: 587
      username: "[email protected]"
      password: "op://Work/SMTP/password"          # 1Password CLI
      tls: true
    mailboxes: [INBOX, Projects]
    fetch_limit: 50
    rules:
      - name: flag_urgent
        sender_pattern: "boss@company\\.com"
        actions: [flag, tag]
        tag: urgent

  personal:
    label: "Personal"
    sender_address: "[email protected]"
    imap:
      host: imap.gmail.com
      password: "keychain://imap.gmail.com/[email protected]"  # macOS Keychain
    smtp:
      host: smtp.gmail.com
      password: "keychain://smtp.gmail.com/[email protected]"  # macOS Keychain
```
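
The `flag_urgent` rule above pairs a regular expression (`sender_pattern`) with a list of actions. How the pipeline evaluates rules is described in `references/RULES.md`, not here, so the following is only a sketch under stated assumptions: patterns are applied with `re.search`, account rules run before global ones, and a rule without a pattern matches every message.

```python
import re


def matching_actions(sender: str, account_rules: list, global_rules: list) -> list:
    """Collect actions from every rule whose sender_pattern matches the sender."""
    actions = []
    for rule in [*account_rules, *global_rules]:   # ordering is an assumption
        pattern = rule.get("sender_pattern")
        if pattern is None or re.search(pattern, sender, re.IGNORECASE):
            for action in rule.get("actions", []):
                if action not in actions:          # keep first occurrence only
                    actions.append(action)
    return actions
```

Note that the YAML string `"boss@company\\.com"` is double-quoted, so the parser yields the regex `boss@company\.com` with a literal dot.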

You can also define `archive_root` (e.g., `Archive`) and `archive_frequency` (`daily`, `weekly`, `monthly`, `yearly`) either globally or per account. These defaults drive both the `archive_mail.py` script and the heartbeat's handling of the `archive` rule action, so archived messages consistently land in folders like `Archive-202603`, `Archive-W09`, or `Archive-20260315`.

### Secure Credential Storage

Password values in the config can take four forms: three URI schemes that point at a secret backend, plus a plain-text literal:

| Scheme | Backend | Example |
|--------|---------|---------|
| `op://` | 1Password CLI | `"op://Work/IMAP/password"` |
| `keychain://` | macOS Keychain | `"keychain://imap.gmail.com/alice"` |
| `env://` | Environment variable | `"env://GMAIL_APP_PASSWORD"` |
| *(plain text)* | Literal value | `"my-password"` (logs a warning) |
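
Scheme dispatch of this kind can be sketched as below. This is not the code in `scripts/lib/credential_store.py`: the function name is hypothetical, and the choice of `op read` and `security find-generic-password` is an assumption about how the two external backends might be queried.

```python
import os
import subprocess


def resolve_secret(value: str) -> str:
    """Resolve a config password value according to its URI scheme."""
    if value.startswith("env://"):
        name = value[len("env://"):]
        if name not in os.environ:
            raise KeyError(f"environment variable {name!r} is not set")
        return os.environ[name]
    if value.startswith("op://"):
        # 1Password CLI: `op read` prints the referenced field to stdout.
        out = subprocess.run(["op", "read", value], check=True,
                             capture_output=True, text=True)
        return out.stdout.rstrip("\n")
    if value.startswith("keychain://"):
        service, _, account = value[len("keychain://"):].partition("/")
        # macOS `security` tool: -w prints only the password.
        out = subprocess.run(
            ["security", "find-generic-password",
             "-s", service, "-a", account, "-w"],
            check=True, capture_output=True, text=True)
        return out.stdout.rstrip("\n")
    return value  # plain-text literal (the skill logs a warning for these)
```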

### OAuth2 Authentication (Gmail, Outlook 365)

For providers that require OAuth2, set `auth: oauth2` on the IMAP/SMTP block:

```yaml
imap:
  host: imap.gmail.com
  username: "[email protected]"
  auth: oauth2
  oauth2:
    client_id: "your-client-id"
    client_secret: "your-client-secret"
    refresh_token: "your-refresh-token"
    token_uri: "https://oauth2.googleapis.com/token"
```
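
Once an access token has been obtained from `token_uri` using the refresh token, XOAUTH2 sends it in a small SASL string. The sketch below shows only that string format; the token exchange itself is omitted, and the function names are illustrative and not taken from `scripts/lib/oauth2.py`.

```python
import base64


def xoauth2_string(username: str, access_token: str) -> bytes:
    """Build the raw SASL XOAUTH2 initial client response."""
    return f"user={username}\x01auth=Bearer {access_token}\x01\x01".encode("utf-8")


def xoauth2_b64(username: str, access_token: str) -> str:
    """Base64 form, as sent after `AUTH XOAUTH2` in an SMTP session."""
    return base64.b64encode(xoauth2_string(username, access_token)).decode("ascii")
```

With the standard library, `imaplib.IMAP4.authenticate("XOAUTH2", lambda _: raw)` expects the raw bytes and applies the base64 encoding itself, so only SMTP needs the pre-encoded form.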

### Legacy Single-Account Config

Flat `imap:` / `smtp:` blocks at the root of the config are automatically treated
as a single account named "default".
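
One way to picture this compatibility rule is as a normalization step applied to the parsed YAML. The sketch below works on a plain dict. The function name is hypothetical, and only `imap` and `smtp` are moved under the account, which is an assumption about which keys the legacy layout carries.

```python
def normalize_config(raw: dict) -> dict:
    """Return a config in multi-account shape, wrapping a flat legacy layout."""
    if "accounts" in raw:
        return raw  # already multi-account
    account_keys = ("imap", "smtp")
    account = {k: raw[k] for k in account_keys if k in raw}
    rest = {k: v for k, v in raw.items() if k not in account_keys}
    return {**rest, "default_account": "default", "accounts": {"default": account}}
```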


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/fetch_mail.py

```python
#!/usr/bin/env python3
"""Fetch emails from an IMAP server.

Usage:
    python3 scripts/fetch_mail.py --config config.yaml
    python3 scripts/fetch_mail.py --config config.yaml --account work
    python3 scripts/fetch_mail.py --config config.yaml --format cli
    python3 scripts/fetch_mail.py --imap-host imap.example.com --imap-user me --imap-pass secret

Output: JSON (default) or CLI table to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage


def main() -> None:
    parser = argparse.ArgumentParser(description="Fetch emails via IMAP")
    parser.add_argument("--account", default="", help="Account name (default: default account)")
    parser.add_argument("--folder", default="INBOX", help="Folder to fetch from")
    parser.add_argument("--limit", type=int, default=0, help="Max messages (0 = use config)")
    parser.add_argument("--unread-only", action="store_true", help="Only unread messages")
    parser.add_argument("--mark-read", action="store_true", help="Mark fetched messages as read")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")

    # Direct IMAP options (bypass AccountManager)
    parser.add_argument("--imap-host", default="", help="IMAP server")
    parser.add_argument("--imap-port", type=int, default=993, help="IMAP port")
    parser.add_argument("--imap-user", default="", help="IMAP username")
    parser.add_argument("--imap-pass", default="", help="IMAP password")
    parser.add_argument("--imap-no-ssl", action="store_true", help="Disable SSL")

    # Config / stdin
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read JSON messages from stdin (skip IMAP)")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Read already-fetched messages from stdin when --from-stdin is given
    if args.from_stdin:
        data = json.load(sys.stdin)
        if isinstance(data, dict) and "messages" in data:
            msg_dicts = data["messages"]
        elif isinstance(data, list):
            msg_dicts = data
        else:
            _error("Expected JSON array or object with 'messages' key")
            return
        messages = [EmailMessage.from_dict(d) for d in msg_dicts]
        _output(messages, args.folder, args.format, args.account)
        return

    # Direct IMAP mode (no config file)
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        limit = args.limit or 50
        try:
            client.connect()
            if args.unread_only:
                messages = client.fetch_unread(
                    mailbox=args.folder, limit=limit, mark_seen=args.mark_read,
                )
            else:
                messages = client.fetch_all(mailbox=args.folder, limit=limit)
        finally:
            client.disconnect()
        _output(messages, args.folder, args.format, "")
        return

    # Multi-account mode via config
    if not args.config:
        _error("Either --config or --imap-host is required.")

    mgr = _load_manager(args.config)
    acct_name = args.account or mgr.default_account
    limit = args.limit or mgr.get_fetch_limit(acct_name)

    client = mgr.get_imap_client(acct_name)
    try:
        client.connect()
        if args.unread_only:
            messages = client.fetch_unread(
                mailbox=args.folder, limit=limit, mark_seen=args.mark_read,
            )
        else:
            messages = client.fetch_all(mailbox=args.folder, limit=limit)
    finally:
        client.disconnect()

    _output(messages, args.folder, args.format, acct_name)


def _output(messages: list[EmailMessage], folder: str, fmt: str, account: str) -> None:
    if fmt == "cli":
        label = f"{account}:{folder}" if account else folder
        if not messages:
            print(f"  No messages in {label}")
            return
        print(f"  {label} ({len(messages)} messages)")
        print(f"  {'':->60}")
        for m in messages:
            sender = str(m.sender) if m.sender else "(unknown)"
            if len(sender) > 25:
                sender = sender[:22] + "..."
            date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else ""
            subj = m.subject[:40] if m.subject else "(no subject)"
            read = " " if m.is_read else "*"
            att = "+" if m.has_attachments else " "
            print(f"  {read}{att} {date:16s}  {sender:25s}  {subj}")
        print()
    else:
        output = {
            "account": account,
            "folder": folder,
            "count": len(messages),
            "messages": [m.to_dict() for m in messages],
        }
        json.dump(output, sys.stdout, indent=2, default=str)
        print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise  # unreachable, _error exits


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```

### scripts/send_mail.py

```python
#!/usr/bin/env python3
"""Send rich HTML emails via SMTP.

Usage:
    python3 scripts/send_mail.py --config config.yaml [options]
    python3 scripts/send_mail.py --config config.yaml --account personal [options]
    python3 scripts/compose_mail.py ... | python3 scripts/send_mail.py --from-stdin --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import mimetypes
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailAttachment, EmailMessage
from lib.smtp_client import SMTPClient
from lib.composer import EmailComposer


def main() -> None:
    parser = argparse.ArgumentParser(description="Send rich HTML emails via SMTP")

    # Account
    parser.add_argument("--account", default="", help="Account name (default: default account)")

    # Message options
    parser.add_argument("--to", action="append", default=[], help="Recipient (repeatable)")
    parser.add_argument("--cc", action="append", default=[], help="CC recipient")
    parser.add_argument("--bcc", action="append", default=[], help="BCC recipient")
    parser.add_argument("--subject", default="", help="Email subject")
    parser.add_argument("--body", default="", help="Email body (HTML or plain)")
    parser.add_argument("--sender", default="", help="Sender address")

    # Template options
    parser.add_argument("--template", default="default",
                        choices=["default", "minimal", "digest"],
                        help="Email template")
    parser.add_argument("--greeting", default="", help="Greeting line")
    parser.add_argument("--sign-off", default="", help="Sign-off line")
    parser.add_argument("--header-text", default="", help="Header text")
    parser.add_argument("--header-color", default="#2d3748", help="Header bg color")
    parser.add_argument("--footer-text", default="", help="Footer text")
    parser.add_argument("--action-url", default="", help="CTA button URL")
    parser.add_argument("--action-text", default="View Details", help="CTA button text")
    parser.add_argument("--action-color", default="#4299e1", help="CTA button color")

    # Reply threading
    parser.add_argument("--reply-to", default="", help="In-Reply-To message ID")

    # Attachments
    parser.add_argument("--attach", action="append", default=[],
                        help="File path to attach (repeatable)")

    # Direct SMTP options (bypass AccountManager)
    parser.add_argument("--smtp-host", default="", help="SMTP hostname")
    parser.add_argument("--smtp-port", type=int, default=587, help="SMTP port")
    parser.add_argument("--smtp-user", default="", help="SMTP username")
    parser.add_argument("--smtp-pass", default="", help="SMTP password")
    parser.add_argument("--smtp-no-tls", action="store_true", help="Disable SMTP TLS")

    # Pipe input
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read composed message JSON from stdin")

    # Config
    parser.add_argument("--config", default="", help="YAML config file")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Build or read message
    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
            message = EmailMessage.from_dict(data)
        except Exception as exc:
            _error(f"Failed to parse stdin message: {exc}")
            return
    else:
        if not args.to:
            _error("At least one --to recipient is required")
        if not args.subject:
            _error("--subject is required")

        # Resolve sender from account config if not explicit
        sender_addr = args.sender
        if not sender_addr and args.config:
            try:
                mgr = AccountManager.from_yaml(args.config)
                acct_name = args.account or mgr.default_account
                sender = mgr.get_sender(acct_name)
                if sender:
                    sender_addr = str(sender)
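            except Exception:
                # Best effort only: leave the sender unset. In config mode,
                # _load_manager() below reports a broken config.
                pass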

        composer = EmailComposer()
        message = composer.compose(
            to=args.to,
            subject=args.subject,
            body=args.body or "",
            template=args.template,
            sender=sender_addr or None,
            cc=args.cc or None,
            bcc=args.bcc or None,
            reply_to=args.reply_to,
            greeting=args.greeting,
            sign_off=args.sign_off,
            header_text=args.header_text,
            header_color=args.header_color,
            footer_text=args.footer_text,
            action_url=args.action_url,
            action_text=args.action_text,
            action_color=args.action_color,
        )

    # Attach files from disk
    for filepath in args.attach:
        if not os.path.isfile(filepath):
            _error(f"Attachment not found: {filepath}")
        content_type, _ = mimetypes.guess_type(filepath)
        if not content_type:
            content_type = "application/octet-stream"
        with open(filepath, "rb") as fh:
            data = fh.read()
        message.attachments.append(EmailAttachment(
            filename=os.path.basename(filepath),
            content_type=content_type,
            data=data,
        ))

    # Direct SMTP mode
    if args.smtp_host:
        client = SMTPClient(
            host=args.smtp_host,
            port=args.smtp_port,
            username=args.smtp_user,
            password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
        success = client.send(message)
        result = {
            "success": success,
            "transport": "smtp",
            "subject": message.subject,
            "recipients": [str(r) for r in message.recipients],
        }
        if not success:
            result["error"] = "Failed to send email"
            json.dump(result, sys.stdout, indent=2)
            print()
            sys.exit(1)
        json.dump(result, sys.stdout, indent=2)
        print()
        return

    # Multi-account mode with Outbox + SMTP fallback
    if not args.config:
        _error("Either --config or --smtp-host is required.")

    mgr = _load_manager(args.config)
    acct_name = args.account or mgr.default_account

    # Set sender from account if not already set on message
    if not message.sender:
        sender = mgr.get_sender(acct_name)
        if sender:
            message.sender = sender

    # Use the Outbox pattern: stage → send → cleanup
    send_result = mgr.send_via_outbox(message, acct_name)

    result = {
        "success": send_result["success"],
        "transport": "smtp",
        "account": send_result["account"],
        "fallback_used": send_result.get("fallback_used", False),
        "staged_in_outbox": send_result.get("staged", False),
        "subject": message.subject,
        "recipients": [str(r) for r in message.recipients],
    }

    stage_error = send_result.get("stage_error")
    if stage_error:
        result["stage_error"] = stage_error
        if not result.get("error"):
            result["error"] = stage_error

    if not send_result["success"]:
        result["error"] = send_result.get("error", "Failed to send email")
        if send_result.get("staged"):
            result["note"] = "Message is staged in Outbox for retry"
        json.dump(result, sys.stdout, indent=2)
        print()
        sys.exit(1)

    json.dump(result, sys.stdout, indent=2)
    print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```

### scripts/reply_mail.py

```python
#!/usr/bin/env python3
"""Reply to an existing email with original quoting.

Usage:
    python3 scripts/reply_mail.py --message-id "<[email protected]>" --body "Thanks!" --config config.yaml
    python3 scripts/reply_mail.py --from-stdin --body "Got it." --config config.yaml
    python3 scripts/reply_mail.py --from-stdin --body "Noted" --no-quote --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.composer import EmailComposer
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage
from lib.smtp_client import SMTPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Reply to an email with original quoting")

    parser.add_argument("--account", default="", help="Account name")
    parser.add_argument("--message-id", default="", help="Message-ID to reply to")
    parser.add_argument("--folder", default="INBOX", help="Folder to find message in")
    parser.add_argument("--body", required=True, help="Reply body (HTML or plain)")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--reply-all", action="store_true",
                        help="Reply to all recipients (CC original recipients)")
    parser.add_argument("--no-quote", action="store_true",
                        help="Do not include the original message as a quote")

    # Template options
    parser.add_argument("--template", default="minimal",
                        choices=["default", "minimal", "digest"])
    parser.add_argument("--greeting", default="")
    parser.add_argument("--sign-off", default="")

    # IMAP / SMTP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--smtp-host", default="")
    parser.add_argument("--smtp-port", type=int, default=587)
    parser.add_argument("--smtp-user", default="")
    parser.add_argument("--smtp-pass", default="")
    parser.add_argument("--smtp-no-tls", action="store_true")

    parser.add_argument("--from-stdin", action="store_true",
                        help="Read original message JSON from stdin")
    parser.add_argument("--config", default="", help="YAML config file")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Get the original message
    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
            if isinstance(data, dict) and "messages" in data:
                if not data["messages"]:
                    _error("No messages in input")
                    return
                original = EmailMessage.from_dict(data["messages"][0])
            elif isinstance(data, dict):
                original = EmailMessage.from_dict(data)
            else:
                _error("Expected JSON message object")
                return
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif args.imap_host:
        if not args.message_id:
            _error("--message-id is required when using IMAP")
            return
        client = IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user, password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    elif args.config:
        if not args.message_id:
            _error("--message-id is required when using config")
            return
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    else:
        _error("--from-stdin, --imap-host, or --config is required")
        return

    # Resolve sender
    sender = args.sender
    if not sender and args.config:
        try:
            mgr = _load_manager(args.config)
            acct_name = args.account or mgr.default_account
            addr = mgr.get_sender(acct_name)
            if addr:
                sender = str(addr)
        except Exception:
            pass

    # Compose reply
    composer = EmailComposer()
    reply = composer.compose_reply(
        original=original,
        body=args.body,
        sender=sender or None,
        template=args.template,
        quote_original=not args.no_quote,
        greeting=args.greeting,
        sign_off=args.sign_off,
    )

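    # Reply-all: CC the original To/CC recipients, skipping our own address
    # and anyone already addressed. Addresses are compared case-insensitively
    # so "Me@Example.com" still matches the sender.
    if args.reply_all:
        sender_addr = sender.split("<")[-1].rstrip(">").strip().lower() if sender else ""
        skip = {sender_addr}
        skip.update((r.address or "").lower() for r in list(reply.recipients) + list(reply.cc))
        for recip in list(original.recipients) + list(original.cc):
            addr = (recip.address or "").lower()
            if addr not in skip:
                reply.cc.append(recip)
                skip.add(addr)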

    # Send
    if args.smtp_host:
        smtp = SMTPClient(
            host=args.smtp_host, port=args.smtp_port,
            username=args.smtp_user, password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
        success = smtp.send(reply)
        result = {
            "success": success,
            "transport": "smtp",
            "subject": reply.subject,
            "recipients": [str(r) for r in reply.recipients],
            "in_reply_to": reply.in_reply_to,
        }
        if not success:
            result["error"] = "Failed to send reply"
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        if not reply.sender:
            addr = mgr.get_sender(acct_name)
            if addr:
                reply.sender = addr
        send_result = mgr.send_with_fallback(reply, acct_name)
        result = {
            "success": send_result["success"],
            "transport": "smtp",
            "account": send_result["account"],
            "fallback_used": send_result["fallback_used"],
            "subject": reply.subject,
            "recipients": [str(r) for r in reply.recipients],
            "in_reply_to": reply.in_reply_to,
        }
        if not send_result["success"]:
            result["error"] = send_result.get("error", "Failed to send")
    else:
        # No send mechanism — output composed reply for piping
        json.dump(reply.to_dict(), sys.stdout, indent=2, default=str)
        print()
        return

    json.dump(result, sys.stdout, indent=2)
    print()
    if not result.get("success"):
        sys.exit(1)


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
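
The reply-all step above can be looked at in isolation. The following is a minimal sketch rather than the skill's own code: `Address` and `reply_all_cc` are hypothetical names standing in for the library's address type and the inline loop, and the sketch assumes that deduplication and case-insensitive matching are wanted.

```python
from __future__ import annotations

from dataclasses import dataclass
from email.utils import parseaddr


@dataclass(frozen=True)
class Address:
    """Hypothetical stand-in for the skill's address objects."""
    address: str
    name: str = ""


def reply_all_cc(sender: str, to: list[Address], cc: list[Address]) -> list[Address]:
    """Return the CC list for a reply-all: original To + CC, minus the sender."""
    # parseaddr handles both "Name <addr>" and bare "addr" forms.
    own = parseaddr(sender)[1].lower()
    seen = {own}
    result: list[Address] = []
    for recip in [*to, *cc]:
        key = recip.address.lower()
        if key not in seen:  # skip ourselves and anyone already added
            seen.add(key)
            result.append(recip)
    return result


if __name__ == "__main__":
    cc_list = reply_all_cc(
        "Me <me@example.com>",
        to=[Address("me@example.com"), Address("bob@example.com")],
        cc=[Address("Bob@example.com"), Address("carol@example.com")],
    )
    print([a.address for a in cc_list])
```

Using `email.utils.parseaddr` instead of splitting on `<` is a design choice of this sketch; it also copes with quoted display names.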

### scripts/forward_mail.py

```python
#!/usr/bin/env python3
"""Forward an existing email to new recipients.

Usage:
    python3 scripts/forward_mail.py --message-id "<[email protected]>" --to "[email protected]" --config config.yaml
    python3 scripts/forward_mail.py --from-stdin --to "[email protected]" --config config.yaml
    python3 scripts/forward_mail.py --from-stdin --to "[email protected]" --body "FYI" --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.composer import EmailComposer
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage
from lib.smtp_client import SMTPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Forward an email to new recipients")

    parser.add_argument("--account", default="", help="Account name")
    parser.add_argument("--message-id", default="", help="Message-ID to forward")
    parser.add_argument("--folder", default="INBOX", help="Folder to find message in")
    parser.add_argument("--to", action="append", default=[], required=True,
                        help="Forward recipient (repeatable)")
    parser.add_argument("--body", default="", help="Additional message body")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--include-attachments", action="store_true",
                        help="Include original attachments in the forward")

    # IMAP / SMTP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--smtp-host", default="")
    parser.add_argument("--smtp-port", type=int, default=587)
    parser.add_argument("--smtp-user", default="")
    parser.add_argument("--smtp-pass", default="")
    parser.add_argument("--smtp-no-tls", action="store_true")

    parser.add_argument("--from-stdin", action="store_true",
                        help="Read original message JSON from stdin")
    parser.add_argument("--config", default="", help="YAML config file")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Get the original message
    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
            if isinstance(data, dict) and "messages" in data:
                if not data["messages"]:
                    _error("No messages in input")
                    return
                original = EmailMessage.from_dict(data["messages"][0])
            elif isinstance(data, dict):
                original = EmailMessage.from_dict(data)
            else:
                _error("Expected JSON message object")
                return
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif args.imap_host:
        if not args.message_id:
            _error("--message-id is required when using IMAP")
            return
        client = IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user, password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    elif args.config:
        if not args.message_id:
            _error("--message-id is required when using config")
            return
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            original = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if original is None:
            _error(f"Message not found: {args.message_id}")
            return
    else:
        _error("--from-stdin, --imap-host, or --config is required")
        return

    # Resolve sender
    sender = args.sender
    if not sender and args.config:
        try:
            mgr = _load_manager(args.config)
            acct_name = args.account or mgr.default_account
            addr = mgr.get_sender(acct_name)
            if addr:
                sender = str(addr)
        except Exception:
            pass

    # Compose forward
    composer = EmailComposer()
    fwd_message = composer.compose_forward(
        original=original,
        to=args.to,
        body=args.body,
        sender=sender or None,
        attach_original=args.include_attachments,
    )

    # Send
    if args.smtp_host:
        smtp = SMTPClient(
            host=args.smtp_host, port=args.smtp_port,
            username=args.smtp_user, password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
        success = smtp.send(fwd_message)
        result = {
            "success": success,
            "transport": "smtp",
            "subject": fwd_message.subject,
            "recipients": [str(r) for r in fwd_message.recipients],
            "original_message_id": original.message_id,
        }
        if not success:
            result["error"] = "Failed to send forward"
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        if not fwd_message.sender:
            addr = mgr.get_sender(acct_name)
            if addr:
                fwd_message.sender = addr
        send_result = mgr.send_with_fallback(fwd_message, acct_name)
        result = {
            "success": send_result["success"],
            "transport": "smtp",
            "account": send_result["account"],
            "fallback_used": send_result["fallback_used"],
            "subject": fwd_message.subject,
            "recipients": [str(r) for r in fwd_message.recipients],
            "original_message_id": original.message_id,
        }
        if not send_result["success"]:
            result["error"] = send_result.get("error", "Failed to send")
    else:
        # No send mechanism — output composed message for piping
        json.dump(fwd_message.to_dict(), sys.stdout, indent=2, default=str)
        print()
        return

    json.dump(result, sys.stdout, indent=2)
    print()
    if not result.get("success"):
        sys.exit(1)


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
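
Both the reply and forward scripts accept two JSON shapes on stdin: a single message object, or an envelope with a `messages` list from which the first entry is used. A minimal sketch of that selection logic follows; `first_message` is a hypothetical helper name, and the `subject` field in the demo is only illustrative.

```python
from __future__ import annotations

import json


def first_message(data: object) -> dict:
    """Return the message dict from either accepted input shape."""
    if isinstance(data, dict) and "messages" in data:
        if not data["messages"]:
            raise ValueError("No messages in input")
        # Envelope form: only the first message is used.
        return data["messages"][0]
    if isinstance(data, dict):
        # Bare form: the object itself is the message.
        return data
    raise ValueError("Expected JSON message object")


if __name__ == "__main__":
    envelope = json.loads('{"messages": [{"subject": "first"}, {"subject": "second"}]}')
    print(first_message(envelope)["subject"])
```

Raising `ValueError` here, rather than exiting, keeps the helper testable; the scripts themselves report the same conditions through `_error`.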

### scripts/search_mail.py

```python
#!/usr/bin/env python3
"""Search emails on an IMAP server using flexible criteria.

Usage:
    python3 scripts/search_mail.py --config config.yaml --subject "invoice"
    python3 scripts/search_mail.py --config config.yaml --from "[email protected]" --unseen
    python3 scripts/search_mail.py --config config.yaml --since 2025-01-01 --before 2025-06-01
    python3 scripts/search_mail.py --config config.yaml --criteria '(OR (FROM "a") (FROM "b"))'

Output: JSON (default) or CLI table to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage


def _quote(value: str) -> str:
    """Format value as an IMAP quoted string, escaping backslashes and quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def _build_criteria(args) -> str:
    """Build an IMAP SEARCH criteria string from CLI flags."""
    # Raw criteria are passed through untouched and override all other flags.
    if args.criteria:
        return args.criteria

    parts: list[str] = []

    if args.unseen:
        parts.append("UNSEEN")
    if args.flagged:
        parts.append("FLAGGED")
    if args.subject:
        parts.append(f"SUBJECT {_quote(args.subject)}")
    if getattr(args, "from_addr", None):
        parts.append(f"FROM {_quote(args.from_addr)}")
    if args.to:
        parts.append(f"TO {_quote(args.to)}")
    if args.body:
        parts.append(f"BODY {_quote(args.body)}")
    if args.since:
        # IMAP expects DD-Mon-YYYY; anything that is not YYYY-MM-DD is
        # passed through as given.
        try:
            dt = datetime.strptime(args.since, "%Y-%m-%d")
            parts.append(f'SINCE {dt.strftime("%d-%b-%Y")}')
        except ValueError:
            parts.append(f'SINCE {args.since}')
    if args.before:
        try:
            dt = datetime.strptime(args.before, "%Y-%m-%d")
            parts.append(f'BEFORE {dt.strftime("%d-%b-%Y")}')
        except ValueError:
            parts.append(f'BEFORE {args.before}')
    if args.text:
        parts.append(f"TEXT {_quote(args.text)}")

    if not parts:
        parts.append("ALL")

    return "(" + " ".join(parts) + ")" if len(parts) > 1 else parts[0]


def main() -> None:
    parser = argparse.ArgumentParser(description="Search emails via IMAP SEARCH")

    parser.add_argument("--account", default="", help="Account name")
    parser.add_argument("--folder", default="INBOX", help="Folder to search")
    parser.add_argument("--limit", type=int, default=50, help="Max results")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Search criteria flags
    parser.add_argument("--subject", default="", help="Search in subject")
    parser.add_argument("--from", dest="from_addr", default="", help="Search by sender")
    parser.add_argument("--to", default="", help="Search by recipient")
    parser.add_argument("--body", default="", help="Search in body text")
    parser.add_argument("--text", default="", help="Full-text search (headers + body)")
    parser.add_argument("--since", default="", help="Messages since date (YYYY-MM-DD)")
    parser.add_argument("--before", default="", help="Messages before date (YYYY-MM-DD)")
    parser.add_argument("--unseen", action="store_true", help="Unread messages only")
    parser.add_argument("--flagged", action="store_true", help="Flagged messages only")
    parser.add_argument("--criteria", default="",
                        help="Raw IMAP SEARCH criteria (overrides other flags)")

    # IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    criteria = _build_criteria(args)

    # Build client
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Either --config or --imap-host is required.")
        return

    try:
        client.connect()
        messages = client.search(
            mailbox=args.folder, criteria=criteria, limit=args.limit,
        )
    finally:
        client.disconnect()

    _output(messages, args.folder, args.format, args.account, criteria)


def _output(
    messages: list[EmailMessage], folder: str, fmt: str,
    account: str, criteria: str,
) -> None:
    if fmt == "cli":
        label = f"{account}:{folder}" if account else folder
        print(f"  Search: {criteria}")
        if not messages:
            print(f"  No results in {label}")
            return
        print(f"  {label} ({len(messages)} results)")
        print(f"  {'':->60}")
        for m in messages:
            sender = str(m.sender) if m.sender else "(unknown)"
            if len(sender) > 25:
                sender = sender[:22] + "..."
            date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else ""
            subj = m.subject[:40] if m.subject else "(no subject)"
            read = " " if m.is_read else "*"
            att = "+" if m.has_attachments else " "
            print(f"  {read}{att} {date:16s}  {sender:25s}  {subj}")
        print()
    else:
        output = {
            "account": account,
            "folder": folder,
            "criteria": criteria,
            "count": len(messages),
            "messages": [m.to_dict() for m in messages],
        }
        json.dump(output, sys.stdout, indent=2, default=str)
        print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
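
The criteria builder above turns CLI flags into an IMAP SEARCH string. As a standalone illustration, here is a minimal sketch of the same idea. The names `imap_quote`, `imap_date`, and `build_criteria` are hypothetical, and the fixed month table is an assumption made to keep the date format independent of the process locale.

```python
from __future__ import annotations

from datetime import date

# IMAP dates use English month abbreviations regardless of locale.
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def imap_quote(value: str) -> str:
    """Format value as an IMAP quoted string (escape backslash and quote)."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def imap_date(iso: str) -> str:
    """Convert YYYY-MM-DD to IMAP's DD-Mon-YYYY form."""
    d = date.fromisoformat(iso)
    return f"{d.day:02d}-{_MONTHS[d.month - 1]}-{d.year:04d}"


def build_criteria(*, subject: str = "", sender: str = "",
                   since: str = "", unseen: bool = False) -> str:
    """Combine the given filters into one SEARCH criteria string."""
    parts: list[str] = []
    if unseen:
        parts.append("UNSEEN")
    if subject:
        parts.append(f"SUBJECT {imap_quote(subject)}")
    if sender:
        parts.append(f"FROM {imap_quote(sender)}")
    if since:
        parts.append(f"SINCE {imap_date(since)}")
    if not parts:
        return "ALL"
    # Multiple keys are implicitly ANDed; parentheses group them.
    return "(" + " ".join(parts) + ")" if len(parts) > 1 else parts[0]


if __name__ == "__main__":
    print(build_criteria(subject="invoice", since="2025-01-01", unseen=True))
```

Escaping matters because a subject such as `say "hi"` would otherwise terminate the quoted string early and produce a malformed command.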

### scripts/draft_mail.py

```python
#!/usr/bin/env python3
"""Save a composed email to the IMAP Drafts folder, or resume a draft.

Usage:
    # Save a new draft
    python3 scripts/compose_mail.py ... | python3 scripts/draft_mail.py --action save --config config.yaml
    python3 scripts/draft_mail.py --action save --to "[email protected]" --subject "WIP" --body "..." --config config.yaml

    # List drafts
    python3 scripts/draft_mail.py --action list --config config.yaml

    # Resume (fetch) a draft by Message-ID
    python3 scripts/draft_mail.py --action resume --message-id "<[email protected]>" --config config.yaml

    # Send a draft (fetch + send + delete from Drafts)
    python3 scripts/draft_mail.py --action send --message-id "<[email protected]>" --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.composer import EmailComposer
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage
from lib.smtp_client import build_mime


def main() -> None:
    parser = argparse.ArgumentParser(description="Save, list, resume, or send drafts")

    parser.add_argument("--account", default="", help="Account name")
    parser.add_argument("--action", required=True,
                        choices=["save", "list", "resume", "send"],
                        help="Action to perform")
    parser.add_argument("--drafts-folder", default="Drafts",
                        help="IMAP drafts folder name (default: Drafts)")
    parser.add_argument("--message-id", default="",
                        help="Message-ID for resume/send actions")
    parser.add_argument("--limit", type=int, default=20,
                        help="Max drafts to list")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Compose options (for --action save without stdin)
    parser.add_argument("--to", action="append", default=[])
    parser.add_argument("--cc", action="append", default=[])
    parser.add_argument("--subject", default="")
    parser.add_argument("--body", default="")
    parser.add_argument("--sender", default="")
    parser.add_argument("--template", default="minimal",
                        choices=["default", "minimal", "digest"])

    parser.add_argument("--from-stdin", action="store_true",
                        help="Read composed message JSON from stdin")
    parser.add_argument("--config", default="", help="YAML config file")

    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if args.action == "save":
        _do_save(args)
    elif args.action == "list":
        _do_list(args)
    elif args.action == "resume":
        _do_resume(args)
    elif args.action == "send":
        _do_send(args)


def _get_client(args) -> IMAPClient:
    if args.imap_host:
        return IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    if not args.config:
        _error("Either --config or --imap-host is required.")
    mgr = _load_manager(args.config)
    acct_name = args.account or mgr.default_account
    return mgr.get_imap_client(acct_name)


def _do_save(args) -> None:
    """Save a composed message to the Drafts folder."""
    # Explicit --to/--subject flags take precedence over implicit piped input,
    # so the flag form also works when stdin is not a TTY (cron, subprocess).
    has_flags = bool(args.to and args.subject)
    if args.from_stdin or (not has_flags and not sys.stdin.isatty()):
        try:
            data = json.load(sys.stdin)
            message = EmailMessage.from_dict(data)
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif has_flags:
        sender = args.sender
        # Look up a default sender only when a config file is available;
        # with --imap-host alone there is no account to consult.
        if not sender and args.config:
            try:
                mgr = _load_manager(args.config)
                acct_name = args.account or mgr.default_account
                addr = mgr.get_sender(acct_name)
                if addr:
                    sender = str(addr)
            except Exception:
                pass
        composer = EmailComposer()
        message = composer.compose(
            to=args.to, subject=args.subject, body=args.body,
            template=args.template, sender=sender or None,
            cc=args.cc or None,
        )
    else:
        _error("Provide --to and --subject, or pipe composed JSON via stdin")
        return

    # Build MIME and append to Drafts
    mime_msg = build_mime(message)
    raw_bytes = mime_msg.as_bytes()

    client = _get_client(args)
    try:
        client.connect()
        ok = client.append_message(raw_bytes, mailbox=args.drafts_folder)
    finally:
        client.disconnect()

    result = {
        "success": ok,
        "action": "save",
        "folder": args.drafts_folder,
        "subject": message.subject,
    }
    json.dump(result, sys.stdout, indent=2)
    print()
    if not ok:
        sys.exit(1)


def _do_list(args) -> None:
    """List messages in the Drafts folder."""
    client = _get_client(args)
    try:
        client.connect()
        messages = client.fetch_all(mailbox=args.drafts_folder, limit=args.limit)
    finally:
        client.disconnect()

    if args.format == "cli":
        if not messages:
            print(f"  No drafts in {args.drafts_folder}")
            return
        print(f"  {args.drafts_folder} ({len(messages)} drafts)")
        print(f"  {'':->60}")
        for m in messages:
            to_str = ", ".join(str(r) for r in m.recipients[:2]) if m.recipients else "(no recipient)"
            if len(to_str) > 25:
                to_str = to_str[:22] + "..."
            date = m.date.strftime("%Y-%m-%d %H:%M") if m.date else ""
            subj = m.subject[:40] if m.subject else "(no subject)"
            print(f"    {date:16s}  {to_str:25s}  {subj}")
        print()
    else:
        output = {
            "folder": args.drafts_folder,
            "count": len(messages),
            "drafts": [m.to_dict() for m in messages],
        }
        json.dump(output, sys.stdout, indent=2, default=str)
        print()


def _do_resume(args) -> None:
    """Fetch a draft by Message-ID and output as JSON for editing."""
    if not args.message_id:
        _error("--message-id is required for resume")
        return

    client = _get_client(args)
    try:
        client.connect()
        message = client.fetch_by_id(args.message_id, mailbox=args.drafts_folder)
    finally:
        client.disconnect()

    if message is None:
        _error(f"Draft not found: {args.message_id}")
        return

    json.dump(message.to_dict(), sys.stdout, indent=2, default=str)
    print()


def _do_send(args) -> None:
    """Fetch a draft, send it via SMTP, and delete from Drafts."""
    if not args.message_id:
        _error("--message-id is required for send")
        return
    # Sending goes through the account manager, so a config file is needed
    # even when the draft itself is fetched with --imap-host.
    if not args.config:
        _error("--config is required for send")
        return

    client = _get_client(args)
    try:
        client.connect()
        message = client.fetch_by_id(args.message_id, mailbox=args.drafts_folder)
    finally:
        client.disconnect()

    if message is None:
        _error(f"Draft not found: {args.message_id}")
        return

    # Send
    mgr = _load_manager(args.config)
    acct_name = args.account or mgr.default_account
    if not message.sender:
        addr = mgr.get_sender(acct_name)
        if addr:
            message.sender = addr

    send_result = mgr.send_with_fallback(message, acct_name)
    if not send_result["success"]:
        result = {
            "success": False,
            "action": "send",
            "error": send_result.get("error", "Send failed"),
        }
        json.dump(result, sys.stdout, indent=2)
        print()
        sys.exit(1)

    # Delete draft after successful send. The message has already gone out
    # at this point, so a cleanup failure is reported rather than raised.
    draft_deleted = True
    delete_error = ""
    client = _get_client(args)
    try:
        client.connect()
        client.delete_message(args.message_id, mailbox=args.drafts_folder)
    except Exception as exc:
        draft_deleted = False
        delete_error = str(exc)
    finally:
        client.disconnect()

    result = {
        "success": True,
        "action": "send",
        "subject": message.subject,
        "recipients": [str(r) for r in message.recipients],
        "draft_deleted": draft_deleted,
    }
    if delete_error:
        result["warning"] = f"Sent, but failed to delete draft: {delete_error}"
    json.dump(result, sys.stdout, indent=2)
    print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
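
Saving a draft amounts to serializing the message to MIME bytes and issuing an IMAP APPEND against the Drafts folder. The sketch below shows that flow with the standard library only, as an assumption-laden illustration: it does not use the skill's `build_mime` or `IMAPClient.append_message`, and `build_draft` and `append_draft` are hypothetical names.

```python
from __future__ import annotations

import imaplib
import time
from email.message import EmailMessage
from email.utils import formatdate, make_msgid


def build_draft(sender: str, to: list[str], subject: str, body: str) -> bytes:
    """Serialize a plain-text message to raw MIME bytes."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(to)
    msg["Subject"] = subject
    msg["Date"] = formatdate(localtime=True)
    # A fixed domain avoids a hostname lookup; replace it with your own.
    msg["Message-ID"] = make_msgid(domain="example.invalid")
    msg.set_content(body)
    return msg.as_bytes()


def append_draft(conn: imaplib.IMAP4, raw: bytes, folder: str = "Drafts") -> bool:
    """APPEND raw bytes to the folder with the \\Draft flag set.

    `conn` must already be connected and logged in.
    """
    status, _ = conn.append(
        folder, r"(\Draft)", imaplib.Time2Internaldate(time.time()), raw,
    )
    return status == "OK"


if __name__ == "__main__":
    raw = build_draft("me@example.com", ["a@example.com"], "WIP", "still drafting")
    print(raw.decode("ascii"))
```

The folder name varies by provider (for example, some servers use a localized or namespaced Drafts folder), which is why the script exposes `--drafts-folder`.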

### scripts/retry_send.py

```python
#!/usr/bin/env python3
"""Retry sending messages sitting in the IMAP Outbox folder.

The Outbox is a temporary IMAP folder that holds messages while they
are being sent.  If a send fails, the message remains in the Outbox.
This script drains the Outbox by re-attempting delivery for each
message.  After all messages are sent, the Outbox folder is removed.

Usage:
    python3 scripts/retry_send.py --config config.yaml
    python3 scripts/retry_send.py --config config.yaml --account work
    python3 scripts/retry_send.py --config config.yaml --list
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.outbox import Outbox


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Retry sending messages from the IMAP Outbox",
    )
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument(
        "--account", default="",
        help="Process only this account (default: all accounts)",
    )
    parser.add_argument(
        "--list", action="store_true", dest="list_outbox",
        help="List pending messages without sending",
    )
    parser.add_argument("--format", choices=["json", "cli"], default="json")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.config:
        _error("--config is required (or place config.yaml in skill root)")
        return

    try:
        mgr = AccountManager.from_yaml(args.config)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        return

    if args.account:
        account_names = [args.account]
    else:
        account_names = mgr.list_accounts()

    if args.list_outbox:
        _do_list(mgr, account_names, args.format)
    else:
        _do_drain(mgr, account_names, args.format)


def _do_list(mgr: AccountManager, accounts: list[str], fmt: str) -> None:
    """List messages currently in each account's Outbox."""
    all_pending: list[dict] = []
    # Per-account failures are collected so that JSON output reports them
    # too, instead of silently showing an empty Outbox.
    errors: list[dict] = []

    for acct_name in accounts:
        try:
            imap_client = mgr.get_imap_client(acct_name)
            imap_client.connect()
            try:
                outbox = Outbox(imap_client)
                messages = outbox.list_pending()
                for msg in messages:
                    all_pending.append({
                        "account": acct_name,
                        "message_id": msg.message_id,
                        "subject": msg.subject,
                        "recipients": [str(r) for r in msg.recipients],
                        "date": msg.date.isoformat() if msg.date else "",
                    })
            finally:
                imap_client.disconnect()
        except Exception as exc:
            errors.append({"account": acct_name, "error": str(exc)})
            if fmt == "cli":
                print(f"  [{acct_name}] Error checking Outbox: {exc}")

    if fmt == "cli":
        if not all_pending:
            print("No messages pending in any Outbox.")
        else:
            print(f"  {len(all_pending)} message(s) pending in Outbox:")
            print(f"  {'':->60}")
            for p in all_pending:
                recip = ", ".join(p["recipients"][:2])
                if len(recip) > 25:
                    recip = recip[:22] + "..."
                print(
                    f"    [{p['account']}] {p['subject']!r:30s}  to={recip}"
                )
            print()
    else:
        json.dump(
            {
                "pending_count": len(all_pending),
                "messages": all_pending,
                "errors": errors,
            },
            sys.stdout, indent=2,
        )
        print()


def _do_drain(mgr: AccountManager, accounts: list[str], fmt: str) -> None:
    """Drain the Outbox for each account, sending pending messages."""
    total_attempted = 0
    total_sent = 0
    total_failed = 0
    all_errors: list[dict] = []
    account_results: list[dict] = []

    for acct_name in accounts:
        try:
            result = mgr.drain_outbox(acct_name)
            account_results.append(result)
            total_attempted += result.get("attempted", 0)
            total_sent += result.get("sent", 0)
            total_failed += result.get("failed", 0)
            all_errors.extend(result.get("errors", []))
        except Exception as exc:
            all_errors.append({
                "account": acct_name,
                "error": str(exc),
            })

    if fmt == "cli":
        if total_attempted == 0:
            print("No messages pending in any Outbox.")
        else:
            print(
                f"Outbox drain: {total_attempted} attempted, "
                f"{total_sent} sent, {total_failed} failed"
            )
            for err in all_errors:
                acct = err.get("account", "?")
                subj = err.get("subject", "")
                emsg = err.get("error", "")
                print(f"  ERROR [{acct}] {subj!r}: {emsg}")
    else:
        json.dump(
            {
                "attempted": total_attempted,
                "sent": total_sent,
                "failed": total_failed,
                "errors": all_errors,
                "accounts": account_results,
            },
            sys.stdout, indent=2,
        )
        print()

    sys.exit(1 if total_failed > 0 else 0)


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
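
The drain step above folds per-account results into one report and derives the process exit code from the failure count. A minimal sketch of that aggregation, using the same result keys (`attempted`, `sent`, `failed`, `errors`) as the script; `summarize` is a hypothetical helper name, and the sample data is invented for illustration.

```python
from __future__ import annotations


def summarize(results: list[dict]) -> tuple[dict, int]:
    """Fold per-account drain results into totals and an exit code."""
    totals: dict = {"attempted": 0, "sent": 0, "failed": 0, "errors": []}
    for res in results:
        # Missing keys count as zero, matching the script's .get() defaults.
        totals["attempted"] += res.get("attempted", 0)
        totals["sent"] += res.get("sent", 0)
        totals["failed"] += res.get("failed", 0)
        totals["errors"].extend(res.get("errors", []))
    # Non-zero exit when any message could not be delivered.
    exit_code = 1 if totals["failed"] > 0 else 0
    return totals, exit_code


if __name__ == "__main__":
    report, code = summarize([
        {"attempted": 2, "sent": 1, "failed": 1,
         "errors": [{"account": "work", "error": "timeout"}]},
        {"attempted": 1, "sent": 1},
    ])
    print(report, code)
```

A non-zero exit code lets a scheduler or calling agent detect that messages remain in the Outbox and schedule another retry.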

### scripts/heartbeat.py

```python
#!/usr/bin/env python3
"""Run a full email heartbeat cycle: fetch -> process -> execute actions.

Supports multiple accounts — iterates over all accounts (or a specific one)
and processes each account's mailboxes with per-account + global rules.

Usage:
    python3 scripts/heartbeat.py --config config.yaml
    python3 scripts/heartbeat.py --config config.yaml --account work
    python3 scripts/heartbeat.py --config config.yaml --state-file /tmp/email_state.json

Output: JSON cycle report to stdout.
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime

logger = logging.getLogger("clawMail.heartbeat")

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailAddress, EmailMessage
from lib.imap_client import IMAPClient
from lib.smtp_client import SMTPClient
from lib.composer import EmailComposer
from lib.processor import EmailProcessor


def main() -> None:
    parser = argparse.ArgumentParser(description="Run email heartbeat cycle")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="",
                        help="Process only this account (default: all accounts)")
    parser.add_argument("--state-file", default="",
                        help="Shared state file for multi-agent coordination")
    parser.add_argument("--format", choices=["json", "summary"], default="json")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.config:
        _error("--config is required (or place config.yaml in skill root)")
        return

    try:
        mgr = AccountManager.from_yaml(args.config)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        return

    start = time.monotonic()
    now = datetime.now()
    errors: list[str] = []
    all_messages: list[EmailMessage] = []
    account_reports: list[dict] = []
    dedup_skipped = 0

    # Load previously-seen Message-IDs for deduplication
    seen_ids: set[str] = set()
    if args.state_file and os.path.exists(args.state_file):
        try:
            with open(args.state_file) as f:
                prev_state = json.load(f)
            seen_ids = set(prev_state.get("email_seen_ids", []))
        except Exception:
            pass

    # Determine which accounts to process
    if args.account:
        account_names = [args.account]
    else:
        account_names = mgr.list_accounts()

    # ── Drain Outbox for each account before fetching ──────────
    outbox_results: list[dict] = []
    for acct_name in account_names:
        try:
            drain = mgr.drain_outbox(acct_name)
            if drain.get("attempted", 0) > 0:
                outbox_results.append(drain)
                if drain.get("failed", 0) > 0:
                    for err in drain.get("errors", []):
                        errors.append(
                            f"[{acct_name}] Outbox send failed: "
                            f"{err.get('subject', '?')!r} — {err.get('error', '')}"
                        )
        except Exception as exc:
            errors.append(f"[{acct_name}] Outbox drain error: {exc}")

    # ── Fetch and process mail ───────────────────────────────────
    for acct_name in account_names:
        acct_messages: list[EmailMessage] = []
        mailboxes = mgr.get_mailboxes(acct_name)
        fetch_limit = mgr.get_fetch_limit(acct_name)

        # Fetch from each mailbox
        for mailbox in mailboxes:
            try:
                messages = _fetch(mgr, acct_name, mailbox, fetch_limit)
                acct_messages.extend(messages)
            except Exception as exc:
                errors.append(f"[{acct_name}] Fetch {mailbox}: {exc}")

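        # Deduplicate: skip messages we've already processed. When seen IDs
        # exist, messages that lack a Message-ID are dropped here too (and
        # counted in dedup_skipped), since they cannot be tracked.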
        if args.state_file and seen_ids:
            before = len(acct_messages)
            acct_messages = [
                m for m in acct_messages
                if m.message_id and m.message_id not in seen_ids
            ]
            dedup_skipped += before - len(acct_messages)

        # Process through per-account + global rules
        rules_config = mgr.get_rules(acct_name)
        if rules_config:
            processor = EmailProcessor.from_config(rules_config)
            results = processor.process_batch(acct_messages)
        else:
            results = []

        # Execute actions
        actions_executed = 0
        for result in results:
            try:
                actions_executed += _execute_actions(result, mgr, acct_name)
            except Exception as exc:
                errors.append(
                    f"[{acct_name}] Actions for '{result.message.subject}': {exc}"
                )

        # Record newly processed IDs
        for m in acct_messages:
            if m.message_id:
                seen_ids.add(m.message_id)

        account_reports.append({
            "account": acct_name,
            "mailboxes_checked": mailboxes,
            "messages_fetched": len(acct_messages),
            "rules_matched": sum(len(r.matched_rules) for r in results),
            "actions_executed": actions_executed,
        })
        all_messages.extend(acct_messages)

    duration = time.monotonic() - start

    # Build report
    report = {
        "timestamp": now.isoformat(),
        "accounts_processed": account_names,
        "total_messages_fetched": len(all_messages),
        "dedup_skipped": dedup_skipped,
        "outbox_drained": outbox_results,
        "duration_seconds": round(duration, 3),
        "errors": errors,
        "accounts": account_reports,
        "messages": [
            {
                "account": m.account,
                "message_id": m.message_id,
                "subject": m.subject,
                "sender": str(m.sender) if m.sender else "",
                "date": m.date.isoformat() if m.date else "",
            }
            for m in all_messages
        ],
    }

    # Write shared state
    if args.state_file:
        state = {}
        if os.path.exists(args.state_file):
            try:
                with open(args.state_file) as f:
                    state = json.load(f)
            except Exception:
                pass

        state["email_last_heartbeat"] = now.isoformat()
        state["email_unread_count"] = len(all_messages)
        state["email_accounts"] = {
            ar["account"]: {
                "fetched": ar["messages_fetched"],
                "rules_matched": ar["rules_matched"],
                "actions": ar["actions_executed"],
            }
            for ar in account_reports
        }
        state["email_messages"] = report["messages"]
        state["email_errors"] = errors

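        # Persist seen Message-IDs for deduplication (cap at 10,000).
        # A set has no order, so start from the previously persisted list and
        # append the new IDs; the slice below then drops the oldest entries.
        prev_ids = list(dict.fromkeys(
            i for i in state.get("email_seen_ids", []) if i in seen_ids
        ))
        prev_set = set(prev_ids)
        seen_list = prev_ids + [i for i in seen_ids if i not in prev_set]
        if len(seen_list) > 10000:
            seen_list = seen_list[-10000:]
        state["email_seen_ids"] = seen_list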

        with open(args.state_file, "w") as f:
            json.dump(state, f, indent=2)

    if args.format == "summary":
        summary = {
            "status": "error" if errors else "ok",
            "accounts": len(account_names),
            "fetched": len(all_messages),
            "duration": f"{duration:.1f}s",
        }
        if errors:
            summary["errors"] = errors
        json.dump(summary, sys.stdout, indent=2)
    else:
        json.dump(report, sys.stdout, indent=2, default=str)

    print()
    sys.exit(1 if errors else 0)


def _fetch(
    mgr: AccountManager, acct_name: str, mailbox: str, limit: int,
) -> list[EmailMessage]:
    client = mgr.get_imap_client(acct_name)
    try:
        client.connect()
        return client.fetch_unread(mailbox=mailbox, limit=limit)
    finally:
        client.disconnect()


def _execute_actions(result, mgr: AccountManager, acct_name: str) -> int:
    count = 0
    message = result.message

    if result.should_mark_read:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            client.mark_read(message.message_id, message.mailbox)
            count += 1
        finally:
            client.disconnect()

    if result.should_flag:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            client.flag_message(message.message_id, message.mailbox)
            count += 1
        finally:
            client.disconnect()

    if result.move_to:
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            moved = client.move_message(
                message.message_id, message.mailbox, result.move_to,
            )
            if moved:
                count += 1
        finally:
            client.disconnect()

    if result.forward_to:
        composer = EmailComposer()
        sender = mgr.get_sender(acct_name)
        for addr in result.forward_to:
            fwd = composer.compose(
                to=addr,
                subject=f"Fwd: {message.subject}",
                body=message.body or message.body_plain,
                template="minimal",
                sender=str(sender) if sender else "",
            )
            mgr.send_with_fallback(fwd, acct_name)
            count += 1

    if result.reply_body:
        composer = EmailComposer()
        sender = mgr.get_sender(acct_name)
        reply = composer.compose_reply(
            original=message,
            body=result.reply_body,
            sender=str(sender) if sender else "",
        )
        mgr.send_with_fallback(reply, acct_name)
        count += 1

    if result.should_archive:
        archive_root = mgr.get_archive_root(acct_name)
        frequency = mgr.get_archive_frequency(acct_name)
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            if _archive_message(client, message, archive_root, frequency):
                count += 1
        finally:
            client.disconnect()

    return count


def _archive_message(
    client: IMAPClient,
    message: EmailMessage,
    archive_root: str,
    frequency: str,
) -> bool:
    if not message.message_id:
        logger.warning("Skipping archive: missing Message-ID for %s", message.subject)
        return False
    target_folder = _archive_folder_name(message.date, archive_root, frequency)
    existing = {f["name"] for f in client.list_folders()}
    if target_folder not in existing:
        if not client.create_folder(target_folder):
            logger.warning("Failed to create archive folder %s", target_folder)
            return False
        existing.add(target_folder)
    moved = client.move_message(message.message_id, message.mailbox, target_folder)
    if not moved:
        logger.warning(
            "Failed to archive message %s into %s", message.message_id, target_folder
        )
    return moved


def _archive_folder_name(
    date: datetime | None,
    root: str,
    frequency: str,
) -> str:
    base = root.strip() or "Archive"
    freq = frequency if frequency in {"daily", "weekly", "monthly", "yearly"} else "monthly"
    dt = date or datetime.now()
    if freq == "daily":
        suffix = dt.strftime("%Y%m%d")
    elif freq == "weekly":
        iso_year, iso_week, _ = dt.isocalendar()
        suffix = f"{iso_year}W{iso_week:02d}"
    elif freq == "yearly":
        suffix = dt.strftime("%Y")
    else:
        suffix = dt.strftime("%Y%m")
    return f"{base}-{suffix}"


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
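
The heartbeat caps the persisted `email_seen_ids` list at 10,000 entries. Trimming only drops the oldest IDs if insertion order is preserved, which a plain `set` does not guarantee. The sketch below shows one way to merge and trim while keeping order. `merge_seen_ids` is a hypothetical helper, not part of claw-mail, and the short IDs are placeholders.

```python
from __future__ import annotations


def merge_seen_ids(previous: list[str], new_ids: list[str], cap: int = 10_000) -> list[str]:
    """Append unseen IDs to the persisted list and keep only the newest *cap* entries."""
    # dict preserves insertion order, so it doubles as an ordered set.
    ordered = dict.fromkeys(previous)
    for mid in new_ids:
        if mid:  # messages without a Message-ID cannot be tracked
            ordered.setdefault(mid, None)
    merged = list(ordered)
    return merged[-cap:] if cap > 0 else []


previous = ["a", "b", "c"]        # order as persisted in the state file
fetched = ["c", "d", "", "e"]     # IDs seen in the current cycle
print(merge_seen_ids(previous, fetched, cap=4))
```

With a cap of 4 the oldest ID (`a`) is the one that falls off, and the already-known `c` keeps its original position.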

### scripts/archive_mail.py

```python
#!/usr/bin/env python3
"""Archive old emails into dated folders (default monthly).

Scans a mailbox for messages older than N days and moves them to folders like
``Archive-YYYYMM`` (monthly), ``Archive-YYYYMMDD`` (daily), ``Archive-YYYYWww``
(weekly, ISO week), or ``Archive-YYYY`` (yearly), depending on the configured
frequency. Folders are created only when there are messages to move into them.

Usage:
    python3 scripts/archive_mail.py --config config.yaml --days 90
    python3 scripts/archive_mail.py --config config.yaml --account work --folder INBOX --days 60
    python3 scripts/archive_mail.py --config config.yaml --days 30 --archive-root "Old Mail" --dry-run
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime, timedelta

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage  # referenced in type annotations below


def main() -> None:
    parser = argparse.ArgumentParser(description="Archive old emails to dated folders")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Source folder to scan")
    parser.add_argument("--days", type=int, required=True,
                        help="Archive messages older than N days")
    parser.add_argument("--archive-root", default="Archive",
                        help="Archive folder prefix (default: Archive)")
    parser.add_argument("--frequency", choices=["daily", "weekly", "monthly", "yearly"],
                        default="monthly",
                        help="Frequency for archive folders (default: monthly)")
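    # store_true with default=True can never be switched off, so pair it with
    # an explicit negative flag that writes to the same destination.
    parser.add_argument("--create-folders", dest="create_folders",
                        action="store_true", default=True,
                        help="Create archive folders if they don't exist (default)")
    parser.add_argument("--no-create-folders", dest="create_folders",
                        action="store_false",
                        help="Do not create missing archive folders")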
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be archived without moving")
    parser.add_argument("--limit", type=int, default=500,
                        help="Max messages to scan")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    cutoff = datetime.now() - timedelta(days=args.days)

    # Build client
    client: IMAPClient | None = None
    acct_name = args.account

    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user, password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        acct_name = acct_name or "direct"
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Provide --config or --imap-host")
        return

    try:
        client.connect()

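        # Fetch messages using BEFORE date criteria. IMAP dates need English
        # month abbreviations, and strftime("%b") follows the process locale.
        months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                  "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
        date_str = f"{cutoff.day:02d}-{months[cutoff.month - 1]}-{cutoff.year:04d}"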
        messages = client.search(
            mailbox=args.folder,
            criteria=f"(BEFORE {date_str})",
            limit=args.limit,
        )

        if not messages:
            result = {
                "account": acct_name,
                "folder": args.folder,
                "cutoff_date": cutoff.isoformat(),
                "scanned": 0,
                "archived": 0,
                "dry_run": args.dry_run,
                "details": [],
            }
            _output(result, args.format)
            return

        frequency = args.frequency
        groups: dict[str, list[EmailMessage]] = {}
        for msg in messages:
            folder_name = _archive_folder_name(
                msg.date, args.archive_root, frequency,
            )
            groups.setdefault(folder_name, []).append(msg)

        created_folders: set[str] = set()
        archived = 0
        move_details: list[dict] = []
        existing_folders = {f["name"] for f in client.list_folders()}

        for folder_name, msgs in sorted(groups.items()):
            if not msgs:
                continue

            if args.dry_run:
                for msg in msgs:
                    move_details.append({
                        "message_id": msg.message_id,
                        "subject": msg.subject,
                        "date": msg.date.isoformat() if msg.date else "",
                        "target": folder_name,
                        "status": "dry_run",
                    })
                    archived += 1
                continue

            if args.create_folders and folder_name not in existing_folders:
                ok = client.create_folder(folder_name)
                if ok:
                    existing_folders.add(folder_name)
                    created_folders.add(folder_name)
                else:
                    for msg in msgs:
                        move_details.append({
                            "message_id": msg.message_id,
                            "subject": msg.subject,
                            "date": msg.date.isoformat() if msg.date else "",
                            "target": folder_name,
                            "status": "error",
                            "error": f"Failed to create folder {folder_name}",
                        })
                    continue

            msg_ids = [m.message_id for m in msgs if m.message_id]
            if not msg_ids:
                for msg in msgs:
                    move_details.append({
                        "message_id": msg.message_id,
                        "subject": msg.subject,
                        "date": msg.date.isoformat() if msg.date else "",
                        "target": folder_name,
                        "status": "failed",
                        "error": "missing Message-ID",
                    })
                continue

            results = client.move_messages_batch(msg_ids, args.folder, folder_name)
            for msg in msgs:
                mid = msg.message_id
                ok = results.get(mid, False)
                move_details.append({
                    "message_id": mid,
                    "subject": msg.subject,
                    "date": msg.date.isoformat() if msg.date else "",
                    "target": folder_name,
                    "status": "moved" if ok else "failed",
                })
                if ok:
                    archived += 1

        result = {
            "account": acct_name,
            "folder": args.folder,
            "cutoff_date": cutoff.isoformat(),
            "cutoff_days": args.days,
            "frequency": frequency,
            "scanned": len(messages),
            "archived": archived,
            "dry_run": args.dry_run,
            "created_folders": sorted(created_folders),
            "details": move_details,
        }

        _output(result, args.format)

    finally:
        client.disconnect()


def _output(result: dict, fmt: str) -> None:
    if fmt == "cli":
        dr = " (DRY RUN)" if result.get("dry_run") else ""
        freq = result.get("frequency", "monthly")
        print(f"Archive[{freq}]{dr}: {result['archived']}/{result['scanned']} "
              f"messages from {result['folder']}")
        print(f"  Cutoff: {result.get('cutoff_days', '?')} days "
              f"({result['cutoff_date'][:10]})")
        created = result.get("created_folders", [])
        if created:
            print(f"  Created: {', '.join(created)}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


def _archive_folder_name(
    date: datetime | None,
    root: str,
    frequency: str,
) -> str:
    """Return the folder name for an archive period."""
    base = root.strip() or "Archive"
    dt = date or datetime.now()
    if frequency == "daily":
        suffix = dt.strftime("%Y%m%d")
    elif frequency == "weekly":
        iso_year, iso_week, _ = dt.isocalendar()
        suffix = f"{iso_year}W{iso_week:02d}"
    elif frequency == "yearly":
        suffix = dt.strftime("%Y")
    else:
        suffix = dt.strftime("%Y%m")
    return f"{base}-{suffix}"


if __name__ == "__main__":
    main()

```
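
The folder naming used by both `heartbeat.py` and `archive_mail.py` can be exercised on its own. The sketch below restates `_archive_folder_name` as a standalone function and groups a few messages by target folder. The `(subject, date)` tuples and the `group_by_folder` helper are illustrative assumptions, not part of the skill. Note that weekly folders use the ISO year, which can differ from the calendar year around New Year.

```python
from __future__ import annotations

from collections import defaultdict
from datetime import datetime


def archive_folder_name(date: datetime, root: str = "Archive", frequency: str = "monthly") -> str:
    """Standalone restatement of the naming scheme used by the scripts above."""
    base = root.strip() or "Archive"
    if frequency == "daily":
        suffix = date.strftime("%Y%m%d")
    elif frequency == "weekly":
        iso_year, iso_week, _ = date.isocalendar()
        suffix = f"{iso_year}W{iso_week:02d}"
    elif frequency == "yearly":
        suffix = date.strftime("%Y")
    else:  # monthly is the fallback
        suffix = date.strftime("%Y%m")
    return f"{base}-{suffix}"


def group_by_folder(messages: list[tuple[str, datetime]], frequency: str) -> dict[str, list[str]]:
    """Group (subject, date) pairs by target folder (hypothetical helper)."""
    groups: dict[str, list[str]] = defaultdict(list)
    for subject, date in messages:
        groups[archive_folder_name(date, "Archive", frequency)].append(subject)
    return dict(groups)


sample = [
    ("Invoice", datetime(2026, 3, 1)),
    ("Receipt", datetime(2026, 3, 15)),
    ("Greeting", datetime(2024, 12, 30)),
]
print(group_by_folder(sample, "monthly"))
print(archive_folder_name(datetime(2024, 12, 30), frequency="weekly"))
```

Because groups are built from messages that actually exist, no folder is ever created for an empty period.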

### scripts/calendar_invite.py

```python
#!/usr/bin/env python3
"""Compose and send calendar invitations (iCalendar VEVENT).

Creates a text/calendar MIME part and sends it as an email. Recipients'
mail clients will display it as a meeting invitation.

Usage:
    python3 scripts/calendar_invite.py \\
        --to "attendee@example.com" \\
        --subject "Team Standup" \\
        --start "2026-03-01T09:00:00" \\
        --end "2026-03-01T09:30:00" \\
        --location "Zoom" \\
        --description "Daily standup meeting" \\
        --config config.yaml

    # Recurring weekly meeting
    python3 scripts/calendar_invite.py \\
        --to "attendee@example.com" \\
        --subject "Weekly Sync" \\
        --start "2026-03-02T14:00:00" \\
        --end "2026-03-02T15:00:00" \\
        --rrule "FREQ=WEEKLY;COUNT=10" \\
        --config config.yaml
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import uuid
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailAddress, EmailMessage


def build_vcalendar(
    organizer: str,
    attendees: list[str],
    subject: str,
    start: datetime,
    end: datetime,
    location: str = "",
    description: str = "",
    rrule: str = "",
    uid: str = "",
    method: str = "REQUEST",
) -> str:
    """Build an iCalendar VCALENDAR string for a VEVENT."""
    uid = uid or f"{uuid.uuid4()}@clawMail"
    now = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    dtstart = start.strftime("%Y%m%dT%H%M%S")
    dtend = end.strftime("%Y%m%dT%H%M%S")

    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//clawMail//calendar//EN",
        f"METHOD:{method}",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{now}",
        f"DTSTART:{dtstart}",
        f"DTEND:{dtend}",
        f"SUMMARY:{subject}",
        f"ORGANIZER;CN={organizer}:mailto:{organizer}",
    ]

    for attendee in attendees:
        lines.append(
            f"ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=NEEDS-ACTION;"
            f"RSVP=TRUE:mailto:{attendee}"
        )

    if location:
        lines.append(f"LOCATION:{location}")
    if description:
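        # Escape TEXT special characters per RFC 5545 (backslash, newline,
        # comma, semicolon). Long lines are not folded here.
        desc_escaped = (
            description.replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace(",", "\\,")
            .replace(";", "\\;")
        )
        lines.append(f"DESCRIPTION:{desc_escaped}")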
    if rrule:
        lines.append(f"RRULE:{rrule}")

    lines.extend([
        "STATUS:CONFIRMED",
        "SEQUENCE:0",
        "END:VEVENT",
        "END:VCALENDAR",
    ])

    return "\r\n".join(lines)


def main() -> None:
    parser = argparse.ArgumentParser(description="Send calendar invitation")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--to", action="append", default=[], help="Attendee address")
    parser.add_argument("--cc", action="append", default=[], help="CC address")
    parser.add_argument("--subject", required=True, help="Event subject/title")
    parser.add_argument("--start", required=True,
                        help="Start datetime (ISO 8601: YYYY-MM-DDTHH:MM:SS)")
    parser.add_argument("--end", required=True,
                        help="End datetime (ISO 8601: YYYY-MM-DDTHH:MM:SS)")
    parser.add_argument("--location", default="", help="Event location")
    parser.add_argument("--description", default="", help="Event description")
    parser.add_argument("--rrule", default="",
                        help="Recurrence rule (e.g. FREQ=WEEKLY;COUNT=10)")
    parser.add_argument("--uid", default="", help="Unique event ID (auto-generated)")
    parser.add_argument("--method", default="REQUEST",
                        choices=["REQUEST", "CANCEL", "REPLY"],
                        help="Calendar method")
    parser.add_argument("--body", default="", help="Additional email body text")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct SMTP flags
    parser.add_argument("--smtp-host", default="")
    parser.add_argument("--smtp-port", type=int, default=587)
    parser.add_argument("--smtp-user", default="")
    parser.add_argument("--smtp-pass", default="")
    parser.add_argument("--smtp-no-tls", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if not args.to:
        _error("At least one --to recipient is required")
        return

    # Parse datetimes
    try:
        start_dt = datetime.fromisoformat(args.start)
        end_dt = datetime.fromisoformat(args.end)
    except ValueError as exc:
        _error(f"Invalid datetime: {exc}")
        return

    # Determine sender
    sender_addr = args.sender
    mgr = None
    acct_name = args.account

    if args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        if not sender_addr:
            s = mgr.get_sender(acct_name)
            if s:
                sender_addr = s.address

    if not sender_addr:
        sender_addr = args.smtp_user or "[email protected]"

    # Build iCalendar
    vcal = build_vcalendar(
        organizer=sender_addr,
        attendees=args.to,
        subject=args.subject,
        start=start_dt,
        end=end_dt,
        location=args.location,
        description=args.description,
        rrule=args.rrule,
        uid=args.uid,
        method=args.method,
    )

    # Build MIME message with text/calendar part
    mime_msg = MIMEMultipart("mixed")
    mime_msg["From"] = sender_addr
    mime_msg["To"] = ", ".join(args.to)
    if args.cc:
        mime_msg["Cc"] = ", ".join(args.cc)
    mime_msg["Subject"] = args.subject

    # Alternative part: plain text + calendar
    alt = MIMEMultipart("alternative")

    body_text = args.body or f"You are invited to: {args.subject}"
    alt.attach(MIMEText(body_text, "plain", "utf-8"))

    cal_part = MIMEText(vcal, "calendar", "utf-8")
    cal_part.set_param("method", args.method)
    alt.attach(cal_part)

    mime_msg.attach(alt)

    # Send or output
    all_recipients = args.to + args.cc

    if args.smtp_host:
        from lib.smtp_client import SMTPClient
        client = SMTPClient(
            host=args.smtp_host, port=args.smtp_port,
            username=args.smtp_user, password=credential_store.resolve(args.smtp_pass),
            use_tls=not args.smtp_no_tls,
        )
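        # Build an EmailMessage for sending. Note: it carries only the text
        # body; the text/calendar part assembled in mime_msg is not attached.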
        em = EmailMessage(
            subject=args.subject,
            sender=EmailAddress.parse(sender_addr),
            recipients=[EmailAddress.parse(a) for a in args.to],
            cc=[EmailAddress.parse(a) for a in args.cc],
            body_html=body_text,
            body_plain=body_text,
        )
        ok = client.send(em)
        result = {
            "success": ok,
            "method": args.method,
            "subject": args.subject,
            "attendees": args.to,
        }
    elif mgr:
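        # Send through the account manager (with fallback). Note: this
        # EmailMessage carries only the text body; the text/calendar part
        # assembled in mime_msg is not attached.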
        em = EmailMessage(
            subject=args.subject,
            sender=EmailAddress.parse(sender_addr),
            recipients=[EmailAddress.parse(a) for a in args.to],
            cc=[EmailAddress.parse(a) for a in args.cc],
            body_html=body_text,
            body_plain=body_text,
        )
        send_result = mgr.send_with_fallback(em, acct_name)
        result = {
            "success": send_result["success"],
            "method": args.method,
            "subject": args.subject,
            "attendees": args.to,
            "fallback_used": send_result.get("fallback_used", False),
        }
    else:
        # No send method — just output the iCalendar
        result = {
            "subject": args.subject,
            "method": args.method,
            "attendees": args.to,
            "vcalendar": vcal,
        }

    if args.format == "cli":
        if result.get("success") is True:
            print(f"Calendar invitation sent: {args.subject}")
            print(f"  Method: {args.method}")
            print(f"  Start: {args.start}")
            print(f"  End: {args.end}")
            print(f"  Attendees: {', '.join(args.to)}")
        elif "vcalendar" in result:
            print(vcal)
        else:
            print(f"Failed to send invitation: {result}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
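
`build_vcalendar` emits every property on a single line. RFC 5545 says content lines should not be longer than 75 octets and defines folding as a CRLF followed by one whitespace character; it also lists backslash, semicolon, comma, and newline as characters to escape in TEXT values. The sketch below shows both steps. `escape_text` and `fold_line` are hypothetical helpers, not part of claw-mail.

```python
from __future__ import annotations


def escape_text(value: str) -> str:
    """Escape a value for an iCalendar TEXT property (RFC 5545, section 3.3.11)."""
    return (
        value.replace("\\", "\\\\")   # backslash first, so later escapes survive
        .replace(";", "\\;")
        .replace(",", "\\,")
        .replace("\r\n", "\\n")
        .replace("\n", "\\n")
    )


def fold_line(line: str, limit: int = 75) -> str:
    """Fold one content line so no physical line exceeds *limit* octets."""
    if limit < 5:
        raise ValueError("limit must leave room for a full UTF-8 character")
    raw = line.encode("utf-8")
    chunks: list[bytes] = []
    start = 0
    width = limit  # the first physical line may use the full limit
    while len(raw) - start > width:
        end = start + width
        # Back up so a multi-byte UTF-8 sequence is never split.
        while (raw[end] & 0xC0) == 0x80:
            end -= 1
        chunks.append(raw[start:end])
        start = end
        width = limit - 1  # continuation lines spend one octet on the leading space
    chunks.append(raw[start:])
    return "\r\n ".join(chunk.decode("utf-8") for chunk in chunks)


long_line = "DESCRIPTION:" + escape_text("Agenda: budget, hiring; " * 10)
folded = fold_line(long_line)
print(folded)
```

Unfolding is the reverse: removing every CRLF that is followed by a single space restores the original line.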

### scripts/mail_merge.py

```python
#!/usr/bin/env python3
"""Mail merge — batch personalised sends from a template + data source.

Reads a CSV or JSON data file where each row represents one recipient.
For each row, the template subject/body placeholders are filled in and
the message is sent via SMTP.

Placeholder syntax: ``{{field_name}}`` in subject and body.

Usage:
    # CSV data source
    python3 scripts/mail_merge.py \\
        --data contacts.csv \\
        --subject "Hello {{name}}" \\
        --body "<p>Dear {{name}}, your code is {{code}}.</p>" \\
        --to-field email \\
        --config config.yaml

    # JSON data source
    python3 scripts/mail_merge.py \\
        --data contacts.json \\
        --subject "Invoice #{{invoice_id}}" \\
        --body "<p>Amount: {{amount}}</p>" \\
        --to-field email \\
        --config config.yaml

    # Dry run (compose without sending)
    python3 scripts/mail_merge.py \\
        --data contacts.csv \\
        --subject "Hi {{name}}" \\
        --body "Test" \\
        --to-field email \\
        --dry-run
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import re
import sys
import time

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.composer import EmailComposer
from lib.defaults import resolve_config_path
from lib.models import EmailAddress


def _load_data(path: str) -> list[dict[str, str]]:
    """Load rows from a CSV or JSON file."""
    ext = os.path.splitext(path)[1].lower()
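    if ext == ".json":
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        # Every row must be an object so that field lookups work later.
        if isinstance(data, list) and all(isinstance(r, dict) for r in data):
            return data
        raise ValueError("JSON data must be an array of objects")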
    else:
        # Default: CSV
        with open(path, newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            return list(reader)


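def _fill_template(template: str, row: dict[str, str]) -> str:
    """Replace ``{{key}}`` placeholders with values from *row*.

    Unknown keys are left untouched; whitespace inside the braces is allowed.
    """
    def replacer(match):
        key = match.group(1)
        if key not in row:
            return match.group(0)
        value = row[key]
        # JSON rows may hold numbers and CSV rows may hold None; re.sub needs str.
        return "" if value is None else str(value)
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", replacer, template)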


def main() -> None:
    parser = argparse.ArgumentParser(description="Mail merge — batch personalised sends")
    parser.add_argument("--data", required=True,
                        help="CSV or JSON file with recipient data")
    parser.add_argument("--subject", required=True,
                        help="Subject template (use {{field}} for placeholders)")
    parser.add_argument("--body", required=True,
                        help="Body template (HTML, use {{field}} for placeholders)")
    parser.add_argument("--to-field", required=True,
                        help="Column/key name containing recipient email address")
    parser.add_argument("--name-field", default="",
                        help="Column/key for recipient display name (optional)")
    parser.add_argument("--template", default="default",
                        choices=["default", "minimal", "digest"],
                        help="Email template")
    parser.add_argument("--sender", default="", help="Sender address override")
    parser.add_argument("--greeting", default="", help="Greeting template")
    parser.add_argument("--sign-off", default="", help="Sign-off template")
    parser.add_argument("--delay", type=float, default=0.5,
                        help="Delay in seconds between sends (default: 0.5)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Compose messages without sending")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--attach", action="append", default=[],
                        help="File to attach (same for all recipients)")
    parser.add_argument("--format", choices=["json", "cli"], default="json")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Load data
    try:
        rows = _load_data(args.data)
    except Exception as exc:
        _error(f"Failed to load data: {exc}")
        return

    if not rows:
        _error("Data file is empty")
        return

    # Validate to-field exists
    if args.to_field not in rows[0]:
        _error(f"Field '{args.to_field}' not found in data. "
               f"Available: {', '.join(rows[0].keys())}")
        return

    # Setup
    mgr = None
    acct_name = args.account
    if args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account

    sender = args.sender
    if not sender and mgr:
        s = mgr.get_sender(acct_name)
        if s:
            sender = str(s)

    composer = EmailComposer()

    # Load attachments once
    from lib.models import EmailAttachment
    import mimetypes
    attachments = []
    for fpath in args.attach:
        if not os.path.isfile(fpath):
            _error(f"Attachment not found: {fpath}")
            return
        ct = mimetypes.guess_type(fpath)[0] or "application/octet-stream"
        with open(fpath, "rb") as f:
            data = f.read()
        attachments.append(EmailAttachment(
            filename=os.path.basename(fpath), content_type=ct, data=data,
        ))

    results = []
    sent = 0
    failed = 0

    for i, row in enumerate(rows):
        # Short CSV rows yield None for missing columns; treat that as empty.
        to_addr = str(row.get(args.to_field) or "").strip()
        if not to_addr:
            results.append({"row": i, "error": f"Empty {args.to_field}"})
            failed += 1
            continue

        subject = _fill_template(args.subject, row)
        body = _fill_template(args.body, row)
        greeting = _fill_template(args.greeting, row) if args.greeting else ""
        sign_off = _fill_template(args.sign_off, row) if args.sign_off else ""

        message = composer.compose(
            to=to_addr,
            subject=subject,
            body=body,
            template=args.template,
            sender=sender,
            greeting=greeting,
            sign_off=sign_off,
        )
        message.attachments = list(attachments)

        row_result: dict = {
            "row": i,
            "to": to_addr,
            "subject": subject,
        }

        if args.dry_run:
            row_result["status"] = "dry_run"
            results.append(row_result)
            sent += 1
        elif mgr:
            send_result = mgr.send_with_fallback(message, acct_name)
            if send_result["success"]:
                row_result["status"] = "sent"
                row_result["fallback_used"] = send_result.get("fallback_used", False)
                sent += 1
            else:
                row_result["status"] = "failed"
                row_result["error"] = send_result.get("error", "unknown")
                failed += 1
            results.append(row_result)
            if args.delay and i < len(rows) - 1:
                time.sleep(args.delay)
        else:
            row_result["status"] = "no_config"
            row_result["message"] = message.to_dict()
            results.append(row_result)
            sent += 1

    report = {
        "total": len(rows),
        "sent": sent,
        "failed": failed,
        "dry_run": args.dry_run,
        "results": results,
    }

    if args.format == "cli":
        print(f"Mail merge: {sent}/{len(rows)} sent, {failed} failed"
              + (" (dry run)" if args.dry_run else ""))
        for r in results:
            status = r.get("status", "?")
            to = r.get("to", "?")
            subj = r.get("subject", "?")
            err = r.get("error", "")
            print(f"  [{status}] {to}: {subj}" + (f" ({err})" if err else ""))
    else:
        json.dump(report, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
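
The placeholder substitution in the mail merge script can be tried in isolation. The following is a minimal sketch, assuming the same `{{key}}` pattern as `_fill_template`; the function name `fill_template` and the sample row are illustrative, and the `str()` coercion is an assumption added here so that numeric JSON values work.

```python
import re


def fill_template(template: str, row: dict) -> str:
    """Replace {{key}} placeholders; unknown keys are left untouched."""
    def replacer(match):
        value = row.get(match.group(1))
        # Keep the original placeholder text when the key is missing.
        return match.group(0) if value is None else str(value)
    return re.sub(r"\{\{(\w+)\}\}", replacer, template)


# Hypothetical row, as it might come from a JSON data file.
row = {"name": "Ada", "plan": "Pro", "seats": 3}

subject = fill_template("Hi {{name}}, your {{plan}} plan has {{seats}} seats", row)
unknown = fill_template("Ref {{missing}} / {{ name }}", row)

print(subject)
print(unknown)
```

Because the pattern only accepts `\w+` between the braces, a placeholder written with inner spaces such as `{{ name }}` does not match and is passed through unchanged.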

### scripts/read_mail.py

```python
#!/usr/bin/env python3
"""Read and render a specific email by Message-ID.

Usage:
    python3 scripts/read_mail.py --message-id "<message-id@example.com>" --config config.yaml
    python3 scripts/read_mail.py --message-id "<message-id@example.com>" --account work --format cli
    echo '{"message_id":"<id>", ...}' | python3 scripts/read_mail.py --from-stdin --format cli

Output: JSON message object (default) or CLI-rendered email.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient
from lib.models import EmailMessage


def main() -> None:
    parser = argparse.ArgumentParser(description="Read/render an email by Message-ID")
    parser.add_argument("--account", default="", help="Account name (default: default account)")
    parser.add_argument("--message-id", default="", help="Message-ID header value")
    parser.add_argument("--folder", default="INBOX", help="Folder to search in")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")

    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")

    # Attachments
    parser.add_argument("--save-attachments", default="",
                        help="Directory to save attachments to")

    # Config / stdin
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read message JSON from stdin instead of IMAP")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
        except json.JSONDecodeError as exc:
            _error(f"Invalid JSON on stdin: {exc}")
            return
        if isinstance(data, dict) and "messages" in data:
            if not data["messages"]:
                _error("No messages in input")
                return
            message = EmailMessage.from_dict(data["messages"][0])
        elif isinstance(data, dict):
            message = EmailMessage.from_dict(data)
        else:
            _error("Expected JSON message object")
            return
    elif args.imap_host:
        if not args.message_id:
            _error("--message-id is required (or use --from-stdin)")
            return
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            message = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if message is None:
            _error(f"Message not found: {args.message_id}")
            return
    else:
        if not args.message_id:
            _error("--message-id is required (or use --from-stdin)")
            return
        if not args.config:
            _error("--config or --imap-host is required")
            return

        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            message = client.fetch_by_id(args.message_id, mailbox=args.folder)
        finally:
            client.disconnect()
        if message is None:
            _error(f"Message not found: {args.message_id}")
            return

    # Save attachments if requested
    if args.save_attachments:
        _save_attachments(message, args.save_attachments)

    _output(message, args.format)


def _save_attachments(message: EmailMessage, dest_dir: str) -> None:
    """Save all attachments from a message to the given directory."""
    if not message.attachments:
        return
    os.makedirs(dest_dir, exist_ok=True)
    for att in message.attachments:
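        # Filenames come from the message itself: tolerate a missing name and
        # strip Windows-style path components as well as POSIX ones.
        safe_name = os.path.basename((att.filename or "").replace("\\", "/")) or "attachment"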
        dest_path = os.path.join(dest_dir, safe_name)
        # Avoid overwriting: add suffix if file exists
        base, ext = os.path.splitext(dest_path)
        counter = 1
        while os.path.exists(dest_path):
            dest_path = f"{base}_{counter}{ext}"
            counter += 1
        with open(dest_path, "wb") as f:
            f.write(att.data)
        print(f"  Saved: {dest_path} ({att.size} bytes)", file=sys.stderr)


def _output(message: EmailMessage, fmt: str) -> None:
    if fmt == "cli":
        _render_cli(message)
    else:
        json.dump(message.to_dict(), sys.stdout, indent=2, default=str)
        print()


def _render_cli(msg: EmailMessage) -> None:
    """Render an email for terminal display."""
    w = 72
    print("=" * w)
    if msg.account:
        print(f"  Account: {msg.account}")
    print(f"  From:    {msg.sender or '(unknown)'}")
    if msg.recipients:
        print(f"  To:      {', '.join(str(r) for r in msg.recipients)}")
    if msg.cc:
        print(f"  Cc:      {', '.join(str(r) for r in msg.cc)}")
    print(f"  Subject: {msg.subject}")
    print(f"  Date:    {msg.date.strftime('%Y-%m-%d %H:%M %Z') if msg.date else '(unknown)'}")
    if msg.message_id:
        print(f"  ID:      {msg.message_id}")
    if msg.has_attachments:
        att_names = ", ".join(a.filename for a in msg.attachments)
        print(f"  Attach:  {att_names}")
    print("-" * w)

    # Prefer plain text for CLI; fall back to stripped HTML
    body = msg.body_plain
    if not body and msg.body_html:
        body = re.sub(r"<head[\s>].*?</head>", "", msg.body_html,
                       flags=re.DOTALL | re.IGNORECASE)
        body = re.sub(r"<style[\s>].*?</style>", "", body,
                       flags=re.DOTALL | re.IGNORECASE)
        body = re.sub(r"<br\s*/?>", "\n", body, flags=re.IGNORECASE)
        body = re.sub(r"</p>", "\n\n", body, flags=re.IGNORECASE)
        body = re.sub(r"<[^>]+>", "", body)
        body = re.sub(r"\n{3,}", "\n\n", body)

    if body:
        for line in body.strip().splitlines():
            print(f"  {line}")
    else:
        print("  (empty body)")

    print("=" * w)
    print()


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
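
The HTML fallback in `_render_cli` is a chain of regular-expression substitutions rather than a real HTML parser. The sketch below reproduces that chain as a standalone function so its behaviour is easy to inspect; the name `html_to_text` and the sample markup are illustrative, and the final `html.unescape` call is an optional extra that the script itself does not perform.

```python
import html
import re


def html_to_text(markup: str) -> str:
    """Strip HTML down to plain text using the same steps as the CLI renderer."""
    flags = re.DOTALL | re.IGNORECASE
    text = re.sub(r"<head[\s>].*?</head>", "", markup, flags=flags)   # drop <head>
    text = re.sub(r"<style[\s>].*?</style>", "", text, flags=flags)   # drop CSS
    text = re.sub(r"<br\s*/?>", "\n", text, flags=re.IGNORECASE)      # line breaks
    text = re.sub(r"</p>", "\n\n", text, flags=re.IGNORECASE)         # paragraphs
    text = re.sub(r"<[^>]+>", "", text)                               # remaining tags
    text = re.sub(r"\n{3,}", "\n\n", text)                            # collapse gaps
    return text.strip()


sample = (
    "<html><head><title>x</title></head><body>"
    "<p>Hello<br>world</p><p>Bye &amp; thanks</p></body></html>"
)

plain = html_to_text(sample)
decoded = html.unescape(plain)  # optional: decode entities such as &amp;
print(plain)
print(decoded)
```

Character entities survive the regex chain untouched, so a caller that wants `&` instead of `&amp;` would need a decoding step like the one shown.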

### scripts/compose_mail.py

```python
#!/usr/bin/env python3
"""Compose rich HTML emails and output as JSON (for piping to send_mail.py).

Usage:
    python3 scripts/compose_mail.py [options]
    python3 scripts/compose_mail.py ... | python3 scripts/send_mail.py --from-stdin

Output: JSON message object to stdout.
"""

from __future__ import annotations

import argparse
import json
import mimetypes
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.composer import EmailComposer
from lib.models import EmailAttachment


def main() -> None:
    parser = argparse.ArgumentParser(description="Compose rich HTML emails")

    parser.add_argument("--to", action="append", default=[], help="Recipient (repeatable)")
    parser.add_argument("--cc", action="append", default=[], help="CC recipient")
    parser.add_argument("--bcc", action="append", default=[], help="BCC recipient")
    parser.add_argument("--subject", required=True, help="Email subject")
    parser.add_argument("--body", default="", help="Email body (HTML or plain)")
    parser.add_argument("--sender", default="", help="Sender address")
    parser.add_argument("--template", default="default",
                        choices=["default", "minimal", "digest"],
                        help="Email template")

    # Template variables
    parser.add_argument("--greeting", default="", help="Greeting line")
    parser.add_argument("--sign-off", default="", help="Sign-off line")
    parser.add_argument("--header-text", default="", help="Header text")
    parser.add_argument("--header-color", default="#2d3748", help="Header bg color")
    parser.add_argument("--footer-text", default="", help="Footer text")
    parser.add_argument("--action-url", default="", help="CTA button URL")
    parser.add_argument("--action-text", default="View Details", help="CTA button text")
    parser.add_argument("--action-color", default="#4299e1", help="CTA button color")

    # Digest-specific
    parser.add_argument("--items", default="", help="JSON array of row dicts for digest")
    parser.add_argument("--columns", default="", help="JSON array of column names")
    parser.add_argument("--summary", default="", help="Summary text for digest")

    # Reply
    parser.add_argument("--reply-to", default="", help="In-Reply-To message ID")

    # Attachments
    parser.add_argument("--attach", action="append", default=[],
                        help="File path to attach (repeatable)")

    args = parser.parse_args()

    if not args.to:
        json.dump({"error": "At least one --to recipient is required"}, sys.stderr)
        print(file=sys.stderr)
        sys.exit(1)

    items = None
    columns = None
    if args.items:
        try:
            items = json.loads(args.items)
        except json.JSONDecodeError as exc:
            json.dump({"error": f"Invalid --items JSON: {exc}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)
    if args.columns:
        try:
            columns = json.loads(args.columns)
        except json.JSONDecodeError as exc:
            json.dump({"error": f"Invalid --columns JSON: {exc}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)

    composer = EmailComposer()
    message = composer.compose(
        to=args.to,
        subject=args.subject,
        body=args.body,
        template=args.template,
        sender=args.sender or None,
        cc=args.cc or None,
        bcc=args.bcc or None,
        reply_to=args.reply_to,
        greeting=args.greeting,
        sign_off=args.sign_off,
        header_text=args.header_text,
        header_color=args.header_color,
        footer_text=args.footer_text,
        action_url=args.action_url,
        action_text=args.action_text,
        action_color=args.action_color,
        items=items,
        columns=columns,
        summary=args.summary,
    )

    # Attach files from disk
    for filepath in args.attach:
        if not os.path.isfile(filepath):
            json.dump({"error": f"Attachment not found: {filepath}"}, sys.stderr)
            print(file=sys.stderr)
            sys.exit(1)
        content_type, _ = mimetypes.guess_type(filepath)
        if not content_type:
            content_type = "application/octet-stream"
        with open(filepath, "rb") as fh:
            data = fh.read()
        message.attachments.append(EmailAttachment(
            filename=os.path.basename(filepath),
            content_type=content_type,
            data=data,
        ))

    json.dump(message.to_dict(), sys.stdout, indent=2, default=str)
    print()


if __name__ == "__main__":
    main()

```
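
The attachment loop in `compose_mail.py` guesses a content type from the file extension and falls back to `application/octet-stream`. A minimal sketch of that lookup follows, using a plain dict in place of `EmailAttachment` (whose full definition is not shown here); `describe_attachment` and the temporary file names are hypothetical.

```python
import mimetypes
import os
import tempfile


def describe_attachment(path: str) -> dict:
    """Read a file and guess its MIME type, defaulting to a generic binary type."""
    content_type, _ = mimetypes.guess_type(path)
    with open(path, "rb") as fh:
        data = fh.read()
    return {
        "filename": os.path.basename(path),
        "content_type": content_type or "application/octet-stream",
        "size": len(data),
    }


with tempfile.TemporaryDirectory() as tmp:
    known_path = os.path.join(tmp, "notes.txt")
    unknown_path = os.path.join(tmp, "blob.zzunknown")  # extension with no registered type
    for p in (known_path, unknown_path):
        with open(p, "wb") as fh:
            fh.write(b"hello")
    known = describe_attachment(known_path)
    unknown = describe_attachment(unknown_path)

print(known)
print(unknown)
```

The guess is based on the file name only, not on the file contents, so a mislabelled file keeps whatever type its extension suggests.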

### scripts/process_mail.py

```python
#!/usr/bin/env python3
"""Process emails through the rule-based pipeline.

Usage:
    python3 scripts/fetch_mail.py --config c.yaml | python3 scripts/process_mail.py --rules-file rules.yaml
    python3 scripts/process_mail.py --input messages.json --rules '[...]'
    python3 scripts/process_mail.py --input messages.json --config config.yaml --account work --format cli

Output: JSON (default) or CLI summary to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailMessage
from lib.processor import EmailProcessor


def main() -> None:
    parser = argparse.ArgumentParser(description="Process emails through rule pipeline")

    parser.add_argument("--account", default="",
                        help="Account name (loads per-account + global rules from config)")
    parser.add_argument("--input", default="",
                        help="JSON file with messages (or reads stdin)")
    parser.add_argument("--rules", default="",
                        help="JSON string of rules array")
    parser.add_argument("--rules-file", default="",
                        help="YAML/JSON file with rules")
    parser.add_argument("--config", default="",
                        help="YAML config file (reads rules from 'rules' key or per-account)")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

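    # Load messages
    try:
        if args.input:
            with open(args.input, encoding="utf-8") as f:
                data = json.load(f)
        else:
            data = json.load(sys.stdin)
    except (OSError, json.JSONDecodeError) as exc:
        _error(f"Failed to load messages: {exc}")
        return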

    if isinstance(data, dict) and "messages" in data:
        msg_dicts = data["messages"]
    elif isinstance(data, list):
        msg_dicts = data
    else:
        _error("Expected JSON array or object with 'messages' key")
        return

    messages = [EmailMessage.from_dict(d) for d in msg_dicts]

    # Load rules
    rules_config: list[dict] = []
    if args.rules:
        try:
            rules_config = json.loads(args.rules)
        except json.JSONDecodeError as exc:
            _error(f"Invalid --rules JSON: {exc}")
    elif args.rules_file:
        try:
            with open(args.rules_file) as f:
                if args.rules_file.endswith((".yaml", ".yml")):
                    import yaml
                    loaded = yaml.safe_load(f) or {}
                else:
                    loaded = json.load(f)
                if isinstance(loaded, list):
                    rules_config = loaded
                elif isinstance(loaded, dict):
                    rules_config = loaded.get("rules", loaded.get("processing_rules", []))
        except Exception as exc:
            _error(f"Failed to load rules file: {exc}")
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
            if args.account:
                rules_config = mgr.get_rules(args.account)
            else:
                rules_config = mgr.get_rules()
        except Exception as exc:
            _error(f"Failed to load config: {exc}")

    if not rules_config:
        _error("No processing rules provided. Use --rules, --rules-file, or --config.")

    # Process
    processor = EmailProcessor.from_config(rules_config)
    results = processor.process_batch(messages)

    if args.format == "cli":
        matched = sum(len(r.matched_rules) for r in results)
        print(f"  Processed {len(results)} messages, {matched} rules matched")
        print()
        for r in results:
            subj = r.message.subject[:50] or "(no subject)"
            acct = f"[{r.message.account}] " if r.message.account else ""
            if r.matched_rules:
                rules_str = ", ".join(r.matched_rules)
                actions_str = ", ".join(r.actions_taken) if r.actions_taken else ""
                print(f"  {acct}{subj}")
                print(f"    Rules:   {rules_str}")
                if actions_str:
                    print(f"    Actions: {actions_str}")
                if r.tags:
                    print(f"    Tags:    {', '.join(r.tags)}")
                if r.move_to:
                    print(f"    Move to: {r.move_to}")
            else:
                print(f"  {acct}{subj}  (no match)")
        print()
    else:
        output = {
            "messages_processed": len(results),
            "rules_matched": sum(len(r.matched_rules) for r in results),
            "results": [r.to_dict() for r in results],
        }
        json.dump(output, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
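
`process_mail.py` accepts two input shapes for messages (a bare array, or an object with a `messages` key) and two for a rules file (a bare array, or an object with `rules` or `processing_rules`). The sketch below isolates that normalisation; the helper names and the sample rule dict are hypothetical, and the real rule schema is defined by `lib.processor`, which is not shown here.

```python
def extract_messages(data):
    """Return the list of message dicts from either accepted input shape."""
    if isinstance(data, dict) and "messages" in data:
        return data["messages"]
    if isinstance(data, list):
        return data
    raise ValueError("Expected JSON array or object with 'messages' key")


def extract_rules(loaded):
    """Return the rules list from a parsed YAML/JSON rules document."""
    if isinstance(loaded, list):
        return loaded
    if isinstance(loaded, dict):
        # 'rules' wins; 'processing_rules' is only consulted if 'rules' is absent.
        return loaded.get("rules", loaded.get("processing_rules", []))
    return []


fetch_output = {"messages": [{"subject": "a"}, {"subject": "b"}]}
bare_list = [{"subject": "c"}]
rules_doc = {"processing_rules": [{"name": "hypothetical-rule"}]}

print(len(extract_messages(fetch_output)), len(extract_messages(bare_list)))
print(extract_rules(rules_doc))
```

Accepting the `{"messages": [...]}` wrapper is what lets the output of `fetch_mail.py` be piped straight into this script.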

### scripts/manage_folders.py

```python
#!/usr/bin/env python3
"""List, create, delete, rename, and move IMAP folders.

Usage:
    python3 scripts/manage_folders.py --action list --config config.yaml
    python3 scripts/manage_folders.py --action list --account work --format cli --config config.yaml
    python3 scripts/manage_folders.py --action create --folder Projects --config config.yaml
    python3 scripts/manage_folders.py --action move --folder Projects --new-parent Archive --config config.yaml

Output: JSON (default) or CLI table to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Manage IMAP folders")
    parser.add_argument("--account", default="", help="Account name (default: default account)")
    parser.add_argument("--action", required=True,
                        choices=["list", "create", "delete", "rename", "move"],
                        help="Action to perform")
    parser.add_argument("--folder", default="", help="Folder name (for create/delete/rename/move)")
    parser.add_argument("--new-name", default="", help="New name (for rename)")
    parser.add_argument("--new-parent", default="", help="New parent folder (for move)")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")

    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Build client
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("IMAP host required. Use --imap-host or --config.")
        return

    try:
        client.connect()

        if args.action == "list":
            _list_folders(client, args.format, args.account)
        elif args.action == "create":
            if not args.folder:
                _error("--folder is required for create")
            _create_folder(client, args.folder, args.format)
        elif args.action == "delete":
            if not args.folder:
                _error("--folder is required for delete")
            _delete_folder(client, args.folder, args.format)
        elif args.action == "rename":
            if not args.folder or not args.new_name:
                _error("--folder and --new-name are required for rename")
            _rename_folder(client, args.folder, args.new_name, args.format)
        elif args.action == "move":
            if not args.folder or not args.new_parent:
                _error("--folder and --new-parent are required for move")
            _move_folder(client, args.folder, args.new_parent, args.format)
    finally:
        client.disconnect()


def _list_folders(client: IMAPClient, fmt: str, account: str) -> None:
    folders = client.list_folders()
    enriched = []
    for f in folders:
        try:
            status = client.folder_status(f["name"])
            f.update(status)
        except Exception:
            f.update({"messages": 0, "unseen": 0, "recent": 0})
        enriched.append(f)

    if fmt == "cli":
        if not enriched:
            print("  No folders found")
            return
        label = f"  [{account}] " if account else "  "
        print(f"{label}{'Folder':<30s}  {'Total':>6s}  {'Unread':>6s}  {'Recent':>6s}")
        print(f"  {'':-<30s}  {'':->6s}  {'':->6s}  {'':->6s}")
        for f in enriched:
            print(f"  {f['name']:<30s}  {f.get('messages', 0):>6d}"
                  f"  {f.get('unseen', 0):>6d}  {f.get('recent', 0):>6d}")
        print()
    else:
        output = {"account": account, "folders": enriched}
        json.dump(output, sys.stdout, indent=2)
        print()


def _create_folder(client: IMAPClient, folder: str, fmt: str) -> None:
    ok = client.create_folder(folder)
    result = {"action": "create", "folder": folder, "success": ok}
    if fmt == "cli":
        status = "created" if ok else "FAILED"
        print(f"  Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _delete_folder(client: IMAPClient, folder: str, fmt: str) -> None:
    ok = client.delete_folder(folder)
    result = {"action": "delete", "folder": folder, "success": ok}
    if fmt == "cli":
        status = "deleted" if ok else "FAILED"
        print(f"  Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _rename_folder(client: IMAPClient, old: str, new: str, fmt: str) -> None:
    ok = client.rename_folder(old, new)
    result = {"action": "rename", "old_name": old, "new_name": new, "success": ok}
    if fmt == "cli":
        status = f"renamed to '{new}'" if ok else "FAILED"
        print(f"  Folder '{old}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _move_folder(client: IMAPClient, folder: str, new_parent: str, fmt: str) -> None:
    ok = client.move_folder(folder, new_parent)
    result = {"action": "move", "folder": folder, "new_parent": new_parent, "success": ok}
    if fmt == "cli":
        status = f"moved under '{new_parent}'" if ok else "FAILED"
        print(f"  Folder '{folder}': {status}")
    else:
        json.dump(result, sys.stdout, indent=2)
        print()
    if not ok:
        sys.exit(1)


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
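
The CLI table in `_list_folders` relies on format-spec fill characters: `{'':-<30s}` formats an empty string padded to 30 columns with `-`, which draws the rule under the header. The sketch below renders the same layout into a list of lines so it can be inspected without an IMAP connection; `render_folder_table` and the sample folder data are illustrative.

```python
def render_folder_table(folders: list, account: str = "") -> list:
    """Build the table lines used by the CLI folder listing."""
    label = f"  [{account}] " if account else "  "
    lines = [
        f"{label}{'Folder':<30s}  {'Total':>6s}  {'Unread':>6s}  {'Recent':>6s}",
        # An empty string with a '-' fill produces a run of dashes of the given width.
        f"  {'':-<30s}  {'':->6s}  {'':->6s}  {'':->6s}",
    ]
    for f in folders:
        lines.append(
            f"  {f['name']:<30s}  {f.get('messages', 0):>6d}"
            f"  {f.get('unseen', 0):>6d}  {f.get('recent', 0):>6d}"
        )
    return lines


# Hypothetical folder status data; 'recent' is omitted to show the default of 0.
lines = render_folder_table([{"name": "INBOX", "messages": 12, "unseen": 3}])
for line in lines:
    print(line)
```

Folder names longer than 30 characters are not truncated by `<30s`; they simply push the numeric columns to the right.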

### scripts/move_mail.py

```python
#!/usr/bin/env python3
"""Move emails between IMAP folders.

Usage:
    python3 scripts/move_mail.py --message-id "<message-id@example.com>" --from INBOX --to Archive --config config.yaml
    python3 scripts/move_mail.py --account work --from-stdin --to Projects --config config.yaml

Output: JSON result to stdout.
"""

from __future__ import annotations

import argparse
import json
import os
import sys

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient


def main() -> None:
    parser = argparse.ArgumentParser(description="Move emails between IMAP folders")
    parser.add_argument("--account", default="", help="Account name (default: default account)")
    parser.add_argument("--message-id", action="append", default=[],
                        help="Message-ID to move (repeatable)")
    parser.add_argument("--from", dest="from_folder", default="INBOX",
                        help="Source folder (default: INBOX)")
    parser.add_argument("--to", dest="to_folder", required=True,
                        help="Destination folder")
    parser.add_argument("--create-folder", action="store_true",
                        help="Create destination folder if it doesn't exist")
    parser.add_argument("--format", choices=["json", "cli"], default="json",
                        help="Output format (default: json)")

    # Direct IMAP options
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    parser.add_argument("--config", default="", help="YAML config file")

    # Stdin input
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read message IDs from stdin JSON")

    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    # Collect message IDs
    msg_ids = list(args.message_id)
    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
        except json.JSONDecodeError as exc:
            _error(f"Invalid JSON on stdin: {exc}")
            return
        if isinstance(data, dict) and "messages" in data:
            for m in data["messages"]:
                mid = m.get("message_id", "")
                if mid:
                    msg_ids.append(mid)
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, str):
                    msg_ids.append(item)
                elif isinstance(item, dict):
                    mid = item.get("message_id", "")
                    if mid:
                        msg_ids.append(mid)

    if not msg_ids:
        _error("No message IDs provided. Use --message-id or --from-stdin.")

    # Build client
    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host,
            port=args.imap_port,
            username=args.imap_user,
            password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
    elif args.config:
        mgr = _load_manager(args.config)
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("IMAP host required. Use --imap-host or --config.")
        return

    results = []
    try:
        client.connect()

        if args.create_folder:
            folders = [f["name"] for f in client.list_folders()]
            if args.to_folder not in folders:
                client.create_folder(args.to_folder)

        # Use batch move for efficiency
        batch_results = client.move_messages_batch(
            msg_ids, args.from_folder, args.to_folder,
        )
        for mid, ok in batch_results.items():
            results.append({"message_id": mid, "success": ok})
    finally:
        client.disconnect()

    moved = sum(1 for r in results if r["success"])
    failed = len(results) - moved

    if args.format == "cli":
        print(f"  Moved {moved}/{len(results)} messages"
              f" from '{args.from_folder}' to '{args.to_folder}'")
        if failed:
            for r in results:
                if not r["success"]:
                    print(f"    FAILED: {r['message_id']}")
        print()
    else:
        output = {
            "account": args.account,
            "from": args.from_folder,
            "to": args.to_folder,
            "moved": moved,
            "failed": failed,
            "results": results,
        }
        json.dump(output, sys.stdout, indent=2)
        print()

    if failed:
        sys.exit(1)


def _load_manager(path: str) -> AccountManager:
    try:
        return AccountManager.from_yaml(path)
    except Exception as exc:
        _error(f"Failed to load config: {exc}")
        raise


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
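
The `--from-stdin` branch in the script above accepts two JSON shapes: an object with a `messages` array, or a bare list whose items are either ID strings or objects with a `message_id` key. The following is a minimal standalone sketch of that normalization. The helper name `collect_message_ids` is hypothetical and is not defined in the script, and the sketch is slightly more lenient than the inline code: it tolerates plain strings inside `messages` and skips empty strings.

```python
import json


def collect_message_ids(data):
    """Return message IDs from either JSON shape accepted on stdin.

    Hypothetical helper for illustration; it approximates the inline
    logic in main() and is not part of the script itself.
    """
    # Shape 1: {"messages": [{"message_id": "..."}, ...]}
    if isinstance(data, dict) and "messages" in data:
        items = data["messages"]
    # Shape 2: ["<id>", {"message_id": "..."}, ...]
    elif isinstance(data, list):
        items = data
    else:
        return []

    ids = []
    for item in items:
        if isinstance(item, str):
            mid = item
        elif isinstance(item, dict):
            mid = item.get("message_id", "")
        else:
            mid = ""
        if mid:  # skip empty or missing IDs
            ids.append(mid)
    return ids


sample = json.loads(
    '{"messages": [{"message_id": "<a@example.com>"}, {"subject": "no id"}]}'
)
print(collect_message_ids(sample))
```

Because both shapes reduce to the same list of IDs, the output of a fetch or search script can be piped into the move script without reshaping it first.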

### scripts/idle_monitor.py

```python
#!/usr/bin/env python3
"""Monitor an IMAP mailbox for new messages using IDLE (push).

Uses IMAP IDLE (RFC 2177) to wait for new-mail events instead of polling.
When a new message arrives the script fetches and prints it, then re-enters
IDLE mode.

Usage:
    python3 scripts/idle_monitor.py --config config.yaml
    python3 scripts/idle_monitor.py --config config.yaml --account work --folder INBOX
    python3 scripts/idle_monitor.py --config config.yaml --timeout 60 --max-events 10
"""

from __future__ import annotations

import argparse
import json
import os
import signal
import sys
import time

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.imap_client import IMAPClient


_running = True


def _handle_signal(signum, frame):
    global _running
    _running = False


def main() -> None:
    parser = argparse.ArgumentParser(description="Monitor mailbox via IMAP IDLE")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Folder to monitor")
    parser.add_argument(
        "--timeout", type=int, default=29 * 60,
        help="IDLE timeout in seconds (default: 29 min per RFC 2177)",
    )
    parser.add_argument(
        "--poll-interval", type=float, default=30.0,
        help="Seconds between idle_check polls (default: 30)",
    )
    parser.add_argument(
        "--max-events", type=int, default=0,
        help="Stop after N new-mail events (0 = run forever)",
    )
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    # Build client
    client: IMAPClient | None = None
    acct_name = args.account

    if args.imap_host:
        client = IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user, password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        acct_name = acct_name or "direct"
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = acct_name or mgr.default_account
        client = mgr.get_imap_client(acct_name)
    else:
        _error("Either --config or --imap-host is required.")
        return

    event_count = 0

    try:
        client.connect()
        _log(f"Connected to {client.host}, monitoring {args.folder}", args.format)

        while _running:
            try:
                client.idle_start(args.folder, timeout=args.timeout)
                _log("IDLE started, waiting for events...", args.format)

                idle_start = time.monotonic()
                while _running:
                    responses = client.idle_check(timeout=args.poll_interval)

                    new_exists = any(
                        resp == b"EXISTS" for _, resp in responses
                    )

                    if new_exists:
                        client.idle_done()
                        event_count += 1

                        # Fetch up to 10 unread messages from the monitored folder
                        messages = client.fetch_unread(
                            mailbox=args.folder, limit=10,
                        )

                        event = {
                            "event": "new_mail",
                            "account": acct_name,
                            "folder": args.folder,
                            "event_number": event_count,
                            "new_messages": len(messages),
                            "messages": [
                                {
                                    "message_id": m.message_id,
                                    "subject": m.subject,
                                    "sender": str(m.sender) if m.sender else "",
                                    "date": m.date.isoformat() if m.date else "",
                                }
                                for m in messages
                            ],
                        }

                        if args.format == "cli":
                            print(f"\n--- New mail ({len(messages)} message(s)) ---")
                            for m in messages:
                                sender = str(m.sender) if m.sender else "(unknown)"
                                print(f"  From: {sender}")
                                print(f"  Subject: {m.subject}")
                                print()
                        else:
                            json.dump(event, sys.stdout)
                            print()
                            sys.stdout.flush()

                        if args.max_events and event_count >= args.max_events:
                            _log(f"Reached max events ({args.max_events})", args.format)
                            return

                        # Re-enter IDLE
                        break

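                    # Re-issue IDLE shortly before the timeout expires.
                    # The safety margin is 60 s, capped at half the timeout
                    # so the threshold stays positive for short timeouts.
                    elapsed = time.monotonic() - idle_start
                    margin = min(60, args.timeout / 2)
                    if elapsed >= args.timeout - margin: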
                        client.idle_done()
                        _log("Re-issuing IDLE (timeout approaching)", args.format)
                        break

            except (OSError, RuntimeError) as exc:
                _log(f"IDLE error: {exc}, reconnecting in 5s...", args.format)
                try:
                    client.disconnect()
                except Exception:
                    pass
                time.sleep(5)
                client.connect()

    except Exception as exc:
        _error(f"Fatal: {exc}")
    finally:
        _log("Stopping IDLE monitor", args.format)
        try:
            client.disconnect()
        except Exception:
            pass


def _log(msg: str, fmt: str) -> None:
    if fmt == "cli":
        print(f"[idle] {msg}", file=sys.stderr)
    else:
        json.dump({"log": msg}, sys.stderr)
        print(file=sys.stderr)


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
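
In JSON mode the monitor above writes one `new_mail` event per line to stdout and sends log lines to stderr, so a downstream process can read stdout line by line. The following is a minimal consumer sketch under that assumption. The function name `iter_new_mail` and the sample events are illustrative and are not part of the skill.

```python
import json
from typing import Iterable, Iterator


def iter_new_mail(lines: Iterable[str]) -> Iterator[dict]:
    """Yield parsed new_mail events from newline-delimited JSON.

    Hypothetical helper: blank lines, lines that are not valid JSON, and
    JSON objects without "event": "new_mail" are skipped.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict) and event.get("event") == "new_mail":
            yield event


# Sample input shaped like the events the monitor emits (values are made up).
sample_lines = [
    '{"event": "new_mail", "account": "work", "folder": "INBOX", '
    '"event_number": 1, "new_messages": 1, '
    '"messages": [{"message_id": "<1@example.com>", "subject": "Hello", '
    '"sender": "alice@example.com", "date": ""}]}',
    "",
]

for ev in iter_new_mail(sample_lines):
    for msg in ev.get("messages", []):
        print(f"{ev['account']}/{ev['folder']}: {msg['subject']}")
```

In practice the same generator could be fed `sys.stdin` when the monitor's stdout is piped into a consumer script.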

### scripts/thread_mail.py

```python
#!/usr/bin/env python3
"""Group emails into conversation threads by References / In-Reply-To.

Fetches messages from a folder and groups them into threads using the
Message-ID, In-Reply-To, and References headers (RFC 5256 / JWZ algorithm
simplified).

Usage:
    python3 scripts/thread_mail.py --config config.yaml
    python3 scripts/thread_mail.py --config config.yaml --account work --folder INBOX
    python3 scripts/thread_mail.py --config config.yaml --limit 200 --format cli
    python3 scripts/thread_mail.py --from-stdin < messages.json
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import defaultdict

sys.path.insert(0, os.path.dirname(__file__))

from lib import credential_store
from lib.account_manager import AccountManager
from lib.defaults import resolve_config_path
from lib.models import EmailMessage


def build_threads(messages: list[EmailMessage]) -> list[dict]:
    """Group messages into conversation threads.

    Returns a list of thread dicts, each containing:
    - ``thread_id``: The Message-ID of the thread root
    - ``subject``: Subject of the root message
    - ``message_count``: Number of messages in the thread
    - ``participants``: Unique sender addresses
    - ``latest_date``: ISO date of the most recent message
    - ``messages``: List of message summaries in chronological order
    """
    # Index messages by Message-ID
    by_id: dict[str, EmailMessage] = {}
    for msg in messages:
        if msg.message_id:
            by_id[msg.message_id] = msg

    # Build parent map: message_id -> parent_message_id
    parent_of: dict[str, str] = {}
    children_of: dict[str, list[str]] = defaultdict(list)

    for msg in messages:
        mid = msg.message_id
        if not mid:
            continue

        parent = None
        if msg.in_reply_to and msg.in_reply_to in by_id:
            parent = msg.in_reply_to
        elif msg.references:
            # Walk references from most recent to oldest
            for ref in reversed(msg.references):
                if ref in by_id:
                    parent = ref
                    break

        if parent and parent != mid:
            parent_of[mid] = parent
            children_of[parent].append(mid)

    # Find roots (messages with no parent)
    roots = set()
    for msg in messages:
        mid = msg.message_id
        if not mid:
            continue
        # Walk up to root
        current = mid
        visited = set()
        while current in parent_of and current not in visited:
            visited.add(current)
            current = parent_of[current]
        roots.add(current)

    # Build thread for each root
    threads = []
    seen_in_thread = set()

    for root_id in roots:
        if root_id in seen_in_thread:
            continue

        # Collect all messages in this thread via BFS
        thread_ids = []
        queue = [root_id]
        while queue:
            current = queue.pop(0)
            if current in seen_in_thread:
                continue
            seen_in_thread.add(current)
            if current in by_id:
                thread_ids.append(current)
            queue.extend(children_of.get(current, []))

        if not thread_ids:
            continue

        # Sort chronologically; messages without a date sort first.
        # Keys are ISO strings so that a mix of dated and undated
        # messages cannot raise TypeError during comparison.
        thread_msgs = [by_id[mid] for mid in thread_ids if mid in by_id]

        def sort_key(m):
            if m.date:
                return m.date.isoformat()
            return ""
        thread_msgs.sort(key=sort_key)

        participants = set()
        for m in thread_msgs:
            if m.sender:
                participants.add(m.sender.address)

        latest = max(
            (m.date for m in thread_msgs if m.date),
            default=None,
        )

        root_msg = by_id.get(root_id)
        subject = root_msg.subject if root_msg else thread_msgs[0].subject if thread_msgs else ""

        threads.append({
            "thread_id": root_id,
            "subject": subject,
            "message_count": len(thread_msgs),
            "participants": sorted(participants),
            "latest_date": latest.isoformat() if latest else "",
            "messages": [
                {
                    "message_id": m.message_id,
                    "subject": m.subject,
                    "sender": str(m.sender) if m.sender else "",
                    "date": m.date.isoformat() if m.date else "",
                    "in_reply_to": m.in_reply_to,
                }
                for m in thread_msgs
            ],
        })

    # Sort threads by latest_date descending
    threads.sort(key=lambda t: t.get("latest_date", ""), reverse=True)
    return threads


def main() -> None:
    parser = argparse.ArgumentParser(description="Thread emails by conversation")
    parser.add_argument("--config", default="", help="YAML config file")
    parser.add_argument("--account", default="", help="Account profile name")
    parser.add_argument("--folder", default="INBOX", help="Folder to thread")
    parser.add_argument("--limit", type=int, default=100, help="Max messages to fetch")
    parser.add_argument("--from-stdin", action="store_true",
                        help="Read messages JSON from stdin")
    parser.add_argument("--format", choices=["json", "cli"], default="json")

    # Direct IMAP flags
    parser.add_argument("--imap-host", default="")
    parser.add_argument("--imap-port", type=int, default=993)
    parser.add_argument("--imap-user", default="")
    parser.add_argument("--imap-pass", default="")
    parser.add_argument("--imap-no-ssl", action="store_true")
    args = parser.parse_args()
    args.config = resolve_config_path(args.config)

    messages: list[EmailMessage] = []

    if args.from_stdin:
        try:
            data = json.load(sys.stdin)
            raw_msgs = data if isinstance(data, list) else data.get("messages", [])
            messages = [EmailMessage.from_dict(m) for m in raw_msgs]
        except Exception as exc:
            _error(f"Failed to parse stdin: {exc}")
            return
    elif args.imap_host:
        from lib.imap_client import IMAPClient
        client = IMAPClient(
            host=args.imap_host, port=args.imap_port,
            username=args.imap_user, password=credential_store.resolve(args.imap_pass),
            use_ssl=not args.imap_no_ssl,
        )
        try:
            client.connect()
            messages = client.fetch_all(mailbox=args.folder, limit=args.limit)
        finally:
            client.disconnect()
    elif args.config:
        try:
            mgr = AccountManager.from_yaml(args.config)
        except Exception as exc:
            _error(f"Failed to load config: {exc}")
            return
        acct_name = args.account or mgr.default_account
        client = mgr.get_imap_client(acct_name)
        try:
            client.connect()
            messages = client.fetch_all(mailbox=args.folder, limit=args.limit)
        finally:
            client.disconnect()
    else:
        _error("Provide --config, --imap-host, or --from-stdin")
        return

    threads = build_threads(messages)

    if args.format == "cli":
        print(f"Found {len(threads)} conversation thread(s) "
              f"from {len(messages)} message(s):\n")
        for t in threads:
            count = t["message_count"]
            subj = t["subject"] or "(no subject)"
            parts = ", ".join(t["participants"][:3])
            if len(t["participants"]) > 3:
                parts += f" +{len(t['participants']) - 3} more"
            print(f"  [{count} msg] {subj}")
            print(f"         Participants: {parts}")
            print(f"         Latest: {t['latest_date']}")
            print()
    else:
        result = {
            "total_messages": len(messages),
            "thread_count": len(threads),
            "threads": threads,
        }
        json.dump(result, sys.stdout, indent=2)
        print()


def _error(msg: str) -> None:
    json.dump({"error": msg}, sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()

```
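
The parent-selection rule in `build_threads()` above prefers `In-Reply-To` and falls back to the most recent entry in `References` that is present in the fetched set. A minimal sketch of that rule in isolation follows. The `Msg` dataclass and the `find_parent` function are hypothetical stand-ins used only for illustration; the script itself works on `EmailMessage` objects.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Msg:
    """Hypothetical stand-in for EmailMessage with only the threading headers."""
    message_id: str
    in_reply_to: str = ""
    references: List[str] = field(default_factory=list)


def find_parent(msg: Msg, known_ids: Set[str]) -> Optional[str]:
    """Pick a parent Message-ID using the same precedence as build_threads()."""
    # Prefer In-Reply-To when that message is in the fetched set.
    if msg.in_reply_to and msg.in_reply_to in known_ids:
        parent = msg.in_reply_to
    else:
        # Otherwise take the most recent reference that was fetched.
        parent = next(
            (ref for ref in reversed(msg.references) if ref in known_ids),
            None,
        )
    # A message is never its own parent.
    return parent if parent and parent != msg.message_id else None


# The direct parent <2@x> was not fetched, so the reply attaches to <1@x>.
known = {"<1@x>", "<3@x>"}
reply = Msg("<3@x>", in_reply_to="<2@x>", references=["<1@x>", "<2@x>"])
print(find_parent(reply, known))
```

Falling back to `References` keeps a thread connected when an intermediate message is missing from the folder or falls outside `--limit`.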

### scripts/lib/imap_client.py

```python
"""IMAP client for server-side email retrieval and folder management."""

from __future__ import annotations

import base64
import email
import email.header
import email.utils
import imaplib
import logging
import re
import ssl
import time
from datetime import datetime
from typing import Any

from .models import EmailAddress, EmailAttachment, EmailMessage, EmailPriority

logger = logging.getLogger("clawMail.imap")

# Charsets that Python doesn't recognise but appear in real-world headers.
_CHARSET_FALLBACKS = {
    "unknown-8bit": "latin-1",
    "x-unknown": "latin-1",
    "default": "latin-1",
    "iso-8859-8-i": "iso-8859-8",
}

_MAX_SELECT_RETRIES = 3
_SELECT_RETRY_DELAY = 1.0  # seconds


# ── Mailbox name helpers ────────────────────────────────────────────


def _quote_mailbox(name: str) -> str:
    """IMAP-quote a mailbox name for use with imaplib.

    Python's ``imaplib`` does **not** quote mailbox arguments, so a name
    like ``Needs Review`` is sent as two separate tokens.  This helper
    wraps the name in double-quotes and escapes embedded backslashes and
    double-quote characters per RFC 3501 §9 (quoted-string grammar).
    """
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _decode_mutf7(data: str) -> str:
    """Decode an IMAP modified-UTF-7 mailbox name (RFC 3501 §5.1.3).

    Rules:
    * Printable US-ASCII (0x20-0x7e) except ``&`` pass through literally.
    * ``&-`` decodes to a literal ``&``.
    * ``&<base64>-`` decodes the base64 segment (using ``,`` instead of
      ``/``) as UTF-16BE code-points.
    """
    result: list[str] = []
    i = 0
    while i < len(data):
        if data[i] == "&":
            end = data.find("-", i + 1)
            if end == -1:
                # Malformed — treat the rest as literal
                result.append(data[i:])
                break
            if end == i + 1:
                # &- is literal &
                result.append("&")
            else:
                b64_segment = data[i + 1:end].replace(",", "/")
                # Pad to a multiple of 4
                b64_segment += "=" * ((4 - len(b64_segment) % 4) % 4)
                try:
                    raw = base64.b64decode(b64_segment)
                    result.append(raw.decode("utf-16-be"))
                except Exception:
                    # Malformed — keep the original text
                    result.append(data[i:end + 1])
            i = end + 1
        else:
            result.append(data[i])
            i += 1
    return "".join(result)


def _encode_mutf7(text: str) -> str:
    """Encode a Unicode string to IMAP modified-UTF-7.

    The inverse of ``_decode_mutf7``.
    """
    result: list[str] = []
    non_ascii: list[str] = []

    def _flush_non_ascii() -> None:
        if not non_ascii:
            return
        raw = "".join(non_ascii).encode("utf-16-be")
        b64 = base64.b64encode(raw).decode("ascii").rstrip("=")
        result.append("&" + b64.replace("/", ",") + "-")
        non_ascii.clear()

    for ch in text:
        if ch == "&":
            _flush_non_ascii()
            result.append("&-")
        elif 0x20 <= ord(ch) <= 0x7E:
            _flush_non_ascii()
            result.append(ch)
        else:
            non_ascii.append(ch)
    _flush_non_ascii()
    return "".join(result)


class IMAPClient:
    """IMAP client for fetching emails and managing folders."""

    def __init__(
        self,
        host: str,
        port: int = 993,
        username: str = "",
        password: str = "",
        use_ssl: bool = True,
        timeout: int = 30,
        oauth2_manager: Any | None = None,
    ) -> None:
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self._oauth2 = oauth2_manager
        self._connection: imaplib.IMAP4 | imaplib.IMAP4_SSL | None = None
        self._account: str = ""  # set by AccountManager
        self._last_append_error: str = ""
        self._chunk_size = 16 * 1024

    @staticmethod
    def _create_secure_context() -> ssl.SSLContext:
        """Create a hardened SSL context with secure ciphers only.

        Enforces TLS 1.2+, disables weak ciphers, and enables
        certificate verification and hostname checking.
        """
        ctx = ssl.create_default_context()
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.set_ciphers(
            "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20"
            ":!aNULL:!MD5:!DSS:!RC4:!3DES"
        )
        # Enforce certificate verification (default, but be explicit)
        ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        return ctx

    def connect(self) -> None:
        if self.use_ssl:
            ctx = self._create_secure_context()
            self._connection = imaplib.IMAP4_SSL(
                self.host, self.port, ssl_context=ctx, timeout=self.timeout,
            )
        else:
            # Pass the timeout to the constructor (Python 3.9+) so it also
            # covers the initial TCP connect, matching the SSL branch.
            self._connection = imaplib.IMAP4(
                self.host, self.port, timeout=self.timeout,
            )

        if self._oauth2:
            from .oauth2 import build_xoauth2_string
            auth_string = build_xoauth2_string(
                self.username, self._oauth2.access_token,
            )
            self._connection.authenticate(
                "XOAUTH2",
                lambda _x: auth_string.encode("ascii"),
            )
        else:
            self._connection.login(self.username, self.password)

    def disconnect(self) -> None:
        conn = self._connection
        self._connection = None
        if conn is None:
            return
        try:
            conn.logout()
        except Exception:
            # logout failed (SSL corruption, connection drop, etc.)
            # Force-close the underlying socket directly.
            try:
                conn.shutdown()
            except Exception:
                pass
            try:
                sock = conn.socket()
                if sock:
                    sock.close()
            except Exception:
                pass

    @property
    def conn(self) -> imaplib.IMAP4 | imaplib.IMAP4_SSL:
        if self._connection is None:
            raise RuntimeError("Not connected. Call connect() first.")
        return self._connection

    # ── Folder management ────────────────────────────────────────────

    def list_folders(self) -> list[dict[str, str]]:
        """List all folders with attributes.

        Returns list of dicts: {"name": ..., "delimiter": ..., "flags": ...}.
        Folder names are decoded from IMAP modified-UTF-7.
        """
        try:
            status, data = self.conn.list()
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("LIST failed: %s", exc)
            return []
        if status != "OK":
            return []
        folders = []
        # Regex handles both quoted and unquoted folder names:
        #   (\Flags) "delim" "Folder With Spaces"
        #   (\Flags) "delim" PlainFolder
        # The quoted-name branch matches everything up to the closing
        # quote while allowing backslash-escaped characters (\\ and \").
        _LIST_RE = re.compile(
            r'\(([^)]*)\)\s+'           # flags
            r'(?:"([^"]+)"|NIL)\s+'     # delimiter (quoted or NIL)
            r'(?:"((?:[^"\\]|\\.)*)"|'  # quoted folder name (with escapes)
            r'(\S+))'                   # OR unquoted atom folder name
        )
        for item in data:
            if isinstance(item, bytes):
                decoded = item.decode("utf-8", errors="replace")
                m = _LIST_RE.match(decoded)
                if m:
                    flags = m.group(1)
                    # A backslash delimiter arrives escaped as "\\"; un-escape it.
                    delimiter = (m.group(2) or "").replace("\\\\", "\\")
                    # Group 3 = quoted name, Group 4 = unquoted name
                    raw_name = m.group(3) if m.group(3) is not None else (m.group(4) or "")
                    # Un-escape any backslash-escaped characters in quoted names
                    raw_name = raw_name.replace('\\"', '"').replace("\\\\", "\\")
                    # Decode IMAP modified-UTF-7
                    name = _decode_mutf7(raw_name)
                    folders.append({
                        "name": name,
                        "delimiter": delimiter,
                        "flags": flags,
                    })
        return folders

    def folder_status(self, folder: str) -> dict[str, int]:
        """Get message counts for a folder.

        Returns: {"messages": N, "unseen": N, "recent": N}.
        """
        try:
            status, data = self.conn.status(_quote_mailbox(folder), "(MESSAGES UNSEEN RECENT)")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("STATUS %s failed: %s", folder, exc)
            return {"messages": 0, "unseen": 0, "recent": 0}
        if status != "OK":
            return {"messages": 0, "unseen": 0, "recent": 0}
        raw = data[0].decode("utf-8", errors="replace") if isinstance(data[0], bytes) else data[0]
        result: dict[str, int] = {"messages": 0, "unseen": 0, "recent": 0}
        for key in ("MESSAGES", "UNSEEN", "RECENT"):
            m = re.search(rf"{key}\s+(\d+)", raw)
            if m:
                result[key.lower()] = int(m.group(1))
        return result

    def create_folder(self, folder: str) -> bool:
        """Create a new folder/mailbox."""
        quoted = _quote_mailbox(folder)
        try:
            status, _ = self.conn.create(quoted)
            if status == "OK":
                self.conn.subscribe(quoted)
                return True
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("CREATE %s failed: %s", folder, exc)
        return False

    def delete_folder(self, folder: str) -> bool:
        """Delete a folder/mailbox."""
        quoted = _quote_mailbox(folder)
        try:
            self.conn.unsubscribe(quoted)
            status, _ = self.conn.delete(quoted)
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("DELETE %s failed: %s", folder, exc)
            return False

    def rename_folder(self, old_name: str, new_name: str) -> bool:
        """Rename a folder."""
        try:
            status, _ = self.conn.rename(_quote_mailbox(old_name), _quote_mailbox(new_name))
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("RENAME %s -> %s failed: %s", old_name, new_name, exc)
            return False

    def move_folder(self, folder: str, new_parent: str) -> bool:
        """Move a folder under a new parent using the folder's delimiter.

        Example: move_folder("Projects", "Archive") renames
        "Projects" to "Archive/Projects" (delimiter-aware).
        """
        folders = self.list_folders()
        delimiter = "/"
        for f in folders:
            if f["name"] == folder:
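                # A NIL delimiter is stored as ""; fall back to "/" so that
                # rsplit() below never receives an empty separator.
                delimiter = f.get("delimiter") or "/"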
                break
        base_name = folder.rsplit(delimiter, 1)[-1]
        new_name = f"{new_parent}{delimiter}{base_name}"
        return self.rename_folder(folder, new_name)

    # ── Message retrieval ────────────────────────────────────────────

    def _select_robust(self, mailbox: str, readonly: bool = True) -> None:
        """Select a mailbox with retry logic for transient errors."""
        last_exc: Exception | None = None
        for attempt in range(_MAX_SELECT_RETRIES):
            try:
                status, data = self.conn.select(_quote_mailbox(mailbox), readonly=readonly)
                if status == "OK":
                    return
                raise imaplib.IMAP4.error(
                    f"SELECT failed: {data}"
                )
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                last_exc = exc
                logger.warning(
                    "SELECT %s attempt %d/%d failed: %s",
                    mailbox, attempt + 1, _MAX_SELECT_RETRIES, exc,
                )
                if attempt < _MAX_SELECT_RETRIES - 1:
                    time.sleep(_SELECT_RETRY_DELAY * (attempt + 1))
                    # Reconnect if the connection was dropped
                    try:
                        self.conn.noop()
                    except Exception:
                        self.disconnect()
                        self.connect()
        raise RuntimeError(
            f"Failed to SELECT '{mailbox}' after {_MAX_SELECT_RETRIES} attempts: {last_exc}"
        )

    def fetch_unread(
        self, mailbox: str = "INBOX", limit: int = 50, mark_seen: bool = False,
    ) -> list[EmailMessage]:
        self._select_robust(mailbox, readonly=not mark_seen)
        try:
            status, msg_ids = self.conn.search(None, "UNSEEN")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH UNSEEN in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()[:limit] if limit else msg_ids[0].split()
        return self._fetch_ids(id_list, mailbox, mark_seen)

    def fetch_all(
        self, mailbox: str = "INBOX", limit: int = 50,
    ) -> list[EmailMessage]:
        """Fetch all messages from a mailbox (most recent first)."""
        self._select_robust(mailbox, readonly=True)
        try:
            status, msg_ids = self.conn.search(None, "ALL")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH ALL in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()
        if limit:
            id_list = id_list[-limit:]
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    def fetch_since(
        self, since: datetime, mailbox: str = "INBOX", limit: int = 100,
    ) -> list[EmailMessage]:
        self._select_robust(mailbox, readonly=True)
        date_str = since.strftime("%d-%b-%Y")
        try:
            status, msg_ids = self.conn.search(None, f"(SINCE {date_str})")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH SINCE in %s failed: %s", mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()[:limit] if limit else msg_ids[0].split()
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    def fetch_by_id(self, message_id: str, mailbox: str = "INBOX") -> EmailMessage | None:
        """Fetch a single message by its Message-ID header value."""
        self._select_robust(mailbox, readonly=True)
        search_id = message_id if message_id.startswith("<") else f"<{message_id}>"
        try:
            status, data = self.conn.search(None, f'(HEADER Message-ID "{search_id}")')
            if status != "OK" or not data[0]:
                return None
            seq_num = data[0].split()[0]
            status, msg_data = self.conn.fetch(seq_num, "(BODY.PEEK[])")
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("FETCH by Message-ID %s failed: %s", message_id, exc)
            return None
        if status != "OK" or not msg_data[0]:
            return None
        raw = msg_data[0][1] if isinstance(msg_data[0], tuple) else msg_data[0]
        if isinstance(raw, bytes):
            return self._parse_email(raw, mailbox)
        return None

    def search(
        self,
        mailbox: str = "INBOX",
        criteria: str = "ALL",
        limit: int = 50,
    ) -> list[EmailMessage]:
        """Search messages using an IMAP SEARCH criteria string.

        The *criteria* argument is passed directly to the IMAP SEARCH command.
        Examples:
            "UNSEEN"
            '(FROM "[email protected]")'
            '(SUBJECT "invoice" SINCE 01-Jan-2025)'
            '(OR (FROM "[email protected]") (FROM "[email protected]"))'
        """
        self._select_robust(mailbox, readonly=True)
        try:
            status, msg_ids = self.conn.search(None, criteria)
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("SEARCH %s in %s failed: %s", criteria, mailbox, exc)
            return []
        if status != "OK" or not msg_ids[0]:
            return []
        id_list = msg_ids[0].split()
        if limit:
            id_list = id_list[-limit:]
        return self._fetch_ids(id_list, mailbox, mark_seen=False)

    # ── Message actions ──────────────────────────────────────────────

    def mark_read(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "+FLAGS", "\\Seen")

    def mark_unread(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "-FLAGS", "\\Seen")

    def flag_message(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "+FLAGS", "\\Flagged")

    def unflag_message(self, message_id: str, mailbox: str = "INBOX") -> bool:
        return self._set_flag(message_id, mailbox, "-FLAGS", "\\Flagged")

    def set_custom_flag(self, message_id: str, flag: str, mailbox: str = "INBOX") -> bool:
        """Set a custom keyword flag (e.g. '$Important', '$Processed')."""
        return self._set_flag(message_id, mailbox, "+FLAGS", flag)

    def remove_custom_flag(self, message_id: str, flag: str, mailbox: str = "INBOX") -> bool:
        """Remove a custom keyword flag."""
        return self._set_flag(message_id, mailbox, "-FLAGS", flag)

    def set_flags_batch(
        self, message_ids: list[str], flag: str, mailbox: str = "INBOX",
    ) -> dict[str, bool]:
        """Set a flag on multiple messages in one mailbox selection.

        Returns dict of {message_id: success}.
        """
        self._select_robust(mailbox, readonly=False)
        results: dict[str, bool] = {}
        for mid in message_ids:
            seq_num = self._resolve_seq(mid)
            if seq_num is None:
                results[mid] = False
                continue
            try:
                status, _ = self.conn.store(seq_num, "+FLAGS", flag)
                results[mid] = status == "OK"
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("STORE +FLAGS %s on %s failed: %s", flag, mid, exc)
                results[mid] = False
        return results

    def move_message(
        self, msg_uid: str, from_mailbox: str, to_mailbox: str,
    ) -> bool:
        """Move a message between mailboxes via IMAP COPY + DELETE."""
        self._select_robust(from_mailbox, readonly=False)
        seq_num = self._resolve_seq(msg_uid)
        if seq_num is None:
            return False
        try:
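            status, _ = self.conn.copy(seq_num, _quote_mailbox(to_mailbox))
            if status != "OK":
                return False
            # Only expunge once the source copy is flagged; otherwise the
            # message would silently remain in both mailboxes.
            status, _ = self.conn.store(seq_num, "+FLAGS", "\\Deleted")
            if status != "OK":
                return False
            self.conn.expunge()
            return True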
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("MOVE %s -> %s failed: %s", from_mailbox, to_mailbox, exc)
            return False

    def move_messages_batch(
        self, message_ids: list[str], from_mailbox: str, to_mailbox: str,
    ) -> dict[str, bool]:
        """Move multiple messages, minimising SELECT calls.

        Returns dict of {message_id: success}.
        """
        self._select_robust(from_mailbox, readonly=False)
        results: dict[str, bool] = {}
        moved_seqs = []
        for mid in message_ids:
            seq_num = self._resolve_seq(mid)
            if seq_num is None:
                results[mid] = False
                continue
            try:
                status, _ = self.conn.copy(seq_num, _quote_mailbox(to_mailbox))
                if status == "OK":
                    # A move only counts once the source copy is flagged for removal.
                    status, _ = self.conn.store(seq_num, "+FLAGS", "\\Deleted")
                if status == "OK":
                    results[mid] = True
                    moved_seqs.append(seq_num)
                else:
                    results[mid] = False
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("MOVE batch %s -> %s failed: %s", mid, to_mailbox, exc)
                results[mid] = False
        if moved_seqs:
            try:
                self.conn.expunge()
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("EXPUNGE after batch move failed: %s", exc)
        return results

    # ── IMAP IDLE ──────────────────────────────────────────────────

    def idle_start(self, mailbox: str = "INBOX", timeout: int = 29 * 60) -> None:
        """Enter IMAP IDLE mode on *mailbox*.

        The server will push EXISTS/EXPUNGE notifications while idle.
        Call :meth:`idle_check` to poll for responses, and
        :meth:`idle_done` to leave IDLE mode.

        *timeout* is a safety upper-bound in seconds (RFC 2177 recommends
        re-issuing IDLE every 29 minutes).
        """
        self._select_robust(mailbox, readonly=True)
        tag = self.conn._new_tag()
        self._idle_tag = tag
        self.conn.send(tag + b" IDLE\r\n")
        # Wait for the continuation response '+ ...'
        resp = self.conn.readline()
        if not resp.startswith(b"+"):
            raise RuntimeError(f"IDLE rejected: {resp!r}")
        self.conn.sock.settimeout(timeout)

    def idle_check(self, timeout: float = 30.0) -> list[tuple[bytes, bytes]]:
        """Poll for untagged responses while in IDLE mode.

        Returns a list of ``(seq_num, response)`` tuples, e.g.
        ``[(b'3', b'EXISTS')]``.  Returns an empty list on timeout.
        """
        old_timeout = self.conn.sock.gettimeout()
        self.conn.sock.settimeout(timeout)
        responses: list[tuple[bytes, bytes]] = []
        try:
            while True:
                line = self.conn.readline()
                if not line:
                    break
                if line.startswith(b"* "):
                    parts = line[2:].strip().split(None, 1)
                    if len(parts) == 2:
                        responses.append((parts[0], parts[1]))
                    else:
                        responses.append((b"", parts[0] if parts else b""))
                    # Don't block indefinitely — check if more data is ready
                    self.conn.sock.settimeout(0.5)
                else:
                    break
        except (TimeoutError, OSError):
            pass
        finally:
            self.conn.sock.settimeout(old_timeout)
        return responses

    def idle_done(self) -> None:
        """Leave IDLE mode by sending ``DONE``."""
        self.conn.send(b"DONE\r\n")
        # Read the tagged response to complete the IDLE command
        while True:
            line = self.conn.readline()
            if not line:
                break
            tag = getattr(self, "_idle_tag", None)
            if tag and line.startswith(tag):
                break

    def append_message(
        self, raw_message: bytes, mailbox: str = "Drafts", flags: str = "\\Draft",
    ) -> bool:
        """Append a raw RFC822 message to a mailbox (e.g. Drafts).

        Returns True on success.
        """
        logger.debug(
            "IMAP APPEND raw_message length=%d mailbox=%r flags=%r",
            len(raw_message), mailbox, flags,
        )
        orig_send = self.conn.send
        def _chunked_send(data):
            if isinstance(data, str):
                data = data.encode("utf-8")
            if len(data) <= self._chunk_size:
                return orig_send(data)
            for i in range(0, len(data), self._chunk_size):
                orig_send(data[i:i + self._chunk_size])
        try:
            self.conn.send = _chunked_send
            status, _ = self.conn.append(
                _quote_mailbox(mailbox), flags, None, raw_message,
            )
            if status != "OK":
                self._last_append_error = f"APPEND status {status}"
                logger.debug("APPEND response status=%s", status)
            else:
                self._last_append_error = ""
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            self._last_append_error = str(exc)
            logger.warning("APPEND to %s failed: %s", mailbox, exc)
            return False
        finally:
            self.conn.send = orig_send

    @property
    def last_append_error(self) -> str:
        return self._last_append_error

    def delete_message(self, message_id: str, mailbox: str = "INBOX") -> bool:
        self._select_robust(mailbox, readonly=False)
        seq_num = self._resolve_seq(message_id)
        if seq_num is None:
            return False
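        try:
            status, _ = self.conn.store(seq_num, "+FLAGS", "\\Deleted")
            if status != "OK":
                return False
            # EXPUNGE removes every message flagged as deleted in this mailbox.
            self.conn.expunge()
            return True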
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("DELETE message %s failed: %s", message_id, exc)
            return False

    # ── Internal helpers ─────────────────────────────────────────────

    def _resolve_seq(self, msg_uid: str) -> bytes | None:
        """Resolve a Message-ID or sequence number to an IMAP sequence number."""
        if msg_uid.startswith("<"):
            try:
                status, data = self.conn.search(None, f'(HEADER Message-ID "{msg_uid}")')
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("SEARCH for Message-ID %s failed: %s", msg_uid, exc)
                return None
            if status != "OK" or not data[0]:
                return None
            return data[0].split()[0]
        return msg_uid.encode() if isinstance(msg_uid, str) else msg_uid

    def _set_flag(self, message_id: str, mailbox: str, op: str, flag: str) -> bool:
        self._select_robust(mailbox, readonly=False)
        seq_num = self._resolve_seq(message_id)
        if seq_num is None:
            return False
        try:
            status, _ = self.conn.store(seq_num, op, flag)
            return status == "OK"
        except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
            logger.warning("STORE %s %s failed: %s", op, flag, exc)
            return False

    def _fetch_ids(
        self, id_list: list[bytes], mailbox: str, mark_seen: bool,
    ) -> list[EmailMessage]:
        messages = []
        for msg_id in id_list:
            fetch_flag = "(RFC822)" if mark_seen else "(BODY.PEEK[])"
            try:
                status, msg_data = self.conn.fetch(msg_id, fetch_flag)
            except (imaplib.IMAP4.error, imaplib.IMAP4.abort, OSError) as exc:
                logger.warning("Failed to fetch message %s: %s", msg_id, exc)
                continue
            if status != "OK" or not msg_data[0]:
                continue
            raw = msg_data[0][1] if isinstance(msg_data[0], tuple) else msg_data[0]
            if isinstance(raw, bytes):
                parsed = self._parse_email(raw, mailbox)
                if parsed:
                    messages.append(parsed)
        return messages

    def _parse_email(self, raw: bytes, mailbox: str) -> EmailMessage | None:
        try:
            msg = email.message_from_bytes(raw)
        except Exception:
            return None

        subject = self._decode_header(msg.get("Subject", ""))
        sender = EmailAddress.parse(self._decode_header(msg.get("From", "")))
        recipients = self._parse_address_list(msg.get("To", ""))
        cc = self._parse_address_list(msg.get("Cc", ""))

        date = None
        date_str = msg.get("Date", "")
        if date_str:
            try:
                date = email.utils.parsedate_to_datetime(date_str)
            except Exception:
                pass

        body_plain = ""
        body_html = ""
        attachments: list[EmailAttachment] = []

        if msg.is_multipart():
            for part in msg.walk():
                ct = part.get_content_type()
                disp = str(part.get("Content-Disposition", ""))
                if "attachment" in disp:
                    payload = part.get_payload(decode=True)
                    if payload:
                        attachments.append(EmailAttachment(
                            filename=part.get_filename() or "attachment",
                            content_type=ct, data=payload,
                            content_id=part.get("Content-ID"),
                        ))
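                elif ct == "text/plain" and not body_plain:
                    payload = part.get_payload(decode=True)
                    if payload:
                        # Honour the part's declared charset; fall back to UTF-8.
                        charset = part.get_content_charset() or "utf-8"
                        try:
                            body_plain = payload.decode(charset, errors="replace")
                        except LookupError:
                            body_plain = payload.decode("utf-8", errors="replace")
                elif ct == "text/html" and not body_html:
                    payload = part.get_payload(decode=True)
                    if payload:
                        charset = part.get_content_charset() or "utf-8"
                        try:
                            body_html = payload.decode(charset, errors="replace")
                        except LookupError:
                            body_html = payload.decode("utf-8", errors="replace")
        else:
            payload = msg.get_payload(decode=True)
            if payload:
                charset = msg.get_content_charset() or "utf-8"
                try:
                    content = payload.decode(charset, errors="replace")
                except LookupError:
                    content = payload.decode("utf-8", errors="replace")
                if msg.get_content_type() == "text/html":
                    body_html = content
                else:
                    body_plain = content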

        priority = EmailPriority.NORMAL
        # str() guards against header objects that lack startswith().
        xp = str(msg.get("X-Priority", "3")).strip()
        if xp.startswith(("1", "2")):
            priority = EmailPriority.HIGH
        elif xp.startswith(("4", "5")):
            priority = EmailPriority.LOW

        return EmailMessage(
            message_id=msg.get("Message-ID", ""), subject=subject,
            sender=sender, recipients=recipients, cc=cc,
            body_plain=body_plain, body_html=body_html,
            attachments=attachments, date=date,
            in_reply_to=msg.get("In-Reply-To", ""),
            references=msg.get("References", "").split(),
            priority=priority, mailbox=mailbox,
            account=self._account,
            headers=dict(msg.items()),
        )

    def _decode_header(self, raw: str) -> str:
        """Decode an RFC 2047 encoded header, with safe fallbacks."""
        try:
            parts = email.header.decode_header(raw)
        except Exception:
            return raw
        decoded = []
        for data, charset in parts:
            if isinstance(data, bytes):
                enc = "utf-8"
                if charset:
                    try:
                        enc = _CHARSET_FALLBACKS.get(charset.lower(), charset)
                    except Exception:
                        enc = "latin-1"
                try:
                    decoded.append(data.decode(enc, errors="replace"))
                except (LookupError, UnicodeDecodeError, TypeError):
                    # Unknown / invalid charset — latin-1 never raises
                    decoded.append(data.decode("latin-1", errors="replace"))
            else:
                decoded.append(str(data) if not isinstance(data, str) else data)
        return " ".join(decoded)

    def _parse_address_list(self, raw: str) -> list[EmailAddress]:
        if not raw:
            return []
        decoded = self._decode_header(raw)
        return [
            EmailAddress.parse(a.strip())
            for a in decoded.split(",") if a.strip()
        ]

```
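
The `search()` docstring above passes a criteria string straight to IMAP SEARCH, and `fetch_since()` formats its date with `strftime("%d-%b-%Y")`. The sketch below shows one way such strings could be assembled safely. It is not part of claw-mail: `imap_date`, `imap_quote`, and `build_criteria` are hypothetical helper names introduced here for illustration. It assumes ASCII search terms; non-ASCII terms would need a `CHARSET` argument, which this sketch does not cover.

```python
from datetime import date, datetime

# English month abbreviations, so the result does not depend on the process locale.
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
           "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def imap_date(value: date) -> str:
    """Format a date (or datetime) as DD-Mon-YYYY for IMAP SEARCH."""
    return f"{value.day:02d}-{_MONTHS[value.month - 1]}-{value.year:04d}"


def imap_quote(text: str) -> str:
    """Wrap text in an IMAP quoted string, escaping backslashes and double quotes."""
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_criteria(sender=None, subject=None, since=None, unseen=False) -> str:
    """Combine optional filters into one criteria string (hypothetical helper)."""
    parts = []
    if unseen:
        parts.append("UNSEEN")
    if sender:
        parts.append(f"FROM {imap_quote(sender)}")
    if subject:
        parts.append(f"SUBJECT {imap_quote(subject)}")
    if since:
        parts.append(f"SINCE {imap_date(since)}")
    # With no filters, fall back to the same default that search() uses.
    return "(" + " ".join(parts) + ")" if parts else "ALL"


criteria = build_criteria(subject="invoice", since=datetime(2025, 1, 1))
print(criteria)  # (SUBJECT "invoice" SINCE 01-Jan-2025)
```

The resulting string could then be passed as the `criteria` argument of `search()`. Escaping the quotes matters because an unescaped `"` in a subject or Message-ID would otherwise end the quoted string early and change the meaning of the command.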

### scripts/lib/smtp_client.py

```python
"""SMTP client for sending emails."""

from __future__ import annotations

import logging
import re
import smtplib
import socket
import ssl
import uuid
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate

from .models import EmailMessage, EmailPriority

logger = logging.getLogger("clawMail.smtp")


def build_mime(message: EmailMessage) -> MIMEMultipart:
    """Build a MIME message from an EmailMessage (without sending).

    Useful for constructing RFC822 bytes to save as a draft.
    """
    return SMTPClient._build_mime_static(message)


class SMTPClient:
    """SMTP client for sending emails via a mail server."""

    def __init__(
        self,
        host: str,
        port: int = 587,
        username: str = "",
        password: str = "",
        use_tls: bool = True,
        oauth2_manager: object | None = None,
    ) -> None:
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.use_tls = use_tls
        self._oauth2 = oauth2_manager

    @staticmethod
    def _create_secure_context() -> ssl.SSLContext:
        """Create a hardened SSL context with secure ciphers only.

        Enforces TLS 1.2+, disables weak ciphers, and enables
        certificate verification and hostname checking.
        """
        ctx = ssl.create_default_context()
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.set_ciphers(
            "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20"
            ":!aNULL:!MD5:!DSS:!RC4:!3DES"
        )
        # Enforce certificate verification (default, but be explicit)
        ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        return ctx

    def send(self, message: EmailMessage) -> bool:
        """Send *message*; return True on success, False on any SMTP or socket error."""
        mime_msg = self._build_mime_static(message)
        server = None
        try:
            server = smtplib.SMTP(self.host, self.port)
            if self.use_tls:
                # Upgrade to TLS via STARTTLS before sending credentials.
                server.ehlo()
                server.starttls(context=self._create_secure_context())
                server.ehlo()

            if self._oauth2:
                from .oauth2 import build_xoauth2_string
                auth_string = build_xoauth2_string(
                    self.username, self._oauth2.access_token,
                )
                # docmd() does not raise on rejection; 235 means authenticated.
                code, resp = server.docmd("AUTH", "XOAUTH2 " + auth_string)
                if code != 235:
                    raise smtplib.SMTPAuthenticationError(code, resp)
            elif self.username:
                server.login(self.username, self.password)

            all_recips = (
                [r.address for r in message.recipients]
                + [r.address for r in message.cc]
                + [r.address for r in message.bcc]
            )
            sender_addr = message.sender.address if message.sender else self.username
            original_send = server.send
            chunk_size = 16 * 1024

            def _chunked_send(data):
                # Write large payloads in 16 KiB slices instead of one big send.
                if isinstance(data, str):
                    data = data.encode("ascii")
                if len(data) <= chunk_size:
                    return original_send(data)
                for i in range(0, len(data), chunk_size):
                    original_send(data[i:i + chunk_size])

            server.send = _chunked_send
            try:
                server.sendmail(sender_addr, all_recips, mime_msg.as_string())
            finally:
                server.send = original_send
            server.quit()
            return True
        except (smtplib.SMTPException, OSError) as exc:
            logger.error("SMTP send failed: %s", exc)
            if server is not None:
                server.close()  # release the socket on failure
            return False

    @staticmethod
    def _build_mime_static(message: EmailMessage) -> MIMEMultipart:
        if message.attachments:
            mime_msg = MIMEMultipart("mixed")
            alt_part = MIMEMultipart("alternative")
            mime_msg.attach(alt_part)
        else:
            mime_msg = MIMEMultipart("alternative")
            alt_part = mime_msg

        if message.sender:
            mime_msg["From"] = str(message.sender)
        if message.recipients:
            mime_msg["To"] = ", ".join(str(r) for r in message.recipients)
        if message.cc:
            mime_msg["Cc"] = ", ".join(str(r) for r in message.cc)
        mime_msg["Subject"] = message.subject

        # RFC 5322: Date header is REQUIRED
        if message.date:
            mime_msg["Date"] = formatdate(timeval=message.date.timestamp(), localtime=True)
        else:
            mime_msg["Date"] = formatdate(localtime=True)

        # RFC 5322: Message-ID is a SHOULD (not a MUST); always include one so
        # replies can be threaded against this message.
        if message.message_id:
            mime_msg["Message-ID"] = message.message_id
        else:
            # Generate a unique Message-ID if not provided
            # Format: <epoch_millis.uuid_hex@hostname>
            try:
                hostname = socket.getfqdn()
            except Exception:
                # Fallback to sender domain if FQDN is unavailable
                hostname = (
                    message.sender.address.split("@")[1]
                    if message.sender and "@" in str(message.sender)
                    else "localhost"
                )
            # datetime.now().timestamp() yields the true epoch time; calling
            # timestamp() on a naive utcnow() would be skewed by the UTC offset.
            msg_id = f"{int(datetime.now().timestamp() * 1000)}.{uuid.uuid4().hex}@{hostname}"
            message.message_id = f"<{msg_id}>"
            mime_msg["Message-ID"] = message.message_id

        if message.in_reply_to:
            mime_msg["In-Reply-To"] = message.in_reply_to
            # Skip References when there are none, to avoid an empty header.
            if message.references:
                mime_msg["References"] = " ".join(message.references)
        if message.priority == EmailPriority.HIGH:
            mime_msg["X-Priority"] = "1"
            mime_msg["Importance"] = "high"
        elif message.priority == EmailPriority.LOW:
            mime_msg["X-Priority"] = "5"
            mime_msg["Importance"] = "low"

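        # MIME-Version (RFC 2045) is already set by MIMEMultipart; assigning it
        # again would emit a duplicate header, so only add it if missing.
        if "MIME-Version" not in mime_msg:
            mime_msg["MIME-Version"] = "1.0"

        # Identifies the sending client
        mime_msg["User-Agent"] = "clawMail/0.6.0"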

        plain_text = message.body_plain
        if not plain_text and message.body_html:
            plain_text = re.sub(r"<[^>]+>", "", message.body_html)
            plain_text = re.sub(r"\n\s*\n", "\n\n", plain_text).strip()

        if plain_text:
            alt_part.attach(MIMEText(plain_text, "plain", "utf-8"))
        if message.body_html:
            alt_part.attach(MIMEText(message.body_html, "html", "utf-8"))

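        for att in message.attachments:
            # Fall back to a generic binary type if content_type lacks a "/".
            maintype, _, subtype = (att.content_type or "").partition("/")
            if not subtype:
                maintype, subtype = "application", "octet-stream"
            part = MIMEBase(maintype, subtype)
            part.set_payload(att.data)
            encoders.encode_base64(part)
            part.add_header("Content-Disposition", "attachment", filename=att.filename)
            if att.content_id:
                # Parsed Content-ID values may already carry angle brackets.
                cid = att.content_id.strip().strip("<>")
                part.add_header("Content-ID", f"<{cid}>")
            mime_msg.attach(part)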

        return mime_msg

```
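
The nesting that `_build_mime_static` produces when attachments are present (a `multipart/mixed` container holding a `multipart/alternative` part plus the attachments) can be inspected with the standard library alone. The following is a minimal sketch, not claw-mail code: `build_example` is a hypothetical function that mirrors only the structural part of the method above.

```python
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_example(plain: str, html: str, data: bytes, filename: str) -> MIMEMultipart:
    """Build mixed(alternative(plain, html), attachment), as sketched above."""
    outer = MIMEMultipart("mixed")
    alt = MIMEMultipart("alternative")
    outer.attach(alt)
    # Order matters: clients typically prefer the last alternative they can render.
    alt.attach(MIMEText(plain, "plain", "utf-8"))
    alt.attach(MIMEText(html, "html", "utf-8"))

    part = MIMEBase("application", "octet-stream")
    part.set_payload(data)
    encoders.encode_base64(part)
    part.add_header("Content-Disposition", "attachment", filename=filename)
    outer.attach(part)
    return outer


msg = build_example("hi", "<p>hi</p>", b"\x00\x01", "data.bin")
# walk() visits the tree depth-first, starting with the container itself.
structure = [p.get_content_type() for p in msg.walk()]
print(structure)
# MIMEMultipart sets MIME-Version on construction, so exactly one is present.
print(msg.get_all("MIME-Version"))
```

Placing the plain and HTML bodies inside their own `alternative` part keeps attachments from being treated as just another rendering of the body.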



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "borgcube",
  "slug": "claw-mail",
  "displayName": "claw-mail",
  "latest": {
    "version": "1.0.0",
    "publishedAt": 1772190999869,
    "commit": "https://github.com/openclaw/skills/commit/ec4600fddbaeff9d0697f874f5a46abd94db48c9"
  },
  "history": []
}

```
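
The `publishedAt` value above looks like a Unix timestamp in milliseconds. That unit is an assumption, since the metadata does not state it. Under that assumption, a short sketch to turn it into a readable UTC time:

```python
from datetime import datetime, timezone

published_at_ms = 1772190999869  # value copied from _meta.json

# Assumption: the value is milliseconds since the Unix epoch.
seconds, millis = divmod(published_at_ms, 1000)
published = datetime.fromtimestamp(seconds, tz=timezone.utc)
print(published.isoformat())  # 2026-02-27T11:16:39+00:00
```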

### scripts/lib/__init__.py

```python
"""Shared library for clawMail skill scripts."""

```

### scripts/lib/defaults.py

```python
"""Default paths and configuration helpers.

Provides the default config file path so that ``--config`` can be
optional across all scripts.  When ``--config`` is not supplied the
scripts look for ``config.yaml`` in the skill root directory
(one level above ``scripts/``).
"""

from __future__ import annotations

import os

_THIS_DIR = os.path.dirname(os.path.abspath(__file__))   # .../scripts/lib/
SCRIPTS_DIR = os.path.dirname(_THIS_DIR)                  # .../scripts/
SKILL_ROOT = os.path.dirname(SCRIPTS_DIR)                  # .../claw-mail/
DEFAULT_CONFIG_PATH = os.path.join(SKILL_ROOT, "config.yaml")


def resolve_config_path(explicit_path: str = "") -> str:
    """Return *explicit_path* if given, else the default config if it exists.

    Returns an empty string when neither is available so callers can
    still require ``--config`` or ``--imap-host`` as before.
    """
    if explicit_path:
        return explicit_path
    if os.path.isfile(DEFAULT_CONFIG_PATH):
        return DEFAULT_CONFIG_PATH
    return ""

```
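
The lookup order in `resolve_config_path` (explicit path first, then the default file if it exists, otherwise an empty string) can be exercised without installing the skill. The sketch below is a stand-alone re-statement of that logic, not the module itself: `resolve_config` and its `default` parameter are hypothetical, added so the default location can point at a temporary directory.

```python
import tempfile
from pathlib import Path


def resolve_config(explicit: str = "", default: Path = Path("config.yaml")) -> str:
    """Explicit path wins; otherwise the default if it exists; otherwise ''."""
    if explicit:
        return explicit
    return str(default) if default.is_file() else ""


with tempfile.TemporaryDirectory() as tmp:
    default = Path(tmp) / "config.yaml"
    missing = resolve_config(default=default)        # no file yet -> ""
    default.write_text("# placeholder config\n")
    found = resolve_config(default=default)          # default file now exists
    explicit = resolve_config("/etc/mail.yaml", default=default)

print(repr(missing), found == str(default), explicit)
```

Note that an explicit path is returned without checking that it exists, matching the original function, so a mistyped `--config` value surfaces later when the file is opened.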
