Back to skills
SkillHub ClubShip Full StackFull Stack

email-assistant

Imported from https://github.com/openclaw/skills.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,123
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
F17.6

Install command

npx @skill-hub/cli install openclaw-skills-email-assistant

Repository

openclaw/skills

Skill path: skills/codeblackhole1024/email-assistant

Imported from https://github.com/openclaw/skills.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install email-assistant into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding email-assistant to shared team environments
  • Use email-assistant for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: email-assistant
description: Multi-email management assistant supporting Gmail, 163, QQ, Outlook, and Hotmail. Features: (1) Fetch inbox and summarize emails (2) Keyword-based important email detection (3) Auto-extract calendar events from emails. Use when: users need unified email management across multiple accounts, want to avoid missing important emails, or need to extract schedules from email content.
---

# Email Assistant - 多邮箱管理助手

## 功能概览

- **多邮箱支持**: Gmail、163、QQ、Outlook、Hotmail
- **邮件读取**: 获取收件箱、显示摘要、搜索邮件
- **智能分析**: 关键词识别重要邮件
- **日程提取**: 自动从邮件中提取日程信息

## 支持的邮箱

| 邮箱 | 协议 | 服务器配置 |
|------|------|-----------|
| Gmail | OAuth2 + IMAP | imap.gmail.com:993 |
| 163 | IMAP | imap.163.com:993 |
| QQ | IMAP | imap.qq.com:993 |
| Outlook | IMAP | outlook.office365.com:993 |
| Hotmail | IMAP | outlook.office365.com:993 |

## 使用方法

### 1. Gmail 配置

首次使用需要配置 OAuth:

```bash
# 1. 在 Google Cloud Console 创建项目
# 2. 启用 Gmail API
# 3. 创建 OAuth 2.0 客户端凭据
# 4. 下载凭据文件为 credentials.json
# 5. 首次运行会自动打开浏览器授权
```

运行脚本:
```bash
cd scripts
python3 gmail_client.py --credentials ../credentials.json
```

### 2. IMAP 邮箱 (163/QQ/Outlook/Hotmail)

直接运行:
```bash
python3 imap_client.py --server <服务器> --email <邮箱> --password <密码或App密码>
```

### 3. 查看邮件

```bash
# 查看收件箱
python3 mail_parser.py --inbox

# 搜索特定邮件
python3 mail_parser.py --search "关键词"

# 分析重要邮件
python3 mail_parser.py --analyze
```

### 4. 提取日程

```bash
# 从邮件中提取日程并生成 ICS 文件
python3 scheduler.py --extract --output events.ics
```

## 脚本说明

### scripts/gmail_client.py
Gmail OAuth 认证客户端。首次运行会打开浏览器进行授权,之后凭据会缓存。

### scripts/imap_client.py
通用 IMAP 客户端,适用于 163、QQ、Outlook、Hotmail。使用 App Password 认证。

### scripts/mail_parser.py
邮件解析器。解析纯文本/HTML 邮件,提取关键信息,分析重要性。

### scripts/scheduler.py
日程提取器。从邮件中识别 iCal 附件或纯文本日程,生成标准 ICS 日历文件。

## 重要邮件识别

自动识别以下关键词:
- urgent, 紧急, 重要, 即时
- deadline, 截止, 期限
- meeting, 会议, 面试
- invoice, 发票, 付款, 账单
- contract, 合同, 协议

## 日程提取

支持格式:
- iCal 附件 (.ics)
- 纯文本日程(日期+时间+内容)

## 注意事项

1. **Gmail**: 必须使用 OAuth2,密码登录已被禁用
2. **163/QQ**: 需开启 IMAP 并使用 App Password
3. **Outlook/Hotmail**: 使用常规密码或 App Password
4. 建议使用 App Password 而非登录密码,提高安全性


---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "codeblackhole1024",
  "slug": "email-assistant",
  "displayName": "Email Assistant",
  "latest": {
    "version": "1.1.0",
    "publishedAt": 1772870006206,
    "commit": "https://github.com/openclaw/skills/commit/3411735f2305096ce0eb650fc380613d2e4058ca"
  },
  "history": [
    {
      "version": "1.0.0",
      "publishedAt": 1772696946057,
      "commit": "https://github.com/openclaw/skills/commit/da1442c90d3db19c197dc87a11f4e46c3c0f1355"
    }
  ]
}

```

### references/email_config.md

```markdown
# 各邮箱配置详细指南

## Gmail

### 1. 获取 OAuth 凭据

