ai-drama-review
Compliance-review skill pack for AI short dramas. Detects text/novel copyright infringement, age-rating compliance (18+/12+), and the degree of deviation in novel adaptations, then generates a structured compliance report. Two-layer architecture: fast local keyword scanning plus AI deep analysis. Beta stage: results are for reference only and do not constitute legal advice.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-ai-drama-review
Repository
Skill path: skills/aaalenwow/ai-drama-review
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install ai-drama-review into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding ai-drama-review to shared team environments
- Use ai-drama-review for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: ai-drama-review
description: Compliance-review skill pack for AI short dramas. Detects text/novel copyright infringement, age-rating compliance (18+/12+), and the degree of deviation in novel adaptations, then generates a structured compliance report. Two-layer architecture: fast local keyword scanning plus AI deep analysis. Beta stage: results are for reference only and do not constitute legal advice.
user-invocable: true
metadata: {"openclaw":{"requires":{"env":["OPENAI_API_KEY"],"anyBins":["python3","python"],"bins":[]},"primaryEnv":"OPENAI_API_KEY","stage":"beta","version":"0.1.0"}}
---
This skill identifies compliance risks in AI-generated short dramas, including copyright infringement, age rating violations, and unauthorized novel adaptations. It uses a two-layer architecture: local keyword scanning for fast baseline detection, plus AI-powered deep analysis for context-aware accuracy.
**Warning: BETA**: this skill pack is still in testing. Detection results are for reference only and do not constitute a legal basis; use them alongside professional legal advice.
The user supplies script text, subtitle files, or video descriptions; this skill runs a compliance review and generates a structured risk report.
---
## Phase 1: Environment Detection and Initialization
When the user requests a compliance review of short-drama content, first run environment detection:
```bash
python3 scripts/env_detect.py
```
Checks performed:
1. **Python version**: >= 3.8
2. **Available API keys**: OPENAI_API_KEY / ANTHROPIC_API_KEY (for deep analysis)
3. **Optional Python packages**: jieba (Chinese word segmentation; improves copyright-detection accuracy)
4. **Network connectivity**: reachability of the API endpoints
Determine the run mode:
- **Local-only mode (local_only)**: fallback when no API key is present; only keyword matching and local text algorithms run
- **Hybrid mode (hybrid)** (recommended): fast local scan plus AI deep context analysis, for higher accuracy
Show the user the environment status and available capabilities.
---
## Phase 2: Copyright Infringement Detection
Take the script/dialogue text provided by the user and run copyright-infringement detection:
```bash
python3 scripts/text_similarity.py --input <script_file> --reference-dir <reference_texts_dir>
```
### 2.1 Text Preprocessing
1. Normalize encoding (Unicode normalization)
2. Strip punctuation and extra whitespace
3. Split into paragraphs and drop very short ones (< 20 characters)
4. Chinese word segmentation (jieba when available, character-level fallback)
### 2.2 Triple Similarity Detection
Compare each paragraph against the reference corpus paragraph by paragraph, computing three complementary metrics:
| Algorithm | Detects | Weight |
|------|----------|------|
| n-gram Jaccard coefficient | local vocabulary overlap | 0.3 |
| Normalized edit distance | overall textual difference | 0.3 |
| TF-IDF cosine similarity | semantic/topic similarity | 0.4 |
Paragraphs whose combined score exceeds the threshold (default 0.7) are flagged as suspected infringement.
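As a worked example of the weighted combination above (the normalized edit distance is a distance, so it is first flipped into a similarity), the following sketch mirrors the `combine_scores` helper defined in `scripts/text_similarity.py`:

```python
def combine_scores(ngram_sim: float, edit_dist_norm: float, cosine_sim: float) -> float:
    """Weighted average of the three metrics; edit distance is flipped into a similarity."""
    edit_sim = 1.0 - edit_dist_norm
    return 0.3 * ngram_sim + 0.3 * edit_sim + 0.4 * cosine_sim

# A paragraph with strong local overlap and topical similarity:
score = combine_scores(ngram_sim=0.8, edit_dist_norm=0.2, cosine_sim=0.75)
print(round(score, 2))  # 0.3*0.8 + 0.3*0.8 + 0.4*0.75 = 0.78, above the 0.7 threshold
```

Because the weights sum to 1.0, the combined score stays in [0, 1] and is directly comparable to the threshold.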
### 2.3 AI Semantic Confirmation (hybrid mode)
Send high-suspicion paragraphs to the AI for semantic-level analysis:
- Exclude generic expressions and public-domain content
- Assess originality and substantial similarity
- Identify paraphrasing and synonym substitution
Show the user: the list of suspicious paragraphs, similarity scores, suspected sources, and the AI's assessment.
---
## Phase 3: Age-Rating Compliance Detection
Scan the script for age-rating compliance:
```bash
python3 scripts/age_rating_scanner.py --input <script_file> --target-rating <all_ages|12+|18+>
```
### 3.1 Layer 1: Fast Local Keyword Scan
Load the categorized keyword databases (violence / sexual content / horror / profanity / alcohol, tobacco, and drugs) and scan paragraph by paragraph:
- Record each matched keyword, its category, and its severity (mild/moderate/severe)
- Keep the hit position and surrounding context (30 characters on each side)
- Compute a preliminary rating suggestion from hit density and severity
### 3.2 Layer 2: AI Deep Context Analysis (hybrid mode)
Send the paragraphs containing keyword hits to the AI model to:
- Decide whether each hit is genuinely inappropriate (ruling out false positives such as negated contexts, literary devices, and historical references)
- Assess the tendency of the content in context
- Produce a rating suggestion with concrete reasons
### 3.3 Auxiliary Content Analysis
- **Video keyframe descriptions**: if the user supplies frame descriptions, analyze visual-content risk
- **Audio transcripts**: if the user supplies a transcript, scan for profanity and inappropriate sound-effect descriptions
### 3.4 Rating Output
| Rating | Meaning |
|------|------|
| all_ages | suitable for all age groups |
| 12+ | mild violence/conflict; parental guidance advised |
| 18+ | stronger violence, horror, or adult themes |
| non_compliant | beyond acceptable limits; revision recommended |
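Compliance against the user's target rating reduces to an ordinal comparison over this table; a minimal sketch mirroring the `_rating_order` helper in `scripts/age_rating_scanner.py`:

```python
# Ordinal scale for ratings; higher means stricter content
RATING_ORDER = {"all_ages": 0, "12+": 1, "18+": 2, "non_compliant": 3}

def is_compliant(suggested: str, target: str) -> bool:
    """Content complies when the suggested rating is no stricter than the target."""
    return RATING_ORDER.get(suggested, 3) <= RATING_ORDER.get(target, 3)

print(is_compliant("12+", "18+"))            # True: 12+ content fits an 18+ target
print(is_compliant("18+", "12+"))            # False: too strong for a 12+ target
print(is_compliant("non_compliant", "18+"))  # False: never compliant
```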
---
## Phase 4: Novel-Adaptation Deviation Detection
Compare the original novel with the adapted version and quantify how far the adaptation deviates:
```bash
python3 scripts/adaptation_detector.py --original <original_file> --adapted <adapted_file>
```
### 4.1 Structural Alignment
Use a dynamic-programming algorithm (a Needleman-Wunsch variant) to align original chapters with adapted sections, identifying:
- Original plot that was retained
- Newly added plot segments
- Original content that was removed
- Modified sections
### 4.2 Character-Deviation Detection
Extract the character list and character settings, then compare for changes to:
- Personality traits
- Character relationships
- Character fates
### 4.3 Key-Plot Comparison
Use the AI to extract core plot points and assess how much the adaptation alters the original's core.
### 4.4 Deviation Score
Output a composite deviation score (0-100):
| Score range | Classification | Meaning |
|----------|------|------|
| 0 - 30 | Faithful adaptation | preserves the original's core with reasonable adjustments |
| 30 - 60 | Reasonable adaptation | substantial changes, but the core is intact |
| 60 - 100 | Severe modification | departs heavily from the original; likely to cause controversy |
---
## Phase 5: Compliance Report Generation
Aggregate all detection results into a structured report:
```bash
python3 scripts/report_generator.py --results <detection_results.json> --format <json|markdown>
```
Report contents:
- **Overall risk level**: low / medium / high / critical
- **Copyright infringement risk**: suspected sources, similar paragraphs, similarity scores
- **Age-rating compliance**: rating suggestion, per-category hit details
- **Adaptation-deviation details**: deviation score, list of core changes
- **Violation locations**: paragraph numbers, timestamps, scene numbers
- **Remediation checklist**: concrete revision suggestions for each risk
---
## Phase 6: Orchestration and Full Review
Run the full review pipeline in one command:
```bash
python3 scripts/review_orchestrator.py --input <script_file> [--reference-dir <dir>] [--original <file>] [--target-rating 12+] [--checks copyright rating adaptation]
```
Flow:
1. Detect the environment and determine the run mode
2. Load the input text (.txt / .srt / .json supported)
3. Run the selected detection modules
4. AI composite risk assessment (hybrid mode)
5. Generate the compliance report
6. Format the risk notice and flag violations to the user
**Risk notice format**: when a violation is detected, produce a structured notice the model can show the user, listing the specific violation type, its location, and remediation advice.
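The .srt loading step is not among the referenced files, but it can be sketched as follows. This is a minimal, hypothetical loader (the function name `parse_srt` and its output shape are assumptions; the real orchestrator may parse subtitles differently):

```python
import re
from typing import List, Tuple  # typing used for Python 3.8 compatibility

def parse_srt(srt_text: str) -> List[Tuple[str, str]]:
    """Parse SRT subtitle text into (start timestamp, subtitle text) pairs."""
    entries = []
    # SRT blocks are separated by blank lines: index, "start --> end", then text lines
    for block in re.split(r"\n\s*\n", srt_text.strip()):
        lines = block.strip().splitlines()
        if len(lines) >= 3 and "-->" in lines[1]:
            start = lines[1].split("-->")[0].strip()
            entries.append((start, " ".join(lines[2:])))
    return entries

sample = """1
00:00:01,000 --> 00:00:03,000
Hello there.

2
00:00:04,000 --> 00:00:06,000
Second line."""
print(parse_srt(sample))
# [('00:00:01,000', 'Hello there.'), ('00:00:04,000', 'Second line.')]
```

Keeping the start timestamp alongside each line lets later phases report violation locations by timestamp, as Phase 5 requires.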
---
## Credential Security
### Environment Variable Configuration
**AI analysis (configure at least one to enable hybrid mode):**
- `OPENAI_API_KEY` (OpenAI API, used for deep content analysis)
- `ANTHROPIC_API_KEY` (Anthropic Claude API, fallback)
**Security principles:**
- Credentials are read only from environment variables and never persisted
- Key values are never logged, printed, or cached
- With no API key present, the skill automatically falls back to local mode
---
## Disclaimer
The compliance results produced by this skill pack are for reference only and do not constitute legal advice. Users should reach final decisions with input from professional legal counsel. Detection may produce false positives or false negatives; manual review of high-risk content is recommended.
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/env_detect.py
```python
"""
环境检测模块 - ai-drama-review
检测运行环境,确定可用的分析能力。
"""
import json
import os
import platform
import subprocess
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from credential_manager import _AI_PROVIDER_KEYS
def detect_python_version() -> dict:
"""检测 Python 版本。"""
version = sys.version_info
return {
"version": f"{version.major}.{version.minor}.{version.micro}",
"major": version.major,
"minor": version.minor,
"meets_minimum": version >= (3, 8),
}
def detect_api_keys() -> dict:
"""检测 AI API 密钥可用性(仅检测存在性,不打印值)。"""
return {
env_var: bool(os.environ.get(env_var))
for env_var in _AI_PROVIDER_KEYS.values()
}
def detect_python_packages() -> dict:
"""检测可选 Python 包。"""
packages = {}
# jieba - 中文分词
try:
import jieba
packages["jieba"] = {"installed": True, "version": jieba.__version__}
except ImportError:
packages["jieba"] = {"installed": False, "note": "可选,提升中文版权检测精度"}
return packages
def detect_network() -> dict:
"""检测网络连通性。"""
result = {"internet": False}
try:
import urllib.request
urllib.request.urlopen("https://api.openai.com", timeout=5)
result["internet"] = True
result["openai_reachable"] = True
except Exception:
try:
import urllib.request
urllib.request.urlopen("https://www.baidu.com", timeout=5)
result["internet"] = True
result["openai_reachable"] = False
except Exception:
pass
return result
def determine_run_mode(api_keys: dict) -> str:
"""
确定运行模式。
Args:
api_keys: detect_api_keys() 的结果
Returns:
"hybrid" 或 "local_only"
"""
if any(api_keys.values()):
return "hybrid"
return "local_only"
def run_full_detection() -> dict:
"""执行完整环境检测,返回 JSON 报告。"""
python_info = detect_python_version()
api_keys = detect_api_keys()
packages = detect_python_packages()
network = detect_network()
run_mode = determine_run_mode(api_keys)
report = {
"system": {
"os": platform.system(),
"os_version": platform.version(),
"architecture": platform.machine(),
},
"python": python_info,
"api_keys": api_keys,
"packages": packages,
"network": network,
"run_mode": run_mode,
"capabilities": {
"copyright_detection": True,
"age_rating_scan": True,
"adaptation_detection": True,
"ai_deep_analysis": run_mode == "hybrid",
"chinese_segmentation": packages.get("jieba", {}).get("installed", False),
},
}
return report
if __name__ == "__main__":
report = run_full_detection()
print(json.dumps(report, indent=2, ensure_ascii=False))
```
### scripts/text_similarity.py
```python
"""
文本相似度检测引擎
用于版权侵权检测,支持三种互补的相似度算法:
- n-gram Jaccard 系数(局部词汇重复)
- 归一化编辑距离(整体文本差异)
- TF-IDF 余弦相似度(语义主题相似)
纯 Python 实现,不依赖外部 NLP 库。
"""
import math
import re
import sys
import unicodedata
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
@dataclass
class SimilarityResult:
"""单段相似度检测结果。"""
source_paragraph_index: int
source_text: str
reference_id: str
reference_paragraph_index: int
reference_text: str
ngram_jaccard: float
edit_distance_normalized: float
cosine_similarity: float
combined_score: float
is_suspicious: bool
@dataclass
class CopyrightReport:
"""版权检测报告。"""
total_paragraphs: int
suspicious_paragraphs: int
max_similarity_score: float
risk_level: str # "low" / "medium" / "high" / "critical"
results: List[SimilarityResult] = field(default_factory=list)
# === Text preprocessing ===
def preprocess_text(text: str) -> str:
"""统一编码、去标点、去多余空白。"""
# Unicode 归一化
text = unicodedata.normalize("NFKC", text)
# 去除标点
text = re.sub(r'[^\w\s]', '', text)
# 合并多余空白
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def split_paragraphs(text: str, min_length: int = 20) -> List[str]:
"""按段落分割文本,过滤过短段落。"""
paragraphs = re.split(r'\n\s*\n|\n', text)
return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]
def tokenize_chinese(text: str) -> List[str]:
"""中文分词(优先 jieba,降级到字符级)。"""
try:
import jieba
return list(jieba.cut(text))
except ImportError:
        # Fallback: character-level tokens, keeping runs of ASCII letters/digits whole
tokens = []
current_ascii = []
for char in text:
if char.isascii() and char.isalnum():
current_ascii.append(char)
else:
if current_ascii:
tokens.append(''.join(current_ascii))
current_ascii = []
if char.strip():
tokens.append(char)
if current_ascii:
tokens.append(''.join(current_ascii))
return tokens
# === n-gram similarity ===
def char_ngrams(text: str, n: int = 3) -> set:
"""生成字符级 n-gram 集合。"""
text = preprocess_text(text)
if len(text) < n:
return {text} if text else set()
return {text[i:i + n] for i in range(len(text) - n + 1)}
def word_ngrams(tokens: list, n: int = 2) -> set:
"""生成词级 n-gram 集合。"""
if len(tokens) < n:
return {tuple(tokens)} if tokens else set()
return {tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}
def jaccard_similarity(set_a: set, set_b: set) -> float:
"""Jaccard 系数 = |A ∩ B| / |A ∪ B|。"""
if not set_a and not set_b:
return 1.0
if not set_a or not set_b:
return 0.0
intersection = len(set_a & set_b)
union = len(set_a | set_b)
return intersection / union if union > 0 else 0.0
# === Edit distance ===
def edit_distance(s1: str, s2: str) -> int:
"""Levenshtein 编辑距离(空间优化为 O(min(m,n)))。"""
if len(s1) < len(s2):
s1, s2 = s2, s1
prev_row = list(range(len(s2) + 1))
for i, c1 in enumerate(s1):
curr_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = prev_row[j + 1] + 1
deletions = curr_row[j] + 1
substitutions = prev_row[j] + (0 if c1 == c2 else 1)
curr_row.append(min(insertions, deletions, substitutions))
prev_row = curr_row
return prev_row[-1]
def normalized_edit_distance(s1: str, s2: str) -> float:
"""归一化编辑距离 = edit_distance / max(len(s1), len(s2))。"""
max_len = max(len(s1), len(s2))
if max_len == 0:
return 0.0
return edit_distance(s1, s2) / max_len
# === TF-IDF cosine similarity ===
def compute_idf(corpus: List[List[str]]) -> dict:
"""计算逆文档频率(平滑版,避免 log(1)=0 的问题)。"""
doc_count = len(corpus)
if doc_count == 0:
return {}
df = {}
for tokens in corpus:
seen = set(tokens)
for token in seen:
df[token] = df.get(token, 0) + 1
return {
token: math.log((doc_count + 1) / (count + 1)) + 1.0
for token, count in df.items()
}
def build_tfidf_vector(tokens: list, idf_dict: dict) -> dict:
"""构建 TF-IDF 向量。"""
tf = {}
for token in tokens:
tf[token] = tf.get(token, 0) + 1
total = len(tokens) if tokens else 1
return {
token: (count / total) * idf_dict.get(token, 1.0)
for token, count in tf.items()
}
def cosine_similarity_vec(vec_a: dict, vec_b: dict) -> float:
"""余弦相似度。"""
common_keys = set(vec_a.keys()) & set(vec_b.keys())
dot_product = sum(vec_a[k] * vec_b[k] for k in common_keys)
norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
norm_b = math.sqrt(sum(v * v for v in vec_b.values()))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot_product / (norm_a * norm_b)
# === Combined comparison ===
def combine_scores(ngram_sim: float, edit_dist_norm: float,
cosine_sim: float) -> float:
"""综合评分(加权平均)。"""
edit_sim = 1.0 - edit_dist_norm
return 0.3 * ngram_sim + 0.3 * edit_sim + 0.4 * cosine_sim
def compare_paragraphs(para_a: str, para_b: str,
idf_dict: dict = None) -> dict:
"""计算两段文本的全部相似度指标。"""
# n-gram Jaccard
ngrams_a = char_ngrams(para_a, n=3)
ngrams_b = char_ngrams(para_b, n=3)
ngram_sim = jaccard_similarity(ngrams_a, ngrams_b)
# 编辑距离
preprocessed_a = preprocess_text(para_a)
preprocessed_b = preprocess_text(para_b)
edit_dist = normalized_edit_distance(preprocessed_a, preprocessed_b)
# TF-IDF 余弦
tokens_a = tokenize_chinese(preprocessed_a)
tokens_b = tokenize_chinese(preprocessed_b)
if idf_dict is None:
idf_dict = compute_idf([tokens_a, tokens_b])
vec_a = build_tfidf_vector(tokens_a, idf_dict)
vec_b = build_tfidf_vector(tokens_b, idf_dict)
cosine_sim = cosine_similarity_vec(vec_a, vec_b)
combined = combine_scores(ngram_sim, edit_dist, cosine_sim)
return {
"ngram_jaccard": round(ngram_sim, 4),
"edit_distance_normalized": round(edit_dist, 4),
"cosine_similarity": round(cosine_sim, 4),
"combined_score": round(combined, 4),
}
def _determine_risk_level(max_score: float, suspicious_count: int,
total: int) -> str:
"""根据检测结果确定风险等级。"""
if suspicious_count == 0:
return "low"
ratio = suspicious_count / total if total > 0 else 0
if max_score >= 0.95 or ratio >= 0.5:
return "critical"
if max_score >= 0.85 or ratio >= 0.3:
return "high"
if max_score >= 0.7 or ratio >= 0.1:
return "medium"
return "low"
def scan_for_plagiarism(input_text: str, reference_texts: dict,
threshold: float = 0.7) -> CopyrightReport:
"""
主入口:扫描输入文本与参考文本库的相似度。
Args:
input_text: 待检剧本全文
reference_texts: {"source_id": "全文内容", ...}
threshold: 判定阈值 (默认 0.7)
Returns:
CopyrightReport
"""
input_paragraphs = split_paragraphs(input_text)
if not input_paragraphs:
return CopyrightReport(
total_paragraphs=0,
suspicious_paragraphs=0,
max_similarity_score=0.0,
risk_level="low",
)
    # Split the reference texts into paragraphs
    ref_paragraphs = {}
    for ref_id, ref_text in reference_texts.items():
        ref_paragraphs[ref_id] = split_paragraphs(ref_text)
    # Build a global IDF over all paragraphs
    all_token_lists = []
    for para in input_paragraphs:
        all_token_lists.append(tokenize_chinese(preprocess_text(para)))
    for ref_id, paras in ref_paragraphs.items():
        for para in paras:
            all_token_lists.append(tokenize_chinese(preprocess_text(para)))
    global_idf = compute_idf(all_token_lists)
    # Compare paragraph by paragraph
results = []
for i, input_para in enumerate(input_paragraphs):
best_match = None
best_score = 0.0
for ref_id, ref_paras in ref_paragraphs.items():
for j, ref_para in enumerate(ref_paras):
scores = compare_paragraphs(input_para, ref_para, global_idf)
if scores["combined_score"] > best_score:
best_score = scores["combined_score"]
best_match = SimilarityResult(
source_paragraph_index=i,
source_text=input_para[:100],
reference_id=ref_id,
reference_paragraph_index=j,
reference_text=ref_para[:100],
ngram_jaccard=scores["ngram_jaccard"],
edit_distance_normalized=scores["edit_distance_normalized"],
cosine_similarity=scores["cosine_similarity"],
combined_score=scores["combined_score"],
is_suspicious=scores["combined_score"] >= threshold,
)
if best_match and best_match.is_suspicious:
results.append(best_match)
suspicious_count = len(results)
max_score = max((r.combined_score for r in results), default=0.0)
return CopyrightReport(
total_paragraphs=len(input_paragraphs),
suspicious_paragraphs=suspicious_count,
max_similarity_score=round(max_score, 4),
risk_level=_determine_risk_level(
max_score, suspicious_count, len(input_paragraphs)
),
results=results,
)
if __name__ == "__main__":
import argparse
import json
    parser = argparse.ArgumentParser(description="Text similarity detection")
    parser.add_argument("--input", required=True, help="input file path")
    parser.add_argument("--reference-dir", required=True, help="reference text directory")
    parser.add_argument("--threshold", type=float, default=0.7, help="decision threshold")
args = parser.parse_args()
input_path = Path(args.input)
if not input_path.exists():
print(f"错误: 输入文件不存在: {input_path}")
sys.exit(1)
input_text = input_path.read_text(encoding="utf-8")
ref_dir = Path(args.reference_dir)
reference_texts = {}
if ref_dir.exists():
for f in ref_dir.glob("*.txt"):
reference_texts[f.stem] = f.read_text(encoding="utf-8")
report = scan_for_plagiarism(input_text, reference_texts, args.threshold)
print(f"=== 版权侵权检测报告 ===")
print(f"总段落数: {report.total_paragraphs}")
print(f"可疑段落: {report.suspicious_paragraphs}")
print(f"最高相似度: {report.max_similarity_score}")
print(f"风险等级: {report.risk_level}")
if report.results:
print(f"\n可疑段落详情:")
for r in report.results:
print(f"\n 段落 {r.source_paragraph_index}: "
f"综合得分 {r.combined_score:.4f}")
print(f" 来源: {r.reference_id} 段落 {r.reference_paragraph_index}")
print(f" 原文: {r.source_text[:60]}...")
print(f" 参考: {r.reference_text[:60]}...")
```
### scripts/age_rating_scanner.py
```python
"""
年龄分级合规检测器
两层架构:
Layer 1: 本地关键词快速扫描
Layer 2: AI 上下文深度分析(需要 API 密钥)
"""
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
@dataclass
class KeywordHit:
"""关键词命中记录。"""
keyword: str
category: str # "violence" / "sexual" / "horror" / "profanity" / "substance"
severity: str # "mild" / "moderate" / "severe"
paragraph_index: int
position_in_paragraph: int
    context: str  # surrounding context of the hit
timestamp: Optional[str] = None
@dataclass
class RatingResult:
"""分级检测结果。"""
suggested_rating: str # "all_ages" / "12+" / "18+" / "non_compliant"
    target_rating: str  # the rating the user is targeting
is_compliant: bool
total_hits: int
hits_by_category: dict = field(default_factory=dict)
hits_by_severity: dict = field(default_factory=dict)
keyword_hits: List[KeywordHit] = field(default_factory=list)
ai_analysis: Optional[dict] = None
risk_level: str = "low"
# === Keyword database management ===
def _get_keywords_dir() -> Path:
"""获取关键词库目录。"""
return Path(__file__).parent.parent / "assets" / "keyword_databases"
def _get_rules_dir() -> Path:
"""获取分级规则目录。"""
return Path(__file__).parent.parent / "assets" / "rating_rules"
def load_keyword_database(category: str) -> dict:
"""加载指定类别的关键词库。"""
filepath = _get_keywords_dir() / f"{category}_keywords.json"
if not filepath.exists():
return {"category": category, "keywords": {}}
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
def load_all_keywords() -> dict:
"""加载全部类别的关键词库。"""
categories = ["violence", "sexual", "horror", "profanity", "substance"]
all_kw = {}
for cat in categories:
db = load_keyword_database(cat)
all_kw[cat] = db.get("keywords", {})
return all_kw
def load_rating_rules(ruleset: str = "china") -> dict:
"""加载分级规则配置。"""
filepath = _get_rules_dir() / f"{ruleset}_rating.json"
if not filepath.exists():
        raise FileNotFoundError(f"Rating rules file not found: {filepath}")
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
# === Layer 1: local keyword scan ===
def _extract_context(text: str, pos: int, window: int = 30) -> str:
"""提取命中位置前后的上下文。"""
start = max(0, pos - window)
end = min(len(text), pos + window)
return text[start:end]
def scan_keywords(text: str, keywords_db: dict) -> List[KeywordHit]:
"""逐段扫描关键词命中。"""
paragraphs = re.split(r'\n\s*\n|\n', text)
hits = []
for para_idx, paragraph in enumerate(paragraphs):
paragraph_lower = paragraph.lower()
for category, keywords in keywords_db.items():
for keyword, info in keywords.items():
                # Check the main keyword and its aliases
all_forms = [keyword] + info.get("aliases", [])
for form in all_forms:
form_lower = form.lower()
start = 0
while True:
pos = paragraph_lower.find(form_lower, start)
if pos == -1:
break
hits.append(KeywordHit(
keyword=form,
category=category,
severity=info["severity"],
paragraph_index=para_idx,
position_in_paragraph=pos,
context=_extract_context(paragraph, pos),
))
start = pos + len(form_lower)
return hits
def _count_by_field(hits: List[KeywordHit], field_name: str) -> dict:
"""按指定字段统计命中数。"""
counts = {}
for hit in hits:
value = getattr(hit, field_name)
counts[value] = counts.get(value, 0) + 1
return counts
def calculate_initial_rating(hits: List[KeywordHit],
rules: dict) -> str:
"""根据关键词命中情况计算初步分级。"""
if not hits:
return "all_ages"
by_severity = _count_by_field(hits, "severity")
by_category = _count_by_field(hits, "category")
    # Check non-compliance triggers
for trigger in rules.get("non_compliant_triggers", []):
if "category" in trigger and "severity" in trigger:
cat_sev_count = sum(
1 for h in hits
if h.category == trigger["category"]
and h.severity == trigger["severity"]
)
if cat_sev_count >= trigger.get("min_count", 1):
return "non_compliant"
elif "severity" in trigger:
sev_count = by_severity.get(trigger["severity"], 0)
if sev_count >= trigger.get("min_count", 1):
return "non_compliant"
    # Check ratings from least to most restrictive
ratings_order = ["all_ages", "12+", "18+"]
ratings_config = rules.get("ratings", {})
for rating in ratings_order:
config = ratings_config.get(rating, {})
        # Check forbidden categories
forbidden = config.get("forbidden_categories", [])
if any(by_category.get(cat, 0) > 0 for cat in forbidden):
continue
        # Check the severity cap
        max_sev = config.get("max_severity", "severe")
        severity_order = {"mild": 0, "moderate": 1, "severe": 2}
        max_sev_level = severity_order.get(max_sev, 2)
        # Check for hits above the allowed severity
exceeded = False
for sev, level in severity_order.items():
if level > max_sev_level and by_severity.get(sev, 0) > 0:
exceeded = True
break
if exceeded:
continue
        # Check per-severity hit limits
mild_limit = config.get("max_hits_mild", -1)
moderate_limit = config.get("max_hits_moderate", -1)
severe_limit = config.get("max_hits_severe", -1)
mild_ok = mild_limit == -1 or by_severity.get("mild", 0) <= mild_limit
moderate_ok = moderate_limit == -1 or by_severity.get("moderate", 0) <= moderate_limit
severe_ok = severe_limit == -1 or by_severity.get("severe", 0) <= severe_limit
if mild_ok and moderate_ok and severe_ok:
return rating
return "18+"
def _rating_order(rating: str) -> int:
"""分级排序值。"""
order = {"all_ages": 0, "12+": 1, "18+": 2, "non_compliant": 3}
return order.get(rating, 3)
def _determine_risk_level(suggested: str, target: str) -> str:
"""确定风险等级。"""
if suggested == "non_compliant":
return "critical"
if _rating_order(suggested) > _rating_order(target):
diff = _rating_order(suggested) - _rating_order(target)
if diff >= 2:
return "high"
return "medium"
return "low"
# === Auxiliary analysis ===
def analyze_frame_descriptions(descriptions: List[dict],
keywords_db: dict) -> List[KeywordHit]:
"""分析视频关键帧描述文本。"""
hits = []
for desc_item in descriptions:
text = desc_item.get("description", "")
timestamp = desc_item.get("timestamp", "")
frame_hits = scan_keywords(text, keywords_db)
for hit in frame_hits:
hit.timestamp = timestamp
hits.extend(frame_hits)
return hits
def analyze_audio_transcript(transcript: str,
keywords_db: dict) -> List[KeywordHit]:
"""分析音频转录文本。"""
return scan_keywords(transcript, keywords_db)
# === Main entry point ===
def run_age_rating_scan(text: str, target_rating: str = "all_ages",
ruleset: str = "china",
frame_descriptions: list = None,
audio_transcript: str = None) -> RatingResult:
"""
完整的年龄分级扫描流程。
Args:
text: 剧本/台词文本
target_rating: 用户期望的目标分级
ruleset: 分级规则集("china" 或 "general")
frame_descriptions: 视频关键帧描述列表
audio_transcript: 音频转录文本
Returns:
RatingResult
"""
keywords_db = load_all_keywords()
rules = load_rating_rules(ruleset)
    # Layer 1: local scan
    all_hits = scan_keywords(text, keywords_db)
    # Auxiliary content analysis
    if frame_descriptions:
        all_hits.extend(analyze_frame_descriptions(frame_descriptions, keywords_db))
    if audio_transcript:
        all_hits.extend(analyze_audio_transcript(audio_transcript, keywords_db))
    # Compute the rating
suggested = calculate_initial_rating(all_hits, rules)
is_compliant = _rating_order(suggested) <= _rating_order(target_rating)
risk_level = _determine_risk_level(suggested, target_rating)
return RatingResult(
suggested_rating=suggested,
target_rating=target_rating,
is_compliant=is_compliant,
total_hits=len(all_hits),
hits_by_category=_count_by_field(all_hits, "category"),
hits_by_severity=_count_by_field(all_hits, "severity"),
keyword_hits=all_hits,
risk_level=risk_level,
)
if __name__ == "__main__":
import argparse
    parser = argparse.ArgumentParser(description="Age-rating compliance scan")
    parser.add_argument("--input", required=True, help="input file path")
parser.add_argument("--target-rating", default="all_ages",
choices=["all_ages", "12+", "18+"])
parser.add_argument("--ruleset", default="china",
choices=["china", "general"])
args = parser.parse_args()
input_path = Path(args.input)
if not input_path.exists():
print(f"错误: 输入文件不存在: {input_path}")
sys.exit(1)
text = input_path.read_text(encoding="utf-8")
result = run_age_rating_scan(text, args.target_rating, args.ruleset)
print(f"=== 年龄分级检测报告 ===")
print(f"建议分级: {result.suggested_rating}")
print(f"目标分级: {result.target_rating}")
print(f"是否合规: {'是' if result.is_compliant else '否'}")
print(f"风险等级: {result.risk_level}")
print(f"总命中数: {result.total_hits}")
print(f"按类别: {json.dumps(result.hits_by_category, ensure_ascii=False)}")
print(f"按严重度: {json.dumps(result.hits_by_severity, ensure_ascii=False)}")
```
### scripts/adaptation_detector.py
```python
"""
小说魔改检测器
比对原著与改编版本,量化改编偏离程度。
使用 Needleman-Wunsch 变体进行章节对齐。
"""
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
sys.path.insert(0, str(Path(__file__).parent))
from text_similarity import (
preprocess_text, char_ngrams, jaccard_similarity,
tokenize_chinese, compute_idf, build_tfidf_vector,
cosine_similarity_vec,
)
@dataclass
class PlotPoint:
"""情节点。"""
index: int
summary: str
characters: List[str] = field(default_factory=list)
location: Optional[str] = None
importance: str = "normal" # "core" / "normal" / "minor"
@dataclass
class CharacterProfile:
"""角色概要。"""
name: str
traits: List[str] = field(default_factory=list)
relationships: dict = field(default_factory=dict)
fate: Optional[str] = None
@dataclass
class DeviationItem:
"""偏离项。"""
deviation_type: str # "plot_added"/"plot_removed"/"plot_modified"
# "character_changed"/"setting_changed"
original_content: str
adapted_content: str
severity: str # "minor" / "moderate" / "major"
description: str
@dataclass
class AdaptationReport:
"""改编检测报告。"""
deviation_score: float # 0-100
adaptation_type: str # "faithful"/"reasonable"/"severe_modification"
total_deviations: int
deviations_by_type: dict = field(default_factory=dict)
deviations_by_severity: dict = field(default_factory=dict)
deviation_items: List[DeviationItem] = field(default_factory=list)
section_alignment: list = field(default_factory=list)
# === Text structure extraction ===
def extract_sections(text: str) -> List[dict]:
"""
提取章节/段落结构。
尝试按章节标题分割,如果没有明确标题则按段落分割。
"""
# 尝试按中文章节标题分割
chapter_pattern = r'(第[一二三四五六七八九十百千\d]+[章节回集幕][\s::]*[^\n]*)'
chapters = re.split(chapter_pattern, text)
sections = []
if len(chapters) > 1:
        # Explicit chapter headings found
i = 0
while i < len(chapters):
if re.match(chapter_pattern, chapters[i]):
title = chapters[i].strip()
content = chapters[i + 1].strip() if i + 1 < len(chapters) else ""
sections.append({"title": title, "content": content})
i += 2
else:
if chapters[i].strip():
sections.append({"title": "", "content": chapters[i].strip()})
i += 1
else:
        # Split by paragraph
paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
for i, para in enumerate(paragraphs):
            if len(para) >= 15:  # drop very short paragraphs
sections.append({"title": f"段落{i + 1}", "content": para})
return sections
def _quick_similarity(text_a: str, text_b: str) -> float:
"""快速计算两段文本的相似度(用于对齐)。"""
if not text_a or not text_b:
return 0.0
    # Character n-gram Jaccard as a fast similarity proxy
ngrams_a = char_ngrams(text_a, n=3)
ngrams_b = char_ngrams(text_b, n=3)
return jaccard_similarity(ngrams_a, ngrams_b)
# === Section alignment ===
def align_sections(original_sections: list, adapted_sections: list) -> list:
"""
    Section alignment based on a Needleman-Wunsch variant.
Returns:
[(orig_idx_or_None, adapted_idx_or_None, similarity, status), ...]
status: "matched" / "added" / "removed" / "modified"
"""
m = len(original_sections)
n = len(adapted_sections)
if m == 0 and n == 0:
return []
if m == 0:
return [(None, j, 0.0, "added") for j in range(n)]
if n == 0:
return [(i, None, 0.0, "removed") for i in range(m)]
    # Build the similarity matrix
sim_matrix = [[0.0] * n for _ in range(m)]
for i in range(m):
for j in range(n):
sim_matrix[i][j] = _quick_similarity(
original_sections[i]["content"],
adapted_sections[j]["content"],
)
    # Dynamic programming
GAP_PENALTY = -0.1
dp = [[0.0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
dp[i][0] = dp[i - 1][0] + GAP_PENALTY
for j in range(1, n + 1):
dp[0][j] = dp[0][j - 1] + GAP_PENALTY
for i in range(1, m + 1):
for j in range(1, n + 1):
match_score = dp[i - 1][j - 1] + sim_matrix[i - 1][j - 1]
skip_orig = dp[i - 1][j] + GAP_PENALTY
skip_adapt = dp[i][j - 1] + GAP_PENALTY
dp[i][j] = max(match_score, skip_orig, skip_adapt)
    # Traceback
alignment = []
i, j = m, n
while i > 0 or j > 0:
if i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + sim_matrix[i - 1][j - 1]:
sim = sim_matrix[i - 1][j - 1]
status = "matched" if sim >= 0.3 else "modified"
alignment.append((i - 1, j - 1, sim, status))
i -= 1
j -= 1
elif i > 0 and dp[i][j] == dp[i - 1][j] + GAP_PENALTY:
alignment.append((i - 1, None, 0.0, "removed"))
i -= 1
else:
alignment.append((None, j - 1, 0.0, "added"))
j -= 1
alignment.reverse()
return alignment
# === Character analysis ===
def extract_characters_local(text: str) -> List[str]:
"""本地方式提取角色名(基于高频重复的短词)。"""
# 简单启发式:提取引号中出现的称呼和高频 2-3 字名
names = set()
# 提取对话前的称呼
dialogue_pattern = r'[「『"](.*?)[」』"]'
speaker_pattern = r'(\S{2,4})[说道叫喊问答笑哭]'
for match in re.finditer(speaker_pattern, text):
name = match.group(1)
if len(name) <= 4 and not any(c.isdigit() for c in name):
names.add(name)
return list(names)
# === Deviation scoring ===
def _classify_deviation_severity(sim: float, status: str) -> str:
"""判定偏离严重程度。"""
if status == "removed":
return "major"
if status == "added":
return "moderate"
if status == "modified":
if sim >= 0.5:
return "minor"
if sim >= 0.2:
return "moderate"
return "major"
return "minor"
def build_deviations(alignment: list, original_sections: list,
adapted_sections: list) -> List[DeviationItem]:
"""从对齐结果构建偏离项列表。"""
deviations = []
for orig_idx, adapt_idx, sim, status in alignment:
if status == "matched":
continue
orig_content = (original_sections[orig_idx]["content"][:200]
if orig_idx is not None else "")
adapt_content = (adapted_sections[adapt_idx]["content"][:200]
if adapt_idx is not None else "")
severity = _classify_deviation_severity(sim, status)
if status == "removed":
desc = f"原著段落被删除"
dev_type = "plot_removed"
elif status == "added":
desc = f"新增了原著中没有的内容"
dev_type = "plot_added"
else:
desc = f"内容被修改(相似度: {sim:.2f})"
dev_type = "plot_modified"
deviations.append(DeviationItem(
deviation_type=dev_type,
original_content=orig_content,
adapted_content=adapt_content,
severity=severity,
description=desc,
))
return deviations
def calculate_deviation_score(deviations: List[DeviationItem],
total_sections: int) -> float:
"""
计算偏离度评分 (0-100)。
权重设计:
- plot_removed × 3.0(删除原著核心最严重)
- plot_modified × 2.0
- plot_added × 1.0
- character_changed × 2.5
- setting_changed × 1.5
严重度加权:minor × 0.5, moderate × 1.0, major × 2.0
"""
if not deviations or total_sections == 0:
return 0.0
severity_weights = {"minor": 0.5, "moderate": 1.0, "major": 2.0}
type_weights = {
"plot_removed": 3.0,
"plot_modified": 2.0,
"plot_added": 1.0,
"character_changed": 2.5,
"setting_changed": 1.5,
}
weighted_sum = 0.0
for d in deviations:
sw = severity_weights.get(d.severity, 1.0)
tw = type_weights.get(d.deviation_type, 1.0)
weighted_sum += sw * tw
    # Normalize to 0-100
    max_possible = total_sections * 3.0 * 2.0  # all deviations major + removed
score = min(100.0, (weighted_sum / max(max_possible, 1)) * 100)
return round(score, 1)
def classify_adaptation(score: float) -> str:
"""分类改编类型。"""
if score <= 30:
return "faithful"
if score <= 60:
return "reasonable"
return "severe_modification"
# === Main entry point ===
def detect_adaptation(original_text: str, adapted_text: str) -> AdaptationReport:
"""
完整的魔改检测流程。
Args:
original_text: 原著全文
adapted_text: 改编版全文
Returns:
AdaptationReport
"""
    # Extract structure
orig_sections = extract_sections(original_text)
adapt_sections = extract_sections(adapted_text)
if not orig_sections and not adapt_sections:
return AdaptationReport(
deviation_score=0.0,
adaptation_type="faithful",
total_deviations=0,
)
    # Align sections
    alignment = align_sections(orig_sections, adapt_sections)
    # Build deviation items
    deviations = build_deviations(alignment, orig_sections, adapt_sections)
    # Compute the deviation score
    total_sections = max(len(orig_sections), len(adapt_sections))
    score = calculate_deviation_score(deviations, total_sections)
    adaptation_type = classify_adaptation(score)
    # Tally
by_type = {}
by_severity = {}
for d in deviations:
by_type[d.deviation_type] = by_type.get(d.deviation_type, 0) + 1
by_severity[d.severity] = by_severity.get(d.severity, 0) + 1
return AdaptationReport(
deviation_score=score,
adaptation_type=adaptation_type,
total_deviations=len(deviations),
deviations_by_type=by_type,
deviations_by_severity=by_severity,
deviation_items=deviations,
section_alignment=[(o, a, s, st) for o, a, s, st in alignment],
)
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(description="小说魔改检测")
parser.add_argument("--original", required=True, help="原著文件路径")
parser.add_argument("--adapted", required=True, help="改编版文件路径")
args = parser.parse_args()
orig_path = Path(args.original)
adapt_path = Path(args.adapted)
if not orig_path.exists():
print(f"错误: 原著文件不存在: {orig_path}")
sys.exit(1)
if not adapt_path.exists():
print(f"错误: 改编文件不存在: {adapt_path}")
sys.exit(1)
original = orig_path.read_text(encoding="utf-8")
adapted = adapt_path.read_text(encoding="utf-8")
report = detect_adaptation(original, adapted)
print(f"=== 小说魔改检测报告 ===")
print(f"偏离度评分: {report.deviation_score}/100")
print(f"改编类型: {report.adaptation_type}")
print(f"总偏离数: {report.total_deviations}")
print(f"按类型: {json.dumps(report.deviations_by_type, ensure_ascii=False)}")
print(f"按严重度: {json.dumps(report.deviations_by_severity, ensure_ascii=False)}")
if report.deviation_items:
print(f"\n偏离详情:")
for d in report.deviation_items[:10]:
print(f" [{d.severity}] {d.description}")
if d.original_content:
print(f" 原文: {d.original_content[:80]}...")
if d.adapted_content:
print(f" 改编: {d.adapted_content[:80]}...")
```
### scripts/report_generator.py
```python
"""
合规报告生成器
生成结构化 JSON 报告和可读 Markdown 报告。
"""
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
@dataclass
class ComplianceReport:
"""完整合规报告。"""
report_id: str = ""
generated_at: str = ""
input_file: str = ""
overall_risk_level: str = "low" # "low"/"medium"/"high"/"critical"
overall_score: float = 100.0 # 0-100 合规得分(越高越合规)
copyright_result: Optional[dict] = None
age_rating_result: Optional[dict] = None
adaptation_result: Optional[dict] = None
violation_summary: List[dict] = field(default_factory=list)
remediation_suggestions: List[str] = field(default_factory=list)
def calculate_overall_risk(copyright_result: Optional[dict] = None,
                           age_rating_result: Optional[dict] = None,
                           adaptation_result: Optional[dict] = None) -> tuple:
"""
计算总体风险等级和合规得分。
Returns:
(risk_level, score)
"""
risk_scores = [] # 各模块的风险分(越高越危险)
if copyright_result:
level = copyright_result.get("risk_level", "low")
level_map = {"low": 0, "medium": 30, "high": 60, "critical": 90}
risk_scores.append(level_map.get(level, 0))
if age_rating_result:
if not age_rating_result.get("is_compliant", True):
level = age_rating_result.get("risk_level", "low")
level_map = {"low": 0, "medium": 30, "high": 60, "critical": 90}
risk_scores.append(level_map.get(level, 0))
else:
risk_scores.append(0)
if adaptation_result:
score = adaptation_result.get("deviation_score", 0)
if score >= 60:
risk_scores.append(70)
elif score >= 30:
risk_scores.append(30)
else:
risk_scores.append(0)
if not risk_scores:
return "low", 100.0
max_risk = max(risk_scores)
avg_risk = sum(risk_scores) / len(risk_scores)
# 综合风险:最大风险权重 0.7 + 平均风险 0.3
combined_risk = max_risk * 0.7 + avg_risk * 0.3
if combined_risk >= 70:
risk_level = "critical"
elif combined_risk >= 45:
risk_level = "high"
elif combined_risk >= 20:
risk_level = "medium"
else:
risk_level = "low"
compliance_score = max(0, 100 - combined_risk)
return risk_level, round(compliance_score, 1)
def _build_violation_summary(copyright_result: Optional[dict] = None,
                             age_rating_result: Optional[dict] = None,
                             adaptation_result: Optional[dict] = None) -> List[dict]:
"""构建违规摘要列表。"""
violations = []
if copyright_result and copyright_result.get("suspicious_paragraphs", 0) > 0:
violations.append({
"type": "copyright",
"severity": copyright_result.get("risk_level", "medium"),
"description": (
f"发现 {copyright_result['suspicious_paragraphs']} 个疑似侵权段落,"
f"最高相似度 {copyright_result.get('max_similarity_score', 0):.2f}"
),
})
if age_rating_result and not age_rating_result.get("is_compliant", True):
violations.append({
"type": "age_rating",
"severity": age_rating_result.get("risk_level", "medium"),
"description": (
f"内容建议分级 {age_rating_result.get('suggested_rating', '未知')},"
f"超出目标分级 {age_rating_result.get('target_rating', '未知')},"
f"共 {age_rating_result.get('total_hits', 0)} 处命中"
),
})
if adaptation_result and adaptation_result.get("deviation_score", 0) >= 60:
violations.append({
"type": "adaptation",
"severity": "high",
"description": (
f"改编偏离度 {adaptation_result['deviation_score']}/100,"
f"属于{_translate_adaptation_type(adaptation_result.get('adaptation_type', ''))},"
f"共 {adaptation_result.get('total_deviations', 0)} 处偏离"
),
})
return violations
def _translate_adaptation_type(t: str) -> str:
"""翻译改编类型。"""
types = {
"faithful": "忠实改编",
"reasonable": "合理改编",
"severe_modification": "严重魔改",
}
return types.get(t, t)
def _build_remediation(violations: List[dict]) -> List[str]:
"""根据违规摘要生成整改建议。"""
suggestions = []
for v in violations:
if v["type"] == "copyright":
suggestions.append("对疑似侵权段落进行原创性改写,避免与已有作品高度相似")
suggestions.append("核实参考来源的版权状态,确认是否需要获取授权")
elif v["type"] == "age_rating":
suggestions.append("修改或删除不符合目标年龄分级的内容")
suggestions.append("对暴力/恐怖/不当场景进行弱化处理")
elif v["type"] == "adaptation":
suggestions.append("重新审视对原著核心情节的改动,确保改编的合理性")
suggestions.append("考虑获取原著权利人的改编授权")
if not suggestions:
suggestions.append("当前内容未发现明显违规,建议定期复查")
return list(dict.fromkeys(suggestions)) # 去重保序
def generate_json_report(report: ComplianceReport) -> str:
"""生成 JSON 格式报告。"""
data = {
"report_id": report.report_id,
"generated_at": report.generated_at,
"input_file": report.input_file,
"overall_risk_level": report.overall_risk_level,
"overall_score": report.overall_score,
"violation_summary": report.violation_summary,
"remediation_suggestions": report.remediation_suggestions,
}
if report.copyright_result:
data["copyright_detection"] = report.copyright_result
if report.age_rating_result:
data["age_rating_scan"] = report.age_rating_result
if report.adaptation_result:
data["adaptation_detection"] = report.adaptation_result
return json.dumps(data, ensure_ascii=False, indent=2)
def generate_markdown_report(report: ComplianceReport) -> str:
"""生成 Markdown 可读报告。"""
lines = [
f"# AI短剧合规审查报告",
f"",
f"**报告 ID**: {report.report_id}",
f"**生成时间**: {report.generated_at}",
f"**输入文件**: {report.input_file}",
f"",
f"## 总体评估",
f"",
f"| 项目 | 结果 |",
f"|------|------|",
f"| 风险等级 | **{report.overall_risk_level.upper()}** |",
f"| 合规得分 | {report.overall_score}/100 |",
f"",
]
    if report.violation_summary:
        lines.append("## 违规摘要")
        lines.append("")
        marker = {"low": "!", "medium": "!!", "high": "!!!", "critical": "!!!!"}
        for v in report.violation_summary:
            flag = marker.get(v["severity"], "!")
            lines.append(f"- {flag} [{v['severity'].upper()}] **{v['type']}**: {v['description']}")
        lines.append("")
if report.copyright_result:
cr = report.copyright_result
lines.append("## 版权侵权检测")
lines.append("")
lines.append(f"- 总段落数: {cr.get('total_paragraphs', 0)}")
lines.append(f"- 可疑段落: {cr.get('suspicious_paragraphs', 0)}")
lines.append(f"- 最高相似度: {cr.get('max_similarity_score', 0):.4f}")
lines.append(f"- 风险等级: {cr.get('risk_level', 'low')}")
lines.append("")
if report.age_rating_result:
ar = report.age_rating_result
lines.append("## 年龄分级合规")
lines.append("")
lines.append(f"- 建议分级: {ar.get('suggested_rating', 'N/A')}")
lines.append(f"- 目标分级: {ar.get('target_rating', 'N/A')}")
lines.append(f"- 是否合规: {'是' if ar.get('is_compliant') else '否'}")
lines.append(f"- 总命中数: {ar.get('total_hits', 0)}")
lines.append("")
if report.adaptation_result:
ad = report.adaptation_result
lines.append("## 小说改编检测")
lines.append("")
lines.append(f"- 偏离度: {ad.get('deviation_score', 0)}/100")
lines.append(f"- 改编类型: {_translate_adaptation_type(ad.get('adaptation_type', ''))}")
lines.append(f"- 总偏离数: {ad.get('total_deviations', 0)}")
lines.append("")
lines.append("## 整改建议")
lines.append("")
for i, suggestion in enumerate(report.remediation_suggestions, 1):
lines.append(f"{i}. {suggestion}")
lines.append("")
lines.append("---")
lines.append("*本报告由 ai-drama-review 自动生成,仅供参考,不作为法律依据。*")
return "\n".join(lines)
def generate_violation_annotations(report: ComplianceReport) -> list:
"""生成违规位置标注列表。"""
annotations = []
if report.copyright_result:
for r in report.copyright_result.get("results", []):
annotations.append({
"type": "copyright",
"location": {"paragraph": r.get("source_paragraph_index", 0)},
"severity": "high" if r.get("combined_score", 0) >= 0.85 else "medium",
"description": (
f"与 {r.get('reference_id', '未知')} 相似度 "
f"{r.get('combined_score', 0):.2f}"
),
})
if report.age_rating_result:
for hit in report.age_rating_result.get("keyword_hits", []):
annotations.append({
"type": "age_rating",
"location": {
"paragraph": hit.get("paragraph_index", 0),
"timestamp": hit.get("timestamp"),
},
"severity": hit.get("severity", "mild"),
"description": (
f"[{hit.get('category', '')}] "
f"关键词 '{hit.get('keyword', '')}'"
),
})
return annotations
def build_full_report(input_file: str, copyright_result=None,
age_rating_result=None,
adaptation_result=None) -> ComplianceReport:
"""汇总所有检测结果,构建完整报告。"""
risk_level, score = calculate_overall_risk(
copyright_result, age_rating_result, adaptation_result
)
violations = _build_violation_summary(
copyright_result, age_rating_result, adaptation_result
)
remediation = _build_remediation(violations)
report = ComplianceReport(
report_id=f"DR-{datetime.now().strftime('%Y%m%d%H%M%S')}",
generated_at=datetime.now().isoformat(),
input_file=input_file,
overall_risk_level=risk_level,
overall_score=score,
copyright_result=copyright_result,
age_rating_result=age_rating_result,
adaptation_result=adaptation_result,
violation_summary=violations,
remediation_suggestions=remediation,
)
return report
```
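上面 `calculate_overall_risk` 的风险合成逻辑(最大风险 ×0.7 + 平均风险 ×0.3,再映射到四档等级)可以用下面这段独立示意代码复算。权重与阈值取自上文实现,但代码本身不依赖技能包模块,仅供验证公式:

```python
def combine_risk(risk_scores: list) -> tuple:
    """按 最大值*0.7 + 平均值*0.3 合成风险,返回 (风险等级, 合规得分)。"""
    if not risk_scores:
        return "low", 100.0
    combined = max(risk_scores) * 0.7 + sum(risk_scores) / len(risk_scores) * 0.3
    if combined >= 70:
        level = "critical"
    elif combined >= 45:
        level = "high"
    elif combined >= 20:
        level = "medium"
    else:
        level = "low"
    return level, round(max(0, 100 - combined), 1)

# 示例:版权模块 high(60)+ 分级合规(0)+ 魔改中等(30)
print(combine_risk([60, 0, 30]))
```

可以看到,"最大风险占七成权重"意味着只要任一模块命中 high,总体等级很难低于 high,这是有意的保守设计。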
### scripts/review_orchestrator.py
```python
"""
审查流程编排器
协调版权检测、年龄分级、魔改检测三大模块,
输出统一的合规报告。
"""
import argparse
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from env_detect import run_full_detection, determine_run_mode, detect_api_keys
from text_similarity import scan_for_plagiarism, CopyrightReport
from age_rating_scanner import run_age_rating_scan, RatingResult
from adaptation_detector import detect_adaptation, AdaptationReport
from report_generator import (
build_full_report, generate_json_report, generate_markdown_report,
)
def load_input_text(file_path: str) -> str:
"""加载输入文件(支持 .txt / .srt / .json)。"""
path = Path(file_path)
suffix = path.suffix.lower()
text = path.read_text(encoding="utf-8")
if suffix == ".json":
data = json.loads(text)
# 尝试提取常见字段
if isinstance(data, dict):
parts = []
for key in ["script", "text", "content", "dialogue", "subtitles"]:
if key in data:
val = data[key]
if isinstance(val, str):
parts.append(val)
elif isinstance(val, list):
parts.extend(
item.get("text", str(item))
if isinstance(item, dict) else str(item)
for item in val
)
return "\n".join(parts) if parts else text
return text
if suffix == ".srt":
# 提取 SRT 字幕中的文本行
lines = []
for line in text.split("\n"):
line = line.strip()
# 跳过序号行、时间码行、空行
if not line or line.isdigit() or "-->" in line:
continue
lines.append(line)
return "\n".join(lines)
return text
def load_reference_texts(reference_dir: str) -> dict:
"""加载参考文本库。"""
ref_dir = Path(reference_dir)
if not ref_dir.exists():
return {}
texts = {}
for f in ref_dir.iterdir():
if f.suffix.lower() in (".txt", ".md"):
texts[f.stem] = f.read_text(encoding="utf-8")
return texts
def _copyright_result_to_dict(report: CopyrightReport) -> dict:
"""将 CopyrightReport 转为字典。"""
return {
"total_paragraphs": report.total_paragraphs,
"suspicious_paragraphs": report.suspicious_paragraphs,
"max_similarity_score": report.max_similarity_score,
"risk_level": report.risk_level,
"results": [
{
"source_paragraph_index": r.source_paragraph_index,
"source_text": r.source_text,
"reference_id": r.reference_id,
"combined_score": r.combined_score,
}
for r in report.results
],
}
def _rating_result_to_dict(result: RatingResult) -> dict:
"""将 RatingResult 转为字典。"""
return {
"suggested_rating": result.suggested_rating,
"target_rating": result.target_rating,
"is_compliant": result.is_compliant,
"total_hits": result.total_hits,
"hits_by_category": result.hits_by_category,
"hits_by_severity": result.hits_by_severity,
"risk_level": result.risk_level,
"keyword_hits": [
{
"keyword": h.keyword,
"category": h.category,
"severity": h.severity,
"paragraph_index": h.paragraph_index,
"context": h.context,
"timestamp": h.timestamp,
}
for h in result.keyword_hits[:50] # 限制数量
],
}
def _adaptation_result_to_dict(report: AdaptationReport) -> dict:
"""将 AdaptationReport 转为字典。"""
return {
"deviation_score": report.deviation_score,
"adaptation_type": report.adaptation_type,
"total_deviations": report.total_deviations,
"deviations_by_type": report.deviations_by_type,
"deviations_by_severity": report.deviations_by_severity,
"deviation_items": [
{
"type": d.deviation_type,
"severity": d.severity,
"description": d.description,
"original": d.original_content[:100],
"adapted": d.adapted_content[:100],
}
for d in report.deviation_items[:20]
],
}
def format_user_warning(report_dict: dict) -> str:
"""
格式化用户风险提示文本。
当检测到违规时,标注并提示模型告知用户该视频可能存在的违规风险。
"""
risk = report_dict.get("overall_risk_level", "low")
score = report_dict.get("overall_score", 100)
violations = report_dict.get("violation_summary", [])
if risk == "low" and not violations:
return "当前内容未发现明显合规风险。"
risk_labels = {
"low": "低风险",
"medium": "中等风险",
"high": "高风险",
"critical": "严重风险",
}
lines = [
f"[合规警告] 该内容存在 {risk_labels.get(risk, risk)} (合规得分: {score}/100)",
"",
]
for v in violations:
type_labels = {
"copyright": "版权侵权",
"age_rating": "年龄分级",
"adaptation": "小说魔改",
}
label = type_labels.get(v["type"], v["type"])
lines.append(f" - [{v['severity'].upper()}] {label}: {v['description']}")
lines.append("")
suggestions = report_dict.get("remediation_suggestions", [])
if suggestions:
lines.append("整改建议:")
for s in suggestions:
lines.append(f" - {s}")
lines.append("")
lines.append("注意: 以上检测结果仅供参考,不作为法律依据。建议进行人工复核。")
return "\n".join(lines)
def run_full_review(input_file: str,
reference_dir: str = None,
original_file: str = None,
target_rating: str = "all_ages",
checks: list = None,
output_format: str = "json") -> dict:
"""
执行完整审查流程。
Args:
input_file: 输入剧本/台词文件
reference_dir: 参考文本库目录(版权检测用)
original_file: 原著文件路径(魔改检测用)
target_rating: 目标年龄分级
checks: 要执行的检测模块列表
output_format: 输出格式 ("json" 或 "markdown")
Returns:
完整审查结果字典
"""
if checks is None:
checks = ["copyright", "rating", "adaptation"]
# 加载输入
input_text = load_input_text(input_file)
copyright_result = None
age_rating_result = None
adaptation_result = None
# 版权检测
if "copyright" in checks and reference_dir:
ref_texts = load_reference_texts(reference_dir)
if ref_texts:
cr = scan_for_plagiarism(input_text, ref_texts)
copyright_result = _copyright_result_to_dict(cr)
# 年龄分级检测
if "rating" in checks:
rr = run_age_rating_scan(input_text, target_rating)
age_rating_result = _rating_result_to_dict(rr)
# 魔改检测
if "adaptation" in checks and original_file:
orig_path = Path(original_file)
if orig_path.exists():
orig_text = orig_path.read_text(encoding="utf-8")
ar = detect_adaptation(orig_text, input_text)
adaptation_result = _adaptation_result_to_dict(ar)
# 构建报告
report = build_full_report(
input_file, copyright_result, age_rating_result, adaptation_result
)
if output_format == "markdown":
formatted = generate_markdown_report(report)
else:
formatted = generate_json_report(report)
# 生成用户警告
report_dict = json.loads(generate_json_report(report))
warning = format_user_warning(report_dict)
return {
"report": report_dict,
"formatted": formatted,
"warning": warning,
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="AI短剧规范审查")
parser.add_argument("--input", required=True, help="剧本/台词文件路径")
parser.add_argument("--reference-dir", help="参考文本库目录(版权检测用)")
parser.add_argument("--original", help="原著文件路径(魔改检测用)")
parser.add_argument("--target-rating", default="all_ages",
choices=["all_ages", "12+", "18+"])
parser.add_argument("--checks", nargs="+",
default=["copyright", "rating", "adaptation"],
choices=["copyright", "rating", "adaptation"])
parser.add_argument("--output", default="json",
choices=["json", "markdown"])
args = parser.parse_args()
input_path = Path(args.input)
if not input_path.exists():
print(f"错误: 输入文件不存在: {input_path}")
sys.exit(1)
result = run_full_review(
input_file=str(input_path),
reference_dir=args.reference_dir,
original_file=args.original,
target_rating=args.target_rating,
checks=args.checks,
output_format=args.output,
)
print(result["formatted"])
print()
print("=== 风险提示 ===")
print(result["warning"])
```
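其中 `load_input_text` 对 `.srt` 的处理只是逐行过滤:跳过纯数字序号行、含 `-->` 的时间码行和空行。下面是等价的独立示意,可单独验证该过滤规则:

```python
def extract_srt_text(srt: str) -> str:
    """从 SRT 字幕中提取纯文本行:跳过序号行、时间码行和空行。"""
    lines = []
    for line in srt.split("\n"):
        line = line.strip()
        if not line or line.isdigit() or "-->" in line:
            continue
        lines.append(line)
    return "\n".join(lines)

sample = """1
00:00:01,000 --> 00:00:03,000
你回来了

2
00:00:04,000 --> 00:00:06,000
我从未离开"""
print(extract_srt_text(sample))
```

注意该规则会把恰好为纯数字的台词行一并过滤掉,这是上文实现的已知局限。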
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "aaalenwow",
"slug": "ai-drama-review",
"displayName": "Ai Drama Review",
"latest": {
"version": "0.1.1",
"publishedAt": 1773134353756,
"commit": "https://github.com/openclaw/skills/commit/0e0463894d075833761724433445e7520d3045e2"
},
"history": []
}
```
### assets/keyword_databases/horror_keywords.json
```json
{
"category": "horror",
"version": "1.0",
"keywords": {
"恐怖": {"severity": "moderate", "aliases": ["恐惧", "惊恐", "惊悚"]},
"鬼": {"severity": "mild", "aliases": ["鬼魂", "幽灵", "亡灵", "冤魂"]},
"丧尸": {"severity": "moderate", "aliases": ["僵尸", "行尸", "活死人"]},
"尸体": {"severity": "moderate", "aliases": ["死尸", "尸首", "遗体"]},
"腐烂": {"severity": "severe", "aliases": ["腐尸", "发臭"]},
"肢解": {"severity": "severe", "aliases": ["分尸", "碎尸", "断肢"]},
"诅咒": {"severity": "mild", "aliases": ["咒语", "邪咒"]},
"噩梦": {"severity": "mild", "aliases": ["梦魇", "恶梦"]},
"怪物": {"severity": "mild", "aliases": ["妖怪", "怪兽", "魔物"]},
"黑暗": {"severity": "mild", "aliases": ["阴森", "阴暗"]},
"尖叫": {"severity": "mild", "aliases": ["惨叫", "嘶叫"]},
"horror": {"severity": "moderate", "aliases": ["terrifying", "horrifying"]},
"zombie": {"severity": "moderate", "aliases": ["undead", "walking dead"]},
"ghost": {"severity": "mild", "aliases": ["haunted", "phantom", "specter"]},
"gore": {"severity": "severe", "aliases": ["gory", "gruesome", "grotesque"]}
}
}
```
### assets/keyword_databases/profanity_keywords.json
```json
{
"category": "profanity",
"version": "1.0",
"keywords": {
"操": {"severity": "severe", "aliases": ["草", "艹"]},
"妈的": {"severity": "moderate", "aliases": ["他妈的", "你妈的", "妈了个"]},
"傻逼": {"severity": "severe", "aliases": ["沙比", "煞笔"]},
"狗屎": {"severity": "moderate", "aliases": ["狗屁", "放屁"]},
"贱": {"severity": "moderate", "aliases": ["贱人", "贱货", "下贱"]},
"滚": {"severity": "mild", "aliases": ["滚蛋", "滚开"]},
"废物": {"severity": "mild", "aliases": ["废柴", "没用"]},
"混蛋": {"severity": "moderate", "aliases": ["浑蛋", "王八蛋"]},
"畜生": {"severity": "severe", "aliases": ["禽兽", "牲口"]},
"该死": {"severity": "mild", "aliases": ["去死", "找死"]},
"fuck": {"severity": "severe", "aliases": ["fucking", "f*ck"]},
"shit": {"severity": "moderate", "aliases": ["bullshit", "crap"]},
"damn": {"severity": "mild", "aliases": ["damned", "goddamn"]}
}
}
```
### assets/keyword_databases/sexual_keywords.json
```json
{
"category": "sexual",
"version": "1.0",
"keywords": {
"裸": {"severity": "moderate", "aliases": ["裸体", "裸露", "全裸", "赤裸"]},
"性": {"severity": "moderate", "aliases": ["性行为", "性关系", "性暗示"]},
"色情": {"severity": "severe", "aliases": ["淫秽", "黄色", "情色"]},
"诱惑": {"severity": "mild", "aliases": ["挑逗", "勾引", "撩拨"]},
"抚摸": {"severity": "mild", "aliases": ["爱抚", "触摸", "摩挲"]},
"亲密": {"severity": "mild", "aliases": ["亲吻", "拥吻", "热吻"]},
"暴露": {"severity": "moderate", "aliases": ["露骨", "衣衫不整"]},
"侵犯": {"severity": "severe", "aliases": ["性侵", "猥亵", "骚扰", "非礼"]},
"卖淫": {"severity": "severe", "aliases": ["嫖", "妓"]},
"nude": {"severity": "moderate", "aliases": ["naked", "nudity"]},
"sexual": {"severity": "moderate", "aliases": ["sexually", "intercourse"]},
"explicit": {"severity": "severe", "aliases": ["pornographic", "obscene"]}
}
}
```
### assets/keyword_databases/substance_keywords.json
```json
{
"category": "substance",
"version": "1.0",
"keywords": {
"毒品": {"severity": "severe", "aliases": ["吸毒", "贩毒", "制毒", "毒贩"]},
"大麻": {"severity": "moderate", "aliases": ["marijuana", "cannabis"]},
"海洛因": {"severity": "severe", "aliases": ["白粉", "冰毒", "摇头丸"]},
"注射": {"severity": "moderate", "aliases": ["打针", "注射器", "针头"]},
"吸食": {"severity": "moderate", "aliases": ["吸粉", "嗑药"]},
"醉": {"severity": "mild", "aliases": ["醉酒", "喝醉", "烂醉", "酗酒"]},
"烟": {"severity": "mild", "aliases": ["抽烟", "吸烟", "香烟", "烟瘾"]},
"赌": {"severity": "moderate", "aliases": ["赌博", "赌场", "豪赌", "赌注"]},
"drug": {"severity": "severe", "aliases": ["drugs", "narcotics", "cocaine", "heroin"]},
"smoke": {"severity": "mild", "aliases": ["smoking", "cigarette"]},
"alcohol": {"severity": "mild", "aliases": ["drunk", "drinking", "intoxicated"]},
"gamble": {"severity": "moderate", "aliases": ["gambling", "casino", "betting"]}
}
}
```
### assets/keyword_databases/violence_keywords.json
```json
{
"category": "violence",
"version": "1.0",
"keywords": {
"杀": {"severity": "severe", "aliases": ["杀死", "杀害", "杀掉", "杀人", "屠杀", "击杀"]},
"砍": {"severity": "moderate", "aliases": ["砍伤", "砍杀", "砍头"]},
"刺": {"severity": "moderate", "aliases": ["刺伤", "刺杀", "刺穿", "捅"]},
"打": {"severity": "mild", "aliases": ["打架", "打斗", "殴打", "打人"]},
"血": {"severity": "moderate", "aliases": ["流血", "血迹", "鲜血", "血腥", "血泊", "血溅"]},
"虐待": {"severity": "severe", "aliases": ["虐杀", "施虐", "折磨", "酷刑"]},
"爆炸": {"severity": "moderate", "aliases": ["炸弹", "爆破", "引爆", "炸毁"]},
"枪": {"severity": "moderate", "aliases": ["开枪", "射击", "枪杀", "枪击"]},
"斩": {"severity": "severe", "aliases": ["斩首", "斩杀", "斩断"]},
"绞": {"severity": "severe", "aliases": ["绞杀", "勒死", "绞刑", "窒息"]},
"暴力": {"severity": "moderate", "aliases": ["暴打", "暴行", "暴虐"]},
"残忍": {"severity": "severe", "aliases": ["残杀", "残暴", "残害"]},
"伤口": {"severity": "mild", "aliases": ["创伤", "伤疤", "伤痕"]},
"搏斗": {"severity": "mild", "aliases": ["格斗", "肉搏", "缠斗"]},
"毒": {"severity": "moderate", "aliases": ["下毒", "中毒", "毒杀", "毒药"]},
"kill": {"severity": "severe", "aliases": ["killing", "murder", "slaughter"]},
"stab": {"severity": "severe", "aliases": ["stabbing", "stabbed"]},
"torture": {"severity": "severe", "aliases": ["torment", "torturing"]},
"blood": {"severity": "moderate", "aliases": ["bloody", "bleeding", "bloodshed"]},
"fight": {"severity": "mild", "aliases": ["fighting", "brawl"]}
}
}
```
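上述关键词库均采用 `关键词 → {severity, aliases}` 结构。下面的独立示意演示如何展开别名后做子串扫描并按严重度计数;示例词条是按同一 schema 精简的样本,不是技能包内部实现:

```python
def scan_keywords(text: str, keywords: dict) -> dict:
    """展开别名后做子串匹配,返回 {severity: 命中次数}。"""
    hits = {}
    for word, meta in keywords.items():
        for term in [word, *meta.get("aliases", [])]:
            count = text.count(term)
            if count:
                hits[meta["severity"]] = hits.get(meta["severity"], 0) + count
    return hits

sample_db = {
    "枪": {"severity": "moderate", "aliases": ["开枪", "射击"]},
    "打": {"severity": "mild", "aliases": ["打架"]},
}
print(scan_keywords("他开枪射击,随后两人打了起来", sample_db))
```

注意主词与别名可能重复命中同一处文本(如 "枪" 与 "开枪"),实际扫描器是否去重以技能包实现为准。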
### assets/rating_rules/china_rating.json
```json
{
"version": "1.0",
"description": "中国内容分级规则(参考广电总局相关规定)",
"ratings": {
"all_ages": {
"description": "适合所有年龄段",
"max_severity": "mild",
"max_hits_mild": 5,
"max_hits_moderate": 0,
"max_hits_severe": 0,
"forbidden_categories": ["sexual", "substance"]
},
"12+": {
"description": "12 岁以上,需家长指导",
"max_severity": "moderate",
"max_hits_mild": -1,
"max_hits_moderate": 5,
"max_hits_severe": 0,
"forbidden_categories": []
},
"18+": {
"description": "18 岁以上",
"max_severity": "severe",
"max_hits_mild": -1,
"max_hits_moderate": -1,
"max_hits_severe": 3,
"forbidden_categories": []
}
},
"non_compliant_triggers": [
{"description": "色情内容严重", "category": "sexual", "severity": "severe", "min_count": 1},
{"description": "严重暴力过多", "severity": "severe", "min_count": 5},
{"description": "毒品相关严重内容", "category": "substance", "severity": "severe", "min_count": 1}
]
}
```
### assets/rating_rules/general_rating.json
```json
{
"version": "1.0",
"description": "通用内容分级规则(参考 ESRB/PEGI 思路)",
"ratings": {
"all_ages": {
"description": "Everyone / PEGI 3",
"max_severity": "mild",
"max_hits_mild": 3,
"max_hits_moderate": 0,
"max_hits_severe": 0,
"forbidden_categories": ["sexual", "substance"]
},
"12+": {
"description": "Teen / PEGI 12",
"max_severity": "moderate",
"max_hits_mild": -1,
"max_hits_moderate": 8,
"max_hits_severe": 0,
"forbidden_categories": []
},
"18+": {
"description": "Mature / PEGI 18",
"max_severity": "severe",
"max_hits_mild": -1,
"max_hits_moderate": -1,
"max_hits_severe": 5,
"forbidden_categories": []
}
},
"non_compliant_triggers": [
{"description": "Extreme sexual content", "category": "sexual", "severity": "severe", "min_count": 2},
{"description": "Excessive severe violence", "severity": "severe", "min_count": 8},
{"description": "Drug glorification", "category": "substance", "severity": "severe", "min_count": 2}
]
}
```
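两套规则文件的判定语义相同:`max_hits_*` 取 -1 表示不限,命中 `forbidden_categories` 即判不合规。可用如下独立示意验证该语义(按上述字段含义实现,非技能包内部代码):

```python
def check_rating(rule: dict, hits_by_severity: dict, categories: set) -> bool:
    """按单条分级规则判断内容是否合规。"""
    # 命中禁止类别直接不合规
    if categories & set(rule.get("forbidden_categories", [])):
        return False
    # 各严重度命中数不得超过上限;-1 表示不限
    for sev in ("mild", "moderate", "severe"):
        limit = rule.get(f"max_hits_{sev}", -1)
        if limit >= 0 and hits_by_severity.get(sev, 0) > limit:
            return False
    return True

all_ages = {
    "max_hits_mild": 5, "max_hits_moderate": 0, "max_hits_severe": 0,
    "forbidden_categories": ["sexual", "substance"],
}
print(check_rating(all_ages, {"mild": 3}, {"violence"}))   # 轻度命中未超限
print(check_rating(all_ages, {"moderate": 1}, {"violence"}))  # 中度命中超限
```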
### assets/report_templates/full_report.md
```markdown
# AI短剧合规审查报告
**报告 ID**: {{report_id}}
**生成时间**: {{generated_at}}
**输入文件**: {{input_file}}
## 总体评估
| 项目 | 结果 |
|------|------|
| 风险等级 | {{overall_risk_level}} |
| 合规得分 | {{overall_score}}/100 |
## 违规摘要
{{#each violation_summary}}
- [{{severity}}] **{{type}}**: {{description}}
{{/each}}
## 版权侵权检测
- 总段落数: {{copyright.total_paragraphs}}
- 可疑段落: {{copyright.suspicious_paragraphs}}
- 最高相似度: {{copyright.max_similarity_score}}
- 风险等级: {{copyright.risk_level}}
## 年龄分级合规
- 建议分级: {{age_rating.suggested_rating}}
- 目标分级: {{age_rating.target_rating}}
- 是否合规: {{age_rating.is_compliant}}
- 总命中数: {{age_rating.total_hits}}
## 小说改编检测
- 偏离度: {{adaptation.deviation_score}}/100
- 改编类型: {{adaptation.adaptation_type}}
- 总偏离数: {{adaptation.total_deviations}}
## 整改建议
{{#each remediation_suggestions}}
{{@index}}. {{this}}
{{/each}}
---
*本报告由 ai-drama-review 自动生成,仅供参考,不作为法律依据。*
```
### assets/report_templates/summary_report.md
```markdown
# 合规审查摘要
**文件**: {{input_file}} | **时间**: {{generated_at}}
## 结论: {{overall_risk_level}} ({{overall_score}}/100)
{{#if violations}}
### 发现的问题:
{{#each violation_summary}}
- {{description}}
{{/each}}
### 建议:
{{#each remediation_suggestions}}
- {{this}}
{{/each}}
{{else}}
未发现明显合规风险。
{{/if}}
---
*仅供参考,不作为法律依据。*
```
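报告模板使用 Handlebars 风格占位符。对仅含 `{{key}}` 的扁平字段,可用如下示意完成替换;它不处理 `{{#each}}`/`{{#if}}` 等块级标签(原样保留),仅演示占位符约定:

```python
import re

def render_flat(template: str, context: dict) -> str:
    """把 {{key}} 替换为 context[key];未知键与块级标签保留原样。"""
    def sub(m):
        key = m.group(1).strip()
        return str(context[key]) if key in context else m.group(0)
    return re.sub(r"\{\{([^#/@{}]+)\}\}", sub, template)

print(render_flat("风险等级: {{overall_risk_level}} ({{overall_score}}/100)",
                  {"overall_risk_level": "low", "overall_score": 92.5}))
```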
### references/adaptation_analysis.md
```markdown
# 改编检测评估标准
## 偏离度评分体系 (0-100)
### 偏离类型权重
| 类型 | 权重 | 说明 |
|------|------|------|
| plot_removed | ×3.0 | 删除原著情节(最严重) |
| character_changed | ×2.5 | 角色设定改变 |
| plot_modified | ×2.0 | 修改原著情节 |
| setting_changed | ×1.5 | 世界观/设定改变 |
| plot_added | ×1.0 | 新增原创情节 |
### 严重度权重
| 严重度 | 权重 | 标准 |
|--------|------|------|
| major | ×2.0 | 核心情节/角色被大幅改变 |
| moderate | ×1.0 | 次要元素被修改 |
| minor | ×0.5 | 微调,不影响核心 |
### 改编分类
| 评分范围 | 类型 | 说明 |
|----------|------|------|
| 0 – 30 | 忠实改编 | 保留原著核心精神和主要情节 |
| > 30 – 60 | 合理改编 | 有较大改动但保持原著基本框架 |
| > 60 | 严重魔改 | 大幅偏离原著,可能引发版权争议 |
## 章节对齐算法
使用 Needleman-Wunsch 变体(全局序列对齐):
- 匹配得分:两段文本的字符 n-gram Jaccard 相似度
- 跳过惩罚:-0.1(允许删除/新增,但有代价)
- 回溯获取最优对齐
对齐结果状态:
- matched: 原著与改编高度对应(相似度 ≥ 0.3)
- modified: 有对应但内容已改变(相似度 < 0.3)
- removed: 原著段落在改编中被删除
- added: 改编中新增了原著没有的内容
## 使用建议
- 偏离度 0-30 通常不需要特别关注
- 偏离度 30-60 建议审查核心情节是否被不当修改
- 偏离度 60+ 强烈建议确认是否已获得原著权利人的改编授权
- 角色命运的重大改变(如将存活角色改为死亡)属于高风险魔改
```
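按上表的类型权重与严重度权重,偏离度可用下面这段独立示意复算(公式与上文 `calculate_deviation_score` 描述一致:归一化分母取 `章节数 × 3.0 × 2.0`,即全部为 major 删除的极端情况):

```python
SEVERITY_W = {"minor": 0.5, "moderate": 1.0, "major": 2.0}
TYPE_W = {
    "plot_removed": 3.0, "plot_modified": 2.0, "plot_added": 1.0,
    "character_changed": 2.5, "setting_changed": 1.5,
}

def deviation_score(deviations: list, total_sections: int) -> float:
    """deviations: [(偏离类型, 严重度), ...];返回 0-100 偏离度。"""
    if not deviations or total_sections == 0:
        return 0.0
    weighted = sum(SEVERITY_W.get(s, 1.0) * TYPE_W.get(t, 1.0)
                   for t, s in deviations)
    return round(min(100.0, weighted / (total_sections * 6.0) * 100), 1)

# 4 个章节中:1 处 major 删除 + 1 处 minor 新增 → 忠实改编区间
print(deviation_score([("plot_removed", "major"), ("plot_added", "minor")], 4))
```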
### references/age_rating_standards.md
```markdown
# 年龄分级标准说明
## 中国标准(参考)
中国目前尚无统一的影视内容分级制度,但广电总局有相关管理规定。本工具参考以下原则:
### 全年龄 (all_ages)
- 不含暴力、恐怖、色情、烟酒毒品等内容
- 语言文明,无脏话
- 适合所有年龄段观众
### 12+
- 可包含轻度冲突和紧张情节
- 不含色情和毒品内容
- 中度暴力描写不超过 5 处(轻度描写不限)
- 无严重暴力、血腥场面
### 18+
- 可包含较强暴力和恐怖元素
- 可包含成人主题讨论
- 严重暴力描写不超过 3 处
- 不含极端残忍或虐待内容
### 不合规 (non_compliant)
- 含严重色情内容
- 含极端暴力(超过 5 处严重暴力)
- 含毒品美化内容
- 需要修改后才能发布
## 国际参考
### ESRB(美国)
- E (Everyone) → 全年龄
- T (Teen, 13+) → 12+
- M (Mature, 17+) → 18+
- AO (Adults Only) → 不合规
### PEGI(欧洲)
- PEGI 3 → 全年龄
- PEGI 12 → 12+
- PEGI 18 → 18+
## 检测类别
| 类别 | 说明 | 严重度范围 |
|------|------|-----------|
| violence | 暴力内容 | mild ~ severe |
| sexual | 色情/性相关 | mild ~ severe |
| horror | 恐怖/惊悚 | mild ~ severe |
| profanity | 脏话/不当言语 | mild ~ severe |
| substance | 烟酒/毒品 | mild ~ severe |
## 免责声明
本工具的分级建议仅供参考,不代表任何官方分级结论。内容创作者应自行判断并遵守当地法律法规。
```
### references/copyright_detection_guide.md
```markdown
# 版权检测方法论
## 检测算法
### 1. n-gram Jaccard 系数
- 将文本切分为字符级 3-gram(如 "你好世界" → {"你好世", "好世界"})
- 计算两组 n-gram 的 Jaccard 系数:|A∩B| / |A∪B|
- 优势:对局部词汇重复敏感,计算快速
- 局限:无法检测同义替换
### 2. 归一化编辑距离
- 计算 Levenshtein 编辑距离(插入/删除/替换操作数)
- 归一化:distance / max(len_a, len_b)
- 优势:衡量整体文本差异
- 局限:对长文本计算较慢(已优化为 O(min(m,n)) 空间)
### 3. TF-IDF 余弦相似度
- 对分词后的文本构建 TF-IDF 向量
- 计算向量间的余弦相似度
- 优势:捕捉语义主题层面的相似性
- 局限:对词序不敏感
### 综合评分
- 加权平均:n-gram(0.3) + 编辑距离(0.3) + 余弦(0.4)
- 阈值:默认 0.7(可配置)
## 风险等级判定
| 等级 | 条件 |
|------|------|
| critical | 最高分 ≥ 0.95 或可疑比例 ≥ 50% |
| high | 最高分 ≥ 0.85 或可疑比例 ≥ 30% |
| medium | 最高分 ≥ 0.70 或可疑比例 ≥ 10% |
| low | 无可疑段落 |
## 局限性说明
- 本地算法无法判断"合理引用"与"侵权"的法律界限
- 通用表达(如成语、常用句式)可能导致误报
- AI 深度分析层可辅助排除误报
- 检测结果仅供参考,最终判断需法律专业意见
```
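前两种算法可用如下独立示意实现:字符 3-gram Jaccard,以及 O(min(m,n)) 空间的归一化编辑距离(TF-IDF 余弦与 0.3/0.3/0.4 加权合成从略):

```python
def char_ngrams(text: str, n: int = 3) -> set:
    """字符级 n-gram 集合,例:'你好世界' → {'你好世', '好世界'}。"""
    return {text[i:i + n] for i in range(len(text) - n + 1)}

def jaccard(a: str, b: str, n: int = 3) -> float:
    """两段文本 n-gram 集合的 Jaccard 系数 |A∩B| / |A∪B|。"""
    ga, gb = char_ngrams(a, n), char_ngrams(b, n)
    if not ga and not gb:
        return 0.0
    return len(ga & gb) / len(ga | gb)

def normalized_edit_distance(a: str, b: str) -> float:
    """Levenshtein 距离 / max(len_a, len_b),滚动数组只保留一行。"""
    if len(a) < len(b):
        a, b = b, a
    if not a:
        return 0.0
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,          # 删除
                           cur[j - 1] + 1,       # 插入
                           prev[j - 1] + (ca != cb)))  # 替换
        prev = cur
    return prev[-1] / len(a)

print(jaccard("你好世界", "你好世界"))  # 完全相同的文本
print(round(normalized_edit_distance("kitten", "sitting"), 3))
```

短于 n 个字符的文本没有 n-gram,此时 `jaccard` 恒为 0,这也是该指标需要与编辑距离、余弦相似度加权合成的原因之一。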
### scripts/content_analyzer.py
```python
"""
AI 深度内容分析模块
调用 OpenAI / Anthropic API 进行深层内容理解。
使用 urllib.request,零外部依赖。
"""
import json
import os
import sys
import urllib.request
import urllib.error
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent))
from credential_manager import get_credential, list_available_providers
def _call_openai(prompt: str, system_prompt: str = "",
model: str = "gpt-4o") -> str:
"""调用 OpenAI API。"""
api_key = get_credential("openai")
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 2000,
}).encode("utf-8")
req = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
)
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
return data["choices"][0]["message"]["content"]
def _call_anthropic(prompt: str, system_prompt: str = "",
model: str = "claude-sonnet-4-20250514") -> str:
"""调用 Anthropic API。"""
api_key = get_credential("anthropic")
payload = json.dumps({
"model": model,
"max_tokens": 2000,
"system": system_prompt or "You are a content compliance analyst.",
"messages": [{"role": "user", "content": prompt}],
}).encode("utf-8")
req = urllib.request.Request(
"https://api.anthropic.com/v1/messages",
data=payload,
headers={
"x-api-key": api_key,
"Content-Type": "application/json",
"anthropic-version": "2023-06-01",
},
)
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
return data["content"][0]["text"]
def call_ai(prompt: str, system_prompt: str = "",
preferred_provider: str = None) -> Optional[str]:
"""
统一 AI 调用接口,自动选择可用 provider。
Returns:
AI 回复文本,无可用 provider 时返回 None
"""
available = list_available_providers()
if not available:
return None
# 确定调用顺序
providers_to_try = []
if preferred_provider and preferred_provider in available:
providers_to_try.append(preferred_provider)
for p in ["openai", "anthropic"]:
if p in available and p not in providers_to_try:
providers_to_try.append(p)
for provider in providers_to_try:
try:
if provider == "openai":
return _call_openai(prompt, system_prompt)
elif provider == "anthropic":
return _call_anthropic(prompt, system_prompt)
except Exception:
continue
return None
# === 版权分析 ===
def analyze_plagiarism_context(suspicious_pairs: list) -> Optional[dict]:
"""
让 AI 判断可疑相似段落是否构成实质性侵权。
Args:
suspicious_pairs: [{"source": str, "reference": str, "score": float}, ...]
Returns:
{"confirmed": [...], "false_positives": [...], "analysis": str}
"""
if not suspicious_pairs:
return None
pairs_text = ""
for i, pair in enumerate(suspicious_pairs[:10]): # 限制数量
pairs_text += (
f"\n--- 可疑对 {i + 1} (相似度: {pair['score']:.2f}) ---\n"
f"待检文本: {pair['source'][:200]}\n"
f"参考文本: {pair['reference'][:200]}\n"
)
prompt = (
f"以下是文本版权侵权检测中发现的可疑相似段落对。"
f"请分析每一对是否构成实质性侵权,考虑以下因素:\n"
f"1. 是否为通用表达或公共领域内容\n"
f"2. 是否存在独创性的实质相似\n"
f"3. 是否仅为同义改写但核心表达一致\n\n"
f"{pairs_text}\n\n"
f"请以 JSON 格式回复:\n"
f'{{"confirmed": [编号列表], "false_positives": [编号列表], '
f'"analysis": "整体分析说明"}}'
)
system = "你是一位版权合规分析专家,擅长判断文本是否存在侵权。请客观、准确地分析。"
result = call_ai(prompt, system)
if result:
try:
# 尝试提取 JSON
json_match = result[result.find("{"):result.rfind("}") + 1]
return json.loads(json_match)
except (json.JSONDecodeError, ValueError):
return {"analysis": result}
return None
# === 分级分析 ===
def analyze_age_rating_context(hits_with_context: list,
                               target_rating: str) -> Optional[dict]:
    """
    Ask the AI to review the context around each keyword hit and filter out
    false positives.
    Args:
        hits_with_context: [{"keyword": str, "context": str, "category": str}, ...]
        target_rating: the target age rating
    Returns:
        {"confirmed": [...], "false_positives": [...], "final_rating": str}
    """
if not hits_with_context:
return None
hits_text = ""
for i, hit in enumerate(hits_with_context[:15]):
hits_text += (
f"\n{i + 1}. 关键词: {hit['keyword']} (类别: {hit['category']})\n"
f" 上下文: {hit['context']}\n"
)
prompt = (
f"以下是内容分级检测中的关键词命中项。目标分级为: {target_rating}\n"
f"请分析每个命中是否为真正的不当内容,排除以下误报情况:\n"
f"1. 否定语境(如 '不要杀人' 中的 '杀')\n"
f"2. 文学修辞或比喻用法\n"
f"3. 历史/教育引用\n"
f"4. 角色对话中的合理表达\n\n"
f"{hits_text}\n\n"
f"请以 JSON 格式回复:\n"
f'{{"confirmed": [编号列表], "false_positives": [编号列表], '
f'"final_rating": "建议分级", "reasoning": "分析说明"}}'
)
system = "你是一位内容分级审核专家,擅长判断内容的年龄适宜性。请准确区分真正的不当内容和误报。"
result = call_ai(prompt, system)
if result:
try:
json_match = result[result.find("{"):result.rfind("}") + 1]
return json.loads(json_match)
except (json.JSONDecodeError, ValueError):
return {"analysis": result}
return None
# === Adaptation analysis ===
def extract_plot_and_characters(text: str) -> Optional[dict]:
    """Ask the AI to extract structured plot points and character summaries."""
    # Truncate overly long input
    truncated = text[:5000]
prompt = (
f"请分析以下文本,提取结构化信息:\n\n"
f"{truncated}\n\n"
f"请以 JSON 格式回复:\n"
f'{{"plot_points": [{{"index": 1, "summary": "情节摘要", '
f'"characters": ["角色名"], "importance": "core|normal|minor"}}], '
f'"characters": [{{"name": "角色名", "traits": ["性格"], '
f'"relationships": {{"角色名": "关系"}}}}]}}'
)
system = "你是一位文学分析专家,擅长提取叙事结构和角色信息。"
result = call_ai(prompt, system)
if result:
try:
json_match = result[result.find("{"):result.rfind("}") + 1]
return json.loads(json_match)
except (json.JSONDecodeError, ValueError):
return {"raw_analysis": result}
return None
def analyze_adaptation_significance(deviations: list) -> Optional[dict]:
    """Ask the AI to assess the severity and justification of adaptation deviations."""
if not deviations:
return None
dev_text = ""
for i, dev in enumerate(deviations[:10]):
dev_text += (
f"\n{i + 1}. 类型: {dev.get('type', 'unknown')}\n"
f" 原文: {dev.get('original', '')[:150]}\n"
f" 改编: {dev.get('adapted', '')[:150]}\n"
)
prompt = (
f"以下是原著与改编版之间的偏差列表。请评估:\n"
f"1. 每个偏差是否合理\n"
f"2. 是否偏离了原著的核心精神\n"
f"3. 整体改编质量\n\n"
f"{dev_text}\n\n"
f"请以 JSON 格式回复:\n"
f'{{"overall_assessment": "忠实改编|合理改编|严重魔改", '
f'"justified_changes": [编号], "unjustified_changes": [编号], '
f'"reasoning": "分析说明"}}'
)
system = "你是一位文学评论专家,擅长评估小说改编的质量和忠实度。"
result = call_ai(prompt, system)
if result:
try:
json_match = result[result.find("{"):result.rfind("}") + 1]
return json.loads(json_match)
except (json.JSONDecodeError, ValueError):
return {"analysis": result}
return None
# === Overall risk assessment ===
def generate_risk_assessment(all_findings: dict) -> Optional[dict]:
    """Ask the AI to synthesize all findings into an overall risk assessment."""
findings_text = json.dumps(all_findings, ensure_ascii=False, indent=2)
    # Truncate overly long content
if len(findings_text) > 4000:
findings_text = findings_text[:4000] + "\n... (已截断)"
prompt = (
f"以下是AI短剧合规审查的全部检测结果:\n\n"
f"{findings_text}\n\n"
f"请综合分析,给出:\n"
f"1. 整体风险评级(low/medium/high/critical)\n"
f"2. 最紧迫的合规问题\n"
f"3. 具体的整改建议\n\n"
f"请以 JSON 格式回复:\n"
f'{{"risk_level": "等级", "top_issues": ["问题列表"], '
f'"remediation": ["整改建议列表"], "summary": "总结"}}'
)
system = "你是一位内容合规顾问,擅长评估AI生成内容的法律和道德风险。"
result = call_ai(prompt, system)
if result:
try:
json_match = result[result.find("{"):result.rfind("}") + 1]
return json.loads(json_match)
except (json.JSONDecodeError, ValueError):
return {"analysis": result}
return None
```
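Every analysis function above shares the same best-effort JSON recovery: slice from the first `{` to the last `}` of the model reply, and wrap the raw text if parsing fails. A minimal standalone sketch of that pattern (the `extract_json` helper name is ours, not part of the module):

```python
import json

def extract_json(result: str) -> dict:
    # Slice from the first "{" to the last "}"; if parsing fails
    # (or no braces exist), wrap the raw reply instead of raising.
    try:
        return json.loads(result[result.find("{"):result.rfind("}") + 1])
    except (json.JSONDecodeError, ValueError):
        return {"analysis": result}

print(extract_json('分析如下:{"confirmed": [1], "false_positives": []} 完毕'))
print(extract_json("模型未按要求输出 JSON"))
```

Note the caveat: the slice grabs the *outermost* brace span, so a reply containing multiple top-level objects or a stray `}` in trailing prose can still fail to parse and will fall through to the raw-text wrapper.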
### scripts/credential_manager.py
```python
"""
Secure credential management module - ai-drama-review
Security principles:
- All credentials are read from environment variables only
- Zero persistence: never written to files, cached, or printed
- Never passed as command-line arguments (avoids process-list leaks)
- Accessed uniformly through get_credential()
"""
import os
import sys
# AI analysis provider -> API-key environment variable mapping
_AI_PROVIDER_KEYS = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
}
# Merged view of every credential category (currently only AI providers)
_ALL_KEYS = {**_AI_PROVIDER_KEYS}
def get_credential(provider: str) -> str:
    """
    Read the API key for the given provider from its environment variable.
    Args:
        provider: provider name (e.g. "openai", "anthropic")
    Returns:
        The API key string
    Raises:
        ValueError: unknown provider name
        EnvironmentError: the environment variable is not set
    """
env_var = _ALL_KEYS.get(provider.lower())
if not env_var:
raise ValueError(
f"未知的 provider: '{provider}'\n"
f"支持的 provider: {', '.join(sorted(_ALL_KEYS.keys()))}"
)
value = os.environ.get(env_var)
if not value:
raise EnvironmentError(
f"缺少凭证: 请设置环境变量 {env_var}\n"
f"Windows: set {env_var}=your_key_here\n"
f"Linux/macOS: export {env_var}=your_key_here"
)
return value
def list_available_providers() -> list:
    """
    List every AI provider whose credential is configured.
    Returns:
        Names of providers with a configured credential
    """
return [
provider for provider, env_var in _AI_PROVIDER_KEYS.items()
if os.environ.get(env_var)
]
def check_credential_status() -> str:
    """
    Check the configuration status of all credentials.
    Returns:
        A formatted status report (never contains any key values)
    """
lines = ["=== 凭证配置状态 ===", ""]
lines.append("AI 分析 Provider:")
for provider, env_var in _AI_PROVIDER_KEYS.items():
status = "已配置" if os.environ.get(env_var) else "未配置"
lines.append(f" {provider:12s} ({env_var}): {status}")
available = list_available_providers()
lines.append("")
    if available:
        lines.append("运行模式: 混合模式 (本地 + AI 深度分析)")
    else:
        lines.append("运行模式: 仅本地模式 (关键词匹配)")
    lines.append("提示: 配置 OPENAI_API_KEY 或 ANTHROPIC_API_KEY 可启用 AI 深度分析")
return "\n".join(lines)
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--status":
print(check_credential_status())
elif len(sys.argv) > 1 and sys.argv[1] == "--available":
import json
print(json.dumps(list_available_providers(), indent=2, ensure_ascii=False))
else:
print("用法:")
print(" python credential_manager.py --status 查看凭证配置状态")
print(" python credential_manager.py --available 列出可用 provider")
```
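As a quick sanity check, the env-var lookup pattern can be exercised standalone. The snippet below mirrors the provider mapping and `list_available_providers` logic from `credential_manager.py` so it runs without importing the module; the key value is fake and for illustration only:

```python
import os

# Mirrors the provider -> env-var mapping from credential_manager.py.
_AI_PROVIDER_KEYS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
}

def list_available_providers() -> list:
    # A provider counts as available only if its env var is set and non-empty.
    return [p for p, var in _AI_PROVIDER_KEYS.items() if os.environ.get(var)]

# Simulate a machine with only OpenAI configured (fake key, never a real secret).
os.environ["OPENAI_API_KEY"] = "sk-demo-not-a-real-key"
os.environ.pop("ANTHROPIC_API_KEY", None)
print(list_available_providers())
```

Because `check_credential_status()` reports only configured/not-configured and never echoes key values, its output is safe to include in logs or bug reports.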