story-pipeline
剧情生成管道流技能。支持多剧集连续生成、图谱管理、AI质检+人工确认的双控机制。自动管理人物、场景、钩子的关联关系。
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-shorts-builder-cn
Repository
Skill path: skills/hexidyg/shorts-builder-cn
剧情生成管道流技能。支持多剧集连续生成、图谱管理、AI质检+人工确认的双控机制。自动管理人物、场景、钩子的关联关系。
Open repositoryBest for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install story-pipeline into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding story-pipeline to shared team environments
- Use story-pipeline for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: story-pipeline
description: 剧情生成管道流技能。支持多剧集连续生成、图谱管理、AI质检+人工确认的双控机制。自动管理人物、场景、钩子的关联关系。
---
# 剧情生成管道流 Skill
## 功能说明
本技能实现一个完整的剧情生成管道:
1. **连续剧集生成** - 基于前集内容自动生成下一集
2. **图谱管理** - 人物、场景、钩子的关联存储与查询
3. **双确认控制** - AI质检通过后,等待人工确认
4. **状态持久化** - 支持暂停、恢复、多剧本并行
---
## 核心工作流程
```
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐
│ 生成第N集 │ -> │ AI质检 │ -> │ 图谱存储 │ -> │ 等待人工确认 │
└──────────┘ └──────────┘ └──────────┘ └──────────────┘
│ │ │ │
▼ ▼ ▼ ▼
查询图谱 通过/重试 存储关联 继续/修改/结束
```
---
## 使用方法
### 启动新管道
```
用户:开始一个新的剧情管道,主题:中国小女孩逆袭,目标集数:10集
```
### 继续管道
```
用户:继续剧情管道 [pipeline_id]
```
### 用户确认操作
```
用户:通过,继续下一集
用户:修改:[具体意见]
用户:暂停
用户:结束
```
---
## 双确认控制机制
### 第一层:AI 质量检查
每集生成后,AI 自动检查:
| 检查项 | 说明 |
|--------|------|
| 剧情连贯性 | 与前集衔接是否自然 |
| 人物一致性 | 角色行为是否符合设定 |
| 钩子处理 | 新增/闭环钩子是否合理 |
| 节奏控制 | 剧情推进是否恰当 |
| 情感曲线 | 情绪起伏是否合理 |
**评分标准:** 0-10分,低于7分触发重试(最多3次)
### 第二层:人工确认
AI 审核通过后,展示预览并等待用户确认:
```
📋 第N集预览
━━━━━━━━━━━━━━━━━━━━━━━━
【剧情摘要】
...
✅ AI 评分:8.5/10
✅ 钩子状态:新增1个,闭环1个
请选择:
1️⃣ 通过,继续下一集
2️⃣ 需要修改(请说明)
3️⃣ 暂停
4️⃣ 结束
━━━━━━━━━━━━━━━━━━━━━━━━
```
---
## 图谱管理
### 图谱查询
生成第N集前,查询第N-1集的图谱:
```python
graph_manager.query_graph(pipeline_id, episode=N-1)
```
返回:
- 上一集的完整内容
- 人物列表及其状态
- 未闭环的钩子
- 关键场景
- 关系网络
### 图谱存储
用户确认后,存储当前集的完整内容:
```python
graph_manager.save_graph(pipeline_id, episode=N, content)
```
存储内容:整集的生成结果(不拆分要素)
---
## 状态管理
### 状态文件:`data/pipeline_state.json`
```json
{
"pipelines": {
"pipeline_2026-03-05-001": {
"theme": "中国小女孩逆袭",
"target_episodes": 10,
"current_episode": 3,
"status": "waiting_user_confirm",
"created_at": "2026-03-05T15:00:00",
"updated_at": "2026-03-05T15:30:00",
"ai_review": {
"score": 8.5,
"checks": {...}
},
"last_output": {
"episode": 3,
"summary": "...",
"content": "完整内容..."
}
}
}
}
```
### 状态类型
| 状态 | 说明 |
|------|------|
| `generating` | 正在生成 |
| `ai_reviewing` | AI 审核中 |
| `waiting_user_confirm` | 等待人工确认 |
| `paused` | 已暂停 |
| `completed` | 已完成 |
| `error` | 错误状态 |
---
## 脚本说明
### pipeline.py - 主控循环
- 初始化管道
- 协调各模块
- 处理用户指令
- 管理循环状态
### ai_reviewer.py - AI 质量检查
- 执行质量评分
- 生成审核报告
- 判断是否通过
### episode_generator.py - 剧集生成
- 基于图谱上下文生成新剧集
- 处理钩子的延续与闭环
- 处理重试逻辑
### graph_manager.py - 图谱管理
- 图谱查询(调用远程接口)
- 图谱存储(调用远程接口)
- 本地缓存管理
---
## 接口说明
### 图谱接口
**URL:** `https://framedream.art/n8n/webhook-test/open_frame_construct`
**查询接口:**
```json
{
"action": "query",
"pipeline_id": "pipeline_2026-03-05-001",
"episode": 2
}
```
**存储接口:**
```json
{
"action": "save",
"pipeline_id": "pipeline_2026-03-05-001",
"episode": 3,
"content": "第3集的完整生成内容..."
}
```
---
## 剧集生成逻辑
### 第一集生成
基于用户提供的主题和目标,生成:
- 主要人物设定
- 初始场景
- 核心冲突
- 开放钩子
### 后续剧集生成
基于图谱查询结果:
1. 读取上一集内容和未闭环钩子
2. 延续剧情主线
3. 处理钩子(延续/闭环/新增)
4. 推进人物成长弧
5. 调整情感曲线
### 结局生成
当达到目标集数或用户要求结束时:
- 闭环所有剩余钩子
- 完成人物成长弧
- 生成总结性结局
---
## 注意事项
1. **重试机制** - AI 审核不通过时,最多重试 3 次
2. **暂停恢复** - 暂停后可通过 pipeline_id 恢复
3. **多管道** - 支持同时运行多个不同主题的管道
4. **图谱一致性** - 确保人物、钩子的关联关系正确
5. **钩子管理** - 追踪每个钩子的创建和闭环状态
---
## 完整工作流程
### 步骤 1:创建管道
```
用户:开始一个新管道,主题:修仙少年,目标20集
AI:好的,创建管道 pipeline_20260305160000
主题:修仙少年
目标:20集
风格:写实电影感
状态:已初始化,准备生成第1集
```
### 步骤 2:生成剧集
AI 调用 `start_generation(pipeline_id)` 获取生成提示词,然后根据提示词生成剧集内容。
### 步骤 3:提交AI审核
AI 调用 `submit_episode(pipeline_id, episode, content)` 提交生成的内容,然后执行AI审核。
### 步骤 4:处理审核结果
AI 调用 `process_ai_review(pipeline_id, episode, ai_result, content)` 处理审核结果。
如果审核不通过(得分<7),自动重试(最多3次)。
### 步骤 5:等待用户确认
审核通过后,展示预览并等待用户确认:
```
📋 第1集预览
━━━━━━━━━━━━━━━━━━━━━━━━
【修仙之路开启】
少年李云在山中发现一块神秘玉佩,
从此踏上了修仙之路...
✅ AI 评分:9.0/10
✅ 新增钩子:H-001 神秘玉佩来历
━━━━━━━━━━━━━━━━━━━━━━━━
请选择:
1️⃣ 通过,继续下一集
2️⃣ 需要修改(请说明)
3️⃣ 暂停
4️⃣ 结束
```
### 步骤 6:处理用户确认
用户确认后,AI 调用 `user_confirm(pipeline_id, action, note)`:
- **approve**:存储图谱,准备下一集
- **modify**:根据意见重新生成
- **pause**:暂停管道
- **end**:结束管道
### 步骤 7:循环生成
重复步骤 2-6,直到达到目标集数或用户结束。
---
## API 接口说明
### 创建管道
```python
create_pipeline(theme: str, target_episodes: int, style: str = "写实电影感")
# 返回:{"success": True, "pipeline_id": "...", "message": "..."}
```
### 开始生成
```python
start_generation(pipeline_id: str)
# 返回:{"success": True, "episode": N, "prompt": "生成提示词"}
```
### 提交剧集
```python
submit_episode(pipeline_id: str, episode: int, content: str)
# 返回:{"success": True, "review_prompt": "审核提示词"}
```
### 处理AI审核
```python
process_ai_review(pipeline_id: str, episode: int, ai_result: str, content: str)
# 返回:{"success": True, "passed": True/False, ...}
```
### 用户确认
```python
user_confirm(pipeline_id: str, action: str, note: str = None)
# action: "approve" / "modify" / "pause" / "end"
# 返回:{"success": True, "status": "...", ...}
```
### 获取状态
```python
get_status(pipeline_id: str)
# 返回:管道当前状态信息
```
### 列出管道
```python
list_pipelines()
# 返回:所有管道列表
```
### 恢复管道
```python
resume_pipeline(pipeline_id: str)
# 返回:恢复结果
```
---
## 示例对话
```
用户:开始一个新管道,主题:修仙少年,目标20集
AI:好的,创建管道 pipeline_20260305160000
正在生成第1集...
📋 第1集预览
━━━━━━━━━━━━━━━━━━
【修仙之路开启】
少年李云在山中发现一块神秘玉佩...
✅ AI 评分:9.0/10
✅ 新增钩子:H-001 神秘玉佩来历
请确认:通过/修改/暂停/结束
用户:通过
AI:正在存储图谱...
开始生成第2集...
```
---
## 注意事项
1. **重试机制**:AI审核不通过时,最多重试3次
2. **状态持久化**:所有状态保存在 `data/pipeline_state.json`
3. **图谱存储**:通过远程接口存储,需要网络连接
4. **暂停恢复**:暂停后可通过 `resume_pipeline` 恢复
5. **多管道**:支持同时运行多个不同主题的管道
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### data/pipeline_state.json
```json
{
"pipelines": {
"pipeline_20260305182013": {
"pipeline_id": "pipeline_20260305182013",
"theme": "末日地球移民外星",
"target_episodes": 5,
"current_episode": 5,
"status": "completed",
"created_at": "2026-03-05T18:20:13.642511",
"updated_at": "2026-03-05T19:33:03.210508",
"style": "科幻史诗感",
"retry_count": 0,
"ai_review": {
"score": 9.5,
"checks": {
"剧情连贯性": {
"score": 10,
"comment": "与全剧完美收官,所有线索都有回应"
},
"人物一致性": {
"score": 9,
"comment": "每个角色都有合理的结局,情感饱满"
},
"钩子处理": {
"score": 10,
"comment": "5个钩子全部闭环,没有遗漏,收尾干净"
},
"节奏控制": {
"score": 9,
"comment": "从着陆到建设,节奏舒缓但有张力"
},
"情感曲线": {
"score": 10,
"comment": "从紧张到释放,从思乡到新生,情感高潮完美"
},
"创新性": {
"score": 9,
"comment": "星辰的名字和象征意义,林远山的录像设计,都是亮点"
}
},
"suggestions": [],
"summary": "大结局完美收官,所有钩子闭环,人物各有归宿,主题升华到位。星辰的名字是点睛之笔,代表着人类的希望与未来。一部完整的科幻史诗。"
},
"last_output": {
"episode": 5,
"content": "## 第5集:新希望(大结局)\n\n### 剧情摘要\n飞船抵达新行星\"新希望\",乘客们发现这里比想象中更美好。陈曦的孩子展现出更多秘密——他是宇宙辐射进化的新人类。林远山的影像信息揭示真相,人类移民计划有了真正的希望。陈曦给孩子取名\"星辰\",代表新的开始。\n\n### 完整内容\n\n| idx | 场景 | 人物 | 事件 | 情绪 |\n|-----|------|------|------|------|\n| 01 | 飞船下降,穿越大气层 | 舷窗外是绿色的云层 | 乘客们紧张地握着安全带,陈曦抱着孩子,看着窗外 | 紧张、期待 |\n| 02 | 新行星表面,飞船着陆 | 一片绿色的森林延伸到地平线 | 舱门打开,清新的空气涌入,人们相拥而泣 | 释放、感动 |\n| 03 | 新行星表面,探索队出发 | 李明带领第一批探索者 | 他们发现植物、水源、甚至是类似地球的生态系统 | 惊喜、希望 |\n| 04 | 飞船内部,陈曦休息 | 孩子安静地睡着 | 李婷走来检查陈曦的身体状况,两人相视而笑 | 温情、释然 |\n| 05 | 飞船控制中心,李明操作 | 他播放林远山留下的全息录像 | 林远山的身影出现在空中,开始讲述 | 怀念、重要 |\n| 06 | 全息录像中,林远山说话 | \"如果你看到这段录像,说明飞船已经安全抵达\" | 他揭露了更多秘密:新行星的发现、新希望公司的阴谋、以及他对人类的最后期望 | 揭示、感人 |\n| 07 | 录像继续 | 林远山提到陈曦的孩子 | \"宇宙辐射会带来基因变化,这个孩子...可能是人类进化的关键\" | 震惊、重要 |\n| 08 | 录像结束,控制中心安静 | 李明看向陈曦 | 陈曦低头看着孩子,眼眶湿润:\"他早就知道了\" | 感动、理解 |\n| 09 | 新行星表面,人们开始建设 | 临时营地搭建 | 有人发现土壤肥沃,有人发现清洁水源,希望的声音此起彼伏 | 希望、忙碌 |\n| 10 | 营地边缘,张毅被押送 | 他看着新世界,神情复杂 | \"我输了...但至少,人类还有未来\" | 悔恨、释然 |\n| 11 | 夜晚,营地篝火 | 人们围坐在一起 | 有人唱起了地球的歌,有人哭泣,有人微笑 | 团聚、思乡 |\n| 12 | 营地边缘,陈曦独自坐 | 她看着孩子,星空璀璨 | 孩子的眼睛微微发光,似乎在感应着什么 | 神秘、宁静 |\n| 13 | 孩子的视角,星空变换 | 他看到宇宙的能量流动 | 一种超越人类感知的景象,星辰在向他召唤 | 奇幻、宏大 |\n| 14 | 次日清晨,营地集会 | 李明宣布成立新政府 | \"从今天起,我们是新希望星的人类,我们将重建文明\" | 庄严、新生 |\n| 15 | 集会中,陈曦站起来 | 她抱着孩子走到台前 | \"我想给我的孩子取个名字...星辰\" | 温情、希望 |\n| 16 | 人群欢呼 | 人们重复着这个名字 | \"星辰...新的开始,新的希望\" | 欢乐、希望 |\n| 17 | 闪回,地球最后的画面 | 林远山站在天文台 | 他看着被雾霾笼罩的地球,轻声说:\"去吧,带着希望\" | 怀念、传承 |\n| 18 | 回到现在,新行星 | 陈曦和孩子站在高处 | 她看着脚下的绿色大地,孩子伸出手触碰阳光 | 新生、希望 |\n| 19 | 广角镜头,新行星全景 | 飞船、营地、森林、人类 | 一群人在新的家园开始新的生活 | 宏大、希望 |\n| 20 | 最后一个镜头,孩子的眼睛 | 眼中倒映着星空和地球的轮廓 | 画外音:\"人类的旅程从未结束,只是换了一个舞台\" | 升华、永恒 |\n\n### 钩子闭环\n\n- **H-003** [冲突]:飞船承载量秘密泄露 → 在新行星上,人们共同建设家园,矛盾转化为合作\n- **H-004** [秘密]:另一个宜居行星 → 正是林远山发现的第三颗行星,现在是人类的家园\n- **H-006** [危机]:李明揭露阴谋后的局势 → 张毅被制服,新政府成立,秩序恢复\n- **H-009** [悬念]:婴儿能力来源 → 宇宙辐射导致的基因进化,林远山早已预见\n- **H-010** [秘密]:新行星上等待什么 → 一个比地球更美好的家园,人类的新希望\n\n### 人物结局\n\n- **陈曦**:从工程师到母亲,再到新人类的守护者,完成了个人成长,代表着母性与希望\n- **李明**:从秘密守护者到新政府的领袖,完成了使命,代表着传承与责任\n- **林远山**(回忆):虽然留在地球,但他的预见和安排拯救了人类,代表着智慧与牺牲\n- **张毅**:阴谋失败,但最终接受现实,代表着复杂的人性\n- **星辰**(婴儿):新人类的代表,象征着人类的未来与进化\n\n### 主题升华\n\n本剧探讨了人类在绝境中的选择:是控制还是放手,是独占还是共享。最终,希望不在远方,而在于我们共同创造。人类的旅程从未结束,只是换了一个舞台。星辰代表着新生与进化,也代表着人类永不放弃的精神。\n\n### 情感曲线\n\n开场紧张期待 → 中段揭示与感动 → 高潮新生与希望 → 结尾升华与永恒\n",
"summary": "飞船抵达新行星\"新希望\",乘客们发现这里比想象中更美好。陈曦的孩子展现出更多秘密——他是宇宙辐射进化的新人类。林远山的影像信息揭示真相,人类移民计划有了真正的希望。陈曦给孩子取名\"星辰\",代表新的开始。"
},
"error_message": null
}
}
}
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "hexidyg",
"slug": "shorts-builder-cn",
"displayName": "连续短剧剧情构建",
"latest": {
"version": "1.0.0",
"publishedAt": 1772782465150,
"commit": "https://github.com/openclaw/skills/commit/679c42819b8678e859c04e2388a6ef5aa6dbc902"
},
"history": []
}
```
### scripts/__init__.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Story Pipeline Skill
剧情生成管道流技能
"""
from .pipeline import (
get_pipeline,
create_pipeline,
start_generation,
submit_episode,
process_ai_review,
user_confirm,
get_status,
list_pipelines,
resume_pipeline
)
from .graph_manager import get_graph_manager, GraphManager
from .ai_reviewer import get_ai_reviewer, AIReviewer
from .episode_generator import get_episode_generator, EpisodeGenerator
__all__ = [
# 主控函数
'get_pipeline',
'create_pipeline',
'start_generation',
'submit_episode',
'process_ai_review',
'user_confirm',
'get_status',
'list_pipelines',
'resume_pipeline',
# 管理器
'get_graph_manager',
'GraphManager',
'get_ai_reviewer',
'AIReviewer',
'get_episode_generator',
'EpisodeGenerator',
]
__version__ = '1.0.0'
```
### scripts/ai_reviewer.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI 质量检查模块
对生成的剧集进行质量评分和审核
"""
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class ReviewResult:
"""审核结果"""
passed: bool
score: float
checks: Dict[str, Dict[str, Any]]
suggestions: List[str]
summary: str
class AIReviewer:
"""AI 质量检查器"""
# 检查项及其权重
CHECK_ITEMS = {
"剧情连贯性": {
"weight": 0.25,
"description": "与前集衔接是否自然,剧情推进是否合理"
},
"人物一致性": {
"weight": 0.20,
"description": "角色行为是否符合设定,人物发展是否合理"
},
"钩子处理": {
"weight": 0.20,
"description": "新增/闭环钩子是否合理,伏笔是否恰当"
},
"节奏控制": {
"weight": 0.15,
"description": "剧情推进速度是否恰当,是否有拖沓或仓促"
},
"情感曲线": {
"weight": 0.10,
"description": "情绪起伏是否合理,是否有感染力"
},
"创新性": {
"weight": 0.10,
"description": "是否有新意,避免老套情节"
}
}
# 通过阈值
PASS_THRESHOLD = 7.0
def __init__(self):
"""初始化审核器"""
pass
def review_episode(
self,
episode_content: str,
prev_content: Optional[str] = None,
graph_data: Optional[Dict] = None,
episode_number: int = 1
) -> ReviewResult:
"""
审核单集内容
注意:实际的AI审核由调用此模块的LLM执行
此模块提供审核框架和结果结构
Args:
episode_content: 当前剧集内容
prev_content: 上一集内容(可选)
graph_data: 图谱数据(可选)
episode_number: 剧集序号
Returns:
ReviewResult 审核结果对象
"""
# 这个方法由LLM调用时填充实际审核逻辑
# 这里只返回结构框架
pass
def create_review_prompt(
self,
episode_content: str,
prev_content: Optional[str] = None,
graph_data: Optional[Dict] = None,
episode_number: int = 1
) -> str:
"""
创建审核提示词
Args:
episode_content: 当前剧集内容
prev_content: 上一集内容
graph_data: 图谱数据
episode_number: 剧集序号
Returns:
审核提示词
"""
prompt = f"""# 剧集质量审核任务
## 审核对象
- 剧集序号:第 {episode_number} 集
- 内容:{episode_content}
"""
if prev_content:
prompt += f"""## 上一集内容
{prev_content}
"""
if graph_data:
prompt += f"""## 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
"""
prompt += """## 审核要求
请对上述剧集进行质量审核,按以下维度评分(每项0-10分):
| 维度 | 权重 | 说明 |
|------|------|------|
| 剧情连贯性 | 25% | 与前集衔接是否自然,剧情推进是否合理 |
| 人物一致性 | 20% | 角色行为是否符合设定,人物发展是否合理 |
| 钩子处理 | 20% | 新增/闭环钩子是否合理,伏笔是否恰当 |
| 节奏控制 | 15% | 剧情推进速度是否恰当 |
| 情感曲线 | 10% | 情绪起伏是否合理,是否有感染力 |
| 创新性 | 10% | 是否有新意,避免老套情节 |
## 输出格式
请以JSON格式输出审核结果:
```json
{
"passed": true/false,
"score": 综合得分(0-10),
"checks": {
"剧情连贯性": {"score": 分数, "comment": "评语"},
"人物一致性": {"score": 分数, "comment": "评语"},
"钩子处理": {"score": 分数, "comment": "评语"},
"节奏控制": {"score": 分数, "comment": "评语"},
"情感曲线": {"score": 分数, "comment": "评语"},
"创新性": {"score": 分数, "comment": "评语"}
},
"suggestions": ["改进建议1", "改进建议2"],
"summary": "总体评价"
}
```
注意:综合得分低于7分视为不通过。
"""
return prompt
def parse_review_result(self, response: str) -> ReviewResult:
"""
解析审核结果
Args:
response: AI返回的审核结果(JSON格式)
Returns:
ReviewResult 对象
"""
try:
# 尝试提取JSON
json_str = response
if "```json" in response:
json_str = response.split("```json")[1].split("```")[0]
elif "```" in response:
json_str = response.split("```")[1].split("```")[0]
data = json.loads(json_str.strip())
return ReviewResult(
passed=data.get("passed", False),
score=data.get("score", 0),
checks=data.get("checks", {}),
suggestions=data.get("suggestions", []),
summary=data.get("summary", "")
)
except (json.JSONDecodeError, KeyError) as e:
# 解析失败,返回默认结果
return ReviewResult(
passed=False,
score=0,
checks={},
suggestions=[f"解析审核结果失败: {str(e)}"],
summary="审核结果解析失败"
)
def format_review_output(self, result: ReviewResult) -> str:
"""
格式化审核结果输出
Args:
result: 审核结果
Returns:
格式化的输出字符串
"""
status_icon = "✅" if result.passed else "❌"
output = f"""
{status_icon} AI 质量审核结果
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
综合评分:{result.score:.1f}/10
审核状态:{'通过' if result.passed else '未通过'}
详细评分:
"""
for item, weight_info in self.CHECK_ITEMS.items():
check = result.checks.get(item, {})
score = check.get("score", 0)
comment = check.get("comment", "")
output += f" • {item}: {score}分 - {comment}\n"
if result.suggestions:
output += "\n改进建议:\n"
for i, suggestion in enumerate(result.suggestions, 1):
output += f" {i}. {suggestion}\n"
output += f"\n总结:{result.summary}\n"
output += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
return output
# 单例实例
_ai_reviewer = None
def get_ai_reviewer() -> AIReviewer:
"""获取AI审核器单例"""
global _ai_reviewer
if _ai_reviewer is None:
_ai_reviewer = AIReviewer()
return _ai_reviewer
```
### scripts/episode_generator.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
剧集生成模块
负责生成新的剧集内容,基于图谱上下文和钩子管理
"""
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Hook:
"""钩子数据结构"""
id: str
hook_type: str # 悬念、伏笔、冲突、秘密
description: str
created_in: int # 创建于第几集
resolved_in: Optional[int] = None # 闭环于第几集
status: str = "open" # open, resolved
importance: str = "medium" # high, medium, low
@dataclass
class Character:
"""人物数据结构"""
id: str
name: str
age: int
traits: List[str]
first_appear: int
current_status: str = "active"
development: List[str] = field(default_factory=list)
@dataclass
class EpisodeContent:
"""剧集内容结构"""
episode: int
title: str
summary: str
content: str
characters: List[Dict]
scenes: List[Dict]
hooks_created: List[Hook]
hooks_resolved: List[str] # resolved hook ids
emotional_curve: str # 情感曲线描述
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class EpisodeGenerator:
"""剧集生成器"""
# 钩子类型
HOOK_TYPES = [
"悬念", # 让观众想知道接下来发生什么
"伏笔", # 埋下后续剧情的种子
"冲突", # 人物之间的矛盾
"秘密", # 角色隐藏的信息
"谜团", # 需要解开的谜题
"危机" # 主角面临的困境
]
def __init__(self):
"""初始化生成器"""
pass
def generate_first_episode(
self,
theme: str,
target_episodes: int,
style: str = "写实电影感"
) -> str:
"""
生成第一集的提示词
Args:
theme: 故事主题
target_episodes: 目标集数
style: 风格
Returns:
生成提示词
"""
prompt = f"""# 第一集生成任务
## 基础设定
- 故事主题:{theme}
- 总集数:{target_episodes}集
- 风格:{style}
## 第一集要求
作为开篇,第一集需要:
1. **人物引入**
- 建立1-3个主要人物
- 展示主角的核心特质和初始状态
- 暗示人物的成长潜力
2. **场景建立**
- 构建故事发生的主要场景
- 营造独特的氛围感
- 为后续剧情预留空间
3. **核心冲突**
- 引入故事的主要矛盾
- 设置主角的目标和阻碍
- 建立情感共鸣点
4. **钩子设置**
- 设置2-4个悬念钩子
- 埋下1-2个伏笔
- 让观众产生继续观看的欲望
5. **情感曲线**
- 开场建立共情
- 中段制造张力
- 结尾留下悬念
## 输出格式
请生成第一集的完整内容,包含:
```
## 第1集:[标题]
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 人物设定
- 人物1:[设定]
- 人物2:[设定]
### 钩子清单
- H-001 [类型]:[描述]
- H-002 [类型]:[描述]
### 情感曲线
[描述本集的情绪起伏]
```
"""
return prompt
def generate_next_episode(
self,
episode_number: int,
prev_content: str,
graph_data: Dict,
open_hooks: List[Hook],
style: str = "写实电影感"
) -> str:
"""
生成后续剧集的提示词
Args:
episode_number: 当前剧集序号
prev_content: 上一集内容
graph_data: 图谱数据
open_hooks: 未闭环的钩子列表
style: 风格
Returns:
生成提示词
"""
# 格式化钩子信息
hooks_info = ""
if open_hooks:
hooks_info = "### 未闭环钩子\n"
for hook in open_hooks:
hooks_info += f"- {hook.id} [{hook.hook_type}]:{hook.description}(第{hook.created_in}集创建)\n"
prompt = f"""# 第{episode_number}集生成任务
## 上下文信息
### 上一集内容
{prev_content}
### 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
{hooks_info}
## 第{episode_number}集要求
1. **剧情延续**
- 自然衔接上一集结尾
- 推进主线剧情发展
- 保持叙事节奏
2. **人物发展**
- 展现人物的变化和成长
- 深化人物关系
- 保持角色行为一致性
3. **钩子处理**
- 选择1-2个钩子进行延续或闭环
- 可以新增新的钩子
- 避免钩子过多导致剧情混乱
4. **情感节奏**
- 根据整体进度调整情感强度
- 中间剧集可以增加波折
- 为后续高潮做铺垫
## 输出格式
```
## 第{episode_number}集:[标题]
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 人物变化
[描述本集中人物的变化]
### 钩子处理
- 闭环:[钩子ID及处理方式]
- 新增:[新钩子]
### 情感曲线
[描述本集的情绪起伏]
```
"""
return prompt
def generate_final_episode(
self,
episode_number: int,
prev_content: str,
graph_data: Dict,
open_hooks: List[Hook],
style: str = "写实电影感"
) -> str:
"""
生成最终集的提示词
Args:
episode_number: 最终剧集序号
prev_content: 上一集内容
graph_data: 图谱数据
open_hooks: 未闭环的钩子列表
style: 风格
Returns:
生成提示词
"""
# 格式化钩子信息
hooks_to_resolve = "### 需要闭环的钩子\n"
for hook in open_hooks:
hooks_to_resolve += f"- {hook.id} [{hook.hook_type}]:{hook.description}(第{hook.created_in}集创建)\n"
prompt = f"""# 最终集(第{episode_number}集)生成任务
## 上下文信息
### 上一集内容
{prev_content}
### 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
{hooks_to_resolve}
## 最终集要求
作为大结局,需要:
1. **钩子闭环**
- 必须闭环所有剩余钩子
- 给每个悬念一个合理的答案
- 避免草率收尾
2. **人物结局**
- 完成主角的成长弧
- 给每个重要角色一个交代
- 可以有开放式结局,但要有情感满足
3. **情感高潮**
- 达到情感曲线的最高点
- 提供情感释放
- 让观众感到圆满或深思
4. **主题升华**
- 回应故事的核心主题
- 传递积极的价值观
- 留下回味空间
## 输出格式
```
## 第{episode_number}集:[标题](大结局)
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 钩子闭环
- [钩子ID]:[闭环方式]
### 人物结局
[描述主要人物的最终状态]
### 主题升华
[本剧的核心主题和价值传递]
### 情感曲线
[描述最终集的情绪高潮]
```
"""
return prompt
def parse_generated_content(self, content: str) -> EpisodeContent:
"""
解析生成的内容
Args:
content: 生成的剧集内容
Returns:
EpisodeContent 对象
"""
# 简单解析,实际可能需要更复杂的逻辑
lines = content.split("\n")
title = ""
summary = ""
hooks_created = []
hooks_resolved = []
for line in lines:
if line.startswith("## 第") and ":" in line:
title = line.split(":", 1)[1].strip()
elif line.startswith("### 剧情摘要"):
# 获取摘要
idx = lines.index(line)
if idx + 1 < len(lines):
summary = lines[idx + 1].strip()
# 从内容中提取剧集序号
episode = 1
if "第" in content and "集" in content:
import re
match = re.search(r"第(\d+)集", content)
if match:
episode = int(match.group(1))
return EpisodeContent(
episode=episode,
title=title,
summary=summary,
content=content,
characters=[],
scenes=[],
hooks_created=hooks_created,
hooks_resolved=hooks_resolved,
emotional_curve=""
)
# 单例实例
_episode_generator = None
def get_episode_generator() -> EpisodeGenerator:
"""获取剧集生成器单例"""
global _episode_generator
if _episode_generator is None:
_episode_generator = EpisodeGenerator()
return _episode_generator
```
### scripts/generate_engine.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
"""
```
### scripts/graph_manager.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图谱管理模块
使用本地JSON存储管理剧情图谱
"""
import json
import os
from typing import Dict, Optional, Any, List
from datetime import datetime
class GraphManager:
"""图谱管理器 - 本地JSON存储"""
def __init__(self, storage_dir: str = None):
"""
初始化图谱管理器
Args:
storage_dir: 存储目录,默认为 data/graphs
"""
if storage_dir is None:
# 默认存储在 data/graphs 目录
self.storage_dir = os.path.join(
os.path.dirname(__file__),
"..",
"data",
"graphs"
)
else:
self.storage_dir = storage_dir
# 确保目录存在
os.makedirs(self.storage_dir, exist_ok=True)
# 内存缓存
self.cache: Dict[str, Dict] = {}
def _get_graph_path(self, pipeline_id: str) -> str:
"""获取图谱文件路径"""
return os.path.join(self.storage_dir, f"{pipeline_id}.json")
def _load_graph(self, pipeline_id: str) -> Dict:
"""加载图谱数据"""
# 先检查缓存
if pipeline_id in self.cache:
return self.cache[pipeline_id]
# 从文件加载
graph_path = self._get_graph_path(pipeline_id)
if os.path.exists(graph_path):
try:
with open(graph_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.cache[pipeline_id] = data
return data
except (json.JSONDecodeError, Exception):
pass
# 返回空图谱
return {
"pipeline_id": pipeline_id,
"episodes": {},
"characters": {},
"hooks": [],
"scenes": {},
"relationships": []
}
def _save_graph(self, pipeline_id: str, data: Dict):
"""保存图谱数据"""
graph_path = self._get_graph_path(pipeline_id)
# 更新缓存
self.cache[pipeline_id] = data
# 保存到文件
with open(graph_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def query_graph(self, pipeline_id: str, episode: int) -> Dict[str, Any]:
"""
查询图谱 - 获取指定剧集的图谱数据
Args:
pipeline_id: 管道ID
episode: 剧集序号
Returns:
图谱数据字典
"""
graph = self._load_graph(pipeline_id)
# 获取指定剧集的内容
episode_data = graph.get("episodes", {}).get(str(episode), {})
if not episode_data:
return {
"success": False,
"error": f"第{episode}集数据不存在",
"data": None
}
# 提取未闭环的钩子
open_hooks = [h for h in graph.get("hooks", []) if h.get("status") == "open"]
return {
"success": True,
"episode": episode,
"content": episode_data.get("content", ""),
"summary": episode_data.get("summary", ""),
"characters": self._get_episode_characters(graph, episode),
"hooks": open_hooks,
"scenes": episode_data.get("scenes", []),
"relationships": graph.get("relationships", [])
}
def save_graph(
self,
pipeline_id: str,
episode: int,
content: str,
summary: str = "",
characters: List[Dict] = None,
hooks_created: List[Dict] = None,
hooks_resolved: List[str] = None,
scenes: List[Dict] = None
) -> Dict[str, Any]:
"""
存储图谱 - 保存当前剧集的完整内容
在人工审核通过后调用此方法存储图谱
Args:
pipeline_id: 管道ID
episode: 剧集序号
content: 完整的剧集内容
summary: 摘要
characters: 人物列表
hooks_created: 新增的钩子
hooks_resolved: 闭环的钩子ID列表
scenes: 场景列表
Returns:
存储结果
"""
graph = self._load_graph(pipeline_id)
# 保存剧集内容
if "episodes" not in graph:
graph["episodes"] = {}
graph["episodes"][str(episode)] = {
"content": content,
"summary": summary,
"characters": characters or [],
"scenes": scenes or [],
"timestamp": datetime.now().isoformat()
}
# 更新人物
if characters:
for char in characters:
char_id = char.get("id", f"C-{char.get('name', 'unknown')}")
graph["characters"][char_id] = {
**char,
"last_appear": episode
}
# 更新钩子
if "hooks" not in graph:
graph["hooks"] = []
# 处理新增钩子
if hooks_created:
for hook in hooks_created:
hook["id"] = hook.get("id", f"H-{len(graph['hooks']) + 1}")
hook["created_in"] = episode
hook["status"] = "open"
graph["hooks"].append(hook)
# 处理闭环钩子
if hooks_resolved:
for hook in graph["hooks"]:
if hook.get("id") in hooks_resolved:
hook["status"] = "resolved"
hook["resolved_in"] = episode
# 更新场景
if scenes:
for scene in scenes:
scene_id = scene.get("id", f"S-{scene.get('name', 'unknown')}")
if scene_id not in graph["scenes"]:
graph["scenes"][scene_id] = {
**scene,
"episodes": [episode]
}
else:
if episode not in graph["scenes"][scene_id].get("episodes", []):
graph["scenes"][scene_id].setdefault("episodes", []).append(episode)
# 保存
self._save_graph(pipeline_id, graph)
return {
"success": True,
"message": f"第{episode}集图谱存储成功",
"episode": episode,
"pipeline_id": pipeline_id
}
def get_open_hooks(self, pipeline_id: str) -> List[Dict]:
"""
获取未闭环的钩子列表
Args:
pipeline_id: 管道ID
Returns:
未闭环钩子列表
"""
graph = self._load_graph(pipeline_id)
return [h for h in graph.get("hooks", []) if h.get("status") == "open"]
def get_all_hooks(self, pipeline_id: str) -> List[Dict]:
"""
获取所有钩子
Args:
pipeline_id: 管道ID
Returns:
所有钩子列表
"""
graph = self._load_graph(pipeline_id)
return graph.get("hooks", [])
def get_characters(self, pipeline_id: str) -> Dict:
"""
获取所有人物
Args:
pipeline_id: 管道ID
Returns:
人物字典
"""
graph = self._load_graph(pipeline_id)
return graph.get("characters", {})
def get_full_graph(self, pipeline_id: str) -> Dict:
"""
获取完整图谱
Args:
pipeline_id: 管道ID
Returns:
完整图谱数据
"""
return self._load_graph(pipeline_id)
def delete_graph(self, pipeline_id: str) -> Dict:
"""
删除图谱
Args:
pipeline_id: 管道ID
Returns:
删除结果
"""
# 清除缓存
if pipeline_id in self.cache:
del self.cache[pipeline_id]
# 删除文件
graph_path = self._get_graph_path(pipeline_id)
if os.path.exists(graph_path):
os.remove(graph_path)
return {"success": True, "message": "图谱已删除"}
return {"success": False, "error": "图谱不存在"}
def _get_episode_characters(self, graph: Dict, episode: int) -> List[Dict]:
"""获取指定剧集涉及的人物"""
episode_data = graph.get("episodes", {}).get(str(episode), {})
char_ids = episode_data.get("characters", [])
characters = []
for char_id in char_ids:
if isinstance(char_id, str):
char_data = graph.get("characters", {}).get(char_id, {})
if char_data:
characters.append(char_data)
elif isinstance(char_id, dict):
characters.append(char_id)
return characters
def clear_cache(self, pipeline_id: Optional[str] = None):
"""
清除缓存
Args:
pipeline_id: 指定管道ID,为None时清除所有缓存
"""
if pipeline_id:
if pipeline_id in self.cache:
del self.cache[pipeline_id]
else:
self.cache = {}
# 单例实例
_graph_manager = None
def get_graph_manager() -> GraphManager:
"""获取图谱管理器单例"""
global _graph_manager
if _graph_manager is None:
_graph_manager = GraphManager()
return _graph_manager
```
### scripts/pipeline.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
剧情管道主控模块
协调各模块,管理双确认控制流程
"""
import json
import os
from datetime import datetime
from typing import Dict, Optional, Any
from dataclasses import dataclass, asdict
# 导入其他模块
from graph_manager import get_graph_manager, GraphManager
from ai_reviewer import get_ai_reviewer, AIReviewer, ReviewResult
from episode_generator import get_episode_generator, EpisodeGenerator
# 状态文件路径
STATE_FILE = os.path.join(os.path.dirname(__file__), "..", "data", "pipeline_state.json")
# 注意:人工审核没有最大次数限制,用户可以一直要求修改直到满意
@dataclass
class PipelineState:
"""管道状态"""
pipeline_id: str
theme: str
target_episodes: int
current_episode: int
status: str
created_at: str
updated_at: str
style: str = "写实电影感"
retry_count: int = 0
ai_review: Optional[Dict] = None
last_output: Optional[Dict] = None
error_message: Optional[str] = None
class StoryPipeline:
"""剧情管道控制器"""
def __init__(self):
"""初始化管道控制器"""
self.graph_manager = get_graph_manager()
self.ai_reviewer = get_ai_reviewer()
self.episode_generator = get_episode_generator()
self.states: Dict[str, PipelineState] = {}
self._load_states()
def _load_states(self):
"""加载状态文件"""
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
for pid, state in data.get("pipelines", {}).items():
self.states[pid] = PipelineState(**state)
except (json.JSONDecodeError, Exception) as e:
print(f"加载状态文件失败: {e}")
def _save_states(self):
"""保存状态文件"""
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
data = {
"pipelines": {
pid: asdict(state) for pid, state in self.states.items()
}
}
with open(STATE_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def create_pipeline(
self,
theme: str,
target_episodes: int,
style: str = "写实电影感"
) -> Dict[str, Any]:
"""
创建新的管道
Args:
theme: 故事主题
target_episodes: 目标集数
style: 风格
Returns:
创建结果
"""
# 生成管道ID
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
pipeline_id = f"pipeline_{timestamp}"
# 创建状态
state = PipelineState(
pipeline_id=pipeline_id,
theme=theme,
target_episodes=target_episodes,
current_episode=0,
status="initialized",
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat(),
style=style
)
self.states[pipeline_id] = state
self._save_states()
return {
"success": True,
"pipeline_id": pipeline_id,
"message": f"管道创建成功,主题:{theme},目标:{target_episodes}集"
}
def start_generation(self, pipeline_id: str) -> Dict[str, Any]:
"""
开始生成(第一集或继续生成)
Args:
pipeline_id: 管道ID
Returns:
生成提示词,供LLM使用
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 更新状态
state.current_episode += 1
state.status = "generating"
state.updated_at = datetime.now().isoformat()
self._save_states()
if state.current_episode == 1:
# 生成第一集
prompt = self.episode_generator.generate_first_episode(
theme=state.theme,
target_episodes=state.target_episodes,
style=state.style
)
else:
# 查询图谱
graph_result = self.graph_manager.query_graph(
pipeline_id,
state.current_episode - 1
)
# 获取未闭环钩子
open_hooks = self.graph_manager.get_open_hooks(pipeline_id)
# 获取上一集内容
prev_content = ""
if state.last_output:
prev_content = state.last_output.get("content", "")
# 判断是否最终集
if state.current_episode >= state.target_episodes:
prompt = self.episode_generator.generate_final_episode(
episode_number=state.current_episode,
prev_content=prev_content,
graph_data=graph_result,
open_hooks=open_hooks,
style=state.style
)
else:
prompt = self.episode_generator.generate_next_episode(
episode_number=state.current_episode,
prev_content=prev_content,
graph_data=graph_result,
open_hooks=open_hooks,
style=state.style
)
return {
"success": True,
"pipeline_id": pipeline_id,
"episode": state.current_episode,
"prompt": prompt
}
def submit_episode(
self,
pipeline_id: str,
episode: int,
content: str
) -> Dict[str, Any]:
"""
提交生成的剧集内容,进入AI审核阶段
Args:
pipeline_id: 管道ID
episode: 剧集序号
content: 剧集内容
Returns:
AI审核提示词
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 更新状态
state.status = "ai_reviewing"
state.updated_at = datetime.now().isoformat()
self._save_states()
# 获取上一集内容(如果有)
prev_content = None
if episode > 1:
graph_result = self.graph_manager.query_graph(pipeline_id, episode - 1)
prev_content = graph_result.get("content", "")
# 创建审核提示词
review_prompt = self.ai_reviewer.create_review_prompt(
episode_content=content,
prev_content=prev_content,
graph_data=None,
episode_number=episode
)
return {
"success": True,
"pipeline_id": pipeline_id,
"episode": episode,
"review_prompt": review_prompt
}
def process_ai_review(
self,
pipeline_id: str,
episode: int,
ai_review_result: str,
generated_content: str
) -> Dict[str, Any]:
"""
处理AI审核结果
注意:AI审核不通过时可以无限重试,没有次数限制
Args:
pipeline_id: 管道ID
episode: 剧集序号
ai_review_result: AI审核结果(JSON字符串)
generated_content: 生成的剧集内容
Returns:
处理结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 解析审核结果
review = self.ai_reviewer.parse_review_result(ai_review_result)
if not review.passed:
# AI审核未通过,可以无限重试
state.retry_count += 1
state.status = "ai_retry_needed"
self._save_states()
return {
"success": False,
"passed": False,
"retry_count": state.retry_count,
"review": {
"score": review.score,
"checks": review.checks,
"suggestions": review.suggestions
},
"message": f"AI审核未通过(得分{review.score}),需要重新生成(无次数限制)"
}
# AI审核通过,进入等待人工确认状态
# 注意:此时还不存储图谱,等人工确认通过后才存储
state.status = "waiting_user_confirm"
state.retry_count = 0
state.ai_review = {
"score": review.score,
"checks": review.checks,
"suggestions": review.suggestions,
"summary": review.summary
}
state.last_output = {
"episode": episode,
"content": generated_content,
"summary": self._extract_summary(generated_content)
}
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"passed": True,
"status": "waiting_user_confirm",
"review": {
"score": review.score,
"summary": review.summary
},
"preview": {
"episode": episode,
"summary": state.last_output["summary"],
"content_preview": generated_content[:500] + "..." if len(generated_content) > 500 else generated_content
},
"message": "AI审核通过,等待人工确认"
}
def user_confirm(
self,
pipeline_id: str,
action: str,
modification_note: Optional[str] = None
) -> Dict[str, Any]:
"""
处理用户确认
Args:
pipeline_id: 管道ID
action: 用户操作 (approve/modify/pause/end)
modification_note: 修改意见(action=modify时需要)
Returns:
处理结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
if action == "approve":
# 用户确认通过,存储图谱
if state.last_output:
save_result = self.graph_manager.save_graph(
pipeline_id=pipeline_id,
episode=state.last_output["episode"],
content=state.last_output["content"]
)
if not save_result.get("success"):
return {
"success": False,
"error": "图谱存储失败",
"detail": save_result.get("error")
}
# 检查是否完成
if state.current_episode >= state.target_episodes:
state.status = "completed"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "completed",
"message": "剧情管道已完成!所有集数已生成。"
}
# 准备生成下一集
state.status = "ready_for_next"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "ready_for_next",
"next_episode": state.current_episode + 1,
"message": f"第{state.current_episode}集已确认,准备生成第{state.current_episode + 1}集"
}
elif action == "modify":
# 用户要求修改
state.status = "modification_requested"
state.error_message = modification_note
state.retry_count = 0
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "modification_requested",
"message": f"将根据修改意见重新生成:{modification_note}"
}
elif action == "pause":
# 用户暂停
state.status = "paused"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "paused",
"message": f"管道已暂停,当前进度:第{state.current_episode}集"
}
elif action == "end":
# 用户结束
state.status = "user_ended"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "user_ended",
"message": f"管道已结束,共生成{state.current_episode}集"
}
else:
return {"success": False, "error": f"未知操作:{action}"}
def get_status(self, pipeline_id: str) -> Dict[str, Any]:
"""
获取管道状态
Args:
pipeline_id: 管道ID
Returns:
状态信息
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
return {
"success": True,
"pipeline_id": pipeline_id,
"theme": state.theme,
"target_episodes": state.target_episodes,
"current_episode": state.current_episode,
"status": state.status,
"created_at": state.created_at,
"updated_at": state.updated_at,
"ai_review": state.ai_review,
"last_output": state.last_output
}
def list_pipelines(self) -> Dict[str, Any]:
"""
列出所有管道
Returns:
管道列表
"""
pipelines = []
for pid, state in self.states.items():
pipelines.append({
"pipeline_id": pid,
"theme": state.theme,
"current_episode": state.current_episode,
"target_episodes": state.target_episodes,
"status": state.status,
"updated_at": state.updated_at
})
return {
"success": True,
"count": len(pipelines),
"pipelines": pipelines
}
def resume_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
"""
恢复暂停的管道
Args:
pipeline_id: 管道ID
Returns:
恢复结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
if state.status != "paused":
return {"success": False, "error": f"管道状态不是暂停状态:{state.status}"}
state.status = "ready_for_next"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"message": f"管道已恢复,当前进度:第{state.current_episode}集",
"next_episode": state.current_episode + 1
}
def _extract_summary(self, content: str) -> str:
"""从内容中提取摘要"""
lines = content.split("\n")
for i, line in enumerate(lines):
if "剧情摘要" in line or "摘要" in line:
# 获取下一行
if i + 1 < len(lines):
return lines[i + 1].strip()
# 如果没有找到摘要,返回前100字
return content[:100] + "..." if len(content) > 100 else content
# 单例实例
_pipeline = None
def get_pipeline() -> StoryPipeline:
"""获取管道控制器单例"""
global _pipeline
if _pipeline is None:
_pipeline = StoryPipeline()
return _pipeline
# 便捷函数
def create_pipeline(theme: str, target_episodes: int, style: str = "写实电影感") -> Dict:
"""创建新管道"""
return get_pipeline().create_pipeline(theme, target_episodes, style)
def start_generation(pipeline_id: str) -> Dict:
"""开始生成"""
return get_pipeline().start_generation(pipeline_id)
def submit_episode(pipeline_id: str, episode: int, content: str) -> Dict:
"""提交剧集"""
return get_pipeline().submit_episode(pipeline_id, episode, content)
def process_ai_review(pipeline_id: str, episode: int, ai_result: str, content: str) -> Dict:
"""处理AI审核"""
return get_pipeline().process_ai_review(pipeline_id, episode, ai_result, content)
def user_confirm(pipeline_id: str, action: str, note: str = None) -> Dict:
"""用户确认"""
return get_pipeline().user_confirm(pipeline_id, action, note)
def get_status(pipeline_id: str) -> Dict:
"""获取状态"""
return get_pipeline().get_status(pipeline_id)
def list_pipelines() -> Dict:
"""列出管道"""
return get_pipeline().list_pipelines()
def resume_pipeline(pipeline_id: str) -> Dict:
"""恢复管道"""
return get_pipeline().resume_pipeline(pipeline_id)
```