1. 访问 [Google Cloud Console](https://console.cloud.google.com/)
2. 创建新项目或选择现有项目
3. 导航到 **API和服务** → **库**
4. 搜索并启用 **Gmail API**
5. 导航到 **凭据** → **创建凭据** → **OAuth 客户端 ID**
6. 选择 **桌面应用** 作为应用类型
7. 下载 JSON 文件,重命名为 `credentials.json`

### 2. 所需权限

创建 OAuth 凭据时需要添加以下 scope:
```
https://mail.google.com/
https://www.googleapis.com/auth/gmail.readonly
https://www.googleapis.com/auth/gmail.labels
```

### 3. 首次授权

首次运行脚本时:
1. 会自动打开浏览器
2. 使用 Gmail 账号登录
3. 点击"允许"授权
4. 凭据会自动缓存(token.json)

### 4. 常见问题

- **token.json 失效**: 删除文件重新授权
- **安全警告**: 在 Google Cloud Console 添加测试用户

---

## 163 邮箱

### 1. 开启 IMAP

1. 登录 163 邮箱
2. 设置 → POP3/IMAP/SMTP → 开启 IMAP
3. 获取 **客户端授权密码**(不是登录密码)

### 2. 服务器配置

```
IMAP 服务器: imap.163.com
IMAP 端口: 993
加密: SSL
```

### 3. 使用

```bash
python3 imap_client.py \
  --server imap.163.com \
  --email [email protected] \
  --password xxxxxxxx
```

---

## QQ 邮箱

### 1. 开启 IMAP

1. 登录 QQ 邮箱
2. 设置 → POP3/IMAP/SMTP → 开启 IMAP 服务
3. 获取 **授权码**(不是 QQ 密码)

### 2. 服务器配置

```
IMAP 服务器: imap.qq.com
IMAP 端口: 993
加密: SSL
```

### 3. 使用

```bash
python3 imap_client.py \
  --server imap.qq.com \
  --email [email protected] \
  --password xxxxxxxx
```

---

## Outlook / Hotmail

### 1. 开启 IMAP

1. 登录 Outlook 邮箱
2. 设置 → 查看所有 Outlook 设置 → 邮件 → POP 和 IMAP
3. 开启 IMAP

### 2. 服务器配置

```
IMAP 服务器: outlook.office365.com
IMAP 端口: 993
加密: SSL
```

### 3. 使用

```bash
python3 imap_client.py \
  --server outlook.office365.com \
  --email [email protected] \
  --password xxxxxxxx
```

---

## App Password 说明

| 邮箱 | 获取方式 |
|------|---------|
| 163 | 设置 → POP3/IMAP/SMTP → 客户端授权密码 |
| QQ | 设置 → POP3/IMAP/SMTP → 开启IMAP → 获取授权码 |
| Outlook | Microsoft 账户 → 安全 → 双重验证 → 应用密码 |
| Hotmail | 同 Outlook |

---

## 安全建议

1. **始终使用 App Password**:不要使用邮箱登录密码
2. **保管好凭据文件**:credentials.json 和 token.json 不要提交到代码仓库
3. **定期检查**:授权的应用列表,及时撤销不用的授权
4. **限制权限**:只申请必要的权限范围

```

### scripts/gmail_client.py

```python
#!/usr/bin/env python3
"""
Gmail Client - Gmail API OAuth 对接
使用 google-auth 和 google-auth-oauthlib 实现 OAuth2 认证

使用前需要:
1. 在 Google Cloud Console 创建项目
2. 启用 Gmail API
3. 下载 credentials.json 到脚本目录
4. 首次运行会打开浏览器进行授权

功能:
- 获取邮件列表
- 读取邮件内容
- 标记重要邮件 (STARRED)
"""

import os
import json
import pickle
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Gmail API 权限范围
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.modify'
]


