trading-agents
基于 AgentScope 的多智能体股票诊断系统。一键执行完整诊断流程,自动生成技术面、基本面、舆情面分析、研究员辩论、交易员决策、风控讨论及最终投资决策报告。
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-trading-agents
Repository
Skill path: skills/ganyu21/trading-agents
基于 AgentScope 的多智能体股票诊断系统。一键执行完整诊断流程,自动生成技术面、基本面、舆情面分析、研究员辩论、交易员决策、风控讨论及最终投资决策报告。
Open repositoryBest for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install trading-agents into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding trading-agents to shared team environments
- Use trading-agents for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: trading-agents
description: 基于 AgentScope 的多智能体股票诊断系统。一键执行完整诊断流程,自动生成技术面、基本面、舆情面分析、研究员辩论、交易员决策、风控讨论及最终投资决策报告。
---
# Trading Agents 股票投资分析系统
基于 AgentScope 框架的多智能体协作股票诊断系统,**一键执行完整诊断流程**。
## 核心特性
✅ **一键全自動执行** - 单次调用完成所有角色分析和报告生成
✅ **多智能体协作** - 分析师、研究员、交易员、风控、基金经理五层决策
✅ **完整报告输出** - 自动生成 7 份专业 MD 报告和 1 份合并 PDF
✅ **后台运行支持** - 避免长任务超时,自动保存中间结果
✅ **数据驱动** - 集成 Tushare/AKShare 实时金融数据
## 系统架构
### 智能体团队架构
1. **分析师团队** (ReActAgent) - 数据采集
- MarketAnalyst: 技术面分析
- FundamentalsAnalyst: 基本面分析
- NewsAnalyst: 舆情面分析
2. **研究员团队** (AgentBase) - 多空辩论
- BullishResearcher: 看多研究员
- BearishResearcher: 看空研究员
- ResearchFacilitator: 研究主持人
3. **交易与风控团队** - 决策与风控
- Trader: 交易员
- AggressiveRisk: 激进型风控
- NeutralRisk: 中性型风控
- ConservativeRisk: 保守型风控
- RiskFacilitator: 风控协调人
4. **基金经理** - 最终决策
- Manager: 做出最终投资决策
**所有角色通过一次调用自动完成:**
```python
advisor = StockAdvisorSystem()
result = advisor.diagnose("600519.SH") # 一键执行所有角色
advisor.save_report(result) # 生成所有报告
```
### 一键自动执行所有角色(推荐)
**这是最简单也是最推荐的使用方式,一次调用完成所有分析并生成所有报告:**
```python
from trading_agents import StockAdvisorSystem
# 创建系统实例
advisor = StockAdvisorSystem()
# 一键执行完整诊断(自动完成所有角色)
result = advisor.diagnose("600519.SH", base_report_dir="report")
# 保存所有报告(JSON + PDF)
advisor.save_report(result)
print(f"✅ 所有报告已生成:{result['report_dir']}")
print(f"📊 最终决策:{result['final_decision']}")
```
**以上代码会自动完成以下所有步骤:**
```
阶段 1: 数据采集(分析师团队)
├─ MarketAnalyst → 技术面分析报告.md
├─ FundamentalsAnalyst → 基本面分析报告.md
└─ NewsAnalyst → 舆情面分析报告.md
↓
阶段 2: 研究员辩论
├─ BullishResearcher vs BearishResearcher
└─ → 研究员辩论报告.md
↓
阶段 3: 交易员决策
├─ Trader 综合分析所有报告
└─ → 交易员决策报告.md
↓
阶段 4: 风险管理讨论
├─ AggressiveRisk vs NeutralRisk vs ConservativeRisk
└─ → 风险管理讨论报告.md
↓
阶段 5: 基金经理最终决策
├─ Manager 做出最终决策
└─ → 最终决策报告.md
↓
阶段 6: 报告生成
├─ complete_diagnosis_report.json
└─ {股票名}_{代码}_{时间}_{决策}.pdf
```
### 分步执行(可选)
```python
from trading_agents import StockAdvisorSystem
advisor = StockAdvisorSystem()
# 阶段 1: 分析师数据采集
result = advisor.diagnose("600519.SH") # 已完成所有分析师报告
# 阶段 2-5: 后续流程自动完成
# 研究员辩论 → 交易员决策 → 风控讨论 → 经理决策
# 阶段 6: 保存所有报告
advisor.save_report(result)
```
## 快速开始
### 一键完整诊断(推荐)
#### 方式 1:使用 OpenClaw 技能调用
```python
from trading_agents import StockAdvisorSystem
# 创建系统实例
advisor = StockAdvisorSystem()
# 一键执行完整诊断(自动完成所有角色分析并生成所有报告)
result = advisor.diagnose("600519.SH", base_report_dir="report")
# 保存完整报告(包含 JSON 和合并 PDF)
advisor.save_report(result)
print(f"✅ 所有报告已生成:{result['report_dir']}")
print(f"📊 最终决策:{result['final_decision']}")
```
#### 方式 2:命令行执行
```bash
# 直接运行诊断脚本
python3 scripts/stock_advisor.py --stock 600519.SH
# 或使用 nohup 后台运行(适合长时间任务)
nohup python3 scripts/stock_advisor.py --stock 600519.SH > diagnose.log 2>&1 &
```
#### 方式 3:后台运行模式(防超时)
```bash
# 使用后台诊断脚本
python3 scripts/background_diagnose.py 600519.SH 贵州茅台
```
### ⚠️ 重要提示
完整诊断流程需要 **10-15 分钟**,包含以下自动执行步骤:
1. ✅ **数据采集** - 3 位分析师独立获取数据并生成报告
2. ✅ **研究员辩论** - 看多看空双方辩论形成共识
3. ✅ **交易员决策** - 综合分析报告制定交易策略
4. ✅ **风控讨论** - 三种风险偏好评估风险
5. ✅ **经理决策** - 做出最终投资决策
6. ✅ **报告生成** - 自动生成所有 MD 报告和合并 PDF
**推荐使用后台运行模式**以避免会话超时:
- 详见 [BACKGROUND_RULES.md](BACKGROUND_RULES.md)
- 自动保存中间结果,防止中断丢失数据
### 使用方法
**作为 OpenClaw 技能使用:**
将本目录复制到 OpenClaw 项目的 skills 目录:
```bash
cp -r trading-agents /path/to/openclaw/skills/
```
**OpenClaw 中调用示例(一键完成所有角色):**
```python
# OpenClaw 会自动加载技能
from trading_agents import StockAdvisorSystem
# 创建系统实例
advisor = StockAdvisorSystem()
# 一键执行完整诊断(自动完成所有角色分析并生成所有报告)
result = advisor.diagnose("600519.SH", base_report_dir="report")
# 保存所有报告(JSON + PDF)
advisor.save_report(result)
# 输出包含:
# - MarketAnalyst_技术面分析.md
# - FundamentalsAnalyst_基本面分析.md
# - NewsAnalyst_舆情面分析.md
# - 研究员辩论报告.md
# - 交易员决策报告.md
# - 风险管理讨论报告.md
# - 最终决策报告.md
# - complete_diagnosis_report.json
# - {股票名}_{代码}_{时间}_{决策}.pdf
```
**作为 Python 包安装:**
```bash
# 安装整个技能包
pip install /path/to/trading-agents/
# 或进入 scripts 目录安装核心代码
pip install /path/to/trading-agents/scripts/
```
**独立使用:**
```python
from trading_agents import StockAdvisorSystem
# 创建系统实例
advisor = StockAdvisorSystem()
# 一键诊断股票(自动执行所有角色并生成所有报告)
result = advisor.diagnose("600519.SH", base_report_dir="report")
# 保存完整报告
advisor.save_report(result)
print(f"报告目录:{result['report_dir']}")
print(f"最终决策:{result['final_decision']}")
```
### 使用特定模型
```python
from trading_agents.config import config
# 切换模型
config.model_name = "qwen-max-2025-01-25" # 或其他支持的模型
# 支持的模型:
# - kimi-k2.5
# - qwen-max-2025-01-25
# - qwen3.5-plus
# - glm-5
# - MiniMax/MiniMax-M2.5
```
## 配置说明
### 环境变量
```bash
# Tushare API Token
TUSHARE_TOKEN=your_token_here
# 阿里云百炼 API Key
ALIYUN_BAILIAN_API_KEY=your_key_here
```
### 配置参数
```python
from trading_agents.config import config
# 辩论轮数
config.debate_rounds = 2
# 风控讨论轮数
config.risk_discussion_rounds = 2
# 权重配置
config.tech_weight = 0.25 # 技术面权重
config.fund_weight = 0.35 # 基本面权重
config.news_weight = 0.20 # 舆情面权重
config.research_weight = 0.20 # 研究员共识权重
```
## 输出报告(自动生成)
系统自动生成以下报告文件:
### 分析师报告(阶段 1)
- `MarketAnalyst_技术面分析.md` - 技术面分析报告
- `FundamentalsAnalyst_基本面分析.md` - 基本面分析报告
- `NewsAnalyst_舆情面分析.md` - 舆情面分析报告
### 研究员辩论(阶段 2)
- `研究员辩论报告.md` - 看多看空辩论记录
### 交易员决策(阶段 3)
- `交易员决策报告.md` - 交易策略报告
### 风控讨论(阶段 4)
- `风险管理讨论报告.md` - 风险评估报告
### 经理决策(阶段 5)
- `最终决策报告.md` - 最终投资决策
### 完整报告(阶段 6)
- `complete_diagnosis_report.json` - 完整 JSON 数据
- `{股票名}_{代码}_{时间}_{决策}.pdf` - 合并 PDF 报告
**所有报告通过一键调用自动生成:**
```python
advisor = StockAdvisorSystem()
result = advisor.diagnose("600519.SH")
advisor.save_report(result) # 自动生成所有 MD 和 PDF
```
## 核心组件(可选)
> 注意:推荐使用一键调用方式,以上代码已自动完成所有角色。以下为高级用法,可按需单独调用。
### 分析师智能体
```python
from trading_agents.agents import (
MarketAnalystAgent,
FundamentalsAnalystAgent,
NewsAnalystAgent
)
# 技术面分析
market_analyst = MarketAnalystAgent()
report = market_analyst.analyze("600519.SH")
# 基本面分析
fund_analyst = FundamentalsAnalystAgent()
report = fund_analyst.analyze("600519.SH")
# 舆情分析
news_analyst = NewsAnalystAgent()
report = news_analyst.analyze("600519.SH", "贵州茅台")
```
### 数据工具
```python
from trading_agents.tools import TushareTools, AKShareTools
# Tushare 数据
tushare = TushareTools(token)
data = tushare.get_stock_daily("600519.SH", days=60)
indicators = tushare.get_technical_indicators("600519.SH")
# AKShare 数据
akshare = AKShareTools()
news = akshare.get_stock_news("600519.SH", days=7)
sentiment = akshare.get_market_sentiment()
```
### AgentScope 工具包
```python
from trading_agents.tools import (
create_market_analyst_toolkit,
create_fundamentals_analyst_toolkit,
create_news_analyst_toolkit,
create_stock_toolkit
)
# 创建工具包
toolkit = create_stock_toolkit()
```
## 批量诊断
```python
from trading_agents.batch_diagnose import batch_diagnose
stocks = ["600519.SH", "000858.SZ", "002594.SZ"]
results = batch_diagnose(stocks, output_dir="report/batch")
```
## 依赖安装
```bash
# 基础依赖
pip install agentscope>=0.0.5
pip install tushare>=1.2.89
pip install akshare>=1.12.0
pip install pandas>=2.0.0
pip install numpy>=1.24.0
pip install requests>=2.31.0
pip install openai>=1.0.0
pip install fpdf2>=2.8.0
pip install python-dotenv
# 阿里云夸克搜索 SDK(可选)
pip install alibabacloud_iqs20241111>=1.0.0
pip install alibabacloud_tea_openapi>=0.3.0
# Web 界面(可选)
pip install streamlit>=1.28.0
```
## 项目结构
### Skill 目录结构 (OpenClaw 兼容)
```
trading-agents/ # 技能根目录
├── SKILL.md # Skill 说明文档 (必需)
├── __init__.py # Python 包标识 (OpenClaw 需要)
├── setup.py # 安装配置
└── scripts/ # 完整源代码
├── __init__.py
├── stock_advisor.py # 主系统入口
├── config.py # 系统配置
├── batch_diagnose.py # 批量诊断
├── streamlit_app.py # Web 界面
├── requirements.txt # 依赖清单
├── agents/
│ ├── __init__.py
│ ├── analysts.py # 分析师团队 (ReActAgent)
│ ├── researchers.py # 研究员团队 (AgentBase)
│ ├── trader.py # 交易员
│ ├── risk_managers.py # 风险管理团队
│ └── manager.py # 基金经理
└── tools/
├── __init__.py
├── tushare_tools.py # Tushare 数据接口
├── akshare_tools.py # AKShare 数据接口
└── toolkit.py # AgentScope 工具注册
```
### 移植到其他项目
只需复制整个 `trading-agents` 目录:
```bash
# 从源项目复制到目标项目
cp -r /source/project/.qoder/skills/trading-agents \
/target/project/.qoder/skills/
# 或在目标项目中克隆后复制
mkdir -p /target/project/.qoder/skills/
cp -r trading-agents /target/project/.qoder/skills/
```
## 注意事项
1. **API Token**: 需要配置 TUSHARE_TOKEN 和 ALIYUN_BAILIAN_API_KEY
2. **中文字体**: PDF 生成需要系统中文字体支持
3. **网络连接**: 需要访问阿里云百炼 API
4. **数据限制**: Tushare 免费版有数据调用限制
5. **执行时间**: 完整诊断流程需要 10-15 分钟,建议使用后台运行模式
## 总结
### 为什么选择 Trading Agents?
✅ **一键完成所有角色** - 无需手动调用多个函数,一次诊断完成所有分析
✅ **自动化报告生成** - 自动生成 7 份 MD 报告和 1 份合并 PDF,无需手动整理
✅ **专业智能体团队** - 5 层决策流程,模拟专业投资机构的工作流
✅ **实时数据驱动** - 集成 Tushare/AKShare 获取最新市场数据
✅ **灵活可配置** - 支持多种模型、权重、辩论轮数等参数配置
### 快速上手
```python
from trading_agents import StockAdvisorSystem
# 一行代码完成股票诊断
result = StockAdvisorSystem().diagnose("600519.SH")
# 一行代码保存所有报告
StockAdvisorSystem().save_report(result)
```
就这么简单!所有复杂的分析、辩论、决策过程都由智能体团队自动完成。
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/stock_advisor.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AgentScope 股票诊断智能体系统
基于 AgentScope 框架实现的多智能体协作股票分析系统
核心功能:
1. 分析师团队 - 使用 ReActAgent 自主调用工具获取数据
2. 研究员团队 - 使用 AgentBase 实现看多看空辩论
3. 交易员 - 综合分析报告制定交易策略
4. 风险管理团队 - 评估交易策略的风险
5. 基金经理 - 做出最终投资决策
"""
import os
import sys
import json
import logging
import glob
import re
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 设置日志级别,过滤 AgentScope 的 WARNING 日志(如 thinking 块警告)
logging.getLogger("agentscope").setLevel(logging.ERROR)
# 过滤 OpenAI formatter 的 thinking 块警告
logging.getLogger("_openai_formatter").setLevel(logging.ERROR)
import agentscope
from dotenv import load_dotenv
load_dotenv()
from .config import config
from .tools.tushare_tools import TushareTools
# 分析师团队(ReActAgent)
from .agents.analysts import (
MarketAnalystAgent,
FundamentalsAnalystAgent,
NewsAnalystAgent,
)
# 研究员团队(AgentBase)
from .agents.researchers import (
BullishResearcherAgent,
BearishResearcherAgent,
ResearchFacilitatorAgent,
)
# 交易员和风控(原实现)
from .agents.trader import Trader
from .agents.risk_managers import (
AggressiveRisk, NeutralRisk, ConservativeRisk, RiskFacilitator
)
from .agents.manager import Manager
@dataclass
class GlobalState:
"""全局状态管理"""
ts_code: str = ""
stock_name: str = ""
report_dir: str = ""
analyst_reports: Dict[str, str] = field(default_factory=dict)
trader_report: str = ""
research_debate: str = ""
risk_discussion: str = ""
final_decision: str = ""
class StockAdvisorSystem:
"""
股票诊断智能体系统
使用 AgentScope 框架实现:
- 分析师使用 ReActAgent,可自主调用工具
- 研究员使用 AgentBase,支持标准消息传递
- 全流程使用 AgentScope 框架特性
"""
def __init__(self):
"""初始化系统"""
print("=" * 60)
print("AgentScope 股票诊断智能体系统")
print("=" * 60)
# 初始化 AgentScope
agentscope.init(project="股票诊断系统")
# 初始化全局状态
self.state = GlobalState()
# 初始化数据工具(用于获取基本信息)
self.tushare = TushareTools(config.tushare_token)
# 保存配置引用
self.config = config
# 初始化智能体
self._init_agents()
print("系统初始化完成")
def _init_agents(self):
"""初始化所有智能体"""
print("\n初始化智能体...")
# 分析师团队(ReActAgent)
self.market_analyst = MarketAnalystAgent()
self.fundamentals_analyst = FundamentalsAnalystAgent()
self.news_analyst = NewsAnalystAgent()
print(" 分析师团队就绪 (ReActAgent)")
# 研究员团队(AgentBase)
self.bullish_researcher = BullishResearcherAgent()
self.bearish_researcher = BearishResearcherAgent()
self.research_facilitator = ResearchFacilitatorAgent()
print(" 研究员团队就绪 (AgentBase)")
# 交易员
self.trader = Trader()
print(" 交易员就绪")
# 风险管理团队
self.aggressive_risk = AggressiveRisk()
self.neutral_risk = NeutralRisk()
self.conservative_risk = ConservativeRisk()
self.risk_facilitator = RiskFacilitator()
print(" 风险管理团队就绪")
# 基金经理
self.manager = Manager()
print(" 基金经理就绪")
def diagnose(self, ts_code: str, base_report_dir: str = "report") -> Dict:
"""
诊断股票
Args:
ts_code: 股票代码
base_report_dir: 报告目录
Returns:
诊断结果字典
"""
print(f"\n{'=' * 60}")
print(f"开始诊断股票: {ts_code}")
print(f"{'=' * 60}")
start_time = datetime.now()
# 创建报告目录
timestamp = start_time.strftime('%Y%m%d_%H%M%S')
ts_code_safe = ts_code.replace('.', '_')
report_subdir = f"{ts_code_safe}_{timestamp}"
report_dir = os.path.join(base_report_dir, report_subdir)
if not os.path.exists(report_dir):
os.makedirs(report_dir)
print(f"\n报告目录: {report_dir}")
# 初始化状态
self.state = GlobalState(ts_code=ts_code, report_dir=report_dir)
# 获取股票基本信息
basic_info = self.tushare.get_stock_basic(ts_code)
self.state.stock_name = basic_info.get('name', ts_code)
print(f"\n股票名称: {self.state.stock_name}")
# 阶段1: 数据采集(ReActAgent)
print(f"\n{'─' * 40}")
print("阶段1: 数据采集(分析师团队)")
print(f"{'─' * 40}")
self._run_analysts()
# 阶段2: 研究员辩论(AgentBase)
print(f"\n{'─' * 40}")
print("阶段2: 研究员辩论")
print(f"{'─' * 40}")
self._run_research_debate()
# 阶段3: 交易员决策
print(f"\n{'─' * 40}")
print("阶段3: 交易员决策")
print(f"{'─' * 40}")
self._run_trader()
# 阶段4: 风险管理讨论
print(f"\n{'─' * 40}")
print("阶段4: 风险管理讨论")
print(f"{'─' * 40}")
self._run_risk_discussion()
# 阶段5: 基金经理最终决策
print(f"\n{'─' * 40}")
print("阶段5: 基金经理最终决策")
print(f"{'─' * 40}")
self._run_final_decision()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"\n{'=' * 60}")
print(f"诊断完成! 耗时: {duration:.1f}秒")
print(f"{'=' * 60}")
# 返回结果
return {
"ts_code": self.state.ts_code,
"stock_name": self.state.stock_name,
"report_dir": self.state.report_dir,
"analyst_reports": self.state.analyst_reports,
"research_debate": self.state.research_debate,
"trader_report": self.state.trader_report,
"risk_discussion": self.state.risk_discussion,
"final_decision": self.state.final_decision,
"diagnosis_time": datetime.now().isoformat(),
"duration_seconds": duration,
}
def _run_analysts(self):
"""运行分析师团队"""
print("\n[MarketAnalyst] 正在分析技术面...")
market_report = self.market_analyst.analyze(self.state.ts_code)
self.state.analyst_reports["MarketAnalyst"] = market_report
self._save_analyst_report("MarketAnalyst", "技术面分析", market_report)
print(" 技术面分析完成")
print("\n[FundamentalsAnalyst] 正在分析基本面...")
fund_report = self.fundamentals_analyst.analyze(self.state.ts_code)
self.state.analyst_reports["FundamentalsAnalyst"] = fund_report
self._save_analyst_report("FundamentalsAnalyst", "基本面分析", fund_report)
print(" 基本面分析完成")
print("\n[NewsAnalyst] 正在分析舆情面...")
news_report = self.news_analyst.analyze(self.state.ts_code, self.state.stock_name)
self.state.analyst_reports["NewsAnalyst"] = news_report
self._save_analyst_report("NewsAnalyst", "舆情面分析", news_report)
print(" 舆情面分析完成")
def _run_research_debate(self):
"""运行研究员辩论"""
print("\n[ResearchFacilitator] 主持研究员辩论...")
self.state.research_debate = self.research_facilitator.facilitate_debate_sync(
self.bullish_researcher,
self.bearish_researcher,
self.state.analyst_reports,
rounds=config.debate_rounds
)
self._save_report("研究员辩论报告", self.state.research_debate)
print(" 研究员辩论完成")
def _run_trader(self):
"""运行交易员决策"""
print("\n[Trader] 正在制定交易决策...")
self.state.trader_report = self.trader.make_decision(
self.state.analyst_reports,
self.state.research_debate
)
self._save_report("交易员决策报告", self.state.trader_report)
print(" 交易决策完成")
def _run_risk_discussion(self):
"""运行风险管理讨论"""
print("\n[RiskFacilitator] 主持风险管理讨论...")
self.state.risk_discussion = self.risk_facilitator.facilitate_discussion(
self.aggressive_risk,
self.neutral_risk,
self.conservative_risk,
self.state.trader_report,
rounds=config.risk_discussion_rounds
)
self._save_report("风险管理讨论报告", self.state.risk_discussion)
print(" 风险管理讨论完成")
def _run_final_decision(self):
"""运行基金经理最终决策"""
print("\n[Manager] 正在做出最终决策...")
self.state.final_decision = self.manager.make_final_decision(
self.state.ts_code,
self.state.stock_name,
self.state.analyst_reports,
self.state.research_debate,
self.state.trader_report,
self.state.risk_discussion
)
self._save_report("最终决策报告", self.state.final_decision)
print(" 最终决策完成")
def _save_analyst_report(self, analyst_name: str, report_title: str, report_content: str):
"""保存分析师报告"""
filename = f"{analyst_name}_{report_title}.md"
filepath = os.path.join(self.state.report_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(report_content)
print(f" 已保存: {filename}")
def _save_report(self, report_name: str, report_content: str):
"""保存报告"""
filename = f"{report_name.replace(' ', '_')}.md"
filepath = os.path.join(self.state.report_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(report_content)
print(f" 已保存: {filename}")
def save_report(self, result: Dict):
"""保存完整诊断报告(JSON格式)"""
report_dir = result.get('report_dir', 'report')
json_filename = "complete_diagnosis_report.json"
json_path = os.path.join(report_dir, json_filename)
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"\n完整JSON报告已保存: {json_path}")
# 合并所有MD文件为PDF
self._merge_md_to_pdf(report_dir)
return json_path
def _get_md_files_in_order(self, report_dir: str) -> List[str]:
"""
按照生成顺序获取MD文件列表
生成顺序:
1. MarketAnalyst_技术面分析.md
2. FundamentalsAnalyst_基本面分析.md
3. NewsAnalyst_舆情面分析.md
4. 研究员辩论报告.md
5. 交易员决策报告.md
6. 风险管理讨论报告.md
7. 最终决策报告.md
"""
# 定义文件顺序映射
order_mapping = {
'MarketAnalyst_技术面分析.md': 1,
'FundamentalsAnalyst_基本面分析.md': 2,
'NewsAnalyst_舆情面分析.md': 3,
'研究员辩论报告.md': 4,
'交易员决策报告.md': 5,
'风险管理讨论报告.md': 6,
'最终决策报告.md': 7,
}
# 获取所有MD文件
md_files = glob.glob(os.path.join(report_dir, '*.md'))
# 按预定义顺序排序
def get_sort_key(filepath):
filename = os.path.basename(filepath)
return order_mapping.get(filename, 999)
sorted_files = sorted(md_files, key=get_sort_key)
return sorted_files
def _merge_md_to_pdf(self, report_dir: str) -> str:
"""
将所有MD文件按生成顺序合并为PDF
Args:
report_dir: 报告目录路径
Returns:
PDF文件路径
"""
# 获取按顺序排列的MD文件
md_files = self._get_md_files_in_order(report_dir)
if not md_files:
print("\n警告: 未找到MD文件,跳过PDF生成")
return ""
print(f"\n开始合并 {len(md_files)} 个MD文件为PDF...")
# 读取所有MD文件内容
md_contents = []
for i, md_file in enumerate(md_files, 1):
filename = os.path.basename(md_file)
print(f" [{i}/{len(md_files)}] 读取: {filename}")
with open(md_file, 'r', encoding='utf-8') as f:
content = f.read()
md_contents.append({
'filename': filename,
'content': content
})
# 使用替代方法生成PDF
return self._merge_md_to_pdf_alternative(report_dir, md_contents)
def _merge_md_to_pdf_alternative(self, report_dir: str, md_contents: List[Dict]) -> str:
"""
使用 fpdf2 直接生成 PDF
直接解析 Markdown 内容渲染,不经过 HTML 中间层
"""
return self._generate_pdf_with_fpdf2(report_dir, md_contents)
def _generate_pdf_with_fpdf2(self, report_dir: str, md_contents: List[Dict]) -> str:
"""
使用 fpdf2 生成 PDF
直接解析 Markdown 内容渲染为 PDF
"""
try:
from fpdf import FPDF # pyright: ignore[reportMissingModuleSource]
except ImportError:
print("正在安装 fpdf2...")
import subprocess
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'fpdf2'])
from fpdf import FPDF # pyright: ignore[reportMissingModuleSource]
# 获取中文字体路径
font_path = self._get_chinese_font_path()
class PDF(FPDF):
def __init__(self, font_path=None, ts_code="", stock_name=""):
super().__init__()
self.font_path = font_path
self.ts_code = ts_code
self.stock_name = stock_name
self.has_font = font_path is not None
def header(self):
if self.page_no() > 1:
if self.has_font:
self.set_font('CustomFont', '', 9)
else:
self.set_font('Arial', '', 9)
self.set_text_color(128, 128, 128)
self.cell(0, 8, f'股票诊断报告 - {self.stock_name} ({self.ts_code})', align='C')
self.ln(8)
self.set_text_color(0, 0, 0)
def footer(self):
self.set_y(-15)
if self.has_font:
self.set_font('CustomFont', '', 8)
else:
self.set_font('Arial', '', 8)
self.set_text_color(128, 128, 128)
self.cell(0, 10, f'第 {self.page_no()} 页', align='C')
self.set_text_color(0, 0, 0)
pdf = PDF(font_path=font_path, ts_code=self.state.ts_code, stock_name=self.state.stock_name)
pdf.set_auto_page_break(auto=True, margin=20)
pdf.set_margins(20, 20, 20)
if font_path:
try:
pdf.add_font('CustomFont', '', font_path)
pdf.add_font('CustomFont', 'B', font_path)
pdf.has_font = True
print(f" 成功加载字体: {font_path}")
except Exception as e:
print(f"警告: 无法加载中文字体: {e}")
pdf.has_font = False
else:
print("警告: 未找到中文字体,PDF可能无法正确显示中文")
pdf.has_font = False
# 添加封面
self._add_cover_page(pdf)
# 添加各章节内容
for idx, md_item in enumerate(md_contents, 1):
filename = md_item['filename']
content = md_item['content']
section_title = filename.replace('.md', '')
print(f" 生成PDF章节: {section_title}")
pdf.add_page()
self._set_font(pdf, 'B', 16)
pdf.set_text_color(30, 30, 30)
pdf.cell(0, 12, f'{idx}. {section_title}')
pdf.ln(12)
pdf.set_draw_color(200, 200, 200)
pdf.line(20, pdf.get_y(), 190, pdf.get_y())
pdf.ln(8)
self._render_markdown_to_pdf(pdf, content)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
diagnosis_result = self._extract_diagnosis_result()
pdf_filename = f"{self.state.stock_name}_{self.state.ts_code}_{timestamp}_{diagnosis_result}.pdf"
pdf_path = os.path.join(report_dir, pdf_filename)
pdf.output(pdf_path)
print(f"\nPDF报告已保存: {pdf_path}")
return pdf_path
def _add_cover_page(self, pdf):
"""添加封面页"""
pdf.add_page()
self._set_font(pdf, 'B', 28)
pdf.set_text_color(30, 30, 30)
pdf.ln(50)
pdf.cell(0, 15, '股票诊断报告', align='C')
pdf.ln(25)
self._set_font(pdf, '', 22)
pdf.set_text_color(50, 50, 50)
pdf.cell(0, 12, self.state.stock_name, align='C')
pdf.ln(12)
self._set_font(pdf, '', 16)
pdf.set_text_color(100, 100, 100)
pdf.cell(0, 10, self.state.ts_code, align='C')
pdf.ln(40)
self._set_font(pdf, '', 12)
pdf.cell(0, 8, f'生成时间: {datetime.now().strftime("%Y年%m月%d日 %H:%M:%S")}', align='C')
pdf.ln(80)
pdf.set_text_color(150, 150, 150)
self._set_font(pdf, '', 10)
pdf.cell(0, 8, '本报告由AI智能体自动生成,仅供参考', align='C')
pdf.set_text_color(0, 0, 0)
def _set_font(self, pdf, style: str = '', size: int = 11):
"""设置字体"""
if pdf.has_font:
pdf.set_font('CustomFont', style, size)
else:
pdf.set_font('Arial', style, size)
def _render_markdown_to_pdf(self, pdf, content: str):
"""
将 Markdown 内容直接渲染到 PDF
支持标题、段落、列表、表格、代码块、引用等
"""
import re
lines = content.split('\n')
i = 0
in_code_block = False
in_table = False
table_data = []
while i < len(lines):
line = lines[i]
# 代码块处理
if line.strip().startswith('```'):
if in_code_block:
in_code_block = False
pdf.ln(3)
else:
in_code_block = True
pdf.ln(2)
i += 1
continue
if in_code_block:
self._set_font(pdf, '', 9)
pdf.set_fill_color(245, 245, 245)
code_line = line if line else ' '
available_width = pdf.w - pdf.l_margin - pdf.r_margin - 4
pdf.set_x(pdf.l_margin + 2)
pdf.multi_cell(available_width, 5, code_line, fill=True)
i += 1
continue
# 表格处理
if '|' in line and line.strip().startswith('|'):
if not in_table:
in_table = True
table_data = []
cells = [cell.strip() for cell in line.strip().split('|')[1:-1]]
if cells and not all(c.replace('-', '').replace(':', '') == '' for c in cells):
table_data.append(cells)
i += 1
continue
elif in_table:
self._render_table(pdf, table_data)
in_table = False
table_data = []
# 引用块处理
if line.strip().startswith('>'):
quote_text = line.strip()[1:].strip()
self._render_quote(pdf, quote_text)
i += 1
continue
# 空行处理
if not line.strip():
pdf.ln(2)
i += 1
continue
# 分隔线处理
if line.strip() in ['---', '***', '___']:
pdf.ln(3)
pdf.set_draw_color(200, 200, 200)
pdf.line(20, pdf.get_y(), 190, pdf.get_y())
pdf.ln(3)
i += 1
continue
# 标题处理
heading_match = re.match(r'^(#{1,6})\s+(.+)$', line)
if heading_match:
level = len(heading_match.group(1))
text = heading_match.group(2)
self._render_heading(pdf, level, text)
i += 1
continue
# 无序列表处理
list_match = re.match(r'^(\s*)[-*+]\s+(.+)$', line)
if list_match:
indent = len(list_match.group(1)) // 2
text = list_match.group(2)
self._render_list_item(pdf, indent, text)
i += 1
continue
# 有序列表处理
ordered_match = re.match(r'^(\s*)\d+\.\s+(.+)$', line)
if ordered_match:
indent = len(ordered_match.group(1)) // 2
text = ordered_match.group(2)
self._render_list_item(pdf, indent, text, ordered=True)
i += 1
continue
# 普通段落处理
self._render_paragraph(pdf, line)
i += 1
if in_table and table_data:
self._render_table(pdf, table_data)
def _render_heading(self, pdf, level: int, text: str):
"""渲染标题"""
sizes = {1: 18, 2: 15, 3: 13, 4: 12, 5: 11, 6: 11}
size = sizes.get(level, 11)
pdf.ln(4)
self._set_font(pdf, 'B', size)
pdf.set_text_color(30, 30, 30)
text = self._clean_markdown(text)
pdf.multi_cell(0, 8, text)
pdf.ln(2)
self._set_font(pdf, '', 11)
pdf.set_text_color(0, 0, 0)
def _render_paragraph(self, pdf, text: str):
"""渲染段落"""
self._set_font(pdf, '', 11)
text = self._clean_markdown(text)
available_width = pdf.w - pdf.l_margin - pdf.r_margin
pdf.multi_cell(available_width, 6, text)
pdf.ln(2)
def _render_list_item(self, pdf, indent: int, text: str, ordered: bool = False):
"""渲染列表项"""
self._set_font(pdf, '', 11)
text = self._clean_markdown(text)
indent_width = 8 * (indent + 1)
pdf.set_x(pdf.l_margin + indent_width)
marker = '• ' if not ordered else '◦ '
available_width = pdf.w - pdf.l_margin - pdf.r_margin - indent_width
pdf.multi_cell(available_width, 6, marker + text)
pdf.ln(1)
def _render_quote(self, pdf, text: str):
"""渲染引用块"""
self._set_font(pdf, '', 10)
pdf.set_text_color(80, 80, 80)
text = self._clean_markdown(text)
x = pdf.get_x()
y = pdf.get_y()
pdf.set_fill_color(240, 240, 240)
pdf.rect(x, y, 3, 6, 'F')
pdf.set_x(x + 6)
available_width = pdf.w - pdf.l_margin - pdf.r_margin - 6
pdf.multi_cell(available_width, 6, text, fill=True)
pdf.ln(2)
self._set_font(pdf, '', 11)
pdf.set_text_color(0, 0, 0)
def _render_table(self, pdf, table_data):
"""渲染表格"""
if not table_data:
return
pdf.ln(2)
self._set_font(pdf, '', 10)
num_cols = max(len(row) for row in table_data)
available_width = pdf.w - pdf.l_margin - pdf.r_margin
col_width = available_width / num_cols
if table_data:
self._set_font(pdf, 'B', 10)
pdf.set_fill_color(240, 240, 240)
for cell in table_data[0]:
cell = self._clean_markdown(cell)
pdf.cell(col_width, 7, cell[:25], border=1, fill=True)
pdf.ln()
self._set_font(pdf, '', 10)
pdf.set_fill_color(255, 255, 255)
for row in table_data[1:]:
for j, cell in enumerate(row):
if j < num_cols:
cell = self._clean_markdown(cell)
pdf.cell(col_width, 7, cell[:25], border=1)
pdf.ln()
pdf.ln(2)
def _extract_diagnosis_result(self) -> str:
"""从最终决策中提取诊断结果"""
if not self.state.final_decision:
return "未确定"
decision = self.state.final_decision.upper()
# 定义关键词映射
if any(word in decision for word in ['买入', '买进', 'BUY', '强烈买入']):
return "买入"
elif any(word in decision for word in ['卖出', '抛售', 'SELL', '强烈卖出']):
return "卖出"
elif any(word in decision for word in ['持有', '观望', 'HOLD', '中性']):
return "持有"
else:
return "综合诊断"
def _clean_markdown(self, text: str) -> str:
"""清理Markdown标记"""
import re
# 移除粗体标记
text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
text = re.sub(r'__(.+?)__', r'\1', text)
# 移除斜体标记
text = re.sub(r'\*(.+?)\*', r'\1', text)
text = re.sub(r'_(.+?)_', r'\1', text)
# 移除代码标记
text = re.sub(r'`(.+?)`', r'\1', text)
# 移除链接标记,保留文本
text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)
return text
def _get_chinese_font_path(self) -> Optional[str]:
"""获取中文字体路径"""
# 常见中文字体路径(macOS)
possible_fonts = [
'/System/Library/Fonts/PingFang.ttc',
'/System/Library/Fonts/STHeiti Light.ttc',
'/System/Library/Fonts/STHeiti Medium.ttc',
'/System/Library/Fonts/Hiragino Sans GB.ttc',
'/Library/Fonts/Arial Unicode.ttf',
'/System/Library/Fonts/Helvetica.ttc',
]
# Linux
possible_fonts.extend([
'/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
'/usr/share/fonts/truetype/wqy/wqy-microhei.ttc',
'/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc',
'/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc',
])
# Windows
possible_fonts.extend([
'C:/Windows/Fonts/simhei.ttf',
'C:/Windows/Fonts/simsun.ttc',
'C:/Windows/Fonts/msyh.ttc',
'C:/Windows/Fonts/msyhbd.ttc',
'C:/Windows/Fonts/arialuni.ttf',
])
for font_path in possible_fonts:
if os.path.exists(font_path):
print(f" 使用字体: {font_path}")
return font_path
return None
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(
description='AgentScope 股票诊断智能体系统'
)
parser.add_argument(
'--stock', '-s',
type=str,
default='600519.SH',
help='股票代码(默认: 600519.SH)'
)
parser.add_argument(
'--output', '-o',
type=str,
default='report',
help='输出目录(默认: report)'
)
args = parser.parse_args()
# 创建系统实例
advisor = StockAdvisorSystem()
# 执行诊断
result = advisor.diagnose(args.stock, args.output)
# 保存完整报告
advisor.save_report(result)
# 输出最终决策
print("\n" + "=" * 60)
print("最终诊股意见")
print("=" * 60)
print(result['final_decision'])
print("\n" + "=" * 60)
print(f"所有报告已保存到: {result['report_dir']}")
print("=" * 60)
return result
if __name__ == "__main__":
main()
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "ganyu21",
"slug": "trading-agents",
"displayName": "stock trading agents",
"latest": {
"version": "1.0.6",
"publishedAt": 1773245637141,
"commit": "https://github.com/openclaw/skills/commit/009690ea24c4e711beb5fded005c888eb301f1be"
},
"history": [
{
"version": "1.0.5",
"publishedAt": 1772877433160,
"commit": "https://github.com/openclaw/skills/commit/283fe7a969e1cc114b5ae1f3f6d1fe935f50c363"
},
{
"version": "1.0.3",
"publishedAt": 1772701671179,
"commit": "https://github.com/openclaw/skills/commit/9cbb9739aefd6fda231e0613dcb475d9d62e5475"
}
]
}
```
### scripts/__init__.py
```python
"""
Trading Agents 股票投资分析系统
基于 AgentScope 的多智能体股票诊断技能
"""
from .config import config, Config
from .stock_advisor import StockAdvisorSystem
__version__ = "1.0.0"
__all__ = ["config", "Config", "StockAdvisorSystem"]
```
### scripts/agents/__init__.py
```python
"""智能体模块
基于 AgentScope 框架实现:
- 分析师团队: 使用 ReActAgent 自主调用工具
- 研究员团队: 使用 AgentBase 实现辩论机制
- 交易员/风控/经理: LLM 驱动的决策智能体
"""
# 分析师(AgentScope ReActAgent)
from .analysts import (
MarketAnalystAgent,
FundamentalsAnalystAgent,
NewsAnalystAgent,
create_analyst_team,
)
# 研究员(AgentScope AgentBase)
from .researchers import (
BullishResearcherAgent,
BearishResearcherAgent,
ResearchFacilitatorAgent,
create_research_team,
)
# 交易员
from .trader import Trader
# 风险管理团队
from .risk_managers import (
AggressiveRisk,
NeutralRisk,
ConservativeRisk,
RiskFacilitator,
)
# 基金经理
from .manager import Manager
__all__ = [
# 分析师
"MarketAnalystAgent",
"FundamentalsAnalystAgent",
"NewsAnalystAgent",
"create_analyst_team",
# 研究员
"BullishResearcherAgent",
"BearishResearcherAgent",
"ResearchFacilitatorAgent",
"create_research_team",
# 交易与风控
"Trader",
"AggressiveRisk",
"NeutralRisk",
"ConservativeRisk",
"RiskFacilitator",
"Manager",
]
```
### scripts/agents/analysts.py
```python
"""
分析师智能体模块
使用 AgentScope ReActAgent 实现数据采集和分析功能
"""
import asyncio
from typing import Dict
from datetime import datetime
from agentscope.agent import ReActAgent
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from ..config import config, LLM_API_KEY
from ..tools.toolkit import (
create_market_analyst_toolkit,
create_fundamentals_analyst_toolkit,
create_news_analyst_toolkit,
)
class MarketAnalystAgent:
"""
技术面分析师 - 使用 AgentScope ReActAgent 实现
能够自主调用工具获取行情数据并生成分析报告
"""
def __init__(self):
self.name = "MarketAnalyst"
self.sys_prompt = """你是一名资深的技术面分析师,擅长从技术指标和价格走势中挖掘投资机会。
你的职责是:
1. 使用工具获取股票的日线行情数据
2. 使用工具获取技术指标(MA、MACD、RSI等)
3. 基于数据撰写专业的技术面分析报告
分析报告必须包含以下部分:
- 价格走势分析(均线系统、金叉死叉信号)
- 技术指标解读(MACD、RSI等)
- 成交量能分析
- 关键价位判断(支撑位、阻力位)
- 技术评分(0-100分)
- 投资建议
要求:分析要专业、深入,评分要客观公正,全文使用中文。请用中文回答。
"""
self.agent = ReActAgent(
name=self.name,
sys_prompt=self.sys_prompt,
model=OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
),
formatter=OpenAIChatFormatter(),
toolkit=create_market_analyst_toolkit(),
memory=InMemoryMemory(),
max_iters=5,
)
def analyze(self, ts_code: str) -> str:
"""
执行技术面分析
Args:
ts_code: 股票代码
Returns:
Markdown格式的技术分析报告
"""
prompt = f"""请为股票 **{ts_code}** 进行技术面分析。
步骤:
1. 首先使用 get_stock_basic 工具获取股票基本信息
2. 使用 get_stock_daily 工具获取近60日行情数据
3. 使用 get_technical_indicators 工具获取技术指标
4. 基于获取的数据,撰写完整的技术面分析报告
请按照以下Markdown格式输出:
# 技术面分析报告
## 基本信息
- 股票代码: {ts_code}
- 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 价格走势分析
[分析内容]
## 技术指标解读
[分析内容]
## 成交量能分析
[分析内容]
## 关键价位判断
[分析内容]
## 技术评分: [评分]/100
## 投资建议
[具体建议]
"""
msg = Msg(name="user", content=prompt, role="user")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self.agent(msg))
return self._extract_content(result.content)
except Exception as e:
print(f"[{self.name}] 分析失败: {e}")
return f"# 技术面分析报告\n\n分析过程中出现错误: {str(e)}"
finally:
loop.close()
def _extract_content(self, content) -> str:
"""从 Msg.content 中提取字符串内容,优先提取 Markdown 格式的报告,过滤 thinking 内容"""
if isinstance(content, str):
# 过滤 thinking 内容
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
# 如果是列表,优先查找包含 Markdown 标题的内容,跳过 thinking
for item in reversed(content): # 从后往前找,最后一条通常是最终报告
# 跳过 thinking 类型
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
item_str = str(item)
else:
item_str = str(item)
# 跳过 thinking 字符串
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
# 检查是否包含 Markdown 标题(分析报告的特征)
if item_str.startswith('#') or '\n#' in item_str:
return item_str
# 如果没有找到 Markdown,返回最后一个非空、非 thinking 内容
for item in reversed(content):
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
continue # 跳过其他字典
else:
item_str = str(item).strip()
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
if item_str and not item_str.startswith('{'):
return item_str
# 最后备选:合并所有非 thinking 内容
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
class FundamentalsAnalystAgent:
"""
基本面分析师 - 使用 AgentScope ReActAgent 实现
能够自主调用工具获取财务数据并生成分析报告
"""
def __init__(self):
self.name = "FundamentalsAnalyst"
self.sys_prompt = """你是一名资深的基本面分析师,擅长从财务数据和估值指标中评估公司的内在价值。
你的职责是:
1. 使用工具获取股票的基本信息
2. 使用工具获取估值数据(PE、PB、PS等)
3. 使用工具获取财务指标(ROE、毛利率等)
4. 基于数据撰写专业的基本面分析报告
分析报告必须包含以下部分:
- 估值分析(与行业对比)
- 盈利能力分析
- 成长性分析
- 财务健康分析
- 基本面评分(0-100分)
- 投资建议
要求:分析要专业、深入,评分要客观公正,全文使用中文。请用中文回答。
"""
self.agent = ReActAgent(
name=self.name,
sys_prompt=self.sys_prompt,
model=OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
),
formatter=OpenAIChatFormatter(),
toolkit=create_fundamentals_analyst_toolkit(),
memory=InMemoryMemory(),
max_iters=5,
)
def analyze(self, ts_code: str) -> str:
"""执行基本面分析"""
prompt = f"""请为股票 **{ts_code}** 进行基本面分析。
步骤:
1. 使用 get_stock_basic 工具获取股票基本信息
2. 使用 get_valuation 工具获取估值数据
3. 使用 get_financial_indicator 工具获取财务指标
4. 基于获取的数据,撰写完整的基本面分析报告
请按照以下Markdown格式输出:
# 基本面分析报告
## 基本信息
- 股票代码: {ts_code}
- 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 估值分析
[分析内容]
## 盈利能力分析
[分析内容]
## 成长性分析
[分析内容]
## 财务健康分析
[分析内容]
## 基本面评分: [评分]/100
## 投资建议
[具体建议]
"""
msg = Msg(name="user", content=prompt, role="user")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self.agent(msg))
return self._extract_content(result.content)
except Exception as e:
print(f"[{self.name}] 分析失败: {e}")
return f"# 基本面分析报告\n\n分析过程中出现错误: {str(e)}"
finally:
loop.close()
def _extract_content(self, content) -> str:
"""从 Msg.content 中提取字符串内容,优先提取 Markdown 格式的报告,过滤 thinking 内容"""
if isinstance(content, str):
# 过滤 thinking 内容
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
# 如果是列表,优先查找包含 Markdown 标题的内容,跳过 thinking
for item in reversed(content): # 从后往前找,最后一条通常是最终报告
# 跳过 thinking 类型
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
item_str = str(item)
else:
item_str = str(item)
# 跳过 thinking 字符串
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
# 检查是否包含 Markdown 标题(分析报告的特征)
if item_str.startswith('#') or '\n#' in item_str:
return item_str
# 如果没有找到 Markdown,返回最后一个非空、非 thinking 内容
for item in reversed(content):
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
continue # 跳过其他字典
else:
item_str = str(item).strip()
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
if item_str and not item_str.startswith('{'):
return item_str
# 最后备选:合并所有非 thinking 内容
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
class NewsAnalystAgent:
"""
舆情分析师 - 使用 AgentScope ReActAgent 实现
能够自主调用工具获取新闻数据并生成舆情分析报告
"""
def __init__(self):
self.name = "NewsAnalyst"
self.sys_prompt = """你是一名资深的舆情分析师,擅长从新闻和市场情绪中挖掘投资机会和风险。
你的职责是:
1. 使用工具获取股票基本信息
2. 使用工具获取个股相关新闻
3. 使用工具获取市场整体情绪
4. 基于数据撰写专业的舆情分析报告
分析报告必须包含以下部分:
- 舆情概述
- 关键新闻解读
- 市场情绪分析
- 舆情风险警示
- 舆情评分(0-100分)
- 投资建议
要求:分析要专业、敏锐,评分要客观公正,全文使用中文。请用中文回答。
"""
self.agent = ReActAgent(
name=self.name,
sys_prompt=self.sys_prompt,
model=OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
),
formatter=OpenAIChatFormatter(),
toolkit=create_news_analyst_toolkit(),
memory=InMemoryMemory(),
max_iters=5,
)
def analyze(self, ts_code: str, stock_name: str = "") -> str:
"""执行舆情分析"""
prompt = f"""请为股票 **{ts_code}** ({stock_name or '未知'}) 进行舆情分析。
步骤:
1. 使用 get_stock_basic 工具获取股票基本信息(如果没有提供股票名称)
2. 使用 get_stock_news 工具获取近7天的相关新闻
3. 使用 get_market_sentiment 工具获取市场整体情绪
4. 基于获取的数据,撰写完整的舆情分析报告
请按照以下Markdown格式输出:
# 新闻舆情分析报告
## 基本信息
- 股票代码: {ts_code}
- 股票名称: {stock_name or '待获取'}
- 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 新闻周期: 近7天
## 舆情概述
[分析内容]
## 关键新闻解读
[分析内容]
## 市场情绪分析
[分析内容]
## 舆情风险警示
[分析内容]
## 舆情评分: [评分]/100
## 投资建议
[具体建议]
"""
msg = Msg(name="user", content=prompt, role="user")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(self.agent(msg))
return self._extract_content(result.content)
except Exception as e:
print(f"[{self.name}] 分析失败: {e}")
return f"# 新闻舆情分析报告\n\n分析过程中出现错误: {str(e)}"
finally:
loop.close()
def _extract_content(self, content) -> str:
"""从 Msg.content 中提取字符串内容,优先提取 Markdown 格式的报告,过滤 thinking 内容"""
if isinstance(content, str):
# 过滤 thinking 内容
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
# 如果是列表,优先查找包含 Markdown 标题的内容,跳过 thinking
for item in reversed(content): # 从后往前找,最后一条通常是最终报告
# 跳过 thinking 类型
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
item_str = str(item)
else:
item_str = str(item)
# 跳过 thinking 字符串
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
# 检查是否包含 Markdown 标题(分析报告的特征)
if item_str.startswith('#') or '\n#' in item_str:
return item_str
# 如果没有找到 Markdown,返回最后一个非空、非 thinking 内容
for item in reversed(content):
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
item_str = item.get('text', '')
else:
continue # 跳过其他字典
else:
item_str = str(item).strip()
if "'type': 'thinking'" in item_str or '"type": "thinking"' in item_str:
continue
if item_str and not item_str.startswith('{'):
return item_str
# 最后备选:合并所有非 thinking 内容
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
def create_analyst_team() -> Dict[str, object]:
"""
创建分析师团队
Returns:
包含三个分析师的字典
"""
return {
"MarketAnalyst": MarketAnalystAgent(),
"FundamentalsAnalyst": FundamentalsAnalystAgent(),
"NewsAnalyst": NewsAnalystAgent(),
}
```
### scripts/agents/manager.py
```python
"""
基金经理智能体模块
使用 AgentScope AgentBase 实现
负责综合风险评估,做出最终决策
"""
import json
import asyncio
from datetime import datetime
from typing import Dict, List
from agentscope.agent import AgentBase
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from ..config import config, LLM_API_KEY
class Manager(AgentBase):
"""
基金经理智能体 - 使用 AgentScope AgentBase 实现
综合风险评估,做出最终决策
"""
def __init__(self):
super().__init__()
self.name = "Manager"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名资深的基金经理,负责基于全面量化数据做出最终投资决策。
你的职责:
1. 综合评估所有分析报告和风险建议
2. 权衡交易员的机会观点和风险管理的谨慎观点
3. 给出最终的投资决策(UP或NONE)
4. 制定具体的执行计划和风控措施
决策原则:
- UP: 建议买入/加仓,适用于综合评分较高且风险可控的情况
- NONE: 观望/不操作,适用于综合评分偏低或风险较大的情况
要求:决策要基于数据,理由要具体明确,全文使用中文。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
# 安全地获取 content 属性
chunk_content = getattr(chunk, 'content', None)
if chunk_content:
thinking, text = self._extract_thinking_and_text(chunk_content)
# 显示 thinking 增量
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text
print() # 换行
content = final_text
else:
# 安全地获取 content 属性
response_content = getattr(response, 'content', '')
content = self._extract_content(response_content)
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
"""观察消息但不回复"""
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
"""处理中断"""
return Msg(name=self.name, content="决策被中断。", role="assistant")
def make_final_decision(self, ts_code: str, stock_name: str,
analyst_reports: Dict[str, str],
research_debate: str,
trader_report: str,
risk_discussion: str) -> str:
"""做出最终决策(同步方法,供外部调用)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
self._make_final_decision_async(
ts_code, stock_name, analyst_reports,
research_debate, trader_report, risk_discussion
)
)
finally:
loop.close()
async def _make_final_decision_async(self, ts_code: str, stock_name: str,
analyst_reports: Dict[str, str],
research_debate: str,
trader_report: str,
risk_discussion: str) -> str:
"""异步做出最终决策"""
scores = self._extract_all_scores(analyst_reports, research_debate)
# 直接返回 Markdown 报告
return await self._generate_decision_async(
ts_code, stock_name, scores, trader_report, risk_discussion
)
def _extract_all_scores(self, analyst_reports: Dict[str, str],
research_debate: str) -> Dict:
"""提取所有维度的评分"""
scores = {"tech_score": 50, "fund_score": 50, "news_score": 50, "research_score": 50}
for name, report in analyst_reports.items():
lines = report.split('\n')
for line in lines:
if "技术评分" in line and "/100" in line:
try:
scores["tech_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
elif "基本面评分" in line and "/100" in line:
try:
scores["fund_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
elif "舆情评分" in line and "/100" in line:
try:
scores["news_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
if "看涨" in research_debate and "胜出" in research_debate:
scores["research_score"] = 70
elif "看跌" in research_debate and "胜出" in research_debate:
scores["research_score"] = 30
scores["total_score"] = int(
scores["tech_score"] * config.tech_weight +
scores["fund_score"] * config.fund_weight +
scores["news_score"] * config.news_weight +
scores["research_score"] * config.research_weight
)
return scores
async def _generate_decision_async(self, ts_code: str, stock_name: str, scores: Dict,
trader_report: str, risk_discussion: str) -> str:
"""异步生成最终决策,直接返回Markdown报告"""
# 截取报告内容避免请求过大
trader_summary = trader_report[:600] if len(trader_report) > 600 else trader_report
risk_summary = risk_discussion[:600] if len(risk_discussion) > 600 else risk_discussion
prompt = f"""作为基金经理,请根据以下信息做出最终投资决策(500字内):
## 股票信息
- 股票代码: {ts_code}
- 股票名称: {stock_name}
## 综合评分
- 技术面: {scores.get('tech_score', 50)}/100 (权重{int(config.tech_weight*100)}%)
- 基本面: {scores.get('fund_score', 50)}/100 (权重{int(config.fund_weight*100)}%)
- 舆情面: {scores.get('news_score', 50)}/100 (权重{int(config.news_weight*100)}%)
- **综合总分: {scores.get('total_score', 50)}/100**
## 交易员决策摘要
{trader_summary}
## 风险管理建议摘要
{risk_summary}
请直接用Markdown格式输出最终决策报告:
# 最终诊股决策报告
## 股票信息
- 代码: {ts_code}
- 名称: {stock_name}
- 诊断时间: [datetime.now().strftime('%Y-%m-%d %H:%M:%S')]
## 最终决策
- 操作建议: **UP/NONE**
- 置信度: XX/100
- 建议仓位: XX%
- 目标价: XXX
- 止损价: XXX
## 综合评分
| 维度 | 评分 | 权重 |
|------|------|------|
| 技术面 | XX/100 | XX% |
| 基本面 | XX/100 | XX% |
| 舆情面 | XX/100 | XX% |
| **综合评分** | **XX/100** | - |
## 决策理由
[具体理由,权衡交易员和风险管理观点]
## 风险提示
[风险提示内容]
---
*本报告由AgentScope股票诊断智能体系统自动生成*
决策逻辑参考:
- 综合评分75+:UP,仓位15-20%
- 综合评分60-75:UP,仓位10-15%
- 综合评分45-60:谨慎UP或NONE,仓位5-10%
- 综合评分<45:NONE,仓位0%
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
report = self._extract_content(response.content)
return report
except Exception as e:
print(f"[{self.name}] 决策生成失败: {e}")
return self._generate_fallback_report(ts_code, stock_name, scores)
def _generate_fallback_report(self, ts_code: str, stock_name: str, scores: Dict) -> str:
"""生成备用报告"""
total_score = scores.get("total_score", 50)
if total_score >= 70:
action, position = "UP", "15%"
elif total_score >= 55:
action, position = "UP", "10%"
else:
action, position = "NONE", "0%"
return f"""# 最终诊股决策报告
## 股票信息
- 代码: {ts_code}
- 名称: {stock_name}
- 诊断时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 最终决策
- 操作建议: **{action}**
- 置信度: {total_score}/100
- 建议仓位: {position}
- 目标价: 根据技术分析确定
- 止损价: 建议设置8%止损
## 综合评分
| 维度 | 评分 | 权重 |
|------|------|------|
| 技术面 | {scores.get('tech_score', 50)}/100 | {int(config.tech_weight * 100)}% |
| 基本面 | {scores.get('fund_score', 50)}/100 | {int(config.fund_weight * 100)}% |
| 舆情面 | {scores.get('news_score', 50)}/100 | {int(config.news_weight * 100)}% |
| **综合评分** | **{total_score}/100** | - |
## 风险提示
投资有风险,入市需谨慎。本建议仅供参考,不构成投资建议。
---
*本报告由AgentScope股票诊断智能体系统自动生成*
"""
def _rule_based_decision(self, scores: Dict) -> Dict:
"""基于规则的决策(备用)"""
total_score = scores.get("total_score", 50)
if total_score >= 70:
action, confidence, position = "UP", min(90, total_score), "15%"
reasoning = f"综合评分{total_score}分,较高,具备投资价值"
elif total_score >= 55:
action, confidence, position = "UP", total_score, "10%"
reasoning = f"综合评分{total_score}分,中等偏上,建议适度仓位"
elif total_score >= 40:
action, confidence, position = "NONE", 60, "0%"
reasoning = f"综合评分{total_score}分,一般,存在不确定性,建议观望"
else:
action, confidence, position = "NONE", min(90, 100 - total_score), "0%"
reasoning = f"综合评分{total_score}分,较低,风险较大,不建议买入"
return {
"action": action,
"confidence": confidence,
"position": position,
"target_price": "根据技术分析确定",
"stop_loss": "建议设置8%止损",
"reasoning": reasoning,
"risk_warning": "投资有风险,入市需谨慎。本建议仅供参考,不构成投资建议。"
}
def _generate_final_report(self, ts_code: str, stock_name: str,
scores: Dict, decision: Dict) -> str:
return f"""# 最终诊股决策报告
## 股票信息
- 代码: {ts_code}
- 名称: {stock_name}
- 诊断时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 综合评分
| 维度 | 评分 | 权重 |
|------|------|------|
| 技术面 | {scores.get('tech_score', 50)}/100 | {int(config.tech_weight * 100)}% |
| 基本面 | {scores.get('fund_score', 50)}/100 | {int(config.fund_weight * 100)}% |
| 舆情面 | {scores.get('news_score', 50)}/100 | {int(config.news_weight * 100)}% |
| 研究员共识 | {scores.get('research_score', 50)}/100 | {int(config.research_weight * 100)}% |
| **综合评分** | **{scores.get('total_score', 50)}/100** | - |
## 最终决策
| 字段 | 值 |
|------|----||
| 操作建议 | **{decision.get('action', '观望')}** |
| 置信度 | {decision.get('confidence', 50)}/100 |
| 建议仓位 | {decision.get('position', '0%')} |
| 目标价 | {decision.get('target_price', 'N/A')} |
| 止损价 | {decision.get('stop_loss', 'N/A')} |
## 决策理由
{decision.get('reasoning', '综合分析后做出的决策')}
## 风险提示
{decision.get('risk_warning', '投资有风险,入市需谨慎。')}
---
*本报告由AgentScope股票诊断智能体系统自动生成*
"""
```
### scripts/agents/researchers.py
```python
"""
研究员智能体模块
使用 AgentScope AgentBase 实现辩证式研究分析
"""
import asyncio
from typing import Dict, List, Optional
from agentscope.agent import AgentBase
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from ..config import config, LLM_API_KEY
class BullishResearcherAgent(AgentBase):
"""
看多研究员 - 使用 AgentScope AgentBase 实现
从乐观角度分析股票的投资价值
"""
def __init__(self):
super().__init__()
self.name = "BullishResearcher"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名看多研究员,擅长发掘股票的投资亮点和上涨潜力。
你的角色特点:
1. 积极乐观,善于发现投资机会
2. 关注成长性、行业前景、竞争优势
3. 对利好消息敏感,合理推断正面影响
4. 在辩论中要有理有据地阐述看多观点
分析风格:
- 重点挖掘被低估的价值
- 强调公司的成长空间
- 分析行业红利和政策支持
- 关注资金流入和市场情绪转暖信号
注意:虽然你是看多派,但分析要基于数据,不能无中生有。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - DashScope返回累积内容,需要取增量
if hasattr(response, '__aiter__'):
last_content = ""
final_content = ""
print(f"\n[{self.name}] ", end='', flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
current_text = self._extract_text_from_chunk(chunk.content)
if current_text:
# 计算增量部分用于显示
if len(current_text) > len(last_content):
delta = current_text[len(last_content):]
print(delta, end='', flush=True)
last_content = current_text
final_content = current_text # 保留最后的完整内容
print() # 换行
content = final_content
else:
# 非流式响应处理
content = self._extract_content(getattr(response, 'content', str(response)))
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_text_from_chunk(self, chunk_content) -> str:
"""从流式响应块中提取文本,过滤 thinking 内容"""
if isinstance(chunk_content, str):
if "'type': 'thinking'" in chunk_content or '"type": "thinking"' in chunk_content:
return ""
return chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
return ""
if chunk_content.get('type') == 'text':
return chunk_content.get('text', '')
return chunk_content.get('text', '')
elif isinstance(chunk_content, list):
texts = []
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return ''.join(texts)
return ""
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
class BearishResearcherAgent(AgentBase):
"""
看空研究员 - 使用 AgentScope AgentBase 实现
从谨慎角度分析股票的风险因素
"""
def __init__(self):
super().__init__()
self.name = "BearishResearcher"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名看空研究员,擅长发现股票的潜在风险和下跌隐患。
你的角色特点:
1. 谨慎保守,善于发现风险因素
2. 关注估值泡沫、业绩下滑、行业风险
3. 对利空消息敏感,评估负面影响
4. 在辩论中要有理有据地阐述看空观点
分析风格:
- 重点识别被高估的风险
- 强调公司面临的挑战
- 分析行业竞争和政策风险
- 关注资金流出和市场情绪恶化信号
注意:虽然你是看空派,但分析要基于数据,不能危言耸听。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - DashScope返回累积内容,需要取增量
if hasattr(response, '__aiter__'):
last_content = ""
final_content = ""
print(f"\n[{self.name}] ", end='', flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
current_text = self._extract_text_from_chunk(chunk.content)
if current_text:
# 计算增量部分用于显示
if len(current_text) > len(last_content):
delta = current_text[len(last_content):]
print(delta, end='', flush=True)
last_content = current_text
final_content = current_text # 保留最后的完整内容
print() # 换行
content = final_content
else:
# 非流式响应处理
content = self._extract_content(getattr(response, 'content', str(response)))
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_text_from_chunk(self, chunk_content) -> str:
"""从流式响应块中提取文本,过滤 thinking 内容"""
if isinstance(chunk_content, str):
if "'type': 'thinking'" in chunk_content or '"type": "thinking"' in chunk_content:
return ""
return chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
return ""
if chunk_content.get('type') == 'text':
return chunk_content.get('text', '')
return chunk_content.get('text', '')
elif isinstance(chunk_content, list):
texts = []
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return ''.join(texts)
return ""
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
"""观察消息但不回复"""
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
"""处理中断"""
return Msg(name=self.name, content="分析被中断。", role="assistant")
def analyze_sync(self, context: str) -> str:
"""同步分析方法"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
msg = Msg(name="user", content=context, role="user")
result = loop.run_until_complete(self.reply(msg))
return self._extract_content(result.content)
finally:
loop.close()
class ResearchFacilitatorAgent(AgentBase):
"""
研究主持人 - 使用 AgentScope AgentBase 实现
负责组织和总结看多看空研究员的辩论
"""
def __init__(self):
super().__init__()
self.name = "ResearchFacilitator"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名资深的研究主持人,负责组织和总结看多、看空研究员的辩论。
你的职责:
1. 引导辩论有序进行
2. 确保双方观点充分表达
3. 提炼关键分歧点
4. 综合双方观点形成结论
总结报告要求:
- 公正客观,不偏向任何一方
- 明确列出多空双方的核心论点
- 分析哪些观点更有说服力
- 给出综合研判和投资建议
全文使用中文撰写。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - DashScope返回累积内容,需要取增量
if hasattr(response, '__aiter__'):
last_content = ""
final_content = ""
print(f"\n[{self.name}] ", end='', flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
current_text = self._extract_text_from_chunk(chunk.content)
if current_text:
# 计算增量部分用于显示
if len(current_text) > len(last_content):
delta = current_text[len(last_content):]
print(delta, end='', flush=True)
last_content = current_text
final_content = current_text # 保留最后的完整内容
print() # 换行
content = final_content
else:
# 非流式响应处理
content = self._extract_content(getattr(response, 'content', str(response)))
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_text_from_chunk(self, chunk_content) -> str:
"""从流式响应块中提取文本,过滤 thinking 内容"""
if isinstance(chunk_content, str):
if "'type': 'thinking'" in chunk_content or '"type": "thinking"' in chunk_content:
return ""
return chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
return ""
if chunk_content.get('type') == 'text':
return chunk_content.get('text', '')
return chunk_content.get('text', '')
elif isinstance(chunk_content, list):
texts = []
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return ''.join(texts)
return ""
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
def facilitate_debate_sync(
self,
bullish: BullishResearcherAgent,
bearish: BearishResearcherAgent,
analyst_reports: Dict[str, str],
rounds: int = 2
) -> str:
"""
同步执行辩论流程
Args:
bullish: 看多研究员
bearish: 看空研究员
analyst_reports: 分析师报告
rounds: 辩论轮数
Returns:
辩论总结报告
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
self._facilitate_debate(bullish, bearish, analyst_reports, rounds)
)
return result
finally:
loop.close()
async def _facilitate_debate(
self,
bullish: BullishResearcherAgent,
bearish: BearishResearcherAgent,
analyst_reports: Dict[str, str],
rounds: int = 2
) -> str:
"""
执行辩论流程
"""
debate_history = []
# 清空研究员的 memory,避免累积导致请求过大
bullish.memory = InMemoryMemory()
bearish.memory = InMemoryMemory()
self.memory = InMemoryMemory()
# 带重试的安全调用方法
async def safe_reply(agent, msg, max_retries=3) -> Msg:
"""带重试的安全调用"""
for attempt in range(max_retries):
try:
# 每次重试前清空 memory
agent.memory = InMemoryMemory()
return await agent.reply(msg)
except Exception as e:
print(f"\n[{agent.name}] 调用失败 (attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2) # 等待2秒后重试
# 所有重试失败后返回默认响应
return Msg(name=agent.name, content=f"[网络超时,未能获取{agent.name}的分析]", role="assistant")
# 准备分析报告摘要,限制每份报告最多800字符
reports_summary = "\n\n".join([
f"### {name}\n{report[:800]}..." if len(report) > 800 else f"### {name}\n{report}"
for name, report in analyst_reports.items()
])
# 第一轮:初始观点
bullish_prompt = f"""基于以下分析师报告,请从看多角度阐述你的投资观点:
{reports_summary}
请简洁分析这只股票的投资亮点和上涨潜力,给出具体论据(500字内)。
"""
bearish_prompt = f"""基于以下分析师报告,请从看空角度阐述你的风险观点:
{reports_summary}
请简洁分析这只股票的风险因素和下跌隐患,给出具体论据(500字内)。
"""
# 看多先发言
bullish_msg = Msg(name="user", content=bullish_prompt, role="user")
bullish_response = await safe_reply(bullish, bullish_msg)
bullish_content = self._extract_content(bullish_response.content)[:1000]
debate_history.append(f"## 看多研究员(第1轮)\n\n{bullish_content}")
# 看空发言
bearish_msg = Msg(name="user", content=bearish_prompt, role="user")
bearish_response = await safe_reply(bearish, bearish_msg)
bearish_content = self._extract_content(bearish_response.content)[:1000]
debate_history.append(f"## 看空研究员(第1轮)\n\n{bearish_content}")
# 后续轮次:互相回应(清空 memory 避免累积)
for round_num in range(2, rounds + 1):
# 看多回应看空
bullish_rebuttal = f"""看空研究员的观点是:
{bearish_content[:500]}
请简洁回应这些看空观点,并进一步阐述你的看多理由(500字内)。
"""
bullish_msg = Msg(name="user", content=bullish_rebuttal, role="user")
bullish_response = await safe_reply(bullish, bullish_msg)
bullish_content = self._extract_content(bullish_response.content)[:1000]
debate_history.append(f"## 看多研究员(第{round_num}轮)\n\n{bullish_content}")
# 看空回应看多
bearish_rebuttal = f"""看多研究员的观点是:
{bullish_content[:500]}
请简洁回应这些看多观点,并进一步阐述你的看空理由(500字内)。
"""
bearish_msg = Msg(name="user", content=bearish_rebuttal, role="user")
bearish_response = await safe_reply(bearish, bearish_msg)
bearish_content = self._extract_content(bearish_response.content)[:1000]
debate_history.append(f"## 看空研究员(第{round_num}轮)\n\n{bearish_content}")
# 主持人总结(限制辩论历史长度)
debate_summary = "\n\n".join(debate_history)[:4000]
summary_prompt = f"""以下是看多研究员和看空研究员的辩论记录:
{debate_summary}
请作为研究主持人,对这场辩论进行总结(800字内):
# 研究员辩论总结报告
## 辩论概述
[概述内容]
## 核心分歧点
[分歧点列表]
## 看多观点评价
[评价内容]
## 看空观点评价
[评价内容]
## 综合研判
[研判结论]
## 投资建议
[具体建议]
## 风险提示
[风险点列表]
"""
# 主持人也使用安全调用
self.memory = InMemoryMemory()
summary_msg = Msg(name="user", content=summary_prompt, role="user")
try:
summary_response = await self.reply(summary_msg)
summary_content = self._extract_content(summary_response.content)
except Exception as e:
print(f"\n[{self.name}] 总结失败: {e}")
summary_content = "## 辩论总结\n\n由于网络问题,未能生成完整总结。"
# 组合完整报告
separator = "\n\n---\n\n"
full_report = f"""# 研究员辩论报告
---
{separator.join(debate_history)}
---
{summary_content}
"""
return full_report
def create_research_team() -> Dict[str, object]:
"""
创建研究员团队
Returns:
包含研究员和主持人的字典
"""
return {
"BullishResearcher": BullishResearcherAgent(),
"BearishResearcher": BearishResearcherAgent(),
"ResearchFacilitator": ResearchFacilitatorAgent(),
}
```
### scripts/agents/risk_managers.py
```python
"""
风险管理智能体模块
使用 AgentScope AgentBase 实现
包含:AggressiveRisk(激进型)、NeutralRisk(中性型)、ConservativeRisk(保守型)、RiskFacilitator(协调人)
"""
import json
import asyncio
from datetime import datetime
from typing import Dict, List
from agentscope.agent import AgentBase
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from ..config import config, LLM_API_KEY
class AggressiveRisk(AgentBase):
"""激进型风险管理者 - 使用 AgentScope AgentBase 实现"""
def __init__(self):
super().__init__()
self.name = "AggressiveRisk"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名激进型风险管理者,偏好高风险高收益,但仍需基于量化数据进行评估。
你的特点:
1. 积极乐观,善于发现投资机会
2. 风险容忍度较高
3. 建议较高的仓位配置(15-25%)
4. 量化预期收益率
要求:分析要基于数据,不能无中生有。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
thinking, text = self._extract_thinking_and_text(chunk.content)
# 显示 thinking 增量(灰色标记)
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text
print() # 换行
content = final_text
else:
content = self._extract_content(response.content) # type: ignore[union-attr]
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
return Msg(name=self.name, content="评估被中断。", role="assistant")
def assess(self, trader_report: str) -> Dict:
"""评估交易决策(同步方法)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(self._assess_async(trader_report))
finally:
loop.close()
async def _assess_async(self, trader_report: str) -> Dict:
"""异步评估交易决策"""
report_summary = trader_report[:600] if len(trader_report) > 600 else trader_report
prompt = f"""请从激进型角度评估以下交易决策(300字内):
{report_summary}
评估要求:从激进角度挖掘潜在高收益机会,建议较高仓位(15-25%)
请直接用Markdown格式回复:
### 激进型评估
- 建议仓位: XX%
- 操作建议: 加大仓位/买入/持有
- 预期收益: +XX%
- 机会分析: [具体理由]
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
content = self._extract_content(response.content)
return {"perspective": "激进型", "content": content, "position": "20%"}
except Exception as e:
print(f"[{self.name}] 评估失败: {e}")
return {"perspective": "激进型", "position": "25%", "content": "市场机会难得,建议适当提高仓位"}
class NeutralRisk(AgentBase):
"""中性型风险管理者 - 使用 AgentScope AgentBase 实现"""
def __init__(self):
super().__init__()
self.name = "NeutralRisk"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名中性型风险管理者,追求风险与收益的平衡,基于量化数据进行评估。
你的特点:
1. 平衡分析,综合评估机会与风险
2. 建议适中的仓位配置(8-15%)
3. 强调风报比要合理(建议1:2以上)
要求:分析要客观全面,引用具体数据。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
thinking, text = self._extract_thinking_and_text(chunk.content)
# 显示 thinking 增量(灰色标记)
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text
print() # 换行
content = final_text
else:
content = self._extract_content(response.content) # type: ignore[union-attr]
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
return Msg(name=self.name, content="评估被中断。", role="assistant")
def assess(self, trader_report: str) -> Dict:
"""评估交易决策(同步方法)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(self._assess_async(trader_report))
finally:
loop.close()
async def _assess_async(self, trader_report: str) -> Dict:
"""异步评估交易决策"""
report_summary = trader_report[:600] if len(trader_report) > 600 else trader_report
prompt = f"""请从中性型角度评估以下交易决策(300字内):
{report_summary}
评估要求:平衡分析机会与风险,建议适中仓位(8-15%),强调风报比
请直接用Markdown格式回复:
### 中性型评估
- 建议仓位: XX%
- 操作建议: 维持/调整/平衡
- 预期收益: +XX%
- 风报比: X:X
- 平衡分析: [具体理由]
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
content = self._extract_content(response.content)
return {"perspective": "中性型", "content": content, "position": "10%"}
except Exception as e:
print(f"[{self.name}] 评估失败: {e}")
return {"perspective": "中性型", "position": "10%", "content": "交易员的建议基本合理"}
class ConservativeRisk(AgentBase):
"""保守型风险管理者 - 使用 AgentScope AgentBase 实现"""
def __init__(self):
super().__init__()
self.name = "ConservativeRisk"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名保守型风险管理者,优先考虑风险控制,基于量化数据进行评估。
你的特点:
1. 谨慎保守,善于发现风险因素
2. 风险容忍度较低
3. 建议较低的仓位配置(3-10%)
4. 量化最大可能损失
要求:分析要基于数据,不能危言耸听。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
thinking, text = self._extract_thinking_and_text(chunk.content)
# 显示 thinking 增量(灰色标记)
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text
print() # 换行
content = final_text
else:
content = self._extract_content(response.content) # type: ignore[union-attr]
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
return Msg(name=self.name, content="评估被中断。", role="assistant")
def assess(self, trader_report: str) -> Dict:
"""评估交易决策(同步方法)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(self._assess_async(trader_report))
finally:
loop.close()
async def _assess_async(self, trader_report: str) -> Dict:
"""异步评估交易决策"""
report_summary = trader_report[:600] if len(trader_report) > 600 else trader_report
prompt = f"""请从保守型角度评估以下交易决策(300字内):
{report_summary}
评估要求:重点识别潜在风险点,建议较低仓位(3-10%),量化最大可能损失
请直接用Markdown格式回复:
### 保守型评估
- 建议仓位: XX%
- 操作建议: 降低仓位/谨慎/观望
- 最大损失: -XX%
- 风险警示: [具体风险点]
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
content = self._extract_content(response.content)
return {"perspective": "保守型", "content": content, "position": "5%"}
except Exception as e:
print(f"[{self.name}] 评估失败: {e}")
return {"perspective": "保守型", "position": "5%", "content": "市场存在不确定性,建议降低仓位"}
class RiskFacilitator(AgentBase):
"""风险讨论协调人 - 使用 AgentScope AgentBase 实现"""
def __init__(self):
super().__init__()
self.name = "RiskFacilitator"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名专业的风险管理协调者,负责基于量化数据协调三种风险偏好。
你的职责:
1. 量化对比三种观点的仓位、风险等级、收益预期
2. 按权重(30%激进 + 40%中性 + 30%保守)计算综合仓位
3. 给出平衡后的风险等级
4. 提供具体的风控措施
要求:结论要客观公正,引用具体数据。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
# 分别提取 thinking 和 text
thinking, text = self._extract_thinking_and_text(chunk.content)
# 显示 thinking 增量(灰色标记)
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text # 保留最后的完整 text 内容
print() # 换行
content = final_text
else:
content = self._extract_content(response.content) # type: ignore[union-attr]
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
# 字符串格式,无法区分,都当作 text
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking"""
if isinstance(content, str):
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
import re
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
return Msg(name=self.name, content="协调被中断。", role="assistant")
def facilitate_discussion(self, aggressive: AggressiveRisk, neutral: NeutralRisk,
conservative: ConservativeRisk, trader_report: str,
rounds: int = 2) -> str:
"""主持风险讨论(同步方法)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
self._facilitate_discussion_async(aggressive, neutral, conservative, trader_report, rounds)
)
finally:
loop.close()
async def _facilitate_discussion_async(self, aggressive: AggressiveRisk, neutral: NeutralRisk,
conservative: ConservativeRisk, trader_report: str,
rounds: int = 2) -> str:
"""异步主持风险讨论"""
# 在异步方法中直接调用异步方法,避免嵌套事件循环
agg_assess = await aggressive._assess_async(trader_report)
neu_assess = await neutral._assess_async(trader_report)
cons_assess = await conservative._assess_async(trader_report)
recommendation = await self._make_recommendation_async(agg_assess, neu_assess, cons_assess)
return self._generate_report(agg_assess, neu_assess, cons_assess, recommendation, rounds)
async def _make_recommendation_async(self, agg: Dict, neu: Dict, cons: Dict) -> Dict:
"""异步形成最终风险建议"""
def parse_position(pos_str):
try:
return float(str(pos_str).replace('%', ''))
except:
return 10.0
agg_pos = parse_position(agg.get('position', '20%'))
neu_pos = parse_position(neu.get('position', '10%'))
cons_pos = parse_position(cons.get('position', '5%'))
adjusted_position = (agg_pos * 0.3 + neu_pos * 0.4 + cons_pos * 0.3)
prompt = f"""请协调三种风险管理者的评估(300字内):
【激进型】建议仓位 {agg.get('position', '20%')}
{agg.get('content', '')[:200]}
【中性型】建议仓位 {neu.get('position', '10%')}
{neu.get('content', '')[:200]}
【保守型】建议仓位 {cons.get('position', '5%')}
{cons.get('content', '')[:200]}
协调要求:按权重(30%激进 + 40%中性 + 30%保守)计算综合仓位,提供风控措施
请直接用Markdown格式回复:
### 协调结果
- 综合仓位: XX%
- 风控措施: [具体措施]
- 综合结论: [结论]
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
content = self._extract_content(response.content)
return {
"adjusted_position": f"{adjusted_position:.1f}%",
"content": content,
"risk_controls": "设置止损线、分批建仓、密切关注市场变化"
}
except Exception as e:
print(f"[{self.name}] 协调失败: {e}")
return {
"adjusted_position": f"{adjusted_position:.1f}%",
"content": "综合三方观点,建议采取平衡策略",
"risk_controls": "设置止损线、分批建仓、密切关注市场变化"
}
def _generate_report(self, agg: Dict, neu: Dict, cons: Dict,
recommendation: Dict, rounds: int) -> str:
return f"""# 风险管理讨论报告
## 讨论信息
- 讨论轮次: {rounds}
- 讨论时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 激进派观点
- 建议仓位: {agg.get('position', 'N/A')}
{agg.get('content', '')}
## 中性派观点
- 建议仓位: {neu.get('position', 'N/A')}
{neu.get('content', '')}
## 保守派观点
- 建议仓位: {cons.get('position', 'N/A')}
{cons.get('content', '')}
## 协调结果
- 风险调整后仓位: {recommendation.get('adjusted_position', 'N/A')}
- 风险控制措施: {recommendation.get('risk_controls', 'N/A')}
{recommendation.get('content', '')}
"""
```
### scripts/agents/trader.py
```python
"""
交易员智能体模块
使用 AgentScope AgentBase 实现
负责综合分析师报告和研究员结论,提出交易建议
"""
import json
import asyncio
from datetime import datetime
from typing import Dict, List
from agentscope.agent import AgentBase
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from ..config import config, LLM_API_KEY
class Trader(AgentBase):
"""
交易员智能体 - 使用 AgentScope AgentBase 实现
综合研究报告,提出交易建议
"""
def __init__(self):
super().__init__()
self.name = "Trader"
self.model = OpenAIChatModel(
model_name=config.model_name,
api_key=LLM_API_KEY.get("bailian"),
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=True,
)
self.formatter = OpenAIChatFormatter()
self.memory = InMemoryMemory()
self.sys_prompt = """你是一名专业的交易员,负责基于多维度量化数据做出交易决策。
你的职责:
1. 综合评估技术面、基本面、舆情面的分析报告
2. 结合研究员辩论结论调整判断
3. 给出明确的交易建议(买入/卖出/持有/观望)
4. 提供量化的仓位建议、目标价和止损价
要求:决策要基于数据,理由要具体明确,全文使用中文。请用中文回答。
"""
async def reply(self, msg: Msg | List[Msg] | None) -> Msg:
"""处理消息并回复,支持流式输出(显示 thinking 过程)"""
if msg is not None:
await self.memory.add(msg)
memory_content = await self.memory.get_memory()
prompt = await self.formatter.format([
Msg("system", self.sys_prompt, "system"),
*memory_content,
])
response = await self.model(prompt)
# 处理流式响应 - 显示 thinking 思考过程,但报告只保留 text
if hasattr(response, '__aiter__'):
last_thinking = ""
last_text = ""
final_text = ""
print(f"\n[{self.name}] ", flush=True)
async for chunk in response: # pyright: ignore[reportGeneralTypeIssues]
if hasattr(chunk, 'content') and chunk.content:
# 分别提取 thinking 和 text
thinking, text = self._extract_thinking_and_text(chunk.content)
# 显示 thinking 增量
if thinking and len(thinking) > len(last_thinking):
delta = thinking[len(last_thinking):]
print(delta, end='', flush=True)
last_thinking = thinking
# 显示 text 增量
if text and len(text) > len(last_text):
delta = text[len(last_text):]
print(delta, end='', flush=True)
last_text = text
final_text = text # 保留最后的完整 text 内容
print() # 换行
content = final_text
else:
# 安全地获取 content 属性
response_content = getattr(response, 'content', '')
content = self._extract_content(response_content)
reply_msg = Msg(name=self.name, content=content, role="assistant")
await self.memory.add(reply_msg)
return reply_msg
def _extract_thinking_and_text(self, chunk_content) -> tuple:
"""从流式响应块中分别提取 thinking 和 text 内容"""
thinking = ""
text = ""
if isinstance(chunk_content, str):
# 字符串格式,无法区分,都当作 text
if "'type': 'thinking'" not in chunk_content and '"type": "thinking"' not in chunk_content:
text = chunk_content
elif isinstance(chunk_content, dict):
if chunk_content.get('type') == 'thinking':
thinking = chunk_content.get('thinking', '')
elif chunk_content.get('type') == 'text':
text = chunk_content.get('text', '')
else:
text = chunk_content.get('text', '')
elif isinstance(chunk_content, list):
for item in chunk_content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
thinking += item.get('thinking', '')
elif item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
text += item
return thinking, text
def _extract_content(self, content) -> str:
"""从响应中提取字符串内容,过滤 thinking 内容"""
if isinstance(content, str):
# 检查是否包含 thinking 格式的字符串
if "'type': 'thinking'" in content or '"type": "thinking"' in content:
# 尝试提取实际文本内容
import re
# 移除所有 thinking 块
content = re.sub(r"\[\{'type': 'thinking'[^\]]*\]\]", "", content)
content = re.sub(r'\[\{"type": "thinking"[^\]]*\]\]', "", content)
return content
elif isinstance(content, list):
# 过滤 thinking 内容
texts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'thinking':
continue
if item.get('type') == 'text':
texts.append(item.get('text', ''))
else:
texts.append(str(item))
elif isinstance(item, str):
if "'type': 'thinking'" not in item and '"type": "thinking"' not in item:
texts.append(item)
return "\n".join(texts) if texts else ""
else:
return str(content)
async def observe(self, msg: Msg | List[Msg] | None) -> None:
"""观察消息但不回复"""
if msg is not None:
await self.memory.add(msg)
async def handle_interrupt(self) -> Msg:
"""处理中断"""
return Msg(name=self.name, content="交易决策被中断。", role="assistant")
def make_decision(self, analyst_reports: Dict[str, str],
research_debate: str) -> str:
"""做出交易决策(同步方法,供外部调用)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
self._make_decision_async(analyst_reports, research_debate)
)
return result
finally:
loop.close()
async def _make_decision_async(self, analyst_reports: Dict[str, str],
research_debate: str) -> str:
"""异步做出交易决策"""
scores = self._extract_scores(analyst_reports)
# 截取报告内容,避免请求过大
combined_reports = "\n\n".join([
f"### {name}\n{report[:600]}..." if len(report) > 600 else f"### {name}\n{report}"
for name, report in analyst_reports.items()
])
research_summary = research_debate[:800] if len(research_debate) > 800 else research_debate
prompt = f"""请根据以下分析报告做出交易决策:
## 分析师报告摘要
{combined_reports}
## 研究员辩论结论
{research_summary}
## 决策要求:
1. **综合评分**:汇总技术面、基本面、舆情面的评分
2. **加权计算**:按权重({int(config.tech_weight*100)}%技术 + {int(config.fund_weight*100)}%基本 + {int(config.news_weight*100)}%舆情)计算总分
3. **量化决策**:给出具体的仓位、目标价、止损价
请直接用Markdown格式输出交易决策报告(500字内):
# 交易决策报告
## 决策信号
- 方向: 买入/卖出/持有/观望
- 置信度: XX/100
- 建议仓位: XX%
## 综合评分
- 技术面: XX/100
- 基本面: XX/100
- 舆情面: XX/100
- 加权总分: XX/100
## 决策依据
[技术面、基本面、舆情面的具体分析理由]
## 目标价位
- 目标价: XXX
- 止损价: XXX
"""
try:
msg = Msg(name="user", content=prompt, role="user")
response = await self.reply(msg)
report = self._extract_content(response.content)
return report
except Exception as e:
print(f"[{self.name}] 模型调用失败: {e}")
# 备用报告
return self._generate_fallback_report(scores)
def _extract_scores(self, analyst_reports: Dict[str, str]) -> Dict:
"""从报告中提取评分"""
scores = {"tech_score": 50, "fund_score": 50, "news_score": 50}
for name, report in analyst_reports.items():
lines = report.split('\n')
for line in lines:
if "技术评分" in line and "/100" in line:
try:
scores["tech_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
elif "基本面评分" in line and "/100" in line:
try:
scores["fund_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
elif "舆情评分" in line and "/100" in line:
try:
scores["news_score"] = int(line.split(':')[1].split('/')[0].strip())
except:
pass
return scores
def _generate_fallback_report(self, scores: Dict) -> str:
"""生成备用报告"""
total_score = int(
scores["tech_score"] * config.tech_weight +
scores["fund_score"] * config.fund_weight +
scores["news_score"] * config.news_weight
) // (config.tech_weight + config.fund_weight + config.news_weight)
if total_score >= 70:
action, position = "买入", "15%"
elif total_score >= 55:
action, position = "持有", "10%"
elif total_score >= 40:
action, position = "观望", "5%"
else:
action, position = "观望", "0%"
return f"""# 交易决策报告
## 决策时间
{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 决策信号
- 方向: **{action}**
- 置信度: {total_score}/100
- 建议仓位: {position}
## 综合评分
- 技术面: {scores.get('tech_score', 50)}/100
- 基本面: {scores.get('fund_score', 50)}/100
- 舆情面: {scores.get('news_score', 50)}/100
- 加权总分: {total_score}/100
## 目标价位
- 目标价: 根据技术分析确定
- 止损价: 建议设置8%止损
"""
def _make_rule_based_decision(self, scores: Dict, research_debate: str) -> Dict:
"""基于规则的决策(备用)"""
total_score = (
scores["tech_score"] * config.tech_weight +
scores["fund_score"] * config.fund_weight +
scores["news_score"] * config.news_weight
) / (config.tech_weight + config.fund_weight + config.news_weight)
if "看涨" in research_debate and "胜出" in research_debate:
total_score += 10
elif "看跌" in research_debate and "胜出" in research_debate:
total_score -= 10
if total_score >= 70:
action, confidence, position = "买入", min(95, int(total_score + 10)), "15%"
elif total_score >= 55:
action, confidence, position = "持有", int(total_score), "10%"
elif total_score >= 40:
action, confidence, position = "观望", int(total_score), "0%"
else:
action, confidence, position = "卖出", min(95, int(100 - total_score)), "0%"
return {
"action": action, "confidence": confidence, "position": position,
"target_price": "根据技术分析确定", "stop_loss": "建议设置8%止损",
"tech_reason": f"技术面评分{scores['tech_score']}分",
"fund_reason": f"基本面评分{scores['fund_score']}分",
"news_reason": f"舆情面评分{scores['news_score']}分"
}
def _generate_report(self, scores: Dict, decision: Dict) -> str:
"""生成交易决策报告"""
return f"""# 交易决策报告
## 决策时间
{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 决策信号
| 字段 | 值 |
|------|----||
| 方向 | **{decision.get('action', '观望')}** |
| 置信度 | {decision.get('confidence', 50)}/100 |
| 建议仓位 | {decision.get('position', '0%')} |
## 决策依据
### 技术面(权重{int(config.tech_weight * 100)}%)
- 综合评分: {scores.get('tech_score', 50)}/100
- 核心理由: {decision.get('tech_reason', '无')}
### 基本面(权重{int(config.fund_weight * 100)}%)
- 综合评分: {scores.get('fund_score', 50)}/100
- 核心理由: {decision.get('fund_reason', '无')}
### 舆情面(权重{int(config.news_weight * 100)}%)
- 综合评分: {scores.get('news_score', 50)}/100
- 核心理由: {decision.get('news_reason', '无')}
## 目标价位
- 目标价: {decision.get('target_price', 'N/A')}
- 止损价: {decision.get('stop_loss', 'N/A')}
"""
```
### scripts/batch_diagnose.py
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量股票诊断模块
支持输入股票代码列表进行批量诊断,并通过钉钉机器人推送结果
"""
import os
import sys
import json
import argparse
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dotenv import load_dotenv
load_dotenv()
from .stock_advisor import StockAdvisorSystem
# 钉钉机器人Webhook URL(从环境变量读取)
DINGTALK_WEBHOOK_URL = os.getenv('DINGTALK_WEBHOOK_URL', '')
# 钉钉机器人签名密钥(可选,用于加签模式)
DINGTALK_SECRET = os.getenv('DINGTALK_SECRET', '')
def convert_to_ts_code(stock_code: str) -> str:
"""
将股票代码转换为ts_code格式
Args:
stock_code: 6位股票代码(如 600519)或已带后缀的代码(如 600519.SH)
Returns:
ts_code格式(如 600519.SH)
"""
code = stock_code.strip().upper()
# 如果已经是 ts_code 格式,直接返回
if '.' in code:
return code
# 移除可能的前缀
if code.startswith('SH'):
code = code[2:]
return f"{code}.SH"
elif code.startswith('SZ'):
code = code[2:]
return f"{code}.SZ"
# 根据代码前缀判断交易所
if code.startswith('6'):
return f"{code}.SH" # 上海
elif code.startswith('0') or code.startswith('3'):
return f"{code}.SZ" # 深圳
elif code.startswith('8') or code.startswith('4'):
return f"{code}.BJ" # 北交所
else:
return f"{code}.SZ" # 默认深圳
def parse_stock_codes(codes_input: str) -> List[str]:
"""
解析股票代码输入,支持多种格式
Args:
codes_input: 股票代码字符串,支持逗号、空格、换行分隔
Returns:
ts_code 格式的股票代码列表
"""
# 替换各种分隔符为逗号
codes_str = codes_input.replace('\n', ',').replace(' ', ',').replace(',', ',')
# 分割并去重
codes = [c.strip() for c in codes_str.split(',') if c.strip()]
# 转换为 ts_code 格式
ts_codes = [convert_to_ts_code(code) for code in codes]
# 去重并保持顺序
seen = set()
unique_codes = []
for code in ts_codes:
if code not in seen:
seen.add(code)
unique_codes.append(code)
return unique_codes
def batch_diagnose(
stock_codes: List[str],
output_dir: str = "report"
) -> List[Dict]:
"""
批量诊断指定的股票列表
Args:
stock_codes: 股票代码列表(ts_code格式)
output_dir: 报告输出目录
Returns:
诊断结果列表
"""
print("=" * 60)
print("批量股票诊断系统")
print("=" * 60)
print(f"待诊断股票: {len(stock_codes)} 只")
print(f"股票列表: {', '.join(stock_codes)}")
print("=" * 60)
if not stock_codes:
print("❌ 股票列表为空,无法进行诊断")
return []
# 创建诊断系统
print("\n初始化诊断系统...")
advisor = StockAdvisorSystem()
# 诊断结果
results = []
# 逐一诊断
for i, ts_code in enumerate(stock_codes, 1):
print(f"\n{'=' * 60}")
print(f"[{i}/{len(stock_codes)}] 正在诊断: {ts_code}")
print(f"{'=' * 60}")
try:
# 执行诊断
result = advisor.diagnose(ts_code, output_dir)
if result:
result['rank'] = i
# 保存完整报告
advisor.save_report(result)
results.append(result)
print(f"✅ {ts_code} 诊断完成")
else:
print(f"⚠️ {ts_code} 诊断结果为空")
except Exception as e:
print(f"❌ {ts_code} 诊断失败: {e}")
import traceback
traceback.print_exc()
continue
# 输出汇总
print(f"\n{'=' * 60}")
print("批量诊断汇总")
print(f"{'=' * 60}")
print(f"计划诊断: {len(stock_codes)} 只股票")
print(f"成功完成: {len(results)} 只股票")
print(f"报告目录: {output_dir}")
if results:
print(f"\n诊断完成的股票:")
for r in results:
print(f" - {r['ts_code']} ({r['stock_name']})")
return results
def generate_dingtalk_sign(timestamp: str, secret: str) -> str:
"""
生成钉钉机器人签名(加签模式)
Args:
timestamp: 时间戳(毫秒)
secret: 签名密钥
Returns:
签名字符串
"""
import hmac
import hashlib
import base64
import urllib.parse
string_to_sign = f'{timestamp}\n{secret}'
hmac_code = hmac.new(
secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def send_dingtalk_message(stocks: List[Dict], webhook_url: Optional[str] = None, secret: Optional[str] = None) -> bool:
"""
通过钉钉机器人发送推荐买入股票消息
Args:
stocks: 推荐买入的股票列表
webhook_url: 钉钉机器人Webhook URL,为空则使用环境变量
secret: 签名密钥,为空则使用环境变量
Returns:
是否发送成功
"""
url = webhook_url or DINGTALK_WEBHOOK_URL
secret_key = secret or DINGTALK_SECRET
if not url:
print("\n⚠️ 未配置钉钉Webhook URL")
print("请设置环境变量 DINGTALK_WEBHOOK_URL 或在.env文件中配置")
return False
if not stocks:
print("\n⚠️ 没有推荐买入的股票,跳过发送")
return False
# 如果配置了签名密钥,生成签名
if secret_key:
timestamp = str(int(datetime.now().timestamp() * 1000))
sign = generate_dingtalk_sign(timestamp, secret_key)
url = f"{url}×tamp={timestamp}&sign={sign}"
# 构建消息内容
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 构建Markdown格式消息
markdown_title = "📈 智能诊股推荐"
markdown_text = f"## 📈 智能诊股推荐\n\n"
markdown_text += f"> 分析时间: {current_time}\n\n"
markdown_text += f"**共 {len(stocks)} 只股票推荐买入:**\n\n"
for i, stock in enumerate(stocks, 1):
ts_code = stock.get('ts_code', 'N/A')
stock_name = stock.get('stock_name', 'N/A')
action = stock.get('action', 'N/A')
overall_score = stock.get('overall_score', 'N/A')
confidence = stock.get('confidence', 'N/A')
position = stock.get('position', 'N/A')
target_price = stock.get('target_price', 'N/A')
stop_loss = stock.get('stop_loss', 'N/A')
markdown_text += f"---\n\n"
markdown_text += f"### {i}. {stock_name} ({ts_code})\n\n"
markdown_text += f"- **操作建议**: {action}\n"
markdown_text += f"- **综合评分**: {overall_score}/100\n"
markdown_text += f"- **置信度**: {confidence}/100\n"
markdown_text += f"- **建议仓位**: {position}%\n"
markdown_text += f"- **目标价**: {target_price}\n"
markdown_text += f"- **止损价**: {stop_loss}\n\n"
markdown_text += f"---\n\n"
markdown_text += f"*本消息由AgentScope智能诊股系统自动生成*"
# 构建钉钉消息请求数据
data = {
"msgtype": "markdown",
"markdown": {
"title": markdown_title,
"text": markdown_text
}
}
try:
print(f"\n{'=' * 60}")
print("发送钉钉消息...")
response = requests.post(
url,
json=data,
headers={'Content-Type': 'application/json'},
timeout=10
)
result = response.json()
if result.get('errcode') == 0:
print("✅ 钉钉消息发送成功!")
return True
else:
print(f"❌ 发送失败: {result.get('errmsg', '未知错误')}")
return False
except requests.exceptions.Timeout:
print("❌ 发送超时")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 发送失败: {e}")
return False
except Exception as e:
print(f"❌ 发送异常: {e}")
return False
def send_dingtalk_text_message(text: str, webhook_url: Optional[str] = None, secret: Optional[str] = None) -> bool:
"""
发送纯文本消息到钉钉
Args:
text: 消息文本内容
webhook_url: 钉钉机器人Webhook URL
secret: 签名密钥
Returns:
是否发送成功
"""
url = webhook_url or DINGTALK_WEBHOOK_URL
secret_key = secret or DINGTALK_SECRET
if not url:
print("⚠️ 未配置钉钉Webhook URL")
return False
# 如果配置了签名密钥,生成签名
if secret_key:
timestamp = str(int(datetime.now().timestamp() * 1000))
sign = generate_dingtalk_sign(timestamp, secret_key)
url = f"{url}×tamp={timestamp}&sign={sign}"
data = {
"msgtype": "text",
"text": {
"content": text
}
}
try:
response = requests.post(
url,
json=data,
headers={'Content-Type': 'application/json'},
timeout=10
)
result = response.json()
return result.get('errcode') == 0
except Exception as e:
print(f"发送失败: {e}")
return False
def analyze_reports_from_directory(
report_dir: str = "report",
top_n: int = 5,
days: int = 2
) -> List[Dict]:
"""
从报告目录读取最近的诊断报告,分析并排序,挑选推荐买入的股票
Args:
report_dir: 报告目录路径
top_n: 返回前N只推荐买入的股票
days: 分析最近几天的报告(默认2天,即昨天和今天)
Returns:
排序后的推荐买入股票列表
"""
import re
print(f"\n{'=' * 60}")
print("分析诊断报告 - 筛选推荐买入股票")
print(f"{'=' * 60}")
print(f"报告目录: {report_dir}")
print(f"分析范围: 最近 {days} 天的报告")
if not os.path.exists(report_dir):
print(f"❌ 报告目录不存在: {report_dir}")
return []
# 获取所有子目录(每个子目录是一次诊断)
subdirs = [d for d in os.listdir(report_dir)
if os.path.isdir(os.path.join(report_dir, d))]
if not subdirs:
print("❌ 报告目录下没有诊断报告")
return []
# 筛选最近N天的报告
today = datetime.now().date()
date_threshold = today - timedelta(days=days - 1) # 包含今天
filtered_subdirs = []
for subdir in subdirs:
# 从report目录名解析日期(格式如: 000807_SZ_20260131_011712)
date_match = re.search(r'_(\d{8})_\d{6}$', subdir)
if date_match:
try:
report_date_str = date_match.group(1)
report_date = datetime.strptime(report_date_str, '%Y%m%d').date()
if report_date >= date_threshold:
filtered_subdirs.append(subdir)
except ValueError:
continue
else:
# 无法解析日期,检查文件修改时间
subdir_path = os.path.join(report_dir, subdir)
mtime = datetime.fromtimestamp(os.path.getmtime(subdir_path)).date()
if mtime >= date_threshold:
filtered_subdirs.append(subdir)
print(f"\n找到 {len(subdirs)} 个诊断报告目录")
print(f"符合时间范围的报告: {len(filtered_subdirs)} 个({date_threshold} 至今天)")
if not filtered_subdirs:
print("⚠️ 没有符合时间范围的报告")
return []
all_decisions = []
for subdir in filtered_subdirs:
subdir_path = os.path.join(report_dir, subdir)
# 尝试读取最终决策报告
json_file = os.path.join(subdir_path, "complete_diagnosis_report.json")
decision_file = os.path.join(subdir_path, "最终决策报告.md")
decision_data = None
# 优先从JSON文件读取(数据更完整)
if os.path.exists(json_file):
try:
with open(json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
decision_data = parse_decision_from_json(json_data, subdir)
except Exception as e:
print(f" ⚠️ 解析JSON失败 ({subdir}): {e}")
# 如果JSON不可用,从MD文件解析
if decision_data is None and os.path.exists(decision_file):
try:
with open(decision_file, 'r', encoding='utf-8') as f:
md_content = f.read()
decision_data = parse_decision_from_md(md_content, subdir)
except Exception as e:
print(f" ⚠️ 解析MD失败 ({subdir}): {e}")
if decision_data:
all_decisions.append(decision_data)
print(f"\n成功解析 {len(all_decisions)} 份诊断报告")
if not all_decisions:
print("❌ 没有成功解析的诊断报告")
return []
# 筛选推荐买入的股票(操作建议为UP)
buy_recommendations = [
d for d in all_decisions
if d.get('action', '').upper() in ['UP', '买入', 'BUY', '增持']
]
print(f"\n推荐买入的股票: {len(buy_recommendations)} 只")
if not buy_recommendations:
print("⚠️ 没有推荐买入的股票")
# 显示所有股票的操作建议
print("\n所有股票操作建议:")
for d in all_decisions:
print(f" - {d.get('ts_code', 'N/A')} ({d.get('stock_name', 'N/A')}): {d.get('action', 'N/A')}")
return []
# 按综合评分排序(从高到低)
buy_recommendations.sort(
key=lambda x: (x.get('overall_score', 0), x.get('confidence', 0)),
reverse=True
)
# 取前N只
top_stocks = buy_recommendations[:top_n]
# 输出结果
print(f"\n{'=' * 60}")
print(f"Top {len(top_stocks)} 推荐买入股票(按综合评分排序)")
print(f"{'=' * 60}")
print(f"{'排名':<4} {'代码':<12} {'名称':<10} {'操作':<6} {'综合评分':<8} {'置信度':<8} {'仓位':<6}")
print("-" * 70)
for i, stock in enumerate(top_stocks, 1):
print(f"{i:<4} {stock.get('ts_code', 'N/A'):<12} "
f"{stock.get('stock_name', 'N/A'):<10} "
f"{stock.get('action', 'N/A'):<6} "
f"{stock.get('overall_score', 'N/A'):<8} "
f"{stock.get('confidence', 'N/A'):<8} "
f"{stock.get('position', 'N/A'):<6}")
print("-" * 70)
return top_stocks
def parse_decision_from_json(json_data: Dict, subdir: str) -> Optional[Dict[str, Any]]:
"""从JSON数据解析决策信息"""
import re
final_decision = json_data.get('final_decision', '')
if not final_decision:
return None
result: Dict[str, Any] = {
'ts_code': json_data.get('ts_code', ''),
'stock_name': json_data.get('stock_name', ''),
'report_dir': subdir,
'diagnosis_time': json_data.get('diagnosis_time', ''),
}
# 解析操作建议
action_match = re.search(r'操作建议[\s\|]*\**([^\|\*\n]+)\**', final_decision)
if action_match:
result['action'] = action_match.group(1).strip()
# 解析综合评分
score_match = re.search(r'综合评分[\s\|]*\**(\d+)[//](\d+)\**', final_decision)
if score_match:
result['overall_score'] = int(score_match.group(1))
# 解析置信度
conf_match = re.search(r'置信度[\s\|]*(\d+)[//]?(\d*)', final_decision)
if conf_match:
result['confidence'] = int(conf_match.group(1))
# 解析建议仓位
pos_match = re.search(r'建议仓位[\s\|]*([\d.]+)', final_decision)
if pos_match:
result['position'] = float(pos_match.group(1))
# 解析目标价
target_match = re.search(r'目标价[\s\|]*([\d.]+)', final_decision)
if target_match:
result['target_price'] = float(target_match.group(1))
# 解析止损价
stop_match = re.search(r'止损价[\s\|]*([\d.]+)', final_decision)
if stop_match:
result['stop_loss'] = float(stop_match.group(1))
# 解析决策理由
reason_match = re.search(r'## 决策理由\n([^#]+)', final_decision)
if reason_match:
result['reason'] = reason_match.group(1).strip()
return result
def parse_decision_from_md(md_content: str, subdir: str) -> Optional[Dict[str, Any]]:
"""从Markdown内容解析决策信息"""
import re
result: Dict[str, Any] = {
'report_dir': subdir,
}
# 解析股票代码
code_match = re.search(r'代码[::][\s]*([\w.]+)', md_content)
if code_match:
result['ts_code'] = code_match.group(1).strip()
# 解析股票名称
name_match = re.search(r'名称[::][\s]*([^\n]+)', md_content)
if name_match:
result['stock_name'] = name_match.group(1).strip()
# 解析操作建议
action_match = re.search(r'操作建议[\s\|]*\**([^\|\*\n]+)\**', md_content)
if action_match:
result['action'] = action_match.group(1).strip()
# 解析综合评分
score_match = re.search(r'综合评分[\s\|]*\**(\d+)[//](\d+)\**', md_content)
if score_match:
result['overall_score'] = int(score_match.group(1))
# 解析置信度
conf_match = re.search(r'置信度[\s\|]*(\d+)[//]?(\d*)', md_content)
if conf_match:
result['confidence'] = int(conf_match.group(1))
# 解析建议仓位
pos_match = re.search(r'建议仓位[\s\|]*([\d.]+)', md_content)
if pos_match:
result['position'] = float(pos_match.group(1))
# 解析目标价
target_match = re.search(r'目标价[\s\|]*([\d.]+)', md_content)
if target_match:
result['target_price'] = float(target_match.group(1))
# 解析止损价
stop_match = re.search(r'止损价[\s\|]*([\d.]+)', md_content)
if stop_match:
result['stop_loss'] = float(stop_match.group(1))
# 解析决策理由
reason_match = re.search(r'## 决策理由\n([^#]+)', md_content)
if reason_match:
result['reason'] = reason_match.group(1).strip()
return result
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='批量股票诊断 - 支持输入股票代码列表'
)
parser.add_argument(
'--stocks', '-s',
type=str,
required=False,
help='股票代码列表,逗号分隔(如: 600519,000001,300750)'
)
parser.add_argument(
'--file', '-f',
type=str,
required=False,
help='股票代码文件路径,每行一个代码'
)
parser.add_argument(
'--output', '-o',
type=str,
default='report',
help='报告输出目录(默认: report)'
)
parser.add_argument(
'--analyze', '-a',
action='store_true',
help='分析已有报告,挑选推荐买入的股票'
)
parser.add_argument(
'--analyze-top', '-at',
type=int,
default=5,
help='分析时返回前N只推荐买入的股票(默认: 5)'
)
parser.add_argument(
'--send-dingtalk', '-d',
action='store_true',
help='将推荐买入的股票发送到钉钉'
)
parser.add_argument(
'--webhook-url',
type=str,
default='',
help='钉钉机器人Webhook URL(也可通过环境变量DINGTALK_WEBHOOK_URL配置)'
)
parser.add_argument(
'--secret',
type=str,
default='',
help='钉钉机器人签名密钥(也可通过环境变量DINGTALK_SECRET配置)'
)
args = parser.parse_args()
# 分析已有报告
if args.analyze:
top_stocks = analyze_reports_from_directory(
report_dir=args.output,
top_n=args.analyze_top
)
if top_stocks:
print(f"\n✅ 分析完成,推荐买入 {len(top_stocks)} 只股票")
# 发送钉钉消息
if args.send_dingtalk:
send_dingtalk_message(
top_stocks,
args.webhook_url if args.webhook_url else None,
args.secret if args.secret else None
)
else:
print("\n⚠️ 没有推荐买入的股票")
return
# 获取股票代码列表
stock_codes = []
if args.stocks:
# 从命令行参数获取
stock_codes = parse_stock_codes(args.stocks)
elif args.file:
# 从文件读取
if os.path.exists(args.file):
with open(args.file, 'r', encoding='utf-8') as f:
content = f.read()
stock_codes = parse_stock_codes(content)
else:
print(f"❌ 文件不存在: {args.file}")
return
else:
# 交互式输入
print("请输入股票代码(逗号分隔,如: 600519,000001,300750):")
codes_input = input().strip()
if codes_input:
stock_codes = parse_stock_codes(codes_input)
if not stock_codes:
print("❌ 未提供有效的股票代码")
print("\n使用方法:")
print(" python batch_diagnose.py --stocks 600519,000001,300750")
print(" python batch_diagnose.py --file stocks.txt")
print(" python batch_diagnose.py --analyze --send-dingtalk")
return
# 执行批量诊断
results = batch_diagnose(
stock_codes=stock_codes,
output_dir=args.output
)
if results:
print(f"\n✅ 批量诊断完成,共诊断 {len(results)} 只股票")
# 分析结果并发送钉钉
if args.send_dingtalk:
print("\n" + "=" * 60)
print("分析诊断结果...")
top_stocks = analyze_reports_from_directory(
report_dir=args.output,
top_n=args.analyze_top,
days=1 # 只分析今天的报告
)
if top_stocks:
send_dingtalk_message(
top_stocks,
args.webhook_url if args.webhook_url else None,
args.secret if args.secret else None
)
else:
print("\n❌ 批量诊断未产生有效结果")
if __name__ == "__main__":
main()
```
### scripts/config.py
```python
"""
AgentScope股票诊断系统配置文件
使用 AgentScope 框架的标准模型配置
"""
import os
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
# AgentScope 相关导入(供外部模块使用)
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.memory import InMemoryMemory
# LLM API配置
LLM_API_KEY = {
"bailian": os.getenv("ALIYUN_BAILIAN_API_KEY", "")
}
@dataclass
class Config:
"""系统配置"""
# Tushare配置
tushare_token: str = field(
default_factory=lambda: os.getenv("TUSHARE_TOKEN", "")
)
# 阿里云百炼配置
bailian_api_key: str = field(
default_factory=lambda: os.getenv("ALIYUN_BAILIAN_API_KEY", "")
)
# 模型配置
model_name: str = "qwen3.5-plus" # 默认使用Qwen-Max
temperature: float = 0.3
# 支持的模型配置
supported_models: dict = field(default_factory=lambda: {
"kimi-k2.5": {
"display_name": "Kimi",
"model_name": "kimi-k2.5",
"description": "月之暗面 Kimi 模型"
},
"qwen-max-2025-01-25": {
"display_name": "Qwen-Max",
"model_name": "qwen-max-2025-01-25",
"description": "通义千问 Max 最新版本"
},
"qwen3.5-plus": {
"display_name": "Qwen3.5",
"model_name": "qwen3.5-plus",
"description": "通义千问3.5 Plus"
},
"glm-5": {
"display_name": "GLM5",
"model_name": "glm-5",
"description": "智谱 GLM5"
},
"MiniMax/MiniMax-M2.5": {
"display_name": "Minimax",
"model_name": "MiniMax/MiniMax-M2.5",
"description": "Minimax M2.5"
}
})
# 数据采集配置
market_data_days: int = 60 # 获取近60个交易日数据
news_days: int = 7 # 获取近7天新闻
# 辩论/讨论配置
debate_rounds: int = 2 # 研究员辩论轮数
risk_discussion_rounds: int = 2 # 风控讨论轮数
# 权重配置
tech_weight: float = 0.25 # 技术面权重
fund_weight: float = 0.35 # 基本面权重
news_weight: float = 0.20 # 舆情面权重
research_weight: float = 0.20 # 研究员共识权重
def validate(self) -> bool:
"""验证配置完整性"""
errors = []
if not self.tushare_token:
errors.append("缺少TUSHARE_TOKEN")
if not self.bailian_api_key:
errors.append("缺少ALIYUN_BAILIAN_API_KEY")
if errors:
print(f"配置验证失败: {', '.join(errors)}")
return False
return True
# 全局配置实例
config = Config()
def create_model(model_name: Optional[str] = None, stream: bool = True) -> OpenAIChatModel:
"""
创建 AgentScope OpenAIChatModel 实例(通过百炼平台调用)
Args:
model_name: 模型名称,默认使用配置中的 model_name
stream: 是否启用流式输出
Returns:
OpenAIChatModel 实例
"""
actual_model_name = model_name or config.model_name
# 验证模型是否支持
if actual_model_name not in config.supported_models:
print(f"警告: 不支持的模型 {actual_model_name},将使用默认模型")
actual_model_name = config.model_name
return OpenAIChatModel(
model_name=actual_model_name,
api_key=LLM_API_KEY.get("bailian") or "",
client_args={
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1"
},
stream=stream,
)
def create_formatter() -> OpenAIChatFormatter:
"""创建 AgentScope OpenAI 消息格式化器"""
return OpenAIChatFormatter()
def create_memory() -> InMemoryMemory:
"""创建 AgentScope 内存实例"""
return InMemoryMemory()
```
### scripts/requirements.txt
```text
# AgentScope股票诊断智能体系统依赖
# 智能体框架
agentscope>=0.0.5
# 数据接口
tushare>=1.2.89
akshare>=1.12.0
# 阿里云夸克搜索SDK
alibabacloud_iqs20241111>=1.0.0
alibabacloud_tea_openapi>=0.3.0
# 基础依赖
pandas>=2.0.0
numpy>=1.24.0
requests>=2.31.0
# LLM支持
openai>=1.0.0
# Web界面
streamlit>=1.28.0
# PDF生成
fpdf2>=2.8.0
python-dotenv
```
### scripts/tools/__init__.py
```python
"""数据采集工具模块
包含:
1. TushareTools: Tushare 数据接口
2. AKShareTools: AKShare 数据接口
3. AgentScope Toolkit: 使用 AgentScope Toolkit 注册的工具函数
"""
from .tushare_tools import TushareTools
from .akshare_tools import AKShareTools
from .toolkit import (
create_stock_toolkit,
create_market_analyst_toolkit,
create_fundamentals_analyst_toolkit,
create_news_analyst_toolkit,
# 工具函数
get_stock_daily,
get_technical_indicators,
get_stock_basic,
get_valuation,
get_financial_indicator,
get_stock_news,
get_market_sentiment,
)
__all__ = [
# 基础工具类
"TushareTools",
"AKShareTools",
# AgentScope Toolkit
"create_stock_toolkit",
"create_market_analyst_toolkit",
"create_fundamentals_analyst_toolkit",
"create_news_analyst_toolkit",
# 工具函数
"get_stock_daily",
"get_technical_indicators",
"get_stock_basic",
"get_valuation",
"get_financial_indicator",
"get_stock_news",
"get_market_sentiment",
]
```
### scripts/tools/akshare_tools.py
```python
"""
AKShare数据采集工具
提供新闻舆情数据的获取功能
"""
import os
from datetime import datetime, timedelta
from typing import Dict, List
import json
try:
import akshare as ak
AKSHARE_AVAILABLE = True
except ImportError:
AKSHARE_AVAILABLE = False
print("警告: akshare未安装,将使用模拟数据")
class AKShareTools:
"""AKShare数据采集工具类"""
def __init__(self):
"""初始化AKShare工具"""
if AKSHARE_AVAILABLE:
print("✅ AKShare初始化成功")
else:
print("⚠️ AKShare未安装,将使用模拟数据")
def get_stock_news(self, stock_code: str, stock_name: str = "", days: int = 7) -> Dict:
"""
获取个股相关新闻
Args:
stock_code: 股票代码(如 600519)
stock_name: 股票名称(用于搜索)
days: 获取天数
Returns:
新闻数据字典
"""
# 提取纯股票代码
pure_code = stock_code.split('.')[0] if '.' in stock_code else stock_code
news_list = []
if AKSHARE_AVAILABLE:
try:
# 尝试获取个股新闻
df = ak.stock_news_em(symbol=pure_code)
if df is not None and not df.empty:
# 取最近的新闻
for _, row in df.head(20).iterrows():
news_item = {
"title": row.get('新闻标题', ''),
"content": row.get('新闻内容', '')[:200] if row.get('新闻内容') else '',
"source": row.get('文章来源', ''),
"publish_time": str(row.get('发布时间', '')),
"url": row.get('新闻链接', '')
}
news_list.append(news_item)
except Exception as e:
print(f"获取个股新闻失败: {e}")
# 如果没有获取到新闻,使用模拟数据
if not news_list:
news_list = self._mock_stock_news(pure_code, stock_name)
# 分析新闻情绪
sentiment_result = self._analyze_sentiment(news_list)
return {
"stock_code": stock_code,
"stock_name": stock_name,
"news_count": len(news_list),
"news_list": news_list[:10], # 最多返回10条
"sentiment": sentiment_result
}
def get_market_news(self, limit: int = 10) -> List[Dict]:
"""
获取市场热点新闻
Args:
limit: 获取条数
Returns:
新闻列表
"""
news_list = []
if AKSHARE_AVAILABLE:
try:
# 获取财经新闻
df = ak.stock_info_global_em()
if df is not None and not df.empty:
for _, row in df.head(limit).iterrows():
news_item = {
"title": row.get('标题', ''),
"content": row.get('内容', '')[:200] if row.get('内容') else '',
"publish_time": str(row.get('发布时间', ''))
}
news_list.append(news_item)
except Exception as e:
print(f"获取市场新闻失败: {e}")
if not news_list:
news_list = self._mock_market_news(limit)
return news_list
def get_industry_news(self, industry: str, limit: int = 5) -> List[Dict]:
"""
获取行业新闻
Args:
industry: 行业名称
limit: 获取条数
Returns:
新闻列表
"""
# 行业新闻通常需要特定接口,这里简化处理
return self._mock_industry_news(industry, limit)
def get_market_sentiment(self) -> Dict:
"""
获取市场情绪指标
Returns:
市场情绪数据
"""
if AKSHARE_AVAILABLE:
try:
# 获取涨跌统计
df = ak.stock_changes_em()
if df is not None and not df.empty:
up_count = len(df[df.get('涨跌幅', 0) > 0]) if '涨跌幅' in df.columns else 0
down_count = len(df[df.get('涨跌幅', 0) < 0]) if '涨跌幅' in df.columns else 0
flat_count = len(df[df.get('涨跌幅', 0) == 0]) if '涨跌幅' in df.columns else 0
total = up_count + down_count + flat_count
if total > 0:
up_ratio = up_count / total * 100
if up_ratio > 60:
mood = "乐观"
elif up_ratio < 40:
mood = "悲观"
else:
mood = "中性"
return {
"up_count": up_count,
"down_count": down_count,
"flat_count": flat_count,
"up_ratio": round(up_ratio, 2),
"market_mood": mood,
"update_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
print(f"获取市场情绪失败: {e}")
# 返回模拟数据
import random
up_ratio = random.uniform(30, 70)
if up_ratio > 55:
mood = "乐观"
elif up_ratio < 45:
mood = "悲观"
else:
mood = "中性"
return {
"up_count": int(up_ratio * 50),
"down_count": int((100 - up_ratio) * 50),
"flat_count": 100,
"up_ratio": round(up_ratio, 2),
"market_mood": mood,
"update_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
def _analyze_sentiment(self, news_list: List[Dict]) -> Dict:
"""
分析新闻情绪
Args:
news_list: 新闻列表
Returns:
情绪分析结果
"""
# 简化的情绪分析(基于关键词)
positive_keywords = ['上涨', '增长', '利好', '突破', '创新高', '盈利', '超预期', '看好', '买入', '增持', '推荐']
negative_keywords = ['下跌', '下滑', '利空', '亏损', '减持', '风险', '警告', '暴跌', '卖出', '减仓', '回调']
positive_count = 0
negative_count = 0
neutral_count = 0
analyzed_news = []
for news in news_list:
title = news.get('title', '')
content = news.get('content', '')
text = title + content
pos_score = sum(1 for kw in positive_keywords if kw in text)
neg_score = sum(1 for kw in negative_keywords if kw in text)
if pos_score > neg_score:
sentiment = "利好"
positive_count += 1
elif neg_score > pos_score:
sentiment = "利空"
negative_count += 1
else:
sentiment = "中性"
neutral_count += 1
analyzed_news.append({
**news,
"sentiment": sentiment
})
total = positive_count + negative_count + neutral_count
if total == 0:
total = 1
# 计算情绪评分(0-100)
sentiment_score = 50 + (positive_count - negative_count) / total * 50
sentiment_score = max(0, min(100, sentiment_score))
# 判断整体情绪
if sentiment_score > 60:
overall = "乐观"
elif sentiment_score < 40:
overall = "悲观"
else:
overall = "中性"
return {
"positive_count": positive_count,
"negative_count": negative_count,
"neutral_count": neutral_count,
"positive_pct": round(positive_count / total * 100, 1),
"negative_pct": round(negative_count / total * 100, 1),
"neutral_pct": round(neutral_count / total * 100, 1),
"sentiment_score": round(sentiment_score),
"overall_sentiment": overall,
"analyzed_news": analyzed_news[:5] # 返回前5条分析结果
}
def _mock_stock_news(self, stock_code: str, stock_name: str) -> List[Dict]:
"""生成模拟的个股新闻"""
import random
templates = [
{"title": f"{stock_name}发布业绩预告,净利润同比增长{random.randint(10, 50)}%", "sentiment": "利好"},
{"title": f"{stock_name}获得机构调研,多家券商给予买入评级", "sentiment": "利好"},
{"title": f"{stock_name}签署重大合同,金额达{random.randint(1, 10)}亿元", "sentiment": "利好"},
{"title": f"{stock_name}股东减持计划公告", "sentiment": "利空"},
{"title": f"{stock_name}发布新产品,市场反应平淡", "sentiment": "中性"},
{"title": f"行业分析:{stock_name}所在行业面临调整", "sentiment": "中性"},
{"title": f"{stock_name}高管增持公司股份", "sentiment": "利好"},
{"title": f"{stock_name}收到监管问询函", "sentiment": "利空"},
]
selected = random.sample(templates, min(6, len(templates)))
news_list = []
for i, item in enumerate(selected):
news_list.append({
"title": item["title"],
"content": f"这是关于{stock_name}的详细新闻内容...",
"source": random.choice(["东方财富", "同花顺", "新浪财经", "证券时报"]),
"publish_time": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d %H:%M'),
"url": f"https://example.com/news/{i}",
"sentiment": item["sentiment"]
})
return news_list
def _mock_market_news(self, limit: int) -> List[Dict]:
"""生成模拟的市场新闻"""
import random
templates = [
"央行宣布货币政策维持稳健中性",
"北向资金今日净流入超50亿元",
"科技板块持续走强,半导体概念领涨",
"新能源汽车销量创新高",
"房地产调控政策进一步优化",
"A股市场成交额突破万亿",
"外资机构看好中国经济增长前景",
"多家上市公司披露回购计划"
]
news_list = []
for i, title in enumerate(random.sample(templates, min(limit, len(templates)))):
news_list.append({
"title": title,
"content": f"{title}的详细内容...",
"publish_time": (datetime.now() - timedelta(hours=i*2)).strftime('%Y-%m-%d %H:%M')
})
return news_list
def _mock_industry_news(self, industry: str, limit: int) -> List[Dict]:
"""生成模拟的行业新闻"""
import random
templates = [
f"{industry}行业景气度持续上升",
f"{industry}板块获资金关注",
f"政策利好{industry}行业发展",
f"{industry}龙头企业业绩超预期",
f"{industry}行业竞争格局分析"
]
news_list = []
for i, title in enumerate(templates[:limit]):
news_list.append({
"title": title,
"content": f"{title}的详细内容...",
"publish_time": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d %H:%M')
})
return news_list
```
### scripts/tools/toolkit.py
```python
"""
AgentScope 工具注册模块
使用 Toolkit 注册 Tushare/AKShare 数据采集工具
"""
from agentscope.tool import Toolkit, ToolResponse
from agentscope.message import TextBlock
from .tushare_tools import TushareTools
from .akshare_tools import AKShareTools
from ..config import config
# 全局工具实例
_tushare_tools = None
_akshare_tools = None
def get_tushare_tools() -> TushareTools:
"""获取 Tushare 工具单例"""
global _tushare_tools
if _tushare_tools is None:
_tushare_tools = TushareTools(config.tushare_token)
return _tushare_tools
def get_akshare_tools() -> AKShareTools:
"""获取 AKShare 工具单例"""
global _akshare_tools
if _akshare_tools is None:
_akshare_tools = AKShareTools()
return _akshare_tools
# ============ AgentScope 工具函数定义 ============
async def get_stock_daily(ts_code: str, days: int = 60) -> ToolResponse:
"""
获取股票日线行情数据
Args:
ts_code: 股票代码,如 600519.SH
days: 获取天数,默认60天
Returns:
包含日线数据的 ToolResponse
"""
try:
tools = get_tushare_tools()
data = tools.get_stock_daily(ts_code, days)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票 {ts_code} 近 {days} 日行情数据:\n"
f"- 最新收盘价: {data.get('latest_close', 'N/A')}\n"
f"- 最新涨跌幅: {data.get('latest_pct_chg', 'N/A')}%\n"
f"- 最新成交量: {data.get('latest_volume', 'N/A')}\n"
f"- 最新成交额: {data.get('latest_amount', 'N/A')}万元\n"
f"- 数据条数: {data.get('data_count', 0)}"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取行情数据失败: {str(e)}")]
)
async def get_technical_indicators(ts_code: str, days: int = 60) -> ToolResponse:
"""
获取股票技术指标数据(MA、MACD、RSI等)
Args:
ts_code: 股票代码,如 600519.SH
days: 数据天数,默认60天
Returns:
包含技术指标的 ToolResponse
"""
try:
tools = get_tushare_tools()
data = tools.get_technical_indicators(ts_code, days)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票 {ts_code} 技术指标:\n"
f"- 当前价: {data.get('current_price', 'N/A')}\n"
f"- MA5: {data.get('ma5', 'N/A')}\n"
f"- MA20: {data.get('ma20', 'N/A')}\n"
f"- MA60: {data.get('ma60', 'N/A')}\n"
f"- MA信号: {data.get('ma_signal', 'N/A')}\n"
f"- MACD: {data.get('macd', 'N/A')}\n"
f"- RSI: {data.get('rsi', 'N/A')}\n"
f"- 趋势: {data.get('trend', 'N/A')}\n"
f"- 支撑位: {data.get('support', 'N/A')}\n"
f"- 阻力位: {data.get('resistance', 'N/A')}\n"
f"- 技术评分: {data.get('technical_score', 50)}/100"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取技术指标失败: {str(e)}")]
)
async def get_stock_basic(ts_code: str) -> ToolResponse:
"""
获取股票基本信息
Args:
ts_code: 股票代码,如 600519.SH
Returns:
包含基本信息的 ToolResponse
"""
try:
tools = get_tushare_tools()
data = tools.get_stock_basic(ts_code)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票基本信息:\n"
f"- 代码: {ts_code}\n"
f"- 名称: {data.get('name', 'N/A')}\n"
f"- 行业: {data.get('industry', 'N/A')}\n"
f"- 板块: {data.get('market', 'N/A')}\n"
f"- 上市日期: {data.get('list_date', 'N/A')}"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取基本信息失败: {str(e)}")]
)
async def get_valuation(ts_code: str) -> ToolResponse:
"""
获取股票估值数据(PE、PB、PS等)
Args:
ts_code: 股票代码,如 600519.SH
Returns:
包含估值数据的 ToolResponse
"""
try:
tools = get_tushare_tools()
data = tools.get_valuation(ts_code)
total_mv = data.get('total_mv', 0)
total_mv_yi = round(total_mv / 10000, 2) if total_mv else 'N/A'
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票 {ts_code} 估值数据:\n"
f"- PE(TTM): {data.get('pe_ttm', 'N/A')}\n"
f"- PB: {data.get('pb', 'N/A')}\n"
f"- PS(TTM): {data.get('ps_ttm', 'N/A')}\n"
f"- 股息率: {data.get('dv_ratio', 'N/A')}%\n"
f"- 总市值: {total_mv_yi}亿元"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取估值数据失败: {str(e)}")]
)
async def get_financial_indicator(ts_code: str) -> ToolResponse:
"""
获取股票财务指标数据
Args:
ts_code: 股票代码,如 600519.SH
Returns:
包含财务指标的 ToolResponse
"""
try:
tools = get_tushare_tools()
data = tools.get_financial_indicator(ts_code)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票 {ts_code} 财务指标:\n"
f"- ROE: {data.get('roe', 'N/A')}%\n"
f"- ROA: {data.get('roa', 'N/A')}%\n"
f"- 毛利率: {data.get('gross_margin', 'N/A')}%\n"
f"- 净利率: {data.get('net_margin', 'N/A')}%\n"
f"- 负债率: {data.get('debt_ratio', 'N/A')}%"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取财务指标失败: {str(e)}")]
)
async def get_stock_news(ts_code: str, stock_name: str = "", days: int = 7) -> ToolResponse:
"""
获取股票相关新闻
Args:
ts_code: 股票代码,如 600519.SH
stock_name: 股票名称(可选)
days: 新闻天数,默认7天
Returns:
包含新闻数据的 ToolResponse
"""
try:
tools = get_akshare_tools()
data = tools.get_stock_news(ts_code, stock_name, days)
sentiment = data.get('sentiment', {})
analyzed_news = sentiment.get('analyzed_news', [])
news_list = ""
for i, news in enumerate(analyzed_news[:5], 1):
news_list += f"{i}. {news.get('title', 'N/A')} - {news.get('sentiment', '中性')}\n"
if not news_list:
news_list = "暂无相关新闻"
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"股票 {ts_code} 近 {days} 天新闻舆情:\n"
f"- 新闻总数: {data.get('news_count', 0)}条\n"
f"- 利好新闻: {sentiment.get('positive_count', 0)}条 ({sentiment.get('positive_pct', 0)}%)\n"
f"- 利空新闻: {sentiment.get('negative_count', 0)}条 ({sentiment.get('negative_pct', 0)}%)\n"
f"- 中性新闻: {sentiment.get('neutral_count', 0)}条\n"
f"- 整体情绪: {sentiment.get('overall_sentiment', '中性')}\n"
f"- 舆情评分: {sentiment.get('sentiment_score', 50)}/100\n\n"
f"关键新闻:\n{news_list}"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取新闻数据失败: {str(e)}")]
)
async def get_market_sentiment() -> ToolResponse:
"""
获取市场整体情绪
Returns:
包含市场情绪数据的 ToolResponse
"""
try:
tools = get_akshare_tools()
data = tools.get_market_sentiment()
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"市场整体情绪:\n"
f"- 上涨股票数: {data.get('up_count', 'N/A')}\n"
f"- 下跌股票数: {data.get('down_count', 'N/A')}\n"
f"- 上涨比例: {data.get('up_ratio', 'N/A')}%\n"
f"- 市场情绪: {data.get('market_mood', '中性')}"
)
]
)
except Exception as e:
return ToolResponse(
content=[TextBlock(type="text", text=f"获取市场情绪失败: {str(e)}")]
)
# ============ 创建 Toolkit ============
def create_stock_toolkit() -> Toolkit:
"""
创建股票分析工具包
Returns:
Toolkit: 包含所有股票分析工具的工具包
"""
toolkit = Toolkit()
# 注册 Tushare 工具
toolkit.register_tool_function(get_stock_daily)
toolkit.register_tool_function(get_technical_indicators)
toolkit.register_tool_function(get_stock_basic)
toolkit.register_tool_function(get_valuation)
toolkit.register_tool_function(get_financial_indicator)
# 注册 AKShare 工具
toolkit.register_tool_function(get_stock_news)
toolkit.register_tool_function(get_market_sentiment)
return toolkit
# ============ 为 ReActAgent 创建预配置的 Toolkit ============
def create_market_analyst_toolkit() -> Toolkit:
"""创建技术面分析师工具包"""
toolkit = Toolkit()
toolkit.register_tool_function(get_stock_daily)
toolkit.register_tool_function(get_technical_indicators)
toolkit.register_tool_function(get_stock_basic)
return toolkit
def create_fundamentals_analyst_toolkit() -> Toolkit:
"""创建基本面分析师工具包"""
toolkit = Toolkit()
toolkit.register_tool_function(get_stock_basic)
toolkit.register_tool_function(get_valuation)
toolkit.register_tool_function(get_financial_indicator)
return toolkit
def create_news_analyst_toolkit() -> Toolkit:
"""创建舆情分析师工具包"""
toolkit = Toolkit()
toolkit.register_tool_function(get_stock_basic)
toolkit.register_tool_function(get_stock_news)
toolkit.register_tool_function(get_market_sentiment)
return toolkit
```
### scripts/tools/tushare_tools.py
```python
"""
Tushare数据采集工具
提供行情数据和基本面数据的获取功能
"""
import os
from datetime import datetime, timedelta
from typing import Dict, Optional, List
import json
try:
import tushare as ts
TUSHARE_AVAILABLE = True
except ImportError:
TUSHARE_AVAILABLE = False
print("警告: tushare未安装,将使用模拟数据")
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
class TushareTools:
"""Tushare数据采集工具类"""
def __init__(self, token: Optional[str] = None):
"""
初始化Tushare工具
Args:
token: Tushare API Token
"""
self.token = token or os.getenv("TUSHARE_TOKEN", "")
self.pro = None
if TUSHARE_AVAILABLE and self.token:
try:
ts.set_token(self.token)
self.pro = ts.pro_api()
print("✅ Tushare初始化成功")
except Exception as e:
print(f"⚠️ Tushare初始化失败: {e}")
def get_stock_basic(self, ts_code: str) -> Dict:
"""
获取股票基本信息
Args:
ts_code: 股票代码,如 '600519.SH'
Returns:
股票基本信息字典
"""
if self.pro:
try:
df = self.pro.stock_basic(
ts_code=ts_code,
fields='ts_code,symbol,name,area,industry,market,list_date'
)
if not df.empty:
return df.iloc[0].to_dict()
except Exception as e:
print(f"获取股票基本信息失败: {e}")
# 返回模拟数据
return {
"ts_code": ts_code,
"symbol": ts_code.split('.')[0],
"name": "模拟股票",
"area": "未知",
"industry": "未知",
"market": "主板",
"list_date": "20100101"
}
def get_stock_daily(self, ts_code: str, days: int = 60) -> Dict:
"""
获取日K线数据
Args:
ts_code: 股票代码
days: 获取天数
Returns:
包含K线数据的字典
"""
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=days * 2)).strftime('%Y%m%d')
if self.pro:
try:
df = self.pro.daily(
ts_code=ts_code,
start_date=start_date,
end_date=end_date
)
if not df.empty:
df = df.head(days).sort_values('trade_date')
return {
"ts_code": ts_code,
"data_count": len(df),
"latest_date": df.iloc[-1]['trade_date'],
"latest_close": float(df.iloc[-1]['close']),
"latest_open": float(df.iloc[-1]['open']),
"latest_high": float(df.iloc[-1]['high']),
"latest_low": float(df.iloc[-1]['low']),
"latest_volume": float(df.iloc[-1]['vol']),
"latest_amount": float(df.iloc[-1]['amount']),
"latest_pct_chg": float(df.iloc[-1]['pct_chg']),
"price_list": df['close'].tolist(),
"volume_list": df['vol'].tolist(),
"date_list": df['trade_date'].tolist()
}
except Exception as e:
print(f"获取日K线数据失败: {e}")
# 返回模拟数据
import random
base_price = 100.0
prices = [base_price + random.uniform(-5, 5) for _ in range(days)]
return {
"ts_code": ts_code,
"data_count": days,
"latest_date": end_date,
"latest_close": prices[-1],
"latest_open": prices[-1] - random.uniform(-1, 1),
"latest_high": prices[-1] + random.uniform(0, 2),
"latest_low": prices[-1] - random.uniform(0, 2),
"latest_volume": random.uniform(100000, 500000),
"latest_amount": random.uniform(10000000, 50000000),
"latest_pct_chg": random.uniform(-3, 3),
"price_list": prices,
"volume_list": [random.uniform(100000, 500000) for _ in range(days)],
"date_list": [(datetime.now() - timedelta(days=days-i)).strftime('%Y%m%d') for i in range(days)]
}
def get_technical_indicators(self, ts_code: str, days: int = 60) -> Dict:
"""
计算技术指标
Args:
ts_code: 股票代码
days: 数据天数
Returns:
技术指标字典
"""
daily_data = self.get_stock_daily(ts_code, days)
prices = daily_data.get("price_list", [])
if not prices or len(prices) < 20:
return self._mock_technical_indicators()
# 计算均线
ma5 = sum(prices[-5:]) / 5 if len(prices) >= 5 else prices[-1]
ma10 = sum(prices[-10:]) / 10 if len(prices) >= 10 else prices[-1]
ma20 = sum(prices[-20:]) / 20 if len(prices) >= 20 else prices[-1]
ma60 = sum(prices[-60:]) / 60 if len(prices) >= 60 else prices[-1]
current_price = prices[-1]
# 判断均线信号
ma_signal = "金叉" if ma5 > ma20 else "死叉"
# 简化的MACD计算
ema12 = self._calculate_ema(prices, 12)
ema26 = self._calculate_ema(prices, 26)
macd = ema12 - ema26
macd_signal = "多头" if macd > 0 else "空头"
# 简化的RSI计算
rsi = self._calculate_rsi(prices, 14)
if rsi > 70:
rsi_signal = "超买"
elif rsi < 30:
rsi_signal = "超卖"
else:
rsi_signal = "中性"
# 趋势判断
if current_price > ma20 and ma5 > ma20:
trend = "上涨"
elif current_price < ma20 and ma5 < ma20:
trend = "下跌"
else:
trend = "震荡"
# 支撑位和阻力位(简化计算)
recent_prices = prices[-20:]
support = min(recent_prices) * 0.98
resistance = max(recent_prices) * 1.02
# 技术评分
score = 50
if ma_signal == "金叉":
score += 15
if macd_signal == "多头":
score += 15
if rsi_signal == "超卖":
score += 10
elif rsi_signal == "超买":
score -= 10
if trend == "上涨":
score += 10
elif trend == "下跌":
score -= 10
score = max(0, min(100, score))
return {
"ts_code": ts_code,
"current_price": round(current_price, 2),
"ma5": round(ma5, 2),
"ma10": round(ma10, 2),
"ma20": round(ma20, 2),
"ma60": round(ma60, 2),
"ma_signal": ma_signal,
"macd": round(macd, 4),
"macd_signal": macd_signal,
"rsi": round(rsi, 2),
"rsi_signal": rsi_signal,
"trend": trend,
"support": round(support, 2),
"resistance": round(resistance, 2),
"technical_score": score
}
def get_financial_indicator(self, ts_code: str) -> Dict:
"""
获取财务指标
Args:
ts_code: 股票代码
Returns:
财务指标字典
"""
if self.pro:
try:
# 获取最新财务指标
df = self.pro.fina_indicator(
ts_code=ts_code,
fields='ts_code,end_date,roe,roa,debt_to_assets,grossprofit_margin,netprofit_margin,revenue_ps,cfps'
)
if not df.empty:
latest = df.iloc[0]
return {
"ts_code": ts_code,
"end_date": latest.get('end_date', ''),
"roe": float(latest.get('roe', 0) or 0),
"roa": float(latest.get('roa', 0) or 0),
"debt_ratio": float(latest.get('debt_to_assets', 0) or 0),
"gross_margin": float(latest.get('grossprofit_margin', 0) or 0),
"net_margin": float(latest.get('netprofit_margin', 0) or 0)
}
except Exception as e:
print(f"获取财务指标失败: {e}")
# 返回模拟数据
import random
return {
"ts_code": ts_code,
"end_date": datetime.now().strftime('%Y%m%d'),
"roe": round(random.uniform(5, 25), 2),
"roa": round(random.uniform(2, 15), 2),
"debt_ratio": round(random.uniform(20, 60), 2),
"gross_margin": round(random.uniform(20, 50), 2),
"net_margin": round(random.uniform(5, 25), 2)
}
def get_valuation(self, ts_code: str) -> Dict:
"""
获取估值指标
Args:
ts_code: 股票代码
Returns:
估值指标字典
"""
if self.pro:
try:
df = self.pro.daily_basic(
ts_code=ts_code,
fields='ts_code,trade_date,pe_ttm,pb,ps_ttm,dv_ratio,total_mv,circ_mv'
)
if not df.empty:
latest = df.iloc[0]
return {
"ts_code": ts_code,
"trade_date": latest.get('trade_date', ''),
"pe_ttm": float(latest.get('pe_ttm', 0) or 0),
"pb": float(latest.get('pb', 0) or 0),
"ps_ttm": float(latest.get('ps_ttm', 0) or 0),
"dv_ratio": float(latest.get('dv_ratio', 0) or 0),
"total_mv": float(latest.get('total_mv', 0) or 0),
"circ_mv": float(latest.get('circ_mv', 0) or 0)
}
except Exception as e:
print(f"获取估值指标失败: {e}")
# 返回模拟数据
import random
return {
"ts_code": ts_code,
"trade_date": datetime.now().strftime('%Y%m%d'),
"pe_ttm": round(random.uniform(10, 50), 2),
"pb": round(random.uniform(1, 10), 2),
"ps_ttm": round(random.uniform(1, 20), 2),
"dv_ratio": round(random.uniform(0, 5), 2),
"total_mv": round(random.uniform(10000, 100000), 2),
"circ_mv": round(random.uniform(5000, 80000), 2)
}
def get_income_statement(self, ts_code: str) -> Dict:
"""
获取利润表数据
Args:
ts_code: 股票代码
Returns:
利润表数据字典
"""
if self.pro:
try:
df = self.pro.income(
ts_code=ts_code,
fields='ts_code,end_date,revenue,operate_profit,total_profit,n_income'
)
if not df.empty and len(df) >= 2:
current = df.iloc[0]
previous = df.iloc[1]
revenue_growth = 0
profit_growth = 0
if previous.get('revenue') and previous['revenue'] != 0:
revenue_growth = ((current.get('revenue', 0) or 0) - (previous.get('revenue', 0) or 0)) / (previous.get('revenue', 0) or 1) * 100
if previous.get('n_income') and previous['n_income'] != 0:
profit_growth = ((current.get('n_income', 0) or 0) - (previous.get('n_income', 0) or 0)) / (previous.get('n_income', 0) or 1) * 100
return {
"ts_code": ts_code,
"end_date": current.get('end_date', ''),
"revenue": float(current.get('revenue', 0) or 0),
"operate_profit": float(current.get('operate_profit', 0) or 0),
"total_profit": float(current.get('total_profit', 0) or 0),
"net_income": float(current.get('n_income', 0) or 0),
"revenue_growth": round(revenue_growth, 2),
"profit_growth": round(profit_growth, 2)
}
except Exception as e:
print(f"获取利润表数据失败: {e}")
# 返回模拟数据
import random
return {
"ts_code": ts_code,
"end_date": datetime.now().strftime('%Y%m%d'),
"revenue": round(random.uniform(1000000, 10000000), 2),
"operate_profit": round(random.uniform(100000, 1000000), 2),
"total_profit": round(random.uniform(100000, 1000000), 2),
"net_income": round(random.uniform(50000, 500000), 2),
"revenue_growth": round(random.uniform(-20, 50), 2),
"profit_growth": round(random.uniform(-30, 60), 2)
}
def _calculate_ema(self, prices: List[float], period: int) -> float:
"""计算EMA"""
if len(prices) < period:
return prices[-1] if prices else 0
multiplier = 2 / (period + 1)
ema = sum(prices[:period]) / period
for price in prices[period:]:
ema = (price - ema) * multiplier + ema
return ema
def _calculate_rsi(self, prices: List[float], period: int = 14) -> float:
"""计算RSI"""
if len(prices) < period + 1:
return 50
gains = []
losses = []
for i in range(1, len(prices)):
change = prices[i] - prices[i-1]
if change > 0:
gains.append(change)
losses.append(0)
else:
gains.append(0)
losses.append(abs(change))
avg_gain = sum(gains[-period:]) / period
avg_loss = sum(losses[-period:]) / period
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def _mock_technical_indicators(self) -> Dict:
"""返回模拟的技术指标"""
import random
return {
"ts_code": "000000.XX",
"current_price": 100.0,
"ma5": 99.5,
"ma10": 98.0,
"ma20": 97.0,
"ma60": 95.0,
"ma_signal": "金叉",
"macd": 0.5,
"macd_signal": "多头",
"rsi": 55,
"rsi_signal": "中性",
"trend": "震荡",
"support": 95.0,
"resistance": 105.0,
"technical_score": 60
}
```