
ai-drama-review

Compliance-screening skill pack for AI short dramas. Detects text/novel copyright infringement, age-rating compliance (18+/12+), and the degree of novel alteration, then generates a structured compliance report. Uses a two-layer architecture: fast local keyword scanning plus deep AI analysis. Beta stage: results are for reference only and are not a legal basis.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,087
Hot score
99
Updated
March 20, 2026
Overall rating
C (5.0)
Composite score
5.0
Best-practice grade
C (62.8)

Install command

npx @skill-hub/cli install openclaw-skills-ai-drama-review

Repository

openclaw/skills

Skill path: skills/aaalenwow/ai-drama-review


Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install ai-drama-review into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding ai-drama-review to shared team environments
  • Use ai-drama-review for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: ai-drama-review
description: Compliance-screening skill pack for AI short dramas. Detects text/novel copyright infringement, age-rating compliance (18+/12+), and the degree of novel alteration, then generates a structured compliance report. Uses a two-layer architecture: fast local keyword scanning plus deep AI analysis. Beta stage: results are for reference only and are not a legal basis.
user-invocable: true
metadata: {"openclaw":{"requires":{"env":["OPENAI_API_KEY"],"anyBins":["python3","python"],"bins":[]},"primaryEnv":"OPENAI_API_KEY","stage":"beta","version":"0.1.0"}}
---

This skill identifies compliance risks in AI-generated short dramas, including copyright infringement, age rating violations, and unauthorized novel adaptations. It uses a two-layer architecture: local keyword scanning for fast baseline detection, plus AI-powered deep analysis for context-aware accuracy.

**Warning: BETA** — this skill pack is still in testing; detection results are for reference only and do not constitute a legal basis. Use them together with professional legal advice.

The user provides script text, subtitle files, or video descriptions; this skill performs a compliance review and generates a structured risk report.

---

## Phase 1: Environment Detection & Initialization

When the user requests a compliance review of short-drama content, first run environment detection:

```bash
python3 scripts/env_detect.py
```

Checks performed:
1. **Python version**: >= 3.8
2. **Available API keys**: OPENAI_API_KEY / ANTHROPIC_API_KEY (for deep analysis)
3. **Optional Python packages**: jieba (Chinese word segmentation; improves copyright-detection accuracy)
4. **Network connectivity**: reachability of the API endpoints

Then determine the run mode:
- **Local-only mode (local_only)**: fallback when no API key is available; runs only keyword matching and text-algorithm analysis
- **Hybrid mode (hybrid)** (recommended): fast local scan plus deep AI context analysis, for higher accuracy

Show the user the environment status and the available capabilities.
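The mode decision is simple enough to sketch directly. This illustrative snippet (a stand-in, not the shipped `env_detect.py`) checks the two key variables documented above:

```python
import os

# Environment variables that enable hybrid mode, per the skill's docs.
AI_KEY_VARS = ("OPENAI_API_KEY", "ANTHROPIC_API_KEY")

def determine_run_mode(environ=None) -> str:
    """Return "hybrid" when any AI key is set, else "local_only"."""
    environ = os.environ if environ is None else environ
    return "hybrid" if any(environ.get(v) for v in AI_KEY_VARS) else "local_only"
```

Passing a plain dict makes the decision testable without touching the real environment.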

---

## Phase 2: Copyright Infringement Detection

Take the script/dialogue text provided by the user and run copyright-infringement detection:

```bash
python3 scripts/text_similarity.py --input <script_file> --reference-dir <reference_texts_dir>
```

### 2.1 Text Preprocessing

1. Normalize encoding (Unicode normalization)
2. Strip punctuation and extra whitespace
3. Split into paragraphs and filter out short ones (< 20 characters)
4. Segment Chinese text (prefer jieba; fall back to character-level segmentation)
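These steps can be sketched as a short pipeline (a simplified stand-in for the preprocessing in `scripts/text_similarity.py`):

```python
import re
import unicodedata

def preprocess(text: str) -> str:
    """Steps 1-2: normalize encoding, strip punctuation, collapse whitespace."""
    text = unicodedata.normalize("NFKC", text)
    text = re.sub(r"[^\w\s]", "", text)
    return re.sub(r"\s+", " ", text).strip().lower()

def split_paragraphs(text: str, min_length: int = 20) -> list:
    """Step 3: split on newlines and drop paragraphs shorter than min_length."""
    paragraphs = re.split(r"\n\s*\n|\n", text)
    return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]
```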

### 2.2 Triple Similarity Detection

Compare each paragraph against the reference-text corpus, computing three complementary metrics:

| Algorithm | Detects | Weight |
|------|----------|------|
| n-gram Jaccard coefficient | Local vocabulary overlap | 0.3 |
| Normalized edit distance | Overall text difference | 0.3 |
| TF-IDF cosine similarity | Semantic/topical similarity | 0.4 |

Paragraphs whose combined score exceeds the threshold (default 0.7) are flagged as suspected infringement.
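As a worked example of the weighting (edit distance is converted to a similarity first, so all three terms point the same way):

```python
def combined_score(ngram_jaccard: float, edit_distance_norm: float,
                   cosine_sim: float) -> float:
    """Weighted average: 0.3 * Jaccard + 0.3 * (1 - edit distance) + 0.4 * cosine."""
    edit_sim = 1.0 - edit_distance_norm
    return 0.3 * ngram_jaccard + 0.3 * edit_sim + 0.4 * cosine_sim

def is_suspicious(score: float, threshold: float = 0.7) -> bool:
    """Flag a paragraph when the combined score reaches the threshold."""
    return score >= threshold
```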

### 2.3 AI Semantic Confirmation (hybrid mode)

Send high-suspicion paragraphs to the AI for semantic-level analysis:
- Exclude generic expressions and public-domain content
- Assess originality and substantial similarity
- Identify rewrites and synonym substitution

Show the user: the list of suspicious paragraphs, similarity scores, suspected sources, and the AI's assessment.

---

## Phase 3: Age Rating Compliance Detection

Scan the script for age-rating compliance:

```bash
python3 scripts/age_rating_scanner.py --input <script_file> --target-rating <all_ages|12+|18+>
```

### 3.1 Layer 1: Fast Local Keyword Scan

Load the categorized keyword databases (violence/sexual/horror/profanity/substance) and scan paragraph by paragraph:
- Record each hit's keyword, category, and severity (mild/moderate/severe)
- Keep the hit position and its context (30 characters before and after)
- Compute a preliminary rating suggestion from hit density and severity
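A minimal version of this scan, with a hypothetical in-memory keyword map (the real skill loads its databases from `assets/keyword_databases/`):

```python
def scan_keywords(text, keywords, window=30):
    """Return (keyword, severity, context) for every case-insensitive hit."""
    hits = []
    lowered = text.lower()
    for keyword, severity in keywords.items():
        start = 0
        while True:
            pos = lowered.find(keyword.lower(), start)
            if pos == -1:
                break
            # Keep `window` characters of context on each side of the hit.
            context = text[max(0, pos - window):pos + window]
            hits.append((keyword, severity, context))
            start = pos + len(keyword)
    return hits
```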

### 3.2 Layer 2: AI Contextual Deep Analysis (hybrid mode)

Send the paragraphs around each keyword hit to the AI model:
- Judge whether the content is genuinely inappropriate (filtering out false positives such as negated contexts, literary devices, and historical references)
- Assess the overall tendency of the surrounding content
- Give a rating suggestion with concrete reasons

### 3.3 Auxiliary Content Analysis

- **Video keyframe descriptions**: if the user supplies frame descriptions, analyze on-screen content risk
- **Audio transcripts**: if the user supplies a transcript, scan for profanity and descriptions of inappropriate sound effects

### 3.4 Rating Output

| Rating | Description |
|------|------|
| All ages (all_ages) | Suitable for every age group |
| 12+ | Mild violence/conflict; parental guidance suggested |
| 18+ | Stronger violence/horror/adult themes |
| Non-compliant (non_compliant) | Beyond the acceptable range; revision recommended |
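Compliance against a target rating reduces to an ordering over the four levels; a sketch mirroring the comparison logic in `scripts/age_rating_scanner.py`:

```python
# Higher value = more restrictive rating.
RATING_ORDER = {"all_ages": 0, "12+": 1, "18+": 2, "non_compliant": 3}

def is_compliant(suggested: str, target: str) -> bool:
    """Content complies when its suggested rating does not exceed the target."""
    return RATING_ORDER[suggested] <= RATING_ORDER[target]
```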

---

## Phase 4: Novel Alteration Detection

Compare the original novel with the adaptation and assess how far the adaptation deviates:

```bash
python3 scripts/adaptation_detector.py --original <original_file> --adapted <adapted_file>
```

### 4.1 Structural Alignment

Use a dynamic-programming algorithm (a Needleman-Wunsch variant) to align the original's chapters with the adaptation's sections, identifying:
- Plot retained from the original
- Newly added plot segments
- Original content that was removed
- Sections that were modified
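A toy version of the alignment: textbook Needleman-Wunsch dynamic programming over the two section lists with a small gap penalty, returning index pairs where `None` marks an added or removed section. This sketch takes any similarity function; the shipped detector uses character n-gram Jaccard.

```python
def align(original, adapted, sim, gap=-0.1):
    """Needleman-Wunsch alignment of two section lists.

    Returns index pairs; None on either side marks a removed/added section.
    """
    m, n = len(original), len(adapted)
    # Fill the DP table: match along the diagonal, pay `gap` for skips.
    dp = [[0.0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        dp[i][0] = dp[i - 1][0] + gap
    for j in range(1, n + 1):
        dp[0][j] = dp[0][j - 1] + gap
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            dp[i][j] = max(dp[i - 1][j - 1] + sim(original[i - 1], adapted[j - 1]),
                           dp[i - 1][j] + gap,
                           dp[i][j - 1] + gap)
    # Backtrack from the bottom-right corner to recover the alignment.
    pairs, i, j = [], m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + sim(original[i - 1], adapted[j - 1]):
            pairs.append((i - 1, j - 1))
            i, j = i - 1, j - 1
        elif i > 0 and (j == 0 or dp[i][j] == dp[i - 1][j] + gap):
            pairs.append((i - 1, None))   # section removed in the adaptation
            i -= 1
        else:
            pairs.append((None, j - 1))   # section added by the adaptation
            j -= 1
    return list(reversed(pairs))
```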

### 4.2 Character Deviation Detection

Extract the character list and their profiles, then compare for changes:
- Personality-trait changes
- Relationship changes
- Character-fate changes

### 4.3 Key Plot Comparison

Use the AI to extract core plot points and assess how much the adaptation alters the original's core.

### 4.4 Deviation Score

Output a combined deviation score (0-100):

| Score range | Classification | Description |
|----------|------|------|
| 0 - 30 | Faithful adaptation | Preserves the original's core with reasonable adjustments |
| 30 - 60 | Reasonable adaptation | Substantial changes that do not depart from the core |
| 60 - 100 | Severe alteration | Departs heavily from the original; likely to draw controversy |
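The table's ranges overlap at 30 and 60; this sketch assumes each upper bound is exclusive, which is one reasonable reading:

```python
def classify_deviation(score: float) -> str:
    """Map a 0-100 deviation score to the adaptation categories above."""
    if not 0 <= score <= 100:
        raise ValueError("score must be within [0, 100]")
    if score < 30:
        return "faithful"
    if score < 60:
        return "reasonable"
    return "severe_modification"
```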

---

## Phase 5: Compliance Report Generation

Aggregate all detection results into a structured report:

```bash
python3 scripts/report_generator.py --results <detection_results.json> --format <json|markdown>
```

Report contents:
- **Overall risk level**: low / medium / high / critical
- **Copyright infringement risk**: suspected sources, similar paragraphs, similarity scores
- **Age-rating compliance**: rating suggestion, per-category hit details
- **Novel-alteration details**: deviation score, list of core changes
- **Violation locations**: paragraph numbers, timestamps, scene numbers
- **Remediation checklist**: concrete revision suggestions for each risk

---

## Phase 6: Orchestration & Full Review

Run the full review pipeline in one command:

```bash
python3 scripts/review_orchestrator.py --input <script_file> [--reference-dir <dir>] [--original <file>] [--target-rating 12+] [--checks copyright rating adaptation]
```

Pipeline:
1. Detect the environment and determine the run mode
2. Load the input text (.txt / .srt / .json supported)
3. Run the selected detection modules
4. AI holistic risk assessment (hybrid mode)
5. Generate the compliance report
6. Format the risk notice, annotate it, and inform the user of the violation risks

**Risk notice format**: when violations are detected, generate a structured risk notice so the model can show the user the specific violation types, locations, and remediation suggestions.

---

## Credential Security

### Environment Variables

**AI analysis (configure at least one to enable hybrid mode):**
- `OPENAI_API_KEY` — OpenAI API (for deep content analysis)
- `ANTHROPIC_API_KEY` — Anthropic Claude API (alternative)

**Security principles:**
- All credentials are read only from environment variables; nothing is persisted
- Key values are never logged, printed, or cached
- With no API key, the skill automatically falls back to local mode

---

## Disclaimer

The compliance results produced by this skill pack are for reference only and do not constitute legal advice. Users should make final decisions with input from professional legal counsel. Detection may produce false positives or misses; manual review of high-risk content is recommended.


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/env_detect.py

```python
"""
环境检测模块 - ai-drama-review

检测运行环境,确定可用的分析能力。
"""

import json
import os
import platform
import subprocess
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from credential_manager import _AI_PROVIDER_KEYS


def detect_python_version() -> dict:
    """检测 Python 版本。"""
    version = sys.version_info
    return {
        "version": f"{version.major}.{version.minor}.{version.micro}",
        "major": version.major,
        "minor": version.minor,
        "meets_minimum": version >= (3, 8),
    }


def detect_api_keys() -> dict:
    """检测 AI API 密钥可用性(仅检测存在性,不打印值)。"""
    return {
        env_var: bool(os.environ.get(env_var))
        for env_var in _AI_PROVIDER_KEYS.values()
    }


def detect_python_packages() -> dict:
    """检测可选 Python 包。"""
    packages = {}

    # jieba - Chinese word segmentation
    try:
        import jieba
        packages["jieba"] = {"installed": True, "version": jieba.__version__}
    except ImportError:
        packages["jieba"] = {"installed": False, "note": "可选,提升中文版权检测精度"}

    return packages


def detect_network() -> dict:
    """检测网络连通性。"""
    result = {"internet": False}

    try:
        import urllib.request
        urllib.request.urlopen("https://api.openai.com", timeout=5)
        result["internet"] = True
        result["openai_reachable"] = True
    except Exception:
        try:
            import urllib.request
            urllib.request.urlopen("https://www.baidu.com", timeout=5)
            result["internet"] = True
            result["openai_reachable"] = False
        except Exception:
            pass

    return result


def determine_run_mode(api_keys: dict) -> str:
    """
    确定运行模式。

    Args:
        api_keys: detect_api_keys() 的结果

    Returns:
        "hybrid" 或 "local_only"
    """
    if any(api_keys.values()):
        return "hybrid"
    return "local_only"


def run_full_detection() -> dict:
    """执行完整环境检测,返回 JSON 报告。"""
    python_info = detect_python_version()
    api_keys = detect_api_keys()
    packages = detect_python_packages()
    network = detect_network()
    run_mode = determine_run_mode(api_keys)

    report = {
        "system": {
            "os": platform.system(),
            "os_version": platform.version(),
            "architecture": platform.machine(),
        },
        "python": python_info,
        "api_keys": api_keys,
        "packages": packages,
        "network": network,
        "run_mode": run_mode,
        "capabilities": {
            "copyright_detection": True,
            "age_rating_scan": True,
            "adaptation_detection": True,
            "ai_deep_analysis": run_mode == "hybrid",
            "chinese_segmentation": packages.get("jieba", {}).get("installed", False),
        },
    }

    return report


if __name__ == "__main__":
    report = run_full_detection()
    print(json.dumps(report, indent=2, ensure_ascii=False))

```

### scripts/text_similarity.py

```python
"""
文本相似度检测引擎

用于版权侵权检测,支持三种互补的相似度算法:
- n-gram Jaccard 系数(局部词汇重复)
- 归一化编辑距离(整体文本差异)
- TF-IDF 余弦相似度(语义主题相似)

纯 Python 实现,不依赖外部 NLP 库。
"""

import math
import re
import sys
import unicodedata
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class SimilarityResult:
    """单段相似度检测结果。"""
    source_paragraph_index: int
    source_text: str
    reference_id: str
    reference_paragraph_index: int
    reference_text: str
    ngram_jaccard: float
    edit_distance_normalized: float
    cosine_similarity: float
    combined_score: float
    is_suspicious: bool


@dataclass
class CopyrightReport:
    """版权检测报告。"""
    total_paragraphs: int
    suspicious_paragraphs: int
    max_similarity_score: float
    risk_level: str  # "low" / "medium" / "high" / "critical"
    results: List[SimilarityResult] = field(default_factory=list)


# === Text preprocessing ===

def preprocess_text(text: str) -> str:
    """统一编码、去标点、去多余空白。"""
    # Unicode 归一化
    text = unicodedata.normalize("NFKC", text)
    # 去除标点
    text = re.sub(r'[^\w\s]', '', text)
    # 合并多余空白
    text = re.sub(r'\s+', ' ', text).strip()
    return text.lower()


def split_paragraphs(text: str, min_length: int = 20) -> List[str]:
    """按段落分割文本,过滤过短段落。"""
    paragraphs = re.split(r'\n\s*\n|\n', text)
    return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]


def tokenize_chinese(text: str) -> List[str]:
    """中文分词(优先 jieba,降级到字符级)。"""
    try:
        import jieba
        return list(jieba.cut(text))
    except ImportError:
        # Fallback: character-level tokens, keeping runs of ASCII letters/digits as whole words
        tokens = []
        current_ascii = []
        for char in text:
            if char.isascii() and char.isalnum():
                current_ascii.append(char)
            else:
                if current_ascii:
                    tokens.append(''.join(current_ascii))
                    current_ascii = []
                if char.strip():
                    tokens.append(char)
        if current_ascii:
            tokens.append(''.join(current_ascii))
        return tokens


# === n-gram similarity ===

def char_ngrams(text: str, n: int = 3) -> set:
    """生成字符级 n-gram 集合。"""
    text = preprocess_text(text)
    if len(text) < n:
        return {text} if text else set()
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def word_ngrams(tokens: list, n: int = 2) -> set:
    """生成词级 n-gram 集合。"""
    if len(tokens) < n:
        return {tuple(tokens)} if tokens else set()
    return {tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}


def jaccard_similarity(set_a: set, set_b: set) -> float:
    """Jaccard 系数 = |A ∩ B| / |A ∪ B|。"""
    if not set_a and not set_b:
        return 1.0
    if not set_a or not set_b:
        return 0.0
    intersection = len(set_a & set_b)
    union = len(set_a | set_b)
    return intersection / union if union > 0 else 0.0


# === Edit distance ===

def edit_distance(s1: str, s2: str) -> int:
    """Levenshtein 编辑距离(空间优化为 O(min(m,n)))。"""
    if len(s1) < len(s2):
        s1, s2 = s2, s1

    prev_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        curr_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = prev_row[j + 1] + 1
            deletions = curr_row[j] + 1
            substitutions = prev_row[j] + (0 if c1 == c2 else 1)
            curr_row.append(min(insertions, deletions, substitutions))
        prev_row = curr_row
    return prev_row[-1]


def normalized_edit_distance(s1: str, s2: str) -> float:
    """归一化编辑距离 = edit_distance / max(len(s1), len(s2))。"""
    max_len = max(len(s1), len(s2))
    if max_len == 0:
        return 0.0
    return edit_distance(s1, s2) / max_len


# === TF-IDF cosine similarity ===

def compute_idf(corpus: List[List[str]]) -> dict:
    """计算逆文档频率(平滑版,避免 log(1)=0 的问题)。"""
    doc_count = len(corpus)
    if doc_count == 0:
        return {}

    df = {}
    for tokens in corpus:
        seen = set(tokens)
        for token in seen:
            df[token] = df.get(token, 0) + 1

    return {
        token: math.log((doc_count + 1) / (count + 1)) + 1.0
        for token, count in df.items()
    }


def build_tfidf_vector(tokens: list, idf_dict: dict) -> dict:
    """构建 TF-IDF 向量。"""
    tf = {}
    for token in tokens:
        tf[token] = tf.get(token, 0) + 1

    total = len(tokens) if tokens else 1
    return {
        token: (count / total) * idf_dict.get(token, 1.0)
        for token, count in tf.items()
    }


def cosine_similarity_vec(vec_a: dict, vec_b: dict) -> float:
    """余弦相似度。"""
    common_keys = set(vec_a.keys()) & set(vec_b.keys())
    dot_product = sum(vec_a[k] * vec_b[k] for k in common_keys)
    norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
    norm_b = math.sqrt(sum(v * v for v in vec_b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot_product / (norm_a * norm_b)


# === Combined comparison ===

def combine_scores(ngram_sim: float, edit_dist_norm: float,
                   cosine_sim: float) -> float:
    """综合评分(加权平均)。"""
    edit_sim = 1.0 - edit_dist_norm
    return 0.3 * ngram_sim + 0.3 * edit_sim + 0.4 * cosine_sim


def compare_paragraphs(para_a: str, para_b: str,
                       idf_dict: dict = None) -> dict:
    """计算两段文本的全部相似度指标。"""
    # n-gram Jaccard
    ngrams_a = char_ngrams(para_a, n=3)
    ngrams_b = char_ngrams(para_b, n=3)
    ngram_sim = jaccard_similarity(ngrams_a, ngrams_b)

    # Edit distance
    preprocessed_a = preprocess_text(para_a)
    preprocessed_b = preprocess_text(para_b)
    edit_dist = normalized_edit_distance(preprocessed_a, preprocessed_b)

    # TF-IDF cosine
    tokens_a = tokenize_chinese(preprocessed_a)
    tokens_b = tokenize_chinese(preprocessed_b)

    if idf_dict is None:
        idf_dict = compute_idf([tokens_a, tokens_b])

    vec_a = build_tfidf_vector(tokens_a, idf_dict)
    vec_b = build_tfidf_vector(tokens_b, idf_dict)
    cosine_sim = cosine_similarity_vec(vec_a, vec_b)

    combined = combine_scores(ngram_sim, edit_dist, cosine_sim)

    return {
        "ngram_jaccard": round(ngram_sim, 4),
        "edit_distance_normalized": round(edit_dist, 4),
        "cosine_similarity": round(cosine_sim, 4),
        "combined_score": round(combined, 4),
    }


def _determine_risk_level(max_score: float, suspicious_count: int,
                          total: int) -> str:
    """根据检测结果确定风险等级。"""
    if suspicious_count == 0:
        return "low"
    ratio = suspicious_count / total if total > 0 else 0
    if max_score >= 0.95 or ratio >= 0.5:
        return "critical"
    if max_score >= 0.85 or ratio >= 0.3:
        return "high"
    if max_score >= 0.7 or ratio >= 0.1:
        return "medium"
    return "low"


def scan_for_plagiarism(input_text: str, reference_texts: dict,
                        threshold: float = 0.7) -> CopyrightReport:
    """
    主入口:扫描输入文本与参考文本库的相似度。

    Args:
        input_text: 待检剧本全文
        reference_texts: {"source_id": "全文内容", ...}
        threshold: 判定阈值 (默认 0.7)

    Returns:
        CopyrightReport
    """
    input_paragraphs = split_paragraphs(input_text)

    if not input_paragraphs:
        return CopyrightReport(
            total_paragraphs=0,
            suspicious_paragraphs=0,
            max_similarity_score=0.0,
            risk_level="low",
        )

    # Build reference paragraphs
    ref_paragraphs = {}
    for ref_id, ref_text in reference_texts.items():
        ref_paragraphs[ref_id] = split_paragraphs(ref_text)

    # Build global IDF
    all_token_lists = []
    for para in input_paragraphs:
        all_token_lists.append(tokenize_chinese(preprocess_text(para)))
    for ref_id, paras in ref_paragraphs.items():
        for para in paras:
            all_token_lists.append(tokenize_chinese(preprocess_text(para)))
    global_idf = compute_idf(all_token_lists)

    # Compare paragraph by paragraph
    results = []
    for i, input_para in enumerate(input_paragraphs):
        best_match = None
        best_score = 0.0

        for ref_id, ref_paras in ref_paragraphs.items():
            for j, ref_para in enumerate(ref_paras):
                scores = compare_paragraphs(input_para, ref_para, global_idf)
                if scores["combined_score"] > best_score:
                    best_score = scores["combined_score"]
                    best_match = SimilarityResult(
                        source_paragraph_index=i,
                        source_text=input_para[:100],
                        reference_id=ref_id,
                        reference_paragraph_index=j,
                        reference_text=ref_para[:100],
                        ngram_jaccard=scores["ngram_jaccard"],
                        edit_distance_normalized=scores["edit_distance_normalized"],
                        cosine_similarity=scores["cosine_similarity"],
                        combined_score=scores["combined_score"],
                        is_suspicious=scores["combined_score"] >= threshold,
                    )

        if best_match and best_match.is_suspicious:
            results.append(best_match)

    suspicious_count = len(results)
    max_score = max((r.combined_score for r in results), default=0.0)

    return CopyrightReport(
        total_paragraphs=len(input_paragraphs),
        suspicious_paragraphs=suspicious_count,
        max_similarity_score=round(max_score, 4),
        risk_level=_determine_risk_level(
            max_score, suspicious_count, len(input_paragraphs)
        ),
        results=results,
    )


if __name__ == "__main__":
    import argparse
    import json

    parser = argparse.ArgumentParser(description="Text similarity detection")
    parser.add_argument("--input", required=True, help="input file path")
    parser.add_argument("--reference-dir", required=True, help="reference text directory")
    parser.add_argument("--threshold", type=float, default=0.7, help="decision threshold")
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        print(f"错误: 输入文件不存在: {input_path}")
        sys.exit(1)

    input_text = input_path.read_text(encoding="utf-8")

    ref_dir = Path(args.reference_dir)
    reference_texts = {}
    if ref_dir.exists():
        for f in ref_dir.glob("*.txt"):
            reference_texts[f.stem] = f.read_text(encoding="utf-8")

    report = scan_for_plagiarism(input_text, reference_texts, args.threshold)

    print(f"=== 版权侵权检测报告 ===")
    print(f"总段落数: {report.total_paragraphs}")
    print(f"可疑段落: {report.suspicious_paragraphs}")
    print(f"最高相似度: {report.max_similarity_score}")
    print(f"风险等级: {report.risk_level}")

    if report.results:
        print(f"\n可疑段落详情:")
        for r in report.results:
            print(f"\n  段落 {r.source_paragraph_index}: "
                  f"综合得分 {r.combined_score:.4f}")
            print(f"  来源: {r.reference_id} 段落 {r.reference_paragraph_index}")
            print(f"  原文: {r.source_text[:60]}...")
            print(f"  参考: {r.reference_text[:60]}...")

```

### scripts/age_rating_scanner.py

```python
"""
年龄分级合规检测器

两层架构:
Layer 1: 本地关键词快速扫描
Layer 2: AI 上下文深度分析(需要 API 密钥)
"""

import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class KeywordHit:
    """关键词命中记录。"""
    keyword: str
    category: str       # "violence" / "sexual" / "horror" / "profanity" / "substance"
    severity: str       # "mild" / "moderate" / "severe"
    paragraph_index: int
    position_in_paragraph: int
    context: str        # context around the hit
    timestamp: Optional[str] = None


@dataclass
class RatingResult:
    """分级检测结果。"""
    suggested_rating: str   # "all_ages" / "12+" / "18+" / "non_compliant"
    target_rating: str      # the user's desired rating
    is_compliant: bool
    total_hits: int
    hits_by_category: dict = field(default_factory=dict)
    hits_by_severity: dict = field(default_factory=dict)
    keyword_hits: List[KeywordHit] = field(default_factory=list)
    ai_analysis: Optional[dict] = None
    risk_level: str = "low"


# === Keyword database management ===

def _get_keywords_dir() -> Path:
    """获取关键词库目录。"""
    return Path(__file__).parent.parent / "assets" / "keyword_databases"


def _get_rules_dir() -> Path:
    """获取分级规则目录。"""
    return Path(__file__).parent.parent / "assets" / "rating_rules"


def load_keyword_database(category: str) -> dict:
    """加载指定类别的关键词库。"""
    filepath = _get_keywords_dir() / f"{category}_keywords.json"
    if not filepath.exists():
        return {"category": category, "keywords": {}}

    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


def load_all_keywords() -> dict:
    """加载全部类别的关键词库。"""
    categories = ["violence", "sexual", "horror", "profanity", "substance"]
    all_kw = {}
    for cat in categories:
        db = load_keyword_database(cat)
        all_kw[cat] = db.get("keywords", {})
    return all_kw


def load_rating_rules(ruleset: str = "china") -> dict:
    """加载分级规则配置。"""
    filepath = _get_rules_dir() / f"{ruleset}_rating.json"
    if not filepath.exists():
        raise FileNotFoundError(f"分级规则文件不存在: {filepath}")

    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


# === Layer 1: local keyword scan ===

def _extract_context(text: str, pos: int, window: int = 30) -> str:
    """提取命中位置前后的上下文。"""
    start = max(0, pos - window)
    end = min(len(text), pos + window)
    return text[start:end]


def scan_keywords(text: str, keywords_db: dict) -> List[KeywordHit]:
    """逐段扫描关键词命中。"""
    paragraphs = re.split(r'\n\s*\n|\n', text)
    hits = []

    for para_idx, paragraph in enumerate(paragraphs):
        paragraph_lower = paragraph.lower()

        for category, keywords in keywords_db.items():
            for keyword, info in keywords.items():
                # Check the main keyword and its aliases
                all_forms = [keyword] + info.get("aliases", [])
                for form in all_forms:
                    form_lower = form.lower()
                    start = 0
                    while True:
                        pos = paragraph_lower.find(form_lower, start)
                        if pos == -1:
                            break
                        hits.append(KeywordHit(
                            keyword=form,
                            category=category,
                            severity=info["severity"],
                            paragraph_index=para_idx,
                            position_in_paragraph=pos,
                            context=_extract_context(paragraph, pos),
                        ))
                        start = pos + len(form_lower)

    return hits


def _count_by_field(hits: List[KeywordHit], field_name: str) -> dict:
    """按指定字段统计命中数。"""
    counts = {}
    for hit in hits:
        value = getattr(hit, field_name)
        counts[value] = counts.get(value, 0) + 1
    return counts


def calculate_initial_rating(hits: List[KeywordHit],
                             rules: dict) -> str:
    """根据关键词命中情况计算初步分级。"""
    if not hits:
        return "all_ages"

    by_severity = _count_by_field(hits, "severity")
    by_category = _count_by_field(hits, "category")

    # Check non-compliance triggers
    for trigger in rules.get("non_compliant_triggers", []):
        if "category" in trigger and "severity" in trigger:
            cat_sev_count = sum(
                1 for h in hits
                if h.category == trigger["category"]
                and h.severity == trigger["severity"]
            )
            if cat_sev_count >= trigger.get("min_count", 1):
                return "non_compliant"
        elif "severity" in trigger:
            sev_count = by_severity.get(trigger["severity"], 0)
            if sev_count >= trigger.get("min_count", 1):
                return "non_compliant"

    # Check each rating level in order
    ratings_order = ["all_ages", "12+", "18+"]
    ratings_config = rules.get("ratings", {})

    for rating in ratings_order:
        config = ratings_config.get(rating, {})

        # Check forbidden categories
        forbidden = config.get("forbidden_categories", [])
        if any(by_category.get(cat, 0) > 0 for cat in forbidden):
            continue

        # Check the severity ceiling
        max_sev = config.get("max_severity", "severe")
        severity_order = {"mild": 0, "moderate": 1, "severe": 2}
        max_sev_level = severity_order.get(max_sev, 2)

        # Any hits above the allowed severity?
        exceeded = False
        for sev, level in severity_order.items():
            if level > max_sev_level and by_severity.get(sev, 0) > 0:
                exceeded = True
                break

        if exceeded:
            continue

        # Check per-severity hit limits
        mild_limit = config.get("max_hits_mild", -1)
        moderate_limit = config.get("max_hits_moderate", -1)
        severe_limit = config.get("max_hits_severe", -1)

        mild_ok = mild_limit == -1 or by_severity.get("mild", 0) <= mild_limit
        moderate_ok = moderate_limit == -1 or by_severity.get("moderate", 0) <= moderate_limit
        severe_ok = severe_limit == -1 or by_severity.get("severe", 0) <= severe_limit

        if mild_ok and moderate_ok and severe_ok:
            return rating

    return "18+"


def _rating_order(rating: str) -> int:
    """分级排序值。"""
    order = {"all_ages": 0, "12+": 1, "18+": 2, "non_compliant": 3}
    return order.get(rating, 3)


def _determine_risk_level(suggested: str, target: str) -> str:
    """确定风险等级。"""
    if suggested == "non_compliant":
        return "critical"
    if _rating_order(suggested) > _rating_order(target):
        diff = _rating_order(suggested) - _rating_order(target)
        if diff >= 2:
            return "high"
        return "medium"
    return "low"


# === Auxiliary analysis ===

def analyze_frame_descriptions(descriptions: List[dict],
                               keywords_db: dict) -> List[KeywordHit]:
    """分析视频关键帧描述文本。"""
    hits = []
    for desc_item in descriptions:
        text = desc_item.get("description", "")
        timestamp = desc_item.get("timestamp", "")
        frame_hits = scan_keywords(text, keywords_db)
        for hit in frame_hits:
            hit.timestamp = timestamp
        hits.extend(frame_hits)
    return hits


def analyze_audio_transcript(transcript: str,
                             keywords_db: dict) -> List[KeywordHit]:
    """分析音频转录文本。"""
    return scan_keywords(transcript, keywords_db)


# === Main entry point ===

def run_age_rating_scan(text: str, target_rating: str = "all_ages",
                        ruleset: str = "china",
                        frame_descriptions: list = None,
                        audio_transcript: str = None) -> RatingResult:
    """
    完整的年龄分级扫描流程。

    Args:
        text: 剧本/台词文本
        target_rating: 用户期望的目标分级
        ruleset: 分级规则集("china" 或 "general")
        frame_descriptions: 视频关键帧描述列表
        audio_transcript: 音频转录文本

    Returns:
        RatingResult
    """
    keywords_db = load_all_keywords()
    rules = load_rating_rules(ruleset)

    # Layer 1: local scan
    all_hits = scan_keywords(text, keywords_db)

    # Auxiliary content analysis
    if frame_descriptions:
        all_hits.extend(analyze_frame_descriptions(frame_descriptions, keywords_db))
    if audio_transcript:
        all_hits.extend(analyze_audio_transcript(audio_transcript, keywords_db))

    # Compute the rating
    suggested = calculate_initial_rating(all_hits, rules)
    is_compliant = _rating_order(suggested) <= _rating_order(target_rating)
    risk_level = _determine_risk_level(suggested, target_rating)

    return RatingResult(
        suggested_rating=suggested,
        target_rating=target_rating,
        is_compliant=is_compliant,
        total_hits=len(all_hits),
        hits_by_category=_count_by_field(all_hits, "category"),
        hits_by_severity=_count_by_field(all_hits, "severity"),
        keyword_hits=all_hits,
        risk_level=risk_level,
    )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Age rating compliance detection")
    parser.add_argument("--input", required=True, help="input file path")
    parser.add_argument("--target-rating", default="all_ages",
                        choices=["all_ages", "12+", "18+"])
    parser.add_argument("--ruleset", default="china",
                        choices=["china", "general"])
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        print(f"错误: 输入文件不存在: {input_path}")
        sys.exit(1)

    text = input_path.read_text(encoding="utf-8")
    result = run_age_rating_scan(text, args.target_rating, args.ruleset)

    print(f"=== 年龄分级检测报告 ===")
    print(f"建议分级: {result.suggested_rating}")
    print(f"目标分级: {result.target_rating}")
    print(f"是否合规: {'是' if result.is_compliant else '否'}")
    print(f"风险等级: {result.risk_level}")
    print(f"总命中数: {result.total_hits}")
    print(f"按类别: {json.dumps(result.hits_by_category, ensure_ascii=False)}")
    print(f"按严重度: {json.dumps(result.hits_by_severity, ensure_ascii=False)}")

```

### scripts/adaptation_detector.py

```python
"""
小说魔改检测器

比对原著与改编版本,量化改编偏离程度。
使用 Needleman-Wunsch 变体进行章节对齐。
"""

import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

sys.path.insert(0, str(Path(__file__).parent))
from text_similarity import (
    preprocess_text, char_ngrams, jaccard_similarity,
    tokenize_chinese, compute_idf, build_tfidf_vector,
    cosine_similarity_vec,
)


@dataclass
class PlotPoint:
    """情节点。"""
    index: int
    summary: str
    characters: List[str] = field(default_factory=list)
    location: Optional[str] = None
    importance: str = "normal"  # "core" / "normal" / "minor"


@dataclass
class CharacterProfile:
    """角色概要。"""
    name: str
    traits: List[str] = field(default_factory=list)
    relationships: dict = field(default_factory=dict)
    fate: Optional[str] = None


@dataclass
class DeviationItem:
    """偏离项。"""
    deviation_type: str  # "plot_added"/"plot_removed"/"plot_modified"
                         # "character_changed"/"setting_changed"
    original_content: str
    adapted_content: str
    severity: str        # "minor" / "moderate" / "major"
    description: str


@dataclass
class AdaptationReport:
    """改编检测报告。"""
    deviation_score: float        # 0-100
    adaptation_type: str          # "faithful"/"reasonable"/"severe_modification"
    total_deviations: int
    deviations_by_type: dict = field(default_factory=dict)
    deviations_by_severity: dict = field(default_factory=dict)
    deviation_items: List[DeviationItem] = field(default_factory=list)
    section_alignment: list = field(default_factory=list)


# === Text structure extraction ===

def extract_sections(text: str) -> List[dict]:
    """
    提取章节/段落结构。

    尝试按章节标题分割,如果没有明确标题则按段落分割。
    """
    # Try splitting on Chinese chapter headings
    chapter_pattern = r'(第[一二三四五六七八九十百千\d]+[章节回集幕][\s::]*[^\n]*)'
    chapters = re.split(chapter_pattern, text)

    sections = []
    if len(chapters) > 1:
        # Explicit chapter headings found
        i = 0
        while i < len(chapters):
            if re.match(chapter_pattern, chapters[i]):
                title = chapters[i].strip()
                content = chapters[i + 1].strip() if i + 1 < len(chapters) else ""
                sections.append({"title": title, "content": content})
                i += 2
            else:
                if chapters[i].strip():
                    sections.append({"title": "", "content": chapters[i].strip()})
                i += 1
    else:
        # Split by paragraph
        paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
        for i, para in enumerate(paragraphs):
            if len(para) >= 15:  # filter out short paragraphs
                sections.append({"title": f"段落{i + 1}", "content": para})

    return sections


def _quick_similarity(text_a: str, text_b: str) -> float:
    """快速计算两段文本的相似度(用于对齐)。"""
    if not text_a or not text_b:
        return 0.0

    # 使用字符 n-gram Jaccard 作为快速相似度
    ngrams_a = char_ngrams(text_a, n=3)
    ngrams_b = char_ngrams(text_b, n=3)
    return jaccard_similarity(ngrams_a, ngrams_b)


# === 章节对齐 ===

def align_sections(original_sections: list, adapted_sections: list) -> list:
    """
    基于 Needleman-Wunsch 变体的章节对齐。

    Returns:
        [(orig_idx_or_None, adapted_idx_or_None, similarity, status), ...]
        status: "matched" / "added" / "removed" / "modified"
    """
    m = len(original_sections)
    n = len(adapted_sections)

    if m == 0 and n == 0:
        return []
    if m == 0:
        return [(None, j, 0.0, "added") for j in range(n)]
    if n == 0:
        return [(i, None, 0.0, "removed") for i in range(m)]

    # 构建相似度矩阵
    sim_matrix = [[0.0] * n for _ in range(m)]
    for i in range(m):
        for j in range(n):
            sim_matrix[i][j] = _quick_similarity(
                original_sections[i]["content"],
                adapted_sections[j]["content"],
            )

    # 动态规划
    GAP_PENALTY = -0.1
    dp = [[0.0] * (n + 1) for _ in range(m + 1)]

    for i in range(1, m + 1):
        dp[i][0] = dp[i - 1][0] + GAP_PENALTY
    for j in range(1, n + 1):
        dp[0][j] = dp[0][j - 1] + GAP_PENALTY

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            match_score = dp[i - 1][j - 1] + sim_matrix[i - 1][j - 1]
            skip_orig = dp[i - 1][j] + GAP_PENALTY
            skip_adapt = dp[i][j - 1] + GAP_PENALTY
            dp[i][j] = max(match_score, skip_orig, skip_adapt)

    # 回溯
    alignment = []
    i, j = m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + sim_matrix[i - 1][j - 1]:
            sim = sim_matrix[i - 1][j - 1]
            status = "matched" if sim >= 0.3 else "modified"
            alignment.append((i - 1, j - 1, sim, status))
            i -= 1
            j -= 1
        elif i > 0 and dp[i][j] == dp[i - 1][j] + GAP_PENALTY:
            alignment.append((i - 1, None, 0.0, "removed"))
            i -= 1
        else:
            alignment.append((None, j - 1, 0.0, "added"))
            j -= 1

    alignment.reverse()
    return alignment


# === 角色分析 ===

def extract_characters_local(text: str) -> List[str]:
    """本地方式提取角色名(基于对话动词前出现的称呼)。"""
    # 简单启发式:提取「说/道/叫」等对话动词前的 2-4 字称呼
    names = set()

    speaker_pattern = r'(\S{2,4})[说道叫喊问答笑哭]'
    for match in re.finditer(speaker_pattern, text):
        name = match.group(1)
        if not any(c.isdigit() for c in name):
            names.add(name)

    return list(names)


# === 偏离度计算 ===

def _classify_deviation_severity(sim: float, status: str) -> str:
    """判定偏离严重程度。"""
    if status == "removed":
        return "major"
    if status == "added":
        return "moderate"
    if status == "modified":
        if sim >= 0.5:
            return "minor"
        if sim >= 0.2:
            return "moderate"
        return "major"
    return "minor"


def build_deviations(alignment: list, original_sections: list,
                     adapted_sections: list) -> List[DeviationItem]:
    """从对齐结果构建偏离项列表。"""
    deviations = []

    for orig_idx, adapt_idx, sim, status in alignment:
        if status == "matched":
            continue

        orig_content = (original_sections[orig_idx]["content"][:200]
                        if orig_idx is not None else "")
        adapt_content = (adapted_sections[adapt_idx]["content"][:200]
                         if adapt_idx is not None else "")

        severity = _classify_deviation_severity(sim, status)

        if status == "removed":
            desc = "原著段落被删除"
            dev_type = "plot_removed"
        elif status == "added":
            desc = "新增了原著中没有的内容"
            dev_type = "plot_added"
        else:
            desc = f"内容被修改(相似度: {sim:.2f})"
            dev_type = "plot_modified"

        deviations.append(DeviationItem(
            deviation_type=dev_type,
            original_content=orig_content,
            adapted_content=adapt_content,
            severity=severity,
            description=desc,
        ))

    return deviations


def calculate_deviation_score(deviations: List[DeviationItem],
                              total_sections: int) -> float:
    """
    计算偏离度评分 (0-100)。

    权重设计:
    - plot_removed × 3.0(删除原著核心最严重)
    - plot_modified × 2.0
    - plot_added × 1.0
    - character_changed × 2.5
    - setting_changed × 1.5
    严重度加权:minor × 0.5, moderate × 1.0, major × 2.0
    """
    if not deviations or total_sections == 0:
        return 0.0

    severity_weights = {"minor": 0.5, "moderate": 1.0, "major": 2.0}
    type_weights = {
        "plot_removed": 3.0,
        "plot_modified": 2.0,
        "plot_added": 1.0,
        "character_changed": 2.5,
        "setting_changed": 1.5,
    }

    weighted_sum = 0.0
    for d in deviations:
        sw = severity_weights.get(d.severity, 1.0)
        tw = type_weights.get(d.deviation_type, 1.0)
        weighted_sum += sw * tw

    # 归一化到 0-100
    max_possible = total_sections * 3.0 * 2.0  # 全部为 major + removed
    score = min(100.0, (weighted_sum / max(max_possible, 1)) * 100)
    return round(score, 1)


def classify_adaptation(score: float) -> str:
    """分类改编类型。"""
    if score <= 30:
        return "faithful"
    if score <= 60:
        return "reasonable"
    return "severe_modification"


# === 主入口 ===

def detect_adaptation(original_text: str, adapted_text: str) -> AdaptationReport:
    """
    完整的魔改检测流程。

    Args:
        original_text: 原著全文
        adapted_text: 改编版全文

    Returns:
        AdaptationReport
    """
    # 提取结构
    orig_sections = extract_sections(original_text)
    adapt_sections = extract_sections(adapted_text)

    if not orig_sections and not adapt_sections:
        return AdaptationReport(
            deviation_score=0.0,
            adaptation_type="faithful",
            total_deviations=0,
        )

    # 章节对齐
    alignment = align_sections(orig_sections, adapt_sections)

    # 构建偏离项
    deviations = build_deviations(alignment, orig_sections, adapt_sections)

    # 计算偏离度
    total_sections = max(len(orig_sections), len(adapt_sections))
    score = calculate_deviation_score(deviations, total_sections)
    adaptation_type = classify_adaptation(score)

    # 统计
    by_type = {}
    by_severity = {}
    for d in deviations:
        by_type[d.deviation_type] = by_type.get(d.deviation_type, 0) + 1
        by_severity[d.severity] = by_severity.get(d.severity, 0) + 1

    return AdaptationReport(
        deviation_score=score,
        adaptation_type=adaptation_type,
        total_deviations=len(deviations),
        deviations_by_type=by_type,
        deviations_by_severity=by_severity,
        deviation_items=deviations,
        section_alignment=[(o, a, s, st) for o, a, s, st in alignment],
    )


if __name__ == "__main__":
    import argparse
    import json

    parser = argparse.ArgumentParser(description="小说魔改检测")
    parser.add_argument("--original", required=True, help="原著文件路径")
    parser.add_argument("--adapted", required=True, help="改编版文件路径")
    args = parser.parse_args()

    orig_path = Path(args.original)
    adapt_path = Path(args.adapted)

    if not orig_path.exists():
        print(f"错误: 原著文件不存在: {orig_path}")
        sys.exit(1)
    if not adapt_path.exists():
        print(f"错误: 改编文件不存在: {adapt_path}")
        sys.exit(1)

    original = orig_path.read_text(encoding="utf-8")
    adapted = adapt_path.read_text(encoding="utf-8")

    report = detect_adaptation(original, adapted)

    print("=== 小说魔改检测报告 ===")
    print(f"偏离度评分: {report.deviation_score}/100")
    print(f"改编类型: {report.adaptation_type}")
    print(f"总偏离数: {report.total_deviations}")
    print(f"按类型: {json.dumps(report.deviations_by_type, ensure_ascii=False)}")
    print(f"按严重度: {json.dumps(report.deviations_by_severity, ensure_ascii=False)}")

    if report.deviation_items:
        print("\n偏离详情:")
        for d in report.deviation_items[:10]:
            print(f"  [{d.severity}] {d.description}")
            if d.original_content:
                print(f"    原文: {d.original_content[:80]}...")
            if d.adapted_content:
                print(f"    改编: {d.adapted_content[:80]}...")

```
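As a sanity check on the scoring design documented in `calculate_deviation_score`, the same weighting arithmetic can be replayed on hypothetical data. The deviation list and section count below are made up for illustration, not output of the detector:

```python
# Replays the weighting arithmetic from calculate_deviation_score
# on a hypothetical set of deviations; all numbers are illustrative.
severity_weights = {"minor": 0.5, "moderate": 1.0, "major": 2.0}
type_weights = {
    "plot_removed": 3.0,
    "plot_modified": 2.0,
    "plot_added": 1.0,
}

# Hypothetical deviations as (type, severity) pairs
deviations = [
    ("plot_removed", "major"),      # 3.0 * 2.0 = 6.0
    ("plot_modified", "moderate"),  # 2.0 * 1.0 = 2.0
    ("plot_modified", "moderate"),  # 2.0 * 1.0 = 2.0
    ("plot_added", "moderate"),     # 1.0 * 1.0 = 1.0
]

weighted_sum = sum(type_weights[t] * severity_weights[s] for t, s in deviations)
total_sections = 10
max_possible = total_sections * 3.0 * 2.0  # every section removed at "major"
score = round(min(100.0, weighted_sum / max(max_possible, 1) * 100), 1)

print(score)  # 11.0 / 60.0 * 100 = 18.3 -> "faithful" under classify_adaptation (<= 30)
```

Note how the normalization assumes the worst case (every section deleted at `major` severity), so even several moderate deviations leave the score well inside the "faithful" band.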

### scripts/report_generator.py

```python
"""
合规报告生成器

生成结构化 JSON 报告和可读 Markdown 报告。
"""

import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class ComplianceReport:
    """完整合规报告。"""
    report_id: str = ""
    generated_at: str = ""
    input_file: str = ""
    overall_risk_level: str = "low"  # "low"/"medium"/"high"/"critical"
    overall_score: float = 100.0     # 0-100 合规得分(越高越合规)

    copyright_result: Optional[dict] = None
    age_rating_result: Optional[dict] = None
    adaptation_result: Optional[dict] = None

    violation_summary: List[dict] = field(default_factory=list)
    remediation_suggestions: List[str] = field(default_factory=list)


def calculate_overall_risk(copyright_result: dict = None,
                           age_rating_result: dict = None,
                           adaptation_result: dict = None) -> tuple:
    """
    计算总体风险等级和合规得分。

    Returns:
        (risk_level, score)
    """
    risk_scores = []  # 各模块的风险分(越高越危险)

    level_map = {"low": 0, "medium": 30, "high": 60, "critical": 90}

    if copyright_result:
        level = copyright_result.get("risk_level", "low")
        risk_scores.append(level_map.get(level, 0))

    if age_rating_result:
        if not age_rating_result.get("is_compliant", True):
            level = age_rating_result.get("risk_level", "low")
            risk_scores.append(level_map.get(level, 0))
        else:
            risk_scores.append(0)

    if adaptation_result:
        score = adaptation_result.get("deviation_score", 0)
        if score >= 60:
            risk_scores.append(70)
        elif score >= 30:
            risk_scores.append(30)
        else:
            risk_scores.append(0)

    if not risk_scores:
        return "low", 100.0

    max_risk = max(risk_scores)
    avg_risk = sum(risk_scores) / len(risk_scores)

    # 综合风险:最大风险权重 0.7 + 平均风险 0.3
    combined_risk = max_risk * 0.7 + avg_risk * 0.3

    if combined_risk >= 70:
        risk_level = "critical"
    elif combined_risk >= 45:
        risk_level = "high"
    elif combined_risk >= 20:
        risk_level = "medium"
    else:
        risk_level = "low"

    compliance_score = max(0, 100 - combined_risk)
    return risk_level, round(compliance_score, 1)


def _build_violation_summary(copyright_result: dict = None,
                             age_rating_result: dict = None,
                             adaptation_result: dict = None) -> List[dict]:
    """构建违规摘要列表。"""
    violations = []

    if copyright_result and copyright_result.get("suspicious_paragraphs", 0) > 0:
        violations.append({
            "type": "copyright",
            "severity": copyright_result.get("risk_level", "medium"),
            "description": (
                f"发现 {copyright_result['suspicious_paragraphs']} 个疑似侵权段落,"
                f"最高相似度 {copyright_result.get('max_similarity_score', 0):.2f}"
            ),
        })

    if age_rating_result and not age_rating_result.get("is_compliant", True):
        violations.append({
            "type": "age_rating",
            "severity": age_rating_result.get("risk_level", "medium"),
            "description": (
                f"内容建议分级 {age_rating_result.get('suggested_rating', '未知')},"
                f"超出目标分级 {age_rating_result.get('target_rating', '未知')},"
                f"共 {age_rating_result.get('total_hits', 0)} 处命中"
            ),
        })

    if adaptation_result and adaptation_result.get("deviation_score", 0) >= 60:
        violations.append({
            "type": "adaptation",
            "severity": "high",
            "description": (
                f"改编偏离度 {adaptation_result['deviation_score']}/100,"
                f"属于{_translate_adaptation_type(adaptation_result.get('adaptation_type', ''))},"
                f"共 {adaptation_result.get('total_deviations', 0)} 处偏离"
            ),
        })

    return violations


def _translate_adaptation_type(t: str) -> str:
    """翻译改编类型。"""
    types = {
        "faithful": "忠实改编",
        "reasonable": "合理改编",
        "severe_modification": "严重魔改",
    }
    return types.get(t, t)


def _build_remediation(violations: List[dict]) -> List[str]:
    """根据违规摘要生成整改建议。"""
    suggestions = []

    for v in violations:
        if v["type"] == "copyright":
            suggestions.append("对疑似侵权段落进行原创性改写,避免与已有作品高度相似")
            suggestions.append("核实参考来源的版权状态,确认是否需要获取授权")
        elif v["type"] == "age_rating":
            suggestions.append("修改或删除不符合目标年龄分级的内容")
            suggestions.append("对暴力/恐怖/不当场景进行弱化处理")
        elif v["type"] == "adaptation":
            suggestions.append("重新审视对原著核心情节的改动,确保改编的合理性")
            suggestions.append("考虑获取原著权利人的改编授权")

    if not suggestions:
        suggestions.append("当前内容未发现明显违规,建议定期复查")

    return list(dict.fromkeys(suggestions))  # 去重保序


def generate_json_report(report: ComplianceReport) -> str:
    """生成 JSON 格式报告。"""
    data = {
        "report_id": report.report_id,
        "generated_at": report.generated_at,
        "input_file": report.input_file,
        "overall_risk_level": report.overall_risk_level,
        "overall_score": report.overall_score,
        "violation_summary": report.violation_summary,
        "remediation_suggestions": report.remediation_suggestions,
    }
    if report.copyright_result:
        data["copyright_detection"] = report.copyright_result
    if report.age_rating_result:
        data["age_rating_scan"] = report.age_rating_result
    if report.adaptation_result:
        data["adaptation_detection"] = report.adaptation_result

    return json.dumps(data, ensure_ascii=False, indent=2)


def generate_markdown_report(report: ComplianceReport) -> str:
    """生成 Markdown 可读报告。"""
    lines = [
        f"# AI短剧合规审查报告",
        f"",
        f"**报告 ID**: {report.report_id}",
        f"**生成时间**: {report.generated_at}",
        f"**输入文件**: {report.input_file}",
        f"",
        f"## 总体评估",
        f"",
        f"| 项目 | 结果 |",
        f"|------|------|",
        f"| 风险等级 | **{report.overall_risk_level.upper()}** |",
        f"| 合规得分 | {report.overall_score}/100 |",
        f"",
    ]

    if report.violation_summary:
        lines.append("## 违规摘要")
        lines.append("")
        for v in report.violation_summary:
            lines.append(f"- [{v['severity'].upper()}] **{v['type']}**: {v['description']}")
        lines.append("")

    if report.copyright_result:
        cr = report.copyright_result
        lines.append("## 版权侵权检测")
        lines.append("")
        lines.append(f"- 总段落数: {cr.get('total_paragraphs', 0)}")
        lines.append(f"- 可疑段落: {cr.get('suspicious_paragraphs', 0)}")
        lines.append(f"- 最高相似度: {cr.get('max_similarity_score', 0):.4f}")
        lines.append(f"- 风险等级: {cr.get('risk_level', 'low')}")
        lines.append("")

    if report.age_rating_result:
        ar = report.age_rating_result
        lines.append("## 年龄分级合规")
        lines.append("")
        lines.append(f"- 建议分级: {ar.get('suggested_rating', 'N/A')}")
        lines.append(f"- 目标分级: {ar.get('target_rating', 'N/A')}")
        lines.append(f"- 是否合规: {'是' if ar.get('is_compliant') else '否'}")
        lines.append(f"- 总命中数: {ar.get('total_hits', 0)}")
        lines.append("")

    if report.adaptation_result:
        ad = report.adaptation_result
        lines.append("## 小说改编检测")
        lines.append("")
        lines.append(f"- 偏离度: {ad.get('deviation_score', 0)}/100")
        lines.append(f"- 改编类型: {_translate_adaptation_type(ad.get('adaptation_type', ''))}")
        lines.append(f"- 总偏离数: {ad.get('total_deviations', 0)}")
        lines.append("")

    lines.append("## 整改建议")
    lines.append("")
    for i, suggestion in enumerate(report.remediation_suggestions, 1):
        lines.append(f"{i}. {suggestion}")
    lines.append("")

    lines.append("---")
    lines.append("*本报告由 ai-drama-review 自动生成,仅供参考,不作为法律依据。*")

    return "\n".join(lines)


def generate_violation_annotations(report: ComplianceReport) -> list:
    """生成违规位置标注列表。"""
    annotations = []

    if report.copyright_result:
        for r in report.copyright_result.get("results", []):
            annotations.append({
                "type": "copyright",
                "location": {"paragraph": r.get("source_paragraph_index", 0)},
                "severity": "high" if r.get("combined_score", 0) >= 0.85 else "medium",
                "description": (
                    f"与 {r.get('reference_id', '未知')} 相似度 "
                    f"{r.get('combined_score', 0):.2f}"
                ),
            })

    if report.age_rating_result:
        for hit in report.age_rating_result.get("keyword_hits", []):
            annotations.append({
                "type": "age_rating",
                "location": {
                    "paragraph": hit.get("paragraph_index", 0),
                    "timestamp": hit.get("timestamp"),
                },
                "severity": hit.get("severity", "mild"),
                "description": (
                    f"[{hit.get('category', '')}] "
                    f"关键词 '{hit.get('keyword', '')}'"
                ),
            })

    return annotations


def build_full_report(input_file: str, copyright_result=None,
                      age_rating_result=None,
                      adaptation_result=None) -> ComplianceReport:
    """汇总所有检测结果,构建完整报告。"""
    risk_level, score = calculate_overall_risk(
        copyright_result, age_rating_result, adaptation_result
    )

    violations = _build_violation_summary(
        copyright_result, age_rating_result, adaptation_result
    )
    remediation = _build_remediation(violations)

    report = ComplianceReport(
        report_id=f"DR-{datetime.now().strftime('%Y%m%d%H%M%S')}",
        generated_at=datetime.now().isoformat(),
        input_file=input_file,
        overall_risk_level=risk_level,
        overall_score=score,
        copyright_result=copyright_result,
        age_rating_result=age_rating_result,
        adaptation_result=adaptation_result,
        violation_summary=violations,
        remediation_suggestions=remediation,
    )

    return report

```
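The risk aggregation in `calculate_overall_risk` (0.7 × worst module + 0.3 × average) can be traced on hypothetical module outputs. The per-module scores below are illustrative, not taken from a real scan:

```python
# Traces the aggregation formula from calculate_overall_risk on
# hypothetical per-module risk scores (higher = riskier).
risk_scores = [60, 0, 30]  # e.g. copyright "high", rating compliant, moderate adaptation

max_risk = max(risk_scores)                     # 60
avg_risk = sum(risk_scores) / len(risk_scores)  # 30.0
combined = max_risk * 0.7 + avg_risk * 0.3      # 0.7*60 + 0.3*30 ≈ 51

# Same thresholds as the module: >= 70 critical, >= 45 high, >= 20 medium
if combined >= 70:
    level = "critical"
elif combined >= 45:
    level = "high"
elif combined >= 20:
    level = "medium"
else:
    level = "low"

compliance_score = round(max(0, 100 - combined), 1)
print(level, compliance_score)  # -> high 49.0
```

Weighting the maximum at 0.7 means a single critical module dominates the verdict even when the other modules are clean.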

### scripts/review_orchestrator.py

```python
"""
审查流程编排器

协调版权检测、年龄分级、魔改检测三大模块,
输出统一的合规报告。
"""

import argparse
import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from text_similarity import scan_for_plagiarism, CopyrightReport
from age_rating_scanner import run_age_rating_scan, RatingResult
from adaptation_detector import detect_adaptation, AdaptationReport
from report_generator import (
    build_full_report, generate_json_report, generate_markdown_report,
)


def load_input_text(file_path: str) -> str:
    """加载输入文件(支持 .txt / .srt / .json)。"""
    path = Path(file_path)
    suffix = path.suffix.lower()

    text = path.read_text(encoding="utf-8")

    if suffix == ".json":
        data = json.loads(text)
        # 尝试提取常见字段
        if isinstance(data, dict):
            parts = []
            for key in ["script", "text", "content", "dialogue", "subtitles"]:
                if key in data:
                    val = data[key]
                    if isinstance(val, str):
                        parts.append(val)
                    elif isinstance(val, list):
                        parts.extend(
                            item.get("text", str(item))
                            if isinstance(item, dict) else str(item)
                            for item in val
                        )
            return "\n".join(parts) if parts else text
        return text

    if suffix == ".srt":
        # 提取 SRT 字幕中的文本行
        lines = []
        for line in text.split("\n"):
            line = line.strip()
            # 跳过序号行、时间码行、空行
            if not line or line.isdigit() or "-->" in line:
                continue
            lines.append(line)
        return "\n".join(lines)

    return text


def load_reference_texts(reference_dir: str) -> dict:
    """加载参考文本库。"""
    ref_dir = Path(reference_dir)
    if not ref_dir.exists():
        return {}

    texts = {}
    for f in ref_dir.iterdir():
        if f.suffix.lower() in (".txt", ".md"):
            texts[f.stem] = f.read_text(encoding="utf-8")
    return texts


def _copyright_result_to_dict(report: CopyrightReport) -> dict:
    """将 CopyrightReport 转为字典。"""
    return {
        "total_paragraphs": report.total_paragraphs,
        "suspicious_paragraphs": report.suspicious_paragraphs,
        "max_similarity_score": report.max_similarity_score,
        "risk_level": report.risk_level,
        "results": [
            {
                "source_paragraph_index": r.source_paragraph_index,
                "source_text": r.source_text,
                "reference_id": r.reference_id,
                "combined_score": r.combined_score,
            }
            for r in report.results
        ],
    }


def _rating_result_to_dict(result: RatingResult) -> dict:
    """将 RatingResult 转为字典。"""
    return {
        "suggested_rating": result.suggested_rating,
        "target_rating": result.target_rating,
        "is_compliant": result.is_compliant,
        "total_hits": result.total_hits,
        "hits_by_category": result.hits_by_category,
        "hits_by_severity": result.hits_by_severity,
        "risk_level": result.risk_level,
        "keyword_hits": [
            {
                "keyword": h.keyword,
                "category": h.category,
                "severity": h.severity,
                "paragraph_index": h.paragraph_index,
                "context": h.context,
                "timestamp": h.timestamp,
            }
            for h in result.keyword_hits[:50]  # 限制数量
        ],
    }


def _adaptation_result_to_dict(report: AdaptationReport) -> dict:
    """将 AdaptationReport 转为字典。"""
    return {
        "deviation_score": report.deviation_score,
        "adaptation_type": report.adaptation_type,
        "total_deviations": report.total_deviations,
        "deviations_by_type": report.deviations_by_type,
        "deviations_by_severity": report.deviations_by_severity,
        "deviation_items": [
            {
                "type": d.deviation_type,
                "severity": d.severity,
                "description": d.description,
                "original": d.original_content[:100],
                "adapted": d.adapted_content[:100],
            }
            for d in report.deviation_items[:20]
        ],
    }


def format_user_warning(report_dict: dict) -> str:
    """
    格式化用户风险提示文本。

    当检测到违规时,标注并提示模型告知用户该视频可能存在的违规风险。
    """
    risk = report_dict.get("overall_risk_level", "low")
    score = report_dict.get("overall_score", 100)
    violations = report_dict.get("violation_summary", [])

    if risk == "low" and not violations:
        return "当前内容未发现明显合规风险。"

    risk_labels = {
        "low": "低风险",
        "medium": "中等风险",
        "high": "高风险",
        "critical": "严重风险",
    }

    lines = [
        f"[合规警告] 该内容存在 {risk_labels.get(risk, risk)} (合规得分: {score}/100)",
        "",
    ]

    for v in violations:
        type_labels = {
            "copyright": "版权侵权",
            "age_rating": "年龄分级",
            "adaptation": "小说魔改",
        }
        label = type_labels.get(v["type"], v["type"])
        lines.append(f"  - [{v['severity'].upper()}] {label}: {v['description']}")

    lines.append("")

    suggestions = report_dict.get("remediation_suggestions", [])
    if suggestions:
        lines.append("整改建议:")
        for s in suggestions:
            lines.append(f"  - {s}")

    lines.append("")
    lines.append("注意: 以上检测结果仅供参考,不作为法律依据。建议进行人工复核。")

    return "\n".join(lines)


def run_full_review(input_file: str,
                    reference_dir: str = None,
                    original_file: str = None,
                    target_rating: str = "all_ages",
                    checks: list = None,
                    output_format: str = "json") -> dict:
    """
    执行完整审查流程。

    Args:
        input_file: 输入剧本/台词文件
        reference_dir: 参考文本库目录(版权检测用)
        original_file: 原著文件路径(魔改检测用)
        target_rating: 目标年龄分级
        checks: 要执行的检测模块列表
        output_format: 输出格式 ("json" 或 "markdown")

    Returns:
        完整审查结果字典
    """
    if checks is None:
        checks = ["copyright", "rating", "adaptation"]

    # 加载输入
    input_text = load_input_text(input_file)

    copyright_result = None
    age_rating_result = None
    adaptation_result = None

    # 版权检测
    if "copyright" in checks and reference_dir:
        ref_texts = load_reference_texts(reference_dir)
        if ref_texts:
            cr = scan_for_plagiarism(input_text, ref_texts)
            copyright_result = _copyright_result_to_dict(cr)

    # 年龄分级检测
    if "rating" in checks:
        rr = run_age_rating_scan(input_text, target_rating)
        age_rating_result = _rating_result_to_dict(rr)

    # 魔改检测
    if "adaptation" in checks and original_file:
        orig_path = Path(original_file)
        if orig_path.exists():
            orig_text = orig_path.read_text(encoding="utf-8")
            ar = detect_adaptation(orig_text, input_text)
            adaptation_result = _adaptation_result_to_dict(ar)

    # 构建报告
    report = build_full_report(
        input_file, copyright_result, age_rating_result, adaptation_result
    )

    if output_format == "markdown":
        formatted = generate_markdown_report(report)
    else:
        formatted = generate_json_report(report)

    # 生成用户警告
    report_dict = json.loads(generate_json_report(report))
    warning = format_user_warning(report_dict)

    return {
        "report": report_dict,
        "formatted": formatted,
        "warning": warning,
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="AI短剧规范审查")
    parser.add_argument("--input", required=True, help="剧本/台词文件路径")
    parser.add_argument("--reference-dir", help="参考文本库目录(版权检测用)")
    parser.add_argument("--original", help="原著文件路径(魔改检测用)")
    parser.add_argument("--target-rating", default="all_ages",
                        choices=["all_ages", "12+", "18+"])
    parser.add_argument("--checks", nargs="+",
                        default=["copyright", "rating", "adaptation"],
                        choices=["copyright", "rating", "adaptation"])
    parser.add_argument("--output", default="json",
                        choices=["json", "markdown"])

    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        print(f"错误: 输入文件不存在: {input_path}")
        sys.exit(1)

    result = run_full_review(
        input_file=str(input_path),
        reference_dir=args.reference_dir,
        original_file=args.original,
        target_rating=args.target_rating,
        checks=args.checks,
        output_format=args.output,
    )

    print(result["formatted"])
    print()
    print("=== 风险提示 ===")
    print(result["warning"])

```
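The subtitle handling in `load_input_text` keeps only dialogue lines. A self-contained sketch of the same filter (the sample SRT snippet is made up for illustration):

```python
# Filters an SRT string down to dialogue text, mirroring the .srt
# branch of load_input_text: drop index lines, timecode lines
# (containing "-->"), and blank lines.
sample_srt = """1
00:00:01,000 --> 00:00:03,000
你好,世界

2
00:00:04,000 --> 00:00:06,000
再见
"""

dialogue = [
    line.strip()
    for line in sample_srt.split("\n")
    if line.strip() and not line.strip().isdigit() and "-->" not in line
]
print(dialogue)
```

Only the two subtitle text lines survive; indices, timecodes, and blank separators are discarded before downstream scanning.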



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "aaalenwow",
  "slug": "ai-drama-review",
  "displayName": "Ai Drama Review",
  "latest": {
    "version": "0.1.1",
    "publishedAt": 1773134353756,
    "commit": "https://github.com/openclaw/skills/commit/0e0463894d075833761724433445e7520d3045e2"
  },
  "history": []
}

```

### assets/keyword_databases/horror_keywords.json

```json
{
  "category": "horror",
  "version": "1.0",
  "keywords": {
    "恐怖": {"severity": "moderate", "aliases": ["恐惧", "惊恐", "惊悚"]},
    "鬼": {"severity": "mild", "aliases": ["鬼魂", "幽灵", "亡灵", "冤魂"]},
    "丧尸": {"severity": "moderate", "aliases": ["僵尸", "行尸", "活死人"]},
    "尸体": {"severity": "moderate", "aliases": ["死尸", "尸首", "遗体"]},
    "腐烂": {"severity": "severe", "aliases": ["腐尸", "发臭"]},
    "肢解": {"severity": "severe", "aliases": ["分尸", "碎尸", "断肢"]},
    "诅咒": {"severity": "mild", "aliases": ["咒语", "诅咒", "邪咒"]},
    "噩梦": {"severity": "mild", "aliases": ["梦魇", "恶梦"]},
    "怪物": {"severity": "mild", "aliases": ["妖怪", "怪兽", "魔物"]},
    "黑暗": {"severity": "mild", "aliases": ["阴森", "阴暗"]},
    "尖叫": {"severity": "mild", "aliases": ["惨叫", "嘶叫"]},
    "horror": {"severity": "moderate", "aliases": ["terrifying", "horrifying"]},
    "zombie": {"severity": "moderate", "aliases": ["undead", "walking dead"]},
    "ghost": {"severity": "mild", "aliases": ["haunted", "phantom", "specter"]},
    "gore": {"severity": "severe", "aliases": ["gory", "gruesome", "grotesque"]}
  }
}

```

### assets/keyword_databases/profanity_keywords.json

```json
{
  "category": "profanity",
  "version": "1.0",
  "keywords": {
    "操": {"severity": "severe", "aliases": ["草", "艹"]},
    "妈的": {"severity": "moderate", "aliases": ["他妈的", "你妈的", "妈了个"]},
    "傻逼": {"severity": "severe", "aliases": ["沙比", "煞笔"]},
    "狗屎": {"severity": "moderate", "aliases": ["狗屁", "放屁"]},
    "贱": {"severity": "moderate", "aliases": ["贱人", "贱货", "下贱"]},
    "滚": {"severity": "mild", "aliases": ["滚蛋", "滚开"]},
    "废物": {"severity": "mild", "aliases": ["废柴", "没用"]},
    "混蛋": {"severity": "moderate", "aliases": ["浑蛋", "王八蛋"]},
    "畜生": {"severity": "severe", "aliases": ["禽兽", "牲口"]},
    "该死": {"severity": "mild", "aliases": ["去死", "找死"]},
    "fuck": {"severity": "severe", "aliases": ["fucking", "f*ck"]},
    "shit": {"severity": "moderate", "aliases": ["bullshit", "crap"]},
    "damn": {"severity": "mild", "aliases": ["damned", "goddamn"]}
  }
}

```

### assets/keyword_databases/sexual_keywords.json

```json
{
  "category": "sexual",
  "version": "1.0",
  "keywords": {
    "裸": {"severity": "moderate", "aliases": ["裸体", "裸露", "全裸", "赤裸"]},
    "性": {"severity": "moderate", "aliases": ["性行为", "性关系", "性暗示"]},
    "色情": {"severity": "severe", "aliases": ["淫秽", "黄色", "情色"]},
    "诱惑": {"severity": "mild", "aliases": ["挑逗", "勾引", "撩拨"]},
    "抚摸": {"severity": "mild", "aliases": ["爱抚", "触摸", "摩挲"]},
    "亲密": {"severity": "mild", "aliases": ["亲吻", "拥吻", "热吻"]},
    "暴露": {"severity": "moderate", "aliases": ["露骨", "衣衫不整"]},
    "侵犯": {"severity": "severe", "aliases": ["性侵", "猥亵", "骚扰", "非礼"]},
    "卖淫": {"severity": "severe", "aliases": ["嫖", "妓"]},
    "nude": {"severity": "moderate", "aliases": ["naked", "nudity"]},
    "sexual": {"severity": "moderate", "aliases": ["sexually", "intercourse"]},
    "explicit": {"severity": "severe", "aliases": ["pornographic", "obscene"]}
  }
}

```

### assets/keyword_databases/substance_keywords.json

```json
{
  "category": "substance",
  "version": "1.0",
  "keywords": {
    "毒品": {"severity": "severe", "aliases": ["吸毒", "贩毒", "制毒", "毒贩"]},
    "大麻": {"severity": "moderate", "aliases": ["marijuana", "cannabis"]},
    "海洛因": {"severity": "severe", "aliases": ["白粉", "冰毒", "摇头丸"]},
    "注射": {"severity": "moderate", "aliases": ["打针", "注射器", "针头"]},
    "吸食": {"severity": "moderate", "aliases": ["吸粉", "嗑药"]},
    "醉": {"severity": "mild", "aliases": ["醉酒", "喝醉", "烂醉", "酗酒"]},
    "烟": {"severity": "mild", "aliases": ["抽烟", "吸烟", "香烟", "烟瘾"]},
    "赌": {"severity": "moderate", "aliases": ["赌博", "赌场", "豪赌", "赌注"]},
    "drug": {"severity": "severe", "aliases": ["drugs", "narcotics", "cocaine", "heroin"]},
    "smoke": {"severity": "mild", "aliases": ["smoking", "cigarette"]},
    "alcohol": {"severity": "mild", "aliases": ["drunk", "drinking", "intoxicated"]},
    "gamble": {"severity": "moderate", "aliases": ["gambling", "casino", "betting"]}
  }
}

```

### assets/keyword_databases/violence_keywords.json

```json
{
  "category": "violence",
  "version": "1.0",
  "keywords": {
    "杀": {"severity": "severe", "aliases": ["杀死", "杀害", "杀掉", "杀人", "屠杀", "击杀"]},
    "砍": {"severity": "moderate", "aliases": ["砍伤", "砍杀", "砍头"]},
    "刺": {"severity": "moderate", "aliases": ["刺伤", "刺杀", "刺穿", "捅"]},
    "打": {"severity": "mild", "aliases": ["打架", "打斗", "殴打", "打人"]},
    "血": {"severity": "moderate", "aliases": ["流血", "血迹", "鲜血", "血腥", "血泊", "血溅"]},
    "虐待": {"severity": "severe", "aliases": ["虐杀", "施虐", "折磨", "酷刑"]},
    "爆炸": {"severity": "moderate", "aliases": ["炸弹", "爆破", "引爆", "炸毁"]},
    "枪": {"severity": "moderate", "aliases": ["开枪", "射击", "枪杀", "枪击"]},
    "斩": {"severity": "severe", "aliases": ["斩首", "斩杀", "斩断"]},
    "绞": {"severity": "severe", "aliases": ["绞杀", "勒死", "绞刑", "窒息"]},
    "暴力": {"severity": "moderate", "aliases": ["暴打", "暴行", "暴虐"]},
    "残忍": {"severity": "severe", "aliases": ["残杀", "残暴", "残害"]},
    "伤口": {"severity": "mild", "aliases": ["创伤", "伤疤", "伤痕"]},
    "搏斗": {"severity": "mild", "aliases": ["格斗", "肉搏", "缠斗"]},
    "毒": {"severity": "moderate", "aliases": ["下毒", "中毒", "毒杀", "毒药"]},
    "kill": {"severity": "severe", "aliases": ["killing", "murder", "slaughter"]},
    "stab": {"severity": "severe", "aliases": ["stabbing", "stabbed"]},
    "torture": {"severity": "severe", "aliases": ["torment", "torturing"]},
    "blood": {"severity": "moderate", "aliases": ["bloody", "bleeding", "bloodshed"]},
    "fight": {"severity": "mild", "aliases": ["fighting", "brawl"]}
  }
}

```
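The databases above drive the local fast-scan layer. A minimal sketch of how one such database could be scanned is below; the helper name `scan_text` and the hit format are assumptions for illustration, not the skill's actual scanner (which is not shown in this excerpt):

```python
import re

def scan_text(text: str, db: dict) -> list:
    """Scan text against one keyword database; return one hit per match.

    Assumed hit format. Overlapping base/alias matches (e.g. "杀" inside
    "杀人") are not deduplicated here; a real scanner likely would be.
    """
    hits = []
    for keyword, info in db["keywords"].items():
        # each base keyword and all of its aliases trigger the same severity
        for term in [keyword] + info.get("aliases", []):
            for m in re.finditer(re.escape(term), text):
                hits.append({
                    "keyword": keyword,
                    "matched": term,
                    "category": db["category"],
                    "severity": info["severity"],
                    "position": m.start(),
                })
    return hits

db = {"category": "violence",
      "keywords": {"杀": {"severity": "severe", "aliases": ["杀人"]}}}
hits = scan_text("他不要杀人", db)  # "杀" and "杀人" both match at position 3
```

The overlap in the example is exactly the false-positive case ("不要杀人" is a negation) that the AI deep-analysis layer is meant to filter out.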

### assets/rating_rules/china_rating.json

```json
{
  "version": "1.0",
  "description": "China content-rating rules (informed by NRTA regulations)",
  "ratings": {
    "all_ages": {
      "description": "Suitable for all ages",
      "max_severity": "mild",
      "max_hits_mild": 5,
      "max_hits_moderate": 0,
      "max_hits_severe": 0,
      "forbidden_categories": ["sexual", "substance"]
    },
    "12+": {
      "description": "Ages 12 and up; parental guidance advised",
      "max_severity": "moderate",
      "max_hits_mild": -1,
      "max_hits_moderate": 5,
      "max_hits_severe": 0,
      "forbidden_categories": []
    },
    "18+": {
      "description": "Ages 18 and up",
      "max_severity": "severe",
      "max_hits_mild": -1,
      "max_hits_moderate": -1,
      "max_hits_severe": 3,
      "forbidden_categories": []
    }
  },
  "non_compliant_triggers": [
    {"description": "Severe sexual content", "category": "sexual", "severity": "severe", "min_count": 1},
    {"description": "Excessive severe violence", "severity": "severe", "min_count": 5},
    {"description": "Severe drug-related content", "category": "substance", "severity": "severe", "min_count": 1}
  ]
}

```

### assets/rating_rules/general_rating.json

```json
{
  "version": "1.0",
  "description": "General content-rating rules (modeled on ESRB/PEGI)",
  "ratings": {
    "all_ages": {
      "description": "Everyone / PEGI 3",
      "max_severity": "mild",
      "max_hits_mild": 3,
      "max_hits_moderate": 0,
      "max_hits_severe": 0,
      "forbidden_categories": ["sexual", "substance"]
    },
    "12+": {
      "description": "Teen / PEGI 12",
      "max_severity": "moderate",
      "max_hits_mild": -1,
      "max_hits_moderate": 8,
      "max_hits_severe": 0,
      "forbidden_categories": []
    },
    "18+": {
      "description": "Mature / PEGI 18",
      "max_severity": "severe",
      "max_hits_mild": -1,
      "max_hits_moderate": -1,
      "max_hits_severe": 5,
      "forbidden_categories": []
    }
  },
  "non_compliant_triggers": [
    {"description": "Extreme sexual content", "category": "sexual", "severity": "severe", "min_count": 2},
    {"description": "Excessive severe violence", "severity": "severe", "min_count": 8},
    {"description": "Drug glorification", "category": "substance", "severity": "severe", "min_count": 2}
  ]
}

```
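In both rule files a limit of `-1` appears to mean "unlimited" (an assumption based on the 12+/18+ entries). Under that reading, rating selection can be sketched as picking the strictest rating whose per-severity limits the hit counts satisfy; `evaluate_rating` is a hypothetical helper, and `forbidden_categories` handling is omitted for brevity:

```python
def evaluate_rating(counts: dict, ratings: dict) -> str:
    """Return the strictest rating whose hit limits are all satisfied.

    counts:  hit counts per severity, e.g. {"mild": 2, "moderate": 0, "severe": 0}
    ratings: the "ratings" object from a rating-rules file; -1 = unlimited.
    Falls through to "non_compliant" when even the 18+ limits are exceeded.
    """
    for name in ("all_ages", "12+", "18+"):
        rule = ratings[name]
        if all(rule[f"max_hits_{sev}"] < 0 or counts.get(sev, 0) <= rule[f"max_hits_{sev}"]
               for sev in ("mild", "moderate", "severe")):
            return name
    return "non_compliant"

# limits mirroring china_rating.json above
china = {
    "all_ages": {"max_hits_mild": 5, "max_hits_moderate": 0, "max_hits_severe": 0},
    "12+": {"max_hits_mild": -1, "max_hits_moderate": 5, "max_hits_severe": 0},
    "18+": {"max_hits_mild": -1, "max_hits_moderate": -1, "max_hits_severe": 3},
}
```

Note that a text with 4 severe hits exceeds the 18+ limit without reaching the `non_compliant_triggers` threshold of 5; under this sketch it still ends up non-compliant via the fall-through.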

### assets/report_templates/full_report.md

```markdown
# AI Short-Drama Compliance Review Report

**Report ID**: {{report_id}}
**Generated at**: {{generated_at}}
**Input file**: {{input_file}}

## Overall Assessment

| Item | Result |
|------|--------|
| Risk level | {{overall_risk_level}} |
| Compliance score | {{overall_score}}/100 |

## Violation Summary

{{#each violation_summary}}
- [{{severity}}] **{{type}}**: {{description}}
{{/each}}

## Copyright Infringement Detection

- Total paragraphs: {{copyright.total_paragraphs}}
- Suspicious paragraphs: {{copyright.suspicious_paragraphs}}
- Highest similarity: {{copyright.max_similarity_score}}
- Risk level: {{copyright.risk_level}}

## Age Rating Compliance

- Suggested rating: {{age_rating.suggested_rating}}
- Target rating: {{age_rating.target_rating}}
- Compliant: {{age_rating.is_compliant}}
- Total hits: {{age_rating.total_hits}}

## Novel Adaptation Detection

- Deviation score: {{adaptation.deviation_score}}/100
- Adaptation type: {{adaptation.adaptation_type}}
- Total deviations: {{adaptation.total_deviations}}

## Remediation Suggestions

{{#each remediation_suggestions}}
{{@index}}. {{this}}
{{/each}}

---
*Generated automatically by ai-drama-review. For reference only; not legal advice.*

```

### assets/report_templates/summary_report.md

```markdown
# Compliance Review Summary

**File**: {{input_file}} | **Time**: {{generated_at}}

## Verdict: {{overall_risk_level}} ({{overall_score}}/100)

{{#if violations}}
### Issues found:
{{#each violation_summary}}
- {{description}}
{{/each}}

### Suggestions:
{{#each remediation_suggestions}}
- {{this}}
{{/each}}
{{else}}
No obvious compliance risks found.
{{/if}}

---
*For reference only; not legal advice.*

```
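The templates use Handlebars-style placeholders. The renderer itself is not part of this excerpt; a minimal sketch of the flat `{{name.path}}` substitution is below (the `{{#each}}`/`{{#if}}` blocks would need a fuller engine, and `render_placeholders` is a hypothetical name):

```python
import re

def render_placeholders(template: str, values: dict) -> str:
    """Substitute flat {{name.path}} placeholders from a nested dict.

    Unknown placeholders are left intact; block helpers like {{#each}}
    contain '#' and whitespace, so the pattern deliberately skips them.
    """
    def lookup(match):
        cur = values
        for part in match.group(1).split("."):
            if isinstance(cur, dict) and part in cur:
                cur = cur[part]
            else:
                return match.group(0)  # leave unresolved placeholder as-is
        return str(cur)
    return re.sub(r"\{\{([\w.]+)\}\}", lookup, template)
```

For example, `render_placeholders("Score {{copyright.max_similarity_score}}", {"copyright": {"max_similarity_score": 0.91}})` fills in the nested value, while `{{#each ...}}` markers pass through untouched.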

### references/adaptation_analysis.md

```markdown
# 改编检测评估标准

## 偏离度评分体系 (0-100)

### 偏离类型权重

| 类型 | 权重 | 说明 |
|------|------|------|
| plot_removed | ×3.0 | 删除原著情节(最严重) |
| character_changed | ×2.5 | 角色设定改变 |
| plot_modified | ×2.0 | 修改原著情节 |
| setting_changed | ×1.5 | 世界观/设定改变 |
| plot_added | ×1.0 | 新增原创情节 |

### 严重度权重

| 严重度 | 权重 | 标准 |
|--------|------|------|
| major | ×2.0 | 核心情节/角色被大幅改变 |
| moderate | ×1.0 | 次要元素被修改 |
| minor | ×0.5 | 微调,不影响核心 |

### 改编分类

| 评分范围 | 类型 | 说明 |
|----------|------|------|
| 0 - 30 | 忠实改编 | 保留原著核心精神和主要情节 |
| 30 - 60 | 合理改编 | 有较大改动但保持原著基本框架 |
| 60 - 100 | 严重魔改 | 大幅偏离原著,可能引发版权争议 |

## 章节对齐算法

使用 Needleman-Wunsch 变体(全局序列对齐):
- 匹配得分:两段文本的字符 n-gram Jaccard 相似度
- 跳过惩罚:-0.1(允许删除/新增,但有代价)
- 回溯获取最优对齐

对齐结果状态:
- matched: 原著与改编高度对应(相似度 ≥ 0.3)
- modified: 有对应但内容已改变(相似度 < 0.3)
- removed: 原著段落在改编中被删除
- added: 改编中新增了原著没有的内容

## 使用建议

- 偏离度 0-30 通常不需要特别关注
- 偏离度 30-60 建议审查核心情节是否被不当修改
- 偏离度 60+ 强烈建议确认是否已获得原著权利人的改编授权
- 角色命运的重大改变(如将存活角色改为死亡)属于高风险魔改

```
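The alignment scheme described above (n-gram Jaccard match score, -0.1 skip penalty, backtracking, status thresholds) can be sketched as follows; paragraph lists go in, labeled pairs come out. Function names are illustrative, not the skill's actual implementation:

```python
def ngram_jaccard(a: str, b: str, n: int = 3) -> float:
    """Character n-gram Jaccard similarity between two strings."""
    A = {a[i:i + n] for i in range(len(a) - n + 1)}
    B = {b[i:i + n] for i in range(len(b) - n + 1)}
    if not A or not B:
        return 0.0
    return len(A & B) / len(A | B)

def align(orig: list, adapt: list, skip: float = -0.1) -> list:
    """Global (Needleman-Wunsch-style) alignment of paragraph lists."""
    m, n = len(orig), len(adapt)
    dp = [[0.0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        dp[i][0] = dp[i - 1][0] + skip
    for j in range(1, n + 1):
        dp[0][j] = dp[0][j - 1] + skip
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            dp[i][j] = max(
                dp[i - 1][j - 1] + ngram_jaccard(orig[i - 1], adapt[j - 1]),
                dp[i - 1][j] + skip,  # original paragraph removed
                dp[i][j - 1] + skip,  # adaptation paragraph added
            )
    # backtrack to label each pair with its alignment status
    pairs, i, j = [], m, n
    while i > 0 and j > 0:
        s = ngram_jaccard(orig[i - 1], adapt[j - 1])
        if abs(dp[i][j] - (dp[i - 1][j - 1] + s)) < 1e-9:
            pairs.append((i - 1, j - 1, "matched" if s >= 0.3 else "modified"))
            i, j = i - 1, j - 1
        elif abs(dp[i][j] - (dp[i - 1][j] + skip)) < 1e-9:
            pairs.append((i - 1, None, "removed")); i -= 1
        else:
            pairs.append((None, j - 1, "added")); j -= 1
    while i > 0:
        pairs.append((i - 1, None, "removed")); i -= 1
    while j > 0:
        pairs.append((None, j - 1, "added")); j -= 1
    return list(reversed(pairs))
```

Aligning `["今天天气很好我们出门", "他死了"]` against `["今天天气很好我们出门"]` labels the first paragraph `matched` and the second `removed`.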

### references/age_rating_standards.md

```markdown
# Age Rating Standards

## China (reference)

China currently has no unified film/TV content rating system, but the NRTA (National Radio and Television Administration) publishes related regulations. This tool follows these principles:

### All ages (all_ages)
- No violence, horror, sexual content, or tobacco/alcohol/drug content
- Civil language, no profanity
- Suitable for audiences of all ages

### 12+
- May contain mild conflict and tense plot lines
- No sexual or drug content
- At most 5 instances of mild violence
- No severe violence or gore

### 18+
- May contain stronger violence and horror elements
- May discuss adult themes
- At most 3 instances of severe violence
- No extreme cruelty or abuse

### Non-compliant (non_compliant)
- Contains severe sexual content
- Contains extreme violence (5 or more instances of severe violence)
- Glorifies drug use
- Must be revised before release

## International References

### ESRB (United States)
- E (Everyone) → all ages
- T (Teen, 13+) → 12+
- M (Mature, 17+) → 18+
- AO (Adults Only) → non-compliant

### PEGI (Europe)
- PEGI 3 → all ages
- PEGI 12 → 12+
- PEGI 18 → 18+

## Detection Categories

| Category | Description | Severity range |
|----------|-------------|----------------|
| violence | Violent content | mild ~ severe |
| sexual | Sexual content | mild ~ severe |
| horror | Horror / thriller | mild ~ severe |
| profanity | Profanity / improper language | mild ~ severe |
| substance | Tobacco, alcohol, drugs | mild ~ severe |

## Disclaimer

The rating suggestions produced by this tool are for reference only and do not represent any official rating decision. Content creators must exercise their own judgment and comply with local laws and regulations.

```

### references/copyright_detection_guide.md

```markdown
# Copyright Detection Methodology

## Detection Algorithms

### 1. n-gram Jaccard coefficient
- Split text into character-level 3-grams (e.g. "你好世界" → {"你好世", "好世界"})
- Compute the Jaccard coefficient of the two n-gram sets: |A∩B| / |A∪B|
- Strength: sensitive to local lexical overlap; fast to compute
- Limitation: cannot detect synonym substitution

### 2. Normalized edit distance
- Compute the Levenshtein edit distance (number of insert/delete/substitute operations)
- Normalize: distance / max(len_a, len_b)
- Strength: measures overall textual difference
- Limitation: slow on long texts (optimized to O(min(m,n)) space)

### 3. TF-IDF cosine similarity
- Build TF-IDF vectors over the tokenized texts
- Compute the cosine similarity between the vectors
- Strength: captures topic-level similarity
- Limitation: insensitive to word order

### Composite score
- Weighted average: n-gram (0.3) + edit distance (0.3) + cosine (0.4)
- Threshold: 0.7 by default (configurable)

## Risk Level Determination

| Level | Condition |
|-------|-----------|
| critical | Top score ≥ 0.95 or suspicious ratio ≥ 50% |
| high | Top score ≥ 0.85 or suspicious ratio ≥ 30% |
| medium | Top score ≥ 0.70 or suspicious ratio ≥ 10% |
| low | No suspicious paragraphs |

## Limitations

- Local algorithms cannot draw the legal line between fair quotation and infringement
- Generic expressions (idioms, stock phrases) can cause false positives
- The AI deep-analysis layer can help rule out false positives
- Results are for reference only; final judgment requires professional legal advice

```
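The three signals and their 0.3/0.3/0.4 weighting can be sketched as follows. The cosine term here uses plain character frequencies instead of tokenized TF-IDF, a simplification for a self-contained example:

```python
import math
from collections import Counter

def ngram_jaccard(a: str, b: str, n: int = 3) -> float:
    A = {a[i:i + n] for i in range(len(a) - n + 1)}
    B = {b[i:i + n] for i in range(len(b) - n + 1)}
    return len(A & B) / len(A | B) if A and B else 0.0

def norm_edit_sim(a: str, b: str) -> float:
    """1 - normalized Levenshtein distance, in O(min(m, n)) space."""
    if len(a) < len(b):
        a, b = b, a
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (ca != cb)))
        prev = cur
    return 1 - prev[-1] / max(len(a), len(b), 1)

def tf_cosine(a: str, b: str) -> float:
    """Character-frequency cosine; stands in for the TF-IDF step."""
    ca, cb = Counter(a), Counter(b)
    dot = sum(ca[t] * cb[t] for t in ca)
    na = math.sqrt(sum(v * v for v in ca.values()))
    nb = math.sqrt(sum(v * v for v in cb.values()))
    return dot / (na * nb) if na and nb else 0.0

def similarity(a: str, b: str) -> float:
    """Composite score with the 0.3 / 0.3 / 0.4 weights from the guide."""
    return (0.3 * ngram_jaccard(a, b)
            + 0.3 * norm_edit_sim(a, b)
            + 0.4 * tf_cosine(a, b))
```

Identical paragraphs score 1.0 and unrelated ones near 0.0, with the default flag threshold at 0.7.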

### scripts/content_analyzer.py

```python
"""
AI deep-content analysis module.

Calls the OpenAI / Anthropic APIs for deeper content understanding.
Uses urllib.request only, so there are no external dependencies.
"""

import json
import sys
import urllib.request
from pathlib import Path
from typing import Optional

sys.path.insert(0, str(Path(__file__).parent))
from credential_manager import get_credential, list_available_providers


def _call_openai(prompt: str, system_prompt: str = "",
                 model: str = "gpt-4o") -> str:
    """Call the OpenAI chat completions API."""
    api_key = get_credential("openai")

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    payload = json.dumps({
        "model": model,
        "messages": messages,
        "temperature": 0.3,
        "max_tokens": 2000,
    }).encode("utf-8")

    req = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=payload,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )

    with urllib.request.urlopen(req, timeout=60) as resp:
        data = json.loads(resp.read())
        return data["choices"][0]["message"]["content"]


def _call_anthropic(prompt: str, system_prompt: str = "",
                    model: str = "claude-sonnet-4-20250514") -> str:
    """Call the Anthropic messages API."""
    api_key = get_credential("anthropic")

    payload = json.dumps({
        "model": model,
        "max_tokens": 2000,
        "system": system_prompt or "You are a content compliance analyst.",
        "messages": [{"role": "user", "content": prompt}],
    }).encode("utf-8")

    req = urllib.request.Request(
        "https://api.anthropic.com/v1/messages",
        data=payload,
        headers={
            "x-api-key": api_key,
            "Content-Type": "application/json",
            "anthropic-version": "2023-06-01",
        },
    )

    with urllib.request.urlopen(req, timeout=60) as resp:
        data = json.loads(resp.read())
        return data["content"][0]["text"]


def call_ai(prompt: str, system_prompt: str = "",
            preferred_provider: Optional[str] = None) -> Optional[str]:
    """
    Unified AI entry point; picks an available provider automatically.

    Returns:
        The AI reply text, or None when no provider is configured
        (or every configured provider fails).
    """
    available = list_available_providers()

    if not available:
        return None

    # Decide the order in which to try providers
    providers_to_try = []
    if preferred_provider and preferred_provider in available:
        providers_to_try.append(preferred_provider)
    for p in ["openai", "anthropic"]:
        if p in available and p not in providers_to_try:
            providers_to_try.append(p)

    for provider in providers_to_try:
        try:
            if provider == "openai":
                return _call_openai(prompt, system_prompt)
            elif provider == "anthropic":
                return _call_anthropic(prompt, system_prompt)
        except Exception:
            continue

    return None


# === Copyright analysis ===

def analyze_plagiarism_context(suspicious_pairs: list) -> Optional[dict]:
    """
    Ask the AI whether suspiciously similar pairs amount to substantive infringement.

    Args:
        suspicious_pairs: [{"source": str, "reference": str, "score": float}, ...]

    Returns:
        {"confirmed": [...], "false_positives": [...], "analysis": str}
    """
    if not suspicious_pairs:
        return None

    pairs_text = ""
    for i, pair in enumerate(suspicious_pairs[:10]):  # cap the number of pairs sent
        pairs_text += (
            f"\n--- 可疑对 {i + 1} (相似度: {pair['score']:.2f}) ---\n"
            f"待检文本: {pair['source'][:200]}\n"
            f"参考文本: {pair['reference'][:200]}\n"
        )

    prompt = (
        f"以下是文本版权侵权检测中发现的可疑相似段落对。"
        f"请分析每一对是否构成实质性侵权,考虑以下因素:\n"
        f"1. 是否为通用表达或公共领域内容\n"
        f"2. 是否存在独创性的实质相似\n"
        f"3. 是否仅为同义改写但核心表达一致\n\n"
        f"{pairs_text}\n\n"
        f"请以 JSON 格式回复:\n"
        f'{{"confirmed": [编号列表], "false_positives": [编号列表], '
        f'"analysis": "整体分析说明"}}'
    )

    system = "你是一位版权合规分析专家,擅长判断文本是否存在侵权。请客观、准确地分析。"
    result = call_ai(prompt, system)

    if result:
        try:
            # try to extract the JSON object from the reply
            json_match = result[result.find("{"):result.rfind("}") + 1]
            return json.loads(json_match)
        except (json.JSONDecodeError, ValueError):
            return {"analysis": result}

    return None


# === Age-rating analysis ===

def analyze_age_rating_context(hits_with_context: list,
                               target_rating: str) -> Optional[dict]:
    """
    Ask the AI to review keyword hits in context and rule out false positives.

    Args:
        hits_with_context: [{"keyword": str, "context": str, "category": str}, ...]
        target_rating: the target rating

    Returns:
        {"confirmed": [...], "false_positives": [...], "final_rating": str}
    """
    if not hits_with_context:
        return None

    hits_text = ""
    for i, hit in enumerate(hits_with_context[:15]):
        hits_text += (
            f"\n{i + 1}. 关键词: {hit['keyword']} (类别: {hit['category']})\n"
            f"   上下文: {hit['context']}\n"
        )

    prompt = (
        f"以下是内容分级检测中的关键词命中项。目标分级为: {target_rating}\n"
        f"请分析每个命中是否为真正的不当内容,排除以下误报情况:\n"
        f"1. 否定语境(如 '不要杀人' 中的 '杀')\n"
        f"2. 文学修辞或比喻用法\n"
        f"3. 历史/教育引用\n"
        f"4. 角色对话中的合理表达\n\n"
        f"{hits_text}\n\n"
        f"请以 JSON 格式回复:\n"
        f'{{"confirmed": [编号列表], "false_positives": [编号列表], '
        f'"final_rating": "建议分级", "reasoning": "分析说明"}}'
    )

    system = "你是一位内容分级审核专家,擅长判断内容的年龄适宜性。请准确区分真正的不当内容和误报。"
    result = call_ai(prompt, system)

    if result:
        try:
            json_match = result[result.find("{"):result.rfind("}") + 1]
            return json.loads(json_match)
        except (json.JSONDecodeError, ValueError):
            return {"analysis": result}

    return None


# === Adaptation analysis ===

def extract_plot_and_characters(text: str) -> Optional[dict]:
    """Ask the AI to extract structured plot points and character summaries."""
    # truncate overly long text
    truncated = text[:5000]

    prompt = (
        f"请分析以下文本,提取结构化信息:\n\n"
        f"{truncated}\n\n"
        f"请以 JSON 格式回复:\n"
        f'{{"plot_points": [{{"index": 1, "summary": "情节摘要", '
        f'"characters": ["角色名"], "importance": "core|normal|minor"}}], '
        f'"characters": [{{"name": "角色名", "traits": ["性格"], '
        f'"relationships": {{"角色名": "关系"}}}}]}}'
    )

    system = "你是一位文学分析专家,擅长提取叙事结构和角色信息。"
    result = call_ai(prompt, system)

    if result:
        try:
            json_match = result[result.find("{"):result.rfind("}") + 1]
            return json.loads(json_match)
        except (json.JSONDecodeError, ValueError):
            return {"raw_analysis": result}

    return None


def analyze_adaptation_significance(deviations: list) -> Optional[dict]:
    """Ask the AI to assess how severe and how justified the adaptation deviations are."""
    if not deviations:
        return None

    dev_text = ""
    for i, dev in enumerate(deviations[:10]):
        dev_text += (
            f"\n{i + 1}. 类型: {dev.get('type', 'unknown')}\n"
            f"   原文: {dev.get('original', '')[:150]}\n"
            f"   改编: {dev.get('adapted', '')[:150]}\n"
        )

    prompt = (
        f"以下是原著与改编版之间的偏差列表。请评估:\n"
        f"1. 每个偏差是否合理\n"
        f"2. 是否偏离了原著的核心精神\n"
        f"3. 整体改编质量\n\n"
        f"{dev_text}\n\n"
        f"请以 JSON 格式回复:\n"
        f'{{"overall_assessment": "忠实改编|合理改编|严重魔改", '
        f'"justified_changes": [编号], "unjustified_changes": [编号], '
        f'"reasoning": "分析说明"}}'
    )

    system = "你是一位文学评论专家,擅长评估小说改编的质量和忠实度。"
    result = call_ai(prompt, system)

    if result:
        try:
            json_match = result[result.find("{"):result.rfind("}") + 1]
            return json.loads(json_match)
        except (json.JSONDecodeError, ValueError):
            return {"analysis": result}

    return None


# === Overall risk assessment ===

def generate_risk_assessment(all_findings: dict) -> Optional[dict]:
    """Ask the AI to synthesize all findings into an overall risk assessment."""
    findings_text = json.dumps(all_findings, ensure_ascii=False, indent=2)

    # truncate overly long content
    if len(findings_text) > 4000:
        findings_text = findings_text[:4000] + "\n... (已截断)"

    prompt = (
        f"以下是AI短剧合规审查的全部检测结果:\n\n"
        f"{findings_text}\n\n"
        f"请综合分析,给出:\n"
        f"1. 整体风险评级(low/medium/high/critical)\n"
        f"2. 最紧迫的合规问题\n"
        f"3. 具体的整改建议\n\n"
        f"请以 JSON 格式回复:\n"
        f'{{"risk_level": "等级", "top_issues": ["问题列表"], '
        f'"remediation": ["整改建议列表"], "summary": "总结"}}'
    )

    system = "你是一位内容合规顾问,擅长评估AI生成内容的法律和道德风险。"
    result = call_ai(prompt, system)

    if result:
        try:
            json_match = result[result.find("{"):result.rfind("}") + 1]
            return json.loads(json_match)
        except (json.JSONDecodeError, ValueError):
            return {"analysis": result}

    return None

```
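Each analysis function above repeats the same `find("{")` / `rfind("}")` slice to pull JSON out of the AI reply. That pattern could be factored into one helper; this is a suggested refactor, not part of the script above. Note the original slice misbehaves when the reply contains no braces (`find` returns -1), which the helper guards against:

```python
import json
from typing import Optional

def extract_json(reply: str) -> Optional[dict]:
    """Extract the outermost {...} span from an AI reply.

    Returns the parsed object, or None when no brace pair exists
    or the span is not valid JSON.
    """
    start, end = reply.find("{"), reply.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return None
```

For example, `extract_json('分析如下 {"confirmed": [1]} 完毕')` returns the dict, while a brace-free reply yields `None` instead of raising.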

### scripts/credential_manager.py

```python
"""
Credential management module for ai-drama-review.

Security principles:
- Credentials are read from environment variables only
- Zero persistence: never written to disk, cached, or printed
- Never passed via command-line arguments (avoids process-list leaks)
- All access goes through get_credential()
"""

import os
import sys

# AI analysis provider -> API-key environment variable
_AI_PROVIDER_KEYS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
}

_ALL_KEYS = {**_AI_PROVIDER_KEYS}


def get_credential(provider: str) -> str:
    """
    Read the API key for the given provider from its environment variable.

    Args:
        provider: provider name (e.g. "openai", "anthropic")

    Returns:
        The API key string.

    Raises:
        ValueError: unknown provider name
        EnvironmentError: environment variable not set
    """
    env_var = _ALL_KEYS.get(provider.lower())
    if not env_var:
        raise ValueError(
            f"Unknown provider: '{provider}'\n"
            f"Supported providers: {', '.join(sorted(_ALL_KEYS.keys()))}"
        )

    value = os.environ.get(env_var)
    if not value:
        raise EnvironmentError(
            f"Missing credential: please set the environment variable {env_var}\n"
            f"Windows:  set {env_var}=your_key_here\n"
            f"Linux/macOS: export {env_var}=your_key_here"
        )

    return value


def list_available_providers() -> list:
    """
    List all AI providers whose credentials are configured.

    Returns:
        Names of providers with a configured credential.
    """
    return [
        provider for provider, env_var in _AI_PROVIDER_KEYS.items()
        if os.environ.get(env_var)
    ]


def check_credential_status() -> str:
    """
    Check the configuration status of all credentials.

    Returns:
        A formatted status report (never contains key values).
    """
    lines = ["=== Credential Status ===", ""]

    lines.append("AI analysis providers:")
    for provider, env_var in _AI_PROVIDER_KEYS.items():
        status = "configured" if os.environ.get(env_var) else "not configured"
        lines.append(f"  {provider:12s} ({env_var}): {status}")

    available = list_available_providers()
    lines.append("")
    if available:
        lines.append("Mode: hybrid (local scan + AI deep analysis)")
    else:
        lines.append("Mode: local only (keyword matching)")
        lines.append("Hint: set OPENAI_API_KEY or ANTHROPIC_API_KEY to enable AI deep analysis")

    return "\n".join(lines)


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--status":
        print(check_credential_status())
    elif len(sys.argv) > 1 and sys.argv[1] == "--available":
        import json
        print(json.dumps(list_available_providers(), indent=2, ensure_ascii=False))
    else:
        print("Usage:")
        print("  python credential_manager.py --status    show credential status")
        print("  python credential_manager.py --available list configured providers")

```