class GmailClient:
    """Gmail API 客户端"""
    
    def __init__(self, credentials_path: str = 'credentials.json', 
                 token_path: str = 'token.pickle'):
        """
        初始化 Gmail 客户端
        
        Args:
            credentials_path: OAuth 客户端配置文件路径
            token_path: 保存访问令牌的路径
        """
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.service = None
        self._authenticate()
    
    def _authenticate(self) -> None:
        """进行 OAuth2 认证"""
        creds = None
        
        # 尝试加载已保存的令牌
        if os.path.exists(self.token_path):
            with open(self.token_path, 'rb') as token:
                creds = pickle.load(token)
        
        # 如果没有有效凭证,进行认证
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                # 刷新过期凭证
                try:
                    creds.refresh(Request())
                except Exception as e:
                    print(f"刷新令牌失败: {e}")
                    creds = None
            else:
                # 进行完整 OAuth 流程
                if not os.path.exists(self.credentials_path):
                    raise FileNotFoundError(
                        f"未找到 {self.credentials_path}\n"
                        "请先在 Google Cloud Console 创建 OAuth 客户端并下载配置文件\n"
                        "详见: https://developers.google.com/gmail/api/quickstart/python"
                    )
                
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_path, SCOPES)
                creds = flow.run_local_server(
                    port=8080,
                    prompt='consent',
                    authorization_prompt_message=
                        "请访问以下网址授权: {url}"
                )
            
            # 保存凭证
            with open(self.token_path, 'wb') as token:
                pickle.dump(creds, token)
        
        # 构建 Gmail 服务
        self.service = build('gmail', 'v1', credentials=creds)
        print("✓ Gmail API 认证成功")
    
    def get_emails(self, max_results: int = 10, 
                   query: str = '',
                   label_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """
        获取邮件列表
        
        Args:
            max_results: 最大返回数量
            query: Gmail 搜索语法
            label_ids: 按标签过滤 (INBOX, STARRED, etc.)
        
        Returns:
            邮件列表
        """
        try:
            results = self.service.users().messages().list(
                userId='me',
                maxResults=max_results,
                q=query,
                labelIds=label_ids
            ).execute()
            
            messages = results.get('messages', [])
            return messages
            
        except HttpError as e:
            print(f"获取邮件列表失败: {e}")
            return []
    
    def get_email_detail(self, msg_id: str) -> Optional[Dict[str, Any]]:
        """
        获取邮件详细内容
        
        Args:
            msg_id: 邮件ID
        
        Returns:
            邮件详情字典
        """
        try:
            message = self.service.users().messages().get(
                userId='me',
                id=msg_id,
                format='full'
            ).execute()
            
            return self._parse_message(message)
            
        except HttpError as e:
            print(f"获取邮件详情失败: {e}")
            return None
    
    def _parse_message(self, message: dict) -> Dict[str, Any]:
        """解析邮件消息"""
        headers = message.get('payload', {}).get('headers', [])
        
        # 提取标准头部
        def get_header(name: str) -> str:
            for header in headers:
                if header['name'].lower() == name.lower():
                    return header['value']
            return ''
        
        # 解析内容
        payload = message.get('payload', {})
        body = self._get_body(payload)
        
        return {
            'id': message['id'],
            'threadId': message.get('threadId'),
            'subject': get_header('subject'),
            'from': get_header('from'),
            'to': get_header('to'),
            'date': get_header('date'),
            'labels': message.get('labelIds', []),
            'snippet': message.get('snippet', ''),
            'body': body,
            'raw': message.get('raw')
        }
    
    def _get_body(self, payload: dict) -> str:
        """从 payload 中提取邮件正文"""
        # 直接 body
        if 'body' in payload and payload['body'].get('data'):
            import base64
            data = payload['body']['data']
            return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
        
        # 多部分消息
        if 'parts' in payload:
            for part in payload['parts']:
                if part.get('mimeType') == 'text/plain':
                    if 'body' in part and part['body'].get('data'):
                        import base64
                        data = part['body']['data']
                        return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
                elif part.get('mimeType') == 'text/html':
                    if 'body' in part and part['body'].get('data'):
                        import base64
                        data = part['body']['data']
                        return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
        
        return ''
    
    def mark_important(self, msg_id: str) -> bool:
        """
        标记邮件为重要 (添加 STARRED 标签)
        
        Args:
            msg_id: 邮件ID
        
        Returns:
            是否成功
        """
        try:
            self.service.users().messages().modify(
                userId='me',
                id=msg_id,
                body={
                    'addLabelIds': ['STARRED'],
                    'removeLabelIds': ['UNREAD']
                }
            ).execute()
            return True
        except HttpError as e:
            print(f"标记邮件失败: {e}")
            return False
    
    def mark_as_read(self, msg_id: str) -> bool:
        """
        标记邮件为已读
        
        Args:
            msg_id: 邮件ID
        
        Returns:
            是否成功
        """
        try:
            self.service.users().messages().modify(
                userId='me',
                id=msg_id,
                body={
                    'removeLabelIds': ['UNREAD']
                }
            ).execute()
            return True
        except HttpError as e:
            print(f"标记已读失败: {e}")
            return False
    
    def get_recent_emails(self, days: int = 7, max_results: int = 20) -> List[Dict[str, Any]]:
        """获取最近几天的邮件"""
        # 计算日期
        date = (datetime.now() - timedelta(days=days)).strftime('%Y/%m/%d')
        query = f'after:{date}'
        
        messages = self.get_emails(max_results=max_results, query=query)
        
        # 获取详细信息
        emails = []
        for msg in messages:
            detail = self.get_email_detail(msg['id'])
            if detail:
                emails.append(detail)
        
        return emails


# 示例用法
if __name__ == '__main__':
    try:
        # 初始化客户端 (需要 credentials.json)
        client = GmailClient()
        
        # 获取最近邮件
        print("\n=== 最近 5 封邮件 ===")
        emails = client.get_emails(max_results=5)
        
        for msg in emails:
            detail = client.get_email_detail(msg['id'])
            if detail:
                print(f"\n标题: {detail['subject']}")
                print(f"发件人: {detail['from']}")
                print(f"日期: {detail['date']}")
                print(f"预览: {detail['snippet'][:100]}...")
        
        # 标记第一封邮件为重要
        if emails:
            print(f"\n标记邮件 {emails[0]['id']} 为重要...")
            client.mark_important(emails[0]['id'])
            
    except FileNotFoundError as e:
        print(f"\n{e}")
        print("\n请按以下步骤操作:")
        print("1. 访问 https://console.cloud.google.com/")
        print("2. 创建项目并启用 Gmail API")
        print("3. 创建 OAuth 2.0 客户端凭证")
        print("4. 下载 JSON 文件并重命名为 credentials.json")
    except Exception as e:
        print(f"错误: {e}")

```

### scripts/imap_client.py

```python
#!/usr/bin/env python3
"""
IMAP Client - Universal IMAP email client
Supports: 163, QQ, Outlook, Hotmail
"""

import imaplib
import email
import json
import argparse
import sys
from datetime import datetime
from typing import Optional, List, Dict


# IMAP server configurations
IMAP_CONFIG = {
    "163": {
        "server": "imap.163.com",
        "port": 993,
        "ssl": True
    },
    "qq": {
        "server": "imap.qq.com",
        "port": 993,
        "ssl": True
    },
    "outlook": {
        "server": "outlook.office365.com",
        "port": 993,
        "ssl": True
    },
    "hotmail": {
        "server": "outlook.office365.com",
        "port": 993,
        "ssl": True
    }
}


def parse_email_provider(email_addr: str) -> str:
    """Detect email provider from email address."""
    domain = email_addr.split("@")[-1].lower()
    
    if "163" in domain or "126" in domain or "yeah" in domain:
        return "163"
    elif "qq" in domain:
        return "qq"
    elif "outlook" in domain or "live" in domain or "msn" in domain:
        return "outlook"
    elif "hotmail" in domain or "live.com" in domain or "msn.com" in domain:
        return "hotmail"
    else:
        raise ValueError(f"Unsupported email provider: {domain}")


def connect_imap(server: str, email_addr: str, password: str) -> imaplib.IMAP4_SSL:
    """Connect to IMAP server."""
    try:
        config = IMAP_CONFIG.get(server)
        if not config:
            # Try as custom server
            config = {"server": server, "port": 993, "ssl": True}
        
        if config.get("ssl", True):
            imap = imaplib.IMAP4_SSL(config["server"], config["port"])
        else:
            imap = imaplib.IMAP4(config["server"], config["port"])
        
        imap.login(email_addr, password)
        print(f"✓ Connected to {config['server']}")
        return imap
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        sys.exit(1)


def get_email_list(imap: imaplib.IMAP4_SSL, limit: int = 50) -> List[Dict]:
    """Fetch email list from INBOX."""
    try:
        status, messages = imap.select("INBOX")
        if status != "OK":
            print(f"✗ Failed to select INBOX: {messages}")
            return []
        
        # Get total number of messages
        total_msgs = int(messages[0])
        print(f"Total messages in INBOX: {total_msgs}")
        
        # Calculate range to fetch
        start = max(1, total_msgs - limit + 1)
        end = total_msgs
        
        emails = []
        for num in range(end, start - 1, -1):
            try:
                status, msg_data = imap.fetch(str(num), "(RFC822)")
                if status != "OK":
                    continue
                
                msg = email.message_from_bytes(msg_data[0][1])
                
                # Parse email headers
                subject = msg.get("Subject", "(No Subject)")
                from_addr = email.utils.parseaddr(msg.get("From", ""))[1]
                date = msg.get("Date", "")
                
                # Get preview (first 100 chars of body)
                preview = ""
                if msg.is_multipart:
                    for part in msg.walk():
                        if part.get_content_type() == "text/plain":
                            try:
                                preview = part.get_payload(decode=True).decode("utf-8", errors="ignore")[:200]
                                break
                            except:
                                pass
                else:
                    try:
                        preview = msg.get_payload(decode=True).decode("utf-8", errors="ignore")[:200]
                    except:
                        pass
                
                emails.append({
                    "id": num,
                    "subject": subject,
                    "from": from_addr,
                    "date": date,
                    "preview": preview.replace("\n", " ").strip()
                })
                
            except Exception as e:
                print(f"  Warning: Failed to fetch email {num}: {e}")
                continue
        
        return emails
        
    except Exception as e:
        print(f"✗ Failed to fetch emails: {e}")
        return []


def get_email_detail(imap: imaplib.IMAP4_SSL, email_id: int) -> Optional[Dict]:
    """Get full email details."""
    try:
        status, msg_data = imap.fetch(str(email_id), "(RFC822)")
        if status != "OK":
            return None
        
        msg = email.message_from_bytes(msg_data[0][1])
        
        # Parse headers
        subject = msg.get("Subject", "(No Subject)")
        from_addr = email.utils.parseaddr(msg.get("From", ""))
        to_addr = email.utils.parseaddr(msg.get("To", ""))
        date = msg.get("Date", "")
        
        # Parse body
        body_text = ""
        body_html = ""
        
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                if content_type == "text/plain" and not body_text:
                    try:
                        body_text = part.get_payload(decode=True).decode("utf-8", errors="ignore")
                    except:
                        pass
                elif content_type == "text/html" and not body_html:
                    try:
                        body_html = part.get_payload(decode=True).decode("utf-8", errors="ignore")
                    except:
                        pass
        else:
            try:
                body_text = msg.get_payload(decode=True).decode("utf-8", errors="ignore")
            except:
                pass
        
        return {
            "id": email_id,
            "subject": subject,
            "from": from_addr,
            "to": to_addr,
            "date": date,
            "body_text": body_text,
            "body_html": body_html
        }
        
    except Exception as e:
        print(f"✗ Failed to fetch email {email_id}: {e}")
        return None


def mark_as_read(imap: imaplib.IMAP4_SSL, email_id: int) -> bool:
    """Mark email as read."""
    try:
        status, _ = imap.store(str(email_id), "+FLAGS", "\\Seen")
        return status == "OK"
    except Exception as e:
        print(f"✗ Failed to mark email as read: {e}")
        return False


def mark_as_unread(imap: imaplib.IMAP4_SSL, email_id: int) -> bool:
    """Mark email as unread."""
    try:
        status, _ = imap.store(str(email_id), "-FLAGS", "\\Seen")
        return status == "OK"
    except Exception as e:
        print(f"✗ Failed to mark email as unread: {e}")
        return False


def main():
    parser = argparse.ArgumentParser(description="Universal IMAP Email Client")
    parser.add_argument("--server", help="IMAP server (auto-detected from email if not provided)")
    parser.add_argument("--email", required=True, help="Email address")
    parser.add_argument("--password", required=True, help="Password or app token")
    parser.add_argument("--limit", type=int, default=50, help="Number of emails to fetch (default: 50)")
    parser.add_argument("--list", action="store_true", help="List emails")
    parser.add_argument("--read", type=int, help="Read specific email by ID")
    parser.add_argument("--mark-read", type=int, help="Mark email as read")
    parser.add_argument("--mark-unread", type=int, help="Mark email as unread")
    parser.add_argument("--output", help="Output file (JSON)")
    
    args = parser.parse_args()
    
    # Auto-detect provider if not specified
    server = args.server or parse_email_provider(args.email)
    print(f"Using provider: {server}")
    
    # Connect
    imap = connect_imap(server, args.email, args.password)
    
    try:
        if args.list:
            print(f"\nFetching last {args.limit} emails...")
            emails = get_email_list(imap, args.limit)
            
            if args.output:
                with open(args.output, "w", encoding="utf-8") as f:
                    json.dump(emails, f, ensure_ascii=False, indent=2)
                print(f"✓ Saved to {args.output}")
            else:
                print(f"\n{'='*60}")
                for i, email in enumerate(emails, 1):
                    print(f"\n[{i}] Subject: {email['subject']}")
                    print(f"    From: {email['from']}")
                    print(f"    Date: {email['date']}")
                    print(f"    Preview: {email['preview'][:80]}...")
                    
        elif args.read:
            email_detail = get_email_detail(imap, args.read)
            if email_detail:
                print(f"\nSubject: {email_detail['subject']}")
                print(f"From: {email_detail['from']}")
                print(f"To: {email_detail['to']}")
                print(f"Date: {email_detail['date']}")
                print(f"\n{'='*60}")
                print(email_detail["body_text"][:2000])
            else:
                print("Email not found")
                
        elif args.mark_read:
            if mark_as_read(imap, args.mark_read):
                print(f"✓ Email {args.mark_read} marked as read")
            else:
                print(f"✗ Failed to mark email as read")
                
        elif args.mark_unread:
            if mark_as_unread(imap, args.mark_unread):
                print(f"✓ Email {args.mark_unread} marked as unread")
            else:
                print(f"✗ Failed to mark email as unread")
        else:
            parser.print_help()
            
    finally:
        imap.logout()
        print("\n✓ Logged out")


if __name__ == "__main__":
    main()

```

### scripts/mail_parser.py

```python
#!/usr/bin/env python3
"""
Mail Parser - Email parsing and important email analysis
Parses plain text/HTML emails and identifies important emails
"""

import json
import re
import argparse
import sys
from datetime import datetime
from typing import List, Dict, Optional
from html import unescape


# Important email keywords (case-insensitive)
IMPORTANT_KEYWORDS = {
    "urgent": ["urgent", "紧急", "紧迫", "asap", "immediately"],
    "deadline": ["deadline", "截止", "截止日期", "due", "最后期限", "截止时间"],
    "meeting": ["meeting", "会议", "conference", "call", "zoom", "teams"],
    "invoice": ["invoice", "发票", "账单", "payment", "付款", "invoice"],
    "contract": ["contract", "合同", "agreement", "协议", "legal"],
    "action": ["action required", "需要处理", "请回复", "please respond", "需确认"],
    "important": ["important", "重要", "priority", "高优先级", "urgent"],
    "money": ["payment", "付款", "invoice", "账单", "转账", "金额", "payment"],
    "security": ["security", "安全", "password", "密码", "account", "账户", "登录"]
}

# Priority patterns (more specific = higher priority)
PRIORITY_PATTERNS = [
    (r"(?i)(urgent|immediate|asap|紧急|紧迫|立刻)", 10),
    (r"(?i)(deadline|due|截止|最后期限)", 9),
    (r"(?i)(meeting|会议|call|zoom|teams|conference)", 8),
    (r"(?i)(invoice|payment|付款|发票|账单)", 7),
    (r"(?i)(contract|合同|agreement|协议)", 7),
    (r"(?i)(action required|需要处理|请回复|please respond)", 6),
    (r"(?i)(important|重要|priority|高优先级)", 5),
]


def clean_text(text: str) -> str:
    """Clean and normalize text."""
    if not text:
        return ""
    # Unescape HTML entities
    text = unescape(text)
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text)
    # Remove control characters
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
    return text.strip()


