xiaohongshu-skill
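Use this Skill when the user wants to interact with Xiaohongshu (小红书, also known as rednote): searching notes, fetching post details, viewing user profiles, logging in by scanning a QR code, or extracting platform content. Activate it when the user mentions xiaohongshu, 小红书, or rednote, or needs to browse or scrape Chinese social media content.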
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-xiaohongshu-skill
Repository
Skill path: skills/deliciousbuding/xiaohongshu-skill
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install xiaohongshu-skill into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding xiaohongshu-skill to shared team environments
- Use xiaohongshu-skill for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: xiaohongshu-skill
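description: Use this Skill when the user wants to interact with Xiaohongshu (小红书, also known as rednote), including searching notes, fetching post details, viewing user profiles, logging in by scanning a QR code, and extracting platform content. Activate it when the user mentions xiaohongshu, 小红书, or rednote, or needs to browse or scrape Chinese social media content.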
user-invokable: true
metadata: {"openclaw": {"emoji": "📕", "requires": {"bins": ["python3", "playwright"], "anyBins": ["python3", "python"]}, "os": ["win32", "linux", "darwin"], "install": [{"id": "pip", "kind": "node", "label": "Install dependencies (pip)", "bins": ["playwright"]}]}}
---
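# Xiaohongshu Skill
A Python Playwright tool for interacting with Xiaohongshu (小红书, rednote). It uses browser automation to extract structured data from `window.__INITIAL_STATE__` (the Vue SSR state).
## Prerequisites
Install the dependencies in the `{baseDir}` directory: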
```bash
cd {baseDir}
pip install -r requirements.txt
playwright install chromium
```
On Linux/WSL, also run:
```bash
playwright install-deps chromium
```
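## Quick start
Run all commands from the `{baseDir}` directory.
### 1. Log in (required on first use)
```bash
cd {baseDir}
# Open a browser window and show a QR code to scan with WeChat or the Xiaohongshu app
python -m scripts qrcode --headless=false
# Check whether the login is still valid
python -m scripts check-login
```
In a headless environment, the QR code image is saved to `{baseDir}/data/qrcode.png`, so it can be sent through another channel for scanning.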
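The `qrcode` command can print more than one JSON document to standard output in a single run (per `cmd_qrcode` in `scripts/__main__.py`, a `qrcode_ready` object may be followed by `logged_in` or `timeout`). A minimal sketch of splitting such output, assuming the caller has already captured stdout as a string; `iter_json_documents` and `final_status` are hypothetical helpers, not part of the Skill:

```python
import json
from typing import Any, Iterator


def iter_json_documents(text: str) -> Iterator[Any]:
    """Yield each JSON document found in a string of concatenated documents."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so skip it by hand.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        doc, pos = decoder.raw_decode(text, pos)
        yield doc


def final_status(stdout_text: str) -> str:
    """Return the `status` of the last document, or "unknown" if there is none."""
    docs = list(iter_json_documents(stdout_text))
    if not docs or not isinstance(docs[-1], dict):
        return "unknown"
    return docs[-1].get("status", "unknown")
```

Reading only the last document is a design choice of this sketch: the earlier `qrcode_ready` object carries the image path, while the final one reflects how the wait ended.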
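### 2. Search
```bash
cd {baseDir}
# Basic search
python -m scripts search "keyword"
# With filters (美食 = food)
python -m scripts search "美食" --sort-by=最新 --note-type=图文 --limit=10
```
**Filter options** (pass the Chinese values exactly as shown):
- `--sort-by`: 综合 (general), 最新 (newest), 最多点赞 (most liked), 最多评论 (most commented), 最多收藏 (most collected)
- `--note-type`: 不限 (any), 视频 (video), 图文 (image and text)
- `--publish-time`: 不限 (any), 一天内 (past day), 一周内 (past week), 半年内 (past six months)
- `--search-scope`: 不限 (any), 已看过 (viewed), 未看过 (not viewed), 已关注 (following)
- `--location`: 不限 (any), 同城 (same city), 附近 (nearby)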
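When the search command is driven from another program, it can help to check filter values before launching the browser. The sketch below builds an argument list for `python -m scripts search` from the options listed above. `build_search_argv` and `FILTER_CHOICES` are hypothetical names, and whether the CLI itself rejects other values is not shown here, so the up-front validation is an assumption of this example:

```python
from typing import Dict, List, Optional

# Values copied from the filter list above; the Chinese labels are passed as-is.
FILTER_CHOICES: Dict[str, List[str]] = {
    "--sort-by": ["综合", "最新", "最多点赞", "最多评论", "最多收藏"],
    "--note-type": ["不限", "视频", "图文"],
    "--publish-time": ["不限", "一天内", "一周内", "半年内"],
    "--search-scope": ["不限", "已看过", "未看过", "已关注"],
    "--location": ["不限", "同城", "附近"],
}


def build_search_argv(keyword: str, limit: Optional[int] = None, **filters: str) -> List[str]:
    """Build the argv for a search, e.g. build_search_argv("美食", sort_by="最新")."""
    argv = ["python", "-m", "scripts", "search", keyword]
    for name, value in filters.items():
        flag = "--" + name.replace("_", "-")  # sort_by -> --sort-by
        if flag not in FILTER_CHOICES:
            raise ValueError(f"unknown filter: {flag}")
        if value not in FILTER_CHOICES[flag]:
            raise ValueError(f"{flag} must be one of {FILTER_CHOICES[flag]}, got {value!r}")
        argv.append(f"{flag}={value}")
    if limit is not None:
        argv.append(f"--limit={limit}")
    return argv
```

The resulting list can be handed to `subprocess.run(argv, cwd=base_dir)`, where `base_dir` stands for the `{baseDir}` directory.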
### 3. Post details
```bash
cd {baseDir}
# Use the id and xsec_token from a search result
python -m scripts feed <feed_id> <xsec_token>
# Load comments
python -m scripts feed <feed_id> <xsec_token> --load-comments --max-comments=20
```
### 4. User profile
```bash
cd {baseDir}
python -m scripts user <user_id> [xsec_token]
```
## Data extraction paths
| Data type | JavaScript path |
|-----------|-----------------|
| Search results | `window.__INITIAL_STATE__.search.feeds` |
| Post details | `window.__INITIAL_STATE__.note.noteDetailMap` |
| User info | `window.__INITIAL_STATE__.user.userPageData` |
| User notes | `window.__INITIAL_STATE__.user.notes` |
**Vue Ref handling:** always unwrap through `.value` or `._value`:
```javascript
const data = obj.value !== undefined ? obj.value : obj._value;
```
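The same unwrapping applies on the Python side once the state has been serialized to JSON. The sketch below is modeled on `get_data_by_path` in `scripts/client.py`: it walks a dotted path and unwraps a ref at every step. The names `unwrap_ref` and `get_by_path` and the sample state are illustrative only:

```python
from typing import Any


def unwrap_ref(obj: Any) -> Any:
    """Unwrap a serialized Vue ref: prefer `value`, then `_value`."""
    if isinstance(obj, dict):
        if "value" in obj:
            return obj["value"]
        if "_value" in obj:
            return obj["_value"]
    return obj


def get_by_path(state: dict, path: str) -> Any:
    """Follow a dotted path such as "search.feeds"; return None if it is missing."""
    current: Any = state
    for key in path.split("."):
        if not isinstance(current, dict):
            return None
        current = unwrap_ref(current.get(key))
    return current


# Hypothetical state shaped like the table above.
state = {"search": {"feeds": {"_value": [{"id": "abc123"}]}}}
feeds = get_by_path(state, "search.feeds")
```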
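## Anti-scraping protection
This Skill has built-in safeguards against Xiaohongshu's anti-bot measures:
- **Rate limiting**: an automatic 3-6 second delay between two navigations, plus a cooldown of about 10 seconds after every 5 consecutive requests
- **CAPTCHA detection**: redirects to security verification pages are detected automatically; when one is triggered, a `CaptchaError` is raised with suggested next steps
- **Human-like behavior**: random delays, scrolling patterns, and User-Agent spoofing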
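The rate-limiting rule can be expressed as a pure function, which makes it easier to reason about and test. This sketch follows the logic of `_throttle` in `scripts/client.py` and reuses its constants; `seconds_to_wait` is a hypothetical name, and the real method sleeps and updates counters instead of returning a value:

```python
import random

MIN_INTERVAL = 3.0     # minimum gap between two navigations (seconds)
MAX_INTERVAL = 6.0     # maximum gap between two navigations (seconds)
BURST_THRESHOLD = 5    # apply an extra cooldown every N navigations
BURST_COOLDOWN = 10.0  # base cooldown after a burst (seconds)


def seconds_to_wait(elapsed: float, navigate_count: int, rng: random.Random) -> float:
    """Return how long to sleep before the next navigation.

    elapsed: seconds since the previous navigation.
    navigate_count: navigations performed so far in this session.
    """
    if navigate_count > 0 and navigate_count % BURST_THRESHOLD == 0:
        # Burst reached: cool down for 10 to 13 seconds in total.
        cooldown = BURST_COOLDOWN + rng.uniform(0, 3)
        return max(0.0, cooldown - elapsed)
    if elapsed < MIN_INTERVAL:
        # Too soon: stretch the gap to a random 3 to 6 seconds.
        return max(0.0, rng.uniform(MIN_INTERVAL, MAX_INTERVAL) - elapsed)
    return 0.0
```

Note that a navigation arriving more than `MIN_INTERVAL` seconds after the previous one is not delayed at all, unless it falls on a burst boundary.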
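**When a CAPTCHA is triggered:**
1. Wait a few minutes and retry
2. Run `cd {baseDir} && python -m scripts qrcode --headless=false` to pass the verification manually
3. If the cookies have expired, scan the QR code again to log in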
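A caller that wraps the CLI needs to tell a CAPTCHA apart from other failures before choosing one of the steps above. According to the error handling in `main()` of `scripts/__main__.py`, failures are printed as a JSON object with `"status": "error"` and an `error_type` field, which is `"CaptchaError"` for CAPTCHAs. A minimal sketch under that assumption; `classify_cli_output` is a hypothetical helper and expects a single JSON document:

```python
import json


def classify_cli_output(stdout_text: str) -> str:
    """Return "captcha", "error", or "ok" for one JSON document printed by the CLI."""
    try:
        payload = json.loads(stdout_text)
    except json.JSONDecodeError:
        # Output that is not valid JSON is treated as a generic failure.
        return "error"
    if isinstance(payload, dict) and payload.get("status") == "error":
        if payload.get("error_type") == "CaptchaError":
            return "captcha"
        return "error"
    return "ok"
```

On `"captcha"`, a wrapper would typically pause for a few minutes before retrying rather than retrying immediately.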
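## Output format
All commands print JSON to standard output. The `search` command wraps its results as `{"count": ..., "results": [...]}`; a single result item looks like this:
```json
{
  "id": "abc123",
  "xsec_token": "ABxyz...",
  "title": "Post title",
  "type": "normal",
  "user": "Username",
  "user_id": "user123",
  "liked_count": "1234",
  "collected_count": "567",
  "comment_count": "89"
}
```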
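Because each search result carries the `id` and `xsec_token` that the `feed` command needs, the two commands chain naturally. The sketch below turns captured `search` output into `feed` argument lists. It assumes the `{"count": ..., "results": [...]}` wrapper printed by `cmd_search` in `scripts/__main__.py`; `feed_commands` is a hypothetical helper:

```python
import json
from typing import List


def feed_commands(search_stdout: str) -> List[List[str]]:
    """Build one `feed` argv per search result that has both an id and a token."""
    payload = json.loads(search_stdout)
    commands = []
    for item in payload.get("results", []):
        feed_id = item.get("id")
        token = item.get("xsec_token")
        if feed_id and token:
            commands.append(["python", "-m", "scripts", "feed", feed_id, token])
    return commands
```

Results without a token are skipped here rather than requested with an empty one; that is a choice of this sketch, not documented behavior.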
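## File structure
```
{baseDir}/
├── SKILL.md              # This file (Skill specification)
├── README.md             # Project documentation
├── requirements.txt      # Python dependencies
├── LICENSE               # MIT license
├── data/                 # Runtime data (QR codes, debug output)
└── scripts/              # Core modules
    ├── __init__.py
    ├── __main__.py       # CLI entry point
    ├── client.py         # Browser client wrapper (rate limiting + CAPTCHA detection)
    ├── login.py          # QR code login flow
    ├── search.py         # Search (multiple filters supported)
    ├── feed.py           # Post detail extraction
    ├── user.py           # User profile extraction
    ├── comment.py        # Comments and replies
    ├── interact.py       # Like and collect actions
    ├── explore.py        # Home recommendation feed
    └── publish.py        # Publishing image, video, and Markdown notes
```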
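## Cross-platform compatibility
| Environment | Headless mode | Headed mode (QR code login) | Notes |
|-------------|---------------|-----------------------------|-------|
| Windows | Supported | Supported | Primary development environment |
| WSL2 (Win11) | Supported | Supported via WSLg | Requires `playwright install-deps` |
| Linux server | Supported | Not applicable | QR code is saved as an image file |
## Notes
1. **Cookie expiry**: cookies expire periodically; log in again when `check-login` reports `"is_logged_in": false`
2. **Rate limits**: excessive scraping triggers CAPTCHAs; rely on the built-in rate limiting
3. **xsec_token**: tokens are bound to the session; always use the latest token from search or user results
4. **For learning only**: comply with Xiaohongshu's terms of service; this tool is intended for study and research only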
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
  "owner": "deliciousbuding",
  "slug": "xiaohongshu-skill",
  "displayName": "小红书skill",
  "latest": {
    "version": "1.0.2",
    "publishedAt": 1772171400721,
    "commit": "https://github.com/openclaw/skills/commit/3614fc2aa03f6bb379b6727e5adc7c8c3567016c"
  },
  "history": [
    {
      "version": "1.0.0",
      "publishedAt": 1771854373203,
      "commit": "https://github.com/openclaw/skills/commit/99b9cf011c7424d801948cbd909609914e89f2c4"
    }
  ]
}
```
### scripts/__init__.py
```python
"""
xiaohongshu-skill

A Python Playwright implementation translated from the xiaohongshu-mcp Go source.
"""

from .client import XiaohongshuClient, create_client, DEFAULT_COOKIE_PATH
from . import login
from . import search
from . import feed
from . import user
from . import comment
from . import interact
from . import explore
from . import publish

__version__ = "1.1.0"

__all__ = [
    "XiaohongshuClient",
    "create_client",
    "DEFAULT_COOKIE_PATH",
    "login",
    "search",
    "feed",
    "user",
    "comment",
    "interact",
    "explore",
    "publish",
]
```
### scripts/__main__.py
```python
#!/usr/bin/env python
"""
Xiaohongshu CLI entry point

Translated from xiaohongshu-mcp.
"""

import argparse
import json
import sys
from typing import Optional

# Compatibility fix for Windows GBK terminals: force stdout to use UTF-8
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')

from .client import XiaohongshuClient, CaptchaError, DEFAULT_COOKIE_PATH
from . import login
from . import search
from . import feed
from . import user
from . import comment
from . import interact
from . import explore
from . import publish


def format_output(data) -> str:
    """Format the output as JSON."""
    if data is None:
        return json.dumps({"error": "No data"}, ensure_ascii=False, indent=2)
    return json.dumps(data, ensure_ascii=False, indent=2)


def _headless(args) -> bool:
    """Resolve the headless value from args (anything but 'false' means headless)."""
    val = getattr(args, 'headless', 'true')
    if isinstance(val, bool):
        return val
    return val.lower() != 'false'
# ============================================================
# Command handlers
# ============================================================

def cmd_login(args):
    """Login command: generate a QR code and wait for it to be scanned."""
    result = login.login(
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
        timeout=args.timeout,
    )
    print(format_output(result))
    if result.get("status") == "logged_in":
        return 0
    elif result.get("status") == "qrcode_ready":
        return 2
    return 1
def cmd_qrcode(args):
    """
    Fetch the login QR code and wait for it to be scanned.

    Returns immediately if already logged in.
    """
    cookie_path = args.cookie or DEFAULT_COOKIE_PATH
    headless = _headless(args)

    # 1) Quick check: already logged in?
    is_logged_in, username = login.check_login(cookie_path=cookie_path)
    if is_logged_in:
        result = {
            "status": "logged_in",
            "qrcode_path": None,
            "username": username,
            "message": "已登录",
        }
        print(format_output(result))
        return 0

    # 2) Not logged in: start the browser, fetch the QR code, and wait for the scan
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = login.LoginAction(client)
        qrcode_path, already_logged_in = action.get_wechat_qrcode()

        if already_logged_in:
            print(format_output({"status": "logged_in", "message": "已登录"}))
            return 0

        if qrcode_path:
            print(format_output({
                "status": "qrcode_ready",
                "qrcode_path": qrcode_path,
                "message": f"请扫码登录,二维码路径: {qrcode_path}",
            }))
            # Wait for the user to scan (up to 120 seconds)
            print("等待用户扫码…(最多 120 秒)", file=sys.stderr)
            success = action.wait_for_login(timeout=120)
            if success:
                print(format_output({"status": "logged_in", "message": "登录成功!"}))
                return 0
            else:
                print(format_output({"status": "timeout", "message": "扫码超时"}))
                return 2
        else:
            print(format_output({"status": "error", "message": "获取二维码失败"}))
            return 1
    finally:
        client.close()
def cmd_check_login(args):
    """Check the login status."""
    is_logged_in, username = login.check_login(
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output({
        "is_logged_in": is_logged_in,
        "username": username,
    }))
    return 0


def cmd_search(args):
    """Search command."""
    results = search.search(
        keyword=args.keyword,
        sort_by=args.sort_by,
        note_type=args.note_type,
        publish_time=args.publish_time,
        search_scope=args.search_scope,
        location=args.location,
        limit=args.limit,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output({
        "count": len(results),
        "results": results,
    }))
    return 0


def cmd_feed(args):
    """Note detail command."""
    detail = feed.feed_detail(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        load_comments=args.load_comments,
        max_comments=args.max_comments,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(detail))
    return 0 if detail else 1


def cmd_user(args):
    """User profile command."""
    profile = user.user_profile(
        user_id=args.user_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(profile))
    return 0 if profile else 1


def cmd_me(args):
    """Fetch the logged-in user's own profile."""
    profile = user.my_profile(
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(profile))
    return 0 if profile else 1
def cmd_comment(args):
    """Post a comment."""
    result = comment.post_comment(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        content=args.content,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_reply(args):
    """Reply to a comment."""
    result = comment.reply_to_comment(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        comment_id=args.comment_id,
        reply_user_id=args.reply_user_id,
        content=args.content,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_like(args):
    """Like a note."""
    result = interact.like(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_unlike(args):
    """Remove a like."""
    result = interact.unlike(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_collect(args):
    """Collect (bookmark) a note."""
    result = interact.collect(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_uncollect(args):
    """Remove a note from the collection."""
    result = interact.uncollect(
        feed_id=args.feed_id,
        xsec_token=args.xsec_token or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") == "success" else 1


def cmd_explore(args):
    """Home recommendation feed."""
    result = explore.explore(
        limit=args.limit,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0
def cmd_publish(args):
    """Publish an image-and-text note."""
    image_paths = [p.strip() for p in args.images.split(",") if p.strip()]
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
    result = publish.publish_image(
        title=args.title,
        content=args.content,
        image_paths=image_paths,
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1


def cmd_publish_video(args):
    """Publish a video note."""
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
    result = publish.publish_video(
        title=args.title,
        content=args.content,
        video_path=args.video,
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1


def cmd_publish_md(args):
    """Render Markdown to images, then publish them as an image-and-text note."""
    tags = [t.strip() for t in args.tags.split(",")] if args.tags else None

    # Read the Markdown from a file or from the inline text
    if args.file:
        with open(args.file, "r", encoding="utf-8") as f:
            markdown_text = f.read()
    else:
        markdown_text = args.text

    if not markdown_text:
        print(format_output({"status": "error", "message": "需要提供 --file 或 --text"}))
        return 1

    result = publish.publish_markdown(
        title=args.title,
        markdown_text=markdown_text,
        extra_content=args.content or "",
        tags=tags,
        schedule_time=args.schedule_time,
        auto_publish=args.auto_publish,
        image_width=args.width,
        output_dir=args.output_dir or "",
        headless=_headless(args),
        cookie_path=args.cookie or DEFAULT_COOKIE_PATH,
    )
    print(format_output(result))
    return 0 if result.get("status") in ("success", "ready") else 1
# ============================================================
# Entry point
# ============================================================

def main():
    parser = argparse.ArgumentParser(
        description="小红书 CLI 工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Global arguments
    parser.add_argument("--cookie", "-c", help="Cookie 文件路径", default=None)
    parser.add_argument("--headless", help="无头模式: true/false(默认 true)", default='true')

    subparsers = parser.add_subparsers(dest="command", help="可用命令")

    # login
    login_p = subparsers.add_parser("login", help="扫码登录(等待登录或超时)")
    login_p.add_argument("--timeout", "-t", type=int, default=120, help="登录超时秒数")
    login_p.add_argument("--headless", default='false', help="默认 false 以显示浏览器")
    login_p.set_defaults(func=cmd_login)

    # qrcode
    qr_p = subparsers.add_parser("qrcode", help="获取登录二维码并等待扫码")
    qr_p.add_argument("--headless", default='false', help="默认 false 以显示浏览器")
    qr_p.set_defaults(func=cmd_qrcode)

    # check-login
    chk_p = subparsers.add_parser("check-login", help="检查登录状态")
    chk_p.add_argument("--headless", default='true')
    chk_p.set_defaults(func=cmd_check_login)

    # search
    s_p = subparsers.add_parser("search", help="搜索内容")
    s_p.add_argument("keyword", help="搜索关键词")
    s_p.add_argument("--sort-by", help="排序方式")
    s_p.add_argument("--note-type", help="笔记类型")
    s_p.add_argument("--publish-time", help="发布时间")
    s_p.add_argument("--search-scope", help="搜索范围")
    s_p.add_argument("--location", help="位置距离")
    s_p.add_argument("--limit", "-n", type=int, default=10, help="返回数量")
    s_p.add_argument("--headless", default='true')
    s_p.set_defaults(func=cmd_search)

    # feed
    f_p = subparsers.add_parser("feed", help="获取笔记详情")
    f_p.add_argument("feed_id", help="笔记 ID")
    f_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    f_p.add_argument("--load-comments", "-l", action="store_true", help="加载评论")
    f_p.add_argument("--max-comments", "-m", type=int, default=0, help="最大评论数")
    f_p.add_argument("--headless", default='true')
    f_p.set_defaults(func=cmd_feed)

    # user
    u_p = subparsers.add_parser("user", help="获取用户主页")
    u_p.add_argument("user_id", help="用户 ID")
    u_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    u_p.add_argument("--headless", default='true')
    u_p.set_defaults(func=cmd_user)

    # me (the logged-in user's own profile)
    me_p = subparsers.add_parser("me", help="获取自己的个人主页")
    me_p.add_argument("--headless", default='true')
    me_p.set_defaults(func=cmd_me)

    # comment (post a comment)
    cmt_p = subparsers.add_parser("comment", help="发表评论")
    cmt_p.add_argument("feed_id", help="笔记 ID")
    cmt_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    cmt_p.add_argument("--content", required=True, help="评论内容")
    cmt_p.add_argument("--headless", default='true')
    cmt_p.set_defaults(func=cmd_comment)

    # reply (reply to a comment)
    rpl_p = subparsers.add_parser("reply", help="回复评论")
    rpl_p.add_argument("feed_id", help="笔记 ID")
    rpl_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    rpl_p.add_argument("--comment-id", required=True, help="目标评论 ID")
    rpl_p.add_argument("--reply-user-id", required=True, help="被回复用户 ID")
    rpl_p.add_argument("--content", required=True, help="回复内容")
    rpl_p.add_argument("--headless", default='true')
    rpl_p.set_defaults(func=cmd_reply)

    # like
    like_p = subparsers.add_parser("like", help="点赞笔记")
    like_p.add_argument("feed_id", help="笔记 ID")
    like_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    like_p.add_argument("--headless", default='true')
    like_p.set_defaults(func=cmd_like)

    # unlike (remove a like)
    unlike_p = subparsers.add_parser("unlike", help="取消点赞")
    unlike_p.add_argument("feed_id", help="笔记 ID")
    unlike_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    unlike_p.add_argument("--headless", default='true')
    unlike_p.set_defaults(func=cmd_unlike)

    # collect (bookmark a note)
    col_p = subparsers.add_parser("collect", help="收藏笔记")
    col_p.add_argument("feed_id", help="笔记 ID")
    col_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    col_p.add_argument("--headless", default='true')
    col_p.set_defaults(func=cmd_collect)

    # uncollect (remove a bookmark)
    ucol_p = subparsers.add_parser("uncollect", help="取消收藏")
    ucol_p.add_argument("feed_id", help="笔记 ID")
    ucol_p.add_argument("xsec_token", nargs="?", help="xsec_token")
    ucol_p.add_argument("--headless", default='true')
    ucol_p.set_defaults(func=cmd_uncollect)

    # explore (home recommendation feed)
    exp_p = subparsers.add_parser("explore", help="获取首页推荐流")
    exp_p.add_argument("--limit", "-n", type=int, default=20, help="返回数量")
    exp_p.add_argument("--headless", default='true')
    exp_p.set_defaults(func=cmd_explore)

    # publish (publish an image-and-text note)
    pub_p = subparsers.add_parser("publish", help="发布图文笔记")
    pub_p.add_argument("--title", required=True, help="标题(建议 <=20 字)")
    pub_p.add_argument("--content", required=True, help="正文内容")
    pub_p.add_argument("--images", required=True, help="图片路径,逗号分隔")
    pub_p.add_argument("--tags", help="话题标签,逗号分隔")
    pub_p.add_argument("--schedule-time", help="定时发布(格式: 2025-01-01 12:00)")
    pub_p.add_argument("--auto-publish", action="store_true", help="自动点击发布(默认停在发布按钮处)")
    pub_p.add_argument("--headless", default='true')
    pub_p.set_defaults(func=cmd_publish)

    # publish-video (publish a video note)
    pubv_p = subparsers.add_parser("publish-video", help="发布视频笔记")
    pubv_p.add_argument("--title", required=True, help="标题")
    pubv_p.add_argument("--content", required=True, help="正文内容")
    pubv_p.add_argument("--video", required=True, help="视频文件路径")
    pubv_p.add_argument("--tags", help="话题标签,逗号分隔")
    pubv_p.add_argument("--schedule-time", help="定时发布(格式: 2025-01-01 12:00)")
    pubv_p.add_argument("--auto-publish", action="store_true", help="自动点击发布")
    pubv_p.add_argument("--headless", default='true')
    pubv_p.set_defaults(func=cmd_publish_video)

    # publish-md (render Markdown to images, then publish)
    pubmd_p = subparsers.add_parser("publish-md", help="Markdown 渲染为图片后发布")
    pubmd_p.add_argument("--title", required=True, help="标题")
    pubmd_p.add_argument("--file", help="Markdown 文件路径")
    pubmd_p.add_argument("--text", help="Markdown 文本(与 --file 二选一)")
    pubmd_p.add_argument("--content", help="正文区额外文字说明")
    pubmd_p.add_argument("--tags", help="话题标签,逗号分隔")
    pubmd_p.add_argument("--schedule-time", help="定时发布")
    pubmd_p.add_argument("--auto-publish", action="store_true", help="自动点击发布")
    pubmd_p.add_argument("--width", type=int, default=1080, help="图片宽度(默认 1080)")
    pubmd_p.add_argument("--output-dir", help="图片输出目录(默认临时目录)")
    pubmd_p.add_argument("--headless", default='true')
    pubmd_p.set_defaults(func=cmd_publish_md)

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 0

    try:
        return args.func(args)
    except CaptchaError as e:
        print(format_output({
            "status": "error",
            "error_type": "CaptchaError",
            "message": str(e),
            "captcha_url": e.captcha_url,
        }))
        return 1
    except Exception as e:
        print(format_output({
            "status": "error",
            "error_type": type(e).__name__,
            "message": str(e),
        }))
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
### scripts/client.py
```python
"""
Xiaohongshu browser client wrapper

Translated from the xiaohongshu-mcp Go source to Python Playwright.
"""

import json
import os
import random
import sys
import time
from pathlib import Path
from typing import Optional, Any, Dict

try:
    from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
except ImportError:
    print("请先安装 playwright: pip install playwright && playwright install chromium")
    raise

# Cookie file path
DEFAULT_COOKIE_PATH = os.path.expanduser("~/.xiaohongshu/cookies.json")

# URL fragments that identify CAPTCHA / security interception pages
CAPTCHA_URL_PATTERNS = [
    'captcha',
    'security-verification',
    'website-login/captcha',
    'verifyType',
    'verifyBiz',
]

# Page title fragments that identify CAPTCHA pages
CAPTCHA_TITLE_PATTERNS = [
    '安全验证',
    '验证码',
    'captcha',
    'Security Verification',
]


class CaptchaError(Exception):
    """Raised when a CAPTCHA is triggered."""

    def __init__(self, url: str, message: str = ""):
        self.captcha_url = url
        super().__init__(message or f"触发安全验证: {url}")


class XiaohongshuClient:
    """Xiaohongshu browser client."""

    # Rate-limiting parameters
    MIN_INTERVAL = 3.0      # minimum gap between two navigations (seconds)
    MAX_INTERVAL = 6.0      # maximum gap between two navigations (seconds)
    BURST_THRESHOLD = 5     # consecutive-request threshold that triggers an extra cooldown
    BURST_COOLDOWN = 10.0   # cooldown after a burst of requests (seconds)

    def __init__(
        self,
        headless: bool = True,
        cookie_path: str = DEFAULT_COOKIE_PATH,
        timeout: int = 60,
    ):
        self.headless = headless
        self.cookie_path = cookie_path
        self.timeout = timeout * 1000  # convert to milliseconds
        self.playwright: Optional[Playwright] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        # Request timers (instance attributes, so instances do not interfere)
        self._last_navigate_time: float = 0.0
        self._navigate_count: int = 0
        self._session_start: float = 0.0

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def start(self):
        """Start the browser."""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=self.headless,
            args=['--disable-blink-features=AutomationControlled'],
        )
        # Create the browser context
        self.context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        )
        # Load cookies
        self._load_cookies()
        # Create the page
        self.page = self.context.new_page()
        self.page.set_default_timeout(self.timeout)

    def close(self):
        """Close the browser."""
        # Save cookies
        self._save_cookies()
        if self.page:
            self.page.close()
        if self.context:
            self.context.close()
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()

    def _load_cookies(self):
        """Load cookies from the cookie file."""
        if not os.path.exists(self.cookie_path):
            return
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if cookies:
                self.context.add_cookies(cookies)
                print(f"已加载 {len(cookies)} 个 Cookie", file=sys.stderr)
        except Exception as e:
            print(f"加载 Cookie 失败: {e}", file=sys.stderr)

    def _save_cookies(self):
        """Save cookies to the cookie file."""
        if not self.context:
            return
        try:
            cookies = self.context.cookies()
            # os.makedirs("") raises, so only create the directory when the
            # path actually has one (e.g. not for a bare "cookies.json").
            cookie_dir = os.path.dirname(self.cookie_path)
            if cookie_dir:
                os.makedirs(cookie_dir, exist_ok=True)
            with open(self.cookie_path, 'w', encoding='utf-8') as f:
                json.dump(cookies, f, ensure_ascii=False, indent=2)
            print(f"已保存 {len(cookies)} 个 Cookie 到 {self.cookie_path}", file=sys.stderr)
        except Exception as e:
            print(f"保存 Cookie 失败: {e}", file=sys.stderr)

    def _throttle(self):
        """Rate limiting: imitate a human browsing rhythm."""
        now = time.time()

        # Initialize the session start time
        if self._session_start == 0:
            self._session_start = now

        # Time elapsed since the last navigation
        elapsed = now - self._last_navigate_time if self._last_navigate_time > 0 else 999

        # Burst threshold reached: apply an extra cooldown
        if self._navigate_count > 0 and self._navigate_count % self.BURST_THRESHOLD == 0:
            cooldown = self.BURST_COOLDOWN + random.uniform(0, 3)
            if elapsed < cooldown:
                wait = cooldown - elapsed
                print(f"反爬保护: 连续请求 {self._navigate_count} 次,冷却 {wait:.1f}s...", file=sys.stderr)
                time.sleep(wait)
        elif elapsed < self.MIN_INTERVAL:
            # Regular interval control
            wait = random.uniform(self.MIN_INTERVAL, self.MAX_INTERVAL) - elapsed
            if wait > 0:
                time.sleep(wait)

        self._last_navigate_time = time.time()
        self._navigate_count += 1

    def _check_captcha(self) -> bool:
        """
        Detect whether the current page was redirected to a CAPTCHA page.

        Returns:
            True if a CAPTCHA was triggered.
        """
        if not self.page:
            return False
        try:
            current_url = self.page.url.lower()
            for pattern in CAPTCHA_URL_PATTERNS:
                # The URL is lowercased above, so lowercase the pattern too;
                # otherwise 'verifyType' and 'verifyBiz' could never match.
                if pattern.lower() in current_url:
                    return True
            page_title = self.page.title().lower()
            for pattern in CAPTCHA_TITLE_PATTERNS:
                if pattern.lower() in page_title:
                    return True
        except Exception:
            pass
        return False

    def _handle_captcha(self):
        """
        Handle a CAPTCHA interception by raising an exception for the caller.

        Raises:
            CaptchaError
        """
        url = self.page.url if self.page else "unknown"
        raise CaptchaError(
            url=url,
            message=(
                f"触发小红书安全验证!\n"
                f" 验证页面: {url}\n"
                f" 本次会话已请求 {self._navigate_count} 次\n"
                f" 建议: 1) 等待几分钟后重试 2) 用 --headless=false 手动过验证码 "
                f"3) 重新扫码登录"
            ),
        )

    def navigate(self, url: str, wait_until: str = "domcontentloaded"):
        """Navigate to a URL (with rate limiting and CAPTCHA detection)."""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # Rate limiting
        self._throttle()

        self.page.goto(url, wait_until=wait_until)

        # Give the page time to settle
        time.sleep(random.uniform(1.5, 3.0))

        # Try to wait for networkidle, but do not require it
        try:
            self.page.wait_for_load_state("networkidle", timeout=8000)
        except Exception:
            pass

        # CAPTCHA detection
        if self._check_captcha():
            self._handle_captcha()

    def wait_for_initial_state(self, timeout: int = 30000, retries: int = 2):
        """Wait for __INITIAL_STATE__ to load, with retries and a fallback."""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        for attempt in range(retries + 1):
            # Check for a CAPTCHA first
            if self._check_captcha():
                self._handle_captcha()
            try:
                self.page.wait_for_function(
                    "() => window.__INITIAL_STATE__ !== undefined",
                    timeout=timeout,
                )
                return
            except Exception:
                if attempt < retries:
                    print(f"__INITIAL_STATE__ 等待超时,刷新重试 ({attempt + 1}/{retries})...", file=sys.stderr)
                    self.page.reload(wait_until="domcontentloaded")
                    time.sleep(random.uniform(2, 4))
                    # Check for a CAPTCHA again after the reload
                    if self._check_captcha():
                        self._handle_captcha()
                else:
                    print("警告: __INITIAL_STATE__ 加载超时,尝试继续执行", file=sys.stderr)

    def get_initial_state(self) -> Dict[str, Any]:
        """Return the __INITIAL_STATE__ data."""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # Extract only the needed parts by hand (rather than the whole state)
        # to avoid circular references
        result = self.page.evaluate("""() => {
            if (!window.__INITIAL_STATE__) {
                return '';
            }
            // Extract only the top-level structures that are needed
            const state = window.__INITIAL_STATE__;
            const result = {};
            if (state.search) result.search = state.search;
            if (state.feed) result.feed = state.feed;
            if (state.note) result.note = state.note;
            if (state.user) result.user = state.user;
            return JSON.stringify(result);
        }""")
        if not result:
            return {}
        return json.loads(result)

    def get_data_by_path(self, path: str) -> Any:
        """
        Look up data in __INITIAL_STATE__ by a dotted path.

        For example: "search.feeds", "note.noteDetailMap", "user.userPageData"
        """
        state = self.get_initial_state()

        # Unwrap Vue refs (value / _value)
        def get_value(obj):
            if isinstance(obj, dict):
                if 'value' in obj:
                    return obj['value']
                if '_value' in obj:
                    return obj['_value']
            return obj

        keys = path.split('.')
        current = state
        for key in keys:
            if current is None:
                return None
            if isinstance(current, dict):
                current = current.get(key)
            else:
                return None
            current = get_value(current)
        return current

    # DEPRECATED: use login.LoginAction.check_login_status() instead
    def check_login_status(self) -> bool:
        """Check the login status (deprecated; use login.LoginAction)."""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # Visit the home page
        self.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(1)

        # Look for an element that only exists when logged in
        try:
            element = self.page.locator('.main-container .user .link-wrapper .channel')
            count = element.count()
            return count > 0
        except Exception:
            return False

    # DEPRECATED: use login.LoginAction.get_wechat_qrcode() instead
    def get_qrcode(self) -> Optional[str]:
        """Fetch the login QR code (deprecated; use login.LoginAction)."""
        if not self.page:
            raise RuntimeError("浏览器未启动")

        # Visit the home page to trigger the QR code dialog
        self.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(2)

        # Already logged in?
        if self.check_login_status():
            return None

        # Read the QR code image source
        try:
            qrcode = self.page.locator('.login-container .qrcode-img')
            src = qrcode.get_attribute('src')
            return src
        except Exception:
            return None

    # DEPRECATED: use login.LoginAction.wait_for_login() instead
    def wait_for_login(self, timeout: int = 120) -> bool:
        """
        Wait for the user to scan the QR code and log in.

        Args:
            timeout: timeout in seconds

        Returns:
            Whether the login succeeded.
        """
        if not self.page:
            raise RuntimeError("浏览器未启动")

        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.check_login_status():
                # Save the cookies obtained after login
                self._save_cookies()
                return True
            time.sleep(1)
        return False

    def scroll_to_bottom(self, distance: int = 500):
        """Scroll the page down by `distance` pixels."""
        if not self.page:
            raise RuntimeError("浏览器未启动")
        self.page.evaluate(f"window.scrollBy(0, {distance})")
        time.sleep(0.5)


def create_client(
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
    timeout: int = 60,
) -> XiaohongshuClient:
    """Convenience function that creates a Xiaohongshu client."""
    return XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
        timeout=timeout,
    )
```
### scripts/comment.py
```python
"""
Xiaohongshu comment module

Translated from xiaohongshu-mcp/comment_feed.go.
Incorporates the safe-commenting ideas from xiaohongshu-ops (length validation,
human-like delays, rate-limit detection).
"""

import json
import sys
import time
import random
from typing import Optional, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH

# Comment safety constants (from xiaohongshu-ops)
MAX_COMMENT_LENGTH = 280
TYPING_DELAY_MIN = 30          # lower bound of the per-character typing delay (ms)
TYPING_DELAY_MAX = 80          # upper bound of the per-character typing delay (ms)
PRE_SUBMIT_DELAY_MIN = 1.5     # lower bound of the wait before submitting (seconds)
PRE_SUBMIT_DELAY_MAX = 3.0     # upper bound of the wait before submitting (seconds)
POST_SUBMIT_COOLDOWN_MIN = 8   # lower bound of the cooldown after submitting (seconds)
POST_SUBMIT_COOLDOWN_MAX = 15  # upper bound of the cooldown after submitting (seconds)


class CommentAction:
    """Comment actions."""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_feed_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
        """Build the note detail URL."""
        return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"

    def _navigate_to_feed(self, feed_id: str, xsec_token: str):
        """Navigate to the note detail page and wait for it to load."""
        url = self._make_feed_url(feed_id, xsec_token)
        print(f"打开笔记详情页: {url}", file=sys.stderr)
        self.client.navigate(url)
        self.client.wait_for_initial_state()
        time.sleep(2)

    @staticmethod
    def validate_comment(content: str) -> Optional[str]:
        """
        Validate the comment content (from the xiaohongshu-ops safety ideas).

        Returns:
            None if validation passes, otherwise the reason for the failure.
        """
        if not content or not content.strip():
            return "评论内容不能为空"
        if len(content) > MAX_COMMENT_LENGTH:
            return f"评论内容超长({len(content)}/{MAX_COMMENT_LENGTH})"
        return None

    def _check_rate_limit(self) -> bool:
        """
        Detect whether a comment rate limit was triggered (from the xiaohongshu-ops safety ideas).

        Returns:
            True if the account is being rate limited.
        """
        page = self.client.page
        try:
            # Look for the common rate-limit toasts (matched against the site's Chinese text)
            rate_limit_selectors = [
                'div.d-toast:has-text("频繁")',
                'div.d-toast:has-text("操作太快")',
                'div.d-toast:has-text("稍后再试")',
                'div.d-toast:has-text("限制")',
            ]
            for sel in rate_limit_selectors:
                toast = page.locator(sel)
                if toast.count() > 0 and toast.first.is_visible():
                    toast_text = toast.first.text_content()
                    print(f"检测到频率限制: {toast_text}", file=sys.stderr)
                    return True
        except Exception:
            pass
        return False

    def _verify_input_placeholder(self, expected_hint: Optional[str] = None) -> bool:
        """
        Check that the input box placeholder is the expected one (from the xiaohongshu-ops safety ideas).

        Intended to confirm that the input box was activated correctly and to
        guard against typing into the wrong field. A mismatch is only logged:
        as written, this method always returns True and never blocks the flow.

        Args:
            expected_hint: expected fragment of the placeholder text (e.g. "回复 用户名")

        Returns:
            True (the flow is never blocked).
        """
        if not expected_hint:
            return True
        page = self.client.page
        try:
            placeholder = page.locator('div.input-box div.content-edit p.content-input')
            if placeholder.count() > 0:
                attr = placeholder.first.get_attribute('data-placeholder') or ''
                text = placeholder.first.text_content() or ''
                if expected_hint in attr or expected_hint in text:
                    print(f"输入框验证通过: {attr or text}", file=sys.stderr)
                    return True
                else:
                    print(f"输入框 placeholder 不匹配: 期望含「{expected_hint}」, 实际「{attr or text}」", file=sys.stderr)
        except Exception:
            pass
        return True  # do not block the flow when the check fails

    def _type_and_submit(self, content: str) -> bool:
        """Type text into the comment box and submit it (with human-like delays)."""
        page = self.client.page

        # Click the comment box to activate it (the span placeholder)
        try:
            input_trigger = page.locator('div.input-box div.content-edit span')
            input_trigger.first.click()
            time.sleep(0.5)
        except Exception as e:
            print(f"点击评论输入框失败: {e}", file=sys.stderr)
            return False

        # Type into the contenteditable p element; the per-character delay is
        # picked at random once per call
        try:
            input_el = page.locator('div.input-box div.content-edit p.content-input')
            input_el.first.click()
            time.sleep(0.3)
            typing_delay = random.randint(TYPING_DELAY_MIN, TYPING_DELAY_MAX)
            page.keyboard.type(content, delay=typing_delay)
            # Random wait before submitting, imitating a human re-reading the text
            pre_wait = random.uniform(PRE_SUBMIT_DELAY_MIN, PRE_SUBMIT_DELAY_MAX)
            time.sleep(pre_wait)
        except Exception as e:
            print(f"输入评论内容失败: {e}", file=sys.stderr)
            return False

        # Click the send button
        try:
            submit_btn = page.locator('div.bottom button.submit')
            submit_btn.first.click()
            time.sleep(1.5)
        except Exception as e:
            print(f"点击发送按钮失败: {e}", file=sys.stderr)
            return False

        # Check whether a rate limit was triggered
        if self._check_rate_limit():
            return False
        return True
def post_comment(
self,
feed_id: str,
xsec_token: str,
content: str,
) -> Dict[str, Any]:
"""
发表评论
Args:
feed_id: 笔记 ID
xsec_token: xsec_token
content: 评论内容
Returns:
操作结果
"""
# 评论内容校验
error = self.validate_comment(content)
if error:
return {
"status": "error",
"feed_id": feed_id,
"content": content,
"message": error,
}
self._navigate_to_feed(feed_id, xsec_token)
# 滚动到评论区域
self.client.page.evaluate("""() => {
const comments = document.querySelector('.comments-wrap') ||
document.querySelector('.comment-wrapper');
if (comments) comments.scrollIntoView();
}""")
time.sleep(1)
success = self._type_and_submit(content)
if success:
print("评论发送成功", file=sys.stderr)
# 提交后冷却(one-send-per-turn 理念)
cooldown = random.uniform(POST_SUBMIT_COOLDOWN_MIN, POST_SUBMIT_COOLDOWN_MAX)
print(f"提交后冷却 {cooldown:.1f}s...", file=sys.stderr)
time.sleep(cooldown)
return {
"status": "success",
"feed_id": feed_id,
"content": content,
"message": "评论发送成功",
}
else:
return {
"status": "error",
"feed_id": feed_id,
"content": content,
"message": "评论发送失败",
}
def reply_to_comment(
self,
feed_id: str,
xsec_token: str,
comment_id: str,
reply_user_id: str,
content: str,
) -> Dict[str, Any]:
"""
回复评论
Args:
feed_id: 笔记 ID
xsec_token: xsec_token
comment_id: 目标评论 ID
reply_user_id: 被回复用户 ID
content: 回复内容
Returns:
操作结果
"""
# 评论内容校验
error = self.validate_comment(content)
if error:
return {
"status": "error",
"feed_id": feed_id,
"comment_id": comment_id,
"content": content,
"message": error,
}
self._navigate_to_feed(feed_id, xsec_token)
page = self.client.page
# 滚动到评论区域
page.evaluate("""() => {
const comments = document.querySelector('.comments-wrap') ||
document.querySelector('.comment-wrapper');
if (comments) comments.scrollIntoView();
}""")
time.sleep(1)
# 找到目标评论并点击"回复"按钮
try:
# 尝试通过评论 ID 定位
comment_el = page.locator(f'[data-comment-id="{comment_id}"]')
if comment_el.count() == 0:
# 回退:通过遍历评论列表查找
comment_el = page.locator('.comment-item').filter(has_text=comment_id)
if comment_el.count() > 0:
# 悬停以显示回复按钮
comment_el.first.hover()
time.sleep(0.3)
# 点击回复按钮
reply_btn = comment_el.first.locator('.reply-btn, button:has-text("回复"), span:has-text("回复")')
if reply_btn.count() > 0:
reply_btn.first.click()
time.sleep(0.5)
# Verify the input placeholder (ops technique); the hint is the reply prefix shown by the site
self._verify_input_placeholder("回复")
else:
print("未找到回复按钮,尝试直接在评论框回复", file=sys.stderr)
else:
print(f"未找到评论 {comment_id},尝试直接在评论框回复", file=sys.stderr)
except Exception as e:
print(f"定位目标评论失败: {e}", file=sys.stderr)
# 输入回复内容并发送
success = self._type_and_submit(content)
if success:
print("回复发送成功", file=sys.stderr)
# 提交后冷却
cooldown = random.uniform(POST_SUBMIT_COOLDOWN_MIN, POST_SUBMIT_COOLDOWN_MAX)
print(f"提交后冷却 {cooldown:.1f}s...", file=sys.stderr)
time.sleep(cooldown)
return {
"status": "success",
"feed_id": feed_id,
"comment_id": comment_id,
"reply_user_id": reply_user_id,
"content": content,
"message": "回复发送成功",
}
else:
return {
"status": "error",
"feed_id": feed_id,
"comment_id": comment_id,
"content": content,
"message": "回复发送失败",
}
def post_comment(
feed_id: str,
xsec_token: str,
content: str,
headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
"""
发表评论
Args:
feed_id: 笔记 ID
xsec_token: xsec_token
content: 评论内容
headless: 是否无头模式
cookie_path: Cookie 路径
Returns:
操作结果
"""
client = XiaohongshuClient(
headless=headless,
cookie_path=cookie_path,
)
try:
client.start()
action = CommentAction(client)
return action.post_comment(feed_id, xsec_token, content)
finally:
client.close()
def reply_to_comment(
feed_id: str,
xsec_token: str,
comment_id: str,
reply_user_id: str,
content: str,
headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
"""
回复评论
Args:
feed_id: 笔记 ID
xsec_token: xsec_token
comment_id: 目标评论 ID
reply_user_id: 被回复用户 ID
content: 回复内容
headless: 是否无头模式
cookie_path: Cookie 路径
Returns:
操作结果
"""
client = XiaohongshuClient(
headless=headless,
cookie_path=cookie_path,
)
try:
client.start()
action = CommentAction(client)
return action.reply_to_comment(
feed_id, xsec_token, comment_id, reply_user_id, content
)
finally:
client.close()
```
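The rate-limit check in the comment module above boils down to matching toast text against a few keywords. A minimal sketch of that matching step, separated from Playwright so it can be unit tested, might look like the following. The function name `is_rate_limited` and the idea of passing in already-collected toast strings are assumptions made for illustration; only the keyword list comes from the selectors in `_check_rate_limit`.

```python
from typing import Iterable, Optional

# Keywords copied from the toast selectors used in _check_rate_limit.
RATE_LIMIT_KEYWORDS = ("频繁", "操作太快", "稍后再试", "限制")


def is_rate_limited(toast_texts: Iterable[Optional[str]]) -> bool:
    """Return True if any toast text contains a rate-limit keyword.

    Hypothetical helper: the caller is assumed to have already read the
    text of every visible toast from the page.
    """
    for text in toast_texts:
        # text_content() can return None, so normalise to an empty string.
        if any(keyword in (text or "") for keyword in RATE_LIMIT_KEYWORDS):
            return True
    return False


if __name__ == "__main__":
    print(is_rate_limited(["操作太快,请稍后再试"]))
    print(is_rate_limited(["评论成功"]))
```

Keeping the keyword list in one place makes it easier to extend when the site changes its wording, without touching the browser automation code.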
### scripts/explore.py
```python
"""
Xiaohongshu home recommendation feed module
Translated from xiaohongshu-mcp/feeds.go
"""
import json
import sys
import time
from typing import Optional, Dict, Any, List
from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH
class ExploreAction:
"""首页推荐流动作"""
EXPLORE_URL = "https://www.xiaohongshu.com/explore"
def __init__(self, client: XiaohongshuClient):
self.client = client
    def _extract_feeds(self) -> List[Dict[str, Any]]:
        """Extract the home feed note list from __INITIAL_STATE__."""
        page = self.client.page
        result = page.evaluate("""() => {
            var s = window.__INITIAL_STATE__;
            if (!s || !s.feed || !s.feed.feeds) return '';
            var feeds = s.feed.feeds;
            // Unwrap the payload if it is stored under value / _value
            var data = feeds;
            if (feeds.value !== undefined) data = feeds.value;
            else if (feeds._value !== undefined) data = feeds._value;
            if (!data || !Array.isArray(data)) return '';
            // Flatten a two-level array, if there is one
            var flat = [];
            for (var i = 0; i < data.length; i++) {
                if (Array.isArray(data[i])) {
                    for (var j = 0; j < data[i].length; j++) flat.push(data[i][j]);
                } else {
                    flat.push(data[i]);
                }
            }
            // Extract the key fields of each note
            return JSON.stringify(flat.map(function(item) {
                // Prefer item.noteCard; fall back to the item itself when it is a note
                var nc = item.noteCard || (item.model_type === 'note' ? item : {});
                var info = nc.interactInfo || {};
                var user = nc.user || {};
                var cover = nc.cover || {};
                return {
                    id: item.id || '',
                    xsecToken: item.xsecToken || '',
                    noteCard: {
                        displayTitle: nc.displayTitle || nc.title || '',
                        type: nc.type || '',
                        interactInfo: {
                            likedCount: info.likedCount || '0',
                            collectedCount: info.collectedCount || '0',
                            commentCount: info.commentCount || '0',
                            sharedCount: info.sharedCount || '0'
                        },
                        user: {
                            nickname: user.nickname || user.nickName || '',
                            userId: user.userId || ''
                        },
                        cover: {
                            urlDefault: cover.urlDefault || cover.urlPre || ''
                        }
                    }
                };
            }));
        }""")
        if not result:
            return []
        try:
            return json.loads(result)
        except json.JSONDecodeError:
            return []
def get_feeds(self, limit: int = 20) -> Dict[str, Any]:
"""
获取首页推荐流
Args:
limit: 最大返回数量
Returns:
推荐笔记列表
"""
client = self.client
print("打开首页推荐流...", file=sys.stderr)
client.navigate(self.EXPLORE_URL)
client.wait_for_initial_state()
time.sleep(2)
feeds = self._extract_feeds()
# 如果首页数据较少,尝试滚动加载更多
if len(feeds) < limit:
for _ in range(3):
client.scroll_to_bottom(800)
time.sleep(1.5)
new_feeds = self._extract_feeds()
if len(new_feeds) > len(feeds):
feeds = new_feeds
if len(feeds) >= limit:
break
# 截取到 limit
feeds = feeds[:limit]
print(f"获取到 {len(feeds)} 条推荐笔记", file=sys.stderr)
return {
"count": len(feeds),
"feeds": feeds,
}
def explore(
limit: int = 20,
headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
"""
获取首页推荐流
Args:
limit: 最大返回数量
headless: 是否无头模式
cookie_path: Cookie 路径
Returns:
推荐笔记列表
"""
client = XiaohongshuClient(
headless=headless,
cookie_path=cookie_path,
)
try:
client.start()
action = ExploreAction(client)
return action.get_feeds(limit=limit)
finally:
client.close()
```
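The JavaScript inside `_extract_feeds` does three things: it unwraps a `value` / `_value` wrapper, flattens one level of nested arrays, and trims each item to a fixed set of fields. The sketch below restates those steps in plain Python against an already-serialised state, purely to make the data shape easier to follow and test. The helper names (`unwrap_ref`, `flatten_feeds`, `normalize_feed`) are hypothetical and do not exist in the repository.

```python
from typing import Any, Dict, List


def unwrap_ref(value: Any) -> Any:
    """Return the payload of a ref-like dict ('value' or '_value'), else the input."""
    if isinstance(value, dict):
        for key in ("value", "_value"):
            if key in value:
                return value[key]
    return value


def flatten_feeds(raw: Any) -> List[Dict[str, Any]]:
    """Unwrap the state entry and flatten one level of nested lists."""
    data = unwrap_ref(raw)
    if not isinstance(data, list):
        return []
    flat: List[Dict[str, Any]] = []
    for entry in data:
        if isinstance(entry, list):
            flat.extend(entry)
        else:
            flat.append(entry)
    return flat


def normalize_feed(item: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce one raw feed item to the fields that _extract_feeds keeps."""
    # Prefer noteCard; fall back to the item itself when it is a note.
    card = item.get("noteCard") or (item if item.get("model_type") == "note" else {})
    info = card.get("interactInfo") or {}
    user = card.get("user") or {}
    cover = card.get("cover") or {}
    return {
        "id": item.get("id") or "",
        "xsecToken": item.get("xsecToken") or "",
        "noteCard": {
            "displayTitle": card.get("displayTitle") or card.get("title") or "",
            "type": card.get("type") or "",
            "interactInfo": {
                key: info.get(key) or "0"
                for key in ("likedCount", "collectedCount", "commentCount", "sharedCount")
            },
            "user": {
                "nickname": user.get("nickname") or user.get("nickName") or "",
                "userId": user.get("userId") or "",
            },
            "cover": {"urlDefault": cover.get("urlDefault") or cover.get("urlPre") or ""},
        },
    }
```

A caller would combine them as `[normalize_feed(x) for x in flatten_feeds(raw_state)]`, where `raw_state` stands in for `window.__INITIAL_STATE__.feed.feeds`.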
### scripts/feed.py
```python
"""
Xiaohongshu note detail module
Translated from xiaohongshu-mcp/feed_detail.go
"""
import json
import sys
import time
import random
from typing import Optional, Dict, Any, Tuple
from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH
class FeedDetailAction:
"""笔记详情动作"""
def __init__(self, client: XiaohongshuClient):
self.client = client
def _make_feed_detail_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
"""构建笔记详情 URL"""
return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"
def _scroll_to_comments(self):
"""滚动到评论区域"""
self.client.page.evaluate("""() => {
const comments = document.querySelector('.comments-wrap');
if (comments) {
comments.scrollIntoView();
}
}""")
time.sleep(0.5)
    def _load_comments(self, max_items: int = 0):
        """
        Load comments by scrolling and clicking "load more".

        Args:
            max_items: Maximum number of comments; 0 means no target count
                (loading still stops after 50 attempts or 5 rounds without new comments)
        """
        page = self.client.page
        # Scroll to the comments area
        self._scroll_to_comments()

        # Random delay to mimic human behavior
        def human_delay():
            time.sleep(random.uniform(0.3, 0.7))

        max_attempts = 50 if max_items == 0 else max_items * 3
        last_count = 0
        stagnant = 0
        for attempt in range(max_attempts):
            # Click the "load more" button if one is visible
            try:
                # .first avoids a strict-mode error when several buttons match
                more_btn = page.locator('.more-comments').first
                if more_btn.is_visible():
                    more_btn.click()
                    human_delay()
            except Exception:
                pass
            # Scroll
            page.evaluate("window.scrollBy(0, 300)")
            human_delay()
            # Count the comments currently rendered
            try:
                comments = page.locator('.comment-item')
                current_count = comments.count()
            except Exception:
                current_count = 0
            if current_count == last_count:
                stagnant += 1
                if stagnant >= 5:
                    break
            else:
                stagnant = 0
                last_count = current_count
            # Stop once the target count is reached
            if max_items > 0 and current_count >= max_items:
                break
def _extract_feed_detail(self, feed_id: str) -> Optional[Dict[str, Any]]:
"""提取笔记详情数据"""
page = self.client.page
# 传入 feed_id,只提取对应条目,避免序列化整个 Vue Reactive 代理
result = page.evaluate("""(fid) => {
var s = window.__INITIAL_STATE__;
if (!s || !s.note || !s.note.noteDetailMap) return '';
var ndm = s.note.noteDetailMap;
var map = ndm;
if (ndm.value !== undefined) map = ndm.value;
else if (ndm._value !== undefined) map = ndm._value;
var detail = map[fid];
if (!detail) return '';
return JSON.stringify(detail);
}""", feed_id)
if not result:
return None
try:
return json.loads(result)
except json.JSONDecodeError:
return None
def get_feed_detail(
self,
feed_id: str,
xsec_token: str,
load_comments: bool = False,
max_comments: int = 0,
xsec_source: str = "pc_feed",
) -> Optional[Dict[str, Any]]:
"""
获取笔记详情
Args:
feed_id: 笔记 ID
xsec_token: xsec_token 参数
load_comments: 是否加载评论
max_comments: 最大评论数量,0 表示全部
xsec_source: 来源标识(pc_feed/pc_note/pc_search)
Returns:
笔记详情数据
"""
client = self.client
# 构建 URL 并导航
url = self._make_feed_detail_url(feed_id, xsec_token, xsec_source)
print(f"打开笔记详情页: {url}", file=sys.stderr)
client.navigate(url)
# 等待页面加载
client.wait_for_initial_state()
time.sleep(2)
# 重试提取:noteDetailMap 可能需要额外时间填充
detail = None
for attempt in range(3):
detail = self._extract_feed_detail(feed_id)
if detail:
break
if attempt < 2:
print(f"noteDetailMap 未就绪,等待重试 ({attempt + 1}/3)...", file=sys.stderr)
time.sleep(2)
# 加载评论
if detail and load_comments:
print("加载评论中...", file=sys.stderr)
self._load_comments(max_comments)
# 重新提取以包含评论数据
detail = self._extract_feed_detail(feed_id) or detail
if not detail:
print("未获取到笔记详情", file=sys.stderr)
return None
return detail
def feed_detail(
feed_id: str,
xsec_token: str,
load_comments: bool = False,
max_comments: int = 0,
xsec_source: str = "pc_feed",
headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
"""
获取笔记详情
Args:
feed_id: 笔记 ID
xsec_token: xsec_token 参数
load_comments: 是否加载评论
max_comments: 最大评论数量
xsec_source: 来源标识
headless: 是否无头模式
cookie_path: Cookie 路径
Returns:
笔记详情数据
"""
client = XiaohongshuClient(
headless=headless,
cookie_path=cookie_path,
)
try:
client.start()
action = FeedDetailAction(client)
return action.get_feed_detail(
feed_id=feed_id,
xsec_token=xsec_token,
load_comments=load_comments,
max_comments=max_comments,
xsec_source=xsec_source,
)
finally:
client.close()
```
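The loop in `_load_comments` stops for one of three reasons: the attempt budget runs out, the comment count stops growing for several rounds, or the requested number of comments is reached. The sketch below isolates that control flow from the browser so the stopping rules can be checked with fake callbacks. `load_until_stagnant`, `step`, `count` and `patience` are illustrative names, not part of the repository; the defaults mirror the values used in the source (50 attempts, 5 stagnant rounds).

```python
from typing import Callable


def load_until_stagnant(
    step: Callable[[], None],
    count: Callable[[], int],
    max_items: int = 0,
    patience: int = 5,
    max_attempts: int = 50,
) -> int:
    """Call step() repeatedly until the item count stalls or a target is hit.

    step:  performs one "load more" action (click and/or scroll).
    count: returns how many items are currently loaded.
    Returns the last observed item count.
    """
    last_count = 0
    stagnant = 0
    for _ in range(max_attempts):
        step()
        current = count()
        if current == last_count:
            # No progress this round; give up after `patience` such rounds.
            stagnant += 1
            if stagnant >= patience:
                break
        else:
            stagnant = 0
            last_count = current
        # A positive max_items acts as an early-exit target.
        if max_items > 0 and current >= max_items:
            break
    return last_count
```

With Playwright, `step` would wrap the click and scroll calls and `count` would wrap `page.locator('.comment-item').count()`.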
### scripts/interact.py
```python
"""
Xiaohongshu interaction module (like / unlike / collect / uncollect)
Translated from xiaohongshu-mcp/like_favorite.go
"""
import json
import sys
import time
from typing import Optional, Dict, Any, Tuple
from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH
class InteractAction:
"""互动动作(点赞、收藏)"""
# CSS 选择器
LIKE_SELECTOR = '.interact-container .left .like-wrapper'
LIKE_ACTIVE_SELECTOR = '.interact-container .left .like-wrapper.active, .interact-container .left .like-wrapper.liked'
COLLECT_SELECTOR = '.interact-container .left .collect-wrapper'
COLLECT_ACTIVE_SELECTOR = '.interact-container .left .collect-wrapper.active, .interact-container .left .collect-wrapper.collected'
def __init__(self, client: XiaohongshuClient):
self.client = client
def _make_feed_url(self, feed_id: str, xsec_token: str, xsec_source: str = "pc_feed") -> str:
"""构建笔记详情 URL"""
return f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source={xsec_source}"
def _navigate_to_feed(self, feed_id: str, xsec_token: str):
"""导航到笔记详情页并等待加载"""
url = self._make_feed_url(feed_id, xsec_token)
print(f"打开笔记详情页: {url}", file=sys.stderr)
self.client.navigate(url)
self.client.wait_for_initial_state()
time.sleep(2)
def _get_interact_state(self, feed_id: str) -> Dict[str, bool]:
"""
从 __INITIAL_STATE__ 获取当前互动状态
Returns:
{"liked": bool, "collected": bool}
"""
page = self.client.page
result = page.evaluate("""(fid) => {
var s = window.__INITIAL_STATE__;
if (!s || !s.note || !s.note.noteDetailMap) return '';
var ndm = s.note.noteDetailMap;
var map = ndm;
if (ndm.value !== undefined) map = ndm.value;
else if (ndm._value !== undefined) map = ndm._value;
var detail = map[fid];
if (!detail || !detail.note || !detail.note.interactInfo) return '';
var info = detail.note.interactInfo;
return JSON.stringify({
liked: !!info.liked,
collected: !!info.collected
});
}""", feed_id)
if not result:
return {"liked": False, "collected": False}
try:
return json.loads(result)
except json.JSONDecodeError:
return {"liked": False, "collected": False}
def _click_button(self, selector: str, label: str) -> bool:
"""点击互动按钮"""
page = self.client.page
try:
btn = page.locator(selector)
if btn.count() > 0:
btn.first.click()
time.sleep(1.5)
return True
else:
print(f"未找到{label}按钮: {selector}", file=sys.stderr)
return False
except Exception as e:
print(f"点击{label}按钮失败: {e}", file=sys.stderr)
return False
def like(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
"""
点赞笔记
Returns:
操作结果
"""
self._navigate_to_feed(feed_id, xsec_token)
state = self._get_interact_state(feed_id)
if state.get("liked"):
return {
"status": "success",
"action": "like",
"feed_id": feed_id,
"already_liked": True,
"message": "已经点赞过了",
}
success = self._click_button(self.LIKE_SELECTOR, "点赞")
return {
"status": "success" if success else "error",
"action": "like",
"feed_id": feed_id,
"message": "点赞成功" if success else "点赞失败",
}
def unlike(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
"""
取消点赞
Returns:
操作结果
"""
self._navigate_to_feed(feed_id, xsec_token)
state = self._get_interact_state(feed_id)
if not state.get("liked"):
return {
"status": "success",
"action": "unlike",
"feed_id": feed_id,
"already_unliked": True,
"message": "尚未点赞,无需取消",
}
success = self._click_button(self.LIKE_SELECTOR, "取消点赞")
return {
"status": "success" if success else "error",
"action": "unlike",
"feed_id": feed_id,
"message": "取消点赞成功" if success else "取消点赞失败",
}
def collect(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
"""
收藏笔记
Returns:
操作结果
"""
self._navigate_to_feed(feed_id, xsec_token)
state = self._get_interact_state(feed_id)
if state.get("collected"):
return {
"status": "success",
"action": "collect",
"feed_id": feed_id,
"already_collected": True,
"message": "已经收藏过了",
}
success = self._click_button(self.COLLECT_SELECTOR, "收藏")
return {
"status": "success" if success else "error",
"action": "collect",
"feed_id": feed_id,
"message": "收藏成功" if success else "收藏失败",
}
def uncollect(self, feed_id: str, xsec_token: str) -> Dict[str, Any]:
"""
取消收藏
Returns:
操作结果
"""
self._navigate_to_feed(feed_id, xsec_token)
state = self._get_interact_state(feed_id)
if not state.get("collected"):
return {
"status": "success",
"action": "uncollect",
"feed_id": feed_id,
"already_uncollected": True,
"message": "尚未收藏,无需取消",
}
success = self._click_button(self.COLLECT_SELECTOR, "取消收藏")
return {
"status": "success" if success else "error",
"action": "uncollect",
"feed_id": feed_id,
"message": "取消收藏成功" if success else "取消收藏失败",
}
# ============================================================
# 便捷函数
# ============================================================
def _run_interact(action_name: str, feed_id: str, xsec_token: str,
headless: bool = True, cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
"""通用互动操作执行器"""
client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
try:
client.start()
action = InteractAction(client)
method = getattr(action, action_name)
return method(feed_id, xsec_token)
finally:
client.close()
def like(feed_id: str, xsec_token: str, headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
"""点赞笔记"""
return _run_interact("like", feed_id, xsec_token, headless, cookie_path)
def unlike(feed_id: str, xsec_token: str, headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
"""取消点赞"""
return _run_interact("unlike", feed_id, xsec_token, headless, cookie_path)
def collect(feed_id: str, xsec_token: str, headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
"""收藏笔记"""
return _run_interact("collect", feed_id, xsec_token, headless, cookie_path)
def uncollect(feed_id: str, xsec_token: str, headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH) -> Dict[str, Any]:
"""取消收藏"""
return _run_interact("uncollect", feed_id, xsec_token, headless, cookie_path)
```
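All four methods of `InteractAction` follow the same idempotent pattern: read the current state, return early if it already matches the goal, and click only otherwise. A compact sketch of that pattern, with the click injected as a callback, is shown below. The helper `ensure_flag` and the `changed` key are assumptions for illustration; the source instead returns keys such as `already_liked` and a `message` string.

```python
from typing import Any, Callable, Dict


def ensure_flag(
    action: str,
    feed_id: str,
    current: bool,
    desired: bool,
    click: Callable[[], bool],
) -> Dict[str, Any]:
    """Toggle a like/collect flag only when it differs from the desired state.

    click: performs the UI click and returns True on success.
    """
    if current == desired:
        # Already in the requested state: report success without clicking.
        return {"status": "success", "action": action, "feed_id": feed_id, "changed": False}
    clicked = click()
    return {
        "status": "success" if clicked else "error",
        "action": action,
        "feed_id": feed_id,
        "changed": clicked,
    }
```

Because the button toggles on every click, skipping the click when the state already matches is what prevents a repeated `like` call from accidentally removing the like.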
### scripts/login.py
```python
"""
Xiaohongshu login module
Translated from xiaohongshu-mcp/login.go
Supports generating a WeChat login QR code and saving it for the main model to send
"""
import json
import sys
import time
import base64
import os
from typing import Optional, Tuple, Dict, Any
from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH
# QRCode 图片保存目录 - 放在 skill 文件夹内
SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
QRCODE_DIR = os.path.join(SKILL_DIR, "data")
QRCODE_PATH = os.path.join(QRCODE_DIR, "qrcode.png")
class LoginAction:
"""登录动作"""
def __init__(self, client: XiaohongshuClient):
self.client = client
def check_login_status(self, navigate: bool = True) -> Tuple[bool, Optional[str]]:
"""
检查登录状态
Args:
navigate: 是否先导航到首页。
如果已经在首页上,设 False 避免刷新页面。
Returns:
(是否已登录, 用户名)
"""
page = self.client.page
if navigate:
self.client.navigate("https://www.xiaohongshu.com/explore")
time.sleep(3)
# ---- 方式 1:检测页面上是否弹出了登录弹窗 ----
# 如果弹窗的二维码区域可见 → 未登录
try:
qr = page.locator('img.qrcode-img[src^="data:image"]')
if qr.count() > 0 and qr.first.is_visible():
return False, None
except Exception:
pass
# ---- 方式 2:检查 cookie 里是否包含 web_session ----
try:
cookies = self.client.context.cookies()
has_session = any(c['name'] == 'web_session' for c in cookies)
if has_session:
# 尝试获取用户名
username = self._try_get_username()
return True, username or "已登录用户"
except Exception:
pass
# ---- 方式 3:检查 HTML 里有没有用户头像链接(登录后才有) ----
try:
# 侧边栏会有 /user/profile/xxx 的链接
profile_link = page.locator('a[href*="/user/profile/"]')
if profile_link.count() > 0:
username = self._try_get_username()
return True, username or "已登录用户"
except Exception:
pass
return False, None
def _try_get_username(self) -> Optional[str]:
"""尝试从页面提取用户昵称"""
try:
name = self.client.page.evaluate("""() => {
const el = document.querySelector('.user .name, .sidebar .user-name, [class*="nickname"]');
return el ? el.textContent.trim() : '';
}""")
return name if name else None
except Exception:
return None
def get_wechat_qrcode(self) -> Tuple[Optional[str], bool]:
"""
获取微信登录二维码
流程:
1. 访问小红书首页触发登录弹窗
2. 获取弹窗中的微信二维码图片
3. 保存到文件
Returns:
(二维码文件路径, 是否已登录)
"""
client = self.client
page = client.page
# 访问首页触发登录弹窗
client.navigate("https://www.xiaohongshu.com/explore")
time.sleep(4) # 给弹窗足够时间渲染
# 先检查是否已登录(不要重新 navigate)
is_logged_in, _ = self.check_login_status(navigate=False)
if is_logged_in:
return None, True
# 尝试获取二维码 base64 图片
qrcode_src = None
for attempt in range(5):
try:
qr = page.locator('img.qrcode-img[src^="data:image"]')
if qr.count() > 0:
src = qr.first.get_attribute('src')
if src and len(src) > 200: # 有效 base64 至少上百字符
qrcode_src = src
break
except Exception:
pass
time.sleep(1)
if qrcode_src:
# 去掉 data:image/png;base64, 前缀
if ',' in qrcode_src:
qrcode_src = qrcode_src.split(',', 1)[1]
# 保存二维码图片
img_data = base64.b64decode(qrcode_src)
os.makedirs(QRCODE_DIR, exist_ok=True)
with open(QRCODE_PATH, 'wb') as f:
f.write(img_data)
print(f"二维码已保存到: {QRCODE_PATH}", file=sys.stderr)
return QRCODE_PATH, False
# 后备:整页截屏
print("未找到有效的二维码图片,截屏保存...", file=sys.stderr)
os.makedirs(QRCODE_DIR, exist_ok=True)
page.screenshot(path=QRCODE_PATH)
return QRCODE_PATH, False
def wait_for_login(self, timeout: int = 120, min_wait: int = 30) -> bool:
"""
在 **当前页面** 上等待用户扫码登录。
不会重新 navigate,以免刷新掉二维码弹窗。
会强制等待至少 min_wait 秒再开始检测,
给用户足够时间在手机上确认登录。
Args:
timeout: 总超时时间(秒)
min_wait: 最少等待秒数(默认 30)
Returns:
是否登录成功
"""
start = time.time()
# ---- 阶段 1: 强制等待 min_wait 秒 ----
print(f"请在手机上扫码并确认登录(至少等待 {min_wait} 秒)...", file=sys.stderr)
while time.time() - start < min_wait:
elapsed = int(time.time() - start)
remaining = min_wait - elapsed
if remaining > 0 and remaining % 10 == 0:
print(f" 等待中... 还剩 {remaining} 秒", file=sys.stderr)
time.sleep(2)
# ---- 阶段 2: 开始轮询检测 web_session cookie ----
print("开始检测登录状态...", file=sys.stderr)
while time.time() - start < timeout:
try:
cookies = self.client.context.cookies()
has_session = any(c['name'] == 'web_session' for c in cookies)
if has_session:
print("检测到 web_session cookie,登录成功!", file=sys.stderr)
self.client._save_cookies()
return True
except Exception:
pass
elapsed = int(time.time() - start)
remaining = timeout - elapsed
if remaining > 0 and remaining % 15 == 0:
print(f" 仍在等待登录... 剩余 {remaining} 秒", file=sys.stderr)
time.sleep(3)
print("登录超时", file=sys.stderr)
return False
# ====== 顶层便捷函数 ======
def check_login(
cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Tuple[bool, Optional[str]]:
"""检查登录状态"""
client = XiaohongshuClient(headless=True, cookie_path=cookie_path)
try:
client.start()
action = LoginAction(client)
return action.check_login_status(navigate=True)
finally:
client.close()
def login(
headless: bool = True,
cookie_path: str = DEFAULT_COOKIE_PATH,
timeout: int = 120,
) -> Dict[str, Any]:
"""
登录小红书(生成二维码 + 等待扫码)
Returns:
登录结果字典
"""
client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
try:
client.start()
action = LoginAction(client)
# 获取二维码
qrcode_path, is_logged_in = action.get_wechat_qrcode()
if is_logged_in:
return {
"status": "logged_in",
"qrcode_path": None,
"username": "已登录用户",
"message": "已登录",
}
if qrcode_path:
# 等待扫码
success = action.wait_for_login(timeout=timeout)
if success:
return {
"status": "logged_in",
"qrcode_path": None,
"username": "已登录用户",
"message": "扫码登录成功",
}
return {
"status": "timeout",
"qrcode_path": qrcode_path,
"username": None,
"message": "扫码超时",
}
return {
"status": "error",
"qrcode_path": None,
"username": None,
"message": "获取二维码失败",
}
finally:
client.close()
```
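`get_wechat_qrcode` reads the QR code from an `img` element whose `src` is a base64 data URI, strips everything up to the first comma, and decodes the rest. The sketch below shows that decoding step on its own with stricter validation than the source performs. `decode_data_uri` and `is_png` are hypothetical helpers, and the PNG signature check is an added assumption (the source saves whatever bytes it decodes).

```python
import base64
from typing import Optional

# Standard 8-byte PNG file signature.
PNG_MAGIC = b"\x89PNG\r\n\x1a\n"


def decode_data_uri(src: str) -> Optional[bytes]:
    """Decode a 'data:image/...;base64,<payload>' URI into raw bytes.

    Returns None when the input is not a base64 image data URI or the
    payload is not valid base64.
    """
    if not src.startswith("data:image") or "," not in src:
        return None
    header, payload = src.split(",", 1)
    if ";base64" not in header:
        return None
    try:
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(payload, validate=True)
    except ValueError:
        # binascii.Error is a subclass of ValueError.
        return None


def is_png(data: bytes) -> bool:
    """Return True if the bytes start with the PNG signature."""
    return data.startswith(PNG_MAGIC)
```

A caller could fall back to the full-page screenshot, as the source does, whenever `decode_data_uri` returns `None`.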
### scripts/publish.py
```python
"""
Xiaohongshu publishing module (image posts + video)
Translated from xiaohongshu-mcp/publish.go + publish_video.go
Integrates the safe-publishing approach of xiaohongshu-ops (human confirmation checkpoint)
"""
import json
import os
import sys
import time
import random
from pathlib import Path
from typing import Optional, Dict, Any, List
from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH
PUBLISH_URL = "https://creator.xiaohongshu.com/publish/publish?source=official"
class PublishAction:
"""发布动作"""
def __init__(self, client: XiaohongshuClient):
self.client = client
def _navigate_to_publish(self):
"""导航到创作者中心发布页"""
print("打开创作者中心发布页...", file=sys.stderr)
self.client.navigate(PUBLISH_URL)
time.sleep(3)
def _click_publish_tab(self, tab_name: str):
"""点击发布类型 TAB(上传图文 / 上传视频)"""
page = self.client.page
# 等待上传区域出现
try:
page.wait_for_selector('div.upload-content, div.creator-tab', timeout=15000)
except Exception:
print("等待发布页加载超时,继续尝试", file=sys.stderr)
time.sleep(1)
# 移除可能的弹窗遮挡
page.evaluate("""() => {
var popover = document.querySelector('div.d-popover');
if (popover) popover.remove();
}""")
# 查找并点击对应 TAB
try:
tabs = page.locator('div.creator-tab')
for i in range(tabs.count()):
tab = tabs.nth(i)
text = tab.text_content().strip()
if text == tab_name:
tab.click()
time.sleep(1)
print(f"已切换到「{tab_name}」", file=sys.stderr)
return
# 回退:使用文本定位
page.get_by_text(tab_name, exact=True).click()
time.sleep(1)
except Exception as e:
print(f"切换 TAB「{tab_name}」失败: {e}", file=sys.stderr)
    def _upload_images(self, image_paths: List[str]):
        """Upload images one at a time; paths that do not exist are skipped."""
        page = self.client.page
        valid_paths = [p for p in image_paths if os.path.exists(p)]
        if not valid_paths:
            raise ValueError("No valid image files")
        for i, path in enumerate(valid_paths):
            abs_path = os.path.abspath(path)
            print(f"Uploading image ({i+1}/{len(valid_paths)}): {abs_path}", file=sys.stderr)
            # The first image uses .upload-input; later ones use input[type=file]
            selector = '.upload-input' if i == 0 else 'input[type="file"]'
            try:
                upload_input = page.locator(selector)
                upload_input.set_input_files(abs_path)
            except Exception:
                # Fallback
                upload_input = page.locator('input[type="file"]')
                upload_input.set_input_files(abs_path)
            # Wait for the upload to finish (a preview element appears)
            expected = i + 1
            for _ in range(120):  # Up to 60 seconds
                try:
                    previews = page.locator('.img-preview-area .pr, .upload-preview-item')
                    if previews.count() >= expected:
                        break
                except Exception:
                    pass
                time.sleep(0.5)
            else:
                print(f"Warning: timed out waiting for the preview of image {i+1}", file=sys.stderr)
            time.sleep(1)
        print(f"Finished uploading {len(valid_paths)} images", file=sys.stderr)
    def _upload_video(self, video_path: str):
        """Upload a video file."""
        page = self.client.page
        if not os.path.exists(video_path):
            raise ValueError(f"Video file does not exist: {video_path}")
        abs_path = os.path.abspath(video_path)
        print(f"Uploading video: {abs_path}", file=sys.stderr)
        try:
            upload_input = page.locator('.upload-input')
            upload_input.set_input_files(abs_path)
        except Exception:
            upload_input = page.locator('input[type="file"]')
            upload_input.set_input_files(abs_path)
        # Wait for the publish button to become clickable (the signal that video
        # processing finished); poll once per second for up to 10 minutes
        print("Waiting for video processing to finish...", file=sys.stderr)
        btn_selector = '.publish-page-publish-btn button.bg-red'
        for attempt in range(600):
            try:
                # .first keeps these checks from raising if several buttons match
                btn = page.locator(btn_selector).first
                if btn.count() > 0 and btn.is_visible():
                    disabled = btn.get_attribute('disabled')
                    if disabled is None:
                        print("Video processing finished", file=sys.stderr)
                        return
            except Exception:
                pass
            time.sleep(1)
        print("Warning: timed out waiting for video processing", file=sys.stderr)
def _fill_title(self, title: str):
"""填写标题"""
page = self.client.page
try:
title_input = page.locator('div.d-input input')
title_input.first.fill(title)
time.sleep(0.5)
# 检查标题是否超长
max_suffix = page.locator('div.title-container div.max_suffix')
if max_suffix.count() > 0 and max_suffix.is_visible():
length_text = max_suffix.text_content()
print(f"警告: 标题超长 ({length_text})", file=sys.stderr)
print(f"标题已填写: {title}", file=sys.stderr)
except Exception as e:
print(f"填写标题失败: {e}", file=sys.stderr)
def _fill_content(self, content: str):
"""填写正文"""
page = self.client.page
# 尝试两种编辑器:Quill 或 contenteditable
content_el = None
try:
ql = page.locator('div.ql-editor')
if ql.count() > 0:
content_el = ql.first
except Exception:
pass
if content_el is None:
try:
# 通过 placeholder 查找
content_el = page.locator('p[data-placeholder*="输入正文描述"]').first
# Prefer the element with role="textbox" if the page has one
parent = page.locator('[role="textbox"]')
if parent.count() > 0:
content_el = parent.first
except Exception:
pass
if content_el is None:
print("未找到正文输入框", file=sys.stderr)
return
try:
content_el.click()
time.sleep(0.3)
page.keyboard.type(content, delay=30)
time.sleep(0.5)
# 检查正文是否超长
length_error = page.locator('div.edit-container div.length-error')
if length_error.count() > 0 and length_error.is_visible():
err_text = length_error.text_content()
print(f"警告: 正文超长 ({err_text})", file=sys.stderr)
print("正文已填写", file=sys.stderr)
except Exception as e:
print(f"填写正文失败: {e}", file=sys.stderr)
def _input_tags(self, tags: List[str]):
"""输入话题标签(通过 # 触发联想)"""
if not tags:
return
page = self.client.page
# 先移动光标到正文末尾
content_el = None
try:
ql = page.locator('div.ql-editor')
if ql.count() > 0:
content_el = ql.first
else:
content_el = page.locator('[role="textbox"]').first
except Exception:
return
if content_el is None:
return
try:
content_el.click()
time.sleep(0.3)
# Press End (moves the caret to the end of the current line)
page.keyboard.press('End')
page.keyboard.press('Enter')
page.keyboard.press('Enter')
time.sleep(0.5)
except Exception:
pass
# 限制最多 10 个标签
tags = tags[:10]
for tag in tags:
tag = tag.lstrip('#')
try:
# 输入 #
page.keyboard.type('#', delay=100)
time.sleep(0.3)
# 逐字输入标签文字
page.keyboard.type(tag, delay=50)
time.sleep(1)
# 尝试点击联想下拉框的第一个选项
topic_item = page.locator('#creator-editor-topic-container .item')
if topic_item.count() > 0:
topic_item.first.click()
print(f"标签「{tag}」已通过联想选择", file=sys.stderr)
else:
# 没有联想,输入空格结束
page.keyboard.type(' ', delay=50)
print(f"标签「{tag}」已直接输入", file=sys.stderr)
time.sleep(0.5)
except Exception as e:
print(f"输入标签「{tag}」失败: {e}", file=sys.stderr)
def _set_schedule(self, schedule_time: str):
"""设置定时发布(格式: 2025-01-01 12:00)"""
page = self.client.page
try:
# 点击定时发布开关
switch = page.locator('.post-time-wrapper .d-switch')
switch.click()
time.sleep(0.8)
# 设置日期时间
date_input = page.locator('.date-picker-container input')
date_input.fill('')
date_input.fill(schedule_time)
time.sleep(0.5)
print(f"定时发布设置: {schedule_time}", file=sys.stderr)
except Exception as e:
print(f"设置定时发布失败: {e}", file=sys.stderr)
def _click_publish_button(self) -> bool:
"""点击发布按钮"""
page = self.client.page
try:
btn = page.locator('.publish-page-publish-btn button.bg-red')
if btn.count() > 0:
btn.first.click()
time.sleep(3)
print("已点击发布按钮", file=sys.stderr)
return True
else:
print("未找到发布按钮", file=sys.stderr)
return False
except Exception as e:
print(f"点击发布按钮失败: {e}", file=sys.stderr)
return False
    def _check_publish_ready(self) -> Dict[str, Any]:
        """Pre-publish check: report the filled-in title and publish-button visibility."""
        page = self.client.page
        status = {}
        # Read back the title
        try:
            title_input = page.locator('div.d-input input')
            status["title"] = title_input.first.input_value() if title_input.count() > 0 else ""
        except Exception:
            status["title"] = ""
        # Check that the publish button is visible
        try:
            btn = page.locator('.publish-page-publish-btn button.bg-red')
            status["publish_button_visible"] = btn.count() > 0 and btn.first.is_visible()
        except Exception:
            status["publish_button_visible"] = False
        status["title_ok"] = bool(status["title"])
        return status
def publish_image(
self,
title: str,
content: str,
image_paths: List[str],
tags: Optional[List[str]] = None,
schedule_time: Optional[str] = None,
auto_publish: bool = False,
) -> Dict[str, Any]:
"""
发布图文笔记
Args:
title: 标题(建议 <=20 字)
content: 正文
image_paths: 图片文件路径列表
tags: 话题标签列表
schedule_time: 定时发布时间(格式 2025-01-01 12:00),None 为立即
auto_publish: 是否自动点击发布(默认 False,停在发布按钮处)
Returns:
操作结果
"""
self._navigate_to_publish()
self._click_publish_tab("上传图文")
# 1. 上传图片
self._upload_images(image_paths)
# 2. 填写标题
self._fill_title(title)
time.sleep(1)
# 3. 填写正文
self._fill_content(content)
time.sleep(1)
# 4. 添加标签
if tags:
self._input_tags(tags)
time.sleep(1)
# 5. 定时发布
if schedule_time:
self._set_schedule(schedule_time)
# 6. 校验三要素
ready = self._check_publish_ready()
print(f"发布前校验: {ready}", file=sys.stderr)
# 7. 是否自动发布
if auto_publish:
success = self._click_publish_button()
return {
"status": "success" if success else "error",
"action": "publish_image",
"title": title,
"image_count": len(image_paths),
"tags": tags or [],
"schedule_time": schedule_time,
"published": success,
"message": "发布成功" if success else "发布失败",
}
else:
return {
"status": "ready",
"action": "publish_image",
"title": title,
"image_count": len(image_paths),
"tags": tags or [],
"schedule_time": schedule_time,
"published": False,
"ready_check": ready,
"message": "已填写完毕,停在发布按钮处。请确认后使用 --auto-publish 发布。",
}

    def publish_video(
        self,
        title: str,
        content: str,
        video_path: str,
        tags: Optional[List[str]] = None,
        schedule_time: Optional[str] = None,
        auto_publish: bool = False,
    ) -> Dict[str, Any]:
        """
        Publish a video note.

        Args:
            title: Title
            content: Body text
            video_path: Path to the video file
            tags: Topic tags
            schedule_time: Scheduled publish time
            auto_publish: Whether to click publish automatically (default False)

        Returns:
            Operation result
        """
        self._navigate_to_publish()
        # "上传视频" is the label of the "Upload video" tab and must match the page text
        self._click_publish_tab("上传视频")

        # 1. Upload the video
        self._upload_video(video_path)

        # 2. Fill in the title
        self._fill_title(title)
        time.sleep(1)

        # 3. Fill in the body text
        self._fill_content(content)
        time.sleep(1)

        # 4. Add topic tags
        if tags:
            self._input_tags(tags)
            time.sleep(1)

        # 5. Schedule the post
        if schedule_time:
            self._set_schedule(schedule_time)

        # 6. Validate
        ready = self._check_publish_ready()
        print(f"Pre-publish check: {ready}", file=sys.stderr)

        if auto_publish:
            success = self._click_publish_button()
            return {
                "status": "success" if success else "error",
                "action": "publish_video",
                "title": title,
                "video_path": video_path,
                "published": success,
                "message": "Published successfully" if success else "Publish failed",
            }
        else:
            return {
                "status": "ready",
                "action": "publish_video",
                "title": title,
                "video_path": video_path,
                "published": False,
                "ready_check": ready,
                "message": "Form filled in; stopped at the publish button. Confirm, then rerun with --auto-publish to publish.",
            }


def md_to_images(
    markdown_text: str,
    output_dir: str = ".",
    width: int = 1080,
    css: str = "",
) -> List[str]:
    """
    Render Markdown text as long-form images using Playwright screenshots.

    Args:
        markdown_text: Markdown source text
        output_dir: Output directory
        width: Image width in pixels
        css: Custom CSS appended after the default styles

    Returns:
        List of generated image paths
    """
    try:
        import markdown
    except ImportError:
        print("The markdown library is required: pip install markdown", file=sys.stderr)
        raise

    # Convert Markdown to HTML
    html_content = markdown.markdown(
        markdown_text,
        extensions=['tables', 'fenced_code', 'codehilite', 'nl2br'],
    )

    default_css = """
    body {
        font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "PingFang SC",
                     "Hiragino Sans GB", "Microsoft YaHei", sans-serif;
        padding: 40px 50px;
        line-height: 1.8;
        color: #333;
        background: #fff;
        max-width: 100%;
        margin: 0 auto;
        font-size: 16px;
    }
    h1 { font-size: 24px; font-weight: bold; margin: 20px 0 10px; color: #222; }
    h2 { font-size: 20px; font-weight: bold; margin: 18px 0 8px; color: #333; }
    h3 { font-size: 18px; font-weight: bold; margin: 15px 0 6px; color: #444; }
    p { margin: 8px 0; }
    ul, ol { padding-left: 20px; }
    li { margin: 4px 0; }
    blockquote {
        border-left: 4px solid #ff2442;
        padding: 8px 16px;
        margin: 12px 0;
        background: #fff5f5;
        color: #555;
    }
    code {
        background: #f5f5f5;
        padding: 2px 6px;
        border-radius: 3px;
        font-size: 14px;
    }
    pre {
        background: #f5f5f5;
        padding: 16px;
        border-radius: 6px;
        overflow-x: auto;
    }
    img { max-width: 100%; border-radius: 8px; }
    hr { border: none; border-top: 1px solid #eee; margin: 20px 0; }
    table { border-collapse: collapse; width: 100%; margin: 12px 0; }
    th, td { border: 1px solid #ddd; padding: 8px 12px; text-align: left; }
    th { background: #f5f5f5; }
    """
    full_css = default_css + "\n" + css
    full_html = f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8">
<meta name="viewport" content="width={width}">
<style>{full_css}</style>
</head><body>{html_content}</body></html>"""

    os.makedirs(output_dir, exist_ok=True)

    from playwright.sync_api import sync_playwright

    with sync_playwright() as pw:
        browser = pw.chromium.launch(headless=True)
        page = browser.new_page(viewport={"width": width, "height": 800})
        page.set_content(full_html)
        page.wait_for_load_state("networkidle")
        time.sleep(0.5)

        # Measure the actual content height
        total_height = page.evaluate("document.body.scrollHeight")

        # Content up to max_page_height + 500 px (3500 px) is captured as one image;
        # taller content is split into slices of at most 3000 px each.
        image_paths = []
        max_page_height = 3000

        if total_height <= max_page_height + 500:
            # Single screenshot
            page.set_viewport_size({"width": width, "height": total_height + 40})
            time.sleep(0.3)
            img_path = os.path.join(output_dir, "md_page_1.png")
            page.screenshot(path=img_path, full_page=True)
            image_paths.append(img_path)
        else:
            # Paginated screenshots
            page_num = 1
            y_offset = 0
            while y_offset < total_height:
                chunk_height = min(max_page_height, total_height - y_offset)
                img_path = os.path.join(output_dir, f"md_page_{page_num}.png")
                # full_page=True makes the clip relative to the whole document;
                # without it the clip is limited to the 800 px viewport, so
                # slices below the fold would fall outside the captured area.
                page.screenshot(
                    path=img_path,
                    full_page=True,
                    clip={"x": 0, "y": y_offset, "width": width, "height": chunk_height},
                )
                image_paths.append(img_path)
                y_offset += chunk_height
                page_num += 1

        browser.close()

    print(f"Markdown rendered to {len(image_paths)} image(s)", file=sys.stderr)
    return image_paths


# ============================================================
# Convenience functions
# ============================================================

def publish_image(
    title: str,
    content: str,
    image_paths: List[str],
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """Publish an image-and-text note."""
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = PublishAction(client)
        return action.publish_image(
            title=title, content=content, image_paths=image_paths,
            tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        )
    finally:
        # Always release the browser, even if publishing raised
        client.close()


def publish_video(
    title: str,
    content: str,
    video_path: str,
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """Publish a video note."""
    client = XiaohongshuClient(headless=headless, cookie_path=cookie_path)
    try:
        client.start()
        action = PublishAction(client)
        return action.publish_video(
            title=title, content=content, video_path=video_path,
            tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        )
    finally:
        # Always release the browser, even if publishing raised
        client.close()


def publish_markdown(
    title: str,
    markdown_text: str,
    extra_content: str = "",
    tags: Optional[List[str]] = None,
    schedule_time: Optional[str] = None,
    auto_publish: bool = False,
    image_width: int = 1080,
    output_dir: str = "",
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Dict[str, Any]:
    """
    Render Markdown to images, then publish them as an image-and-text note.

    Args:
        title: Title
        markdown_text: Markdown content
        extra_content: Additional text for the note body
        tags: Topic tags
        schedule_time: Scheduled publish time
        auto_publish: Whether to click publish automatically
        image_width: Image width in pixels
        output_dir: Image output directory (defaults to a temporary directory)
        headless: Run the browser in headless mode
        cookie_path: Path to the cookie file

    Returns:
        Operation result
    """
    import tempfile

    if not output_dir:
        output_dir = tempfile.mkdtemp(prefix="xhs_md_")

    # 1. Markdown -> images
    image_paths = md_to_images(markdown_text, output_dir=output_dir, width=image_width)
    if not image_paths:
        return {"status": "error", "message": "Markdown rendering failed; no images were generated"}

    # 2. Publish as an image-and-text note.
    # The fallback body is posted to the platform, so it stays in Chinese
    # ("Generated from Markdown, N pages in total").
    body = extra_content or f"本文由 Markdown 渲染生成,共 {len(image_paths)} 页"
    return publish_image(
        title=title, content=body, image_paths=image_paths,
        tags=tags, schedule_time=schedule_time, auto_publish=auto_publish,
        headless=headless, cookie_path=cookie_path,
    )
```
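
The pagination rule inside `md_to_images` (one image when the rendered page is at most `max_page_height + 500` px tall, otherwise slices of up to 3000 px) can be checked without launching a browser. The sketch below is a hypothetical helper, `plan_clips`, that is not part of the repository. It only reproduces the arithmetic and returns rectangles in the shape Playwright's `clip` option expects; the `slack` parameter name is an assumption standing in for the hard-coded 500.

```python
from typing import Dict, List


def plan_clips(
    total_height: int,
    width: int = 1080,
    max_page_height: int = 3000,
    slack: int = 500,
) -> List[Dict[str, int]]:
    """Hypothetical helper: compute the clip rectangles for a rendered page."""
    if total_height <= 0:
        return []
    # Pages up to max_page_height + slack stay in a single image.
    if total_height <= max_page_height + slack:
        return [{"x": 0, "y": 0, "width": width, "height": total_height}]
    clips = []
    y_offset = 0
    while y_offset < total_height:
        # The last slice may be shorter than max_page_height.
        chunk_height = min(max_page_height, total_height - y_offset)
        clips.append({"x": 0, "y": y_offset, "width": width, "height": chunk_height})
        y_offset += chunk_height
    return clips


if __name__ == "__main__":
    for clip in plan_clips(7000):
        print(clip)
```

Keeping the arithmetic in a pure function like this would make it easy to tune the threshold, for example to avoid a very short final slice, without re-rendering anything.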
### scripts/search.py
```python
"""
Xiaohongshu search module.
Ported from xiaohongshu-mcp/search.go.
"""
import json
import sys
import time
import urllib.parse
from typing import Optional, List, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH

# Filter option map (from the Go source).
# The "text" values are the labels shown in the site's filter panel and are
# matched against the page, so they stay in Chinese.
FILTER_OPTIONS_MAP = {
    1: [  # Sort order
        {"index": 1, "text": "综合"},
        {"index": 2, "text": "最新"},
        {"index": 3, "text": "最多点赞"},
        {"index": 4, "text": "最多评论"},
        {"index": 5, "text": "最多收藏"},
    ],
    2: [  # Note type
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "视频"},
        {"index": 3, "text": "图文"},
    ],
    3: [  # Publish time
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "一天内"},
        {"index": 3, "text": "一周内"},
        {"index": 4, "text": "半年内"},
    ],
    4: [  # Search scope
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "已看过"},
        {"index": 3, "text": "未看过"},
        {"index": 4, "text": "已关注"},
    ],
    5: [  # Location distance
        {"index": 1, "text": "不限"},
        {"index": 2, "text": "同城"},
        {"index": 3, "text": "附近"},
    ],
}


class SearchAction:
    """Search action."""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_search_url(self, keyword: str) -> str:
        """Build the search URL."""
        params = urllib.parse.urlencode({
            "keyword": keyword,
            "source": "web_explore_feed",
        })
        return f"https://www.xiaohongshu.com/search_result?{params}"

    def _apply_filters(
        self,
        sort_by: Optional[str] = None,
        note_type: Optional[str] = None,
        publish_time: Optional[str] = None,
        search_scope: Optional[str] = None,
        location: Optional[str] = None,
    ):
        """Apply the requested filters."""
        page = self.client.page

        # Nothing to do when no filter was requested
        has_filters = any([sort_by, note_type, publish_time, search_scope, location])
        if not has_filters:
            return

        # Hover over the filter button to open the panel
        try:
            filter_btn = page.locator('div.filter')
            filter_btn.hover()
            time.sleep(0.5)
            # Wait for the filter panel to appear
            page.wait_for_selector('div.filter-panel', timeout=5000)
        except Exception as e:
            print(f"Failed to open the filter panel: {e}", file=sys.stderr)
            return

        # Map each requested value to its display text; unknown values are skipped
        selections = [
            (1, sort_by),
            (2, note_type),
            (3, publish_time),
            (4, search_scope),
            (5, location),
        ]
        filter_texts = []
        for group, value in selections:
            if value:
                text = self._find_filter_text(group, value)
                if text:
                    filter_texts.append(text)

        # Click by text locator so the code does not depend on DOM order
        filter_panel = page.locator('div.filter-panel')
        for tag_text in filter_texts:
            try:
                filter_panel.get_by_text(tag_text, exact=True).click()
                time.sleep(0.3)
            except Exception as e:
                print(f"Failed to click filter option: {e}", file=sys.stderr)

        # Give the page time to refresh
        time.sleep(1)

    def _find_filter_text(self, filters_group: int, text: str) -> Optional[str]:
        """Return the display text of a filter option (for the text locator), or None if unknown."""
        options = FILTER_OPTIONS_MAP.get(filters_group, [])
        for opt in options:
            if opt["text"] == text:
                return opt["text"]
        return None

    def search(
        self,
        keyword: str,
        sort_by: Optional[str] = None,
        note_type: Optional[str] = None,
        publish_time: Optional[str] = None,
        search_scope: Optional[str] = None,
        location: Optional[str] = None,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Search Xiaohongshu content.

        Args:
            keyword: Search keyword
            sort_by: Sort order, one of: 综合 最新 最多点赞 最多评论 最多收藏
            note_type: Note type, one of: 不限 视频 图文
            publish_time: Publish time, one of: 不限 一天内 一周内 半年内
            search_scope: Search scope, one of: 不限 已看过 未看过 已关注
            location: Location distance, one of: 不限 同城 附近
            limit: Maximum number of results to return

        Returns:
            List of search results
        """
        client = self.client
        page = client.page

        # Navigate to the search page
        search_url = self._make_search_url(keyword)
        client.navigate(search_url)

        # Wait for the page, with extra time so the results finish loading
        client.wait_for_initial_state()
        time.sleep(3)

        # Scroll to trigger loading of more content
        for _ in range(3):
            page.evaluate("window.scrollBy(0, 500)")
            time.sleep(0.5)

        # Apply filters
        self._apply_filters(
            sort_by=sort_by,
            note_type=note_type,
            publish_time=publish_time,
            search_scope=search_scope,
            location=location,
        )

        # Extract data using field paths verified against the live page.
        # At most 50 items are read, so limit values above 50 have no effect.
        result = page.evaluate("""() => {
            const feeds = window.__INITIAL_STATE__?.search?.feeds;
            const data = feeds?.value || feeds?._value;
            if (!data || !Array.isArray(data) || data.length === 0) return '';
            return JSON.stringify(data.slice(0, 50).map(item => {
                const nc = item.noteCard || {};
                const user = nc.user || {};
                const info = nc.interactInfo || {};
                const cover = nc.cover || {};
                return {
                    id: item.id || '',
                    xsec_token: item.xsecToken || '',
                    title: nc.displayTitle || '',
                    type: nc.type || '',
                    user: user.nickname || user.nickName || '',
                    user_id: user.userId || '',
                    user_avatar: user.avatar || '',
                    liked_count: info.likedCount || '0',
                    collected_count: info.collectedCount || '0',
                    comment_count: info.commentCount || '0',
                    shared_count: info.sharedCount || '0',
                    cover_url: cover.urlDefault || cover.urlPre || '',
                };
            }));
        }""")

        if not result:
            print("No search results found", file=sys.stderr)
            return []

        # Parse the JSON
        try:
            feeds = json.loads(result)
        except json.JSONDecodeError as e:
            print(f"Failed to parse search results: {e}", file=sys.stderr)
            return []

        # Apply the limit
        if limit > 0:
            feeds = feeds[:limit]

        return feeds


def search(
    keyword: str,
    sort_by: Optional[str] = None,
    note_type: Optional[str] = None,
    publish_time: Optional[str] = None,
    search_scope: Optional[str] = None,
    location: Optional[str] = None,
    limit: int = 10,
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> List[Dict[str, Any]]:
    """
    Search Xiaohongshu content.

    Args:
        keyword: Search keyword
        sort_by: Sort order
        note_type: Note type
        publish_time: Publish time
        search_scope: Search scope
        location: Location distance
        limit: Maximum number of results to return
        headless: Run the browser in headless mode
        cookie_path: Path to the cookie file

    Returns:
        List of search results
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )
    try:
        client.start()
        action = SearchAction(client)
        return action.search(
            keyword=keyword,
            sort_by=sort_by,
            note_type=note_type,
            publish_time=publish_time,
            search_scope=search_scope,
            location=location,
            limit=limit,
        )
    finally:
        client.close()
```
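
Filter values passed to `search` are matched against the site's Chinese labels, and `_find_filter_text` silently drops anything it does not recognize. A caller may want to catch typos before a browser is launched. The following sketch is an illustration under stated assumptions rather than repository code: `ALLOWED` copies only the labels from `FILTER_OPTIONS_MAP`, keyed by the parameter names of `search`, and `invalid_filters` and `build_search_url` are hypothetical names.

```python
import urllib.parse
from typing import Dict, List, Optional

# Labels copied from FILTER_OPTIONS_MAP; the keys are this sketch's own naming.
ALLOWED: Dict[str, List[str]] = {
    "sort_by": ["综合", "最新", "最多点赞", "最多评论", "最多收藏"],
    "note_type": ["不限", "视频", "图文"],
    "publish_time": ["不限", "一天内", "一周内", "半年内"],
    "search_scope": ["不限", "已看过", "未看过", "已关注"],
    "location": ["不限", "同城", "附近"],
}


def invalid_filters(**filters: Optional[str]) -> Dict[str, str]:
    """Hypothetical helper: return the filters whose value is not a known label."""
    return {
        name: value
        for name, value in filters.items()
        if value and value not in ALLOWED.get(name, [])
    }


def build_search_url(keyword: str) -> str:
    """Mirror _make_search_url: urlencode percent-encodes the UTF-8 keyword."""
    params = urllib.parse.urlencode({"keyword": keyword, "source": "web_explore_feed"})
    return f"https://www.xiaohongshu.com/search_result?{params}"


if __name__ == "__main__":
    print(invalid_filters(sort_by="latest", note_type="图文"))
    print(build_search_url("咖啡"))
```

Checking values up front turns a silently ignored filter into an explicit error the caller can report, at the cost of keeping a second copy of the labels in sync with the map.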
### scripts/user.py
```python
"""
Xiaohongshu user profile module.
Ported from xiaohongshu-mcp/user_profile.go.
"""
import json
import sys
import time
from typing import Optional, Dict, Any

from .client import XiaohongshuClient, DEFAULT_COOKIE_PATH


class UserProfileAction:
    """User profile action."""

    def __init__(self, client: XiaohongshuClient):
        self.client = client

    def _make_user_profile_url(self, user_id: str, xsec_token: str = "") -> str:
        """Build the user profile URL."""
        if xsec_token:
            return f"https://www.xiaohongshu.com/user/profile/{user_id}?xsec_token={xsec_token}&xsec_source=pc_note"
        return f"https://www.xiaohongshu.com/user/profile/{user_id}"

    def _extract_user_profile_data(self) -> Optional[Dict[str, Any]]:
        """Extract user profile data from the page state."""
        page = self.client.page

        # Read the user info
        user_data_result = page.evaluate("""() => {
            if (window.__INITIAL_STATE__ &&
                window.__INITIAL_STATE__.user &&
                window.__INITIAL_STATE__.user.userPageData) {
                const userPageData = window.__INITIAL_STATE__.user.userPageData;
                const data = userPageData.value !== undefined ? userPageData.value : userPageData._value;
                if (data) {
                    return JSON.stringify(data);
                }
            }
            return '';
        }""")

        if not user_data_result:
            return None

        # Read the user's note list (including pinned flags and time info)
        notes_result = page.evaluate("""() => {
            if (!window.__INITIAL_STATE__ ||
                !window.__INITIAL_STATE__.user ||
                !window.__INITIAL_STATE__.user.notes) return '';
            var notes = window.__INITIAL_STATE__.user.notes;
            var data = notes.value !== undefined ? notes.value : (notes._value !== undefined ? notes._value : notes);
            if (!data) return '';
            // Flatten the two-dimensional array
            var flat = [];
            for (var i = 0; i < data.length; i++) {
                if (Array.isArray(data[i])) {
                    for (var j = 0; j < data[i].length; j++) flat.push(data[i][j]);
                } else {
                    flat.push(data[i]);
                }
            }
            // Keep the key fields of each note, including pinned flags and sort fields
            return JSON.stringify(flat.map(function(item) {
                var nc = item.noteCard || {};
                var info = nc.interactInfo || {};
                var user = nc.user || {};
                var cover = nc.cover || {};
                var result = {
                    id: item.id || '',
                    xsecToken: item.xsecToken || '',
                    noteCard: {
                        displayTitle: nc.displayTitle || '',
                        type: nc.type || '',
                        interactInfo: {
                            likedCount: info.likedCount || '0',
                            collectedCount: info.collectedCount || '0',
                            commentCount: info.commentCount || '0',
                            sharedCount: info.sharedCount || '0'
                        },
                        user: {
                            nickname: user.nickname || user.nickName || '',
                            userId: user.userId || ''
                        },
                        cover: {
                            urlDefault: cover.urlDefault || cover.urlPre || ''
                        }
                    }
                };
                // Pinned flag (Xiaohongshu uses several field names)
                if (item.isTop) result.isTop = true;
                if (item.stickyTop) result.isTop = true;
                if (item.topFlag) result.isTop = true;
                if (nc.isTop) result.isTop = true;
                // Check showTags for a pinned tag
                var tags = item.showTags || nc.showTags || [];
                for (var k = 0; k < tags.length; k++) {
                    if (tags[k] === 'top' || tags[k] === 'is_top' || tags[k] === 'sticky') {
                        result.isTop = true;
                    }
                }
                // Time info (later assignments take precedence)
                if (nc.time) result.time = nc.time;
                if (nc.createTime) result.time = nc.createTime;
                if (nc.lastUpdateTime) result.lastUpdateTime = nc.lastUpdateTime;
                if (item.timestamp) result.time = item.timestamp;
                return result;
            }));
        }""")

        try:
            user_page_data = json.loads(user_data_result)
        except json.JSONDecodeError:
            return None

        # Parse the note data; a parse failure leaves the feed list empty
        feeds = []
        if notes_result:
            try:
                feeds = json.loads(notes_result)
            except json.JSONDecodeError:
                pass

        # Assemble the response
        response = {
            "userBasicInfo": user_page_data.get("basicInfo", {}),
            "interactions": user_page_data.get("interactions", []),
            "feeds": feeds,
        }
        return response

    def get_user_profile(
        self,
        user_id: str,
        xsec_token: str = "",
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch a user's profile page.

        Args:
            user_id: User ID
            xsec_token: xsec_token parameter (optional)

        Returns:
            User profile data, or None if nothing could be extracted
        """
        client = self.client

        # Build the URL and navigate
        url = self._make_user_profile_url(user_id, xsec_token)
        print(f"Opening user profile: {url}", file=sys.stderr)
        client.navigate(url)

        # Wait for the page to load
        client.wait_for_initial_state()
        time.sleep(1)

        # Extract the data
        profile = self._extract_user_profile_data()
        if not profile:
            print("No user profile data found", file=sys.stderr)
            return None
        return profile

    def get_my_profile(self) -> Optional[Dict[str, Any]]:
        """
        Fetch the logged-in user's own profile (via sidebar navigation).

        Returns:
            The current user's profile data, or None if the user ID is unknown
        """
        client = self.client

        # Start from the home page
        print("Navigating to the home page to look up the current user...", file=sys.stderr)
        client.navigate("https://www.xiaohongshu.com/explore")
        time.sleep(2)

        page = client.page

        # Try to read the current user's ID from the sidebar
        my_user_id = page.evaluate("""() => {
            // Method 1: extract from sidebar user links
            var links = document.querySelectorAll('a[href*="/user/profile/"]');
            for (var i = 0; i < links.length; i++) {
                var href = links[i].getAttribute('href');
                var match = href.match(/\\/user\\/profile\\/([a-f0-9]+)/);
                if (match) return match[1];
            }
            // Method 2: extract from __INITIAL_STATE__
            if (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.user) {
                var user = window.__INITIAL_STATE__.user;
                if (user.userPageData) {
                    var data = user.userPageData.value || user.userPageData._value || user.userPageData;
                    if (data && data.basicInfo && data.basicInfo.userId) {
                        return data.basicInfo.userId;
                    }
                }
            }
            return '';
        }""")

        if not my_user_id:
            # Fallback: click the avatar/profile link in the sidebar
            try:
                avatar = page.locator('.sidebar-nav .user-avatar, .side-bar .user-wrap a, a.user-link')
                if avatar.count() > 0:
                    avatar.first.click()
                    time.sleep(2)
                    # Extract the user ID from the URL after the redirect
                    import re
                    match = re.search(r'/user/profile/([a-f0-9]+)', page.url)
                    if match:
                        my_user_id = match.group(1)
            except Exception as e:
                print(f"Failed to get the user ID via the sidebar: {e}", file=sys.stderr)

        if not my_user_id:
            print("Could not determine the ID of the logged-in user", file=sys.stderr)
            return None

        print(f"Found user ID: {my_user_id}", file=sys.stderr)
        return self.get_user_profile(my_user_id)


def user_profile(
    user_id: str,
    xsec_token: str = "",
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
    """
    Fetch a user's profile page.

    Args:
        user_id: User ID
        xsec_token: xsec_token parameter
        headless: Run the browser in headless mode
        cookie_path: Path to the cookie file

    Returns:
        User profile data
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )
    try:
        client.start()
        action = UserProfileAction(client)
        return action.get_user_profile(
            user_id=user_id,
            xsec_token=xsec_token,
        )
    finally:
        client.close()


def my_profile(
    headless: bool = True,
    cookie_path: str = DEFAULT_COOKIE_PATH,
) -> Optional[Dict[str, Any]]:
    """
    Fetch the logged-in user's own profile.

    Args:
        headless: Run the browser in headless mode
        cookie_path: Path to the cookie file

    Returns:
        The current user's profile data
    """
    client = XiaohongshuClient(
        headless=headless,
        cookie_path=cookie_path,
    )
    try:
        client.start()
        action = UserProfileAction(client)
        return action.get_my_profile()
    finally:
        client.close()
```
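
`_extract_user_profile_data` does its flattening and pinned-note detection in browser-side JavaScript, which makes those rules awkward to unit test. As a rough illustration only, the same two steps could be expressed in Python against already-decoded JSON. `flatten_notes` and `is_pinned` are hypothetical names, and the field names are simply the ones the script probes, not a documented Xiaohongshu schema.

```python
from typing import Any, Dict, List

# Tag values the script treats as "pinned" markers.
PINNED_TAGS = ("top", "is_top", "sticky")


def flatten_notes(data: List[Any]) -> List[Dict[str, Any]]:
    """Flatten one level of nesting: [[a, b], c] -> [a, b, c]."""
    flat: List[Dict[str, Any]] = []
    for entry in data:
        if isinstance(entry, list):
            flat.extend(entry)
        else:
            flat.append(entry)
    return flat


def is_pinned(item: Dict[str, Any]) -> bool:
    """Apply the pinned-note heuristics: any truthy flag or a known tag."""
    card = item.get("noteCard") or {}
    if item.get("isTop") or item.get("stickyTop") or item.get("topFlag") or card.get("isTop"):
        return True
    tags = item.get("showTags") or card.get("showTags") or []
    return any(tag in PINNED_TAGS for tag in tags)


if __name__ == "__main__":
    sample = [[{"id": "a", "isTop": True}, {"id": "b"}], {"id": "c"}]
    for note in flatten_notes(sample):
        print(note["id"], is_pinned(note))
```

Running the heuristics on the Python side would require returning the raw note objects from `page.evaluate` instead of the trimmed ones, so this is a trade-off between payload size and testability.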