Back to skills
SkillHub ClubShip Full StackFull Stack

xiaohongshu-skill

当用户想要与小红书(xiaohongshu/rednote)交互时使用此 Skill。包括搜索笔记、获取帖子详情、查看用户主页、二维码扫码登录、提取平台内容等。当用户提到 xiaohongshu、小红书、rednote,或需要浏览/抓取中国社交媒体内容时激活此 Skill。

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,074
Hot score
99
Updated
March 20, 2026
Overall rating
C5.5
Composite score
5.5
Best-practice grade
C61.1

Install command

npx @skill-hub/cli install openclaw-skills-xiaohongshu-skill

Repository

openclaw/skills

Skill path: skills/deliciousbuding/xiaohongshu-skill

当用户想要与小红书(xiaohongshu/rednote)交互时使用此 Skill。包括搜索笔记、获取帖子详情、查看用户主页、二维码扫码登录、提取平台内容等。当用户提到 xiaohongshu、小红书、rednote,或需要浏览/抓取中国社交媒体内容时激活此 Skill。

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install xiaohongshu-skill into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding xiaohongshu-skill to shared team environments
  • Use xiaohongshu-skill for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: xiaohongshu-skill
description: 当用户想要与小红书(xiaohongshu/rednote)交互时使用此 Skill。包括搜索笔记、获取帖子详情、查看用户主页、二维码扫码登录、提取平台内容等。当用户提到 xiaohongshu、小红书、rednote,或需要浏览/抓取中国社交媒体内容时激活此 Skill。
user-invokable: true
metadata: {"openclaw": {"emoji": "📕", "requires": {"bins": ["python3", "playwright"], "anyBins": ["python3", "python"]}, "os": ["win32", "linux", "darwin"], "install": [{"id": "pip", "kind": "node", "label": "Install dependencies (pip)", "bins": ["playwright"]}]}}
---

# 小红书 Skill

基于 Python Playwright 的小红书(rednote)交互工具,通过浏览器自动化从 `window.__INITIAL_STATE__`(Vue SSR 状态)中提取结构化数据。

## 前置条件

在 `{baseDir}` 目录下安装依赖:

```bash
cd {baseDir}
pip install -r requirements.txt
playwright install chromium
```

Linux/WSL 环境还需运行:
```bash
playwright install-deps chromium
```

## 快速开始

所有命令从 `{baseDir}` 目录运行。

### 1. 登录(首次必须)

```bash
cd {baseDir}

# 打开浏览器窗口,显示二维码供微信/小红书扫描
python -m scripts qrcode --headless=false

# 检查登录是否仍然有效
python -m scripts check-login
```

在无头环境下,二维码图片保存到 `{baseDir}/data/qrcode.png`,可通过其他渠道发送扫码。

### 2. 搜索

```bash
cd {baseDir}

# 基础搜索
python -m scripts search "关键词"

# 带筛选条件
python -m scripts search "美食" --sort-by=最新 --note-type=图文 --limit=10
```

**筛选选项:**
- `--sort-by`:综合、最新、最多点赞、最多评论、最多收藏
- `--note-type`:不限、视频、图文
- `--publish-time`:不限、一天内、一周内、半年内
- `--search-scope`:不限、已看过、未看过、已关注
- `--location`:不限、同城、附近

### 3. 帖子详情

```bash
cd {baseDir}

# 使用搜索结果中的 id 和 xsec_token
python -m scripts feed <feed_id> <xsec_token>

# 加载评论
python -m scripts feed <feed_id> <xsec_token> --load-comments --max-comments=20
```

### 4. 用户主页

```bash
cd {baseDir}
python -m scripts user <user_id> [xsec_token]
```

## 数据提取路径

| 数据类型 | JavaScript 路径 |
|----------|----------------|
| 搜索结果 | `window.__INITIAL_STATE__.search.feeds` |
| 帖子详情 | `window.__INITIAL_STATE__.note.noteDetailMap` |
| 用户信息 | `window.__INITIAL_STATE__.user.userPageData` |
| 用户笔记 | `window.__INITIAL_STATE__.user.notes` |

**Vue Ref 处理:** 始终通过 `.value` 或 `._value` 解包:
```javascript
const data = obj.value !== undefined ? obj.value : obj._value;
```

## 反爬保护

本 Skill 内置了针对小红书反机器人策略的保护措施:

- **频率控制**:两次导航间自动延迟 3-6 秒,每 5 次连续请求后冷却 10 秒
- **验证码检测**:自动检测安全验证页面重定向,触发时抛出 `CaptchaError` 并给出处理建议
- **仿人类行为**:随机延迟、滚动模式、User-Agent 伪装

**触发验证码时的处理:**
1. 等待几分钟后重试
2. 运行 `cd {baseDir} && python -m scripts qrcode --headless=false` 手动通过验证
3. 如 Cookie 失效,重新扫码登录

## 输出格式

所有命令输出 JSON 到标准输出。搜索结果示例:
```json
{
  "id": "abc123",
  "xsec_token": "ABxyz...",
  "title": "帖子标题",
  "type": "normal",
  "user": "用户名",
  "user_id": "user123",
  "liked_count": "1234",
  "collected_count": "567",
  "comment_count": "89"
}
```

## 文件结构

```
{baseDir}/
├── SKILL.md              # 本文件(Skill 规范)
├── README.md             # 项目文档
├── requirements.txt      # Python 依赖
├── LICENSE               # MIT 许可证
├── data/                 # 运行时数据(二维码、调试输出)
└── scripts/              # 核心模块
    ├── __init__.py
    ├── __main__.py       # CLI 入口
    ├── client.py         # 浏览器客户端封装(频率控制 + 验证码检测)
    ├── login.py          # 二维码扫码登录流程
    ├── search.py         # 搜索(支持多种筛选)
    ├── feed.py           # 帖子详情提取
    └── user.py           # 用户主页提取
```

## 跨平台兼容性

| 环境 | 无头模式 | 有头模式(扫码登录) | 备注 |
|------|----------|----------------------|------|
| Windows | 支持 | 支持 | 主要开发环境 |
| WSL2 (Win11) | 支持 | 通过 WSLg 支持 | 需要 `playwright install-deps` |
| Linux 服务器 | 支持 | 不适用 | 二维码保存为图片文件 |

## 注意事项

1. **Cookie 过期**:Cookie 会定期过期,`check-login` 返回 false 时需重新登录
2. **频率限制**:过度抓取会触发验证码,请依赖内置的频率控制
3. **xsec_token**:Token 与会话绑定,始终使用搜索/用户结果中的最新 Token
4. **仅供学习**:请遵守小红书的使用条款,本工具仅用于学习研究


---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "deliciousbuding",
  "slug": "xiaohongshu-skill",
  "displayName": "小红书skill",
  "latest": {
    "version": "1.0.2",
    "publishedAt": 1772171400721,
    "commit": "https://github.com/openclaw/skills/commit/3614fc2aa03f6bb379b6727e5adc7c8c3567016c"
  },
  "history": [
    {
      "version": "1.0.0",
      "publishedAt": 1771854373203,
      "commit": "https://github.com/openclaw/skills/commit/99b9cf011c7424d801948cbd909609914e89f2c4"
    }
  ]
}

```

### scripts/__init__.py

```python
"""
xiaohongshu-skill

基于 xiaohongshu-mcp Go 源码翻译的 Python Playwright 实现
"""

from .client import XiaohongshuClient, create_client, DEFAULT_COOKIE_PATH
from . import login
from . import search
from . import feed
from . import user
from . import comment
from . import interact
from . import explore
from . import publish

__version__ = "1.1.0"
__all__ = [
    "XiaohongshuClient",
    "create_client",
    "DEFAULT_COOKIE_PATH",
    "login",
    "search",
    "feed",
    "user",
    "comment",
    "interact",
    "explore",
    "publish",
]

```

### scripts/__main__.py

```python
#!/usr/bin/env python
"""
小红书 CLI 入口

基于 xiaohongshu-mcp 翻译
"""

import argparse
import json
import sys
from typing import Optional

# Windows GBK 终端兼容性修复:强制 stdout 使用 UTF-8
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')

from .client import XiaohongshuClient, CaptchaError, DEFAULT_COOKIE_PATH
from . import login
from . import search
from . import feed
from . import user
from . import comment
from . import interact
from . import explore
from . import publish


def format_output(data) -> str:
    """格式化输出为 JSON"""
    if data is None:
        return json.dumps({"error": "No data"}, ensure_ascii=False, indent=2)
    return json.dumps(data, ensure_ascii=False, indent=2)


def _headless(args) -> bool:
    """从 args 解析 headless 值"""
    val = getattr(args, 'headless', 'true')
    if isinstance(val, bool):
        return val
    return val.lower() != 'false'


# ============================================================
# 命令处理函数
# ============================================================

def cmd_login(args):
    """登录命令 — 生成二维码并等待扫码"""
    result = login.login(
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
        timeout=args.timeout,
    )
    print(format_output(result))
    if result.get("status") == "logged_in":
        return 0
    elif result.get("status") == "qrcode_ready":
        return 2
    return 1


def cmd_qrcode(args):
    """
    获取二维码并等待扫码。
    如果已登录,直接返回。
    """
    cookie_path = args.cookie or DEFAULT_COOKIE_PATH
    headless = _headless(args)

    # 1) 先快速检查是否已登录
    is_logged_in, username = login.check_login(cookie_path=cookie_path)
    if is_logged_in:
        result = {
            "status": "logged_in",
            "qrcode_path": None,
            "username": username,
            "message": "已登录",
        }
        print(format_output(result))
        return 0

    # 2) 未登录 → 启动可见浏览器,获取二维码并等待扫码
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = login.LoginAction(client)

        qrcode_path, already_logged_in = action.get_wechat_qrcode()
        if already_logged_in:
            print(format_output({"status": "logged_in", "message": "已登录"}))
            return 0

        if qrcode_path:
            print(format_output({
                "status": "qrcode_ready",
                "qrcode_path": qrcode_path,
                "message": f"请扫码登录,二维码路径: {qrcode_path}",
            }))

            # 等待用户扫码(最多 120 秒)
            print("等待用户扫码…(最多 120 秒)", file=sys.stderr)
            success = action.wait_for_login(timeout=120)

            if success:
                print(format_output({"status": "logged_in", "message": "登录成功!"}))
                return 0
            else:
                print(format_output({"status": "timeout", "message": "扫码超时"}))
                return 2
        else:
            print(format_output({"status": "error", "message": "获取二维码失败"}))
            return 1
    finally:
        client.close()


def cmd_check_login(args):
    """检查登录状态"""
    is_logged_in, username = login.check_login(
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output({
        "is_logged_in": is_logged_in,
        "username": username,
    }))
    return 0


def cmd_search(args):
    """搜索命令"""
    results = search.search(
        keyword=args.keyword,
        sort_by=args.sort_by,
        note_type=args.note_type,
        publish_time=args.publish_time,
        search_scope=args.search_scope,
        location=args.location,
        limit=args.limit,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output({
        "count": len(results),
        "results": results,
    }))
    return 0


def cmd_feed(args):
    """笔记详情命令"""
    detail = feed.feed_detail(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        load_comments=args.load_comments,
        max_comments=args.max_comments,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(detail))
    return 0 if detail else 1


def cmd_user(args):
    """用户主页命令"""
    profile = user.user_profile(
        user_id=args.user_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(profile))
    return 0 if profile else 1


def cmd_me(args):
    """获取自己的个人主页"""
    profile = user.my_profile(
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(profile))
    return 0 if profile else 1