def strip_html(html: str) -> str:
    """Strip HTML tags and return plain text."""
    if not html:
        return ""
    
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', ' ', html)
    # Decode HTML entities
    text = unescape(text)
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def detect_important_keywords(text: str) -> List[Dict[str, any]]:
    """Detect important keywords in email text."""
    text_lower = text.lower()
    found = []
    
    for category, keywords in IMPORTANT_KEYWORDS.items():
        for keyword in keywords:
            if keyword.lower() in text_lower:
                # Find position
                pos = text_lower.find(keyword.lower())
                found.append({
                    "category": category,
                    "keyword": keyword,
                    "position": pos,
                    "context": text[max(0, pos-20):min(len(text), pos+len(keyword)+20)]
                })
    
    return found


def calculate_priority(text: str) -> int:
    """Calculate email priority (0-10)."""
    text_lower = text.lower()
    priority = 0
    
    for pattern, score in PRIORITY_PATTERNS:
        if re.search(pattern, text_lower):
            priority = max(priority, score)
    
    return priority


def analyze_email(email_data: Dict) -> Dict:
    """Analyze a single email for importance."""
    subject = email_data.get("subject", "")
    preview = email_data.get("preview", "")
    body_text = email_data.get("body_text", "")
    body_html = email_data.get("body_html", "")
    
    # Combine text for analysis
    full_text = f"{subject} {preview} {body_text}"
    if not full_text.strip():
        full_text = strip_html(body_html)
    
    # Clean text
    full_text = clean_text(full_text)
    
    # Calculate priority
    priority = calculate_priority(full_text)
    
    # Detect keywords
    keywords = detect_important_keywords(full_text)
    
    # Determine categories
    categories = list(set(k["category"] for k in keywords))
    
    # Generate summary
    summary = {
        "id": email_data.get("id"),
        "subject": subject,
        "from": email_data.get("from"),
        "date": email_data.get("date"),
        "priority": priority,
        "is_important": priority >= 6,
        "categories": categories,
        "keywords": [k["keyword"] for k in keywords],
        "preview": preview[:200] if preview else (full_text[:200] if full_text else "")
    }
    
    return summary


