
feishu-card-sender

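Sends Feishu card messages (interactive cards) through the Feishu OpenAPI, with support for templated JSON cards and variable substitution. Use it when the user asks to "send a Feishu card / template message / interactive card", or when a structured notification needs to go to a specific open_id/chat_id. The skill uses only the OpenAPI (appid/appsecret + tenant_access_token) and never the message channel.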

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 3,087.

Hot score: 99.

Updated: March 20, 2026.

Overall rating: C (4.0).

Composite score: 4.0.

Best-practice grade: C (61.1).

Install command

npx @skill-hub/cli install openclaw-skills-feishu-card-sender

Repository

openclaw/skills

Skill path: skills/dobbey/feishu-card-sender

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install feishu-card-sender into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding feishu-card-sender to shared team environments
  • Use feishu-card-sender for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: feishu-card-sender
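description: Sends Feishu card messages (interactive cards) through the Feishu OpenAPI, with support for templated JSON cards and variable substitution. Use it when the user asks to "send a Feishu card / template message / interactive card", or when a structured notification needs to go to a specific open_id/chat_id. The skill uses only the OpenAPI (appid/appsecret + tenant_access_token) and never the message channel.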
---

# Feishu Card Sender

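## Overview

Send card messages through the Feishu OpenAPI in one consistent way, instead of hand-writing curl each time.
This skill always uses `scripts/send_feishu_card.py` (appid/appsecret authentication) and never the `message` channel.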

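## Quick decisions

1. **Send to the user in the current session** → use the current session's `sender_id` as `--receive-id` (open_id)
2. **Send to a specific user or group** → pass `--receive-id` and `--receive-id-type` explicitly
3. **Reuse a template** → put the template in `assets/templates/*.json`, then use `--template` with variable substitution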

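## Sending method: script (OpenAPI, the only method)

Script: `scripts/send_feishu_card.py`

### Credentials

Resolved automatically in the following priority order (highest first):
1. Command-line arguments: `--app-id` / `--app-secret`
2. Environment variables: `FEISHU_APP_ID` / `FEISHU_APP_SECRET`
3. OpenClaw config: `channels.feishu.accounts` in `/root/.openclaw/openclaw.json`
   - Use `--account-id` to pick an account (the first one is used if omitted)
   - Use `--account-id current` to read the account_id from the session context (best-effort, from runtime environment variables)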
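
The priority order above can be sketched as a small function. This is a simplified illustration rather than the script's actual code: `resolve_credentials` is a hypothetical helper name, and the config fallback always takes the first account instead of honouring `--account-id`.

```python
import json
import os
from pathlib import Path


def resolve_credentials(cli_id=None, cli_secret=None, env=None, config_path=None):
    """Return (app_id, app_secret) using CLI > environment > config precedence."""
    env = os.environ if env is None else env
    app_id = cli_id or env.get("FEISHU_APP_ID")
    app_secret = cli_secret or env.get("FEISHU_APP_SECRET")
    if app_id and app_secret:
        return app_id, app_secret

    # Fall back to the first account in the OpenClaw config, if the file exists.
    accounts = []
    if config_path and Path(config_path).exists():
        cfg = json.loads(Path(config_path).read_text(encoding="utf-8"))
        accounts = ((cfg.get("channels") or {}).get("feishu") or {}).get("accounts") or []
    if accounts:
        app_id = app_id or accounts[0].get("appId")
        app_secret = app_secret or accounts[0].get("appSecret")
    return app_id, app_secret


demo_env = {"FEISHU_APP_ID": "cli_b", "FEISHU_APP_SECRET": "s2"}
print(resolve_credentials("cli_a", "s1", env=demo_env))  # CLI values win
print(resolve_credentials(env=demo_env))                 # environment is used next
```

Passing `env` explicitly keeps the sketch testable without touching the real process environment.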

### Common commands

```bash
# 1) List the built-in templates
python3 scripts/send_feishu_card.py --list-templates

# 2) Send the movie template to an open_id (variables from a file)
export FEISHU_APP_ID="cli_xxx"
export FEISHU_APP_SECRET="xxx"
python3 scripts/send_feishu_card.py \
  --template movie \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --vars-file references/vars.example.env

# 3) Specify the template file directly, with inline variables
python3 scripts/send_feishu_card.py \
  --template-file assets/templates/movie.json \
  --receive-id ou_xxx \
  --var title='星际穿越' \
  --var rating='8.7'

# 4) With only a poster URL: upload it to Feishu automatically and inject poster_img_key
python3 scripts/send_feishu_card.py \
  --template movie-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --poster-url 'https://image.tmdb.org/t/p/original/xxx.jpg' \
  --var title='星际穿越' \
  --var overview='...'
```

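## Variable substitution rules

- Placeholder syntax in templates: `${key}`
- Variable sources (later sources override earlier ones):
  1) `--vars-file` (`KEY=VALUE`)
  2) `--var key=value`, repeatable
- Variables that are not provided are left as-is without raising an error (handy for incremental debugging)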
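
A minimal sketch of these rules, using the same placeholder pattern as `scripts/send_feishu_card.py`. The `render` helper and the sample template are illustrative only.

```python
import re

# Same placeholder syntax as the script: ${key}
PLACEHOLDER = re.compile(r"\$\{([a-zA-Z0-9_\-\.]+)\}")


def render(text, file_vars, cli_vars):
    """Merge variables (CLI overrides file) and substitute known placeholders."""
    merged = {**file_vars, **cli_vars}
    # Unknown keys fall back to the original "${key}" text.
    return PLACEHOLDER.sub(lambda m: merged.get(m.group(1), m.group(0)), text)


template = '{"title": "${title}", "rating": "${rating}", "year": "${year}"}'
result = render(template, {"title": "From file", "rating": "8.0"}, {"rating": "8.7"})
print(result)
# {"title": "From file", "rating": "8.7", "year": "${year}"}
```

The `--var` value replaces the one from the vars file, and `${year}` stays untouched because no source provides it.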

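## Parameter specification (required reading)

Read before sending:
- `references/template-params.md` (movie-custom)
- `references/template-params-tv.md` (tv-custom)

- These documents define each template's required parameters, field meanings, and example commands.
- The `cast` field must be passed as a Markdown multi-line list string.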

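## File structure

- `scripts/send_feishu_card.py`: fetches the token, renders the template, sends the message
- `scripts/format_cast.py`: converts a cast JSON array into the `cast` field string
- `scripts/card_callback_router.py`: single routing entry point for card callbacks
- `scripts/subscribe_callback_handler.py`: handles the "Subscribe now" (`立即订阅`) callback and writes to MoviePilot (idempotent)
- `assets/templates/movie-custom.json`: movie detail template (movie)
- `assets/templates/tv-custom.json`: TV series detail template (tv)
- `assets/rules/*.rules.json`: generic rules (supports require_non_empty / default_value / conditional block removal)
- `references/vars.example.env`: example variables file
- `references/template-params.md`: template parameters and argument format specification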
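
The rules files themselves are not shown in this skill entry. As a rough sketch of what `assets/rules/*.rules.json` might contain, the example below uses the action names that `scripts/send_feishu_card.py` recognises (`default_value`, `require_non_empty`, `remove_when_empty`). The concrete rules, the selector text, and the simplified `prune` helper are assumptions made for illustration.

```python
import json

# Hypothetical rules for a movie card; the real rules files may differ.
rules = [
    {"action": "default_value", "field": "media_type", "value": "movie"},
    {"action": "require_non_empty", "field": "title", "message": "title is required"},
    {"action": "remove_when_empty", "field": "tagline",
     "selector": {"tag": "markdown", "contentContains": "${tagline}"}},
]


def prune(elements, variables, rules):
    """Apply default_value, then drop elements whose backing variable is empty."""
    for rule in rules:
        if rule["action"] == "default_value" and not variables.get(rule["field"], "").strip():
            variables[rule["field"]] = str(rule.get("value", ""))
    for rule in rules:
        if rule["action"] != "remove_when_empty":
            continue
        if variables.get(rule["field"], "").strip():
            continue  # variable has a value, keep the block
        sel = rule["selector"]
        elements = [
            el for el in elements
            if not (el.get("tag") == sel.get("tag")
                    and sel.get("contentContains", "") in str(el.get("content", "")))
        ]
    return elements


elements = [
    {"tag": "markdown", "content": "**${title}**"},
    {"tag": "markdown", "content": "_${tagline}_"},
]
variables = {"title": "Interstellar", "tagline": ""}
kept = prune(elements, variables, rules)
print(json.dumps(kept))         # only the title element remains
print(variables["media_type"])  # movie
```

Pruning runs before placeholder substitution, which is why the selector can match on the literal `${tagline}` text.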

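## Automatic Card-Action handling rules (option B)

When a Feishu card callback message arrives (`message_id` of the form `card-action-c-...`, content containing `{"action":"subscribe"...}`):

1. Automatically extract `callback_token` (strip the `card-action-` prefix from `message_id`)
2. Automatically run:
   - First, a delayed update that sets the card to "Processing..." (`处理中...`) and disables it
   - Run the MoviePilot subscription (idempotent)
   - Then a delayed update that sets the card to "✅ 已订阅" (subscribed) or "❌ 订阅失败,请重试" (subscription failed, please retry)
3. The user does not need to supply token/account parameters manually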
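
Step 1 above can be sketched as follows. The helper name `parse_card_action` and the sample IDs are made up for illustration; only the `card-action-` prefix and the `subscribe` action come from the rules above.

```python
import json


def parse_card_action(message_id, content):
    """Return (callback_token, payload) for a subscribe callback, else (None, None)."""
    prefix = "card-action-"
    if not message_id.startswith(prefix):
        return None, None
    try:
        payload = json.loads(content)
    except json.JSONDecodeError:
        return None, None
    if not isinstance(payload, dict) or payload.get("action") != "subscribe":
        return None, None
    # The callback token is whatever follows the fixed prefix.
    return message_id[len(prefix):], payload


token, payload = parse_card_action(
    "card-action-c-123abc", '{"action": "subscribe", "tmdb_id": "157336"}'
)
print(token)               # c-123abc
print(payload["tmdb_id"])  # 157336
```

Ordinary chat messages and unsupported actions return `(None, None)`, so the caller can fall through to normal handling.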

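## Reply constraints after sending

- After a card is sent successfully, do not send an extra "sent + message_id" text by default.
- Return `message_id` only when the user explicitly asks for a receipt.

## Security and constraints

- Do not hard-code the App Secret in scripts
- Logs do not print secrets by default
- On failure, return the original Feishu error code and message to help troubleshooting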


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/send_feishu_card.py

```python
#!/usr/bin/env python3
import argparse
import json
import os
import re
import sys
import uuid
import mimetypes
from pathlib import Path
from urllib import request, error
from card_snapshot_store import save_snapshot


ROOT = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = ROOT / "assets" / "templates"
RULES_DIR = ROOT / "assets" / "rules"
OPENCLAW_CONFIG = Path("/root/.openclaw/openclaw.json")


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def parse_kv_pairs(items):
    data = {}
    for item in items or []:
        if "=" not in item:
            raise ValueError(f"Invalid --var '{item}', expected key=value")
        k, v = item.split("=", 1)
        data[k.strip()] = v
    return data


def parse_vars_file(path):
    data = {}
    if not path:
        return data
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"vars file not found: {path}")

    for raw in p.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        data[k.strip()] = v.lstrip()
    return data


def render_template(text, variables):
    pattern = re.compile(r"\$\{([a-zA-Z0-9_\-\.]+)\}")

    def repl(match):
        key = match.group(1)
        return variables.get(key, match.group(0))

    return pattern.sub(repl, text)


def load_rules(template_name=None):
    if not template_name:
        return []
    p = RULES_DIR / f"{template_name}.rules.json"
    if not p.exists():
        return []
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
        return data if isinstance(data, list) else []
    except Exception:
        return []


def _selector_match(element, selector):
    if not isinstance(element, dict) or not isinstance(selector, dict):
        return False

    tag = selector.get("tag")
    if tag and element.get("tag") != tag:
        return False

    content_contains = selector.get("contentContains")
    if content_contains is not None:
        if content_contains not in str(element.get("content", "")):
            return False

    text_contains = selector.get("textContains")
    if text_contains is not None:
        txt = ""
        text_obj = element.get("text")
        if isinstance(text_obj, dict):
            txt = str(text_obj.get("content", ""))
        if text_contains not in txt:
            return False

    return True


def _is_blankish(value):
    if value is None:
        return True
    s = str(value)
    if not s:
        return True
    # Remove common quotes/symbol wrappers and all unicode whitespaces
    compact = re.sub(r"[\s\"'“”‘’「」『』]+", "", s, flags=re.UNICODE)
    return compact == ""


def apply_rules(template_text, variables, rules):
    try:
        obj = json.loads(template_text)
    except Exception:
        return template_text

    if not rules:
        return template_text

    # pass-1: variable level rules
    for rule in rules:
        if not isinstance(rule, dict):
            continue
        action = rule.get("action")
        field = rule.get("field")

        if action == "default_value":
            if _is_blankish(variables.get(field)):
                variables[field] = str(rule.get("value", ""))

        elif action == "require_non_empty":
            if _is_blankish(variables.get(field)):
                msg = rule.get("message") or f"required field empty: {field}"
                raise ValueError(msg)

    # pass-2: element pruning rules
    body = obj.get("body") if isinstance(obj, dict) else None
    elements = body.get("elements") if isinstance(body, dict) else None
    if not isinstance(elements, list):
        return json.dumps(obj, ensure_ascii=False)

    for rule in rules:
        if not isinstance(rule, dict):
            continue
        action = rule.get("action")
        selector = rule.get("selector") or {}
        field = rule.get("field")

        if action == "remove_when_empty":
            if not _is_blankish(variables.get(field)):
                continue
            elements = [el for el in elements if not _selector_match(el, selector)]

        elif action == "remove_when_missing":
            if variables.get(field):
                continue
            elements = [el for el in elements if not _selector_match(el, selector)]

    body["elements"] = elements
    return json.dumps(obj, ensure_ascii=False)


def _coerce_common_types(node):
    if isinstance(node, dict):
        out = {}
        for k, v in node.items():
            vv = _coerce_common_types(v)
            if k in ("disabled",) and isinstance(vv, str):
                lv = vv.strip().lower()
                if lv == "true":
                    vv = True
                elif lv == "false":
                    vv = False
            out[k] = vv
        return out
    if isinstance(node, list):
        return [_coerce_common_types(x) for x in node]
    return node


def post_json(url, payload, headers=None):
    data = json.dumps(payload).encode("utf-8")
    req = request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json; charset=utf-8")
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    try:
        with request.urlopen(req, timeout=20) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return resp.status, body
    except error.HTTPError as ex:
        body = ex.read().decode("utf-8", errors="replace")
        return ex.code, body


def upload_image_bytes(token, image_bytes, filename="poster.jpg"):
    boundary = f"----OpenClawBoundary{uuid.uuid4().hex}"
    crlf = "\r\n"

    file_content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

    parts = []
    parts.append((f"--{boundary}{crlf}").encode("utf-8"))
    parts.append((f'Content-Disposition: form-data; name="image_type"{crlf}{crlf}message{crlf}').encode("utf-8"))

    parts.append((f"--{boundary}{crlf}").encode("utf-8"))
    parts.append(
        (
            f'Content-Disposition: form-data; name="image"; filename="{filename}"{crlf}'
            f"Content-Type: {file_content_type}{crlf}{crlf}"
        ).encode("utf-8")
    )
    parts.append(image_bytes)
    parts.append(crlf.encode("utf-8"))
    parts.append((f"--{boundary}--{crlf}").encode("utf-8"))

    body = b"".join(parts)
    req = request.Request("https://open.feishu.cn/open-apis/im/v1/images", data=body, method="POST")
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")

    try:
        with request.urlopen(req, timeout=30) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            data = json.loads(raw)
            if resp.status >= 400 or data.get("code") != 0:
                raise RuntimeError(f"upload image failed: status={resp.status}, body={raw}")
            image_key = ((data.get("data") or {}).get("image_key"))
            if not image_key:
                raise RuntimeError(f"upload image succeeded but image_key missing: body={raw}")
            return image_key
    except error.HTTPError as ex:
        raw = ex.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"upload image failed: status={ex.code}, body={raw}")


def get_tenant_token(app_id, app_secret):
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    status, body = post_json(url, {"app_id": app_id, "app_secret": app_secret})
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise RuntimeError(f"token response is not json, status={status}, body={body[:500]}")

    if status >= 400 or data.get("code") != 0:
        raise RuntimeError(f"get token failed: status={status}, body={body}")

    token = data.get("tenant_access_token")
    if not token:
        raise RuntimeError(f"tenant_access_token missing: body={body}")
    return token


def get_session_account_id():
    """Best-effort read account_id from OpenClaw runtime env."""
    for k in (
        "OPENCLAW_ACCOUNT_ID",
        "OPENCLAW_INBOUND_ACCOUNT_ID",
        "ACCOUNT_ID",
    ):
        v = os.getenv(k)
        if v:
            return v
    return None


def load_openclaw_feishu_credentials(account_id=None, config_path=None):
    cfg_path = Path(config_path) if config_path else OPENCLAW_CONFIG
    if not cfg_path.exists():
        return None, None

    try:
        cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
        accounts = (((cfg.get("channels") or {}).get("feishu") or {}).get("accounts") or [])
    except Exception:
        return None, None

    if not accounts:
        return None, None

    target = None
    if account_id is not None:
        # 1) match by configured account id (e.g. default/dev-bot)
        for a in accounts:
            if str(a.get("id")) == str(account_id):
                target = a
                break
        # 2) if numeric, match by index (inbound account_id is often "0"/"1")
        if target is None and str(account_id).isdigit():
            idx = int(str(account_id))
            if 0 <= idx < len(accounts):
                target = accounts[idx]
    if target is None:
        target = accounts[0]

    return target.get("appId"), target.get("appSecret")


def load_template(template_name=None, template_file=None):
    if template_file:
        p = Path(template_file)
    else:
        p = TEMPLATE_DIR / f"{template_name}.json"

    if not p.exists():
        raise FileNotFoundError(f"template not found: {p}")
    return p.read_text(encoding="utf-8"), p


def list_templates():
    if not TEMPLATE_DIR.exists():
        print("(no templates)")
        return
    files = sorted(TEMPLATE_DIR.glob("*.json"))
    if not files:
        print("(no templates)")
        return
    for f in files:
        print(f.stem)


def main():
    parser = argparse.ArgumentParser(description="Send Feishu interactive card from template")
    parser.add_argument("--template", help="template name in assets/templates, without .json")
    parser.add_argument("--template-file", help="absolute/relative path to template json")
    parser.add_argument("--list-templates", action="store_true", help="list built-in templates")

    parser.add_argument("--receive-id", help="open_id/user_id/chat_id")
    parser.add_argument("--receive-id-type", default="open_id", choices=["open_id", "user_id", "chat_id", "union_id"], help="default: open_id")

    parser.add_argument("--vars-file", help="env-like KEY=VALUE file")
    parser.add_argument("--var", action="append", help="inline variable key=value, repeatable")
    parser.add_argument("--poster-url", help="download image from URL, upload to Feishu, and fill poster_img_key")
    parser.add_argument("--poster-file", help="read local image file, upload to Feishu, and fill poster_img_key")

    parser.add_argument("--app-id", default=os.getenv("FEISHU_APP_ID"))
    parser.add_argument("--app-secret", default=os.getenv("FEISHU_APP_SECRET"))
    parser.add_argument("--account-id", help="OpenClaw feishu account id; use 'current' to auto-read from session context")
    parser.add_argument("--config", help="OpenClaw config path (default: /root/.openclaw/openclaw.json)")
    parser.add_argument("--dry-run", action="store_true", help="render and validate only, do not send")

    args = parser.parse_args()

    if args.list_templates:
        list_templates()
        return 0

    if not args.template and not args.template_file:
        eprint("ERROR: --template or --template-file is required")
        return 2

    if not args.receive_id:
        eprint("ERROR: --receive-id is required")
        return 2

    app_id, app_secret = args.app_id, args.app_secret
    account_id = args.account_id
    if account_id == "current":
        account_id = get_session_account_id()

    if not app_id or not app_secret:
        cfg_app_id, cfg_app_secret = load_openclaw_feishu_credentials(account_id, args.config)
        app_id = app_id or cfg_app_id
        app_secret = app_secret or cfg_app_secret

    if not app_id or not app_secret:
        eprint("ERROR: app credentials missing; set FEISHU_APP_ID/FEISHU_APP_SECRET, pass --app-id/--app-secret, or use --account-id with OpenClaw config")
        return 2

    try:
        raw_template, template_path = load_template(args.template, args.template_file)
        file_vars = parse_vars_file(args.vars_file)
        cli_vars = parse_kv_pairs(args.var)
        variables = {**file_vars, **cli_vars}
        if not (variables.get("card_key") or "").strip():
            variables["card_key"] = uuid.uuid4().hex

        token = get_tenant_token(app_id, app_secret)

        # Optional: upload poster first and inject poster_img_key
        if args.poster_url or args.poster_file:
            if args.poster_url:
                req = request.Request(args.poster_url, headers={"User-Agent": "Mozilla/5.0 OpenClaw/feishu-card-sender"})
                with request.urlopen(req, timeout=30) as resp:
                    image_bytes = resp.read()
                filename = Path(args.poster_url.split("?")[0]).name or "poster.jpg"
            else:
                p = Path(args.poster_file)
                if not p.exists():
                    raise FileNotFoundError(f"poster file not found: {args.poster_file}")
                image_bytes = p.read_bytes()
                filename = p.name

            image_key = upload_image_bytes(token, image_bytes, filename=filename)
            variables["poster_img_key"] = image_key

        rules = load_rules(args.template)
        ruled_template = apply_rules(raw_template, variables, rules)
        rendered = render_template(ruled_template, variables)

        # validate + normalize common types (e.g. "disabled": "false" -> false)
        rendered_obj = json.loads(rendered)
        rendered_obj = _coerce_common_types(rendered_obj)
        rendered = json.dumps(rendered_obj, ensure_ascii=False)

        if args.dry_run:
            print(f"template: {template_path}")
            print("dry_run: true")
            print(f"rendered_size: {len(rendered)}")
            print(rendered)
            return 0

        send_url = f"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type={args.receive_id_type}"
        payload = {
            "receive_id": args.receive_id,
            "msg_type": "interactive",
            "content": rendered,
        }
        status, body = post_json(send_url, payload, headers={"Authorization": f"Bearer {token}"})

        print(f"template: {template_path}")
        print(f"status: {status}")
        print(f"response: {body}")

        data = json.loads(body)
        if variables.get("poster_img_key"):
            print(f"poster_img_key: {variables.get('poster_img_key')}")
        if status >= 400 or data.get("code") != 0:
            return 1
        msg_id = (data.get("data") or {}).get("message_id")
        if msg_id:
            print(f"message_id: {msg_id}")

        # snapshot for delayed card update workflow
        try:
            save_snapshot(
                card_key=variables.get("card_key"),
                message_id=msg_id,
                receive_id=args.receive_id,
                account_id=str(account_id) if account_id is not None else None,
                media_type=variables.get("media_type"),
                tmdb_id=str(variables.get("tmdb_id")) if variables.get("tmdb_id") is not None else None,
                title=variables.get("title"),
                raw_card_json=rendered,
            )
        except Exception:
            pass
        return 0
    except Exception as ex:
        eprint(f"ERROR: {ex}")
        return 1


if __name__ == "__main__":
    sys.exit(main())

```
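
One detail of the script above is worth isolating: placeholders sit inside JSON strings, so a substituted value such as `subscribe_disabled` always arrives as text and has to be converted back to a boolean before sending. The sketch below restates that idea in standalone form. `coerce_disabled` is a renamed, slightly condensed variant of `_coerce_common_types`, and the one-button card is a made-up example.

```python
import json


def coerce_disabled(node):
    """Recursively turn "true"/"false" strings under a "disabled" key into booleans."""
    if isinstance(node, dict):
        out = {}
        for key, value in node.items():
            value = coerce_disabled(value)
            if key == "disabled" and isinstance(value, str):
                lowered = value.strip().lower()
                if lowered in ("true", "false"):
                    value = lowered == "true"
            out[key] = value
        return out
    if isinstance(node, list):
        return [coerce_disabled(item) for item in node]
    return node


# After substitution the template holds the string "false", not the boolean false.
rendered = '{"elements": [{"tag": "button", "disabled": "false"}]}'
card = coerce_disabled(json.loads(rendered))
print(json.dumps(card))
# {"elements": [{"tag": "button", "disabled": false}]}
```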

### references/template-params.md

```markdown
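# Template parameter passing specification (for agents)

This file defines each template's parameter list, required items, and format constraints. When calling `scripts/send_feishu_card.py`, pass parameters uniformly with `--var key=value`.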

---

## template: `movie-custom`

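Template file: `assets/templates/movie-custom.json`

### Required parameters

- `title`: movie title (plain text)
- `overview`: plot summary (may be several sentences)
- `rating`: rating text (e.g. `8.3 / 10`)
- `runtime`: runtime text (e.g. `173 分钟`)
- `year`: release date/year (e.g. `2023-01-22`)
- `genres`: genre list text (e.g. `科幻 / 灾难 / 冒险`)
- `country`: country/region (e.g. `中国` or `美国 / 英国`)
- `cast`: cast block (Markdown list string)
- `director`: director's name
- `tagline`: one-line tagline (optional; the block is not rendered when empty)
- `detail_url`: detail link (TMDB page URL, used by the "View details" button)
- `tmdb_id`: TMDB entry ID (used by the "Subscribe now" callback)
- `media_type`: `movie` (may be omitted; the rules fill in the default)
- `subscribe_disabled`: `true|false` (optional; controls whether the "Subscribe now" button is disabled)
- `subscribe_button_text`: button label (optional; defaults to `立即订阅`, "Subscribe now")

### Image field

- `poster_img_key`: an image key usable by Feishu (`img_v3_...`), **required**.
- This field renders the poster in the template; a URL cannot be passed directly.
- If you only have an image URL or a local file, the script can upload it and inject the key:
  - `--poster-url <https://...jpg>`
  - `--poster-file </path/to/poster.jpg>`

### Important: `cast` parameter format (strict)

`cast` must be a **Markdown multi-line string**, one actor per line (`饰` means "plays"). Recommended format: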

```text
• **Actor name** 饰 Role name
• **Actor name** 饰 Role name
• **Actor name** 饰 Role name
```

Example value:

```text
• **吴京** 饰 刘培强\n• **刘德华** 饰 图恒宇\n• **李雪健** 饰 周喆直
```

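Notes:
- In a shell, wrap the whole value in single quotes and use `\n` for line breaks.
- Do not pass a JSON array; the template expects a single string.

### A complete example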

```bash
python3 scripts/send_feishu_card.py \
  --template movie-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --account-id current \
  --var title='流浪地球2' \
  --var overview='太阳即将毁灭,人类启动数字生命与行星发动机双线计划。' \
  --var rating='8.3 / 10' \
  --var runtime='173 分钟' \
  --var year='2023-01-22' \
  --var genres='科幻 / 灾难 / 冒险' \
  --var country='中国' \
  --var cast='• **吴京** 饰 刘培强\n• **刘德华** 饰 图恒宇\n• **李雪健** 饰 周喆直' \
  --var director='郭帆' \
  --var tagline='爱是穿越一切的力量'
```

---

## cast auto-formatting tool

Script: `scripts/format_cast.py`

Purpose: converts a cast JSON array into the Markdown multi-line string required by the `cast` field.

### Input example

```json
[
  {"name": "吴京", "role": "刘培强"},
  {"name": "刘德华", "role": "图恒宇"},
  {"name": "李雪健", "role": "周喆直"}
]
```

### Usage

```bash
# Output real multi-line text
python3 scripts/format_cast.py --input-file cast.json

# Output a shell-friendly \n-escaped string (recommended for --var cast=...)
python3 scripts/format_cast.py --input-file cast.json --escape-newline
```

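## Agent execution rules (must follow)

1. Before sending, check the template's parameter list and fill in any missing required parameters first.
2. Always pass `cast` in the Markdown list format above.
3. Do not inject unescaped double quotes into parameter values; prefer single quotes around shell values.
4. On failure, return the Feishu error code and log_id verbatim to help troubleshooting.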

```
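
The advice in the parameter document to write line breaks in `cast` as `\n` follows from how the sender script works: values are substituted into the template as raw text, and the result is then parsed as JSON. The sketch below shows why the escaped form works while a real newline does not. The one-element template is a stand-in, not the real `movie-custom.json`.

```python
import json

# What the shell passes inside single quotes: a backslash followed by "n",
# not an actual line break.
cast = r"• **吴京** 饰 刘培强\n• **刘德华** 饰 图恒宇"

# Hypothetical one-element template; the real template is larger.
template = '{"tag": "markdown", "content": "${cast}"}'
rendered = template.replace("${cast}", cast)

card = json.loads(rendered)  # JSON decoding turns \n into a real line break
lines = card["content"].split("\n")
print(len(lines))  # 2

# A raw newline inside a JSON string is invalid, so parsing fails.
try:
    json.loads(template.replace("${cast}", "line one\nline two"))
    raw_newline_ok = True
except json.JSONDecodeError:
    raw_newline_ok = False
print(raw_newline_ok)  # False
```

The same mechanism explains the rule about unescaped double quotes: a bare `"` inside a value would end the JSON string early.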

### references/template-params-tv.md

```markdown
# template: `tv-custom`

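Template file: `assets/templates/tv-custom.json`

## Required parameters

- title
- overview
- rating
- first_air_date
- status
- seasons_episodes (example: `3 季 / 24 集`)
- episode_runtime (example: `45 分钟`)
- genres
- country
- creator
- cast (Markdown multi-line string)
- detail_url (TMDB detail page)
- tmdb_id (TMDB entry ID, used by the "Subscribe now" callback)
- media_type (`tv`; may be omitted, the rules fill in the default)
- subscribe_disabled (`true|false`, optional; controls whether the "Subscribe now" button is disabled)
- subscribe_button_text (optional; defaults to `立即订阅`, "Subscribe now")
- poster_img_key (or injected automatically via `--poster-url/--poster-file`)

## Optional parameters

- tagline (the block is not rendered when empty)

## Sending example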

```bash
python3 scripts/send_feishu_card.py \
  --template tv-custom \
  --receive-id ou_xxx \
  --receive-id-type open_id \
  --account-id current \
  --poster-url 'https://image.tmdb.org/t/p/original/xxx.jpg' \
  --var title='黑镜' \
  --var overview='...' \
  --var rating='8.4 / 10' \
  --var first_air_date='2011-12-04' \
  --var status='Returning Series' \
  --var seasons_episodes='6 季 / 27 集' \
  --var episode_runtime='60 分钟' \
  --var genres='剧情 / 科幻 / 惊悚' \
  --var country='英国' \
  --var creator='Charlie Brooker' \
  --var cast='• **演员A** 饰 角色A' \
  --var detail_url='https://www.themoviedb.org/tv/42009'
```

```

### scripts/format_cast.py

```python
#!/usr/bin/env python3
import argparse
import json
import sys
from pathlib import Path


def load_items(args):
    if args.input_json:
        return json.loads(args.input_json)
    if args.input_file:
        return json.loads(Path(args.input_file).read_text(encoding="utf-8"))
    data = sys.stdin.read().strip()
    if data:
        return json.loads(data)
    raise ValueError("No input provided. Use --input-json / --input-file / stdin")


def main():
    p = argparse.ArgumentParser(description="Format cast JSON array to Feishu card markdown lines")
    p.add_argument("--input-json", help='JSON string, e.g. [{"name":"吴京","role":"刘培强"}]')
    p.add_argument("--input-file", help="Path to JSON file")
    p.add_argument("--name-key", default="name")
    p.add_argument("--role-key", default="role")
    p.add_argument("--escape-newline", action="store_true", help="Output with \\n instead of real newlines")
    args = p.parse_args()

    items = load_items(args)
    if not isinstance(items, list):
        raise ValueError("Input must be a JSON array")

    lines = []
    for i in items:
        if not isinstance(i, dict):
            continue
        name = str(i.get(args.name_key, "")).strip()
        role = str(i.get(args.role_key, "")).strip()
        if not name:
            continue
        if role:
            lines.append(f"• **{name}** 饰 {role}")
        else:
            lines.append(f"• **{name}**")

    out = "\n".join(lines)
    if args.escape_newline:
        out = out.replace("\n", "\\n")
    print(out)


if __name__ == "__main__":
    main()

```

### scripts/card_callback_router.py

```python
#!/usr/bin/env python3
import argparse
import json
import subprocess
import sys
from pathlib import Path
from urllib import request
from card_snapshot_store import find_snapshot_by_card_key, patch_subscribe_button

ENQUEUE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/enqueue_callback.py')

SCRIPT_DIR = Path(__file__).resolve().parent
HANDLER = SCRIPT_DIR / "subscribe_callback_handler.py"
OPENCLAW_CONFIG = Path('/root/.openclaw/openclaw.json')
TOKEN_CACHE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/tenant_token_cache.json')


def load_app_credentials(account_id: str | None):
    cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding='utf-8'))
    accounts = (((cfg.get('channels') or {}).get('feishu') or {}).get('accounts') or [])
    if not accounts:
        return None, None
    if account_id is not None:
        for a in accounts:
            if str(a.get('id')) == str(account_id):
                return a.get('appId'), a.get('appSecret')
        if str(account_id).isdigit():
            idx = int(str(account_id))
            if 0 <= idx < len(accounts):
                a = accounts[idx]
                return a.get('appId'), a.get('appSecret')
    a = accounts[0]
    return a.get('appId'), a.get('appSecret')


def get_tenant_token(app_id: str, app_secret: str):
    body = json.dumps({'app_id': app_id, 'app_secret': app_secret}).encode('utf-8')
    req = request.Request('https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', data=body, method='POST', headers={'Content-Type': 'application/json'})
    with request.urlopen(req, timeout=20) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return data.get('tenant_access_token'), int(data.get('expire', 7200))


def get_tenant_token_cached(app_id: str, app_secret: str):
    now = __import__('time').time()
    try:
        if TOKEN_CACHE.exists():
            c = json.loads(TOKEN_CACHE.read_text(encoding='utf-8'))
            if c.get('app_id') == app_id and c.get('token') and float(c.get('exp_at', 0)) - now > 120:
                return c['token']
    except Exception:
        pass

    token, ttl = get_tenant_token(app_id, app_secret)
    if token:
        TOKEN_CACHE.parent.mkdir(parents=True, exist_ok=True)
        TOKEN_CACHE.write_text(json.dumps({'app_id': app_id, 'token': token, 'exp_at': now + max(300, ttl - 60)}), encoding='utf-8')
    return token


def update_message_card_by_id(message_id: str, tenant_token: str, raw_card_json: str):
    card = json.loads(raw_card_json)
    payload = {
        'msg_type': 'interactive',
        'content': json.dumps(card, ensure_ascii=False),
    }
    req = request.Request(
        f'https://open.feishu.cn/open-apis/im/v1/messages/{message_id}',
        data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
        method='PATCH',
        headers={'Authorization': f'Bearer {tenant_token}', 'Content-Type': 'application/json'},
    )
    with request.urlopen(req, timeout=20) as resp:
        body = resp.read().decode('utf-8', errors='replace')
        try:
            return json.loads(body)
        except Exception:
            return {'raw': body}


def main():
    p = argparse.ArgumentParser(description='Route Feishu card callback payloads')
    p.add_argument('--payload', required=True, help='raw callback payload json')
    p.add_argument('--channel', default='feishu')
    p.add_argument('--user-id', required=True)
    p.add_argument('--message-id', help='Inbound message_id, e.g. card-action-c-xxx')
    p.add_argument('--account-id', help='Feishu account id/index for app credentials')
    args = p.parse_args()

    try:
        payload = json.loads(args.payload)
    except Exception:
        print(json.dumps({'success': False, 'error': 'invalid_json'}, ensure_ascii=False))
        return 2

    action = payload.get('action')
    if action != 'subscribe':
        print(json.dumps({'success': False, 'error': 'unsupported_action', 'action': action}, ensure_ascii=False))
        return 2

    callback_token = None
    if args.message_id:
        mid = args.message_id.strip()
        if mid.startswith('card-action-'):
            callback_token = mid[len('card-action-'):]

    tenant_token = None
    app_id, app_secret = load_app_credentials(args.account_id)
    if app_id and app_secret:
        try:
            tenant_token = get_tenant_token_cached(app_id, app_secret)
        except Exception:
            tenant_token = None

    card_key = str(payload.get('card_key') or '')
    snapshot = None
    if card_key:
        snapshot = find_snapshot_by_card_key(card_key)

    message_id_for_update = (snapshot or {}).get('message_id') if isinstance(snapshot, dict) else None

    processing_update = None
    processing_update_error = None
    if message_id_for_update and tenant_token and snapshot:
        try:
            processing_card = patch_subscribe_button(snapshot['raw_card_json'], text='处理中...', disabled=True)
            processing_update = update_message_card_by_id(message_id_for_update, tenant_token, processing_card)
        except Exception as ex:
            processing_update_error = str(ex)

    # Queue finalize job (subscribe + final patch)
    enqueue_res = subprocess.run([
        'python3', str(ENQUEUE),
        '--channel', args.channel,
        '--user-id', args.user_id,
        '--account-id', str(args.account_id or '1'),
        '--message-id', args.message_id or '',
        '--payload', json.dumps(payload, ensure_ascii=False),
    ], capture_output=True, text=True)

    out = {
        'success': True,
        'status': 'processing',
        'message': '正在处理中,请稍候',
        'debug_update': {
            'processing': processing_update,
            'processing_error': processing_update_error,
            'has_snapshot': bool(snapshot),
            'callback_token': bool(callback_token),
            'message_id': message_id_for_update,
            'enqueued': enqueue_res.returncode == 0,
            'enqueue_stdout': (enqueue_res.stdout or '').strip(),
            'enqueue_stderr': (enqueue_res.stderr or '').strip(),
        }
    }
    print(json.dumps(out, ensure_ascii=False))
    return 0


if __name__ == '__main__':
    raise SystemExit(main())

```
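The router above relabels and disables the subscribe button before it queues the slow work. `patch_subscribe_button` comes from `card_snapshot_store`, which is not shown in this section, so the following is only a minimal sketch of what such a helper might do, assuming the card follows the schema 2.0 layout used by this skill's templates. The name `patch_button_state` is hypothetical.

```python
import json


def patch_button_state(raw_card_json: str, text: str, disabled: bool) -> str:
    """Return the card JSON with every subscribe-callback button relabelled.

    Hypothetical helper: walks the whole card and edits buttons whose
    callback value has action == 'subscribe'.
    """
    card = json.loads(raw_card_json)

    def is_subscribe_button(node: dict) -> bool:
        if node.get("tag") != "button":
            return False
        for behavior in node.get("behaviors") or []:
            value = behavior.get("value")
            if (behavior.get("type") == "callback"
                    and isinstance(value, dict)
                    and value.get("action") == "subscribe"):
                return True
        return False

    def walk(node):
        if isinstance(node, dict):
            if is_subscribe_button(node):
                node.setdefault("text", {"tag": "plain_text"})["content"] = text
                node["disabled"] = disabled
            for child in node.values():
                walk(child)
        elif isinstance(node, list):
            for child in node:
                walk(child)

    walk(card)
    return json.dumps(card, ensure_ascii=False)
```

Walking the whole tree rather than only `body.elements` means the button is still found if a template later nests it inside a `column_set`.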

### scripts/subscribe_callback_handler.py

```python
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
from pathlib import Path
from urllib import parse, request, error

ROOT = Path(__file__).resolve().parent.parent
IDEMPOTENT_PATH = ROOT / "tmp" / "subscribe-callback-dedupe.json"
DEFAULT_BASE = "http://home.dobby.lol:1001"


def mk_toast(t: str, content: str):
    return {"toast": {"type": t, "content": content}}


def pack_result(*, success: bool, status: str, message: str, subscribe_id=None, deduped=False, ui=None, error=None):
    data = {
        "success": success,
        "status": status,
        "message": message,
        "deduped": deduped,
        "subscribe_id": subscribe_id,
        "ui": ui or {},
        "callbackResponse": mk_toast("info", "处理中..."),
        "delayedUpdate": {
            "toast": mk_toast("success" if success else "error", message)["toast"],
            "ui": ui or {},
        },
    }
    if error:
        data["error"] = error
    return data


def load_cred(channel: str, user_id: str):
    p = Path('/root/.openclaw/workspace-dev/skills/movipilot-api-foundation/scripts/mp_credential_store.py')
    raw = subprocess.check_output(['python3', str(p), 'get', '--channel', channel, '--user-id', user_id], text=True).strip()
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except Exception:
        return {}


def _ensure_store():
    IDEMPOTENT_PATH.parent.mkdir(parents=True, exist_ok=True)
    if not IDEMPOTENT_PATH.exists():
        IDEMPOTENT_PATH.write_text('{}', encoding='utf-8')


def _load_store():
    _ensure_store()
    try:
        return json.loads(IDEMPOTENT_PATH.read_text(encoding='utf-8') or '{}')
    except Exception:
        return {}


def _save_store(data):
    _ensure_store()
    IDEMPOTENT_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')


def dedupe_key(payload: dict):
    return f"subscribe:{payload.get('media_type')}:{payload.get('tmdb_id')}"


def get_recent(key: str, window_sec: int = 60):
    """Return the stored state for key if it was recorded within window_sec seconds, else None."""
    db = _load_store()
    item = db.get(key)
    if not item:
        return None
    if time.time() - item.get('ts', 0) <= window_sec:
        return item
    return None


def mark_state(key: str, result: dict):
    db = _load_store()
    db[key] = {'ts': int(time.time()), **result}
    _save_store(db)


def mp_get(url: str):
    req = request.Request(url, headers={'User-Agent': 'OpenClaw/subscribe-callback'})
    with request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode('utf-8', errors='replace'))


def mp_post(url: str, data: dict):
    body = json.dumps(data, ensure_ascii=False).encode('utf-8')
    req = request.Request(url, data=body, method='POST', headers={'Content-Type': 'application/json', 'User-Agent': 'OpenClaw/subscribe-callback'})
    with request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode('utf-8', errors='replace'))


def main():
    ap = argparse.ArgumentParser(description='Handle Feishu card subscribe callback -> MoviePilot subscription')
    ap.add_argument('--payload', required=True, help='JSON payload from callback')
    ap.add_argument('--channel', default='feishu')
    ap.add_argument('--user-id', required=True)
    args = ap.parse_args()

    payload = json.loads(args.payload)
    if payload.get('action') != 'subscribe':
        raise SystemExit('unsupported action')

    media_type = payload.get('media_type')
    tmdb_id = str(payload.get('tmdb_id') or '').strip()
    if media_type not in ('movie', 'tv') or not tmdb_id.isdigit():
        raise SystemExit('invalid payload: need media_type(movie|tv) and numeric tmdb_id')

    key = dedupe_key(payload)
    item = get_recent(key, window_sec=60)
    if item:
        status = item.get('status')
        if status == 'processing':
            print(json.dumps(pack_result(success=True, deduped=True, status='processing', message='正在处理中,请稍候', ui={'subscribe_disabled': True, 'subscribe_button_text': '处理中...'}), ensure_ascii=False))
            return
        print(json.dumps(pack_result(success=True, deduped=True, status=status, message='重复点击已忽略', subscribe_id=item.get('subscribe_id'), ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅' if status in ('created','exists') else '处理中...'}), ensure_ascii=False))
        return

    mark_state(key, {'success': True, 'status': 'processing', 'message': '正在处理中'})

    cred = load_cred(args.channel, args.user_id)
    token = cred.get('token')
    base = cred.get('base_url') or os.getenv('MP_DEFAULT_BASE_URL') or DEFAULT_BASE
    if not token:
        raise SystemExit('missing movipilot token')

    mediaid = f'tmdb:{tmdb_id}'
    sub = mp_get(f"{base}/api/v1/subscribe/media/{parse.quote(mediaid)}?token={parse.quote(token)}")
    if sub and sub.get('id'):
        result = pack_result(success=True, deduped=False, status='exists', subscribe_id=sub.get('id'), message='已存在订阅', ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅'})
        mark_state(key, result)
        print(json.dumps(result, ensure_ascii=False))
        return

    type_name = '电影' if media_type == 'movie' else '电视剧'

    # Fast path: create subscription directly from callback payload (avoid extra media lookup latency)
    create_payload = {
        'name': payload.get('title') or '',
        'year': '',
        'type': type_name,
        'tmdbid': int(tmdb_id),
        'mediaid': mediaid,
        'season': None,
    }
    resp = mp_post(f"{base}/api/v1/subscribe/?token={parse.quote(token)}", create_payload)

    # Fallback path: if direct create rejected, fetch media details then retry once
    if not resp.get('success'):
        media = mp_get(f"{base}/api/v1/media/{parse.quote(mediaid)}?type_name={parse.quote(type_name)}&token={parse.quote(token)}")
        create_payload = {
            'name': media.get('title') or payload.get('title') or '',
            'year': str(media.get('year') or ''),
            'type': type_name,
            'tmdbid': int(tmdb_id),
            'mediaid': mediaid,
            'season': None,
        }
        resp = mp_post(f"{base}/api/v1/subscribe/?token={parse.quote(token)}", create_payload)
        if not resp.get('success'):
            raise RuntimeError(f'create subscribe failed: {resp}')

    sid = (resp.get('data') or {}).get('id')
    result = pack_result(success=True, deduped=False, status='created', subscribe_id=sid, message='新增订阅成功', ui={'subscribe_disabled': True, 'subscribe_button_text': '✅ 已订阅'})
    mark_state(key, result)
    print(json.dumps(result, ensure_ascii=False))


if __name__ == '__main__':
    try:
        main()
    except error.HTTPError as ex:
        raw = ex.read().decode('utf-8', errors='replace')
        print(json.dumps(pack_result(success=False, status='failed', message='订阅失败,可重试', ui={'subscribe_disabled': False, 'subscribe_button_text': '订阅失败,重试'}, error=f'HTTP {ex.code}: {raw[:200]}'), ensure_ascii=False))
        raise
    except Exception as ex:
        print(json.dumps(pack_result(success=False, status='failed', message='订阅失败,可重试', ui={'subscribe_disabled': False, 'subscribe_button_text': '订阅失败,重试'}, error=str(ex)), ensure_ascii=False))
        raise

```
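The handler's dedupe logic stores a timestamp per `media_type`/`tmdb_id` key and treats any click inside the window as a repeat. Below is a minimal in-memory sketch of the same idea, with an injectable clock so the window can be exercised without sleeping. The `DedupeWindow` class and its `now` parameter are illustrative assumptions, not part of the skill.

```python
import time
from typing import Callable, Dict, Optional


class DedupeWindow:
    """In-memory stand-in for the JSON file store used by the handler."""

    def __init__(self, window_sec: int = 60, now: Callable[[], float] = time.time):
        self.window_sec = window_sec
        self.now = now
        self._db: Dict[str, dict] = {}

    def get_recent(self, key: str) -> Optional[dict]:
        # Same comparison as the handler: inside the window counts as recent.
        item = self._db.get(key)
        if item and self.now() - item.get("ts", 0) <= self.window_sec:
            return item
        return None

    def mark_state(self, key: str, result: dict) -> None:
        self._db[key] = {"ts": int(self.now()), **result}


clock = [1000.0]                         # fake clock, advanced by hand
store = DedupeWindow(window_sec=60, now=lambda: clock[0])
key = "subscribe:movie:1419406"

first = store.get_recent(key)            # no state recorded yet
store.mark_state(key, {"status": "processing"})
clock[0] += 30
repeat = store.get_recent(key)           # 30 s later, still inside the window
clock[0] += 31
expired = store.get_recent(key)          # 61 s after marking, outside the window
```

One consequence worth noting: a key marked `processing` stays deduped until the window expires unless a later `mark_state` call overwrites it.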

### assets/templates/movie-custom.json

```json
{
  "schema": "2.0",
  "config": {
    "update_multi": true,
    "style": {
      "text_size": {
        "normal_v2": {
          "default": "normal",
          "pc": "normal",
          "mobile": "heading"
        }
      }
    }
  },
  "body": {
    "direction": "horizontal",
    "horizontal_spacing": "8px",
    "vertical_spacing": "8px",
    "horizontal_align": "left",
    "vertical_align": "top",
    "elements": [
      {
        "tag": "column_set",
        "flex_mode": "stretch",
        "horizontal_align": "left",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "elements": [
              {
                "tag": "img",
                "img_key": "${poster_img_key}",
                "preview": true,
                "scale_type": "fit_horizontal",
                "corner_radius": "8px"
              }
            ],
            "vertical_spacing": "8px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          },
          {
            "tag": "column",
            "width": "weighted",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📖 剧情简介</font>**\n${overview}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "vertical_spacing": "8px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 2
          }
        ],
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "column_set",
        "flex_mode": "stretch",
        "horizontal_spacing": "12px",
        "horizontal_align": "left",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "background_style": "blue-50",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>⭐ 评分</font>**",
                "text_size": "normal"
              },
              {
                "tag": "markdown",
                "content": "${rating}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "padding": "12px 12px 12px 12px",
            "vertical_spacing": "4px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          },
          {
            "tag": "column",
            "width": "weighted",
            "background_style": "blue-50",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>⏱️ 时长</font>**",
                "text_size": "normal"
              },
              {
                "tag": "markdown",
                "content": "${runtime}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "padding": "12px 12px 12px 12px",
            "vertical_spacing": "4px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          },
          {
            "tag": "column",
            "width": "weighted",
            "background_style": "blue-50",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📅 上映</font>**",
                "text_size": "normal"
              },
              {
                "tag": "markdown",
                "content": "${year}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "padding": "12px 12px 12px 12px",
            "vertical_spacing": "4px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          }
        ],
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "column_set",
        "flex_mode": "stretch",
        "horizontal_spacing": "12px",
        "horizontal_align": "left",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "background_style": "blue-50",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>🎭 类型</font>**",
                "text_size": "normal"
              },
              {
                "tag": "markdown",
                "content": "${genres}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "padding": "12px 12px 12px 12px",
            "vertical_spacing": "4px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          },
          {
            "tag": "column",
            "width": "weighted",
            "background_style": "blue-50",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>🌍 国家</font>**",
                "text_size": "normal"
              },
              {
                "tag": "markdown",
                "content": "${country}",
                "text_align": "left",
                "text_size": "normal"
              }
            ],
            "padding": "12px 12px 12px 12px",
            "vertical_spacing": "4px",
            "horizontal_align": "left",
            "vertical_align": "top",
            "weight": 1
          }
        ],
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "markdown",
        "content": "**<font color='blue-600'>🎭 主要演员</font>**\n${cast}",
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "markdown",
        "content": "**<font color='blue-600'>🎬 导演</font>** :${director}",
        "text_align": "left",
        "text_size": "normal_v2",
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "markdown",
        "content": "**💬 ${tagline}**",
        "text_align": "left",
        "text_size": "normal_v2",
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "button",
        "text": {
          "tag": "plain_text",
          "content": "查看详情"
        },
        "type": "default",
        "width": "default",
        "size": "medium",
        "behaviors": [
          {
            "type": "open_url",
            "default_url": "${detail_url}",
            "pc_url": "",
            "ios_url": "",
            "android_url": ""
          }
        ],
        "margin": "0px 0px 0px 0px"
      },
      {
        "tag": "button",
        "text": {
          "tag": "plain_text",
          "content": "${subscribe_button_text}"
        },
        "type": "primary",
        "width": "default",
        "size": "medium",
        "disabled": "${subscribe_disabled}",
        "behaviors": [
          {
            "type": "callback",
            "value": {
              "action": "subscribe",
              "media_type": "${media_type}",
              "tmdb_id": "${tmdb_id}",
              "title": "${title}",
              "card_key": "${card_key}"
            }
          }
        ],
        "margin": "0px 0px 0px 0px"
      }
    ]
  },
  "header": {
    "title": {
      "tag": "plain_text",
      "content": "🎬 《${title}》电影介绍"
    },
    "subtitle": {
      "tag": "plain_text",
      "content": ""
    },
    "template": "blue",
    "padding": "12px 16px 12px 16px"
  }
}

```
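The template uses `${name}` placeholders throughout, including one in the button's `disabled` field. The code that fills them is not part of this section, so the sketch below is an assumption about how such substitution could work: every string is filled from a dict, and a value that renders to exactly `"true"` or `"false"` under a `disabled` key is coerced to a boolean. `render` and `PLACEHOLDER` are hypothetical names.

```python
import json
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def render(node, variables: dict, key: str = ""):
    """Recursively replace ${name} in every string of a card template."""
    if isinstance(node, dict):
        return {k: render(v, variables, k) for k, v in node.items()}
    if isinstance(node, list):
        return [render(v, variables, key) for v in node]
    if isinstance(node, str):
        text = PLACEHOLDER.sub(lambda m: str(variables.get(m.group(1), "")), node)
        # Assumption: the button's "disabled" field should end up as a JSON boolean.
        if key == "disabled" and text in ("true", "false"):
            return text == "true"
        return text
    return node


template = json.loads("""
{
  "header": {"title": {"tag": "plain_text", "content": "🎬 《${title}》电影介绍"}},
  "body": {"elements": [
    {"tag": "button",
     "text": {"tag": "plain_text", "content": "${subscribe_button_text}"},
     "disabled": "${subscribe_disabled}"}
  ]}
}
""")

card = render(template, {
    "title": "星际穿越",
    "subscribe_button_text": "立即订阅",
    "subscribe_disabled": "false",
})
print(json.dumps(card, ensure_ascii=False, indent=2))
```

Substituting on the parsed structure rather than on the raw JSON text keeps quotes or newlines in a variable from breaking the card.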

### assets/templates/tv-custom.json

```json
{
  "schema": "2.0",
  "config": {
    "update_multi": true,
    "style": {
      "text_size": {
        "normal_v2": {
          "default": "normal",
          "pc": "normal",
          "mobile": "heading"
        }
      }
    }
  },
  "body": {
    "direction": "horizontal",
    "horizontal_spacing": "8px",
    "vertical_spacing": "8px",
    "horizontal_align": "left",
    "vertical_align": "top",
    "elements": [
      {
        "tag": "column_set",
        "flex_mode": "stretch",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "elements": [
              {
                "tag": "img",
                "img_key": "${poster_img_key}",
                "preview": true,
                "scale_type": "fit_horizontal",
                "corner_radius": "8px"
              }
            ]
          },
          {
            "tag": "column",
            "width": "weighted",
            "weight": 2,
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📖 剧集简介</font>**\n${overview}",
                "text_align": "left",
                "text_size": "normal"
              }
            ]
          }
        ]
      },
      {
        "tag": "column_set",
        "horizontal_spacing": "12px",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>⭐ 评分</font>**"
              },
              {
                "tag": "markdown",
                "content": "${rating}"
              }
            ]
          },
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📅 首播</font>**"
              },
              {
                "tag": "markdown",
                "content": "${first_air_date}"
              }
            ]
          },
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📡 状态</font>**"
              },
              {
                "tag": "markdown",
                "content": "${status}"
              }
            ]
          }
        ]
      },
      {
        "tag": "column_set",
        "horizontal_spacing": "12px",
        "columns": [
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>📺 季/集</font>**"
              },
              {
                "tag": "markdown",
                "content": "${seasons_episodes}"
              }
            ]
          },
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>⏱️ 单集时长</font>**"
              },
              {
                "tag": "markdown",
                "content": "${episode_runtime}"
              }
            ]
          },
          {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": "blue-50",
            "padding": "12px 12px 12px 12px",
            "elements": [
              {
                "tag": "markdown",
                "content": "**<font color='blue-600'>🌍 国家/地区</font>**"
              },
              {
                "tag": "markdown",
                "content": "${country}"
              }
            ]
          }
        ]
      },
      {
        "tag": "markdown",
        "content": "**<font color='blue-600'>🎭 类型</font>**\n${genres}"
      },
      {
        "tag": "markdown",
        "content": "**<font color='blue-600'>🎬 主创</font>**:${creator}\n**<font color='blue-600'>👥 主演</font>**\n${cast}",
        "text_size": "normal_v2"
      },
      {
        "tag": "markdown",
        "content": "**💬 \"${tagline}\"**",
        "text_size": "normal_v2"
      },
      {
        "tag": "button",
        "text": {
          "tag": "plain_text",
          "content": "查看详情"
        },
        "type": "default",
        "width": "default",
        "size": "medium",
        "behaviors": [
          {
            "type": "open_url",
            "default_url": "${detail_url}",
            "pc_url": "",
            "ios_url": "",
            "android_url": ""
          }
        ]
      },
      {
        "tag": "button",
        "text": {
          "tag": "plain_text",
          "content": "${subscribe_button_text}"
        },
        "type": "primary",
        "width": "default",
        "size": "medium",
        "disabled": "${subscribe_disabled}",
        "behaviors": [
          {
            "type": "callback",
            "value": {
              "action": "subscribe",
              "media_type": "${media_type}",
              "tmdb_id": "${tmdb_id}",
              "title": "${title}",
              "card_key": "${card_key}"
            }
          }
        ],
        "margin": "0px 0px 0px 0px"
      }
    ]
  },
  "header": {
    "title": {
      "tag": "plain_text",
      "content": "📺 《${title}》剧集介绍"
    },
    "template": "blue",
    "padding": "12px 16px 12px 16px"
  }
}

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "dobbey",
  "slug": "feishu-card-sender",
  "displayName": "Feishu Card Sender",
  "latest": {
    "version": "1.1.1",
    "publishedAt": 1772723249604,
    "commit": "https://github.com/openclaw/skills/commit/503104e56013f601671070086ed0d1e515e44031"
  },
  "history": []
}

```

### assets/rules/movie-custom.rules.json

```json
[
  {
    "action": "require_non_empty",
    "field": "title",
    "message": "title 不能为空"
  },
  {
    "action": "require_non_empty",
    "field": "overview",
    "message": "overview 不能为空"
  },
  {
    "action": "require_non_empty",
    "field": "poster_img_key",
    "message": "poster_img_key 不能为空(请传 --poster-url/--poster-file 或 --var poster_img_key=...)"
  },
  {
    "action": "default_value",
    "field": "country",
    "value": "待公布"
  },
  {
    "action": "default_value",
    "field": "rating",
    "value": "待公布"
  },
  {
    "action": "default_value",
    "field": "runtime",
    "value": "待公布"
  },
  {
    "action": "default_value",
    "field": "year",
    "value": "待公布"
  },
  {
    "action": "remove_when_empty",
    "field": "tagline",
    "selector": {
      "tag": "markdown",
      "contentContains": "${tagline}"
    }
  },
  {
    "action": "remove_when_empty",
    "field": "detail_url",
    "selector": {
      "tag": "button",
      "textContains": "查看详情"
    }
  },
  {
    "action": "remove_when_empty",
    "field": "tmdb_id",
    "selector": {
      "tag": "button",
      "textContains": "立即订阅"
    }
  },
  {
    "action": "default_value",
    "field": "media_type",
    "value": "movie"
  },
  {
    "action": "default_value",
    "field": "subscribe_disabled",
    "value": "false"
  },
  {
    "action": "default_value",
    "field": "subscribe_button_text",
    "value": "立即订阅"
  }
]

```
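The rule file uses three actions: `require_non_empty`, `default_value`, and `remove_when_empty`. The engine that interprets them is not shown in this section. The following is a rough sketch of one possible interpretation, applied to top-level elements of the unrendered template only. `apply_rules` and `matches` are hypothetical names, and the selector semantics (substring match on a markdown element's `content` or a button's `text.content`) are assumed from the field names.

```python
def matches(element: dict, selector: dict) -> bool:
    """Assumed selector semantics: tag equality plus a substring check."""
    if element.get("tag") != selector.get("tag"):
        return False
    if "contentContains" in selector:
        return selector["contentContains"] in str(element.get("content", ""))
    if "textContains" in selector:
        text = (element.get("text") or {}).get("content", "")
        return selector["textContains"] in text
    return True


def apply_rules(rules: list, variables: dict, elements: list):
    """Return (variables, elements) after validation, defaults and removals."""
    variables = dict(variables)  # do not mutate the caller's dict
    for rule in rules:
        field, action = rule["field"], rule["action"]
        empty = not str(variables.get(field) or "").strip()
        if action == "require_non_empty" and empty:
            raise ValueError(rule.get("message", f"{field} is required"))
        if action == "default_value" and empty:
            variables[field] = rule["value"]
        if action == "remove_when_empty" and empty:
            elements = [e for e in elements if not matches(e, rule["selector"])]
    return variables, elements


rules = [
    {"action": "require_non_empty", "field": "title", "message": "title 不能为空"},
    {"action": "default_value", "field": "rating", "value": "待公布"},
    {"action": "remove_when_empty", "field": "tagline",
     "selector": {"tag": "markdown", "contentContains": "${tagline}"}},
]
elements = [
    {"tag": "markdown", "content": "**💬 ${tagline}**"},
    {"tag": "markdown", "content": "${rating}"},
]
variables, kept = apply_rules(rules, {"title": "星际穿越"}, elements)
```

With no `tagline` supplied, the tagline element is dropped and `rating` falls back to its default. A call without `title` raises `ValueError` with the rule's message.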

### assets/rules/tv-custom.rules.json

```json
[
  {
    "action": "require_non_empty",
    "field": "title",
    "message": "title 不能为空"
  },
  {
    "action": "require_non_empty",
    "field": "overview",
    "message": "overview 不能为空"
  },
  {
    "action": "require_non_empty",
    "field": "poster_img_key",
    "message": "poster_img_key 不能为空(请传 --poster-url/--poster-file 或 --var poster_img_key=...)"
  },
  {
    "action": "default_value",
    "field": "country",
    "value": "待公布"
  },
  {
    "action": "default_value",
    "field": "status",
    "value": "待公布"
  },
  {
    "action": "default_value",
    "field": "episode_runtime",
    "value": "待公布"
  },
  {
    "action": "remove_when_empty",
    "field": "tagline",
    "selector": {
      "tag": "markdown",
      "contentContains": "${tagline}"
    }
  },
  {
    "action": "remove_when_empty",
    "field": "detail_url",
    "selector": {
      "tag": "button",
      "textContains": "查看详情"
    }
  },
  {
    "action": "remove_when_empty",
    "field": "tmdb_id",
    "selector": {
      "tag": "button",
      "textContains": "立即订阅"
    }
  },
  {
    "action": "default_value",
    "field": "media_type",
    "value": "tv"
  },
  {
    "action": "default_value",
    "field": "subscribe_disabled",
    "value": "false"
  },
  {
    "action": "default_value",
    "field": "subscribe_button_text",
    "value": "立即订阅"
  }
]

```

### references/callback-response-examples.json

```json
{
  "processing": {
    "toast": {"type": "info", "content": "处理中..."},
    "card": {
      "type": "raw",
      "data": {
        "_note": "Original card JSON, with the button changed to 处理中... and disabled=true"
      }
    }
  },
  "success": {
    "toast": {"type": "success", "content": "订阅成功"},
    "card": {
      "type": "raw",
      "data": {
        "_note": "Original card JSON, with the button changed to ✅ 已订阅 and disabled=true"
      }
    }
  },
  "failed": {
    "toast": {"type": "error", "content": "订阅失败,可重试"},
    "card": {
      "type": "raw",
      "data": {
        "_note": "Original card JSON, with the button changed to 订阅失败,重试 and disabled=false"
      }
    }
  }
}

```

### references/feishu-callback-response-contract.md

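````markdown
# Feishu Card Callback HTTP Response Contract (within 3 seconds)

Goal: after receiving `card.action.trigger`, the server must return HTTP 200 within 3 seconds and include a card update body (raw) so that the card is **updated in place**.

## 1) Immediate response (processing)

- HTTP: `200`
- Body:

```json
{
  "toast": {
    "type": "info",
    "content": "处理中..."
  },
  "card": {
    "type": "raw",
    "data": {
      "schema": "2.0",
      "...": "full structure of the original card",
      "body": {
        "...": "original content",
        "elements": [
          {
            "tag": "button",
            "text": {"tag": "plain_text", "content": "处理中..."},
            "type": "primary",
            "disabled": true,
            "behaviors": [
              {
                "type": "callback",
                "value": {
                  "action": "subscribe",
                  "media_type": "movie",
                  "tmdb_id": "1159559",
                  "title": "惊声尖叫7"
                }
              }
            ]
          }
        ]
      }
    }
  }
}
```

## 2) Run the business logic asynchronously

- Run the MoviePilot subscription flow in the background:
  - Idempotency check
  - If the subscription already exists, return `exists`
  - Otherwise create it

## 3) Final-state update (delayed update API)

Once the business logic finishes, call the Feishu delayed card update API:
- Success / already exists: button `✅ 已订阅` ("Subscribed"), `disabled=true`
- Failure: button `订阅失败,重试` ("Subscription failed, retry"), `disabled=false`

## 4) Changes required in the OpenClaw channel layer

1. The Feishu callback handler supports passing the skill's return body through as the HTTP body.
2. New fields in the skill's return structure:
   - `callbackResponse` (immediate response)
   - `delayedUpdate` (card payload for the final-state update)
3. The callback request context keeps:
   - `open_message_id` (used for the delayed update)
   - `tenant_key/chat_id` (when needed)
4. Timeout protection:
   - Reply with 200 within 2.5 seconds (at least `{}` or a toast)

## 5) Recommended handler output

`card_callback_router.py` / `subscribe_callback_handler.py` share one output shape:

```json
{
  "success": true,
  "status": "processing|created|exists|failed",
  "callbackResponse": {...},
  "delayedUpdate": {...}
}
```

The channel layer consumes `callbackResponse` for the immediate reply and sends `delayedUpdate` through the asynchronous update.

````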
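The timeout rule in section 4 of the contract (reply within 2.5 seconds, falling back to `{}` or a toast) could be enforced roughly as follows. This is a sketch under assumptions: `respond_within` and the `build_response` callable are hypothetical, and the real channel layer may handle deadlines differently.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def respond_within(build_response, deadline_sec: float = 2.5, fallback=None):
    """Return build_response() if it finishes in time, else a fallback body."""
    fallback = {} if fallback is None else fallback
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(build_response)
    try:
        return future.result(timeout=deadline_sec)
    except FutureTimeout:
        return fallback
    except Exception:
        # A failing handler should still yield a valid HTTP 200 body.
        return fallback
    finally:
        # Do not block the reply on a slow worker thread.
        pool.shutdown(wait=False)


def fast():
    return {"toast": {"type": "info", "content": "处理中..."}}


def slow():
    time.sleep(0.3)
    return {"toast": {"type": "success", "content": "done"}}


quick_body = respond_within(fast, deadline_sec=1.0)    # handler result
late_body = respond_within(slow, deadline_sec=0.05)    # fallback {}
```

The slow call is not cancelled, only abandoned. Work that must finish should therefore run through the queue rather than inside the callback.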

### references/subscribe-callback.md

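````markdown
# Subscribe Callback Handling (MoviePilot)

Scripts:
- `scripts/card_callback_router.py` (unified entry point)
- `scripts/subscribe_callback_handler.py` (subscription handler)

## Input

- `--channel`: channel (default `feishu`)
- `--user-id`: user ID (used to load the user's credentials)
- `--payload`: card callback JSON

Callback payload convention:

```json
{"action":"subscribe","media_type":"movie|tv","tmdb_id":"1419406","title":"..."}
```

## Behavior

1. Validate the required payload fields.
2. Idempotent dedupe (60-second window, as set by `window_sec=60` in the handler, keyed by `media_type` + `tmdb_id`).
3. Check whether a subscription already exists: `/api/v1/subscribe/media/tmdb:<id>`.
4. If not subscribed, call `POST /api/v1/subscribe/` to create the subscription.
5. Print a standard result JSON with a `status` of `processing` / `created` / `exists` / `failed`, a `deduped` flag, and an `error` field on failure.

## Example

```bash
python3 scripts/subscribe_callback_handler.py \
  --channel feishu \
  --user-id ou_xxx \
  --payload '{"action":"subscribe","media_type":"movie","tmdb_id":"1419406","title":"捕风追影"}'
```

````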

### references/vars.example.env

```bash
# Template variable examples (KEY=VALUE)
title=星际穿越
genre=科幻 / 剧情
rating=8.7
year=2014
summary=一支探险队穿越虫洞,在时间与引力扭曲中寻找人类的新家园。

```

### scripts/callback_queue_worker.py

```python
#!/usr/bin/env python3
import json, time, subprocess
from pathlib import Path

QUEUE_DIR = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue')
DONE_DIR = QUEUE_DIR / 'done'
FAIL_DIR = QUEUE_DIR / 'failed'
FINALIZER = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/card_callback_finalize.py')
PID_FILE = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue-worker.pid')


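def claim_job(path: Path):
    # Claim a job by renaming it to *.processing. If another worker already
    # took the file, the rename fails and None is returned.
    processing = path.with_suffix('.processing')
    try:
        path.rename(processing)
        return processing
    except Exception:
        return None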


def run_job(job_file: Path):
    data = json.loads(job_file.read_text(encoding='utf-8'))
    cmd = [
        'python3', str(FINALIZER),
        '--channel', data.get('channel', 'feishu'),
        '--user-id', data['user_id'],
        '--account-id', str(data.get('account_id', '1')),
        '--payload', json.dumps(data['payload'], ensure_ascii=False),
    ]
    return subprocess.run(cmd, capture_output=True, text=True, timeout=180)


def recover_orphan_processing_files():
    for p in QUEUE_DIR.glob('*.processing'):
        try:
            target = QUEUE_DIR / (p.stem + '.json')
            p.rename(target)
        except Exception:
            try:
                fail = FAIL_DIR / (p.stem + '.recover-fail.json')
                fail.write_text(json.dumps({'ok': False, 'error': 'orphan_processing_recover_failed', 'file': str(p)}, ensure_ascii=False), encoding='utf-8')
            except Exception:
                pass


def cleanup_history(max_done=500, max_failed=500):
    for d, cap in ((DONE_DIR, max_done), (FAIL_DIR, max_failed)):
        files = sorted([p for p in d.glob('*') if p.is_file()], key=lambda p: p.stat().st_mtime, reverse=True)
        for p in files[cap:]:
            try:
                p.unlink(missing_ok=True)
            except Exception:
                pass


def main():
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    DONE_DIR.mkdir(parents=True, exist_ok=True)
    FAIL_DIR.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(__import__('os').getpid()), encoding='utf-8')
    recover_orphan_processing_files()
    cleanup_history()

    last_cleanup = time.time()
    while True:
        jobs = sorted([p for p in QUEUE_DIR.glob('*.json') if p.is_file()])
        if time.time() - last_cleanup > 300:
            cleanup_history()
            last_cleanup = time.time()
        if not jobs:
            time.sleep(0.2)
            continue

        for job in jobs:
            claimed = claim_job(job)
            if not claimed:
                continue
            try:
                res = run_job(claimed)
                if res.returncode == 0:
                    target = DONE_DIR / (claimed.stem + '.done.json')
                    target.write_text(json.dumps({'ok': True, 'stdout': res.stdout.strip()}, ensure_ascii=False), encoding='utf-8')
                else:
                    target = FAIL_DIR / (claimed.stem + '.fail.json')
                    target.write_text(json.dumps({'ok': False, 'code': res.returncode, 'stdout': res.stdout, 'stderr': res.stderr}, ensure_ascii=False), encoding='utf-8')
            except Exception as ex:
                target = FAIL_DIR / (claimed.stem + '.fail.json')
                target.write_text(json.dumps({'ok': False, 'error': str(ex)}, ensure_ascii=False), encoding='utf-8')
            finally:
                try:
                    claimed.unlink(missing_ok=True)
                except Exception:
                    pass


if __name__ == '__main__':
    main()

```
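The worker claims a job by renaming `*.json` to `*.processing`, so several workers can share one directory: once a rename succeeds, the source file is gone and every later attempt fails. The enqueue side is not shown in this section. A producer that writes under a temporary name and then renames into place keeps the worker from ever reading a half-written file. Below is a sketch under those assumptions, using a throwaway directory; `enqueue` is a hypothetical helper, and `claim_job` mirrors the worker's function.

```python
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


def enqueue(queue_dir: Path, job: dict) -> Path:
    """Write the job under a temporary name, then rename it to *.json."""
    queue_dir.mkdir(parents=True, exist_ok=True)
    name = f"{time.time_ns()}-{os.getpid()}"
    tmp = queue_dir / f"{name}.tmp"      # ignored by the worker's *.json glob
    final = queue_dir / f"{name}.json"
    tmp.write_text(json.dumps(job, ensure_ascii=False), encoding="utf-8")
    os.replace(tmp, final)               # atomic on the same filesystem
    return final


def claim_job(path: Path) -> Optional[Path]:
    """Rename *.json to *.processing; return None if the file is already gone."""
    processing = path.with_suffix(".processing")
    try:
        path.rename(processing)
        return processing
    except OSError:
        return None


with tempfile.TemporaryDirectory() as d:
    queue = Path(d)
    job_path = enqueue(queue, {"user_id": "ou_xxx", "payload": {"action": "subscribe"}})
    first_claim = claim_job(job_path)    # succeeds
    second_claim = claim_job(job_path)   # source file is gone, so this fails
    claimed_payload = json.loads(first_claim.read_text(encoding="utf-8"))
```

Timestamp-prefixed names also keep the worker's `sorted(...)` scan in roughly first-in, first-out order.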

### scripts/card_callback_finalize.py

```python
#!/usr/bin/env python3
import argparse, json, subprocess, time
from pathlib import Path
from urllib import request
from card_snapshot_store import find_snapshot_by_card_key, patch_subscribe_button

OPENCLAW_CONFIG = Path('/root/.openclaw/openclaw.json')
HANDLER = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/subscribe_callback_handler.py')
METRIC_LOG = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-metrics.jsonl')


def load_app_credentials(account_id):
    cfg = json.loads(OPENCLAW_CONFIG.read_text(encoding='utf-8'))
    accounts = (((cfg.get('channels') or {}).get('feishu') or {}).get('accounts') or [])
    if not accounts:
        return None, None
    if account_id is not None:
        for a in accounts:
            if str(a.get('id')) == str(account_id):
                return a.get('appId'), a.get('appSecret')
        if str(account_id).isdigit():
            idx = int(account_id)
            if 0 <= idx < len(accounts):
                a = accounts[idx]
                return a.get('appId'), a.get('appSecret')
    a = accounts[0]
    return a.get('appId'), a.get('appSecret')


def get_tenant_token(app_id, app_secret):
    body = json.dumps({'app_id': app_id, 'app_secret': app_secret}).encode()
    req = request.Request('https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', data=body, method='POST', headers={'Content-Type':'application/json'})
    with request.urlopen(req, timeout=20) as r:
        return json.loads(r.read().decode()).get('tenant_access_token')


def patch_message(message_id, token, raw_card_json):
    payload = {'msg_type':'interactive','content':json.dumps(json.loads(raw_card_json), ensure_ascii=False)}
    req = request.Request(f'https://open.feishu.cn/open-apis/im/v1/messages/{message_id}', data=json.dumps(payload, ensure_ascii=False).encode(), method='PATCH', headers={'Authorization':f'Bearer {token}','Content-Type':'application/json'})
    with request.urlopen(req, timeout=20) as r:
        return json.loads(r.read().decode())


def log_metric(d):
    METRIC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with METRIC_LOG.open('a', encoding='utf-8') as f:
        f.write(json.dumps(d, ensure_ascii=False) + '\n')


def main():
    ap=argparse.ArgumentParser()
    ap.add_argument('--payload', required=True)
    ap.add_argument('--channel', default='feishu')
    ap.add_argument('--user-id', required=True)
    ap.add_argument('--account-id', default='1')
    args=ap.parse_args()

    t0=time.time()
    p=json.loads(args.payload)
    card_key=str(p.get('card_key') or '')
    snap=find_snapshot_by_card_key(card_key)
    if not snap:
        log_metric({'ok':False,'stage':'snapshot_missing','card_key':card_key,'t':time.time()})
        return 1

    app_id, app_secret = load_app_credentials(args.account_id)
    token = get_tenant_token(app_id, app_secret)

    t1=time.time()
    res=subprocess.run(['python3', str(HANDLER), '--channel', args.channel, '--user-id', args.user_id, '--payload', args.payload], capture_output=True, text=True)
    out={}
    try:
        out=json.loads((res.stdout or '').strip() or '{}')
    except Exception:
        out={'success':False}
    t2=time.time()

    ok=bool(out.get('success'))
    final=patch_subscribe_button(snap['raw_card_json'], text='✅ 已订阅' if ok else '订阅失败,重试', disabled=ok)
    patch_resp=patch_message(snap['message_id'], token, final)
    t3=time.time()

    log_metric({
        'ok':ok,
        'card_key':card_key,
        'tmdb_id':p.get('tmdb_id'),
        'media_type':p.get('media_type'),
        't_click':t0,
        't_subscribe_done':t2,
        't_final_patch_done':t3,
        'dt_subscribe_ms':int((t2-t1)*1000),
        'dt_total_ms':int((t3-t0)*1000),
        'patch_resp':patch_resp,
    })
    return 0

if __name__=='__main__':
    raise SystemExit(main())

```
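Each run of the finalizer appends one JSON object per line to `callback-metrics.jsonl`. The following is a minimal sketch of how those records could be summarized. It assumes only the field names written by `log_metric` above; the `summarize` helper and the sample lines are illustrative and not part of the skill.

```python
import json
from statistics import mean

# Hypothetical sample records using the same keys the finalizer logs.
sample_lines = [
    '{"ok": true, "card_key": "a", "dt_subscribe_ms": 800, "dt_total_ms": 1200}',
    '{"ok": false, "stage": "snapshot_missing", "card_key": "b", "t": 1700000000.0}',
    '{"ok": true, "card_key": "c", "dt_subscribe_ms": 1000, "dt_total_ms": 1800}',
]


def summarize(lines):
    """Count outcomes and average the end-to-end latency of timed records."""
    records = [json.loads(line) for line in lines if line.strip()]
    # Records logged for a missing snapshot carry no timing fields.
    timed = [r for r in records if 'dt_total_ms' in r]
    return {
        'total': len(records),
        'ok': sum(1 for r in records if r.get('ok')),
        'snapshot_missing': sum(1 for r in records if r.get('stage') == 'snapshot_missing'),
        'avg_total_ms': mean(r['dt_total_ms'] for r in timed) if timed else None,
    }


summary = summarize(sample_lines)
print(summary)
```

To run it against the real log, pass the lines of the metrics file instead of `sample_lines`.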

### scripts/card_snapshot_store.py

```python
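#!/usr/bin/env python3
# Postponed annotation evaluation lets the `str | None` hints below load on Python < 3.10.
from __future__ import annotations

import json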
import sqlite3
import time
from pathlib import Path

DB_PATH = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/card_snapshots.db')


def _conn():
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    c = sqlite3.connect(str(DB_PATH))
    c.execute(
        '''CREATE TABLE IF NOT EXISTS card_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            created_at INTEGER NOT NULL,
            card_key TEXT,
            message_id TEXT,
            receive_id TEXT,
            account_id TEXT,
            media_type TEXT,
            tmdb_id TEXT,
            title TEXT,
            raw_card_json TEXT NOT NULL
        )'''
    )
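    # Migrate databases created before these columns existed. SQLite raises
    # OperationalError ("duplicate column name") when the column is already present.
    try:
        c.execute('ALTER TABLE card_snapshots ADD COLUMN card_key TEXT')
    except sqlite3.OperationalError:
        pass
    try:
        c.execute('ALTER TABLE card_snapshots ADD COLUMN message_id TEXT')
    except sqlite3.OperationalError:
        pass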
    c.execute('CREATE INDEX IF NOT EXISTS idx_snap_lookup ON card_snapshots(receive_id, media_type, tmdb_id, created_at DESC)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_snap_card_key ON card_snapshots(card_key, created_at DESC)')
    return c


def save_snapshot(*, card_key: str | None, message_id: str | None, receive_id: str, account_id: str | None, media_type: str | None, tmdb_id: str | None, title: str | None, raw_card_json: str):
    conn = _conn()
    try:
        conn.execute(
            'INSERT INTO card_snapshots(created_at, card_key, message_id, receive_id, account_id, media_type, tmdb_id, title, raw_card_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
            (int(time.time()), card_key or '', message_id or '', receive_id, account_id or '', media_type or '', tmdb_id or '', title or '', raw_card_json),
        )
        conn.commit()
    finally:
        conn.close()


def find_latest_snapshot(*, receive_id: str, media_type: str, tmdb_id: str):
    conn = _conn()
    try:
        cur = conn.execute(
            'SELECT raw_card_json, account_id, title, created_at, card_key, message_id FROM card_snapshots WHERE receive_id=? AND media_type=? AND tmdb_id=? ORDER BY created_at DESC LIMIT 1',
            (receive_id, media_type, tmdb_id),
        )
        row = cur.fetchone()
        if not row:
            return None
        return {
            'raw_card_json': row[0],
            'account_id': row[1] or None,
            'title': row[2] or None,
            'created_at': row[3],
            'card_key': row[4] or None,
            'message_id': row[5] or None,
        }
    finally:
        conn.close()


def find_snapshot_by_card_key(card_key: str):
    conn = _conn()
    try:
        cur = conn.execute(
            'SELECT raw_card_json, account_id, title, created_at, receive_id, media_type, tmdb_id, message_id FROM card_snapshots WHERE card_key=? ORDER BY created_at DESC LIMIT 1',
            (card_key,),
        )
        row = cur.fetchone()
        if not row:
            return None
        return {
            'raw_card_json': row[0],
            'account_id': row[1] or None,
            'title': row[2] or None,
            'created_at': row[3],
            'receive_id': row[4] or None,
            'media_type': row[5] or None,
            'tmdb_id': row[6] or None,
            'message_id': row[7] or None,
        }
    finally:
        conn.close()


def patch_subscribe_button(raw_card_json: str, *, text: str, disabled: bool):
    card = json.loads(raw_card_json)
    body = card.get('body') if isinstance(card, dict) else None
    elements = body.get('elements') if isinstance(body, dict) else None
    if not isinstance(elements, list):
        return raw_card_json

    for el in elements:
        if not isinstance(el, dict):
            continue
        if el.get('tag') != 'button':
            continue
        txt = ((el.get('text') or {}).get('content') if isinstance(el.get('text'), dict) else '') or ''
        if txt in ('立即订阅', '✅ 已订阅', '处理中...', '订阅失败,重试'):
            el['disabled'] = bool(disabled)
            if isinstance(el.get('text'), dict):
                el['text']['content'] = text
            else:
                el['text'] = {'tag': 'plain_text', 'content': text}
            # Some Feishu clients render badly when disabled button still carries callback behaviors.
            if bool(disabled):
                el.pop('behaviors', None)
            break

    return json.dumps(card, ensure_ascii=False)

```
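`patch_subscribe_button` only inspects top-level entries of `body.elements` and matches the button by its current label. The sketch below restates that logic in condensed form to show the card shape it expects. The `set_button_state` name and the sample card are hypothetical, and a button nested inside a container element would presumably not be found by this scan.

```python
import json

# Labels the subscribe button can carry, copied from the script above.
SUBSCRIBE_LABELS = ('立即订阅', '✅ 已订阅', '处理中...', '订阅失败,重试')


def set_button_state(raw_card_json, text, disabled):
    """Condensed restatement of patch_subscribe_button, for illustration only."""
    card = json.loads(raw_card_json)
    body = card.get('body') if isinstance(card, dict) else None
    elements = body.get('elements') if isinstance(body, dict) else None
    if not isinstance(elements, list):
        return raw_card_json
    for el in elements:
        if not isinstance(el, dict) or el.get('tag') != 'button':
            continue
        text_obj = el.get('text')
        label = text_obj.get('content', '') if isinstance(text_obj, dict) else ''
        if label in SUBSCRIBE_LABELS:
            el['disabled'] = bool(disabled)
            if isinstance(text_obj, dict):
                text_obj['content'] = text
            else:
                el['text'] = {'tag': 'plain_text', 'content': text}
            if disabled:
                # A disabled button no longer needs its callback behaviors.
                el.pop('behaviors', None)
            break
    return json.dumps(card, ensure_ascii=False)


# Hypothetical card: one markdown element and one subscribe button.
sample = json.dumps({
    'body': {'elements': [
        {'tag': 'markdown', 'content': 'Example title'},
        {'tag': 'button',
         'text': {'tag': 'plain_text', 'content': '立即订阅'},
         'behaviors': [{'type': 'callback', 'value': {'card_key': 'demo'}}]},
    ]},
}, ensure_ascii=False)

patched = json.loads(set_button_state(sample, text='✅ 已订阅', disabled=True))
button = patched['body']['elements'][1]
print(button)
```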

### scripts/enqueue_callback.py

```python
#!/usr/bin/env python3
import argparse, json, time, uuid
from pathlib import Path

QUEUE_DIR = Path('/root/.openclaw/workspace-dev/skills/feishu-card-sender/tmp/callback-queue')


def main():
    p = argparse.ArgumentParser(description='enqueue card callback job')
    p.add_argument('--payload', required=True)
    p.add_argument('--message-id', required=True)
    p.add_argument('--user-id', required=True)
    p.add_argument('--account-id', default='1')
    p.add_argument('--channel', default='feishu')
    args = p.parse_args()

    payload = json.loads(args.payload)
    job = {
        'payload': payload,
        'message_id': args.message_id,
        'user_id': args.user_id,
        'account_id': args.account_id,
        'channel': args.channel,
        'enqueued_at': time.time(),
    }
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    name = f"{int(time.time()*1000)}-{uuid.uuid4().hex}.json"
    f = QUEUE_DIR / name
    f.write_text(json.dumps(job, ensure_ascii=False), encoding='utf-8')
    print(json.dumps({'ok': True, 'job': str(f)}, ensure_ascii=False))


if __name__ == '__main__':
    main()

```
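Job file names start with a millisecond timestamp, so a plain sort on the file names returns jobs in enqueue order. The following sketch shows that round trip in a temporary directory rather than the real queue path. `write_job` and `list_jobs` are illustrative names, and the fixed timestamps are there only to keep the example deterministic.

```python
import json
import tempfile
import time
import uuid
from pathlib import Path
from typing import List, Optional


def write_job(queue_dir: Path, job: dict, now_ms: Optional[int] = None) -> Path:
    """Write one job file named '<epoch-ms>-<uuid>.json', the scheme used above."""
    queue_dir.mkdir(parents=True, exist_ok=True)
    ms = int(time.time() * 1000) if now_ms is None else now_ms
    path = queue_dir / f"{ms}-{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(job, ensure_ascii=False), encoding='utf-8')
    return path


def list_jobs(queue_dir: Path) -> List[dict]:
    """Read jobs back in file-name order, which is enqueue order here."""
    files = sorted(p for p in queue_dir.glob('*.json') if p.is_file())
    return [json.loads(p.read_text(encoding='utf-8')) for p in files]


with tempfile.TemporaryDirectory() as tmp:
    queue = Path(tmp) / 'callback-queue'
    # Written out of order on purpose; the sort restores timestamp order.
    write_job(queue, {'user_id': 'ou_second'}, now_ms=1_700_000_000_002)
    write_job(queue, {'user_id': 'ou_first'}, now_ms=1_700_000_000_001)
    order = [job['user_id'] for job in list_jobs(queue)]

print(order)
```

The ordering relies on every timestamp prefix having the same number of digits, which holds for current epoch-millisecond values.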

### scripts/feishu_callback_worker.py

```python
#!/usr/bin/env python3
import json
import os
import time
import hashlib
import re
import base64
import tempfile
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import subprocess
from pathlib import Path

HOST = '0.0.0.0'
PORT = 18081

ROUTER = '/root/.openclaw/workspace-dev/skills/feishu-card-sender/scripts/card_callback_router.py'
USER_ID_FALLBACK = os.getenv('FEISHU_CALLBACK_USER_FALLBACK', '').strip()
ACCOUNT_ID = os.getenv('FEISHU_CALLBACK_ACCOUNT_ID', '1').strip() or '1'
ENCRYPT_KEY = os.getenv('FEISHU_CALLBACK_ENCRYPT_KEY', '').strip()
MAX_SKEW_SECONDS = int(os.getenv('FEISHU_CALLBACK_MAX_SKEW_SECONDS', '300'))


class H(BaseHTTPRequestHandler):
    def _log(self, event, **kwargs):
        payload = {'event': event, 'ts': int(time.time()), **kwargs}
        print(json.dumps(payload, ensure_ascii=False), flush=True)


    def _parse_ts_to_epoch(self, ts: str):
        # 1) epoch seconds / milliseconds
        try:
            n = int(float(ts))
            if n > 10_000_000_000:
                n = n // 1000
            return n
        except Exception:
            pass

        # 2) Go-style datetime: "2026-03-05 22:25:56.839226736 +0800 CST m=+..."
        m = re.search(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:\.(\d{1,9}))?\s+([+-]\d{4})', ts)
        if m:
            base, frac, offset = m.group(1), (m.group(2) or ''), m.group(3)
            frac6 = (frac + '000000')[:6]
            dt = datetime.strptime(f"{base}.{frac6} {offset}", "%Y-%m-%d %H:%M:%S.%f %z")
            return int(dt.timestamp())

        raise ValueError(f'invalid_timestamp:{ts}')

    def _decrypt_if_needed(self, req_obj):
        # Feishu may send encrypted envelope: {"encrypt": "..."}
        if not isinstance(req_obj, dict) or 'encrypt' not in req_obj:
            return req_obj
        if not ENCRYPT_KEY:
            raise ValueError('encrypt_key_not_configured')

        enc = str(req_obj.get('encrypt') or '')
        raw = base64.b64decode(enc)
        iv, ct = raw[:16], raw[16:]
        keyhex = hashlib.sha256(ENCRYPT_KEY.encode('utf-8')).hexdigest()
        ivhex = iv.hex()

        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(ct)
            inpath = f.name
        try:
            res = subprocess.run(
                ['openssl', 'enc', '-d', '-aes-256-cbc', '-K', keyhex, '-iv', ivhex, '-in', inpath],
                capture_output=True,
                timeout=10,
            )
            if res.returncode != 0:
                raise ValueError(f'decrypt_failed:{res.stderr.decode("utf-8", errors="replace")[:120]}')
            txt = res.stdout.decode('utf-8', errors='replace')
            return json.loads(txt)
        finally:
            try:
                os.unlink(inpath)
            except Exception:
                pass

    def _verify_signature(self, raw_body: str):
        if not ENCRYPT_KEY:
            return True, 'no_encrypt_key'
        ts = self.headers.get('X-Lark-Request-Timestamp') or self.headers.get('X-Lark-Request-Ts')
        nonce = self.headers.get('X-Lark-Request-Nonce')
        sig = self.headers.get('X-Lark-Signature')
        if not ts or not nonce or not sig:
            return False, 'missing_signature_headers'
        try:
            ts_i = self._parse_ts_to_epoch(ts)
        except Exception as ex:
            return False, str(ex)
        if abs(int(time.time()) - ts_i) > MAX_SKEW_SECONDS:
            return False, 'timestamp_skew'
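        base = f"{ts}{nonce}{ENCRYPT_KEY}{raw_body}".encode('utf-8')
        calc = hashlib.sha256(base).hexdigest()
        # Compare in constant time so response timing does not reveal how much of the signature matched.
        import hmac
        if not hmac.compare_digest(calc.encode('utf-8'), sig.encode('utf-8')):
            return False, 'signature_mismatch'
        return True, 'ok'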

    def _send(self, code=200, obj=None):
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.end_headers()
        self.wfile.write(json.dumps(obj or {}, ensure_ascii=False).encode('utf-8'))

    def _send_html(self, code=200, html=''):
        self.send_response(code)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        self.end_headers()
        if html:
            self.wfile.write(html.encode('utf-8'))

    def do_HEAD(self):
        p = urlparse(self.path)
        if p.path == '/feishu/callback':
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Expires', '0')
            self.end_headers()
            return
        self.send_response(404)
        self.end_headers()

    def do_GET(self):
        p = urlparse(self.path)
        if p.path != '/feishu/callback':
            return self._send_html(404, '<h1>404 Not Found</h1>')

        qs = parse_qs(p.query or '')
        lang_force = ((qs.get('lang') or [''])[0]).strip().lower()
        if lang_force in ('zh', 'zh-cn', 'cn'):
            use_zh = True
        elif lang_force in ('en', 'en-us', 'en-gb'):
            use_zh = False
        else:
            accept_lang = (self.headers.get('Accept-Language') or '').lower()
            use_zh = ('zh' in accept_lang) or not accept_lang

        t = {
            'lang': 'zh-CN' if use_zh else 'en',
            'title': '飞书回调 · 太空网关' if use_zh else 'Feishu Callback · Space Gateway',
            'badge': '太空网关在线' if use_zh else 'SPACE GATEWAY ONLINE',
            'h1_left': '飞书回调' if use_zh else 'Feishu Callback',
            'h1_right': '接口已就绪' if use_zh else 'Endpoint is Ready',
            'desc': '这里是飞书事件回调入口,专用于服务端 POST 调用。你在浏览器看到这个页面,说明域名解析、HTTPS 与反向代理都已连通。' if use_zh else 'This is the Feishu event callback endpoint for server-side POST calls. Seeing this page in your browser means DNS, HTTPS, and reverse proxy are all reachable.',
            'endpoint_k': '回调路径' if use_zh else 'Endpoint',
            'proto_k': '协议状态' if use_zh else 'Protocol',
            'proto_v': 'HTTPS · 反向代理已启用' if use_zh else 'HTTPS · Reverse Proxy Active',
            'hint': '提示:此页面仅用于可达性确认;真实回调由飞书服务器触发,无需人工访问。' if use_zh else 'Note: This page is only for reachability checks. Real callbacks are triggered by Feishu servers, no manual access required.'
        }

        html = """
<!doctype html>
<html lang=\"{t['lang']}\">
<head>
  <meta charset=\"utf-8\" />
  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no, viewport-fit=cover\" />
  <title>{t['title']}</title>
  <style>
    :root{
      --bg:#030712;
      --bg2:#0b1020;
      --txt:#eaf2ff;
      --muted:#b8c7e6;
      --primary:#78c7ff;
      --line:rgba(150,190,255,.28);
      --glass:rgba(8,14,33,.55);
    }
    *{box-sizing:border-box}
    html,body{height:100%}
    body{
      margin:0;
      overflow:hidden;
      font-family:Inter,-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif;
      color:var(--txt);
      background:
        radial-gradient(1200px 700px at 10% 90%, rgba(79,118,255,.22), transparent 60%),
        radial-gradient(900px 600px at 90% 10%, rgba(0,210,255,.18), transparent 62%),
        radial-gradient(1000px 800px at 50% 50%, #0b1331 0%, var(--bg) 68%);
    }
    .stars,.stars2,.stars3{position:fixed; inset:-20%; pointer-events:none;}
    .stars:before,.stars2:before,.stars3:before{
      content:""; position:absolute; inset:0;
      background-repeat:repeat;
      animation:drift linear infinite;
      opacity:.65;
    }
    .stars:before{background-image:radial-gradient(2px 2px at 20px 30px,#fff,transparent),radial-gradient(1px 1px at 120px 80px,#d7e8ff,transparent),radial-gradient(1.5px 1.5px at 220px 150px,#fff,transparent);background-size:260px 220px;animation-duration:120s}
    .stars2:before{background-image:radial-gradient(1px 1px at 40px 60px,#9fd8ff,transparent),radial-gradient(2px 2px at 180px 120px,#fff,transparent),radial-gradient(1px 1px at 260px 40px,#c8dcff,transparent);background-size:320px 260px;animation-duration:180s;opacity:.4}
    .stars3:before{background-image:radial-gradient(2px 2px at 80px 20px,#fff,transparent),radial-gradient(1px 1px at 200px 200px,#c2f2ff,transparent);background-size:400px 320px;animation-duration:240s;opacity:.28}
    @keyframes drift{from{transform:translate3d(0,0,0)}to{transform:translate3d(-220px,-160px,0)}}

    .nebula{position:fixed; inset:0; pointer-events:none; filter:blur(30px); opacity:.55}
    .nebula:before,.nebula:after{content:""; position:absolute; border-radius:50%}
    .nebula:before{width:42vw;height:42vw;left:-10vw;top:50vh;background:radial-gradient(circle,#4f70ff88,transparent 70%)}
    .nebula:after{width:34vw;height:34vw;right:-8vw;top:8vh;background:radial-gradient(circle,#2dd4ff77,transparent 72%)}

    .wrap{position:relative; z-index:2; height:100vh; min-height:100dvh; display:flex; align-items:center; justify-content:center; padding:max(16px,env(safe-area-inset-top)) max(16px,env(safe-area-inset-right)) max(16px,env(safe-area-inset-bottom)) max(16px,env(safe-area-inset-left))}
    .panel{
      width:min(980px,96vw);
      padding:18px 18px 16px;
      position:relative;
      text-align:center;
      margin:0 auto;
      display:flex;
      flex-direction:column;
      align-items:center;
      justify-content:center;
    }
    .badge{display:inline-flex;align-items:center;gap:8px;padding:6px 12px;border:1px solid rgba(143,214,255,.35);border-radius:999px;background:rgba(31,56,95,.35);color:#cde9ff;font-size:12px;letter-spacing:.3px}
    .dot{width:8px;height:8px;border-radius:50%;background:#5cf2a5;box-shadow:0 0 12px #5cf2a5}
    h1{margin:14px 0 12px;font-size:38px;line-height:1.12;letter-spacing:.3px}
    .grad{background:linear-gradient(90deg,#f4f9ff,#8cd2ff 35%,#7db2ff 70%,#d8e9ff);-webkit-background-clip:text;background-clip:text;color:transparent}
    p{margin:10px 0;color:var(--muted);font-size:16px;line-height:1.75}
    .meta{margin-top:18px;display:grid;grid-template-columns:1fr 1fr;gap:12px}
    .cell{padding:8px 10px;border:1px solid rgba(141,180,255,.20);border-radius:999px;background:transparent}
    .k{display:block;font-size:12px;color:#9db4df;margin-bottom:4px}
    code{color:var(--primary);background:rgba(8,16,36,.8);border:1px solid rgba(120,170,255,.25);padding:2px 7px;border-radius:8px}
    .hint{margin-top:16px;font-size:13px;color:#93a9d3}
    @media (max-width:760px){
      h1{font-size:30px}
      .meta{grid-template-columns:1fr}
      .panel{padding:24px 20px}
    }
  </style>
</head>
<body>
  <div class=\"stars\"></div>
  <div class=\"stars2\"></div>
  <div class=\"stars3\"></div>
  <div class=\"nebula\"></div>

  <main class=\"wrap\">
    <section class=\"panel\">
      <span class=\"badge\"><span class=\"dot\"></span>{t['badge']}</span>
      <h1><span class=\"grad\">{t['h1_left']}</span> {t['h1_right']}</h1>
      <p>{t['desc'].replace('POST', '<code>POST</code>')}</p>
      <div class=\"meta\">
        <div class=\"cell\"><span class=\"k\">{t['endpoint_k']}</span><code>/feishu/callback</code></div>
        <div class=\"cell\"><span class=\"k\">{t['proto_k']}</span><code>{t['proto_v']}</code></div>
      </div>
      <p class=\"hint\">{t['hint']}</p>
    </section>
  </main>
  <script>
    // Hard-disable pinch zoom for in-app webviews (iOS/Android)
    (function(){
      document.addEventListener('gesturestart', function(e){ e.preventDefault(); }, {passive:false});
      document.addEventListener('gesturechange', function(e){ e.preventDefault(); }, {passive:false});
      document.addEventListener('gestureend', function(e){ e.preventDefault(); }, {passive:false});
      document.addEventListener('touchmove', function(e){ if (e.touches && e.touches.length > 1) e.preventDefault(); }, {passive:false});
      let lastTouchEnd = 0;
      document.addEventListener('touchend', function(e){
        const now = Date.now();
        if (now - lastTouchEnd <= 300) e.preventDefault(); // prevent double-tap zoom
        lastTouchEnd = now;
      }, {passive:false});
    })();
  </script>
</body>
</html>
""".strip()
        html = (html
                .replace("{t['lang']}", t['lang'])
                .replace("{t['title']}", t['title'])
                .replace("{t['badge']}", t['badge'])
                .replace("{t['h1_left']}", t['h1_left'])
                .replace("{t['h1_right']}", t['h1_right'])
                .replace("{t['desc'].replace('POST', '<code>POST</code>')}", t['desc'].replace('POST', '<code>POST</code>'))
                .replace("{t['endpoint_k']}", t['endpoint_k'])
                .replace("{t['proto_k']}", t['proto_k'])
                .replace("{t['proto_v']}", t['proto_v'])
                .replace("{t['hint']}", t['hint']))
        return self._send_html(200, html)

    def do_POST(self):
        p = urlparse(self.path)
        if p.path != '/feishu/callback':
            return self._send(404, {'error': 'not_found'})

        n = int(self.headers.get('Content-Length', '0'))
        raw = self.rfile.read(n).decode('utf-8', errors='replace')

        ok_sig, reason = self._verify_signature(raw)
        if not ok_sig:
            self._log('callback_rejected', reason=reason, path=p.path)
            return self._send(401, {'error': 'invalid_signature', 'reason': reason})

        try:
            req = json.loads(raw)
            req = self._decrypt_if_needed(req)
        except Exception as ex:
            self._log('callback_rejected', reason='invalid_json_or_decrypt_failed', detail=str(ex), path=p.path)
            return self._send(400, {'error': 'invalid_json_or_decrypt_failed'})

        # url_verification
        if req.get('type') == 'url_verification':
            self._log('url_verification', path=p.path)
            return self._send(200, {'challenge': req.get('challenge', '')})

        # Strict mode: follow Feishu latest encrypted callback payload
        # Expected shape after decrypt: {header:{event_type}, event:{token, action.value, operator.open_id}}
        header = req.get('header') or {}
        event = req.get('event') or {}
        event_type = header.get('event_type')
        action_value = ((event.get('action') or {}).get('value') or {})
        token = str(event.get('token') or '')
        user_id = ((event.get('operator') or {}).get('open_id')) or USER_ID_FALLBACK

        if event_type != 'card.action.trigger' or not token or not isinstance(action_value, dict):
            self._log('callback_rejected', reason='invalid_payload_shape', event_type=event_type, keys=list(req.keys())[:20])
            return self._send(200, {'toast': {'type': 'error', 'content': '回调结构不符合预期'}})

        if not user_id:
            self._log('callback_rejected', reason='missing_user_id', token=token)
            return self._send(200, {'toast': {'type': 'error', 'content': '缺少用户信息,无法处理'}})

        # 1) immediate ack in <3s (toast only)
        self._send(200, {'toast': {'type': 'info', 'content': '处理中...'}})

        # 2) async heavy work
        payload = json.dumps(action_value, ensure_ascii=False)
        msg_id = f'card-action-{token}' if token else ''
        self._log('callback_accepted', token=token, user_id=user_id, account_id=ACCOUNT_ID)
        subprocess.Popen([
            'python3', ROUTER,
            '--channel', 'feishu',
            '--user-id', user_id,
            '--account-id', ACCOUNT_ID,
            '--message-id', msg_id,
            '--payload', payload,
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def main():
    server = ThreadingHTTPServer((HOST, PORT), H)
    print(f'feishu_callback_worker listening on {HOST}:{PORT}')
    server.serve_forever()


if __name__ == '__main__':
    main()

```
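For local testing of the worker it can help to produce request headers that pass `_verify_signature`. The sketch below mirrors the check in the script: a SHA-256 hex digest over timestamp, nonce, encrypt key, and raw body, concatenated in that order. `sign_callback`, `build_signed_headers`, and the example key are assumptions for illustration. The body must be sent byte-for-byte as it was signed.

```python
import hashlib
import json
import time
import uuid


def sign_callback(timestamp: str, nonce: str, encrypt_key: str, raw_body: str) -> str:
    """Hex SHA-256 over timestamp + nonce + encrypt_key + body, as the worker computes it."""
    base = f"{timestamp}{nonce}{encrypt_key}{raw_body}".encode('utf-8')
    return hashlib.sha256(base).hexdigest()


def build_signed_headers(encrypt_key: str, raw_body: str) -> dict:
    """Build the three X-Lark-* headers the worker reads, using the current time."""
    ts = str(int(time.time()))
    nonce = uuid.uuid4().hex
    return {
        'Content-Type': 'application/json',
        'X-Lark-Request-Timestamp': ts,
        'X-Lark-Request-Nonce': nonce,
        'X-Lark-Signature': sign_callback(ts, nonce, encrypt_key, raw_body),
    }


# Hypothetical key; it must equal FEISHU_CALLBACK_ENCRYPT_KEY on the worker side.
body = json.dumps({'type': 'url_verification', 'challenge': 'abc'})
headers = build_signed_headers('example-encrypt-key', body)
print(headers['X-Lark-Signature'])
```

Because the timestamp is taken from the local clock, the request has to reach the worker within `FEISHU_CALLBACK_MAX_SKEW_SECONDS` (300 by default) or it is rejected with `timestamp_skew`.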
