feishu-card-sender
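Sends card messages (interactive cards) through the Feishu OpenAPI, with support for templated JSON cards and variable substitution. Use it when a user asks to "send a Feishu card / template message / interactive card", or when a structured notification must be delivered to a specific open_id/chat_id. The skill only uses the OpenAPI (appid/appsecret + tenant_access_token) and does not use the message channel.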
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-feishu-card-sender
Repository
Skill path: skills/dobbey/feishu-card-sender
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install feishu-card-sender into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding feishu-card-sender to shared team environments
- Use feishu-card-sender for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: feishu-card-sender
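description: Sends card messages (interactive cards) through the Feishu OpenAPI, with support for templated JSON cards and variable substitution. Use it when a user asks to "send a Feishu card / template message / interactive card", or when a structured notification must be delivered to a specific open_id/chat_id. The skill only uses the OpenAPI (appid/appsecret + tenant_access_token) and does not use the message channel.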
---
# Feishu Card Sender
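## Overview
A uniform way to send card messages through the Feishu OpenAPI, so you do not have to hand-write curl each time.
This skill always uses `scripts/send_feishu_card.py` (appid/appsecret authentication) and never goes through the `message` channel.
## Quick decisions
1. **Send to the user of the current session** → use the current session's `sender_id` as `--receive-id` (open_id)
2. **Send to a specific user or group** → pass `--receive-id` and `--receive-id-type` explicitly
3. **Reuse a template** → put the template in `assets/templates/*.json`, then use `--template` plus variable substitution
## Sending method: script (OpenAPI, the only method)
Script: `scripts/send_feishu_card.py`
### Credentials
Credentials are resolved automatically in the following priority order (highest first):
1. Command-line arguments: `--app-id` / `--app-secret`
2. Environment variables: `FEISHU_APP_ID` / `FEISHU_APP_SECRET`
3. OpenClaw config: `channels.feishu.accounts` in `/root/.openclaw/openclaw.json`
   - Use `--account-id` to select an account (the first one is used if omitted)
   - Use `--account-id current` to read the account_id from the session context (best-effort, from runtime environment variables)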
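The priority order above can be sketched as a small resolver. This is a minimal illustration, not the script's actual code: `resolve_credentials` and its parameters are hypothetical names, and the account entries written to the temporary config file are made-up values that only mirror the `channels.feishu.accounts` shape with `id`, `appId`, and `appSecret` keys.

```python
import json
import os
import tempfile
from pathlib import Path


def resolve_credentials(cli_id=None, cli_secret=None, config_path=None, account_id=None, env=None):
    """Return (app_id, app_secret) using CLI > environment > config-file order."""
    env = os.environ if env is None else env
    app_id = cli_id or env.get("FEISHU_APP_ID")
    app_secret = cli_secret or env.get("FEISHU_APP_SECRET")
    if app_id and app_secret:
        return app_id, app_secret
    # Fall back to the config file only for whatever is still missing.
    accounts = []
    if config_path and Path(config_path).exists():
        cfg = json.loads(Path(config_path).read_text(encoding="utf-8"))
        accounts = cfg.get("channels", {}).get("feishu", {}).get("accounts", [])
    if not accounts:
        return app_id, app_secret
    # Match by configured id; otherwise use the first account.
    chosen = next((a for a in accounts if str(a.get("id")) == str(account_id)), accounts[0])
    return app_id or chosen.get("appId"), app_secret or chosen.get("appSecret")


with tempfile.TemporaryDirectory() as tmp:
    cfg_file = Path(tmp) / "openclaw.json"  # stand-in for the real config path
    cfg_file.write_text(json.dumps({"channels": {"feishu": {"accounts": [
        {"id": "default", "appId": "cli_a", "appSecret": "s_a"},
        {"id": "dev-bot", "appId": "cli_b", "appSecret": "s_b"},
    ]}}}), encoding="utf-8")
    from_cli = resolve_credentials("cli_x", "s_x", cfg_file, env={})
    from_env = resolve_credentials(
        config_path=cfg_file,
        env={"FEISHU_APP_ID": "cli_e", "FEISHU_APP_SECRET": "s_e"},
    )
    from_cfg = resolve_credentials(config_path=cfg_file, account_id="dev-bot", env={})

print(from_cli, from_env, from_cfg)
```

Passing `env` explicitly keeps the sketch testable without touching the real process environment.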
### Common commands
```bash
# 1) List built-in templates
python3 scripts/send_feishu_card.py --list-templates
# 2) Send the movie template to an open_id (variables from a file)
export FEISHU_APP_ID="cli_xxx"
export FEISHU_APP_SECRET="xxx"
python3 scripts/send_feishu_card.py \
  --template movie \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --vars-file references/vars.example.env
# 3) Point at a template file directly and pass inline variables
python3 scripts/send_feishu_card.py \
  --template-file assets/templates/movie.json \
  --receive-id ou_xxx \
  --var title='星际穿越' \
  --var rating='8.7'
# 4) With only a poster URL, upload it to Feishu automatically and inject poster_img_key
python3 scripts/send_feishu_card.py \
  --template movie-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --poster-url 'https://image.tmdb.org/t/p/original/xxx.jpg' \
  --var title='星际穿越' \
  --var overview='...'
```
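## Variable substitution rules
- Placeholder syntax in templates: `${key}`
- Variable sources (later sources override earlier ones):
  1) `--vars-file` (`KEY=VALUE`)
  2) repeated `--var key=value`
- Variables that are not provided are left as-is and raise no error (convenient for incremental debugging)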
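The substitution rules above can be illustrated with a short sketch. It reuses the placeholder pattern from `scripts/send_feishu_card.py`; the `render` helper and the sample template and values are illustrative assumptions, not files from the repository.

```python
import re

# Same placeholder pattern as render_template() in send_feishu_card.py.
PLACEHOLDER = re.compile(r"\$\{([a-zA-Z0-9_\-\.]+)\}")


def render(template, file_vars, cli_vars):
    """Merge variables (inline --var wins over --vars-file) and substitute."""
    merged = {**file_vars, **cli_vars}
    # Unknown keys return the original match, so the placeholder stays as-is.
    return PLACEHOLDER.sub(lambda m: merged.get(m.group(1), m.group(0)), template)


template = '{"title": "${title}", "rating": "${rating}", "year": "${year}"}'
file_vars = {"title": "Draft title", "rating": "8.7"}  # as if read from --vars-file
cli_vars = {"title": "Interstellar"}                   # as if passed via --var
rendered = render(template, file_vars, cli_vars)
print(rendered)
# → {"title": "Interstellar", "rating": "8.7", "year": "${year}"}
```

Because `${year}` was never supplied, it survives in the output, which makes missing variables easy to spot in a `--dry-run`.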
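## Parameter conventions (required reading)
Read these before sending:
- `references/template-params.md` (movie-custom)
- `references/template-params-tv.md` (tv-custom)
- These documents define each template's required parameters, field meanings, and example commands.
- The `cast` field must be passed as a Markdown multi-line list string.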
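## File layout
- `scripts/send_feishu_card.py`: obtains the token, renders the template, sends the message
- `scripts/format_cast.py`: converts a cast JSON array into the `cast` field string
- `scripts/card_callback_router.py`: single routing entry point for card callbacks
- `scripts/subscribe_callback_handler.py`: handles the "Subscribe now" callback and writes to MoviePilot (idempotent)
- `assets/templates/movie-custom.json`: movie detail template (movie)
- `assets/templates/tv-custom.json`: TV series detail template (tv)
- `assets/rules/*.rules.json`: generic rules (supports require_non_empty / default_value / conditional block removal)
- `references/vars.example.env`: example variables file
- `references/template-params.md`: template parameters and argument format conventions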
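To make the rules files more concrete, here is a minimal sketch of how such rules might behave. The rule keys (`action`, `field`, `value`, `message`, `selector`) follow what `send_feishu_card.py` reads, but the concrete rule values, the sample card elements, and the simplified `matches` / `apply_rules` helpers are illustrative assumptions rather than the repository's actual rules.

```python
import json

# Hypothetical rules; the real files live in assets/rules/*.rules.json.
RULES = [
    {"action": "default_value", "field": "media_type", "value": "movie"},
    {"action": "require_non_empty", "field": "title", "message": "title is required"},
    {
        "action": "remove_when_empty",
        "field": "tagline",
        "selector": {"tag": "markdown", "contentContains": "${tagline}"},
    },
]


def matches(element, selector):
    """Return True when a card element satisfies every key in the selector."""
    if selector.get("tag") and element.get("tag") != selector["tag"]:
        return False
    needle = selector.get("contentContains")
    if needle is not None and needle not in str(element.get("content", "")):
        return False
    return True


def apply_rules(elements, variables, rules):
    """Fill defaults, enforce required fields, then prune blocks for empty fields."""
    for rule in rules:
        value = str(variables.get(rule["field"]) or "").strip()
        if rule["action"] == "default_value" and not value:
            variables[rule["field"]] = rule["value"]
        elif rule["action"] == "require_non_empty" and not value:
            raise ValueError(rule.get("message", f"required field empty: {rule['field']}"))
    for rule in rules:
        if rule["action"] != "remove_when_empty":
            continue
        if str(variables.get(rule["field"]) or "").strip():
            continue  # field has a value, keep the block
        elements = [el for el in elements if not matches(el, rule["selector"])]
    return elements


elements = [
    {"tag": "markdown", "content": "**${title}**"},
    {"tag": "markdown", "content": "_${tagline}_"},
]
variables = {"title": "Interstellar", "tagline": ""}
kept = apply_rules(elements, variables, RULES)
print(json.dumps(kept, ensure_ascii=False))
print(variables["media_type"])
```

Pruning runs before placeholder substitution, which is why the selector looks for the literal `${tagline}` text.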
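## Card-action auto-handling rules (Plan B)
When a Feishu card callback message arrives (`message_id` looks like `card-action-c-...` and the content contains `{"action":"subscribe"...}`):
1. Automatically extract `callback_token` (strip the `card-action-` prefix from `message_id`)
2. Automatically run:
   - first, a delayed update that sets the card to "处理中..." (processing) and disables it
   - the MoviePilot subscription (idempotent)
   - then a delayed update that sets the card to "✅ 已订阅" (subscribed) or "❌ 订阅失败,请重试" (subscription failed, please retry)
3. The user does not need to supply token/account parameters manually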
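The steps above can be sketched as follows. This is only an outline of the ordering, under stated assumptions: `update_card` and `subscribe` are hypothetical callables standing in for the real card update and MoviePilot calls, and the button labels are English stand-ins for the Chinese texts used by the scripts.

```python
PREFIX = "card-action-"


def extract_callback_token(message_id):
    """Strip the card-action- prefix; return None for other message ids."""
    mid = (message_id or "").strip()
    if mid.startswith(PREFIX):
        return mid[len(PREFIX):]
    return None


def handle_card_action(payload, update_card, subscribe):
    """Mark the card as processing, subscribe, then show the final state."""
    if payload.get("action") != "subscribe":
        return None  # other actions are not handled here
    update_card("Processing...", True)
    try:
        subscribe(payload)
    except Exception:
        update_card("Subscription failed, please retry", False)
        return False
    update_card("Subscribed", True)
    return True


calls = []
token = extract_callback_token("card-action-c-123")
ok = handle_card_action(
    {"action": "subscribe", "tmdb_id": 157336},
    update_card=lambda text, disabled: calls.append((text, disabled)),
    subscribe=lambda payload: None,  # pretend the subscription succeeded
)
print(token, ok, calls)
```

Re-enabling the button on failure lets the user retry without a new card being sent.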
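## Post-send reply constraints
- After a card is sent successfully, do not follow up with an extra "sent + message_id" text by default.
- Return `message_id` only when the user explicitly asks for a receipt.
## Security and constraints
- Do not hard-code the App Secret in scripts
- Logs do not print secrets by default
- On failure, return Feishu's original error code and message to aid troubleshooting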
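As an illustration of the last point, a caller might condense a failed response into one line while keeping Feishu's own `code` and `msg` fields intact. The helper name `summarize_feishu_error` and the sample response bodies are hypothetical; the script itself simply prints the raw status and body.

```python
import json


def summarize_feishu_error(status, body):
    """Return None on success, else a one-line summary with Feishu's code and msg."""
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return f"HTTP {status}: non-JSON response: {body[:200]}"
    if status < 400 and data.get("code") == 0:
        return None
    return f"HTTP {status}, code={data.get('code')}, msg={data.get('msg')}"


# Made-up bodies for demonstration only; real codes and messages come from Feishu.
failure = summarize_feishu_error(400, '{"code": 99999, "msg": "example failure"}')
success = summarize_feishu_error(200, '{"code": 0, "msg": "success"}')
print(failure)
print(success)
```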
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/send_feishu_card.py
```python
#!/usr/bin/env python3
import argparse
import json
import os
import re
import sys
import uuid
import mimetypes
from pathlib import Path
from urllib import request, error

from card_snapshot_store import save_snapshot

ROOT = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = ROOT / "assets" / "templates"
RULES_DIR = ROOT / "assets" / "rules"
OPENCLAW_CONFIG = Path("/root/.openclaw/openclaw.json")


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def parse_kv_pairs(items):
    data = {}
    for item in items or []:
        if "=" not in item:
            raise ValueError(f"Invalid --var '{item}', expected key=value")
        k, v = item.split("=", 1)
        data[k.strip()] = v
    return data


def parse_vars_file(path):
    data = {}
    if not path:
        return data
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"vars file not found: {path}")
    for raw in p.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        data[k.strip()] = v.lstrip()
    return data


def render_template(text, variables):
    pattern = re.compile(r"\$\{([a-zA-Z0-9_\-\.]+)\}")

    def repl(match):
        key = match.group(1)
        return variables.get(key, match.group(0))

    return pattern.sub(repl, text)


def load_rules(template_name=None):
    if not template_name:
        return []
    p = RULES_DIR / f"{template_name}.rules.json"
    if not p.exists():
        return []
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
        return data if isinstance(data, list) else []
    except Exception:
        return []


def _selector_match(element, selector):
    if not isinstance(element, dict) or not isinstance(selector, dict):
        return False
    tag = selector.get("tag")
    if tag and element.get("tag") != tag:
        return False
    content_contains = selector.get("contentContains")
    if content_contains is not None:
        if content_contains not in str(element.get("content", "")):
            return False
    text_contains = selector.get("textContains")
    if text_contains is not None:
        txt = ""
        text_obj = element.get("text")
        if isinstance(text_obj, dict):
            txt = str(text_obj.get("content", ""))
        if text_contains not in txt:
            return False
    return True


def _is_blankish(value):
    if value is None:
        return True
    s = str(value)
    if not s:
        return True
    # Remove common quotes/symbol wrappers and all unicode whitespaces
    compact = re.sub(r"[\s\"'“”‘’「」『』]+", "", s, flags=re.UNICODE)
    return compact == ""


def apply_rules(template_text, variables, rules):
    try:
        obj = json.loads(template_text)
    except Exception:
        return template_text
    if not rules:
        return template_text
    # pass-1: variable level rules
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        action = rule.get("action")
        field = rule.get("field")
        if action == "default_value":
            if _is_blankish(variables.get(field)):
                variables[field] = str(rule.get("value", ""))
        elif action == "require_non_empty":
            if _is_blankish(variables.get(field)):
                msg = rule.get("message") or f"required field empty: {field}"
                raise ValueError(msg)
    # pass-2: element pruning rules
    body = obj.get("body") if isinstance(obj, dict) else None
    elements = body.get("elements") if isinstance(body, dict) else None
    if not isinstance(elements, list):
        return json.dumps(obj, ensure_ascii=False)
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        action = rule.get("action")
        selector = rule.get("selector") or {}
        field = rule.get("field")
        if action == "remove_when_empty":
            if not _is_blankish(variables.get(field)):
                continue
            elements = [el for el in elements if not _selector_match(el, selector)]
        elif action == "remove_when_missing":
            if variables.get(field):
                continue
            elements = [el for el in elements if not _selector_match(el, selector)]
    body["elements"] = elements
    return json.dumps(obj, ensure_ascii=False)


def _coerce_common_types(node):
    if isinstance(node, dict):
        out = {}
        for k, v in node.items():
            vv = _coerce_common_types(v)
            if k in ("disabled",) and isinstance(vv, str):
                lv = vv.strip().lower()
                if lv == "true":
                    vv = True
                elif lv == "false":
                    vv = False
            out[k] = vv
        return out
    if isinstance(node, list):
        return [_coerce_common_types(x) for x in node]
    return node


def post_json(url, payload, headers=None):
    data = json.dumps(payload).encode("utf-8")
    req = request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json; charset=utf-8")
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    try:
        with request.urlopen(req, timeout=20) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return resp.status, body
    except error.HTTPError as ex:
        body = ex.read().decode("utf-8", errors="replace")
        return ex.code, body


def upload_image_bytes(token, image_bytes, filename="poster.jpg"):
    boundary = f"----OpenClawBoundary{uuid.uuid4().hex}"
    crlf = "\r\n"
    file_content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    parts = []
    parts.append((f"--{boundary}{crlf}").encode("utf-8"))
    parts.append((f'Content-Disposition: form-data; name="image_type"{crlf}{crlf}message{crlf}').encode("utf-8"))
    parts.append((f"--{boundary}{crlf}").encode("utf-8"))
    parts.append(
        (
            f'Content-Disposition: form-data; name="image"; filename="{filename}"{crlf}'
            f"Content-Type: {file_content_type}{crlf}{crlf}"
        ).encode("utf-8")
    )
    parts.append(image_bytes)
    parts.append(crlf.encode("utf-8"))
    parts.append((f"--{boundary}--{crlf}").encode("utf-8"))
    body = b"".join(parts)
    req = request.Request("https://open.feishu.cn/open-apis/im/v1/images", data=body, method="POST")
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
    try:
        with request.urlopen(req, timeout=30) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            data = json.loads(raw)
            if resp.status >= 400 or data.get("code") != 0:
                raise RuntimeError(f"upload image failed: status={resp.status}, body={raw}")
            image_key = ((data.get("data") or {}).get("image_key"))
            if not image_key:
                raise RuntimeError(f"upload image succeeded but image_key missing: body={raw}")
            return image_key
    except error.HTTPError as ex:
        raw = ex.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"upload image failed: status={ex.code}, body={raw}")


def get_tenant_token(app_id, app_secret):
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    status, body = post_json(url, {"app_id": app_id, "app_secret": app_secret})
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise RuntimeError(f"token response is not json, status={status}, body={body[:500]}")
    if status >= 400 or data.get("code") != 0:
        raise RuntimeError(f"get token failed: status={status}, body={body}")
    token = data.get("tenant_access_token")
    if not token:
        raise RuntimeError(f"tenant_access_token missing: body={body}")
    return token


def get_session_account_id():
    """Best-effort read account_id from OpenClaw runtime env."""
    for k in (
        "OPENCLAW_ACCOUNT_ID",
        "OPENCLAW_INBOUND_ACCOUNT_ID",
        "ACCOUNT_ID",
    ):
        v = os.getenv(k)
        if v:
            return v
    return None


def load_openclaw_feishu_credentials(account_id=None, config_path=None):
    cfg_path = Path(config_path) if config_path else OPENCLAW_CONFIG
    if not cfg_path.exists():
        return None, None
    try:
        cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
        accounts = (((cfg.get("channels") or {}).get("feishu") or {}).get("accounts") or [])
    except Exception:
        return None, None
    if not accounts:
        return None, None
    target = None
    if account_id is not None:
        # 1) match by configured account id (e.g. default/dev-bot)
        for a in accounts:
            if str(a.get("id")) == str(account_id):
                target = a
                break
        # 2) if numeric, match by index (inbound account_id is often "0"/"1")
        if target is None and str(account_id).isdigit():
            idx = int(str(account_id))
            if 0 <= idx < len(accounts):
                target = accounts[idx]
    if target is None:
        target = accounts[0]
    return target.get("appId"), target.get("appSecret")


def load_template(template_name=None, template_file=None):
    if template_file:
        p = Path(template_file)
    else:
        p = TEMPLATE_DIR / f"{template_name}.json"
    if not p.exists():
        raise FileNotFoundError(f"template not found: {p}")
    return p.read_text(encoding="utf-8"), p


def list_templates():
    if not TEMPLATE_DIR.exists():
        print("(no templates)")
        return
    files = sorted(TEMPLATE_DIR.glob("*.json"))
    if not files:
        print("(no templates)")
        return
    for f in files:
        print(f.stem)


def main():
    parser = argparse.ArgumentParser(description="Send Feishu interactive card from template")
    parser.add_argument("--template", help="template name in assets/templates, without .json")
    parser.add_argument("--template-file", help="absolute/relative path to template json")
    parser.add_argument("--list-templates", action="store_true", help="list built-in templates")
    parser.add_argument("--receive-id", help="open_id/user_id/chat_id")
    parser.add_argument("--receive-id-type", default="open_id", choices=["open_id", "user_id", "chat_id", "union_id"], help="default: open_id")
    parser.add_argument("--vars-file", help="env-like KEY=VALUE file")
    parser.add_argument("--var", action="append", help="inline variable key=value, repeatable")
    parser.add_argument("--poster-url", help="download image from URL, upload to Feishu, and fill poster_img_key")
    parser.add_argument("--poster-file", help="read local image file, upload to Feishu, and fill poster_img_key")
    parser.add_argument("--app-id", default=os.getenv("FEISHU_APP_ID"))
    parser.add_argument("--app-secret", default=os.getenv("FEISHU_APP_SECRET"))
    parser.add_argument("--account-id", help="OpenClaw feishu account id; use 'current' to auto-read from session context")
    parser.add_argument("--config", help="OpenClaw config path (default: /root/.openclaw/openclaw.json)")
    parser.add_argument("--dry-run", action="store_true", help="render and validate only, do not send")
    args = parser.parse_args()
    if args.list_templates:
        list_templates()
        return 0
    if not args.template and not args.template_file:
        eprint("ERROR: --template or --template-file is required")
        return 2
    if not args.receive_id:
        eprint("ERROR: --receive-id is required")
        return 2
    app_id, app_secret = args.app_id, args.app_secret
    account_id = args.account_id
    if account_id == "current":
        account_id = get_session_account_id()
    if not app_id or not app_secret:
        cfg_app_id, cfg_app_secret = load_openclaw_feishu_credentials(account_id, args.config)
        app_id = app_id or cfg_app_id
        app_secret = app_secret or cfg_app_secret
    if not app_id or not app_secret:
        eprint("ERROR: app credentials missing; set FEISHU_APP_ID/FEISHU_APP_SECRET, pass --app-id/--app-secret, or use --account-id with OpenClaw config")
        return 2
    try:
        raw_template, template_path = load_template(args.template, args.template_file)
        file_vars = parse_vars_file(args.vars_file)
        cli_vars = parse_kv_pairs(args.var)
        variables = {**file_vars, **cli_vars}
        if not (variables.get("card_key") or "").strip():
            variables["card_key"] = uuid.uuid4().hex
        token = get_tenant_token(app_id, app_secret)
        # Optional: upload poster first and inject poster_img_key
        if args.poster_url or args.poster_file:
            if args.poster_url:
                req = request.Request(args.poster_url, headers={"User-Agent": "Mozilla/5.0 OpenClaw/feishu-card-sender"})
                with request.urlopen(req, timeout=30) as resp:
                    image_bytes = resp.read()
                filename = Path(args.poster_url.split("?")[0]).name or "poster.jpg"
            else:
                p = Path(args.poster_file)
                if not p.exists():
                    raise FileNotFoundError(f"poster file not found: {args.poster_file}")
                image_bytes = p.read_bytes()
                filename = p.name
            image_key = upload_image_bytes(token, image_bytes, filename=filename)
            variables["poster_img_key"] = image_key
        rules = load_rules(args.template)
        ruled_template = apply_rules(raw_template, variables, rules)
        rendered = render_template(ruled_template, variables)
        # validate + normalize common types (e.g. "disabled": "false" -> false)
        rendered_obj = json.loads(rendered)
        rendered_obj = _coerce_common_types(rendered_obj)
        rendered = json.dumps(rendered_obj, ensure_ascii=False)
        if args.dry_run:
            print(f"template: {template_path}")
            print("dry_run: true")
            print(f"rendered_size: {len(rendered)}")
            print(rendered)
            return 0
        send_url = f"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type={args.receive_id_type}"
        payload = {
            "receive_id": args.receive_id,
            "msg_type": "interactive",
            "content": rendered,
        }
        status, body = post_json(send_url, payload, headers={"Authorization": f"Bearer {token}"})
        print(f"template: {template_path}")
        print(f"status: {status}")
        print(f"response: {body}")
        data = json.loads(body)
        if variables.get("poster_img_key"):
            print(f"poster_img_key: {variables.get('poster_img_key')}")
        if status >= 400 or data.get("code") != 0:
            return 1
        msg_id = (data.get("data") or {}).get("message_id")
        if msg_id:
            print(f"message_id: {msg_id}")
        # snapshot for delayed card update workflow
        try:
            save_snapshot(
                card_key=variables.get("card_key"),
                message_id=msg_id,
                receive_id=args.receive_id,
                account_id=str(account_id) if account_id is not None else None,
                media_type=variables.get("media_type"),
                tmdb_id=str(variables.get("tmdb_id")) if variables.get("tmdb_id") is not None else None,
                title=variables.get("title"),
                raw_card_json=rendered,
            )
        except Exception:
            pass
        return 0
    except Exception as ex:
        eprint(f"ERROR: {ex}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
### references/template-params.md
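````markdown
# Template parameter conventions (for agents)
This file defines each template's parameter list, required items, and format constraints. When calling `scripts/send_feishu_card.py`, always pass parameters with `--var key=value`.
---
## template: `movie-custom`
Template file: `assets/templates/movie-custom.json`
### Required parameters
- `title`: movie title (plain text)
- `overview`: plot synopsis (may span several sentences)
- `rating`: rating text (e.g. `8.3 / 10`)
- `runtime`: runtime text (e.g. `173 分钟`, i.e. 173 minutes)
- `year`: release date or year (e.g. `2023-01-22`)
- `genres`: genre list text (e.g. `科幻 / 灾难 / 冒险`)
- `country`: country or region (e.g. `中国` or `美国 / 英国`)
- `cast`: cast block (Markdown list string)
- `director`: director name
- `tagline`: one-line tagline (optional; the block is not rendered when empty)
- `detail_url`: detail link (TMDB page URL, used by the "View details" button)
- `tmdb_id`: TMDB entry ID (used by the "Subscribe now" callback)
- `media_type`: `movie` (may be omitted; a rule fills in the default)
- `subscribe_disabled`: `true|false` (optional; controls whether the "Subscribe now" button is disabled)
- `subscribe_button_text`: button label (optional; defaults to "立即订阅", i.e. "Subscribe now")
### Image field
- `poster_img_key`: an image key usable by Feishu (`img_v3_...`), **required**.
- This field renders the poster in the template; a URL cannot be passed directly.
- If you only have an image URL or a local file, let the script upload it and inject the key automatically:
  - `--poster-url <https://...jpg>`
  - `--poster-file </path/to/poster.jpg>`
### Key point: `cast` parameter format (strict)
`cast` must be passed as a **Markdown multi-line string**, one actor per line. Recommended format (`饰` means "as"):
```text
• **Actor name** 饰 Character name
• **Actor name** 饰 Character name
• **Actor name** 饰 Character name
```
Example value:
```text
• **吴京** 饰 刘培强\n• **刘德华** 饰 图恒宇\n• **李雪健** 饰 周喆直
```
Notes:
- In a shell, wrap the whole value in single quotes and use `\n` for line breaks.
- Do not pass a JSON array; the template expects a single string.
### A complete example
```bash
python3 scripts/send_feishu_card.py \
  --template movie-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --account-id current \
  --var title='流浪地球2' \
  --var overview='太阳即将毁灭,人类启动数字生命与行星发动机双线计划。' \
  --var rating='8.3 / 10' \
  --var runtime='173 分钟' \
  --var year='2023-01-22' \
  --var genres='科幻 / 灾难 / 冒险' \
  --var country='中国' \
  --var cast='• **吴京** 饰 刘培强\n• **刘德华** 饰 图恒宇\n• **李雪健** 饰 周喆直' \
  --var director='郭帆' \
  --var tagline='爱是穿越一切的力量'
```
---
## cast auto-formatting tool
Script: `scripts/format_cast.py`
Purpose: converts a cast JSON array into the Markdown multi-line string required by the `cast` field.
### Example input
```json
[
  {"name": "吴京", "role": "刘培强"},
  {"name": "刘德华", "role": "图恒宇"},
  {"name": "李雪健", "role": "周喆直"}
]
```
### Usage
```bash
# Output real line breaks
python3 scripts/format_cast.py --input-file cast.json
# Output a shell-friendly string with escaped \n (recommended for --var cast=...)
python3 scripts/format_cast.py --input-file cast.json --escape-newline
```
## Agent execution rules (mandatory)
1. Before sending, check the template's parameter list and fill in any missing required parameters first.
2. Always pass `cast` in the Markdown list format described above.
3. Do not inject unescaped double quotes into parameter values; prefer wrapping shell values in single quotes.
4. On failure, return the Feishu error code and log_id verbatim to aid troubleshooting.
````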
### references/template-params-tv.md
````markdown
# template: `tv-custom`
Template file: `assets/templates/tv-custom.json`
## Required parameters
- title
- overview
- rating
- first_air_date
- status
- seasons_episodes (example: `3 季 / 24 集`, i.e. 3 seasons / 24 episodes)
- episode_runtime (example: `45 分钟`, i.e. 45 minutes)
- genres
- country
- creator
- cast (Markdown multi-line string)
- detail_url (TMDB detail page)
- tmdb_id (TMDB entry ID, used by the "Subscribe now" callback)
- media_type (`tv`; may be omitted, a rule fills in the default)
- subscribe_disabled (`true|false`, optional; controls whether the "Subscribe now" button is disabled)
- subscribe_button_text (optional; defaults to "立即订阅", i.e. "Subscribe now")
- poster_img_key (or injected automatically via `--poster-url/--poster-file`)
## Optional parameters
- tagline (the block is not rendered when empty)
## Send example
```bash
python3 scripts/send_feishu_card.py \
  --template tv-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --account-id current \
  --poster-url 'https://image.tmdb.org/t/p/original/xxx.jpg' \
  --var title='黑镜' \
  --var overview='...' \
  --var rating='8.4 / 10' \
  --var first_air_date='2011-12-04' \
  --var status='Returning Series' \
  --var seasons_episodes='6 季 / 27 集' \
  --var episode_runtime='60 分钟' \
  --var genres='剧情 / 科幻 / 惊悚' \
  --var country='英国' \
  --var creator='Charlie Brooker' \
  --var cast='• **演员A** 饰 角色A' \
  --var detail_url='https://www.themoviedb.org/tv/42009'
```
````
### scripts/format_cast.py
```python
#!/usr/bin/env python3
import argparse
import json
import sys
from pathlib import Path


def load_items(args):
    if args.input_json:
        return json.loads(args.input_json)
    if args.input_file:
        return json.loads(Path(args.input_file).read_text(encoding="utf-8"))
    data = sys.stdin.read().strip()
    if data:
        return json.loads(data)
    raise ValueError("No input provided. Use --input-json / --input-file / stdin")


def main():
    p = argparse.ArgumentParser(description="Format cast JSON array to Feishu card markdown lines")
    p.add_argument("--input-json", help='JSON string, e.g. [{"name":"吴京","role":"刘培强"}]')
    p.add_argument("--input-file", help="Path to JSON file")
    p.add_argument("--name-key", default="name")
    p.add_argument("--role-key", default="role")
    p.add_argument("--escape-newline", action="store_true", help="Output with \\n instead of real newlines")
    args = p.parse_args()
    items = load_items(args)
    if not isinstance(items, list):
        raise ValueError("Input must be a JSON array")
    lines = []
    for i in items:
        if not isinstance(i, dict):
            continue
        name = str(i.get(args.name_key, "")).strip()
        role = str(i.get(args.role_key, "")).strip()
        if not name:
            continue
        if role:
            lines.append(f"• **{name}** 饰 {role}")
        else:
            lines.append(f"• **{name}**")
    out = "\n".join(lines)
    if args.escape_newline:
        out = out.replace("\n", "\\n")
    print(out)


if __name__ == "__main__":
    main()
```
### scripts/card_callback_router.py
```python
#!/usr/bin/env python3
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path
from urllib import request

from card_snapshot_store import find_snapshot_by_card_key, patch_subscribe_button

ENQUEUE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/enqueue_callback.py')
SCRIPT_DIR = Path(__file__).resolve().parent
HANDLER = SCRIPT_DIR / "subscribe_callback_handler.py"
OPENCLAW_CONFIG = Path('/root/.openclaw/openclaw.json')
TOKEN_CACHE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/tenant_token_cache.json')


def load_app_credentials(account_id: str | None):
    cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding='utf-8'))
    accounts = (((cfg.get('channels') or {}).get('feishu') or {}).get('accounts') or [])
    if not accounts:
        return None, None
    if account_id is not None:
        # Match by configured account id first, then by numeric index
        for a in accounts:
            if str(a.get('id')) == str(account_id):
                return a.get('appId'), a.get('appSecret')
        if str(account_id).isdigit():
            idx = int(str(account_id))
            if 0 <= idx < len(accounts):
                a = accounts[idx]
                return a.get('appId'), a.get('appSecret')
    a = accounts[0]
    return a.get('appId'), a.get('appSecret')


def get_tenant_token(app_id: str, app_secret: str):
    body = json.dumps({'app_id': app_id, 'app_secret': app_secret}).encode('utf-8')
    req = request.Request('https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', data=body, method='POST', headers={'Content-Type': 'application/json'})
    with request.urlopen(req, timeout=20) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return data.get('tenant_access_token'), int(data.get('expire', 7200))


def get_tenant_token_cached(app_id: str, app_secret: str):
    now = time.time()
    try:
        if TOKEN_CACHE.exists():
            c = json.loads(TOKEN_CACHE.read_text(encoding='utf-8'))
            # Reuse the cached token only if it has more than 120 seconds left
            if c.get('app_id') == app_id and c.get('token') and float(c.get('exp_at', 0)) - now > 120:
                return c['token']
    except Exception:
        pass
    token, ttl = get_tenant_token(app_id, app_secret)
    if token:
        TOKEN_CACHE.parent.mkdir(parents=True, exist_ok=True)
        TOKEN_CACHE.write_text(json.dumps({'app_id': app_id, 'token': token, 'exp_at': now + max(300, ttl - 60)}), encoding='utf-8')
    return token


def update_message_card_by_id(message_id: str, tenant_token: str, raw_card_json: str):
    card = json.loads(raw_card_json)
    payload = {
        'msg_type': 'interactive',
        'content': json.dumps(card, ensure_ascii=False),
    }
    req = request.Request(
        f'https://open.feishu.cn/open-apis/im/v1/messages/{message_id}',
        data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
        method='PATCH',
        headers={'Authorization': f'Bearer {tenant_token}', 'Content-Type': 'application/json'},
    )
    with request.urlopen(req, timeout=20) as resp:
        body = resp.read().decode('utf-8', errors='replace')
    try:
        return json.loads(body)
    except Exception:
        return {'raw': body}


def main():
    p = argparse.ArgumentParser(description='Route Feishu card callback payloads')
    p.add_argument('--payload', required=True, help='raw callback payload json')
    p.add_argument('--channel', default='feishu')
    p.add_argument('--user-id', required=True)
    p.add_argument('--message-id', help='Inbound message_id, e.g. card-action-c-xxx')
    p.add_argument('--account-id', help='Feishu account id/index for app credentials')
    args = p.parse_args()
    try:
        payload = json.loads(args.payload)
    except Exception:
        print(json.dumps({'success': False, 'error': 'invalid_json'}, ensure_ascii=False))
        return 2
    action = payload.get('action')
    if action != 'subscribe':
        print(json.dumps({'success': False, 'error': 'unsupported_action', 'action': action}, ensure_ascii=False))
        return 2
    callback_token = None
    if args.message_id:
        mid = args.message_id.strip()
        if mid.startswith('card-action-'):
            callback_token = mid[len('card-action-'):]
    tenant_token = None
    app_id, app_secret = load_app_credentials(args.account_id)
    if app_id and app_secret:
        try:
            tenant_token = get_tenant_token_cached(app_id, app_secret)
        except Exception:
            tenant_token = None
    card_key = str(payload.get('card_key') or '')
    snapshot = None
    if card_key:
        snapshot = find_snapshot_by_card_key(card_key)
    message_id_for_update = (snapshot or {}).get('message_id') if isinstance(snapshot, dict) else None
    processing_update = None
    processing_update_error = None
    if message_id_for_update and tenant_token and snapshot:
        try:
            processing_card = patch_subscribe_button(snapshot['raw_card_json'], text='处理中...', disabled=True)
            processing_update = update_message_card_by_id(message_id_for_update, tenant_token, processing_card)
        except Exception as ex:
            processing_update_error = str(ex)
    # Queue finalize job (subscribe + final patch)
    enqueue_res = subprocess.run([
        'python3', str(ENQUEUE),
        '--channel', args.channel,
        '--user-id', args.user_id,
        '--account-id', str(args.account_id or '1'),
        '--message-id', args.message_id or '',
        '--payload', json.dumps(payload, ensure_ascii=False),
    ], capture_output=True, text=True)
    out = {
        'success': True,
        'status': 'processing',
        'message': '正在处理中,请稍候',
        'debug_update': {
            'processing': processing_update,
            'processing_error': processing_update_error,
            'has_snapshot': bool(snapshot),
            'callback_token': bool(callback_token),
            'message_id': message_id_for_update,
            'enqueued': enqueue_res.returncode == 0,
            'enqueue_stdout': (enqueue_res.stdout or '').strip(),
            'enqueue_stderr': (enqueue_res.stderr or '').strip(),
        }
    }
    print(json.dumps(out, ensure_ascii=False))
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
```
### scripts/subscribe_callback_handler.py
```python
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
from pathlib import Path
from urllib import parse, request, error

ROOT = Path(__file__).resolve().parent.parent
IDEMPOTENT_PATH = ROOT / "tmp" / "subscribe-callback-dedupe.json"
DEFAULT_BASE = "http://home.dobby.lol:1001"


def mk_toast(t: str, content: str):
    return {"toast": {"type": t, "content": content}}


def pack_result(*, success: bool, status: str, message: str, subscribe_id=None, deduped=False, ui=None, error=None):
    data = {
        "success": success,
        "status": status,
        "message": message,
        "deduped": deduped,
        "subscribe_id": subscribe_id,
        "ui": ui or {},
        "callbackResponse": mk_toast("info", "处理中..."),
        "delayedUpdate": {
            "toast": mk_toast("success" if success else "error", message)["toast"],
            "ui": ui or {},
        },
    }
    if error:
        data["error"] = error
    return data


def load_cred(channel: str, user_id: str):
    p = Path('/root/.openclaw/workspace-dev/skills/movipilot-api-foundation/scripts/mp_credential_store.py')
    raw = subprocess.check_output(['python3', str(p), 'get', '--channel', channel, '--user-id', user_id], text=True).strip()
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except Exception:
        return {}


def _ensure_store():
    IDEMPOTENT_PATH.parent.mkdir(parents=True, exist_ok=True)
    if not IDEMPOTENT_PATH.exists():
        IDEMPOTENT_PATH.write_text('{}', encoding='utf-8')


def _load_store():
    _ensure_store()
    try:
        return json.loads(IDEMPOTENT_PATH.read_text(encoding='utf-8') or '{}')
    except Exception:
        return {}


def _save_store(data):
    _ensure_store()
    IDEMPOTENT_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')


def dedupe_key(payload: dict):
    return f"subscribe:{payload.get('media_type')}:{payload.get('tmdb_id')}"


def get_recent(key: str, window_sec: int = 60):
    db = _load_store()
    item = db.get(key)
    if not item:
        return None
    if time.time() - item.get('ts', 0) <= window_sec:
        return item
    return None


def mark_state(key: str, result: dict):
    db = _load_store()
    db[key] = {'ts': int(time.time()), **result}
    _save_store(db)


def mp_get(url: str):
    req = request.Request(url, headers={'User-Agent': 'OpenClaw/subscribe-callback'})
    with request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode('utf-8', errors='replace'))


def mp_post(url: str, data: dict):
    body = json.dumps(data, ensure_ascii=False).encode('utf-8')
    req = request.Request(url, data=body, method='POST', headers={'Content-Type': 'application/json', 'User-Agent': 'OpenClaw/subscribe-callback'})
    with request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode('utf-8', errors='replace'))


def main():
    ap = argparse.ArgumentParser(description='Handle Feishu card subscribe callback -> MoviePilot subscription')
    ap.add_argument('--payload', required=True, help='JSON payload from callback')
    ap.add_argument('--channel', default='feishu')
    ap.add_argument('--user-id', required=True)
    args = ap.parse_args()
    payload = json.loads(args.payload)
    if payload.get('action') != 'subscribe':
        raise SystemExit('unsupported action')
    media_type = payload.get('media_type')
    tmdb_id = str(payload.get('tmdb_id') or '').strip()
    if media_type not in ('movie', 'tv') or not tmdb_id.isdigit():
        raise SystemExit('invalid payload: need media_type(movie|tv) and numeric tmdb_id')
    # Idempotency: ignore repeated clicks for the same item within 60 seconds
    key = dedupe_key(payload)
    item = get_recent(key, window_sec=60)
    if item:
        status = item.get('status')
        if status == 'processing':
            print(json.dumps(pack_result(success=True, deduped=True, status='processing', message='正在处理中,请稍候', ui={'subscribe_disabled': True, 'subscribe_button_text': '处理中...'}), ensure_ascii=False))
            return
        print(json.dumps(pack_result(success=True, deduped=True, status=status, message='重复点击已忽略', subscribe_id=item.get('subscribe_id'), ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅' if status in ('created','exists') else '处理中...'}), ensure_ascii=False))
        return
    mark_state(key, {'success': True, 'status': 'processing', 'message': '正在处理中'})
    cred = load_cred(args.channel, args.user_id)
    token = cred.get('token')
    base = cred.get('base_url') or os.getenv('MP_DEFAULT_BASE_URL') or DEFAULT_BASE
    if not token:
        raise SystemExit('missing movipilot token')
    mediaid = f'tmdb:{tmdb_id}'
    sub = mp_get(f"{base}/api/v1/subscribe/media/{parse.quote(mediaid)}?token={parse.quote(token)}")
    if sub and sub.get('id'):
        result = pack_result(success=True, deduped=False, status='exists', subscribe_id=sub.get('id'), message='已存在订阅', ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅'})
        mark_state(key, result)
        print(json.dumps(result, ensure_ascii=False))
        return
    type_name = '电影' if media_type == 'movie' else '电视剧'
    # Fast path: create subscription directly from callback payload (avoid extra media lookup latency)
    create_payload = {
        'name': payload.get('title') or '',
        'year': '',
        'type': type_name,
        'tmdbid': int(tmdb_id),
        'mediaid': mediaid,
        'season': None,
    }
    resp = mp_post(f"{base}/api/v1/subscribe/?token={parse.quote(token)}", create_payload)
    # Fallback path: if direct create rejected, fetch media details then retry once
    if not resp.get('success'):
        media = mp_get(f"{base}/api/v1/media/{parse.quote(mediaid)}?type_name={parse.quote(type_name)}&token={parse.quote(token)}")
        create_payload = {
            'name': media.get('title') or payload.get('title') or '',
            'year': str(media.get('year') or ''),
            'type': type_name,
            'tmdbid': int(tmdb_id),
            'mediaid': mediaid,
            'season': None,
        }
        resp = mp_post(f"{base}/api/v1/subscribe/?token={parse.quote(token)}", create_payload)
    if not resp.get('success'):
        raise RuntimeError(f'create subscribe failed: {resp}')
    sid = (resp.get('data') or {}).get('id')
    result = pack_result(success=True, deduped=False, status='created', subscribe_id=sid, message='新增订阅成功', ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅'})
    mark_state(key, result)
    print(json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    try:
        main()
    except error.HTTPError as ex:
        raw = ex.read().decode('utf-8', errors='replace')
        print(json.dumps(pack_result(success=False, status='failed', message='订阅失败,可重试', ui={'subscribe_disabled': False, 'subscribe_button_text': '订阅失败,重试'}, error=f'HTTP {ex.code}: {raw[:200]}'), ensure_ascii=False))
        raise
    except Exception as ex:
        print(json.dumps(pack_result(success=False, status='failed', message='订阅失败,可重试', ui={'subscribe_disabled': False, 'subscribe_button_text': '订阅失败,重试'}, error=str(ex)), ensure_ascii=False))
raise
```
### assets/templates/movie-custom.json
```json
{
"schema": "2.0",
"config": {
"update_multi": true,
"style": {
"text_size": {
"normal_v2": {
"default": "normal",
"pc": "normal",
"mobile": "heading"
}
}
}
},
"body": {
"direction": "horizontal",
"horizontal_spacing": "8px",
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top",
"elements": [
{
"tag": "column_set",
"flex_mode": "stretch",
"horizontal_align": "left",
"columns": [
{
"tag": "column",
"width": "weighted",
"elements": [
{
"tag": "img",
"img_key": "${poster_img_key}",
"preview": true,
"scale_type": "fit_horizontal",
"corner_radius": "8px"
}
],
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
},
{
"tag": "column",
"width": "weighted",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📖 剧情简介</font>**\n${overview}",
"text_align": "left",
"text_size": "normal"
}
],
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 2
}
],
"margin": "0px 0px 0px 0px"
},
{
"tag": "column_set",
"flex_mode": "stretch",
"horizontal_spacing": "12px",
"horizontal_align": "left",
"columns": [
{
"tag": "column",
"width": "weighted",
"background_style": "blue-50",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>⭐ 评分</font>**",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "${rating}",
"text_align": "left",
"text_size": "normal"
}
],
"padding": "12px 12px 12px 12px",
"vertical_spacing": "4px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
},
{
"tag": "column",
"width": "weighted",
"background_style": "blue-50",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>⏱️ 时长</font>**",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "${runtime}",
"text_align": "left",
"text_size": "normal"
}
],
"padding": "12px 12px 12px 12px",
"vertical_spacing": "4px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
},
{
"tag": "column",
"width": "weighted",
"background_style": "blue-50",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📅 上映</font>**",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "${year}",
"text_align": "left",
"text_size": "normal"
}
],
"padding": "12px 12px 12px 12px",
"vertical_spacing": "4px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
}
],
"margin": "0px 0px 0px 0px"
},
{
"tag": "column_set",
"flex_mode": "stretch",
"horizontal_spacing": "12px",
"horizontal_align": "left",
"columns": [
{
"tag": "column",
"width": "weighted",
"background_style": "blue-50",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>🎭 类型</font>**",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "${genres}",
"text_align": "left",
"text_size": "normal"
}
],
"padding": "12px 12px 12px 12px",
"vertical_spacing": "4px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
},
{
"tag": "column",
"width": "weighted",
"background_style": "blue-50",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>🌍 国家</font>**",
"text_size": "normal"
},
{
"tag": "markdown",
"content": "${country}",
"text_align": "left",
"text_size": "normal"
}
],
"padding": "12px 12px 12px 12px",
"vertical_spacing": "4px",
"horizontal_align": "left",
"vertical_align": "top",
"weight": 1
}
],
"margin": "0px 0px 0px 0px"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>🎭 主要演员</font>**\n${cast}",
"margin": "0px 0px 0px 0px"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>🎬 导演</font>** :${director}",
"text_align": "left",
"text_size": "normal_v2",
"margin": "0px 0px 0px 0px"
},
{
"tag": "markdown",
"content": "**💬 ${tagline}**",
"text_align": "left",
"text_size": "normal_v2",
"margin": "0px 0px 0px 0px"
},
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "查看详情"
},
"type": "default",
"width": "default",
"size": "medium",
"behaviors": [
{
"type": "open_url",
"default_url": "${detail_url}",
"pc_url": "",
"ios_url": "",
"android_url": ""
}
],
"margin": "0px 0px 0px 0px"
},
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "${subscribe_button_text}"
},
"type": "primary",
"width": "default",
"size": "medium",
"disabled": "${subscribe_disabled}",
"behaviors": [
{
"type": "callback",
"value": {
"action": "subscribe",
"media_type": "${media_type}",
"tmdb_id": "${tmdb_id}",
"title": "${title}",
"card_key": "${card_key}"
}
}
],
"margin": "0px 0px 0px 0px"
}
]
},
"header": {
"title": {
"tag": "plain_text",
"content": "🎬 《${title}》电影介绍"
},
"subtitle": {
"tag": "plain_text",
"content": ""
},
"template": "blue",
"padding": "12px 16px 12px 16px"
}
}
```
### assets/templates/tv-custom.json
```json
{
"schema": "2.0",
"config": {
"update_multi": true,
"style": {
"text_size": {
"normal_v2": {
"default": "normal",
"pc": "normal",
"mobile": "heading"
}
}
}
},
"body": {
"direction": "horizontal",
"horizontal_spacing": "8px",
"vertical_spacing": "8px",
"horizontal_align": "left",
"vertical_align": "top",
"elements": [
{
"tag": "column_set",
"flex_mode": "stretch",
"columns": [
{
"tag": "column",
"width": "weighted",
"weight": 1,
"elements": [
{
"tag": "img",
"img_key": "${poster_img_key}",
"preview": true,
"scale_type": "fit_horizontal",
"corner_radius": "8px"
}
]
},
{
"tag": "column",
"width": "weighted",
"weight": 2,
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📖 剧集简介</font>**\n${overview}",
"text_align": "left",
"text_size": "normal"
}
]
}
]
},
{
"tag": "column_set",
"horizontal_spacing": "12px",
"columns": [
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>⭐ 评分</font>**"
},
{
"tag": "markdown",
"content": "${rating}"
}
]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📅 首播</font>**"
},
{
"tag": "markdown",
"content": "${first_air_date}"
}
]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📡 状态</font>**"
},
{
"tag": "markdown",
"content": "${status}"
}
]
}
]
},
{
"tag": "column_set",
"horizontal_spacing": "12px",
"columns": [
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>📺 季/集</font>**"
},
{
"tag": "markdown",
"content": "${seasons_episodes}"
}
]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>⏱️ 单集时长</font>**"
},
{
"tag": "markdown",
"content": "${episode_runtime}"
}
]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-50",
"padding": "12px 12px 12px 12px",
"elements": [
{
"tag": "markdown",
"content": "**<font color='blue-600'>🌍 国家/地区</font>**"
},
{
"tag": "markdown",
"content": "${country}"
}
]
}
]
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>🎭 类型</font>**\n${genres}"
},
{
"tag": "markdown",
"content": "**<font color='blue-600'>🎬 主创</font>**:${creator}\n**<font color='blue-600'>👥 主演</font>**\n${cast}",
"text_size": "normal_v2"
},
{
"tag": "markdown",
"content": "**💬 \"${tagline}\"**",
"text_size": "normal_v2"
},
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "查看详情"
},
"type": "default",
"width": "default",
"size": "medium",
"behaviors": [
{
"type": "open_url",
"default_url": "${detail_url}",
"pc_url": "",
"ios_url": "",
"android_url": ""
}
]
},
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "${subscribe_button_text}"
},
"type": "primary",
"width": "default",
"size": "medium",
"disabled": "${subscribe_disabled}",
"behaviors": [
{
"type": "callback",
"value": {
"action": "subscribe",
"media_type": "${media_type}",
"tmdb_id": "${tmdb_id}",
"title": "${title}",
"card_key": "${card_key}"
}
}
],
"margin": "0px 0px 0px 0px"
}
]
},
"header": {
"title": {
"tag": "plain_text",
"content": "📺 《${title}》剧集介绍"
},
"template": "blue",
"padding": "12px 16px 12px 16px"
}
}
```
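Both templates carry `${name}` placeholders such as `${title}`, `${poster_img_key}`, and `${subscribe_disabled}`. The following is a minimal sketch of how such placeholders could be filled, assuming substitution runs on the parsed JSON tree rather than on the raw text (which sidesteps JSON escaping of quotes and newlines in values). `render_template` and its coercion of whole-string `"true"`/`"false"` values to booleans are hypothetical and are not taken from `scripts/send_feishu_card.py`.

```python
import json
import re

PLACEHOLDER = re.compile(r'\$\{(\w+)\}')


def render_template(node, variables):
    """Recursively replace ${name} placeholders in a parsed JSON card (hypothetical helper)."""
    if isinstance(node, dict):
        return {k: render_template(v, variables) for k, v in node.items()}
    if isinstance(node, list):
        return [render_template(v, variables) for v in node]
    if isinstance(node, str):
        whole = PLACEHOLDER.fullmatch(node)
        text = PLACEHOLDER.sub(lambda m: str(variables.get(m.group(1), '')), node)
        # Assumption: a value that is exactly one placeholder and resolves to
        # "true"/"false" (e.g. "disabled": "${subscribe_disabled}") becomes a bool.
        if whole and text in ('true', 'false'):
            return text == 'true'
        return text
    return node


template = json.loads('''{
  "header": {"title": {"tag": "plain_text", "content": "《${title}》"}},
  "body": {"elements": [
    {"tag": "button", "disabled": "${subscribe_disabled}",
     "text": {"tag": "plain_text", "content": "${subscribe_button_text}"}}
  ]}
}''')
card = render_template(template, {
    'title': 'Interstellar',
    'subscribe_disabled': 'false',
    'subscribe_button_text': '立即订阅',
})
print(json.dumps(card, ensure_ascii=False))
```

Unknown placeholders render as empty strings in this sketch; a stricter variant could raise instead.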
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "dobbey",
"slug": "feishu-card-sender",
"displayName": "Feishu Card Sender",
"latest": {
"version": "1.1.1",
"publishedAt": 1772723249604,
"commit": "https://github.com/openclaw/skills/commit/503104e56013f601671070086ed0d1e515e44031"
},
"history": []
}
```
### assets/rules/movie-custom.rules.json
```json
[
{
"action": "require_non_empty",
"field": "title",
"message": "title 不能为空"
},
{
"action": "require_non_empty",
"field": "overview",
"message": "overview 不能为空"
},
{
"action": "require_non_empty",
"field": "poster_img_key",
"message": "poster_img_key 不能为空(请传 --poster-url/--poster-file 或 --var poster_img_key=...)"
},
{
"action": "default_value",
"field": "country",
"value": "待公布"
},
{
"action": "default_value",
"field": "rating",
"value": "待公布"
},
{
"action": "default_value",
"field": "runtime",
"value": "待公布"
},
{
"action": "default_value",
"field": "year",
"value": "待公布"
},
{
"action": "remove_when_empty",
"field": "tagline",
"selector": {
"tag": "markdown",
"contentContains": "${tagline}"
}
},
{
"action": "remove_when_empty",
"field": "detail_url",
"selector": {
"tag": "button",
"textContains": "查看详情"
}
},
{
"action": "remove_when_empty",
"field": "tmdb_id",
"selector": {
"tag": "button",
"textContains": "立即订阅"
}
},
{
"action": "default_value",
"field": "media_type",
"value": "movie"
},
{
"action": "default_value",
"field": "subscribe_disabled",
"value": "false"
},
{
"action": "default_value",
"field": "subscribe_button_text",
"value": "立即订阅"
}
]
```
### assets/rules/tv-custom.rules.json
```json
[
{
"action": "require_non_empty",
"field": "title",
"message": "title 不能为空"
},
{
"action": "require_non_empty",
"field": "overview",
"message": "overview 不能为空"
},
{
"action": "require_non_empty",
"field": "poster_img_key",
"message": "poster_img_key 不能为空(请传 --poster-url/--poster-file 或 --var poster_img_key=...)"
},
{
"action": "default_value",
"field": "country",
"value": "待公布"
},
{
"action": "default_value",
"field": "status",
"value": "待公布"
},
{
"action": "default_value",
"field": "episode_runtime",
"value": "待公布"
},
{
"action": "remove_when_empty",
"field": "tagline",
"selector": {
"tag": "markdown",
"contentContains": "${tagline}"
}
},
{
"action": "remove_when_empty",
"field": "detail_url",
"selector": {
"tag": "button",
"textContains": "查看详情"
}
},
{
"action": "remove_when_empty",
"field": "tmdb_id",
"selector": {
"tag": "button",
"textContains": "立即订阅"
}
},
{
"action": "default_value",
"field": "media_type",
"value": "tv"
},
{
"action": "default_value",
"field": "subscribe_disabled",
"value": "false"
},
{
"action": "default_value",
"field": "subscribe_button_text",
"value": "立即订阅"
}
]
```
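The rule files use three actions: `require_non_empty`, `default_value`, and `remove_when_empty` (with a `selector` that picks the element to drop). A minimal sketch of one way to interpret them follows. It assumes rules are matched against the unrendered template and only against top-level `body.elements`; the ordering of rule matching and rendering in the real script is not shown in this page, and `apply_rules` / `_matches` are hypothetical names.

```python
def _matches(el, selector):
    """Return True when a card element matches a rule selector."""
    if not isinstance(el, dict) or el.get('tag') != selector.get('tag'):
        return False
    if 'contentContains' in selector:
        return selector['contentContains'] in str(el.get('content', ''))
    if 'textContains' in selector:
        return selector['textContains'] in str((el.get('text') or {}).get('content', ''))
    return True


def apply_rules(card, variables, rules):
    """Apply rules in order; returns the (mutated) card and a new variables dict."""
    variables = dict(variables)
    elements = card.get('body', {}).get('elements', [])
    for rule in rules:
        field, action = rule['field'], rule['action']
        empty = not str(variables.get(field) or '').strip()
        if action == 'require_non_empty' and empty:
            raise ValueError(rule.get('message') or f'{field} is required')
        if action == 'default_value' and empty:
            variables[field] = rule['value']
        if action == 'remove_when_empty' and empty:
            # Drop every top-level element the selector matches.
            elements[:] = [el for el in elements if not _matches(el, rule['selector'])]
    return card, variables


card = {'body': {'elements': [
    {'tag': 'markdown', 'content': '**💬 ${tagline}**'},
    {'tag': 'button', 'text': {'tag': 'plain_text', 'content': '查看详情'}},
    {'tag': 'markdown', 'content': '${rating}'},
]}}
rules = [
    {'action': 'require_non_empty', 'field': 'title', 'message': 'title 不能为空'},
    {'action': 'default_value', 'field': 'rating', 'value': '待公布'},
    {'action': 'remove_when_empty', 'field': 'tagline',
     'selector': {'tag': 'markdown', 'contentContains': '${tagline}'}},
    {'action': 'remove_when_empty', 'field': 'detail_url',
     'selector': {'tag': 'button', 'textContains': '查看详情'}},
]
card, variables = apply_rules(
    card, {'title': 'Interstellar', 'detail_url': 'https://example.com/m/1'}, rules)
print(variables['rating'], [el['tag'] for el in card['body']['elements']])
```

Here the tagline element is removed because `tagline` is empty, while the detail button stays because `detail_url` is set.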
### references/callback-response-examples.json
```json
{
"processing": {
"toast": {"type": "info", "content": "处理中..."},
"card": {
"type": "raw",
"data": {
"_note": "Original card JSON, with the button changed to 处理中 and disabled=true"
}
}
},
"success": {
"toast": {"type": "success", "content": "订阅成功"},
"card": {
"type": "raw",
"data": {
"_note": "Original card JSON, with the button changed to ✅已订阅 and disabled=true"
}
}
},
"failed": {
"toast": {"type": "error", "content": "订阅失败,可重试"},
"card": {
"type": "raw",
"data": {
"_note": "Original card JSON, with the button changed to 订阅失败,重试 and disabled=false"
}
}
}
}
```
### references/feishu-callback-response-contract.md
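````markdown
# Feishu card callback HTTP response contract (within 3 seconds)
Goal: after receiving `card.action.trigger`, the server must return HTTP 200 within 3 seconds and include a card update body (raw) so that the card is **updated in place**.
## 1) Immediate response (processing)
- HTTP: `200`
- Body:
```json
{
  "toast": {
    "type": "info",
    "content": "处理中..."
  },
  "card": {
    "type": "raw",
    "data": {
      "schema": "2.0",
      "...": "full original card structure",
      "body": {
        "...": "original content",
        "elements": [
          {
            "tag": "button",
            "text": {"tag": "plain_text", "content": "处理中..."},
            "type": "primary",
            "disabled": true,
            "behaviors": [
              {
                "type": "callback",
                "value": {
                  "action": "subscribe",
                  "media_type": "movie",
                  "tmdb_id": "1159559",
                  "title": "惊声尖叫7"
                }
              }
            ]
          }
        ]
      }
    }
  }
}
```
## 2) Run the business logic asynchronously
- Run the MoviePilot subscription flow in the background:
  - Idempotency check
  - If a subscription already exists, return exists
  - Otherwise create it
## 3) Final-state update (delayed update API)
Once the business logic completes, call the Feishu delayed card update API:
- Success / already exists: button `✅ 已订阅`, `disabled=true`
- Failure: button `订阅失败,重试`, `disabled=false`
## 4) Required changes in the OpenClaw channel layer
1. The Feishu callback handler supports passing the skill's return body through as the HTTP body.
2. The skill's return structure gains two fields:
   - `callbackResponse` (immediate response)
   - `delayedUpdate` (card payload for the final-state update)
3. The callback request context preserves:
   - `open_message_id` (used for the delayed update)
   - `tenant_key/chat_id` (when needed)
4. Timeout protection:
   - A 200 must be returned within 2.5 seconds (at least `{}` or a toast)
## 5) Recommended handler output
`card_callback_router.py` / `subscribe_callback_handler.py` emit a uniform structure:
```json
{
  "success": true,
  "status": "processing|created|exists|failed",
  "callbackResponse": {...},
  "delayedUpdate": {...}
}
```
The channel layer consumes `callbackResponse` to reply immediately, while `delayedUpdate` goes through the asynchronous update.
````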
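The contract pairs a toast with a raw card whose subscribe button reflects the current state. A minimal sketch of assembling that body is shown below; the toast texts and button labels are taken from `references/callback-response-examples.json`, while `build_callback_response` and the rule "the subscribe button is the one with a `callback` behavior" are assumptions made for illustration.

```python
import copy
import json

# state -> (toast type, toast text), per callback-response-examples.json
TOASTS = {
    'processing': ('info', '处理中...'),
    'success': ('success', '订阅成功'),
    'failed': ('error', '订阅失败,可重试'),
}
# state -> (button label, disabled)
BUTTONS = {
    'processing': ('处理中...', True),
    'success': ('✅ 已订阅', True),
    'failed': ('订阅失败,重试', False),
}


def build_callback_response(state, card):
    """Build the toast + raw-card body for one state (hypothetical helper)."""
    toast_type, toast_text = TOASTS[state]
    label, disabled = BUTTONS[state]
    updated = copy.deepcopy(card)  # never mutate the stored snapshot
    for el in updated.get('body', {}).get('elements', []):
        behaviors = el.get('behaviors') or []
        if el.get('tag') == 'button' and any(b.get('type') == 'callback' for b in behaviors):
            el['text'] = {'tag': 'plain_text', 'content': label}
            el['disabled'] = disabled
    return {
        'toast': {'type': toast_type, 'content': toast_text},
        'card': {'type': 'raw', 'data': updated},
    }


card = {'schema': '2.0', 'body': {'elements': [
    {'tag': 'button', 'text': {'tag': 'plain_text', 'content': '立即订阅'},
     'behaviors': [{'type': 'callback', 'value': {'action': 'subscribe'}}]},
]}}
resp = build_callback_response('processing', card)
print(json.dumps(resp, ensure_ascii=False))
```

Because the body is built from an in-memory copy of the card, it can be produced well inside the response deadline, with the actual subscription work deferred.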
### references/subscribe-callback.md
````markdown
# Subscribe callback handling (MoviePilot)
Scripts:
- `scripts/card_callback_router.py` (unified entry point)
- `scripts/subscribe_callback_handler.py` (subscription handler)
## Input
- `--channel`: channel (default: feishu)
- `--user-id`: user ID (used to load the user's credentials)
- `--payload`: card callback JSON
Callback payload convention:
```json
{"action":"subscribe","media_type":"movie|tv","tmdb_id":"1419406","title":"..."}
```
## Behavior
1. Validate the required payload fields.
2. Deduplicate for idempotency (60-second window, matching `window_sec=60` in the handler, keyed by `media_type+tmdb_id`).
3. Check whether a subscription already exists: `/api/v1/subscribe/media/tmdb:<id>`.
4. If not subscribed, call `POST /api/v1/subscribe/` to create the subscription.
5. Emit a standard result JSON: created / exists / deduped / error.
## Example
```bash
python3 scripts/subscribe_callback_handler.py \
  --channel feishu \
  --user-id ou_xxx \
  --payload '{"action":"subscribe","media_type":"movie","tmdb_id":"1419406","title":"捕风追影"}'
```
````
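The deduplication step can be pictured as a small keyed store with a time window. The sketch below reuses the names `dedupe_key`, `get_recent`, and `mark_state` that the handler calls, but their bodies here are stand-ins: the real implementations are not shown in this section, and the in-memory `_STATE` dict and the optional `now` parameter are assumptions made so the example is testable.

```python
import time

_STATE = {}  # hypothetical in-memory store; the real script may persist state elsewhere


def dedupe_key(payload):
    """Key repeated clicks by media type and TMDB id."""
    return f"{payload.get('media_type')}:{payload.get('tmdb_id')}"


def get_recent(key, window_sec, now=None):
    """Return the stored state if it was written within the window, else None."""
    now = time.time() if now is None else now
    item = _STATE.get(key)
    if item and now - item['ts'] <= window_sec:
        return item
    return None


def mark_state(key, result, now=None):
    """Record the latest result for a key together with a timestamp."""
    _STATE[key] = {**result, 'ts': time.time() if now is None else now}


payload = {'action': 'subscribe', 'media_type': 'movie', 'tmdb_id': '1419406'}
key = dedupe_key(payload)
mark_state(key, {'status': 'processing'}, now=1000.0)
inside = get_recent(key, window_sec=60, now=1030.0)   # second click 30 s later
outside = get_recent(key, window_sec=60, now=1061.0)  # click after the window
print(key, inside is not None, outside is None)
```

A click that lands inside the window sees the stored state and is answered without calling MoviePilot again; once the window has passed, the request is processed normally.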
### references/vars.example.env
```bash
# Example template variables (KEY=VALUE)
title=星际穿越
genre=科幻 / 剧情
rating=8.7
year=2014
summary=一支探险队穿越虫洞,在时间与引力扭曲中寻找人类的新家园。
```
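A file in this `KEY=VALUE` shape is straightforward to load into a variables dict. The sketch below is one possible parser, not the one used by the skill's scripts; `parse_vars` is a hypothetical name, and splitting on the first `=` only is an assumption so that values such as URLs with query strings survive intact.

```python
def parse_vars(text):
    """Parse KEY=VALUE lines, skipping blank lines and # comments (hypothetical helper)."""
    variables = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        # partition splits on the first '=' only, so '=' may appear in the value.
        key, sep, value = line.partition('=')
        if not sep:
            raise ValueError(f'expected KEY=VALUE, got: {line!r}')
        variables[key.strip()] = value.strip()
    return variables


sample = """# comment
title=星际穿越
rating=8.7
detail_url=https://example.com/?id=1
"""
variables = parse_vars(sample)
print(variables)
```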
### scripts/callback_queue_worker.py
```python
#!/usr/bin/env python3
import json, os, time, subprocess
from pathlib import Path

QUEUE_DIR = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue')
DONE_DIR = QUEUE_DIR / 'done'
FAIL_DIR = QUEUE_DIR / 'failed'
FINALIZER = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/card_callback_finalize.py')
PID_FILE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue-worker.pid')


def claim_job(path: Path):
    # Claim a job by renaming *.json -> *.processing; a failed rename means it is already taken.
    processing = path.with_suffix('.processing')
    try:
        path.rename(processing)
        return processing
    except Exception:
        return None


def run_job(job_file: Path):
    data = json.loads(job_file.read_text(encoding='utf-8'))
    cmd = [
        'python3', str(FINALIZER),
        '--channel', data.get('channel', 'feishu'),
        '--user-id', data['user_id'],
        '--account-id', str(data.get('account_id', '1')),
        '--payload', json.dumps(data['payload'], ensure_ascii=False),
    ]
    return subprocess.run(cmd, capture_output=True, text=True, timeout=180)


def recover_orphan_processing_files():
    # Put jobs left in *.processing by a crashed worker back into the queue.
    for p in QUEUE_DIR.glob('*.processing'):
        try:
            target = QUEUE_DIR / (p.stem + '.json')
            p.rename(target)
        except Exception:
            try:
                fail = FAIL_DIR / (p.stem + '.recover-fail.json')
                fail.write_text(json.dumps({'ok': False, 'error': 'orphan_processing_recover_failed', 'file': str(p)}, ensure_ascii=False), encoding='utf-8')
            except Exception:
                pass


def cleanup_history(max_done=500, max_failed=500):
    # Keep only the newest result files in done/ and failed/.
    for d, cap in ((DONE_DIR, max_done), (FAIL_DIR, max_failed)):
        files = sorted([p for p in d.glob('*') if p.is_file()], key=lambda p: p.stat().st_mtime, reverse=True)
        for p in files[cap:]:
            try:
                p.unlink(missing_ok=True)
            except Exception:
                pass


def main():
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    DONE_DIR.mkdir(parents=True, exist_ok=True)
    FAIL_DIR.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(os.getpid()), encoding='utf-8')
    recover_orphan_processing_files()
    cleanup_history()
    last_cleanup = time.time()
    while True:
        jobs = sorted([p for p in QUEUE_DIR.glob('*.json') if p.is_file()])
        if time.time() - last_cleanup > 300:
            cleanup_history()
            last_cleanup = time.time()
        if not jobs:
            time.sleep(0.2)
            continue
        for job in jobs:
            claimed = claim_job(job)
            if not claimed:
                continue
            try:
                res = run_job(claimed)
                if res.returncode == 0:
                    target = DONE_DIR / (claimed.stem + '.done.json')
                    target.write_text(json.dumps({'ok': True, 'stdout': res.stdout.strip()}, ensure_ascii=False), encoding='utf-8')
                else:
                    target = FAIL_DIR / (claimed.stem + '.fail.json')
                    target.write_text(json.dumps({'ok': False, 'code': res.returncode, 'stdout': res.stdout, 'stderr': res.stderr}, ensure_ascii=False), encoding='utf-8')
            except Exception as ex:
                target = FAIL_DIR / (claimed.stem + '.fail.json')
                target.write_text(json.dumps({'ok': False, 'error': str(ex)}, ensure_ascii=False), encoding='utf-8')
            finally:
                try:
                    claimed.unlink(missing_ok=True)
                except Exception:
                    pass


if __name__ == '__main__':
    main()
```
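The worker claims a job by renaming it from `*.json` to `*.processing`. On POSIX filesystems a rename within one directory is atomic, so when two workers race for the same file only one rename succeeds and the other sees the source file missing. The sketch below demonstrates that behavior in a temporary directory; it narrows the `except` to `OSError` for illustration, and the file name is made up.

```python
import tempfile
from pathlib import Path


def claim_job(path: Path):
    """Rename job.json -> job.processing; return the new path, or None if already claimed."""
    processing = path.with_suffix('.processing')
    try:
        path.rename(processing)
        return processing
    except OSError:
        return None


with tempfile.TemporaryDirectory() as tmp:
    job = Path(tmp) / '1700000000000-abc.json'  # illustrative job file name
    job.write_text('{}', encoding='utf-8')
    first = claim_job(job)    # succeeds: the file is renamed
    second = claim_job(job)   # fails: the source no longer exists
    first_name = first.name if first else None
    print(first_name, second)
```

This is why the queue needs no lock file: the filesystem rename itself serves as the mutual-exclusion step.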
### scripts/card_callback_finalize.py
```python
#!/usr/bin/env python3
import argparse, json, subprocess, time
from pathlib import Path
from urllib import request
from card_snapshot_store import find_snapshot_by_card_key, patch_subscribe_button

OPENCLAW_CONFIG = Path('/root/.openclaw/openclaw.json')
HANDLER = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/subscribe_callback_handler.py')
METRIC_LOG = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-metrics.jsonl')


def load_app_credentials(account_id):
    cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding='utf-8'))
    accounts = (((cfg.get('channels') or {}).get('feishu') or {}).get('accounts') or [])
    if not accounts:
        return None, None
    if account_id is not None:
        # Match by account id first, then treat a numeric value as a list index.
        for a in accounts:
            if str(a.get('id')) == str(account_id):
                return a.get('appId'), a.get('appSecret')
        if str(account_id).isdigit():
            idx = int(account_id)
            if 0 <= idx < len(accounts):
                a = accounts[idx]
                return a.get('appId'), a.get('appSecret')
    a = accounts[0]
    return a.get('appId'), a.get('appSecret')


def get_tenant_token(app_id, app_secret):
    body = json.dumps({'app_id': app_id, 'app_secret': app_secret}).encode()
    req = request.Request('https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', data=body, method='POST', headers={'Content-Type': 'application/json'})
    with request.urlopen(req, timeout=20) as r:
        return json.loads(r.read().decode()).get('tenant_access_token')


def patch_message(message_id, token, raw_card_json):
    payload = {'msg_type': 'interactive', 'content': json.dumps(json.loads(raw_card_json), ensure_ascii=False)}
    req = request.Request(f'https://open.feishu.cn/open-apis/im/v1/messages/{message_id}', data=json.dumps(payload, ensure_ascii=False).encode(), method='PATCH', headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'})
    with request.urlopen(req, timeout=20) as r:
        return json.loads(r.read().decode())


def log_metric(d):
    METRIC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with METRIC_LOG.open('a', encoding='utf-8') as f:
        f.write(json.dumps(d, ensure_ascii=False) + '\n')


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--payload', required=True)
    ap.add_argument('--channel', default='feishu')
    ap.add_argument('--user-id', required=True)
    ap.add_argument('--account-id', default='1')
    args = ap.parse_args()
    t0 = time.time()
    p = json.loads(args.payload)
    card_key = str(p.get('card_key') or '')
    snap = find_snapshot_by_card_key(card_key)
    if not snap:
        log_metric({'ok': False, 'stage': 'snapshot_missing', 'card_key': card_key, 't': time.time()})
        return 1
    app_id, app_secret = load_app_credentials(args.account_id)
    token = get_tenant_token(app_id, app_secret)
    t1 = time.time()
    # Run the subscription handler as a subprocess and parse its JSON result from stdout.
    res = subprocess.run(['python3', str(HANDLER), '--channel', args.channel, '--user-id', args.user_id, '--payload', args.payload], capture_output=True, text=True)
    out = {}
    try:
        out = json.loads((res.stdout or '').strip() or '{}')
    except Exception:
        out = {'success': False}
    t2 = time.time()
    ok = bool(out.get('success'))
    # Patch the stored card snapshot to its final state and push it to the original message.
    final = patch_subscribe_button(snap['raw_card_json'], text='✅ 已订阅' if ok else '订阅失败,重试', disabled=True if ok else False)
    patch_resp = patch_message(snap['message_id'], token, final)
    t3 = time.time()
    log_metric({
        'ok': ok,
        'card_key': card_key,
        'tmdb_id': p.get('tmdb_id'),
        'media_type': p.get('media_type'),
        't_click': t0,
        't_subscribe_done': t2,
        't_final_patch_done': t3,
        'dt_subscribe_ms': int((t2 - t1) * 1000),
        'dt_total_ms': int((t3 - t0) * 1000),
        'patch_resp': patch_resp,
    })
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
```
### scripts/card_snapshot_store.py
```python
#!/usr/bin/env python3
import json
import sqlite3
import time
from pathlib import Path

DB_PATH = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/card_snapshots.db')


def _conn():
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    c = sqlite3.connect(str(DB_PATH))
    c.execute(
        '''CREATE TABLE IF NOT EXISTS card_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            created_at INTEGER NOT NULL,
            card_key TEXT,
            message_id TEXT,
            receive_id TEXT,
            account_id TEXT,
            media_type TEXT,
            tmdb_id TEXT,
            title TEXT,
            raw_card_json TEXT NOT NULL
        )'''
    )
    # Migrate databases created before these columns existed; the ALTER fails harmlessly otherwise.
    try:
        c.execute('ALTER TABLE card_snapshots ADD COLUMN card_key TEXT')
    except Exception:
        pass
    try:
        c.execute('ALTER TABLE card_snapshots ADD COLUMN message_id TEXT')
    except Exception:
        pass
    c.execute('CREATE INDEX IF NOT EXISTS idx_snap_lookup ON card_snapshots(receive_id, media_type, tmdb_id, created_at DESC)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_snap_card_key ON card_snapshots(card_key, created_at DESC)')
    return c


def save_snapshot(*, card_key: str | None, message_id: str | None, receive_id: str, account_id: str | None, media_type: str | None, tmdb_id: str | None, title: str | None, raw_card_json: str):
    conn = _conn()
    try:
        conn.execute(
            'INSERT INTO card_snapshots(created_at, card_key, message_id, receive_id, account_id, media_type, tmdb_id, title, raw_card_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
            (int(time.time()), card_key or '', message_id or '', receive_id, account_id or '', media_type or '', tmdb_id or '', title or '', raw_card_json),
        )
        conn.commit()
    finally:
        conn.close()


def find_latest_snapshot(*, receive_id: str, media_type: str, tmdb_id: str):
    conn = _conn()
    try:
        cur = conn.execute(
            'SELECT raw_card_json, account_id, title, created_at, card_key, message_id FROM card_snapshots WHERE receive_id=? AND media_type=? AND tmdb_id=? ORDER BY created_at DESC LIMIT 1',
            (receive_id, media_type, tmdb_id),
        )
        row = cur.fetchone()
        if not row:
            return None
        return {
            'raw_card_json': row[0],
            'account_id': row[1] or None,
            'title': row[2] or None,
            'created_at': row[3],
            'card_key': row[4] or None,
            'message_id': row[5] or None,
        }
    finally:
        conn.close()


def find_snapshot_by_card_key(card_key: str):
    conn = _conn()
    try:
        cur = conn.execute(
            'SELECT raw_card_json, account_id, title, created_at, receive_id, media_type, tmdb_id, message_id FROM card_snapshots WHERE card_key=? ORDER BY created_at DESC LIMIT 1',
            (card_key,),
        )
        row = cur.fetchone()
        if not row:
            return None
        return {
            'raw_card_json': row[0],
            'account_id': row[1] or None,
            'title': row[2] or None,
            'created_at': row[3],
            'receive_id': row[4] or None,
            'media_type': row[5] or None,
            'tmdb_id': row[6] or None,
            'message_id': row[7] or None,
        }
    finally:
        conn.close()


def patch_subscribe_button(raw_card_json: str, *, text: str, disabled: bool):
    card = json.loads(raw_card_json)
    body = card.get('body') if isinstance(card, dict) else None
    elements = body.get('elements') if isinstance(body, dict) else None
    if not isinstance(elements, list):
        return raw_card_json
    for el in elements:
        if not isinstance(el, dict):
            continue
        if el.get('tag') != 'button':
            continue
        txt = ((el.get('text') or {}).get('content') if isinstance(el.get('text'), dict) else '') or ''
        # The subscribe button is identified by its current label.
        if txt in ('立即订阅', '✅ 已订阅', '处理中...', '订阅失败,重试'):
            el['disabled'] = bool(disabled)
            if isinstance(el.get('text'), dict):
                el['text']['content'] = text
            else:
                el['text'] = {'tag': 'plain_text', 'content': text}
            # Some Feishu clients render badly when disabled button still carries callback behaviors.
            if bool(disabled):
                el.pop('behaviors', None)
            break
    return json.dumps(card, ensure_ascii=False)
```
### scripts/enqueue_callback.py
```python
#!/usr/bin/env python3
import argparse, json, time, uuid
from pathlib import Path

QUEUE_DIR = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue')


def main():
    p = argparse.ArgumentParser(description='enqueue card callback job')
    p.add_argument('--payload', required=True)
    p.add_argument('--message-id', required=True)
    p.add_argument('--user-id', required=True)
    p.add_argument('--account-id', default='1')
    p.add_argument('--channel', default='feishu')
    args = p.parse_args()
    payload = json.loads(args.payload)
    job = {
        'payload': payload,
        'message_id': args.message_id,
        'user_id': args.user_id,
        'account_id': args.account_id,
        'channel': args.channel,
        'enqueued_at': time.time(),
    }
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    # Millisecond timestamp prefix keeps jobs sortable; the uuid suffix avoids name collisions.
    name = f"{int(time.time()*1000)}-{uuid.uuid4().hex}.json"
    f = QUEUE_DIR / name
    f.write_text(json.dumps(job, ensure_ascii=False), encoding='utf-8')
    print(json.dumps({'ok': True, 'job': str(f)}, ensure_ascii=False))


if __name__ == '__main__':
    main()
```
### scripts/feishu_callback_worker.py
```python
#!/usr/bin/env python3
import json
import os
import time
import hashlib
import re
import base64
import tempfile
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import subprocess
from pathlib import Path
HOST = '0.0.0.0'
PORT = 18081
ROUTER = '/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/card_callback_router.py'
USER_ID_FALLBACK = os.getenv('FEISHU_CALLBACK_USER_FALLBACK', '').strip()
ACCOUNT_ID = os.getenv('FEISHU_CALLBACK_ACCOUNT_ID', '1').strip() or '1'
ENCRYPT_KEY = os.getenv('FEISHU_CALLBACK_ENCRYPT_KEY', '').strip()
MAX_SKEW_SECONDS = int(os.getenv('FEISHU_CALLBACK_MAX_SKEW_SECONDS', '300'))
class H(BaseHTTPRequestHandler):
    def _log(self, event, **kwargs):
        payload = {'event': event, 'ts': int(time.time()), **kwargs}
        print(json.dumps(payload, ensure_ascii=False), flush=True)

    def _parse_ts_to_epoch(self, ts: str):
        # 1) epoch seconds / milliseconds
        try:
            n = int(float(ts))
            if n > 10_000_000_000:
                n = n // 1000
            return n
        except Exception:
            pass
        # 2) Go-style datetime: "2026-03-05 22:25:56.839226736 +0800 CST m=+..."
        m = re.search(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:\.(\d{1,9}))?\s+([+-]\d{4})', ts)
        if m:
            base, frac, offset = m.group(1), (m.group(2) or ''), m.group(3)
            frac6 = (frac + '000000')[:6]
            dt = datetime.strptime(f"{base}.{frac6} {offset}", "%Y-%m-%d %H:%M:%S.%f %z")
            return int(dt.timestamp())
        raise ValueError(f'invalid_timestamp:{ts}')

    def _decrypt_if_needed(self, req_obj):
        # Feishu may send encrypted envelope: {"encrypt": "..."}
        if not isinstance(req_obj, dict) or 'encrypt' not in req_obj:
            return req_obj
        if not ENCRYPT_KEY:
            raise ValueError('encrypt_key_not_configured')
        enc = str(req_obj.get('encrypt') or '')
        raw = base64.b64decode(enc)
        iv, ct = raw[:16], raw[16:]
        keyhex = hashlib.sha256(ENCRYPT_KEY.encode('utf-8')).hexdigest()
        ivhex = iv.hex()
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(ct)
            inpath = f.name
        try:
            res = subprocess.run(
                ['openssl', 'enc', '-d', '-aes-256-cbc', '-K', keyhex, '-iv', ivhex, '-in', inpath],
                capture_output=True,
                timeout=10,
            )
            if res.returncode != 0:
                raise ValueError(f'decrypt_failed:{res.stderr.decode("utf-8", errors="replace")[:120]}')
            txt = res.stdout.decode('utf-8', errors='replace')
            return json.loads(txt)
        finally:
            try:
                os.unlink(inpath)
            except Exception:
                pass

    def _verify_signature(self, raw_body: str):
        if not ENCRYPT_KEY:
            return True, 'no_encrypt_key'
        ts = self.headers.get('X-Lark-Request-Timestamp') or self.headers.get('X-Lark-Request-Ts')
        nonce = self.headers.get('X-Lark-Request-Nonce')
        sig = self.headers.get('X-Lark-Signature')
        if not ts or not nonce or not sig:
            return False, 'missing_signature_headers'
        try:
            ts_i = self._parse_ts_to_epoch(ts)
        except Exception as ex:
            return False, str(ex)
        if abs(int(time.time()) - ts_i) > MAX_SKEW_SECONDS:
            return False, 'timestamp_skew'
        base = f"{ts}{nonce}{ENCRYPT_KEY}{raw_body}".encode('utf-8')
        calc = hashlib.sha256(base).hexdigest()
        if calc != sig:
            return False, 'signature_mismatch'
        return True, 'ok'

    def _send(self, code=200, obj=None):
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.end_headers()
        self.wfile.write(json.dumps(obj or {}, ensure_ascii=False).encode('utf-8'))

    def _send_html(self, code=200, html=''):
        self.send_response(code)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        self.end_headers()
        if html:
            self.wfile.write(html.encode('utf-8'))

    def do_HEAD(self):
        p = urlparse(self.path)
        if p.path == '/feishu/callback':
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Expires', '0')
            self.end_headers()
            return
        self.send_response(404)
        self.end_headers()

    def do_GET(self):
        p = urlparse(self.path)
        if p.path != '/feishu/callback':
            return self._send_html(404, '<h1>404 Not Found</h1>')
        qs = parse_qs(p.query or '')
        lang_force = ((qs.get('lang') or [''])[0]).strip().lower()
        if lang_force in ('zh', 'zh-cn', 'cn'):
            use_zh = True
        elif lang_force in ('en', 'en-us', 'en-gb'):
            use_zh = False
        else:
            accept_lang = (self.headers.get('Accept-Language') or '').lower()
            use_zh = ('zh' in accept_lang) or not accept_lang
        t = {
            'lang': 'zh-CN' if use_zh else 'en',
            'title': '飞书回调 · 太空网关' if use_zh else 'Feishu Callback · Space Gateway',
            'badge': '太空网关在线' if use_zh else 'SPACE GATEWAY ONLINE',
            'h1_left': '飞书回调' if use_zh else 'Feishu Callback',
            'h1_right': '接口已就绪' if use_zh else 'Endpoint is Ready',
            'desc': '这里是飞书事件回调入口,专用于服务端 POST 调用。你在浏览器看到这个页面,说明域名解析、HTTPS 与反向代理都已连通。' if use_zh else 'This is the Feishu event callback endpoint for server-side POST calls. Seeing this page in your browser means DNS, HTTPS, and reverse proxy are all reachable.',
            'endpoint_k': '回调路径' if use_zh else 'Endpoint',
            'proto_k': '协议状态' if use_zh else 'Protocol',
            'proto_v': 'HTTPS · 反向代理已启用' if use_zh else 'HTTPS · Reverse Proxy Active',
            'hint': '提示:此页面仅用于可达性确认;真实回调由飞书服务器触发,无需人工访问。' if use_zh else 'Note: This page is only for reachability checks. Real callbacks are triggered by Feishu servers, no manual access required.'
        }
        html = """
<!doctype html>
<html lang=\"{t['lang']}\">
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no, viewport-fit=cover\" />
<title>{t['title']}</title>
<style>
:root{
--bg:#030712;
--bg2:#0b1020;
--txt:#eaf2ff;
--muted:#b8c7e6;
--primary:#78c7ff;
--line:rgba(150,190,255,.28);
--glass:rgba(8,14,33,.55);
}
*{box-sizing:border-box}
html,body{height:100%}
body{
margin:0;
overflow:hidden;
font-family:Inter,-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif;
color:var(--txt);
background:
radial-gradient(1200px 700px at 10% 90%, rgba(79,118,255,.22), transparent 60%),
radial-gradient(900px 600px at 90% 10%, rgba(0,210,255,.18), transparent 62%),
radial-gradient(1000px 800px at 50% 50%, #0b1331 0%, var(--bg) 68%);
}
.stars,.stars2,.stars3{position:fixed; inset:-20%; pointer-events:none;}
.stars:before,.stars2:before,.stars3:before{
content:""; position:absolute; inset:0;
background-repeat:repeat;
animation:drift linear infinite;
opacity:.65;
}
.stars:before{background-image:radial-gradient(2px 2px at 20px 30px,#fff,transparent),radial-gradient(1px 1px at 120px 80px,#d7e8ff,transparent),radial-gradient(1.5px 1.5px at 220px 150px,#fff,transparent);background-size:260px 220px;animation-duration:120s}
.stars2:before{background-image:radial-gradient(1px 1px at 40px 60px,#9fd8ff,transparent),radial-gradient(2px 2px at 180px 120px,#fff,transparent),radial-gradient(1px 1px at 260px 40px,#c8dcff,transparent);background-size:320px 260px;animation-duration:180s;opacity:.4}
.stars3:before{background-image:radial-gradient(2px 2px at 80px 20px,#fff,transparent),radial-gradient(1px 1px at 200px 200px,#c2f2ff,transparent);background-size:400px 320px;animation-duration:240s;opacity:.28}
@keyframes drift{from{transform:translate3d(0,0,0)}to{transform:translate3d(-220px,-160px,0)}}
.nebula{position:fixed; inset:0; pointer-events:none; filter:blur(30px); opacity:.55}
.nebula:before,.nebula:after{content:""; position:absolute; border-radius:50%}
.nebula:before{width:42vw;height:42vw;left:-10vw;top:50vh;background:radial-gradient(circle,#4f70ff88,transparent 70%)}
.nebula:after{width:34vw;height:34vw;right:-8vw;top:8vh;background:radial-gradient(circle,#2dd4ff77,transparent 72%)}
.wrap{position:relative; z-index:2; height:100vh; min-height:100dvh; display:flex; align-items:center; justify-content:center; padding:max(16px,env(safe-area-inset-top)) max(16px,env(safe-area-inset-right)) max(16px,env(safe-area-inset-bottom)) max(16px,env(safe-area-inset-left))}
.panel{
width:min(980px,96vw);
padding:18px 18px 16px;
position:relative;
text-align:center;
margin:0 auto;
display:flex;
flex-direction:column;
align-items:center;
justify-content:center;
}
.badge{display:inline-flex;align-items:center;gap:8px;padding:6px 12px;border:1px solid rgba(143,214,255,.35);border-radius:999px;background:rgba(31,56,95,.35);color:#cde9ff;font-size:12px;letter-spacing:.3px}
.dot{width:8px;height:8px;border-radius:50%;background:#5cf2a5;box-shadow:0 0 12px #5cf2a5}
h1{margin:14px 0 12px;font-size:38px;line-height:1.12;letter-spacing:.3px}
.grad{background:linear-gradient(90deg,#f4f9ff,#8cd2ff 35%,#7db2ff 70%,#d8e9ff);-webkit-background-clip:text;background-clip:text;color:transparent}
p{margin:10px 0;color:var(--muted);font-size:16px;line-height:1.75}
.meta{margin-top:18px;display:grid;grid-template-columns:1fr 1fr;gap:12px}
.cell{padding:8px 10px;border:1px solid rgba(141,180,255,.20);border-radius:999px;background:transparent}
.k{display:block;font-size:12px;color:#9db4df;margin-bottom:4px}
code{color:var(--primary);background:rgba(8,16,36,.8);border:1px solid rgba(120,170,255,.25);padding:2px 7px;border-radius:8px}
.hint{margin-top:16px;font-size:13px;color:#93a9d3}
@media (max-width:760px){
h1{font-size:30px}
.meta{grid-template-columns:1fr}
.panel{padding:24px 20px}
}
</style>
</head>
<body>
<div class=\"stars\"></div>
<div class=\"stars2\"></div>
<div class=\"stars3\"></div>
<div class=\"nebula\"></div>
<main class=\"wrap\">
<section class=\"panel\">
<span class=\"badge\"><span class=\"dot\"></span>{t['badge']}</span>
<h1><span class=\"grad\">{t['h1_left']}</span> {t['h1_right']}</h1>
<p>{t['desc'].replace('POST', '<code>POST</code>')}</p>
<div class=\"meta\">
<div class=\"cell\"><span class=\"k\">{t['endpoint_k']}</span><code>/feishu/callback</code></div>
<div class=\"cell\"><span class=\"k\">{t['proto_k']}</span><code>{t['proto_v']}</code></div>
</div>
<p class=\"hint\">{t['hint']}</p>
</section>
</main>
<script>
// Hard-disable pinch zoom for in-app webviews (iOS/Android)
(function(){
document.addEventListener('gesturestart', function(e){ e.preventDefault(); }, {passive:false});
document.addEventListener('gesturechange', function(e){ e.preventDefault(); }, {passive:false});
document.addEventListener('gestureend', function(e){ e.preventDefault(); }, {passive:false});
document.addEventListener('touchmove', function(e){ if (e.touches && e.touches.length > 1) e.preventDefault(); }, {passive:false});
let lastTouchEnd = 0;
document.addEventListener('touchend', function(e){
const now = Date.now();
if (now - lastTouchEnd <= 300) e.preventDefault(); // prevent double-tap zoom
lastTouchEnd = now;
}, {passive:false});
})();
</script>
</body>
</html>
""".strip()
        html = (html
                .replace("{t['lang']}", t['lang'])
                .replace("{t['title']}", t['title'])
                .replace("{t['badge']}", t['badge'])
                .replace("{t['h1_left']}", t['h1_left'])
                .replace("{t['h1_right']}", t['h1_right'])
                .replace("{t['desc'].replace('POST', '<code>POST</code>')}", t['desc'].replace('POST', '<code>POST</code>'))
                .replace("{t['endpoint_k']}", t['endpoint_k'])
                .replace("{t['proto_k']}", t['proto_k'])
                .replace("{t['proto_v']}", t['proto_v'])
                .replace("{t['hint']}", t['hint']))
        return self._send_html(200, html)

    def do_POST(self):
        p = urlparse(self.path)
        if p.path != '/feishu/callback':
            return self._send(404, {'error': 'not_found'})

        # Reject a malformed Content-Length instead of raising inside the handler.
        try:
            n = max(0, int(self.headers.get('Content-Length') or 0))
        except ValueError:
            return self._send(400, {'error': 'invalid_content_length'})
        raw = self.rfile.read(n).decode('utf-8', errors='replace')

        # Verify the signature before parsing anything from the body.
        ok_sig, reason = self._verify_signature(raw)
        if not ok_sig:
            self._log('callback_rejected', reason=reason, path=p.path)
            return self._send(401, {'error': 'invalid_signature', 'reason': reason})

        try:
            req = json.loads(raw)
            req = self._decrypt_if_needed(req)
            if not isinstance(req, dict):
                raise ValueError('payload is not a JSON object')
        except Exception as ex:
            self._log('callback_rejected', reason='invalid_json_or_decrypt_failed', detail=str(ex), path=p.path)
            return self._send(400, {'error': 'invalid_json_or_decrypt_failed'})

        # URL verification handshake: echo the challenge back to Feishu.
        if req.get('type') == 'url_verification':
            self._log('url_verification', path=p.path)
            return self._send(200, {'challenge': req.get('challenge', '')})

        # Strict mode: accept only Feishu's latest (encrypted) callback payload.
        # Expected shape after decrypt: {header:{event_type}, event:{token, action.value, operator.open_id}}
        header = req.get('header') or {}
        event = req.get('event') or {}
        event_type = header.get('event_type')
        action_value = ((event.get('action') or {}).get('value') or {})
        token = str(event.get('token') or '')
        user_id = ((event.get('operator') or {}).get('open_id')) or USER_ID_FALLBACK
        if event_type != 'card.action.trigger' or not token or not isinstance(action_value, dict):
            self._log('callback_rejected', reason='invalid_payload_shape', event_type=event_type, keys=list(req.keys())[:20])
            return self._send(200, {'toast': {'type': 'error', 'content': '回调结构不符合预期'}})
        if not user_id:
            self._log('callback_rejected', reason='missing_user_id', token=token)
            return self._send(200, {'toast': {'type': 'error', 'content': '缺少用户信息,无法处理'}})

        # 1) Acknowledge immediately, in under 3 s (toast only).
        self._send(200, {'toast': {'type': 'info', 'content': '处理中...'}})

        # 2) Hand the heavy work to the router in a detached child process.
        payload = json.dumps(action_value, ensure_ascii=False)
        msg_id = f'card-action-{token}'  # token is guaranteed non-empty here
        self._log('callback_accepted', token=token, user_id=user_id, account_id=ACCOUNT_ID)
        subprocess.Popen([
            'python3', ROUTER,
            '--channel', 'feishu',
            '--user-id', user_id,
            '--account-id', ACCOUNT_ID,
            '--message-id', msg_id,
            '--payload', payload,
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def main():
    server = ThreadingHTTPServer((HOST, PORT), H)
    print(f'feishu_callback_worker listening on {HOST}:{PORT}')
    server.serve_forever()


if __name__ == '__main__':
    main()
```
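
The strict-mode payload check in `do_POST` is easier to unit-test once it is pulled out of the HTTP handler. The block below is a minimal sketch of that idea, not part of the original script: `parse_card_action` and `_as_dict` are hypothetical helper names, and the sample payload values (`c-123`, `ou_example`, `approve`) are invented for illustration. Only the field paths (`header.event_type`, `event.token`, `event.action.value`, `event.operator.open_id`) and the rejection reasons are taken from the worker above.

```python
def _as_dict(value):
    """Return value if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_card_action(req, user_id_fallback=''):
    """Hypothetical helper mirroring the strict-mode check in do_POST.

    Returns (fields, None) when the payload is accepted,
    or (None, reason) when it is rejected.
    """
    req = _as_dict(req)
    header = _as_dict(req.get('header'))
    event = _as_dict(req.get('event'))

    event_type = header.get('event_type')
    action_value = _as_dict(event.get('action')).get('value') or {}
    token = str(event.get('token') or '')
    user_id = _as_dict(event.get('operator')).get('open_id') or user_id_fallback

    if event_type != 'card.action.trigger' or not token or not isinstance(action_value, dict):
        return None, 'invalid_payload_shape'
    if not user_id:
        return None, 'missing_user_id'

    return {
        'user_id': user_id,
        'message_id': f'card-action-{token}',
        'action_value': action_value,
    }, None


# Invented sample payload in the decrypted shape the worker expects.
sample = {
    'header': {'event_type': 'card.action.trigger'},
    'event': {
        'token': 'c-123',
        'action': {'value': {'cmd': 'approve'}},
        'operator': {'open_id': 'ou_example'},
    },
}

fields, reason = parse_card_action(sample)
print(fields, reason)
```

Returning a `(fields, reason)` pair keeps the helper free of I/O, so the handler can still decide which toast to send and what to log for each rejection reason.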