def load_emails_from_file(filepath: str) -> List[Dict]:
    """Load emails from JSON file."""
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
            if isinstance(data, list):
                return data
            elif isinstance(data, dict) and "emails" in data:
                return data["emails"]
            else:
                return [data]
    except Exception as e:
        print(f"✗ Failed to load emails: {e}")
        sys.exit(1)


def analyze_emails(emails: List[Dict]) -> Dict:
    """Analyze a list of emails."""
    results = []
    important_emails = []
    
    for email_data in emails:
        analysis = analyze_email(email_data)
        results.append(analysis)
        
        if analysis["is_important"]:
            important_emails.append(analysis)
    
    return {
        "total": len(emails),
        "important_count": len(important_emails),
        "results": results,
        "important": important_emails
    }


def print_summary(results: Dict):
    """Print analysis summary."""
    print(f"\n{'='*60}")
    print(f"Email Analysis Summary")
    print(f"{'='*60}")
    print(f"Total emails analyzed: {results['total']}")
    print(f"Important emails found: {results['important_count']}")
    print(f"\n{'='*60}")
    
    if results["important"]:
        print("\n📌 IMPORTANT EMAILS:")
        print("-" * 60)
        for email in results["important"]:
            print(f"\nPriority: {'🔥' * (email['priority'] // 3)} ({email['priority']}/10)")
            print(f"Subject: {email['subject']}")
            print(f"From: {email['from']}")
            print(f"Categories: {', '.join(email['categories'])}")
            print(f"Keywords: {', '.join(email['keywords'][:5])}")
            print(f"Preview: {email['preview'][:100]}...")
    
    print(f"\n{'='*60}")
    print("\nAll Emails (sorted by priority):")
    print("-" * 60)
    
    sorted_results = sorted(results["results"], key=lambda x: x["priority"], reverse=True)
    for i, email in enumerate(sorted_results, 1):
        icon = "🔥" if email["is_important"] else "  "
        print(f"{icon} [{email['priority']:>2}] {email['subject'][:50]}")


