story-pipeline
Story generation pipeline skill. Supports multi-episode continuous generation, graph management, AI quality check + human confirmation dual control mechanism. Automatically manages relationships between characters, scenes, and hooks.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-story-pipeline
Repository
Skill path: skills/hexidyg/story-pipeline
Story generation pipeline skill. Supports multi-episode continuous generation, graph management, AI quality check + human confirmation dual control mechanism. Automatically manages relationships between characters, scenes, and hooks.
Open repositoryBest for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install story-pipeline into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding story-pipeline to shared team environments
- Use story-pipeline for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: story-pipeline
description: Story generation pipeline skill. Supports multi-episode continuous generation, graph management, AI quality check + human confirmation dual control mechanism. Automatically manages relationships between characters, scenes, and hooks.
---
# Story Generation Pipeline Skill
## Features
This skill implements a complete story generation pipeline:
1. **Continuous Episode Generation** - Automatically generates next episode based on previous content
2. **Graph Management** - Storage and querying of character, scene, and hook relationships
3. **Dual Confirmation Control** - AI quality check followed by human confirmation
4. **State Persistence** - Supports pause, resume, and multiple parallel stories
---
## Core Workflow
```
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐
│ Generate Ep N│ -> │ AI Review │ -> │ Graph Storage│ -> │ Wait Human Confirm│
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
Query Graph Pass/Retry Store Relations Continue/Modify/End
```
---
## Usage
### Start New Pipeline
```
User: Start a new story pipeline, theme: Chinese girl comeback, target episodes: 10
```
### Continue Pipeline
```
User: Continue story pipeline [pipeline_id]
```
### User Confirmation Actions
```
User: Approved, continue to next episode
User: Modify: [specific feedback]
User: Pause
User: End
```
---
## Dual Confirmation Control Mechanism
### Layer 1: AI Quality Check
After each episode is generated, AI automatically checks:
| Check Item | Description |
|------------|-------------|
| Plot Coherence | Natural connection with previous episode |
| Character Consistency | Character behavior matches established traits |
| Hook Handling | Reasonable addition/closure of hooks |
| Pacing Control | Appropriate plot progression speed |
| Emotional Curve | Reasonable emotional ups and downs |
**Scoring Standard:** 0-10 points, below 7 triggers retry (max 3 times)
### Layer 2: Human Confirmation
After AI review passes, display preview and wait for user confirmation:
```
📋 Episode N Preview
━━━━━━━━━━━━━━━━━━━━━━━━
【Plot Summary】
...
✅ AI Score: 8.5/10
✅ Hook Status: 1 new, 1 closed
Please select:
1️⃣ Approved, continue to next episode
2️⃣ Needs modification (please specify)
3️⃣ Pause
4️⃣ End
━━━━━━━━━━━━━━━━━━━━━━━━
```
---
## Graph Management
### Graph Query
Before generating episode N, query episode N-1's graph:
```python
graph_manager.query_graph(pipeline_id, episode=N-1)
```
Returns:
- Complete content of previous episode
- Character list and their statuses
- Unclosed hooks
- Key scenes
- Relationship network
### Graph Storage
After user confirmation, store current episode's complete content:
```python
graph_manager.save_graph(pipeline_id, episode=N, content)
```
Stores: Complete generation result of the episode (not split into elements)
---
## State Management
### State File: `data/pipeline_state.json`
```json
{
"pipelines": {
"pipeline_2026-03-05-001": {
"theme": "Chinese girl comeback",
"target_episodes": 10,
"current_episode": 3,
"status": "waiting_user_confirm",
"created_at": "2026-03-05T15:00:00",
"updated_at": "2026-03-05T15:30:00",
"ai_review": {
"score": 8.5,
"checks": {...}
},
"last_output": {
"episode": 3,
"summary": "...",
"content": "Complete content..."
}
}
}
}
```
### Status Types
| Status | Description |
|--------|-------------|
| `generating` | Currently generating |
| `ai_reviewing` | AI review in progress |
| `waiting_user_confirm` | Waiting for human confirmation |
| `paused` | Paused |
| `completed` | Completed |
| `error` | Error state |
---
## Script Description
### pipeline.py - Main Control Loop
- Initialize pipeline
- Coordinate modules
- Handle user commands
- Manage loop state
### ai_reviewer.py - AI Quality Check
- Execute quality scoring
- Generate review report
- Determine pass/fail
### episode_generator.py - Episode Generation
- Generate new episode based on graph context
- Handle hook continuation and closure
- Handle retry logic
### graph_manager.py - Graph Management
- Graph query (call remote API)
- Graph storage (call remote API)
- Local cache management
---
## API Description
### Graph API
**Query API:**
```json
{
"action": "query",
"pipeline_id": "pipeline_2026-03-05-001",
"episode": 2
}
```
**Storage API:**
```json
{
"action": "save",
"pipeline_id": "pipeline_2026-03-05-001",
"episode": 3,
"content": "Complete generation content for episode 3..."
}
```
---
## Episode Generation Logic
### First Episode Generation
Based on user-provided theme and goals, generate:
- Main character settings
- Initial scenes
- Core conflict
- Open hooks
### Subsequent Episode Generation
Based on graph query results:
1. Read previous episode content and unclosed hooks
2. Continue main plot line
3. Handle hooks (continue/close/add)
4. Advance character growth arc
5. Adjust emotional curve
### Finale Generation
When target episodes reached or user requests end:
- Close all remaining hooks
- Complete character growth arcs
- Generate conclusive ending
---
## Important Notes
1. **Retry Mechanism** - Max 3 retries when AI review fails
2. **Pause/Resume** - Can resume via pipeline_id after pause
3. **Multiple Pipelines** - Supports running multiple different-themed pipelines simultaneously
4. **Graph Consistency** - Ensure correct character and hook relationships
5. **Hook Management** - Track creation and closure status of each hook
---
## Complete Workflow
### Step 1: Create Pipeline
```
User: Start a new pipeline, theme: Cultivation boy, target 20 episodes
AI: OK, creating pipeline pipeline_20260305160000
Theme: Cultivation boy
Target: 20 episodes
Style: Realistic cinematic
Status: Initialized, ready to generate episode 1
```
### Step 2: Generate Episode
AI calls `start_generation(pipeline_id)` to get generation prompt, then generates episode content based on the prompt.
### Step 3: Submit AI Review
AI calls `submit_episode(pipeline_id, episode, content)` to submit generated content, then executes AI review.
### Step 4: Process Review Result
AI calls `process_ai_review(pipeline_id, episode, ai_result, content)` to process review results.
If review fails (score < 7), automatically retry (max 3 times).
### Step 5: Wait User Confirmation
After review passes, display preview and wait for user confirmation:
```
📋 Episode 1 Preview
━━━━━━━━━━━━━━━━━━━━━━━━
【Cultivation Journey Begins】
Young Li Yun discovers a mysterious jade pendant in the mountains,
从此踏上修仙之路...
✅ AI Score: 9.0/10
✅ New Hook: H-001 Origin of mysterious jade pendant
━━━━━━━━━━━━━━━━━━━━━━━━
Please select:
1️⃣ Approved, continue to next episode
2️⃣ Needs modification (please specify)
3️⃣ Pause
4️⃣ End
```
### Step 6: Process User Confirmation
After user confirms, AI calls `user_confirm(pipeline_id, action, note)`:
- **approve**: Store graph, prepare next episode
- **modify**: Regenerate based on feedback
- **pause**: Pause pipeline
- **end**: End pipeline
### Step 7: Loop Generation
Repeat steps 2-6 until target episodes reached or user ends.
---
## API Reference
### Create Pipeline
```python
create_pipeline(theme: str, target_episodes: int, style: str = "realistic cinematic")
# Returns: {"success": True, "pipeline_id": "...", "message": "..."}
```
### Start Generation
```python
start_generation(pipeline_id: str)
# Returns: {"success": True, "episode": N, "prompt": "generation prompt"}
```
### Submit Episode
```python
submit_episode(pipeline_id: str, episode: int, content: str)
# Returns: {"success": True, "review_prompt": "review prompt"}
```
### Process AI Review
```python
process_ai_review(pipeline_id: str, episode: int, ai_result: str, content: str)
# Returns: {"success": True, "passed": True/False, ...}
```
### User Confirm
```python
user_confirm(pipeline_id: str, action: str, note: str = None)
# action: "approve" / "modify" / "pause" / "end"
# Returns: {"success": True, "status": "...", ...}
```
### Get Status
```python
get_status(pipeline_id: str)
# Returns: Current pipeline status info
```
### List Pipelines
```python
list_pipelines()
# Returns: List of all pipelines
```
### Resume Pipeline
```python
resume_pipeline(pipeline_id: str)
# Returns: Resume result
```
---
## Example Dialogue
```
User: Start a new pipeline, theme: Cultivation boy, target 20 episodes
AI: OK, creating pipeline pipeline_20260305160000
Generating episode 1...
📋 Episode 1 Preview
━━━━━━━━━━━━━━━━━━
【Cultivation Journey Begins】
Young Li Yun discovers a mysterious jade pendant in the mountains...
✅ AI Score: 9.0/10
✅ New Hook: H-001 Origin of mysterious jade pendant
Please confirm: Approve/Modify/Pause/End
User: Approved
AI: Storing graph...
Starting generation of episode 2...
```
---
## Important Notes
1. **Retry Mechanism**: Max 3 retries when AI review fails
2. **State Persistence**: All states saved in `data/pipeline_state.json`
3. **Graph Storage**: Stored via remote API, requires network connection
4. **Pause/Resume**: Can resume via `resume_pipeline` after pause
5. **Multiple Pipelines**: Supports running multiple different-themed pipelines simultaneously
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### data/pipeline_state.json
```json
{
"pipelines": {
"pipeline_20260305182013": {
"pipeline_id": "pipeline_20260305182013",
"theme": "末日地球移民外星",
"target_episodes": 5,
"current_episode": 5,
"status": "completed",
"created_at": "2026-03-05T18:20:13.642511",
"updated_at": "2026-03-05T19:33:03.210508",
"style": "科幻史诗感",
"retry_count": 0,
"ai_review": {
"score": 9.5,
"checks": {
"剧情连贯性": {
"score": 10,
"comment": "与全剧完美收官,所有线索都有回应"
},
"人物一致性": {
"score": 9,
"comment": "每个角色都有合理的结局,情感饱满"
},
"钩子处理": {
"score": 10,
"comment": "5个钩子全部闭环,没有遗漏,收尾干净"
},
"节奏控制": {
"score": 9,
"comment": "从着陆到建设,节奏舒缓但有张力"
},
"情感曲线": {
"score": 10,
"comment": "从紧张到释放,从思乡到新生,情感高潮完美"
},
"创新性": {
"score": 9,
"comment": "星辰的名字和象征意义,林远山的录像设计,都是亮点"
}
},
"suggestions": [],
"summary": "大结局完美收官,所有钩子闭环,人物各有归宿,主题升华到位。星辰的名字是点睛之笔,代表着人类的希望与未来。一部完整的科幻史诗。"
},
"last_output": {
"episode": 5,
"content": "## 第5集:新希望(大结局)\n\n### 剧情摘要\n飞船抵达新行星\"新希望\",乘客们发现这里比想象中更美好。陈曦的孩子展现出更多秘密——他是宇宙辐射进化的新人类。林远山的影像信息揭示真相,人类移民计划有了真正的希望。陈曦给孩子取名\"星辰\",代表新的开始。\n\n### 完整内容\n\n| idx | 场景 | 人物 | 事件 | 情绪 |\n|-----|------|------|------|------|\n| 01 | 飞船下降,穿越大气层 | 舷窗外是绿色的云层 | 乘客们紧张地握着安全带,陈曦抱着孩子,看着窗外 | 紧张、期待 |\n| 02 | 新行星表面,飞船着陆 | 一片绿色的森林延伸到地平线 | 舱门打开,清新的空气涌入,人们相拥而泣 | 释放、感动 |\n| 03 | 新行星表面,探索队出发 | 李明带领第一批探索者 | 他们发现植物、水源、甚至是类似地球的生态系统 | 惊喜、希望 |\n| 04 | 飞船内部,陈曦休息 | 孩子安静地睡着 | 李婷走来检查陈曦的身体状况,两人相视而笑 | 温情、释然 |\n| 05 | 飞船控制中心,李明操作 | 他播放林远山留下的全息录像 | 林远山的身影出现在空中,开始讲述 | 怀念、重要 |\n| 06 | 全息录像中,林远山说话 | \"如果你看到这段录像,说明飞船已经安全抵达\" | 他揭露了更多秘密:新行星的发现、新希望公司的阴谋、以及他对人类的最后期望 | 揭示、感人 |\n| 07 | 录像继续 | 林远山提到陈曦的孩子 | \"宇宙辐射会带来基因变化,这个孩子...可能是人类进化的关键\" | 震惊、重要 |\n| 08 | 录像结束,控制中心安静 | 李明看向陈曦 | 陈曦低头看着孩子,眼眶湿润:\"他早就知道了\" | 感动、理解 |\n| 09 | 新行星表面,人们开始建设 | 临时营地搭建 | 有人发现土壤肥沃,有人发现清洁水源,希望的声音此起彼伏 | 希望、忙碌 |\n| 10 | 营地边缘,张毅被押送 | 他看着新世界,神情复杂 | \"我输了...但至少,人类还有未来\" | 悔恨、释然 |\n| 11 | 夜晚,营地篝火 | 人们围坐在一起 | 有人唱起了地球的歌,有人哭泣,有人微笑 | 团聚、思乡 |\n| 12 | 营地边缘,陈曦独自坐 | 她看着孩子,星空璀璨 | 孩子的眼睛微微发光,似乎在感应着什么 | 神秘、宁静 |\n| 13 | 孩子的视角,星空变换 | 他看到宇宙的能量流动 | 一种超越人类感知的景象,星辰在向他召唤 | 奇幻、宏大 |\n| 14 | 次日清晨,营地集会 | 李明宣布成立新政府 | \"从今天起,我们是新希望星的人类,我们将重建文明\" | 庄严、新生 |\n| 15 | 集会中,陈曦站起来 | 她抱着孩子走到台前 | \"我想给我的孩子取个名字...星辰\" | 温情、希望 |\n| 16 | 人群欢呼 | 人们重复着这个名字 | \"星辰...新的开始,新的希望\" | 欢乐、希望 |\n| 17 | 闪回,地球最后的画面 | 林远山站在天文台 | 他看着被雾霾笼罩的地球,轻声说:\"去吧,带着希望\" | 怀念、传承 |\n| 18 | 回到现在,新行星 | 陈曦和孩子站在高处 | 她看着脚下的绿色大地,孩子伸出手触碰阳光 | 新生、希望 |\n| 19 | 广角镜头,新行星全景 | 飞船、营地、森林、人类 | 一群人在新的家园开始新的生活 | 宏大、希望 |\n| 20 | 最后一个镜头,孩子的眼睛 | 眼中倒映着星空和地球的轮廓 | 画外音:\"人类的旅程从未结束,只是换了一个舞台\" | 升华、永恒 |\n\n### 钩子闭环\n\n- **H-003** [冲突]:飞船承载量秘密泄露 → 在新行星上,人们共同建设家园,矛盾转化为合作\n- **H-004** [秘密]:另一个宜居行星 → 正是林远山发现的第三颗行星,现在是人类的家园\n- **H-006** [危机]:李明揭露阴谋后的局势 → 张毅被制服,新政府成立,秩序恢复\n- **H-009** [悬念]:婴儿能力来源 → 宇宙辐射导致的基因进化,林远山早已预见\n- **H-010** [秘密]:新行星上等待什么 → 一个比地球更美好的家园,人类的新希望\n\n### 人物结局\n\n- **陈曦**:从工程师到母亲,再到新人类的守护者,完成了个人成长,代表着母性与希望\n- **李明**:从秘密守护者到新政府的领袖,完成了使命,代表着传承与责任\n- **林远山**(回忆):虽然留在地球,但他的预见和安排拯救了人类,代表着智慧与牺牲\n- **张毅**:阴谋失败,但最终接受现实,代表着复杂的人性\n- **星辰**(婴儿):新人类的代表,象征着人类的未来与进化\n\n### 主题升华\n\n本剧探讨了人类在绝境中的选择:是控制还是放手,是独占还是共享。最终,希望不在远方,而在于我们共同创造。人类的旅程从未结束,只是换了一个舞台。星辰代表着新生与进化,也代表着人类永不放弃的精神。\n\n### 情感曲线\n\n开场紧张期待 → 中段揭示与感动 → 高潮新生与希望 → 结尾升华与永恒\n",
"summary": "飞船抵达新行星\"新希望\",乘客们发现这里比想象中更美好。陈曦的孩子展现出更多秘密——他是宇宙辐射进化的新人类。林远山的影像信息揭示真相,人类移民计划有了真正的希望。陈曦给孩子取名\"星辰\",代表新的开始。"
},
"error_message": null
}
}
}
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "hexidyg",
"slug": "story-pipeline",
"displayName": "Story generation pipeline skill",
"latest": {
"version": "1.0.4",
"publishedAt": 1772786630619,
"commit": "https://github.com/openclaw/skills/commit/7759106c8dbfebfb5d2b188e0e2d1291201dd59a"
},
"history": []
}
```
### scripts/__init__.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Story Pipeline Skill
剧情生成管道流技能
"""
from .pipeline import (
get_pipeline,
create_pipeline,
start_generation,
submit_episode,
process_ai_review,
user_confirm,
get_status,
list_pipelines,
resume_pipeline
)
from .graph_manager import get_graph_manager, GraphManager
from .ai_reviewer import get_ai_reviewer, AIReviewer
from .episode_generator import get_episode_generator, EpisodeGenerator
__all__ = [
# 主控函数
'get_pipeline',
'create_pipeline',
'start_generation',
'submit_episode',
'process_ai_review',
'user_confirm',
'get_status',
'list_pipelines',
'resume_pipeline',
# 管理器
'get_graph_manager',
'GraphManager',
'get_ai_reviewer',
'AIReviewer',
'get_episode_generator',
'EpisodeGenerator',
]
__version__ = '1.0.0'
```
### scripts/ai_reviewer.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI 质量检查模块
对生成的剧集进行质量评分和审核
"""
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class ReviewResult:
"""审核结果"""
passed: bool
score: float
checks: Dict[str, Dict[str, Any]]
suggestions: List[str]
summary: str
class AIReviewer:
"""AI 质量检查器"""
# 检查项及其权重
CHECK_ITEMS = {
"剧情连贯性": {
"weight": 0.25,
"description": "与前集衔接是否自然,剧情推进是否合理"
},
"人物一致性": {
"weight": 0.20,
"description": "角色行为是否符合设定,人物发展是否合理"
},
"钩子处理": {
"weight": 0.20,
"description": "新增/闭环钩子是否合理,伏笔是否恰当"
},
"节奏控制": {
"weight": 0.15,
"description": "剧情推进速度是否恰当,是否有拖沓或仓促"
},
"情感曲线": {
"weight": 0.10,
"description": "情绪起伏是否合理,是否有感染力"
},
"创新性": {
"weight": 0.10,
"description": "是否有新意,避免老套情节"
}
}
# 通过阈值
PASS_THRESHOLD = 7.0
def __init__(self):
"""初始化审核器"""
pass
def review_episode(
self,
episode_content: str,
prev_content: Optional[str] = None,
graph_data: Optional[Dict] = None,
episode_number: int = 1
) -> ReviewResult:
"""
审核单集内容
注意:实际的AI审核由调用此模块的LLM执行
此模块提供审核框架和结果结构
Args:
episode_content: 当前剧集内容
prev_content: 上一集内容(可选)
graph_data: 图谱数据(可选)
episode_number: 剧集序号
Returns:
ReviewResult 审核结果对象
"""
# 这个方法由LLM调用时填充实际审核逻辑
# 这里只返回结构框架
pass
def create_review_prompt(
self,
episode_content: str,
prev_content: Optional[str] = None,
graph_data: Optional[Dict] = None,
episode_number: int = 1
) -> str:
"""
创建审核提示词
Args:
episode_content: 当前剧集内容
prev_content: 上一集内容
graph_data: 图谱数据
episode_number: 剧集序号
Returns:
审核提示词
"""
prompt = f"""# 剧集质量审核任务
## 审核对象
- 剧集序号:第 {episode_number} 集
- 内容:{episode_content}
"""
if prev_content:
prompt += f"""## 上一集内容
{prev_content}
"""
if graph_data:
prompt += f"""## 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
"""
prompt += """## 审核要求
请对上述剧集进行质量审核,按以下维度评分(每项0-10分):
| 维度 | 权重 | 说明 |
|------|------|------|
| 剧情连贯性 | 25% | 与前集衔接是否自然,剧情推进是否合理 |
| 人物一致性 | 20% | 角色行为是否符合设定,人物发展是否合理 |
| 钩子处理 | 20% | 新增/闭环钩子是否合理,伏笔是否恰当 |
| 节奏控制 | 15% | 剧情推进速度是否恰当 |
| 情感曲线 | 10% | 情绪起伏是否合理,是否有感染力 |
| 创新性 | 10% | 是否有新意,避免老套情节 |
## 输出格式
请以JSON格式输出审核结果:
```json
{
"passed": true/false,
"score": 综合得分(0-10),
"checks": {
"剧情连贯性": {"score": 分数, "comment": "评语"},
"人物一致性": {"score": 分数, "comment": "评语"},
"钩子处理": {"score": 分数, "comment": "评语"},
"节奏控制": {"score": 分数, "comment": "评语"},
"情感曲线": {"score": 分数, "comment": "评语"},
"创新性": {"score": 分数, "comment": "评语"}
},
"suggestions": ["改进建议1", "改进建议2"],
"summary": "总体评价"
}
```
注意:综合得分低于7分视为不通过。
"""
return prompt
def parse_review_result(self, response: str) -> ReviewResult:
"""
解析审核结果
Args:
response: AI返回的审核结果(JSON格式)
Returns:
ReviewResult 对象
"""
try:
# 尝试提取JSON
json_str = response
if "```json" in response:
json_str = response.split("```json")[1].split("```")[0]
elif "```" in response:
json_str = response.split("```")[1].split("```")[0]
data = json.loads(json_str.strip())
return ReviewResult(
passed=data.get("passed", False),
score=data.get("score", 0),
checks=data.get("checks", {}),
suggestions=data.get("suggestions", []),
summary=data.get("summary", "")
)
except (json.JSONDecodeError, KeyError) as e:
# 解析失败,返回默认结果
return ReviewResult(
passed=False,
score=0,
checks={},
suggestions=[f"解析审核结果失败: {str(e)}"],
summary="审核结果解析失败"
)
def format_review_output(self, result: ReviewResult) -> str:
"""
格式化审核结果输出
Args:
result: 审核结果
Returns:
格式化的输出字符串
"""
status_icon = "✅" if result.passed else "❌"
output = f"""
{status_icon} AI 质量审核结果
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
综合评分:{result.score:.1f}/10
审核状态:{'通过' if result.passed else '未通过'}
详细评分:
"""
for item, weight_info in self.CHECK_ITEMS.items():
check = result.checks.get(item, {})
score = check.get("score", 0)
comment = check.get("comment", "")
output += f" • {item}: {score}分 - {comment}\n"
if result.suggestions:
output += "\n改进建议:\n"
for i, suggestion in enumerate(result.suggestions, 1):
output += f" {i}. {suggestion}\n"
output += f"\n总结:{result.summary}\n"
output += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
return output
# 单例实例
_ai_reviewer = None
def get_ai_reviewer() -> AIReviewer:
"""获取AI审核器单例"""
global _ai_reviewer
if _ai_reviewer is None:
_ai_reviewer = AIReviewer()
return _ai_reviewer
```
### scripts/episode_generator.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
剧集生成模块
负责生成新的剧集内容,基于图谱上下文和钩子管理
"""
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Hook:
"""钩子数据结构"""
id: str
hook_type: str # 悬念、伏笔、冲突、秘密
description: str
created_in: int # 创建于第几集
resolved_in: Optional[int] = None # 闭环于第几集
status: str = "open" # open, resolved
importance: str = "medium" # high, medium, low
@dataclass
class Character:
"""人物数据结构"""
id: str
name: str
age: int
traits: List[str]
first_appear: int
current_status: str = "active"
development: List[str] = field(default_factory=list)
@dataclass
class EpisodeContent:
"""剧集内容结构"""
episode: int
title: str
summary: str
content: str
characters: List[Dict]
scenes: List[Dict]
hooks_created: List[Hook]
hooks_resolved: List[str] # resolved hook ids
emotional_curve: str # 情感曲线描述
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class EpisodeGenerator:
"""剧集生成器"""
# 钩子类型
HOOK_TYPES = [
"悬念", # 让观众想知道接下来发生什么
"伏笔", # 埋下后续剧情的种子
"冲突", # 人物之间的矛盾
"秘密", # 角色隐藏的信息
"谜团", # 需要解开的谜题
"危机" # 主角面临的困境
]
def __init__(self):
"""初始化生成器"""
pass
def generate_first_episode(
self,
theme: str,
target_episodes: int,
style: str = "写实电影感"
) -> str:
"""
生成第一集的提示词
Args:
theme: 故事主题
target_episodes: 目标集数
style: 风格
Returns:
生成提示词
"""
prompt = f"""# 第一集生成任务
## 基础设定
- 故事主题:{theme}
- 总集数:{target_episodes}集
- 风格:{style}
## 第一集要求
作为开篇,第一集需要:
1. **人物引入**
- 建立1-3个主要人物
- 展示主角的核心特质和初始状态
- 暗示人物的成长潜力
2. **场景建立**
- 构建故事发生的主要场景
- 营造独特的氛围感
- 为后续剧情预留空间
3. **核心冲突**
- 引入故事的主要矛盾
- 设置主角的目标和阻碍
- 建立情感共鸣点
4. **钩子设置**
- 设置2-4个悬念钩子
- 埋下1-2个伏笔
- 让观众产生继续观看的欲望
5. **情感曲线**
- 开场建立共情
- 中段制造张力
- 结尾留下悬念
## 输出格式
请生成第一集的完整内容,包含:
```
## 第1集:[标题]
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 人物设定
- 人物1:[设定]
- 人物2:[设定]
### 钩子清单
- H-001 [类型]:[描述]
- H-002 [类型]:[描述]
### 情感曲线
[描述本集的情绪起伏]
```
"""
return prompt
def generate_next_episode(
self,
episode_number: int,
prev_content: str,
graph_data: Dict,
open_hooks: List[Hook],
style: str = "写实电影感"
) -> str:
"""
生成后续剧集的提示词
Args:
episode_number: 当前剧集序号
prev_content: 上一集内容
graph_data: 图谱数据
open_hooks: 未闭环的钩子列表
style: 风格
Returns:
生成提示词
"""
# 格式化钩子信息
hooks_info = ""
if open_hooks:
hooks_info = "### 未闭环钩子\n"
for hook in open_hooks:
hooks_info += f"- {hook.id} [{hook.hook_type}]:{hook.description}(第{hook.created_in}集创建)\n"
prompt = f"""# 第{episode_number}集生成任务
## 上下文信息
### 上一集内容
{prev_content}
### 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
{hooks_info}
## 第{episode_number}集要求
1. **剧情延续**
- 自然衔接上一集结尾
- 推进主线剧情发展
- 保持叙事节奏
2. **人物发展**
- 展现人物的变化和成长
- 深化人物关系
- 保持角色行为一致性
3. **钩子处理**
- 选择1-2个钩子进行延续或闭环
- 可以新增新的钩子
- 避免钩子过多导致剧情混乱
4. **情感节奏**
- 根据整体进度调整情感强度
- 中间剧集可以增加波折
- 为后续高潮做铺垫
## 输出格式
```
## 第{episode_number}集:[标题]
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 人物变化
[描述本集中人物的变化]
### 钩子处理
- 闭环:[钩子ID及处理方式]
- 新增:[新钩子]
### 情感曲线
[描述本集的情绪起伏]
```
"""
return prompt
def generate_final_episode(
self,
episode_number: int,
prev_content: str,
graph_data: Dict,
open_hooks: List[Hook],
style: str = "写实电影感"
) -> str:
"""
生成最终集的提示词
Args:
episode_number: 最终剧集序号
prev_content: 上一集内容
graph_data: 图谱数据
open_hooks: 未闭环的钩子列表
style: 风格
Returns:
生成提示词
"""
# 格式化钩子信息
hooks_to_resolve = "### 需要闭环的钩子\n"
for hook in open_hooks:
hooks_to_resolve += f"- {hook.id} [{hook.hook_type}]:{hook.description}(第{hook.created_in}集创建)\n"
prompt = f"""# 最终集(第{episode_number}集)生成任务
## 上下文信息
### 上一集内容
{prev_content}
### 图谱数据
{json.dumps(graph_data, ensure_ascii=False, indent=2)}
{hooks_to_resolve}
## 最终集要求
作为大结局,需要:
1. **钩子闭环**
- 必须闭环所有剩余钩子
- 给每个悬念一个合理的答案
- 避免草率收尾
2. **人物结局**
- 完成主角的成长弧
- 给每个重要角色一个交代
- 可以有开放式结局,但要有情感满足
3. **情感高潮**
- 达到情感曲线的最高点
- 提供情感释放
- 让观众感到圆满或深思
4. **主题升华**
- 回应故事的核心主题
- 传递积极的价值观
- 留下回味空间
## 输出格式
```
## 第{episode_number}集:[标题](大结局)
### 剧情摘要
[100字以内的摘要]
### 完整内容
[分镜表格或剧本内容]
### 钩子闭环
- [钩子ID]:[闭环方式]
### 人物结局
[描述主要人物的最终状态]
### 主题升华
[本剧的核心主题和价值传递]
### 情感曲线
[描述最终集的情绪高潮]
```
"""
return prompt
def parse_generated_content(self, content: str) -> EpisodeContent:
"""
解析生成的内容
Args:
content: 生成的剧集内容
Returns:
EpisodeContent 对象
"""
# 简单解析,实际可能需要更复杂的逻辑
lines = content.split("\n")
title = ""
summary = ""
hooks_created = []
hooks_resolved = []
for line in lines:
if line.startswith("## 第") and ":" in line:
title = line.split(":", 1)[1].strip()
elif line.startswith("### 剧情摘要"):
# 获取摘要
idx = lines.index(line)
if idx + 1 < len(lines):
summary = lines[idx + 1].strip()
# 从内容中提取剧集序号
episode = 1
if "第" in content and "集" in content:
import re
match = re.search(r"第(\d+)集", content)
if match:
episode = int(match.group(1))
return EpisodeContent(
episode=episode,
title=title,
summary=summary,
content=content,
characters=[],
scenes=[],
hooks_created=hooks_created,
hooks_resolved=hooks_resolved,
emotional_curve=""
)
# 单例实例
_episode_generator = None
def get_episode_generator() -> EpisodeGenerator:
"""获取剧集生成器单例"""
global _episode_generator
if _episode_generator is None:
_episode_generator = EpisodeGenerator()
return _episode_generator
```
### scripts/generate_engine.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
"""
```
### scripts/graph_manager.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图谱管理模块
使用本地JSON存储管理剧情图谱
"""
import json
import os
from typing import Dict, Optional, Any, List
from datetime import datetime
class GraphManager:
"""图谱管理器 - 本地JSON存储"""
def __init__(self, storage_dir: str = None):
"""
初始化图谱管理器
Args:
storage_dir: 存储目录,默认为 data/graphs
"""
if storage_dir is None:
# 默认存储在 data/graphs 目录
self.storage_dir = os.path.join(
os.path.dirname(__file__),
"..",
"data",
"graphs"
)
else:
self.storage_dir = storage_dir
# 确保目录存在
os.makedirs(self.storage_dir, exist_ok=True)
# 内存缓存
self.cache: Dict[str, Dict] = {}
def _get_graph_path(self, pipeline_id: str) -> str:
"""获取图谱文件路径"""
return os.path.join(self.storage_dir, f"{pipeline_id}.json")
def _load_graph(self, pipeline_id: str) -> Dict:
"""加载图谱数据"""
# 先检查缓存
if pipeline_id in self.cache:
return self.cache[pipeline_id]
# 从文件加载
graph_path = self._get_graph_path(pipeline_id)
if os.path.exists(graph_path):
try:
with open(graph_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.cache[pipeline_id] = data
return data
except (json.JSONDecodeError, Exception):
pass
# 返回空图谱
return {
"pipeline_id": pipeline_id,
"episodes": {},
"characters": {},
"hooks": [],
"scenes": {},
"relationships": []
}
def _save_graph(self, pipeline_id: str, data: Dict):
"""保存图谱数据"""
graph_path = self._get_graph_path(pipeline_id)
# 更新缓存
self.cache[pipeline_id] = data
# 保存到文件
with open(graph_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def query_graph(self, pipeline_id: str, episode: int) -> Dict[str, Any]:
"""
查询图谱 - 获取指定剧集的图谱数据
Args:
pipeline_id: 管道ID
episode: 剧集序号
Returns:
图谱数据字典
"""
graph = self._load_graph(pipeline_id)
# 获取指定剧集的内容
episode_data = graph.get("episodes", {}).get(str(episode), {})
if not episode_data:
return {
"success": False,
"error": f"第{episode}集数据不存在",
"data": None
}
# 提取未闭环的钩子
open_hooks = [h for h in graph.get("hooks", []) if h.get("status") == "open"]
return {
"success": True,
"episode": episode,
"content": episode_data.get("content", ""),
"summary": episode_data.get("summary", ""),
"characters": self._get_episode_characters(graph, episode),
"hooks": open_hooks,
"scenes": episode_data.get("scenes", []),
"relationships": graph.get("relationships", [])
}
def save_graph(
self,
pipeline_id: str,
episode: int,
content: str,
summary: str = "",
characters: List[Dict] = None,
hooks_created: List[Dict] = None,
hooks_resolved: List[str] = None,
scenes: List[Dict] = None
) -> Dict[str, Any]:
"""
存储图谱 - 保存当前剧集的完整内容
在人工审核通过后调用此方法存储图谱
Args:
pipeline_id: 管道ID
episode: 剧集序号
content: 完整的剧集内容
summary: 摘要
characters: 人物列表
hooks_created: 新增的钩子
hooks_resolved: 闭环的钩子ID列表
scenes: 场景列表
Returns:
存储结果
"""
graph = self._load_graph(pipeline_id)
# 保存剧集内容
if "episodes" not in graph:
graph["episodes"] = {}
graph["episodes"][str(episode)] = {
"content": content,
"summary": summary,
"characters": characters or [],
"scenes": scenes or [],
"timestamp": datetime.now().isoformat()
}
# 更新人物
if characters:
for char in characters:
char_id = char.get("id", f"C-{char.get('name', 'unknown')}")
graph["characters"][char_id] = {
**char,
"last_appear": episode
}
# 更新钩子
if "hooks" not in graph:
graph["hooks"] = []
# 处理新增钩子
if hooks_created:
for hook in hooks_created:
hook["id"] = hook.get("id", f"H-{len(graph['hooks']) + 1}")
hook["created_in"] = episode
hook["status"] = "open"
graph["hooks"].append(hook)
# 处理闭环钩子
if hooks_resolved:
for hook in graph["hooks"]:
if hook.get("id") in hooks_resolved:
hook["status"] = "resolved"
hook["resolved_in"] = episode
# 更新场景
if scenes:
for scene in scenes:
scene_id = scene.get("id", f"S-{scene.get('name', 'unknown')}")
if scene_id not in graph["scenes"]:
graph["scenes"][scene_id] = {
**scene,
"episodes": [episode]
}
else:
if episode not in graph["scenes"][scene_id].get("episodes", []):
graph["scenes"][scene_id].setdefault("episodes", []).append(episode)
# 保存
self._save_graph(pipeline_id, graph)
return {
"success": True,
"message": f"第{episode}集图谱存储成功",
"episode": episode,
"pipeline_id": pipeline_id
}
def get_open_hooks(self, pipeline_id: str) -> List[Dict]:
"""
获取未闭环的钩子列表
Args:
pipeline_id: 管道ID
Returns:
未闭环钩子列表
"""
graph = self._load_graph(pipeline_id)
return [h for h in graph.get("hooks", []) if h.get("status") == "open"]
def get_all_hooks(self, pipeline_id: str) -> List[Dict]:
"""
获取所有钩子
Args:
pipeline_id: 管道ID
Returns:
所有钩子列表
"""
graph = self._load_graph(pipeline_id)
return graph.get("hooks", [])
def get_characters(self, pipeline_id: str) -> Dict:
"""
获取所有人物
Args:
pipeline_id: 管道ID
Returns:
人物字典
"""
graph = self._load_graph(pipeline_id)
return graph.get("characters", {})
def get_full_graph(self, pipeline_id: str) -> Dict:
"""
获取完整图谱
Args:
pipeline_id: 管道ID
Returns:
完整图谱数据
"""
return self._load_graph(pipeline_id)
def delete_graph(self, pipeline_id: str) -> Dict:
"""
删除图谱
Args:
pipeline_id: 管道ID
Returns:
删除结果
"""
# 清除缓存
if pipeline_id in self.cache:
del self.cache[pipeline_id]
# 删除文件
graph_path = self._get_graph_path(pipeline_id)
if os.path.exists(graph_path):
os.remove(graph_path)
return {"success": True, "message": "图谱已删除"}
return {"success": False, "error": "图谱不存在"}
def _get_episode_characters(self, graph: Dict, episode: int) -> List[Dict]:
"""获取指定剧集涉及的人物"""
episode_data = graph.get("episodes", {}).get(str(episode), {})
char_ids = episode_data.get("characters", [])
characters = []
for char_id in char_ids:
if isinstance(char_id, str):
char_data = graph.get("characters", {}).get(char_id, {})
if char_data:
characters.append(char_data)
elif isinstance(char_id, dict):
characters.append(char_id)
return characters
def clear_cache(self, pipeline_id: Optional[str] = None):
"""
清除缓存
Args:
pipeline_id: 指定管道ID,为None时清除所有缓存
"""
if pipeline_id:
if pipeline_id in self.cache:
del self.cache[pipeline_id]
else:
self.cache = {}
# 单例实例
_graph_manager = None
def get_graph_manager() -> GraphManager:
"""获取图谱管理器单例"""
global _graph_manager
if _graph_manager is None:
_graph_manager = GraphManager()
return _graph_manager
```
### scripts/pipeline.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
剧情管道主控模块
协调各模块,管理双确认控制流程
"""
import json
import os
from datetime import datetime
from typing import Dict, Optional, Any
from dataclasses import dataclass, asdict
# 导入其他模块
from graph_manager import get_graph_manager, GraphManager
from ai_reviewer import get_ai_reviewer, AIReviewer, ReviewResult
from episode_generator import get_episode_generator, EpisodeGenerator
# 状态文件路径
STATE_FILE = os.path.join(os.path.dirname(__file__), "..", "data", "pipeline_state.json")
# 注意:人工审核没有最大次数限制,用户可以一直要求修改直到满意
@dataclass
class PipelineState:
"""管道状态"""
pipeline_id: str
theme: str
target_episodes: int
current_episode: int
status: str
created_at: str
updated_at: str
style: str = "写实电影感"
retry_count: int = 0
ai_review: Optional[Dict] = None
last_output: Optional[Dict] = None
error_message: Optional[str] = None
class StoryPipeline:
"""剧情管道控制器"""
def __init__(self):
"""初始化管道控制器"""
self.graph_manager = get_graph_manager()
self.ai_reviewer = get_ai_reviewer()
self.episode_generator = get_episode_generator()
self.states: Dict[str, PipelineState] = {}
self._load_states()
def _load_states(self):
"""加载状态文件"""
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
for pid, state in data.get("pipelines", {}).items():
self.states[pid] = PipelineState(**state)
except (json.JSONDecodeError, Exception) as e:
print(f"加载状态文件失败: {e}")
def _save_states(self):
"""保存状态文件"""
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
data = {
"pipelines": {
pid: asdict(state) for pid, state in self.states.items()
}
}
with open(STATE_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def create_pipeline(
self,
theme: str,
target_episodes: int,
style: str = "写实电影感"
) -> Dict[str, Any]:
"""
创建新的管道
Args:
theme: 故事主题
target_episodes: 目标集数
style: 风格
Returns:
创建结果
"""
# 生成管道ID
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
pipeline_id = f"pipeline_{timestamp}"
# 创建状态
state = PipelineState(
pipeline_id=pipeline_id,
theme=theme,
target_episodes=target_episodes,
current_episode=0,
status="initialized",
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat(),
style=style
)
self.states[pipeline_id] = state
self._save_states()
return {
"success": True,
"pipeline_id": pipeline_id,
"message": f"管道创建成功,主题:{theme},目标:{target_episodes}集"
}
def start_generation(self, pipeline_id: str) -> Dict[str, Any]:
"""
开始生成(第一集或继续生成)
Args:
pipeline_id: 管道ID
Returns:
生成提示词,供LLM使用
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 更新状态
state.current_episode += 1
state.status = "generating"
state.updated_at = datetime.now().isoformat()
self._save_states()
if state.current_episode == 1:
# 生成第一集
prompt = self.episode_generator.generate_first_episode(
theme=state.theme,
target_episodes=state.target_episodes,
style=state.style
)
else:
# 查询图谱
graph_result = self.graph_manager.query_graph(
pipeline_id,
state.current_episode - 1
)
# 获取未闭环钩子
open_hooks = self.graph_manager.get_open_hooks(pipeline_id)
# 获取上一集内容
prev_content = ""
if state.last_output:
prev_content = state.last_output.get("content", "")
# 判断是否最终集
if state.current_episode >= state.target_episodes:
prompt = self.episode_generator.generate_final_episode(
episode_number=state.current_episode,
prev_content=prev_content,
graph_data=graph_result,
open_hooks=open_hooks,
style=state.style
)
else:
prompt = self.episode_generator.generate_next_episode(
episode_number=state.current_episode,
prev_content=prev_content,
graph_data=graph_result,
open_hooks=open_hooks,
style=state.style
)
return {
"success": True,
"pipeline_id": pipeline_id,
"episode": state.current_episode,
"prompt": prompt
}
def submit_episode(
self,
pipeline_id: str,
episode: int,
content: str
) -> Dict[str, Any]:
"""
提交生成的剧集内容,进入AI审核阶段
Args:
pipeline_id: 管道ID
episode: 剧集序号
content: 剧集内容
Returns:
AI审核提示词
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 更新状态
state.status = "ai_reviewing"
state.updated_at = datetime.now().isoformat()
self._save_states()
# 获取上一集内容(如果有)
prev_content = None
if episode > 1:
graph_result = self.graph_manager.query_graph(pipeline_id, episode - 1)
prev_content = graph_result.get("content", "")
# 创建审核提示词
review_prompt = self.ai_reviewer.create_review_prompt(
episode_content=content,
prev_content=prev_content,
graph_data=None,
episode_number=episode
)
return {
"success": True,
"pipeline_id": pipeline_id,
"episode": episode,
"review_prompt": review_prompt
}
def process_ai_review(
self,
pipeline_id: str,
episode: int,
ai_review_result: str,
generated_content: str
) -> Dict[str, Any]:
"""
处理AI审核结果
注意:AI审核不通过时可以无限重试,没有次数限制
Args:
pipeline_id: 管道ID
episode: 剧集序号
ai_review_result: AI审核结果(JSON字符串)
generated_content: 生成的剧集内容
Returns:
处理结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
# 解析审核结果
review = self.ai_reviewer.parse_review_result(ai_review_result)
if not review.passed:
# AI审核未通过,可以无限重试
state.retry_count += 1
state.status = "ai_retry_needed"
self._save_states()
return {
"success": False,
"passed": False,
"retry_count": state.retry_count,
"review": {
"score": review.score,
"checks": review.checks,
"suggestions": review.suggestions
},
"message": f"AI审核未通过(得分{review.score}),需要重新生成(无次数限制)"
}
# AI审核通过,进入等待人工确认状态
# 注意:此时还不存储图谱,等人工确认通过后才存储
state.status = "waiting_user_confirm"
state.retry_count = 0
state.ai_review = {
"score": review.score,
"checks": review.checks,
"suggestions": review.suggestions,
"summary": review.summary
}
state.last_output = {
"episode": episode,
"content": generated_content,
"summary": self._extract_summary(generated_content)
}
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"passed": True,
"status": "waiting_user_confirm",
"review": {
"score": review.score,
"summary": review.summary
},
"preview": {
"episode": episode,
"summary": state.last_output["summary"],
"content_preview": generated_content[:500] + "..." if len(generated_content) > 500 else generated_content
},
"message": "AI审核通过,等待人工确认"
}
def user_confirm(
self,
pipeline_id: str,
action: str,
modification_note: Optional[str] = None
) -> Dict[str, Any]:
"""
处理用户确认
Args:
pipeline_id: 管道ID
action: 用户操作 (approve/modify/pause/end)
modification_note: 修改意见(action=modify时需要)
Returns:
处理结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
if action == "approve":
# 用户确认通过,存储图谱
if state.last_output:
save_result = self.graph_manager.save_graph(
pipeline_id=pipeline_id,
episode=state.last_output["episode"],
content=state.last_output["content"]
)
if not save_result.get("success"):
return {
"success": False,
"error": "图谱存储失败",
"detail": save_result.get("error")
}
# 检查是否完成
if state.current_episode >= state.target_episodes:
state.status = "completed"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "completed",
"message": "剧情管道已完成!所有集数已生成。"
}
# 准备生成下一集
state.status = "ready_for_next"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "ready_for_next",
"next_episode": state.current_episode + 1,
"message": f"第{state.current_episode}集已确认,准备生成第{state.current_episode + 1}集"
}
elif action == "modify":
# 用户要求修改
state.status = "modification_requested"
state.error_message = modification_note
state.retry_count = 0
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "modification_requested",
"message": f"将根据修改意见重新生成:{modification_note}"
}
elif action == "pause":
# 用户暂停
state.status = "paused"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "paused",
"message": f"管道已暂停,当前进度:第{state.current_episode}集"
}
elif action == "end":
# 用户结束
state.status = "user_ended"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"status": "user_ended",
"message": f"管道已结束,共生成{state.current_episode}集"
}
else:
return {"success": False, "error": f"未知操作:{action}"}
def get_status(self, pipeline_id: str) -> Dict[str, Any]:
"""
获取管道状态
Args:
pipeline_id: 管道ID
Returns:
状态信息
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
return {
"success": True,
"pipeline_id": pipeline_id,
"theme": state.theme,
"target_episodes": state.target_episodes,
"current_episode": state.current_episode,
"status": state.status,
"created_at": state.created_at,
"updated_at": state.updated_at,
"ai_review": state.ai_review,
"last_output": state.last_output
}
def list_pipelines(self) -> Dict[str, Any]:
"""
列出所有管道
Returns:
管道列表
"""
pipelines = []
for pid, state in self.states.items():
pipelines.append({
"pipeline_id": pid,
"theme": state.theme,
"current_episode": state.current_episode,
"target_episodes": state.target_episodes,
"status": state.status,
"updated_at": state.updated_at
})
return {
"success": True,
"count": len(pipelines),
"pipelines": pipelines
}
def resume_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
"""
恢复暂停的管道
Args:
pipeline_id: 管道ID
Returns:
恢复结果
"""
if pipeline_id not in self.states:
return {"success": False, "error": "管道不存在"}
state = self.states[pipeline_id]
if state.status != "paused":
return {"success": False, "error": f"管道状态不是暂停状态:{state.status}"}
state.status = "ready_for_next"
state.updated_at = datetime.now().isoformat()
self._save_states()
return {
"success": True,
"message": f"管道已恢复,当前进度:第{state.current_episode}集",
"next_episode": state.current_episode + 1
}
def _extract_summary(self, content: str) -> str:
"""从内容中提取摘要"""
lines = content.split("\n")
for i, line in enumerate(lines):
if "剧情摘要" in line or "摘要" in line:
# 获取下一行
if i + 1 < len(lines):
return lines[i + 1].strip()
# 如果没有找到摘要,返回前100字
return content[:100] + "..." if len(content) > 100 else content
# 单例实例
_pipeline = None
def get_pipeline() -> StoryPipeline:
"""获取管道控制器单例"""
global _pipeline
if _pipeline is None:
_pipeline = StoryPipeline()
return _pipeline
# 便捷函数
def create_pipeline(theme: str, target_episodes: int, style: str = "写实电影感") -> Dict:
"""创建新管道"""
return get_pipeline().create_pipeline(theme, target_episodes, style)
def start_generation(pipeline_id: str) -> Dict:
"""开始生成"""
return get_pipeline().start_generation(pipeline_id)
def submit_episode(pipeline_id: str, episode: int, content: str) -> Dict:
"""提交剧集"""
return get_pipeline().submit_episode(pipeline_id, episode, content)
def process_ai_review(pipeline_id: str, episode: int, ai_result: str, content: str) -> Dict:
"""处理AI审核"""
return get_pipeline().process_ai_review(pipeline_id, episode, ai_result, content)
def user_confirm(pipeline_id: str, action: str, note: str = None) -> Dict:
"""用户确认"""
return get_pipeline().user_confirm(pipeline_id, action, note)
def get_status(pipeline_id: str) -> Dict:
"""获取状态"""
return get_pipeline().get_status(pipeline_id)
def list_pipelines() -> Dict:
"""列出管道"""
return get_pipeline().list_pipelines()
def resume_pipeline(pipeline_id: str) -> Dict:
"""恢复管道"""
return get_pipeline().resume_pipeline(pipeline_id)
```