def cmd_comment(args):
    """发表评论"""
    result = comment.post_comment(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        content=args.content,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_reply(args):
    """回复评论"""
    result = comment.reply_to_comment(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        comment_id=args.comment_id,
        reply_user_id=args.reply_user_id,
        content=args.content,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_like(args):
    """点赞"""
    result = interact.like(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_unlike(args):
    """取消点赞"""
    result = interact.unlike(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_collect(args):
    """收藏"""
    result = interact.collect(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_uncollect(args):
    """取消收藏"""
    result = interact.uncollect(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_explore(args):
    """首页推荐流"""
    result = explore.explore(
        limit=args.limit,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0


def cmd_publish(args):
    """发布图文笔记"""
    image_paths = [p.strip() for p in args.images.split(",") if p.strip()]
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
    result = publish.publish_image(
        title=args.title,
        content=args.content,
        image_paths=image_paths,
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1


def cmd_publish_video(args):
    """发布视频笔记"""
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
    result = publish.publish_video(
        title=args.title,
        content=args.content,
        video_path=args.video,
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1


def cmd_publish_md(args):
    """将 Markdown 渲染为图片后发布图文笔记"""
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None

    # 从文件或直接文本读取 Markdown
    if args.file:
        with open(args.file, "r", encoding="utf-8") as f:
            markdown_text = f.read()
    else:
        markdown_text = args.text

    if not markdown_text:
        print(format_output({"status": "error", "message": "需要提供 --file 或 --text"}))
        return 1

    result = publish.publish_markdown(
        title=args.title,
        markdown_text=markdown_text,
        extra_content=args.content or "",
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        image_width=args.width,
        output_dir=args.output_dir or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1


# ============================================================
# 入口
# ============================================================

def main():
    parser = argparse.ArgumentParser(
        description="小红书 CLI 工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # 全局参数
    parser.add_argument("--cookie", "-c", help="Cookie 文件路径", default=None)
    parser.add_argument("--headless", help="无头模式: true/false(默认 true)", default='true')

    subparsers = parser.add_subparsers(dest="command", help="可用命令")

    # login
    login_p = subparsers.add_parser("login", help="扫码登录(等待登录或超时)")
    login_p.add_argument("--timeout", "-t", type=int, default=120, help="登录超时秒数")
    login_p.add_argument("--headless", default='false', help="默认 false 以显示浏览器")
    login_p.set_defaults(func=cmd_login)

    # qrcode
    qr_p = subparsers.add_parser("qrcode", help="获取登录二维码并等待扫码")
    qr_p.add_argument("--headless", default='false', help="默认 false 以显示浏览器")
    qr_p.set_defaults(func=cmd_qrcode)

    # check-login
    chk_p = subparsers.add_parser("check-login", help="检查登录状态")
    chk_p.add_argument("--headless", default='true')
    chk_p.set_defaults(func=cmd_check_login)

    # search
    s_p = subparsers.add_parser("search", help="搜索内容")
    s_p.add_argument("keyword", help="搜索关键词")
    s_p.add_argument("--sort-by", help="排序方式")
    s_p.add_argument("--note-type", help="笔记类型")
    s_p.add_argument("--publish-time", help="发布时间")
    s_p.add_argument("--search-scope", help="搜索范围")
    s_p.add_argument("--location", help="位置距离")
    s_p.add_argument("--limit", "-n", type=int, default=10, help="返回数量")
    s_p.add_argument("--headless", default='true')
    s_p.set_defaults(func=cmd_search)

    # feed
    f_p = subparsers.add_parser("feed", help="获取笔记详情")
    f_p.add_argument("feed_id", help="笔记 ID")
    f_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    f_p.add_argument("--load-comments", "-l", action="store_true", help="加载评论")
    f_p.add_argument("--max-comments", "-m", type=int, default=0, help="最大评论数")
    f_p.add_argument("--headless", default='true')
    f_p.set_defaults(func=cmd_feed)

    # user
    u_p = subparsers.add_parser("user", help="获取用户主页")
    u_p.add_argument("user_id", help="用户 ID")
    u_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    u_p.add_argument("--headless", default='true')
    u_p.set_defaults(func=cmd_user)

    # me (获取自己的主页)
    me_p = subparsers.add_parser("me", help="获取自己的个人主页")
    me_p.add_argument("--headless", default='true')
    me_p.set_defaults(func=cmd_me)

    # comment (发表评论)
    cmt_p = subparsers.add_parser("comment", help="发表评论")
    cmt_p.add_argument("feed_id", help="笔记 ID")
    cmt_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    cmt_p.add_argument("--content", required=True, help="评论内容")
    cmt_p.add_argument("--headless", default='true')
    cmt_p.set_defaults(func=cmd_comment)

    # reply (回复评论)
    rpl_p = subparsers.add_parser("reply", help="回复评论")
    rpl_p.add_argument("feed_id", help="笔记 ID")
    rpl_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    rpl_p.add_argument("--comment-id", required=True, help="目标评论 ID")
    rpl_p.add_argument("--reply-user-id", required=True, help="被回复用户 ID")
    rpl_p.add_argument("--content", required=True, help="回复内容")
    rpl_p.add_argument("--headless", default='true')
    rpl_p.set_defaults(func=cmd_reply)

    # like (点赞)
    like_p = subparsers.add_parser("like", help="点赞笔记")
    like_p.add_argument("feed_id", help="笔记 ID")
    like_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    like_p.add_argument("--headless", default='true')
    like_p.set_defaults(func=cmd_like)

    # unlike (取消点赞)
    unlike_p = subparsers.add_parser("unlike", help="取消点赞")
    unlike_p.add_argument("feed_id", help="笔记 ID")
    unlike_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    unlike_p.add_argument("--headless", default='true')
    unlike_p.set_defaults(func=cmd_unlike)

    # collect (收藏)
    col_p = subparsers.add_parser("collect", help="收藏笔记")
    col_p.add_argument("feed_id", help="笔记 ID")
    col_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    col_p.add_argument("--headless", default='true')
    col_p.set_defaults(func=cmd_collect)

    # uncollect (取消收藏)
    ucol_p = subparsers.add_parser("uncollect", help="取消收藏")
    ucol_p.add_argument("feed_id", help="笔记 ID")
    ucol_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    ucol_p.add_argument("--headless", default='true')
    ucol_p.set_defaults(func=cmd_uncollect)

    # explore (首页推荐流)
    exp_p = subparsers.add_parser("explore", help="获取首页推荐流")
    exp_p.add_argument("--limit", "-n", type=int, default=20, help="返回数量")
    exp_p.add_argument("--headless", default='true')
    exp_p.set_defaults(func=cmd_explore)

    # publish (发布图文笔记)
    pub_p = subparsers.add_parser("publish", help="发布图文笔记")
    pub_p.add_argument("--title", required=True, help="标题(建议 <=20 字)")
    pub_p.add_argument("--content", required=True, help="正文内容")
    pub_p.add_argument("--images", required=True, help="图片路径,逗号分隔")
    pub_p.add_argument("--tags", help="话题标签,逗号分隔")
    pub_p.add_argument("--schedule-time", help="定时发布(格式: 2025-01-01 12:00)")
    pub_p.add_argument("--auto-publish", action="store_true", help="自动点击发布(默认停在发布按钮处)")
    pub_p.add_argument("--headless", default='true')
    pub_p.set_defaults(func=cmd_publish)

    # publish-video (发布视频笔记)
    pubv_p = subparsers.add_parser("publish-video", help="发布视频笔记")
    pubv_p.add_argument("--title", required=True, help="标题")
    pubv_p.add_argument("--content", required=True, help="正文内容")
    pubv_p.add_argument("--video", required=True, help="视频文件路径")
    pubv_p.add_argument("--tags", help="话题标签,逗号分隔")
    pubv_p.add_argument("--schedule-time", help="定时发布(格式: 2025-01-01 12:00)")
    pubv_p.add_argument("--auto-publish", action="store_true", help="自动点击发布")
    pubv_p.add_argument("--headless", default='true')
    pubv_p.set_defaults(func=cmd_publish_video)

    # publish-md (Markdown 转图片后发布)
    pubmd_p = subparsers.add_parser("publish-md", help="Markdown 渲染为图片后发布")
    pubmd_p.add_argument("--title", required=True, help="标题")
    pubmd_p.add_argument("--file", help="Markdown 文件路径")
    pubmd_p.add_argument("--text", help="Markdown 文本(与 --file 二选一)")
    pubmd_p.add_argument("--content", help="正文区额外文字说明")
    pubmd_p.add_argument("--tags", help="话题标签,逗号分隔")
    pubmd_p.add_argument("--schedule-time", help="定时发布")
    pubmd_p.add_argument("--auto-publish", action="store_true", help="自动点击发布")
    pubmd_p.add_argument("--width", type=int, default=1080, help="图片宽度(默认 1080)")
    pubmd_p.add_argument("--output-dir", help="图片输出目录(默认临时目录)")
    pubmd_p.add_argument("--headless", default='true')
    pubmd_p.set_defaults(func=cmd_publish_md)

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 0

    try:
        return args.func(args)
    except CaptchaError as e:
        print(format_output({
            "status": "error",
            "error_type": "CaptchaError",
            "message": str(e),
            "captcha_url": e.captcha_url,
        }))
        return 1
    except Exception as e:
        print(format_output({
            "status": "error",
            "error_type": type(e).__name__,
            "message": str(e),
        }))
        return 1


if __name__ == "__main__":
    sys.exit(main())

```

### scripts/client.py

```python
"""
小红书浏览器客户端封装

基于 xiaohongshu-mcp Go 源码翻译为 Python Playwright
"""

import json
import os
import random
import sys
import time
from pathlib import Path
from typing import Optional, Any, Dict

try:
    from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
except ImportError:
    print("请先安装 playwright: pip install playwright && playwright install chromium")
    raise


# Cookie 文件路径
DEFAULT_COOKIE_PATH = os.path.expanduser("~/.xiaohongshu/cookies.json")

# 验证码/安全拦截页面的 URL 特征
CAPTCHA_URL_PATTERNS = [
    'captcha',
    'security-verification',
    'website-login/captcha',
    'verifyType',
    'verifyBiz',
]

# 验证码页面的标题特征
CAPTCHA_TITLE_PATTERNS = [
    '安全验证',
    '验证码',
    'captcha',
    'Security Verification',
]


class CaptchaError(Exception):
    """触发验证码异常"""
    def __init__(self, url: str, message: str = ""):
        self.captcha_url = url
        super().__init__(message or f"触发安全验证: {url}")


class XiaohongshuClient:
    """小红书浏览器客户端"""

    # 频率控制参数
    MIN_INTERVAL = 3.0       # 两次导航最小间隔(秒)
    MAX_INTERVAL = 6.0       # 两次导航最大间隔(秒)
    BURST_THRESHOLD = 5      # 连续请求阈值,超过后增加额外冷却
    BURST_COOLDOWN = 10.0    # 连续请求冷却时间(秒)

    def __init__(
        self,
        headless: bool = True,
        cookie_path: str = DEFAULT_COOKIE_PATH,
        timeout: int = 60,
    ):
        self.headless = headless
        self.cookie_path = cookie_path
        self.timeout = timeout * 1000  # 转换为毫秒

        self.playwright: Optional[Playwright] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None

        # 请求计时器(实例变量,避免跨实例干扰)
        self._last_navigate_time: float = 0.0
        self._navigate_count: int = 0
        self._session_start: float = 0.0

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def start(self):
        """启动浏览器"""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=self.headless,
            args=['--disable-blink-features=AutomationControlled'],
        )

        # 创建上下文
        self.context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        )

        # 加载 Cookie
        self._load_cookies()

        # 创建页面
        self.page = self.context.new_page()
        self.page.set_default_timeout(self.timeout)

    def close(self):
        """关闭浏览器"""
        # 保存 Cookie
        self._save_cookies()

        if self.page:
            self.page.close()
        if self.context:
            self.context.close()
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()

    def _load_cookies(self):
        """从文件加载 Cookie"""
        if not os.path.exists(self.cookie_path):
            return

        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if cookies:
                self.context.add_cookies(cookies)
                print(f"已加载 {len(cookies)} 个 Cookie", file=sys.stderr)
        except Exception as e:
            print(f"加载 Cookie 失败: {e}", file=sys.stderr)

    def _save_cookies(self):
        """保存 Cookie 到文件"""
        if not self.context:
            return

        try:
            cookies = self.context.cookies()
            os.makedirs(os.path.dirname(self.cookie_path), exist_ok=True)
            with open(self.cookie_path, 'w', encoding='utf-8') as f:
                json.dump(cookies, f, ensure_ascii=False, indent=2)
            print(f"已保存 {len(cookies)} 个 Cookie 到 {self.cookie_path}", file=sys.stderr)
        except Exception as e:
            print(f"保存 Cookie 失败: {e}", file=sys.stderr)

    def _throttle(self):
        """请求频率控制:模拟人类浏览节奏"""
        now = time.time()

        # 初始化会话起点
        if self._session_start == 0:
            self._session_start = now

        # 计算距上次导航的间隔
        elapsed = now - self._last_navigate_time if self._last_navigate_time > 0 else 999

        # 连续请求达到阈值 → 额外冷却
        if self._navigate_count > 0 and self._navigate_count % self.BURST_THRESHOLD == 0:
            cooldown = self.BURST_COOLDOWN + random.uniform(0, 3)
            if elapsed < cooldown:
                wait = cooldown - elapsed
                print(f"反爬保护: 连续请求 {self._navigate_count} 次,冷却 {wait:.1f}s...", file=sys.stderr)
                time.sleep(wait)
        elif elapsed < self.MIN_INTERVAL:
            # 普通间隔控制
            wait = random.uniform(self.MIN_INTERVAL, self.MAX_INTERVAL) - elapsed
            if wait > 0:
                time.sleep(wait)

        self._last_navigate_time = time.time()
        self._navigate_count += 1

    def _check_captcha(self) -> bool:
        """
        检测当前页面是否被重定向到验证码页面

        Returns:
            True 表示触发了验证码
        """
        if not self.page:
            return False

        try:
            current_url = self.page.url.lower()
            for pattern in CAPTCHA_URL_PATTERNS:
                if pattern in current_url:
                    return True

            page_title = self.page.title().lower()
            for pattern in CAPTCHA_TITLE_PATTERNS:
                if pattern.lower() in page_title:
                    return True
        except Exception:
            pass

        return False

    def _handle_captcha(self):
        """
        处理验证码拦截:抛出异常通知调用方

        Raises:
            CaptchaError
        """
        url = self.page.url if self.page else "unknown"
        raise CaptchaError(
            url=url,
            message=(
                f"触发小红书安全验证!\n"
                f"  验证页面: {url}\n"
                f"  本次会话已请求 {self._navigate_count} 次\n"
                f"  建议: 1) 等待几分钟后重试  2) 用 --headless=false 手动过验证码  "
                f"3) 重新扫码登录"
            ),
        )

    def navigate(self, url: str, wait_until: str = "domcontentloaded"):
        """导航到指定 URL(含频率控制和验证码检测)"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # 频率控制
        self._throttle()

        self.page.goto(url, wait_until=wait_until)
        # 等待页面稳定
        time.sleep(random.uniform(1.5, 3.0))
        # 尝试等待 networkidle,但不强制
        try:
            self.page.wait_for_load_state("networkidle", timeout=8000)
        except Exception:
            pass

        # 验证码检测
        if self._check_captcha():
            self._handle_captcha()

    def wait_for_initial_state(self, timeout: int = 30000, retries: int = 2):
        """等待 __INITIAL_STATE__ 加载完成,带重试和回退"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        for attempt in range(retries + 1):
            # 先检测验证码
            if self._check_captcha():
                self._handle_captcha()

            try:
                self.page.wait_for_function(
                    "() => window.__INITIAL_STATE__ !== undefined",
                    timeout=timeout,
                )
                return
            except Exception:
                if attempt < retries:
                    print(f"__INITIAL_STATE__ 等待超时,刷新重试 ({attempt + 1}/{retries})...", file=sys.stderr)
                    self.page.reload(wait_until="domcontentloaded")
                    time.sleep(random.uniform(2, 4))
                    # 刷新后再检测验证码
                    if self._check_captcha():
                        self._handle_captcha()
                else:
                    print("警告: __INITIAL_STATE__ 加载超时,尝试继续执行", file=sys.stderr)

    def get_initial_state(self) -> Dict[str, Any]:
        """获取 __INITIAL_STATE__ 数据"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # 使用 structuredClone 或手动提取需要的部分,避免循环引用
        result = self.page.evaluate("""() => {
            if (!window.__INITIAL_STATE__) {
                return '';
            }
            // 只提取需要的顶层结构
            const state = window.__INITIAL_STATE__;
            const result = {};
            if (state.search) result.search = state.search;
            if (state.feed) result.feed = state.feed;
            if (state.note) result.note = state.note;
            if (state.user) result.user = state.user;
            return JSON.stringify(result);
        }""")

        if not result:
            return {}

        return json.loads(result)

    def get_data_by_path(self, path: str) -> Any:
        """
        根据路径获取 __INITIAL_STATE__ 中的数据

        例如: "search.feeds", "note.noteDetailMap", "user.userPageData"
        """
        state = self.get_initial_state()

        # 处理 value/_value
        def get_value(obj):
            if isinstance(obj, dict):
                if 'value' in obj:
                    return obj['value']
                if '_value' in obj:
                    return obj['_value']
            return obj

        keys = path.split('.')
        current = state

        for key in keys:
            if current is None:
                return None
            if isinstance(current, dict):
                current = current.get(key)
            else:
                return None
            current = get_value(current)

        return current

    # DEPRECATED: use login.LoginAction.check_login_status() instead
    def check_login_status(self) -> bool:
        """检查登录状态(已废弃,请使用 login.LoginAction)"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # 访问首页
        self.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(1)

        # 检查是否存在登录后的元素
        try:
            element = self.page.locator('.main-container .user .link-wrapper .channel')
            count = element.count()
            return count > 0
        except Exception:
            return False

    # DEPRECATED: use login.LoginAction.get_wechat_qrcode() instead
    def get_qrcode(self) -> Optional[str]:
        """获取登录二维码(已废弃,请使用 login.LoginAction)"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # 访问首页触发二维码弹窗
        self.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(2)

        # 检查是否已登录
        if self.check_login_status():
            return None

        # 获取二维码图片
        try:
            qrcode = self.page.locator('.login-container .qrcode-img')
            src = qrcode.get_attribute('src')
            return src
        except Exception:
            return None

    # DEPRECATED: use login.LoginAction.wait_for_login() instead
    def wait_for_login(self, timeout: int = 120) -> bool:
        """
        等待用户扫码登录

        Args:
            timeout: 超时时间(秒)

        Returns:
            是否登录成功
        """
        if not self.page:
            raise RuntimeError("浏览器未启动")

        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.check_login_status():
                # 保存登录后的 Cookie
                self._save_cookies()
                return True
            time.sleep(1)

        return False

    def scroll_to_bottom(self, distance: int = 500):
        """滚动页面"""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        self.page.evaluate(f"window.scrollBy(0, {distance})")
        time.sleep(0.5)


def create_client(
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
    timeout: int = 60,
) -> XiaohongshuClient:
    """创建小红书客户端的便捷函数"""
    return XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
        timeout=timeout,
    )

```

### scripts/comment.py

```python
"""
小红书评论模块

基于 xiaohongshu-mcp/comment_feed.go 翻译
整合 xiaohongshu-ops 的安全评论理念(长度校验、人性化延迟、频率检测)
"""

import json
import sys
import time
import random
from typing import Optional, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH

# 评论安全常量(来自 xiaohongshu-ops)
MAX_COMMENT_LENGTH = 280
TYPING_DELAY_MIN = 30   # 每字符输入延迟下限(ms)
TYPING_DELAY_MAX = 80   # 每字符输入延迟上限(ms)
PRE_SUBMIT_DELAY_MIN = 1.5  # 提交前等待下限(秒)
PRE_SUBMIT_DELAY_MAX = 3.0  # 提交前等待上限(秒)
POST_SUBMIT_COOLDOWN_MIN = 8   # 提交后冷却下限(秒)
POST_SUBMIT_COOLDOWN_MAX = 15  # 提交后冷却上限(秒)


class CommentAction:
    """评论动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_feed_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
        """构建笔记详情 URL"""
        return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"

    def _navigate_to_feed(self, feed_id: str, xsec_token: str):
        """导航到笔记详情页并等待加载"""
        url = self._make_feed_url(feed_id, xsec_token)
        print(f"打开笔记详情页: {url}", file=sys.stderr)
        self.client.navigate(url)
        self.client.wait_for_initial_state()
        time.sleep(2)

    @staticmethod
    def validate_comment(content: str) -> Optional[str]:
        """
        评论内容校验(来自 ops 安全理念)

        Returns:
            None 表示校验通过,否则返回错误原因
        """
        if not content or not content.strip():
            return "评论内容不能为空"
        if len(content) > MAX_COMMENT_LENGTH:
            return f"评论内容超长({len(content)}/{MAX_COMMENT_LENGTH})"
        return None

    def _check_rate_limit(self) -> bool:
        """
        检测是否触发了评论频率限制(来自 ops 安全理念)

        Returns:
            True 表示被限流
        """
        page = self.client.page
        try:
            # 检查常见的频率限制提示
            rate_limit_selectors = [
                'div.d-toast:has-text("频繁")',
                'div.d-toast:has-text("操作太快")',
                'div.d-toast:has-text("稍后再试")',
                'div.d-toast:has-text("限制")',
            ]
            for sel in rate_limit_selectors:
                toast = page.locator(sel)
                if toast.count() > 0 and toast.first.is_visible():
                    toast_text = toast.first.text_content()
                    print(f"检测到频率限制: {toast_text}", file=sys.stderr)
                    return True
        except Exception:
            pass
        return False

    def _verify_input_placeholder(self, expected_hint: Optional[str] = None) -> bool:
        """
        验证输入框 placeholder 是否正确(来自 ops 安全理念)
        确保输入框已正确激活,防止误输入

        Args:
            expected_hint: 期望的 placeholder 文本片段(如 "回复 用户名")

        Returns:
            True 表示输入框状态正确
        """
        if not expected_hint:
            return True

        page = self.client.page
        try:
            placeholder = page.locator('div.input-box div.content-edit p.content-input')
            if placeholder.count() > 0:
                attr = placeholder.first.get_attribute('data-placeholder') or ''
                text = placeholder.first.text_content() or ''
                if expected_hint in attr or expected_hint in text:
                    print(f"输入框验证通过: {attr or text}", file=sys.stderr)
                    return True
                else:
                    print(f"输入框 placeholder 不匹配: 期望含「{expected_hint}」, 实际「{attr or text}」", file=sys.stderr)
        except Exception:
            pass
        return True  # 获取失败时不阻断流程

    def _type_and_submit(self, content: str) -> bool:
        """在评论输入框中输入文字并提交(整合 ops 人性化延迟)"""
        page = self.client.page

        # 点击评论输入框激活(span 占位符)
        try:
            input_trigger = page.locator('div.input-box div.content-edit span')
            input_trigger.first.click()
            time.sleep(0.5)
        except Exception as e:
            print(f"点击评论输入框失败: {e}", file=sys.stderr)
            return False

        # 在 contenteditable 的 p 元素中输入文字(人性化随机延迟)
        try:
            input_el = page.locator('div.input-box div.content-edit p.content-input')
            input_el.first.click()
            time.sleep(0.3)
            typing_delay = random.randint(TYPING_DELAY_MIN, TYPING_DELAY_MAX)
            page.keyboard.type(content, delay=typing_delay)
            # 提交前随机等待,模拟人类检查
            pre_wait = random.uniform(PRE_SUBMIT_DELAY_MIN, PRE_SUBMIT_DELAY_MAX)
            time.sleep(pre_wait)
        except Exception as e:
            print(f"输入评论内容失败: {e}", file=sys.stderr)
            return False

        # 点击发送按钮
        try:
            submit_btn = page.locator('div.bottom button.submit')
            submit_btn.first.click()
            time.sleep(1.5)
        except Exception as e:
            print(f"点击发送按钮失败: {e}", file=sys.stderr)
            return False

        # 检查是否触发频率限制
        if self._check_rate_limit():
            return False

        return True

    def post_comment(
        self,
        feed_id: str,
        xsec_token: str,
        content: str,
    ) -> Dict[str, Any]:
        """
        发表评论

        Args:
            feed_id: 笔记 ID
            xsec_token: xsec_token
            content: 评论内容

        Returns:
            操作结果
        """
        # 评论内容校验
        error = self.validate_comment(content)
        if error:
            return {
                "status": "error",
                "feed_id": feed_id,
                "content": content,
                "message": error,
            }

        self._navigate_to_feed(feed_id, xsec_token)

        # 滚动到评论区域
        self.client.page.evaluate("""() => {
            const comments = document.querySelector('.comments-wrap') ||
                              document.querySelector('.comment-wrapper');
            if (comments) comments.scrollIntoView();
        }""")
        time.sleep(1)

        success = self._type_and_submit(content)

        if success:
            print("评论发送成功", file=sys.stderr)
            # 提交后冷却(one-send-per-turn 理念)
            cooldown = random.uniform(POST_SUBMIT_COOLDOWN_MIN, POST_SUBMIT_COOLDOWN_MAX)
            print(f"提交后冷却 {cooldown:.1f}s...", file=sys.stderr)
            time.sleep(cooldown)
            return {
                "status": "success",
                "feed_id": feed_id,
                "content": content,
                "message": "评论发送成功",
            }
        else:
            return {
                "status": "error",
                "feed_id": feed_id,
                "content": content,
                "message": "评论发送失败",
            }

    def reply_to_comment(
        self,
        feed_id: str,
        xsec_token: str,
        comment_id: str,
        reply_user_id: str,
        content: str,
    ) -> Dict[str, Any]:
        """
        回复评论

        Args:
            feed_id: 笔记 ID
            xsec_token: xsec_token
            comment_id: 目标评论 ID
            reply_user_id: 被回复用户 ID
            content: 回复内容

        Returns:
            操作结果
        """
        # 评论内容校验
        error = self.validate_comment(content)
        if error:
            return {
                "status": "error",
                "feed_id": feed_id,
                "comment_id": comment_id,
                "content": content,
                "message": error,
            }

        self._navigate_to_feed(feed_id, xsec_token)

        page = self.client.page

        # 滚动到评论区域
        page.evaluate("""() => {
            const comments = document.querySelector('.comments-wrap') ||
                              document.querySelector('.comment-wrapper');
            if (comments) comments.scrollIntoView();
        }""")
        time.sleep(1)

        # 找到目标评论并点击"回复"按钮
        try:
            # 尝试通过评论 ID 定位
            comment_el = page.locator(f'[data-comment-id="{comment_id}"]')
            if comment_el.count() == 0:
                # 回退:通过遍历评论列表查找
                comment_el = page.locator('.comment-item').filter(has_text=comment_id)

            if comment_el.count() > 0:
                # 悬停以显示回复按钮
                comment_el.first.hover()
                time.sleep(0.3)

                # 点击回复按钮
                reply_btn = comment_el.first.locator('.reply-btn, button:has-text("回复"), span:has-text("回复")')
                if reply_btn.count() > 0:
                    reply_btn.first.click()
                    time.sleep(0.5)
                    # 验证输入框 placeholder(ops 技巧)
                    self._verify_input_placeholder(f"回复")
                else:
                    print("未找到回复按钮,尝试直接在评论框回复", file=sys.stderr)
            else:
                print(f"未找到评论 {comment_id},尝试直接在评论框回复", file=sys.stderr)
        except Exception as e:
            print(f"定位目标评论失败: {e}", file=sys.stderr)

        # 输入回复内容并发送
        success = self._type_and_submit(content)

        if success:
            print("回复发送成功", file=sys.stderr)
            # 提交后冷却
            cooldown = random.uniform(POST_SUBMIT_COOLDOWN_MIN, POST_SUBMIT_COOLDOWN_MAX)
            print(f"提交后冷却 {cooldown:.1f}s...", file=sys.stderr)
            time.sleep(cooldown)
            return {
                "status": "success",
                "feed_id": feed_id,
                "comment_id": comment_id,
                "reply_user_id": reply_user_id,
                "content": content,
                "message": "回复发送成功",
            }
        else:
            return {
                "status": "error",
                "feed_id": feed_id,
                "comment_id": comment_id,
                "content": content,
                "message": "回复发送失败",
            }


def post_comment(
    feed_id: str,
    xsec_token: str,
    content: str,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """
    发表评论

    Args:
        feed_id: 笔记 ID
        xsec_token: xsec_token
        content: 评论内容
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        操作结果
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = CommentAction(client)
        return action.post_comment(feed_id, xsec_token, content)
    finally:
        client.close()


def reply_to_comment(
    feed_id: str,
    xsec_token: str,
    comment_id: str,
    reply_user_id: str,
    content: str,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """
    回复评论

    Args:
        feed_id: 笔记 ID
        xsec_token: xsec_token
        comment_id: 目标评论 ID
        reply_user_id: 被回复用户 ID
        content: 回复内容
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        操作结果
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = CommentAction(client)
        return action.reply_to_comment(
            feed_id, xsec_token, comment_id, reply_user_id, content
        )
    finally:
        client.close()

```

### scripts/explore.py

```python
"""
小红书首页推荐流模块

基于 xiaohongshu-mcp/feeds.go 翻译
"""

import json
import sys
import time
from typing import Optional, Dict, Any, List

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


class ExploreAction:
    """首页推荐流动作"""

    EXPLORE_URL = "https://www.xiaohongshu.com/explore"

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _extract_feeds(self) -> List[Dict[str, Any]]:
        """从 __INITIAL_STATE__ 提取首页推荐笔记列表"""
        page = self.client.page

        result = page.evaluate("""() => {
            var s = window.__INITIAL_STATE__;
            if (!s || !s.feed || !s.feed.feeds) return '';

            var feeds = s.feed.feeds;
            var data = feeds;
            if (feeds.value !== undefined) data = feeds.value;
            else if (feeds._value !== undefined) data = feeds._value;

            if (!data || !Array.isArray(data)) return '';

            // 展平二维数组(如果有的话)
            var flat = [];
            for (var i = 0; i < data.length; i++) {
                if (Array.isArray(data[i])) {
                    for (var j = 0; j < data[i].length; j++) flat.push(data[i][j]);
                } else {
                    flat.push(data[i]);
                }
            }

            // 提取每条笔记的关键信息
            return JSON.stringify(flat.map(function(item) {
                var nc = item.noteCard || item.model_type === 'note' ? item : {};
                if (item.noteCard) nc = item.noteCard;
                var info = nc.interactInfo || {};
                var user = nc.user || {};
                var cover = nc.cover || {};
                return {
                    id: item.id || '',
                    xsecToken: item.xsecToken || '',
                    noteCard: {
                        displayTitle: nc.displayTitle || nc.title || '',
                        type: nc.type || '',
                        interactInfo: {
                            likedCount: info.likedCount || '0',
                            collectedCount: info.collectedCount || '0',
                            commentCount: info.commentCount || '0',
                            sharedCount: info.sharedCount || '0'
                        },
                        user: {
                            nickname: user.nickname || user.nickName || '',
                            userId: user.userId || ''
                        },
                        cover: {
                            urlDefault: cover.urlDefault || cover.urlPre || ''
                        }
                    }
                };
            }));
        }""")

        if not result:
            return []

        try:
            return json.loads(result)
        except json.JSONDecodeError:
            return []

    def get_feeds(self, limit: int = 20) -> Dict[str, Any]:
        """
        获取首页推荐流

        Args:
            limit: 最大返回数量

        Returns:
            推荐笔记列表
        """
        client = self.client

        print("打开首页推荐流...", file=sys.stderr)
        client.navigate(self.EXPLORE_URL)
        client.wait_for_initial_state()
        time.sleep(2)

        feeds = self._extract_feeds()

        # 如果首页数据较少,尝试滚动加载更多
        if len(feeds) < limit:
            for _ in range(3):
                client.scroll_to_bottom(800)
                time.sleep(1.5)
                new_feeds = self._extract_feeds()
                if len(new_feeds) > len(feeds):
                    feeds = new_feeds
                if len(feeds) >= limit:
                    break

        # 截取到 limit
        feeds = feeds[:limit]

        print(f"获取到 {len(feeds)} 条推荐笔记", file=sys.stderr)
        return {
            "count": len(feeds),
            "feeds": feeds,
        }


def explore(
    limit: int = 20,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """
    获取首页推荐流

    Args:
        limit: 最大返回数量
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        推荐笔记列表
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = ExploreAction(client)
        return action.get_feeds(limit=limit)
    finally:
        client.close()

```

### scripts/feed.py

```python
"""
小红书笔记详情模块

基于 xiaohongshu-mcp/feed_detail.go 翻译
"""

import json
import sys
import time
import random
from typing import Optional, Dict, Any, Tuple

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


class FeedDetailAction:
    """笔记详情动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_feed_detail_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
        """构建笔记详情 URL"""
        return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"

    def _scroll_to_comments(self):
        """滚动到评论区域"""
        self.client.page.evaluate("""() => {
            const comments = document.querySelector('.comments-wrap');
            if (comments) {
                comments.scrollIntoView();
            }
        }""")
        time.sleep(0.5)

    def _load_comments(self, max_items: int = 0):
        """
        加载评论(滚动 + 点击加载更多)

        Args:
            max_items: 最大评论数量,0 表示全部
        """
        page = self.client.page

        # 滚动到评论区域
        self._scroll_to_comments()

        # 随机延迟,模拟人类行为
        def human_delay():
            time.sleep(random.uniform(0.3, 0.7))

        max_attempts = 50 if max_items == 0 else max_items * 3
        last_count = 0
        stagnant = 0

        for attempt in range(max_attempts):
            # 检查是否有"加载更多"按钮
            try:
                more_btn = page.locator('.more-comments')
                if more_btn.is_visible():
                    more_btn.click()
                    human_delay()
            except Exception:
                pass

            # 滚动
            page.evaluate("window.scrollBy(0, 300)")
            human_delay()

            # 获取当前评论数量
            try:
                comments = page.locator('.comment-item')
                current_count = comments.count()
            except Exception:
                current_count = 0

            if current_count == last_count:
                stagnant += 1
                if stagnant >= 5:
                    break
            else:
                stagnant = 0

            last_count = current_count

            # 检查是否达到目标数量
            if max_items > 0 and current_count >= max_items:
                break

    def _extract_feed_detail(self, feed_id: str) -> Optional[Dict[str, Any]]:
        """提取笔记详情数据"""
        page = self.client.page

        # 传入 feed_id,只提取对应条目,避免序列化整个 Vue Reactive 代理
        result = page.evaluate("""(fid) => {
            var s = window.__INITIAL_STATE__;
            if (!s || !s.note || !s.note.noteDetailMap) return '';

            var ndm = s.note.noteDetailMap;
            var map = ndm;
            if (ndm.value !== undefined) map = ndm.value;
            else if (ndm._value !== undefined) map = ndm._value;

            var detail = map[fid];
            if (!detail) return '';

            return JSON.stringify(detail);
        }""", feed_id)

        if not result:
            return None

        try:
            return json.loads(result)
        except json.JSONDecodeError:
            return None

    def get_feed_detail(
        self,
        feed_id: str,
        xsec_token: str,
        load_comments: bool = False,
        max_comments: int = 0,
        xsec_source: str = "pc_feed",
    ) -> Optional[Dict[str, Any]]:
        """
        获取笔记详情

        Args:
            feed_id: 笔记 ID
            xsec_token: xsec_token 参数
            load_comments: 是否加载评论
            max_comments: 最大评论数量,0 表示全部
            xsec_source: 来源标识(pc_feed/pc_note/pc_search)

        Returns:
            笔记详情数据
        """
        client = self.client

        # 构建 URL 并导航
        url = self._make_feed_detail_url(feed_id, xsec_token, xsec_source)
        print(f"打开笔记详情页: {url}", file=sys.stderr)
        client.navigate(url)

        # 等待页面加载
        client.wait_for_initial_state()
        time.sleep(2)

        # 重试提取:noteDetailMap 可能需要额外时间填充
        detail = None
        for attempt in range(3):
            detail = self._extract_feed_detail(feed_id)
            if detail:
                break
            if attempt < 2:
                print(f"noteDetailMap 未就绪,等待重试 ({attempt + 1}/3)...", file=sys.stderr)
                time.sleep(2)

        # 加载评论
        if detail and load_comments:
            print("加载评论中...", file=sys.stderr)
            self._load_comments(max_comments)
            # 重新提取以包含评论数据
            detail = self._extract_feed_detail(feed_id) or detail

        if not detail:
            print("未获取到笔记详情", file=sys.stderr)
            return None

        return detail


def feed_detail(
    feed_id: str,
    xsec_token: str,
    load_comments: bool = False,
    max_comments: int = 0,
    xsec_source: str = "pc_feed",
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
    """
    获取笔记详情

    Args:
        feed_id: 笔记 ID
        xsec_token: xsec_token 参数
        load_comments: 是否加载评论
        max_comments: 最大评论数量
        xsec_source: 来源标识
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        笔记详情数据
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = FeedDetailAction(client)
        return action.get_feed_detail(
            feed_id=feed_id,
            xsec_token=xsec_token,
            load_comments=load_comments,
            max_comments=max_comments,
            xsec_source=xsec_source,
        )
    finally:
        client.close()

```

### scripts/interact.py

```python
"""
小红书互动模块(点赞 / 取消点赞 / 收藏 / 取消收藏)

基于 xiaohongshu-mcp/like_favorite.go 翻译
"""

import json
import sys
import time
from typing import Optional, Dict, Any, Tuple

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


class InteractAction:
    """互动动作(点赞、收藏)"""

    # CSS 选择器
    LIKE_SELECTOR = '.interact-container .left .like-wrapper'
    LIKE_ACTIVE_SELECTOR = '.interact-container .left .like-wrapper.active, .interact-container .left .like-wrapper.liked'
    COLLECT_SELECTOR = '.interact-container .left .collect-wrapper'
    COLLECT_ACTIVE_SELECTOR = '.interact-container .left .collect-wrapper.active, .interact-container .left .collect-wrapper.collected'

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_feed_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
        """构建笔记详情 URL"""
        return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"

    def _navigate_to_feed(self, feed_id: str, xsec_token: str):
        """导航到笔记详情页并等待加载"""
        url = self._make_feed_url(feed_id, xsec_token)
        print(f"打开笔记详情页: {url}", file=sys.stderr)
        self.client.navigate(url)
        self.client.wait_for_initial_state()
        time.sleep(2)

    def _get_interact_state(self, feed_id: str) -> Dict[str, bool]:
        """
        从 __INITIAL_STATE__ 获取当前互动状态

        Returns:
            {"liked": bool, "collected": bool}
        """
        page = self.client.page
        result = page.evaluate("""(fid) => {
            var s = window.__INITIAL_STATE__;
            if (!s || !s.note || !s.note.noteDetailMap) return '';

            var ndm = s.note.noteDetailMap;
            var map = ndm;
            if (ndm.value !== undefined) map = ndm.value;
            else if (ndm._value !== undefined) map = ndm._value;

            var detail = map[fid];
            if (!detail || !detail.note || !detail.note.interactInfo) return '';

            var info = detail.note.interactInfo;
            return JSON.stringify({
                liked: !!info.liked,
                collected: !!info.collected
            });
        }""", feed_id)

        if not result:
            return {"liked": False, "collected": False}

        try:
            return json.loads(result)
        except json.JSONDecodeError:
            return {"liked": False, "collected": False}

    def _click_button(self, selector: str, label: str) -> bool:
        """点击互动按钮"""
        page = self.client.page
        try:
            btn = page.locator(selector)
            if btn.count() > 0:
                btn.first.click()
                time.sleep(1.5)
                return True
            else:
                print(f"未找到{label}按钮: {selector}", file=sys.stderr)
                return False
        except Exception as e:
            print(f"点击{label}按钮失败: {e}", file=sys.stderr)
            return False

    def like(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
        """
        点赞笔记

        Returns:
            操作结果
        """
        self._navigate_to_feed(feed_id, xsec_token)

        state = self._get_interact_state(feed_id)
        if state.get("liked"):
            return {
                "status": "success",
                "action": "like",
                "feed_id": feed_id,
                "already_liked": True,
                "message": "已经点赞过了",
            }

        success = self._click_button(self.LIKE_SELECTOR, "点赞")
        return {
            "status": "success" if success else "error",
            "action": "like",
            "feed_id": feed_id,
            "message": "点赞成功" if success else "点赞失败",
        }

    def unlike(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
        """
        取消点赞

        Returns:
            操作结果
        """
        self._navigate_to_feed(feed_id, xsec_token)

        state = self._get_interact_state(feed_id)
        if not state.get("liked"):
            return {
                "status": "success",
                "action": "unlike",
                "feed_id": feed_id,
                "already_unliked": True,
                "message": "尚未点赞,无需取消",
            }

        success = self._click_button(self.LIKE_SELECTOR, "取消点赞")
        return {
            "status": "success" if success else "error",
            "action": "unlike",
            "feed_id": feed_id,
            "message": "取消点赞成功" if success else "取消点赞失败",
        }

    def collect(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
        """
        收藏笔记

        Returns:
            操作结果
        """
        self._navigate_to_feed(feed_id, xsec_token)

        state = self._get_interact_state(feed_id)
        if state.get("collected"):
            return {
                "status": "success",
                "action": "collect",
                "feed_id": feed_id,
                "already_collected": True,
                "message": "已经收藏过了",
            }

        success = self._click_button(self.COLLECT_SELECTOR, "收藏")
        return {
            "status": "success" if success else "error",
            "action": "collect",
            "feed_id": feed_id,
            "message": "收藏成功" if success else "收藏失败",
        }

    def uncollect(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
        """
        取消收藏

        Returns:
            操作结果
        """
        self._navigate_to_feed(feed_id, xsec_token)

        state = self._get_interact_state(feed_id)
        if not state.get("collected"):
            return {
                "status": "success",
                "action": "uncollect",
                "feed_id": feed_id,
                "already_uncollected": True,
                "message": "尚未收藏,无需取消",
            }

        success = self._click_button(self.COLLECT_SELECTOR, "取消收藏")
        return {
            "status": "success" if success else "error",
            "action": "uncollect",
            "feed_id": feed_id,
            "message": "取消收藏成功" if success else "取消收藏失败",
        }


# ============================================================
# 便捷函数
# ============================================================

def _run_interact(action_name: str, feed_id: str, xsec_token: str,
                  headless: bool = True, cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
    """通用互动操作执行器"""
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = InteractAction(client)
        method = getattr(action, action_name)
        return method(feed_id, xsec_token)
    finally:
        client.close()


def like(feed_id: str, xsec_token: str, headless: bool = True,
         cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
    """点赞笔记"""
    return _run_interact("like", feed_id, xsec_token, headless, cookie_path)


def unlike(feed_id: str, xsec_token: str, headless: bool = True,
           cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
    """取消点赞"""
    return _run_interact("unlike", feed_id, xsec_token, headless, cookie_path)


def collect(feed_id: str, xsec_token: str, headless: bool = True,
            cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
    """收藏笔记"""
    return _run_interact("collect", feed_id, xsec_token, headless, cookie_path)


def uncollect(feed_id: str, xsec_token: str, headless: bool = True,
              cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
    """取消收藏"""
    return _run_interact("uncollect", feed_id, xsec_token, headless, cookie_path)

```

### scripts/login.py

```python
"""
小红书登录模块

基于 xiaohongshu-mcp/login.go 翻译
支持生成微信登录二维码,保存供主模型发送
"""

import json
import sys
import time
import base64
import os
from typing import Optional, Tuple, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


# QRCode 图片保存目录 - 放在 skill 文件夹内
SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
QRCODE_DIR = os.path.join(SKILL_DIR, "data")
QRCODE_PATH = os.path.join(QRCODE_DIR, "qrcode.png")


class LoginAction:
    """登录动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def check_login_status(self, navigate: bool = True) -> Tuple[bool, Optional[str]]:
        """
        检查登录状态

        Args:
            navigate: 是否先导航到首页。
                      如果已经在首页上,设 False 避免刷新页面。

        Returns:
            (是否已登录, 用户名)
        """
        page = self.client.page

        if navigate:
            self.client.navigate("https://www.xiaohongshu.com/explore")
            time.sleep(3)

        # ---- 方式 1:检测页面上是否弹出了登录弹窗 ----
        # 如果弹窗的二维码区域可见 → 未登录
        try:
            qr = page.locator('img.qrcode-img[src^="data:image"]')
            if qr.count() > 0 and qr.first.is_visible():
                return False, None
        except Exception:
            pass

        # ---- 方式 2:检查 cookie 里是否包含 web_session ----
        try:
            cookies = self.client.context.cookies()
            has_session = any(c['name'] == 'web_session' for c in cookies)
            if has_session:
                # 尝试获取用户名
                username = self._try_get_username()
                return True, username or "已登录用户"
        except Exception:
            pass

        # ---- 方式 3:检查 HTML 里有没有用户头像链接(登录后才有) ----
        try:
            # 侧边栏会有 /user/profile/xxx 的链接
            profile_link = page.locator('a[href*="/user/profile/"]')
            if profile_link.count() > 0:
                username = self._try_get_username()
                return True, username or "已登录用户"
        except Exception:
            pass

        return False, None

    def _try_get_username(self) -> Optional[str]:
        """尝试从页面提取用户昵称"""
        try:
            name = self.client.page.evaluate("""() => {
                const el = document.querySelector('.user .name, .sidebar .user-name, [class*="nickname"]');
                return el ? el.textContent.trim() : '';
            }""")
            return name if name else None
        except Exception:
            return None

    def get_wechat_qrcode(self) -> Tuple[Optional[str], bool]:
        """
        获取微信登录二维码

        流程:
        1. 访问小红书首页触发登录弹窗
        2. 获取弹窗中的微信二维码图片
        3. 保存到文件

        Returns:
            (二维码文件路径, 是否已登录)
        """
        client = self.client
        page = client.page

        # 访问首页触发登录弹窗
        client.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(4)  # 给弹窗足够时间渲染

        # 先检查是否已登录(不要重新 navigate)
        is_logged_in, _ = self.check_login_status(navigate=False)
        if is_logged_in:
            return None, True

        # 尝试获取二维码 base64 图片
        qrcode_src = None
        for attempt in range(5):
            try:
                qr = page.locator('img.qrcode-img[src^="data:image"]')
                if qr.count() > 0:
                    src = qr.first.get_attribute('src')
                    if src and len(src) > 200:  # 有效 base64 至少上百字符
                        qrcode_src = src
                        break
            except Exception:
                pass
            time.sleep(1)

        if qrcode_src:
            # 去掉 data:image/png;base64, 前缀
            if ',' in qrcode_src:
                qrcode_src = qrcode_src.split(',', 1)[1]

            # 保存二维码图片
            img_data = base64.b64decode(qrcode_src)
            os.makedirs(QRCODE_DIR, exist_ok=True)
            with open(QRCODE_PATH, 'wb') as f:
                f.write(img_data)

            print(f"二维码已保存到: {QRCODE_PATH}", file=sys.stderr)
            return QRCODE_PATH, False

        # 后备:整页截屏
        print("未找到有效的二维码图片,截屏保存...", file=sys.stderr)
        os.makedirs(QRCODE_DIR, exist_ok=True)
        page.screenshot(path=QRCODE_PATH)
        return QRCODE_PATH, False

    def wait_for_login(self, timeout: int = 120, min_wait: int = 30) -> bool:
        """
        在 **当前页面** 上等待用户扫码登录。
        不会重新 navigate,以免刷新掉二维码弹窗。

        会强制等待至少 min_wait 秒再开始检测,
        给用户足够时间在手机上确认登录。

        Args:
            timeout: 总超时时间(秒)
            min_wait: 最少等待秒数(默认 30)

        Returns:
            是否登录成功
        """
        start = time.time()

        # ---- 阶段 1: 强制等待 min_wait 秒 ----
        print(f"请在手机上扫码并确认登录(至少等待 {min_wait} 秒)...", file=sys.stderr)
        while time.time() - start < min_wait:
            elapsed = int(time.time() - start)
            remaining = min_wait - elapsed
            if remaining > 0 and remaining % 10 == 0:
                print(f"  等待中... 还剩 {remaining} 秒", file=sys.stderr)
            time.sleep(2)

        # ---- 阶段 2: 开始轮询检测 web_session cookie ----
        print("开始检测登录状态...", file=sys.stderr)
        while time.time() - start < timeout:
            try:
                cookies = self.client.context.cookies()
                has_session = any(c['name'] == 'web_session' for c in cookies)
                if has_session:
                    print("检测到 web_session cookie,登录成功!", file=sys.stderr)
                    self.client._save_cookies()
                    return True
            except Exception:
                pass

            elapsed = int(time.time() - start)
            remaining = timeout - elapsed
            if remaining > 0 and remaining % 15 == 0:
                print(f"  仍在等待登录... 剩余 {remaining} 秒", file=sys.stderr)
            time.sleep(3)

        print("登录超时", file=sys.stderr)
        return False


# ====== 顶层便捷函数 ======

def check_login(
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Tuple[bool, Optional[str]]:
    """检查登录状态"""
    client = XiaohongshuClient(headless=True, cookie_path=cookie_path)
    try:
        client.start()
        action = LoginAction(client)
        return action.check_login_status(navigate=True)
    finally:
        client.close()


def login(
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
    timeout: int = 120,
) -> Dict[str, Any]:
    """
    登录小红书(生成二维码 + 等待扫码)

    Returns:
        登录结果字典
    """
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = LoginAction(client)

        # 获取二维码
        qrcode_path, is_logged_in = action.get_wechat_qrcode()
        if is_logged_in:
            return {
                "status": "logged_in",
                "qrcode_path": None,
                "username": "已登录用户",
                "message": "已登录",
            }

        if qrcode_path:
            # 等待扫码
            success = action.wait_for_login(timeout=timeout)
            if success:
                return {
                    "status": "logged_in",
                    "qrcode_path": None,
                    "username": "已登录用户",
                    "message": "扫码登录成功",
                }
            return {
                "status": "timeout",
                "qrcode_path": qrcode_path,
                "username": None,
                "message": "扫码超时",
            }

        return {
            "status": "error",
            "qrcode_path": None,
            "username": None,
            "message": "获取二维码失败",
        }
    finally:
        client.close()

```

### scripts/publish.py

```python
"""
小红书发布模块(图文 + 视频)

基于 xiaohongshu-mcp/publish.go + publish_video.go 翻译
整合 xiaohongshu-ops 的安全发布理念(人工确认 checkpoint)
"""

import json
import os
import sys
import time
import random
from pathlib import Path
from typing import Optional, Dict, Any, List

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH

PUBLISH_URL = "https://creator.xiaohongshu.com/publish/publish?source=official"


class PublishAction:
    """发布动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _navigate_to_publish(self):
        """导航到创作者中心发布页"""
        print("打开创作者中心发布页...", file=sys.stderr)
        self.client.navigate(PUBLISH_URL)
        time.sleep(3)

    def _click_publish_tab(self, tab_name: str):
        """点击发布类型 TAB(上传图文 / 上传视频)"""
        page = self.client.page

        # 等待上传区域出现
        try:
            page.wait_for_selector('div.upload-content, div.creator-tab', timeout=15000)
        except Exception:
            print("等待发布页加载超时,继续尝试", file=sys.stderr)

        time.sleep(1)

        # 移除可能的弹窗遮挡
        page.evaluate("""() => {
            var popover = document.querySelector('div.d-popover');
            if (popover) popover.remove();
        }""")

        # 查找并点击对应 TAB
        try:
            tabs = page.locator('div.creator-tab')
            for i in range(tabs.count()):
                tab = tabs.nth(i)
                text = tab.text_content().strip()
                if text == tab_name:
                    tab.click()
                    time.sleep(1)
                    print(f"已切换到「{tab_name}」", file=sys.stderr)
                    return
            # 回退:使用文本定位
            page.get_by_text(tab_name, exact=True).click()
            time.sleep(1)
        except Exception as e:
            print(f"切换 TAB「{tab_name}」失败: {e}", file=sys.stderr)

    def _upload_images(self, image_paths: List[str]):
        """逐张上传图片"""
        page = self.client.page
        valid_paths = [p for p in image_paths if os.path.exists(p)]

        if not valid_paths:
            raise ValueError("没有有效的图片文件")

        for i, path in enumerate(valid_paths):
            abs_path = os.path.abspath(path)
            print(f"上传图片 ({i+1}/{len(valid_paths)}): {abs_path}", file=sys.stderr)

            # 第一张用 .upload-input,后续用 input[type=file]
            selector = '.upload-input' if i == 0 else 'input[type="file"]'
            try:
                upload_input = page.locator(selector)
                upload_input.set_input_files(abs_path)
            except Exception:
                # 回退
                upload_input = page.locator('input[type="file"]')
                upload_input.set_input_files(abs_path)

            # 等待图片上传完成(预览元素出现)
            expected = i + 1
            for _ in range(120):  # 最多等 60 秒
                try:
                    previews = page.locator('.img-preview-area .pr, .upload-preview-item')
                    if previews.count() >= expected:
                        break
                except Exception:
                    pass
                time.sleep(0.5)

            time.sleep(1)

        print(f"全部 {len(valid_paths)} 张图片上传完成", file=sys.stderr)

    def _upload_video(self, video_path: str):
        """上传视频文件"""
        page = self.client.page

        if not os.path.exists(video_path):
            raise ValueError(f"视频文件不存在: {video_path}")

        abs_path = os.path.abspath(video_path)
        print(f"上传视频: {abs_path}", file=sys.stderr)

        try:
            upload_input = page.locator('.upload-input')
            upload_input.set_input_files(abs_path)
        except Exception:
            upload_input = page.locator('input[type="file"]')
            upload_input.set_input_files(abs_path)

        # 等待发布按钮可点击(视频处理完成标志),最多等 10 分钟
        print("等待视频处理完成...", file=sys.stderr)
        btn_selector = '.publish-page-publish-btn button.bg-red'
        for attempt in range(600):
            try:
                btn = page.locator(btn_selector)
                if btn.count() > 0 and btn.is_visible():
                    disabled = btn.get_attribute('disabled')
                    if disabled is None:
                        print("视频处理完成", file=sys.stderr)
                        return
            except Exception:
                pass
            time.sleep(1)

        print("警告: 等待视频处理超时", file=sys.stderr)

    def _fill_title(self, title: str):
        """填写标题"""
        page = self.client.page
        try:
            title_input = page.locator('div.d-input input')
            title_input.first.fill(title)
            time.sleep(0.5)

            # 检查标题是否超长
            max_suffix = page.locator('div.title-container div.max_suffix')
            if max_suffix.count() > 0 and max_suffix.is_visible():
                length_text = max_suffix.text_content()
                print(f"警告: 标题超长 ({length_text})", file=sys.stderr)

            print(f"标题已填写: {title}", file=sys.stderr)
        except Exception as e:
            print(f"填写标题失败: {e}", file=sys.stderr)

    def _fill_content(self, content: str):
        """填写正文"""
        page = self.client.page

        # 尝试两种编辑器:Quill 或 contenteditable
        content_el = None
        try:
            ql = page.locator('div.ql-editor')
            if ql.count() > 0:
                content_el = ql.first
        except Exception:
            pass

        if content_el is None:
            try:
                # 通过 placeholder 查找
                content_el = page.locator('p[data-placeholder*="输入正文描述"]').first
                # 向上找 textbox 父元素
                parent = page.locator('[role="textbox"]')
                if parent.count() > 0:
                    content_el = parent.first
            except Exception:
                pass

        if content_el is None:
            print("未找到正文输入框", file=sys.stderr)
            return

        try:
            content_el.click()
            time.sleep(0.3)
            page.keyboard.type(content, delay=30)
            time.sleep(0.5)

            # 检查正文是否超长
            length_error = page.locator('div.edit-container div.length-error')
            if length_error.count() > 0 and length_error.is_visible():
                err_text = length_error.text_content()
                print(f"警告: 正文超长 ({err_text})", file=sys.stderr)

            print("正文已填写", file=sys.stderr)
        except Exception as e:
            print(f"填写正文失败: {e}", file=sys.stderr)

    def _input_tags(self, tags: List[str]):
        """输入话题标签(通过 # 触发联想)"""
        if not tags:
            return

        page = self.client.page

        # 先移动光标到正文末尾
        content_el = None
        try:
            ql = page.locator('div.ql-editor')
            if ql.count() > 0:
                content_el = ql.first
            else:
                content_el = page.locator('[role="textbox"]').first
        except Exception:
            return

        if content_el is None:
            return

        try:
            content_el.click()
            time.sleep(0.3)
            # 按 End 键移动到末尾
            page.keyboard.press('End')
            page.keyboard.press('Enter')
            page.keyboard.press('Enter')
            time.sleep(0.5)
        except Exception:
            pass

        # 限制最多 10 个标签
        tags = tags[:10]

        for tag in tags:
            tag = tag.lstrip('#')
            try:
                # 输入 #
                page.keyboard.type('#', delay=100)
                time.sleep(0.3)

                # 逐字输入标签文字
                page.keyboard.type(tag, delay=50)
                time.sleep(1)

                # 尝试点击联想下拉框的第一个选项
                topic_item = page.locator('#creator-editor-topic-container .item')
                if topic_item.count() > 0:
                    topic_item.first.click()
                    print(f"标签「{tag}」已通过联想选择", file=sys.stderr)
                else:
                    # 没有联想,输入空格结束
                    page.keyboard.type(' ', delay=50)
                    print(f"标签「{tag}」已直接输入", file=sys.stderr)

                time.sleep(0.5)
            except Exception as e:
                print(f"输入标签「{tag}」失败: {e}", file=sys.stderr)

    def _set_schedule(self, schedule_time: str):
        """设置定时发布(格式: 2025-01-01 12:00)"""
        page = self.client.page
        try:
            # 点击定时发布开关
            switch = page.locator('.post-time-wrapper .d-switch')
            switch.click()
            time.sleep(0.8)

            # 设置日期时间
            date_input = page.locator('.date-picker-container input')
            date_input.fill('')
            date_input.fill(schedule_time)
            time.sleep(0.5)

            print(f"定时发布设置: {schedule_time}", file=sys.stderr)
        except Exception as e:
            print(f"设置定时发布失败: {e}", file=sys.stderr)

    def _click_publish_button(self) -> bool:
        """点击发布按钮"""
        page = self.client.page
        try:
            btn = page.locator('.publish-page-publish-btn button.bg-red')
            if btn.count() > 0:
                btn.first.click()
                time.sleep(3)
                print("已点击发布按钮", file=sys.stderr)
                return True
            else:
                print("未找到发布按钮", file=sys.stderr)
                return False
        except Exception as e:
            print(f"点击发布按钮失败: {e}", file=sys.stderr)
            return False

    def _check_publish_ready(self) -> Dict[str, Any]:
        """检查发布前的状态(三要素校验)"""
        page = self.client.page
        status = {}

        # 检查标题
        try:
            title_input = page.locator('div.d-input input')
            status["title"] = title_input.input_value() if title_input.count() > 0 else ""
        except Exception:
            status["title"] = ""

        # 检查发布按钮可见性
        try:
            btn = page.locator('.publish-page-publish-btn button.bg-red')
            status["publish_button_visible"] = btn.count() > 0 and btn.is_visible()
        except Exception:
            status["publish_button_visible"] = False

        status["title_ok"] = bool(status["title"])
        return status

    def publish_image(
        self,
        title: str,
        content: str,
        image_paths: List[str],
        tags: Optional[List[str]] = None,
        schedule_time: Optional[str] = None,
        auto_publish: bool = False,
    ) -> Dict[str, Any]:
        """
        发布图文笔记

        Args:
            title: 标题(建议 <=20 字)
            content: 正文
            image_paths: 图片文件路径列表
            tags: 话题标签列表
            schedule_time: 定时发布时间(格式 2025-01-01 12:00),None 为立即
            auto_publish: 是否自动点击发布(默认 False,停在发布按钮处)

        Returns:
            操作结果
        """
        self._navigate_to_publish()
        self._click_publish_tab("上传图文")

        # 1. 上传图片
        self._upload_images(image_paths)

        # 2. 填写标题
        self._fill_title(title)
        time.sleep(1)

        # 3. 填写正文
        self._fill_content(content)
        time.sleep(1)

        # 4. 添加标签
        if tags:
            self._input_tags(tags)
            time.sleep(1)

        # 5. 定时发布
        if schedule_time:
            self._set_schedule(schedule_time)

        # 6. 校验三要素
        ready = self._check_publish_ready()
        print(f"发布前校验: {ready}", file=sys.stderr)

        # 7. 是否自动发布
        if auto_publish:
            success = self._click_publish_button()
            return {
                "status": "success" if success else "error",
                "action": "publish_image",
                "title": title,
                "image_count": len(image_paths),
                "tags": tags or [],
                "schedule_time": schedule_time,
                "published": success,
                "message": "发布成功" if success else "发布失败",
            }
        else:
            return {
                "status": "ready",
                "action": "publish_image",
                "title": title,
                "image_count": len(image_paths),
                "tags": tags or [],
                "schedule_time": schedule_time,
                "published": False,
                "ready_check": ready,
                "message": "已填写完毕,停在发布按钮处。请确认后使用 --auto-publish 发布。",
            }

    def publish_video(
        self,
        title: str,
        content: str,
        video_path: str,
        tags: Optional[List[str]] = None,
        schedule_time: Optional[str] = None,
        auto_publish: bool = False,
    ) -> Dict[str, Any]:
        """
        发布视频笔记

        Args:
            title: 标题
            content: 正文
            video_path: 视频文件路径
            tags: 话题标签
            schedule_time: 定时发布时间
            auto_publish: 是否自动发布(默认 False)

        Returns:
            操作结果
        """
        self._navigate_to_publish()
        self._click_publish_tab("上传视频")

        # 1. 上传视频
        self._upload_video(video_path)

        # 2. 填写标题
        self._fill_title(title)
        time.sleep(1)

        # 3. 填写正文
        self._fill_content(content)
        time.sleep(1)

        # 4. 添加标签
        if tags:
            self._input_tags(tags)
            time.sleep(1)

        # 5. 定时发布
        if schedule_time:
            self._set_schedule(schedule_time)

        # 6. 校验
        ready = self._check_publish_ready()
        print(f"发布前校验: {ready}", file=sys.stderr)

        if auto_publish:
            success = self._click_publish_button()
            return {
                "status": "success" if success else "error",
                "action": "publish_video",
                "title": title,
                "video_path": video_path,
                "published": success,
                "message": "发布成功" if success else "发布失败",
            }
        else:
            return {
                "status": "ready",
                "action": "publish_video",
                "title": title,
                "video_path": video_path,
                "published": False,
                "ready_check": ready,
                "message": "已填写完毕,停在发布按钮处。请确认后使用 --auto-publish 发布。",
            }


def md_to_images(
    markdown_text: str,
    output_dir: str = ".",
    width: int = 1080,
    css: str = "",
) -> List[str]:
    """
    将 Markdown 文本渲染为长文图片(利用 Playwright 截图)

    Args:
        markdown_text: Markdown 文本
        output_dir: 输出目录
        width: 图片宽度(像素)
        css: 自定义 CSS 样式

    Returns:
        生成的图片路径列表
    """
    try:
        import markdown
    except ImportError:
        print("需要安装 markdown 库: pip install markdown", file=sys.stderr)
        raise

    # 将 Markdown 转为 HTML
    html_content = markdown.markdown(
        markdown_text,
        extensions=['tables', 'fenced_code', 'codehilite', 'nl2br'],
    )

    default_css = """
    body {
        font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "PingFang SC",
                     "Hiragino Sans GB", "Microsoft YaHei", sans-serif;
        padding: 40px 50px;
        line-height: 1.8;
        color: #333;
        background: #fff;
        max-width: 100%;
        margin: 0 auto;
        font-size: 16px;
    }
    h1 { font-size: 24px; font-weight: bold; margin: 20px 0 10px; color: #222; }
    h2 { font-size: 20px; font-weight: bold; margin: 18px 0 8px; color: #333; }
    h3 { font-size: 18px; font-weight: bold; margin: 15px 0 6px; color: #444; }
    p { margin: 8px 0; }
    ul, ol { padding-left: 20px; }
    li { margin: 4px 0; }
    blockquote {
        border-left: 4px solid #ff2442;
        padding: 8px 16px;
        margin: 12px 0;
        background: #fff5f5;
        color: #555;
    }
    code {
        background: #f5f5f5;
        padding: 2px 6px;
        border-radius: 3px;
        font-size: 14px;
    }
    pre {
        background: #f5f5f5;
        padding: 16px;
        border-radius: 6px;
        overflow-x: auto;
    }
    img { max-width: 100%; border-radius: 8px; }
    hr { border: none; border-top: 1px solid #eee; margin: 20px 0; }
    table { border-collapse: collapse; width: 100%; margin: 12px 0; }
    th, td { border: 1px solid #ddd; padding: 8px 12px; text-align: left; }
    th { background: #f5f5f5; }
    """

    full_css = default_css + "\n" + css
    full_html = f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8">
<meta name="viewport" content="width={width}">
<style>{full_css}</style>
</head><body>{html_content}</body></html>"""

    os.makedirs(output_dir, exist_ok=True)

    from playwright.sync_api import sync_playwright
    with sync_playwright() as pw:
        browser = pw.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": width, "height": 800})
        page.set_content(full_html)
        page.wait_for_load_state("networkidle")
        time.sleep(0.5)

        # 获取实际内容高度
        total_height = page.evaluate("document.body.scrollHeight")

        # 如果内容不太长(<=4000px),直接截一张
        # 否则分页截图(每张最多 3000px)
        image_paths = []
        max_page_height = 3000

        if total_height <= max_page_height + 500:
            # 单张截图
            page.set_viewport_size({"width": width, "height": total_height + 40})
            time.sleep(0.3)
            img_path = os.path.join(output_dir, "md_page_1.png")
            page.screenshot(path=img_path, full_page=True)
            image_paths.append(img_path)
        else:
            # 分页截图
            page_num = 1
            y_offset = 0
            while y_offset < total_height:
                chunk_height = min(max_page_height, total_height - y_offset)
                img_path = os.path.join(output_dir, f"md_page_{page_num}.png")
                page.screenshot(
                    path=img_path,
                    clip={"x": 0, "y": y_offset, "width": width, "height": chunk_height},
                )
                image_paths.append(img_path)
                y_offset += chunk_height
                page_num += 1

        browser.close()

    print(f"Markdown 已渲染为 {len(image_paths)} 张图片", file=sys.stderr)
    return image_paths


# ============================================================
# 便捷函数
# ============================================================

def publish_image(
    title: str,
    content: str,
    image_paths: List[str],
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """发布图文笔记"""
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = PublishAction(client)
        return action.publish_image(
            title=title, content=content, image_paths=image_paths,
            tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        )
    finally:
        client.close()


def publish_video(
    title: str,
    content: str,
    video_path: str,
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """发布视频笔记"""
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = PublishAction(client)
        return action.publish_video(
            title=title, content=content, video_path=video_path,
            tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        )
    finally:
        client.close()


def publish_markdown(
    title: str,
    markdown_text: str,
    extra_content: str = "",
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    image_width: int = 1080,
    output_dir: str = "",
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """
    将 Markdown 渲染为图片后发布图文笔记

    Args:
        title: 标题
        markdown_text: Markdown 内容
        extra_content: 正文区的额外文字说明
        tags: 话题标签
        schedule_time: 定时发布
        auto_publish: 是否自动发布
        image_width: 图片宽度
        output_dir: 图片输出目录(默认临时目录)
        headless: 无头模式
        cookie_path: Cookie 路径

    Returns:
        操作结果
    """
    import tempfile
    if not output_dir:
        output_dir = tempfile.mkdtemp(prefix="xhs_md_")

    # 1. Markdown → 图片
    image_paths = md_to_images(markdown_text, output_dir=output_dir, width=image_width)

    if not image_paths:
        return {"status": "error", "message": "Markdown 渲染失败,未生成图片"}

    # 2. 发布图文
    body = extra_content or f"本文由 Markdown 渲染生成,共 {len(image_paths)} 页"
    return publish_image(
        title=title, content=body, image_paths=image_paths,
        tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        headless=headless, cookie_path=cookie_path,
    )

```

### scripts/search.py

```python
"""
小红书搜索模块

基于 xiaohongshu-mcp/search.go 翻译
"""

import json
import sys
import time
import urllib.parse
from typing import Optional, List, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


# 筛选选项映射表(来自 Go 源码)
FILTER_OPTIONS_MAP = {
    1: [  # 排序依据
        {"index": 1, "text": "综合"},
        {"index": 2, "text": "最新"},
        {"index": 3, "text": "最多点赞"},
        {"index": 4, "text": "最多评论"},
        {"index": 5, "text": "最多收藏"},
    ],
    2: [  # 笔记类型
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "视频"},
        {"index": 3, "text": "图文"},
    ],
    3: [  # 发布时间
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "一天内"},
        {"index": 3, "text": "一周内"},
        {"index": 4, "text": "半年内"},
    ],
    4: [  # 搜索范围
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "已看过"},
        {"index": 3, "text": "未看过"},
        {"index": 4, "text": "已关注"},
    ],
    5: [  # 位置距离
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "同城"},
        {"index": 3, "text": "附近"},
    ],
}


class SearchAction:
    """搜索动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_search_url(self, keyword: str) -> str:
        """构建搜索 URL"""
        params = urllib.parse.urlencode({
            "keyword": keyword,
            "source": "web_explore_feed",
        })
        return f"https://www.xiaohongshu.com/search_result?{params}"

    def _apply_filters(
        self,
        sort_by: Optional[str] = None,
        note_type: Optional[str] = None,
        publish_time: Optional[str] = None,
        search_scope: Optional[str] = None,
        location: Optional[str] = None,
    ):
        """应用筛选条件"""
        page = self.client.page

        # 检查是否有筛选条件
        has_filters = any([sort_by, note_type, publish_time, search_scope, location])
        if not has_filters:
            return

        # 悬停在筛选按钮上
        try:
            filter_btn = page.locator('div.filter')
            filter_btn.hover()
            time.sleep(0.5)

            # 等待筛选面板出现
            page.wait_for_selector('div.filter-panel', timeout=5000)
        except Exception as e:
            print(f"打开筛选面板失败: {e}", file=sys.stderr)
            return

        # 映射筛选选项到文本
        filter_texts = []

        if sort_by:
            text = self._find_filter_text(1, sort_by)
            if text:
                filter_texts.append(text)

        if note_type:
            text = self._find_filter_text(2, note_type)
            if text:
                filter_texts.append(text)

        if publish_time:
            text = self._find_filter_text(3, publish_time)
            if text:
                filter_texts.append(text)

        if search_scope:
            text = self._find_filter_text(4, search_scope)
            if text:
                filter_texts.append(text)

        if location:
            text = self._find_filter_text(5, location)
            if text:
                filter_texts.append(text)

        # 应用筛选:使用文本定位器,避免依赖 DOM 顺序
        filter_panel = page.locator('div.filter-panel')
        for tag_text in filter_texts:
            try:
                filter_panel.get_by_text(tag_text, exact=True).click()
                time.sleep(0.3)
            except Exception as e:
                print(f"点击筛选选项失败: {e}", file=sys.stderr)

        # 等待页面更新
        time.sleep(1)

    def _find_filter_text(self, filters_group: int, text: str) -> Optional[str]:
        """查找筛选选项的显示文本(用于文本定位器)"""
        options = FILTER_OPTIONS_MAP.get(filters_group, [])
        for opt in options:
            if opt["text"] == text:
                return opt["text"]
        return None

    def search(
        self,
        keyword: str,
        sort_by: Optional[str] = None,
        note_type: Optional[str] = None,
        publish_time: Optional[str] = None,
        search_scope: Optional[str] = None,
        location: Optional[str] = None,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        搜索小红书内容

        Args:
            keyword: 搜索关键词
            sort_by: 排序方式:综合 最新 最多点赞 最多评论 最多收藏
            note_type: 笔记类型:不限 视频 图文
            publish_time: 发布时间:不限 一天内 一周内 半年内
            search_scope: 搜索范围:不限 已看过 未看过 已关注
            location: 位置距离:不限 同城 附近
            limit: 返回数量限制

        Returns:
            搜索结果列表
        """
        client = self.client
        page = client.page

        # 导航到搜索页面
        search_url = self._make_search_url(keyword)
        client.navigate(search_url)

        # 等待页面加载 - 增加等待时间确保搜索结果加载完成
        client.wait_for_initial_state()
        time.sleep(3)

        # 滚动页面触发加载更多内容
        for _ in range(3):
            page.evaluate("window.scrollBy(0, 500)")
            time.sleep(0.5)

        # 应用筛选条件
        self._apply_filters(
            sort_by=sort_by,
            note_type=note_type,
            publish_time=publish_time,
            search_scope=search_scope,
            location=location,
        )

        # 提取数据 - 使用实际验证过的字段路径
        result = page.evaluate("""() => {
            const feeds = window.__INITIAL_STATE__?.search?.feeds;
            const data = feeds?.value || feeds?._value;
            if (!data || !Array.isArray(data) || data.length === 0) return '';

            return JSON.stringify(data.slice(0, 50).map(item => {
                const nc = item.noteCard || {};
                const user = nc.user || {};
                const info = nc.interactInfo || {};
                const cover = nc.cover || {};
                return {
                    id: item.id || '',
                    xsec_token: item.xsecToken || '',
                    title: nc.displayTitle || '',
                    type: nc.type || '',
                    user: user.nickname || user.nickName || '',
                    user_id: user.userId || '',
                    user_avatar: user.avatar || '',
                    liked_count: info.likedCount || '0',
                    collected_count: info.collectedCount || '0',
                    comment_count: info.commentCount || '0',
                    shared_count: info.sharedCount || '0',
                    cover_url: cover.urlDefault || cover.urlPre || '',
                };
            }));
        }""")

        if not result:
            print("未获取到搜索结果", file=sys.stderr)
            return []

        # 解析 JSON
        try:
            feeds = json.loads(result)
        except json.JSONDecodeError as e:
            print(f"解析搜索结果失败: {e}", file=sys.stderr)
            return []

        # 限制数量
        if limit > 0:
            feeds = feeds[:limit]

        return feeds


def search(
    keyword: str,
    sort_by: Optional[str] = None,
    note_type: Optional[str] = None,
    publish_time: Optional[str] = None,
    search_scope: Optional[str] = None,
    location: Optional[str] = None,
    limit: int = 10,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> List[Dict[str, Any]]:
    """
    搜索小红书内容

    Args:
        keyword: 搜索关键词
        sort_by: 排序方式
        note_type: 笔记类型
        publish_time: 发布时间
        search_scope: 搜索范围
        location: 位置距离
        limit: 返回数量限制
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        搜索结果列表
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = SearchAction(client)
        return action.search(
            keyword=keyword,
            sort_by=sort_by,
            note_type=note_type,
            publish_time=publish_time,
            search_scope=search_scope,
            location=location,
            limit=limit,
        )
    finally:
        client.close()

```

### scripts/user.py

```python
"""
小红书用户主页模块

基于 xiaohongshu-mcp/user_profile.go 翻译
"""

import json
import sys
import time
from typing import Optional, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


class UserProfileAction:
    """用户主页动作"""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_user_profile_url(self, user_id: str, xsec_token: str = "") -> str:
        """构建用户主页 URL"""
        if xsec_token:
            return f"https://www.xiaohongshu.com/user/profile/{user_id}?xsec_token={xsec_token}&xsec_source=pc_note"
        return f"https://www.xiaohongshu.com/user/profile/{user_id}"

    def _extract_user_profile_data(self) -> Optional[Dict[str, Any]]:
        """提取用户主页数据"""
        page = self.client.page

        # 获取用户信息
        user_data_result = page.evaluate("""() => {
            if (window.__INITIAL_STATE__ &&
                window.__INITIAL_STATE__.user &&
                window.__INITIAL_STATE__.user.userPageData) {
                const userPageData = window.__INITIAL_STATE__.user.userPageData;
                const data = userPageData.value !== undefined ? userPageData.value : userPageData._value;
                if (data) {
                    return JSON.stringify(data);
                }
            }
            return '';
        }""")

        if not user_data_result:
            return None

        # 获取用户笔记列表(含置顶标记和时间信息)
        notes_result = page.evaluate("""() => {
            if (!window.__INITIAL_STATE__ ||
                !window.__INITIAL_STATE__.user ||
                !window.__INITIAL_STATE__.user.notes) return '';

            var notes = window.__INITIAL_STATE__.user.notes;
            var data = notes.value !== undefined ? notes.value : (notes._value !== undefined ? notes._value : notes);
            if (!data) return '';

            // 展平二维数组
            var flat = [];
            for (var i = 0; i < data.length; i++) {
                if (Array.isArray(data[i])) {
                    for (var j = 0; j < data[i].length; j++) flat.push(data[i][j]);
                } else {
                    flat.push(data[i]);
                }
            }

            // 提取每条笔记的关键信息,包含置顶标记和排序所需字段
            return JSON.stringify(flat.map(function(item) {
                var nc = item.noteCard || {};
                var info = nc.interactInfo || {};
                var user = nc.user || {};
                var cover = nc.cover || {};
                var result = {
                    id: item.id || '',
                    xsecToken: item.xsecToken || '',
                    noteCard: {
                        displayTitle: nc.displayTitle || '',
                        type: nc.type || '',
                        interactInfo: {
                            likedCount: info.likedCount || '0',
                            collectedCount: info.collectedCount || '0',
                            commentCount: info.commentCount || '0',
                            sharedCount: info.sharedCount || '0'
                        },
                        user: {
                            nickname: user.nickname || user.nickName || '',
                            userId: user.userId || ''
                        },
                        cover: {
                            urlDefault: cover.urlDefault || cover.urlPre || ''
                        }
                    }
                };
                // 置顶标记(小红书用多种字段名)
                if (item.isTop) result.isTop = true;
                if (item.stickyTop) result.isTop = true;
                if (item.topFlag) result.isTop = true;
                if (nc.isTop) result.isTop = true;
                // 检查 showTags 中是否有置顶标签
                var tags = item.showTags || nc.showTags || [];
                for (var k = 0; k < tags.length; k++) {
                    if (tags[k] === 'top' || tags[k] === 'is_top' || tags[k] === 'sticky') {
                        result.isTop = true;
                    }
                }
                // 时间信息
                if (nc.time) result.time = nc.time;
                if (nc.createTime) result.time = nc.createTime;
                if (nc.lastUpdateTime) result.lastUpdateTime = nc.lastUpdateTime;
                if (item.timestamp) result.time = item.timestamp;
                return result;
            }));
        }""")

        try:
            user_page_data = json.loads(user_data_result)
        except json.JSONDecodeError:
            return None

        # 解析笔记数据
        feeds = []
        if notes_result:
            try:
                feeds = json.loads(notes_result)
            except json.JSONDecodeError:
                pass

        # 组装响应
        response = {
            "userBasicInfo": user_page_data.get("basicInfo", {}),
            "interactions": user_page_data.get("interactions", []),
            "feeds": feeds,
        }

        return response

    def get_user_profile(
        self,
        user_id: str,
        xsec_token: str = "",
    ) -> Optional[Dict[str, Any]]:
        """
        获取用户主页信息

        Args:
            user_id: 用户 ID
            xsec_token: xsec_token 参数(可选)

        Returns:
            用户主页数据
        """
        client = self.client

        # 构建 URL 并导航
        url = self._make_user_profile_url(user_id, xsec_token)
        print(f"打开用户主页: {url}", file=sys.stderr)
        client.navigate(url)

        # 等待页面加载
        client.wait_for_initial_state()
        time.sleep(1)

        # 提取数据
        profile = self._extract_user_profile_data()

        if not profile:
            print("未获取到用户主页数据", file=sys.stderr)
            return None

        return profile


    def get_my_profile(self) -> Optional[Dict[str, Any]]:
        """
        获取自己的个人主页(通过侧边栏导航)

        Returns:
            自己的用户主页数据
        """
        client = self.client

        # 先导航到首页
        print("导航到首页获取个人信息...", file=sys.stderr)
        client.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(2)

        page = client.page

        # 尝试从侧边栏获取自己的用户 ID
        my_user_id = page.evaluate("""() => {
            // 方法 1: 从侧边栏用户链接提取
            var links = document.querySelectorAll('a[href*="/user/profile/"]');
            for (var i = 0; i < links.length; i++) {
                var href = links[i].getAttribute('href');
                var match = href.match(/\\/user\\/profile\\/([a-f0-9]+)/);
                if (match) return match[1];
            }
            // 方法 2: 从 __INITIAL_STATE__ 提取
            if (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.user) {
                var user = window.__INITIAL_STATE__.user;
                if (user.userPageData) {
                    var data = user.userPageData.value || user.userPageData._value || user.userPageData;
                    if (data && data.basicInfo && data.basicInfo.userId) {
                        return data.basicInfo.userId;
                    }
                }
            }
            return '';
        }""")

        if not my_user_id:
            # 回退:点击侧边栏的个人头像/链接
            try:
                avatar = page.locator('.sidebar-nav .user-avatar, .side-bar .user-wrap a, a.user-link')
                if avatar.count() > 0:
                    avatar.first.click()
                    time.sleep(2)
                    # 从跳转后的 URL 提取用户 ID
                    import re
                    match = re.search(r'/user/profile/([a-f0-9]+)', page.url)
                    if match:
                        my_user_id = match.group(1)
            except Exception as e:
                print(f"通过侧边栏获取用户ID失败: {e}", file=sys.stderr)

        if not my_user_id:
            print("无法获取当前登录用户的 ID", file=sys.stderr)
            return None

        print(f"获取到用户 ID: {my_user_id}", file=sys.stderr)
        return self.get_user_profile(my_user_id)


def user_profile(
    user_id: str,
    xsec_token: str = "",
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
    """
    获取用户主页信息

    Args:
        user_id: 用户 ID
        xsec_token: xsec_token 参数
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        用户主页数据
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = UserProfileAction(client)
        return action.get_user_profile(
            user_id=user_id,
            xsec_token=xsec_token,
        )
    finally:
        client.close()


def my_profile(
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
    """
    获取自己的个人主页

    Args:
        headless: 是否无头模式
        cookie_path: Cookie 路径

    Returns:
        自己的用户主页数据
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )

    try:
        client.start()
        action = UserProfileAction(client)
        return action.get_my_profile()
    finally:
        client.close()

```

xiaohongshu-skill | SkillHub