def main():
    parser = argparse.ArgumentParser(description="Email Parser and Analyzer")
    parser.add_argument("--emails", help="Path to JSON file containing emails")
    parser.add_argument("--analyze", action="store_true", help="Analyze loaded emails")
    parser.add_argument("--input", help="Input JSON file")
    parser.add_argument("--output", help="Output JSON file for results")
    parser.add_argument("--important-only", action="store_true", help="Show only important emails")
    parser.add_argument("--min-priority", type=int, default=0, help="Minimum priority to show (0-10)")
    
    args = parser.parse_args()
    
    emails = []
    
    # Load emails from file
    if args.emails or args.input:
        filepath = args.emails or args.input
        print(f"Loading emails from {filepath}...")
        emails = load_emails_from_file(filepath)
        print(f"Loaded {len(emails)} emails")
    
    if args.analyze or args.emails:
        if not emails:
            print("✗ No emails to analyze")
            sys.exit(1)
        
        print("\nAnalyzing emails...")
        results = analyze_emails(emails)
        
        # Filter by priority
        if args.important_only:
            results["results"] = [r for r in results["results"] if r["priority"] >= args.min_priority]
        
        # Print summary
        print_summary(results)
        
        # Save to file if requested
        if args.output:
            output_data = {
                "timestamp": datetime.now().isoformat(),
                "total": results["total"],
                "important_count": results["important_count"],
                "results": results["results"],
                "important": results["important"]
            }
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, ensure_ascii=False, indent=2)
            print(f"\n✓ Results saved to {args.output}")
    else:
        parser.print_help()


if __name__ == "__main__":
    main()

```

### scripts/scheduler.py

```python
#!/usr/bin/env python3
"""
Scheduler - Extract calendar events from emails and generate ICS files
"""

import json
import re
import argparse
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from email.utils import parsedate_to_datetime
import uuid


# Date/time patterns to match
DATETIME_PATTERNS = [
    # 2024年3月15日 14:00
    (r"(\d{4})年(\d{1,2})月(\d{1,2})日\s*(\d{1,2}):(\d{2})", "datetime"),
    # 2024-03-15 14:00 or 2024/03/15 14:00
    (r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s+(\d{1,2}):(\d{2})", "datetime"),
    # March 15, 2024 14:00
    (r"(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+(\d{1,2}),?\s+(\d{4})\s+(\d{1,2}):(\d{2})", "datetime"),
    # 3月15日 14:00
    (r"(\d{1,2})月(\d{1,2})日\s*(\d{1,2}):(\d{2})", "datetime_short"),
]

DATE_PATTERNS = [
    # 2024年3月15日
    (r"(\d{4})年(\d{1,2})月(\d{1,2})日", "date"),
    # 2024-03-15 or 2024/03/15
    (r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", "date"),
    # March 15, 2024
    (r"(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+(\d{1,2}),?\s+(\d{4})", "date_short"),
]

MONTH_MAP = {
    "jan": 1, "january": 1,
    "feb": 2, "february": 2,
    "mar": 3, "march": 3,
    "apr": 4, "april": 4,
    "may": 5,
    "jun": 6, "june": 6,
    "jul": 7, "july": 7,
    "aug": 8, "august": 8,
    "sep": 9, "september": 9,
    "oct": 10, "october": 10,
    "nov": 11, "november": 11,
    "dec": 12, "december": 12,
}


def parse_datetime(match, year_hint=None) -> Optional[datetime]:
    """Parse datetime from regex match."""
    try:
        if len(match.groups()) == 5:
            g = match.groups()
            if g[0].isdigit() and len(g[0]) == 4:  # Full year
                year, month, day, hour, minute = int(g[0]), int(g[1]), int(g[2]), int(g[3]), int(g[4])
            elif g[0].lower() in MONTH_MAP:  # Month name
                month = MONTH_MAP[g[0].lower()]
                day, year, hour, minute = int(g[1]), int(g[2]), int(g[3]), int(g[4])
                if year_hint:
                    year = year_hint
            else:  # Short format (month, day, hour, minute)
                month, day, year, hour, minute = int(g[0]), int(g[1]), year_hint or datetime.now().year, int(g[2]), int(g[3])
            return datetime(year, month, day, hour, minute)
    except Exception:
        pass
    return None


def parse_date(match, year_hint=None) -> Optional[datetime]:
    """Parse date from regex match."""
    try:
        g = match.groups()
        if len(g) >= 3:
            if g[0].isdigit() and len(g[0]) == 4:  # Full format
                year, month, day = int(g[0]), int(g[1]), int(g[2])
                return datetime(year, month, day)
            elif g[0].lower() in MONTH_MAP:  # Month name
                month = MONTH_MAP[g[0].lower()]
                day, year = int(g[1]), int(g[2])
                return datetime(year, month, day)
    except Exception:
        pass
    return None


def extract_ical_events(text: str) -> List[Dict]:
    """Extract iCal events from text."""
    events = []
    # Simple VCALENDAR parser
    if "BEGIN:VEVENT" in text:
        in_event = False
        event_data = {}
        
        for line in text.split('\n'):
            line = line.strip()
            
            if line == "BEGIN:VEVENT":
                in_event = True
                event_data = {}
            elif line == "END:VEVENT":
                in_event = False
                if event_data:
                    events.append(event_data)
            elif in_event:
                if line.startswith("SUMMARY:"):
                    event_data["summary"] = line[8:]
                elif line.startswith("DTSTART"):
                    if ":" in line:
                        event_data["dtstart"] = line.split(":")[1]
                elif line.startswith("DTEND"):
                    if ":" in line:
                        event_data["dtend"] = line.split(":")[1]
                elif line.startswith("DESCRIPTION:"):
                    event_data["description"] = line[12:]
                elif line.startswith("LOCATION:"):
                    event_data["location"] = line[9:]
    
    return events


def extract_text_events(text: str, year_hint=None) -> List[Dict]:
    """Extract events from plain text."""
    events = []
    
    # Try datetime patterns first
    for pattern, ptype in DATETIME_PATTERNS:
        for match in re.finditer(pattern, text):
            dt = None
            if ptype == "datetime":
                dt = parse_datetime(match, year_hint)
            elif ptype == "datetime_short":
                dt = parse_datetime(match, year_hint)
            
            if dt:
                # Extract context (surrounding text)
                start = max(0, match.start() - 30)
                end = min(len(text), match.end() + 100)
                context = text[start:end].strip()
                
                # Try to extract title from context
                title = context.split('\n')[0][:50]
                
                events.append({
                    "dtstart": dt,
                    "dtend": dt + timedelta(hours=1),
                    "summary": title,
                    "description": context,
                    "source": "text"
                })
                break
    
    # Try date patterns
    if not events:
        for pattern, ptype in DATE_PATTERNS:
            for match in re.finditer(pattern, text):
                dt = None
                if ptype == "date":
                    dt = parse_date(match, year_hint)
                elif ptype == "date_short":
                    dt = parse_date(match, year_hint)
                
                if dt:
                    start = max(0, match.start() - 30)
                    end = min(len(text), match.end() + 100)
                    context = text[start:end].strip()
                    title = context.split('\n')[0][:50]
                    
                    events.append({
                        "dtstart": dt,
                        "dtend": dt + timedelta(hours=1),
                        "summary": title,
                        "description": context,
                        "source": "text"
                    })
                    break
    
    return events


def generate_ics(events: List[Dict]) -> str:
    """Generate ICS calendar file content."""
    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//Email Assistant//EN",
        "CALSCALE:GREGORIAN",
        "METHOD:PUBLISH",
    ]
    
    for event in events:
        uid = str(uuid.uuid4())
        dtstart = event.get("dtstart")
        dtend = event.get("dtend")
        
        if isinstance(dtstart, datetime):
            dtstart_str = dtstart.strftime("%Y%m%dT%H%M%S")
        else:
            dtstart_str = str(dtstart)
        
        if isinstance(dtend, datetime):
            dtend_str = dtend.strftime("%Y%m%dT%H%M%S")
        else:
            dtend_str = str(dtend)
        
        lines.extend([
            "BEGIN:VEVENT",
            f"UID:{uid}",
            f"DTSTART:{dtstart_str}",
            f"DTEND:{dtend_str}",
            f"SUMMARY:{event.get('summary', 'Event')}",
        ])
        
        if event.get("description"):
            lines.append(f"DESCRIPTION:{event.get('description')}")
        
        if event.get("location"):
            lines.append(f"LOCATION:{event.get('location')}")
        
        lines.extend([
            "END:VEVENT",
        ])
    
    lines.append("END:VCALENDAR")
    return "\n".join(lines)


def load_emails(filepath: str) -> List[Dict]:
    """Load emails from JSON file."""
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
            if isinstance(data, list):
                return data
            elif isinstance(data, dict) and "emails" in data:
                return data["emails"]
            else:
                return [data]
    except Exception as e:
        print(f"✗ Failed to load emails: {e}")
        sys.exit(1)


def extract_events_from_emails(emails: List[Dict]) -> List[Dict]:
    """Extract calendar events from emails."""
    all_events = []
    year_hint = datetime.now().year
    
    for email in emails:
        subject = email.get("subject", "")
        body_text = email.get("body_text", "")
        body_html = email.get("body_html", "")
        preview = email.get("preview", "")
        
        # Combine all text
        full_text = f"{subject}\n{preview}\n{body_text}"
        
        # Extract iCal events
        ical_events = extract_ical_events(full_text)
        for e in ical_events:
            e["email_subject"] = subject
            e["email_from"] = email.get("from", "")
        all_events.extend(ical_events)
        
        # Extract text events
        text_events = extract_text_events(full_text, year_hint)
        for e in text_events:
            e["email_subject"] = subject
            e["email_from"] = email.get("from", "")
        all_events.extend(text_events)
    
    return all_events


def main():
    parser = argparse.ArgumentParser(description="Extract calendar events from emails")
    parser.add_argument("--emails", help="Path to JSON file containing emails")
    parser.add_argument("--output", default="events.ics", help="Output ICS file")
    parser.add_argument("--list", action="store_true", help="List events without generating ICS")
    
    args = parser.parse_args()
    
    if not args.emails:
        parser.print_help()
        sys.exit(1)
    
    print(f"Loading emails from {args.emails}...")
    emails = load_emails(args.emails)
    print(f"Loaded {len(emails)} emails")
    
    print("\nExtracting events...")
    events = extract_events_from_emails(emails)
    
    print(f"Found {len(events)} events")
    
    if args.list:
        for i, event in enumerate(events, 1):
            print(f"\n{i}. {event.get('summary', 'Untitled')}")
            dtstart = event.get("dtstart")
            if isinstance(dtstart, datetime):
                print(f"   Date: {dtstart.strftime('%Y-%m-%d %H:%M')}")
            print(f"   Source: {event.get('source', 'unknown')}")
            if event.get("email_subject"):
                print(f"   From email: {event['email_subject'][:50]}")
    
    if not args.list:
        ics_content = generate_ics(events)
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(ics_content)
        print(f"\n✓ Events exported to {args.output}")
        
        # Also save as JSON for debugging
        json_output = args.output.replace(".ics", ".json")
        json_data = []
        for event in events:
            json_event = {
                "summary": event.get("summary"),
                "dtstart": str(event.get("dtstart")) if event.get("dtstart") else None,
                "dtend": str(event.get("dtend")) if event.get("dtend") else None,
                "source": event.get("source"),
                "email_subject": event.get("email_subject"),
            }
            json_data.append(json_event)
        
        with open(json_output, "w", encoding="utf-8") as f:
            json.dump(json_data, f, ensure_ascii=False, indent=2)
        print(f"✓ Debug data saved to {json_output}")


if __name__ == "__main__":
    main()

```

email-assistant | SkillHub