Back to skills
SkillHub ClubShip Full StackFull Stack

btpanel

宝塔面板(BT-Panel)运维监控技能,提供服务器资源监控、网站状态检查、服务状态检查、SSH安全审计、计划任务管理、日志读取等功能

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,132
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
C61.1

Install command

npx @skill-hub/cli install openclaw-skills-btpanel

Repository

openclaw/skills

Skill path: skills/aapanel/btpanel

宝塔面板(BT-Panel)运维监控技能,提供服务器资源监控、网站状态检查、服务状态检查、SSH安全审计、计划任务管理、日志读取等功能

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install btpanel into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding btpanel to shared team environments
  • Use btpanel for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: btpanel
description: 宝塔面板(BT-Panel)运维监控技能,提供服务器资源监控、网站状态检查、服务状态检查、SSH安全审计、计划任务管理、日志读取等功能
user-invocable: true
disable-model-invocation: false
icon: icon/bt.png
metadata:
  openclaw:
    requires:
      bins:
        - python3
    keywords:
      - 宝塔面板
      - BT-Panel
      - 面板
      - 服务器监控
      - 系统资源
      - CPU监控
      - 内存监控
      - 磁盘监控
      - 网站状态
      - SSL证书
      - 服务状态
      - 日志读取
      - SSH
      - 计划任务
      - crontab
      - 备份任务
---

# 宝塔面板运维监控

宝塔面板服务器的全方位运维监控工具,支持多服务器管理、资源监控、网站状态检查、服务状态检查、SSH安全审计、计划任务管理等功能。

![宝塔面板](icon/bt-logo.svg)

## 图标资源

技能包提供以下图标文件,可在生成报告时引用:

| 文件 | 格式 | 用途 |
|------|------|------|
| `icon/bt-logo.svg` | SVG | 矢量图标,适合缩放 |

**使用示例**(生成报告时):
```markdown
# 服务器巡检报告

![宝塔面板](icon/bt-logo.svg)

## 概述
...
```

## AI 使用约束

本技能用于查询和展示服务器状态数据,AI应遵循以下原则:

1. **数据中立**:如实展示监控数据,不夸大或缩小问题严重性
2. **客观分析**:基于阈值配置给出告警,避免主观判断
3. **数据驱动**:建议和结论应基于实际数据,不得臆测
4. **隐私保护**:不主动泄露服务器敏感信息(如IP、Token、域名)
5. **执行前告知**:由于接口数据较多,获取和分析需要一定时间,AI应先向用户简述即将执行的操作步骤,然后再执行命令获取数据

**执行流程示例**:
```
AI: 我将为您执行以下操作:
    1. 获取服务器系统资源状态(CPU、内存、磁盘)
    2. 检查网站运行状态
    3. 检查服务运行状态
    正在获取数据,请稍候...
    [执行命令]
    [展示结果和分析]
```

## 服务器配置管理
> **重要:** 没有服务器信息时需要添加

使用配置工具管理服务器:

```bash
# 查看帮助
python3 {baseDir}/scripts/bt-config.py -h

# 添加服务器
python3 {baseDir}/scripts/bt-config.py add -n prod-01 -H https://panel.example.com:8888 -t YOUR_TOKEN

# 列出服务器
python3 {baseDir}/scripts/bt-config.py list

# 设置阈值
python3 {baseDir}/scripts/bt-config.py threshold --cpu 75 --memory 80
```

## 常用场景

### 场景一:初次使用配置服务器

当用户第一次使用本技能时,需要先配置服务器连接信息:

```bash
# 添加服务器(需要面板地址和API Token)
python3 {baseDir}/scripts/bt-config.py add -n prod-01 -H https://panel.example.com:8888 -t YOUR_API_TOKEN

# 查看已配置的服务器
python3 {baseDir}/scripts/bt-config.py list
```

**获取API Token的方法**:
1. 登录宝塔面板
2. 进入「面板设置」->「API接口」
3. 点击「获取API Token」

**用户意图识别**:
- "帮我配置宝塔服务器" → 引导用户添加服务器配置
- "添加一台服务器" → 执行 bt-config.py add
- "查看有哪些服务器" → 执行 bt-config.py list


### 场景二:多服务器资源汇总

当用户需要了解所有服务器的整体运行状态时:

```bash
# 查看所有服务器的资源使用情况
python3 {baseDir}/scripts/monitor.py --format table

# 查看所有服务器的网站状态汇总
python3 {baseDir}/scripts/sites.py

# 查看所有服务器的服务状态
python3 {baseDir}/scripts/services.py
```

**用户意图识别**:
- "服务器整体情况怎么样" → 执行 monitor.py
- "所有服务器健康状态" → 执行 monitor.py + sites.py
- "多服务器资源使用情况" → 执行 monitor.py --format table

### 场景三:单台服务器日常巡检

当用户需要对单台服务器进行全面检查时:

```bash
# 指定服务器名称进行各项检查
python3 {baseDir}/scripts/monitor.py --server prod-01 --format table
python3 {baseDir}/scripts/sites.py --server prod-01
python3 {baseDir}/scripts/services.py --server prod-01
python3 {baseDir}/scripts/ssh.py --status --server prod-01
python3 {baseDir}/scripts/crontab.py --backup-only --server prod-01
```

**用户意图识别**:
- "检查 prod-01 这台服务器" → 执行上述检查命令
- "帮我日常巡检" → 执行系统监控、网站状态、服务状态检查
- "这台服务器有问题吗" → 执行全面检查并汇总告警

### 场景四:网站SSL证书检查

当用户关心SSL证书是否即将过期时:

```bash
# 查看SSL即将过期的网站
python3 {baseDir}/scripts/sites.py --filter ssl-warning

# 查看SSL已过期的网站
python3 {baseDir}/scripts/sites.py --filter ssl-expired
```

**用户意图识别**:
- "SSL证书快过期了吗" → 执行 sites.py --filter ssl-warning
- "有哪些网站证书过期了" → 执行 sites.py --filter ssl-expired

### 场景五:安全审计

当用户需要进行安全检查时:

```bash
# 查看SSH登录失败记录
python3 {baseDir}/scripts/ssh.py --logs --filter failed

# 搜索特定IP的登录记录
python3 {baseDir}/scripts/ssh.py --logs --search 192.168.1.100

# 查看SSH服务状态
python3 {baseDir}/scripts/ssh.py --status
```

**用户意图识别**:
- "有没有异常登录" → 执行 ssh.py --logs --filter failed
- "查一下这个IP的登录记录" → 执行 ssh.py --logs --search IP
- "SSH安全检查" → 执行 ssh.py --status 和 ssh.py --logs

### 场景六:服务故障排查

当某个服务出现问题时:

```bash
# 查看服务状态
python3 {baseDir}/scripts/services.py --server prod-01

# 查看服务错误日志
python3 {baseDir}/scripts/logs.py --server prod-01 --service nginx --lines 200
python3 {baseDir}/scripts/logs.py --server prod-01 --service redis
```

**用户意图识别**:
- "Nginx/Apache/Redis出问题了" → 查看服务状态 + 查看错误日志
- "服务报错了,帮我看看日志" → 执行 logs.py 查看对应服务日志

### 场景七:备份任务检查

当用户关心备份是否正常时:

```bash
# 查看所有备份任务
python3 {baseDir}/scripts/crontab.py --backup-only

# 查看特定备份任务的执行日志
python3 {baseDir}/scripts/crontab.py --logs --task-id 11
```

**用户意图识别**:
- "备份任务正常吗" → 执行 crontab.py --backup-only
- "查看备份日志" → 执行 crontab.py --logs --task-id ID

## 版本要求

- **宝塔面板**: >= 9.0.0
- **Python**: >= 3.10

## 用法

### 系统资源监控

```bash
# 查看帮助
python3 {baseDir}/scripts/monitor.py -h

# 监控所有服务器
python3 {baseDir}/scripts/monitor.py

# 监控指定服务器
python3 {baseDir}/scripts/monitor.py --server prod-01

# JSON格式输出
python3 {baseDir}/scripts/monitor.py --format json

# 表格格式输出
python3 {baseDir}/scripts/monitor.py --format table

# 输出到文件
python3 {baseDir}/scripts/monitor.py --output report.json
```

### 网站状态检查

```bash
# 查看帮助
python3 {baseDir}/scripts/sites.py -h

# 检查所有服务器的网站状态
python3 {baseDir}/scripts/sites.py

# 检查指定服务器
python3 {baseDir}/scripts/sites.py --server prod-01

# 只显示停止的网站
python3 {baseDir}/scripts/sites.py --filter stopped

# 只显示SSL即将过期的网站(30天内)
python3 {baseDir}/scripts/sites.py --filter ssl-warning

# 只显示SSL已过期的网站
python3 {baseDir}/scripts/sites.py --filter ssl-expired

# JSON格式输出
python3 {baseDir}/scripts/sites.py --format json

# 输出到文件
python3 {baseDir}/scripts/sites.py --output sites.json
```

### 服务状态检查

```bash
# 查看帮助
python3 {baseDir}/scripts/services.py -h

# 检查所有服务器的服务状态
python3 {baseDir}/scripts/services.py

# 检查指定服务器
python3 {baseDir}/scripts/services.py --server prod-01

# 只检查特定服务
python3 {baseDir}/scripts/services.py --service nginx --service redis

# JSON格式输出
python3 {baseDir}/scripts/services.py --format json

# 输出到文件
python3 {baseDir}/scripts/services.py --output services.json
```

### 日志读取

```bash
# 查看帮助
python3 {baseDir}/scripts/logs.py -h

# 查看Nginx错误日志
python3 {baseDir}/scripts/logs.py --service nginx

# 查看Redis日志
python3 {baseDir}/scripts/logs.py --service redis

# 查看Apache错误日志
python3 {baseDir}/scripts/logs.py --service apache

# 查看MySQL错误日志
python3 {baseDir}/scripts/logs.py --service mysql

# 查看MySQL慢查询日志
python3 {baseDir}/scripts/logs.py --service mysql --log-type slow

# 查看PostgreSQL日志(需要插件)
python3 {baseDir}/scripts/logs.py --service pgsql

# 查看PostgreSQL慢日志
python3 {baseDir}/scripts/logs.py --service pgsql --log-type slow

# 指定服务器和行数
python3 {baseDir}/scripts/logs.py --server prod-01 --service nginx --lines 200

# JSON格式输出
python3 {baseDir}/scripts/logs.py --service nginx --format json
```

### SSH状态和日志检查

```bash
# 查看帮助
python3 {baseDir}/scripts/ssh.py -h

# 查看SSH服务状态
python3 {baseDir}/scripts/ssh.py --status

# 查看SSH登录日志
python3 {baseDir}/scripts/ssh.py --logs

# 只查看失败的登录日志
python3 {baseDir}/scripts/ssh.py --logs --filter failed

# 只查看成功的登录日志
python3 {baseDir}/scripts/ssh.py --logs --filter success

# 搜索特定IP的登录记录
python3 {baseDir}/scripts/ssh.py --logs --search 192.168.1.1

# 指定服务器
python3 {baseDir}/scripts/ssh.py --status --server prod-01

# JSON格式输出
python3 {baseDir}/scripts/ssh.py --logs --format json
```

### 计划任务检查

```bash
# 查看帮助
python3 {baseDir}/scripts/crontab.py -h

# 查看所有计划任务
python3 {baseDir}/scripts/crontab.py

# 只查看备份任务
python3 {baseDir}/scripts/crontab.py --backup-only

# 查看指定服务器
python3 {baseDir}/scripts/crontab.py --server prod-01

# 查看备份任务日志
python3 {baseDir}/scripts/crontab.py --logs --task-id 11

# JSON格式输出
python3 {baseDir}/scripts/crontab.py --format json
```

## 参数说明

### monitor.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--format`, `-f` | 输出格式 (json/table) | json |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

### sites.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--format`, `-f` | 输出格式 (json/table) | table |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--filter` | 过滤条件 (stopped/ssl-warning/ssl-expired) | 无 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

### services.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--format`, `-f` | 输出格式 (json/table) | table |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--service` | 指定要检查的服务(可多次指定) | 默认服务列表 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

### logs.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--service` | 服务名称 (nginx/apache/redis/mysql/pgsql) | 必填 |
| `--log-type` | 日志类型 (error/slow) | error |
| `--lines`, `-n` | 返回最后N行日志 | 100 |
| `--format`, `-f` | 输出格式 (json/table) | table |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

**注意**:只有已安装的服务才能获取日志,尝试获取未安装服务的日志会返回错误。

### ssh.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--status` | 查看SSH服务状态 | 否 |
| `--logs` | 查看SSH登录日志 | 否 |
| `--filter` | 日志过滤 (ALL/success/failed) | ALL |
| `--search` | 搜索关键字(IP或用户名) | 无 |
| `--limit`, `-n` | 返回日志条数 | 50 |
| `--format`, `-f` | 输出格式 (json/table) | table |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

### crontab.py 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--server`, `-s` | 指定服务器名称 | 所有服务器 |
| `--backup-only` | 只显示备份任务 | 否 |
| `--logs` | 查看任务日志 | 否 |
| `--task-id` | 任务ID(配合--logs使用) | 无 |
| `--days` | 日志查询天数 | 7 |
| `--format`, `-f` | 输出格式 (json/table) | table |
| `--output`, `-o` | 输出文件路径 | 标准输出 |
| `--config`, `-c` | 配置文件路径 | 自动查找 |

## 监控指标

### 系统资源监控 (monitor.py)

通过单一API接口获取完整的系统监控数据:

- **CPU**: 使用率、核心数、型号、用户/系统占用
- **内存**: 总量、使用量、可用量、缓存、使用率
- **磁盘**: 多分区详情、总量、使用量、使用率
- **网络**: 实时速度、总流量、各网卡统计
- **负载**: 1/5/15分钟负载
- **系统**: 主机名、操作系统、运行时间、面板版本
- **资源**: 网站、数据库、FTP账户数量

### 网站状态检查 (sites.py)

支持多种项目类型:

| 类型 | 进程信息 | 运行状态判断 |
|------|----------|--------------|
| PHP | 无 | status==1 && stop为空 |
| Java | pid_info | pid > 0 |
| Node | load_info | run==true |
| Go | load_info | run==true |
| Python | pids | run==true |
| .NET | load_info | run==true |
| Proxy(反代) | 无 | status==1 |
| HTML(静态) | 无 | status==1 |
| Other(其他) | load_info | run==true |

检查项目:
- **运行状态**: 运行中/已停止/启动中
- **SSL证书**: 有效/即将过期/已过期
- **进程信息**: PID、内存、CPU、线程数(适用于Java/Node/Go/Python/.NET/Other)
- **反代健康**: 反代项目的后端健康状态
- **基础信息**: 路径、域名、PHP版本、端口、代理地址

### 服务状态检查 (services.py)

支持检查的服务:

| 服务 | 状态检查 | 日志支持 |
|------|----------|----------|
| Nginx | ✓ | ✓ 错误日志 |
| Apache | ✓ | ✓ 错误日志 |
| MySQL | ✓ | ✓ 错误日志/慢日志 |
| Redis | ✓ | ✓ 日志文件 |
| Memcached | ✓ | ✗ |
| Pure-FTPD | ✓ | ✗ |
| PHP (多版本) | ✓ | ✗ |
| PostgreSQL | ✓ | ✓ 错误日志/慢日志 |

**服务状态字段说明**:

| 字段 | 说明 |
|------|------|
| `installed` (setup) | 服务是否已安装 |
| `status` | 服务是否正在运行 |
| `version` | 已安装的版本号 |
| `pid` | 主进程ID(运行中时) |

**重要区别**:
- `installed=false`:服务未安装,无法获取日志
- `installed=true, status=false`:服务已安装但未运行
- `installed=true, status=true`:服务已安装且正在运行

**PHP多版本共存说明**:
- PHP是支持多版本共存的服务,一台服务器可能同时安装多个PHP版本
- PHP服务名称格式:`php-X.X`(如 `php-8.2`、`php-7.4`)
- 系统会自动扫描已安装的PHP版本并分别显示状态
- 常见PHP版本:8.5, 8.4, 8.3, 8.2, 8.1, 8.0, 7.4, 7.3, 7.2, 7.1, 7.0, 5.4, 5.3, 5.2

检查项目:
- **运行状态**: 运行中/已停止
- **版本信息**: 已安装版本号
- **进程PID**: 主进程ID

### 日志读取 (logs.py)

支持的日志类型:

| 日志类型 | 服务 | 获取方式 |
|----------|------|----------|
| 错误日志 | nginx | 文件: /www/server/nginx/logs/error.log |
| 错误日志 | apache | 文件: /www/wwwlogs/error_log |
| 日志文件 | redis | 文件: /www/server/redis/redis.log |
| 错误日志 | mysql | 接口: /database?action=GetErrorLog |
| 慢日志 | mysql | 接口: /database?action=GetSlowLogs |
| 错误日志 | pgsql | 插件接口: pgsql_manager |
| 慢日志 | pgsql | 插件接口: pgsql_manager |

**注意事项**:
- 只有已安装(`installed=true`)的服务才能获取日志
- 尝试获取未安装服务的日志会返回错误
- Memcached 和 Pure-FTPD 不支持日志获取

### SSH状态和日志检查 (ssh.py)

检查项目:
- **SSH服务状态**: 运行中/已停止
- **端口**: SSH监听端口
- **Ping设置**: 是否允许ping
- **防火墙状态**: 是否启用
- **Fail2ban**: 是否安装和运行

登录日志字段:
- **时间**: 登录时间
- **类型**: 成功/失败
- **用户**: 登录用户名
- **IP地址**: 来源IP
- **地区**: IP归属地
- **登录方式**: password/key

### 计划任务检查 (crontab.py)

任务类型:
- **备份网站**: 自动备份网站文件和数据库
- **备份数据库**: 单独备份数据库
- **备份目录**: 备份指定目录
- **Shell脚本**: 自定义Shell命令
- **同步时间**: NTP时间同步
- **切割日志**: 日志分割任务
- **访问URL**: 定时HTTP请求

检查项目:
- **任务状态**: 启用/禁用
- **执行周期**: 每天/每小时/每周/每月/间隔分钟
- **备份目标**: 网站名称/数据库名称
- **保留数量**: 备份保留份数
- **执行结果**: 最后一次执行状态

## 告警配置

### SSL证书告警

| 剩余天数 | 告警级别 |
|----------|----------|
| 已过期 | critical |
| ≤ 7 天 | critical |
| ≤ 30 天 | warning |

### 服务器资源告警阈值

可在配置文件中设置告警阈值:

```yaml
global:
  thresholds:
    cpu: 80      # CPU使用率告警阈值(%)
    memory: 85   # 内存使用率告警阈值(%)
    disk: 90     # 磁盘使用率告警阈值(%)
```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# btpanel

宝塔面板(BT-Panel)运维监控技能,提供服务器资源监控、网站状态检查、服务状态检查、SSH安全审计、计划任务管理、日志读取等功能

## 版本要求

- **宝塔面板**: >= 9.0.0
- **Python**: >= 3.10

## 快速开始

1. 安装依赖:
   ```bash
   pip install requests pyyaml rich
   ```

2. 配置服务器:
   ```bash
   # 使用配置工具添加服务器
   python3 scripts/bt-config.py add --name prod-01 --host https://panel.example.com:8888 --token YOUR_TOKEN

   # 查看配置
   python3 scripts/bt-config.py list
   ```

   或手动编辑配置文件 `~/.openclaw/bt-skills.yaml`

3. 运行:
   ```bash
   # 查看帮助
   python3 scripts/monitor.py --help

   # 监控所有服务器
   python3 scripts/monitor.py
   ```

## 可用脚本

| 脚本 | 功能 |
|------|------|
| monitor.py | 系统资源监控 |
| sites.py | 网站状态检查 |
| services.py | 服务状态检查 |
| logs.py | 日志读取 |
| ssh.py | SSH状态和登录日志 |
| crontab.py | 计划任务检查 |
| bt-config.py | 配置管理工具 |

## 配置管理工具

```bash
# 初始化配置
python3 scripts/bt-config.py init

# 列出服务器
python3 scripts/bt-config.py list

# 添加服务器
python3 scripts/bt-config.py add -n prod-01 -H https://panel.example.com:8888 -t YOUR_TOKEN

# 更新服务器
python3 scripts/bt-config.py update prod-01 --disabled

# 删除服务器
python3 scripts/bt-config.py remove prod-01

# 设置阈值
python3 scripts/bt-config.py threshold --cpu 75 --memory 80

# 查看配置路径
python3 scripts/bt-config.py path
```

详细使用说明请参考 SKILL.md

```

### _meta.json

```json
{
  "owner": "aapanel",
  "slug": "btpanel",
  "displayName": "btpanel",
  "latest": {
    "version": "1.0.1",
    "publishedAt": 1772534360100,
    "commit": "https://github.com/openclaw/skills/commit/bb808f0bc46e360431af96dfe60e5b0ecbc0a34a"
  },
  "history": []
}

```

### scripts/bt-config.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "pyyaml>=6.0",
# ]
# ///
"""
宝塔面板配置管理工具
支持查看、添加、删除、修改服务器配置
"""

import argparse
import json
import sys
from pathlib import Path

# 兼容开发环境和发布环境的导入
# 发布环境: bt_common/ (脚本在 scripts/)
# 开发环境: src/bt_common/ (脚本在 src/btpanel/scripts/)
_script_root = Path(__file__).parent.parent
if (_script_root / "bt_common").exists():
    # 发布环境: 脚本在 {baseDir}/scripts/,bt_common 在 {baseDir}/bt_common/
    sys.path.insert(0, str(_script_root))
else:
    # 开发环境: 脚本在 src/btpanel/scripts/,bt_common 在 src/bt_common/
    # _script_root = src/btpanel, 需要找 src/bt_common
    dev_src = _script_root.parent  # src/
    if (dev_src / "bt_common").exists():
        sys.path.insert(0, str(dev_src))
    else:
        # 兜底:使用项目根目录的 src
        sys.path.insert(0, str(_script_root.parent.parent / "src"))

from bt_common import (
    GLOBAL_CONFIG_FILE,
    MIN_PANEL_VERSION,
    add_server,
    create_default_global_config,
    find_config_file,
    get_config_info,
    load_config,
    normalize_host,
    remove_server,
    update_thresholds,
    validate_host,
)


def cmd_list(args):
    """列出所有服务器配置"""
    try:
        config_info = get_config_info()

        print("=" * 60)
        print("宝塔面板配置信息")
        print("=" * 60)
        print(f"配置文件: {config_info.get('current_config_path', '未设置')}")
        print(f"全局配置: {config_info.get('global_config_path', '')}")
        print(f"宝塔版本要求: >= {config_info.get('min_panel_version', MIN_PANEL_VERSION)}")
        print()

        servers = config_info.get("servers", [])
        if not servers:
            print("暂无服务器配置")
            print()
            print("使用以下命令添加服务器:")
            print("  bt-config add --name <名称> --host <地址> --token <密钥>")
            return 0

        print(f"服务器列表 ({len(servers)} 个):")
        print("-" * 60)
        for server in servers:
            status = "✓" if server.get("enabled", True) else "✗"
            print(f"  [{status}] {server['name']}")
            print(f"       地址: {server['host']}")
        print()

        thresholds = config_info.get("thresholds", {})
        if thresholds:
            print("告警阈值:")
            print(f"  CPU: {thresholds.get('cpu', 80)}%")
            print(f"  内存: {thresholds.get('memory', 85)}%")
            print(f"  磁盘: {thresholds.get('disk', 90)}%")

        return 0

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_add(args):
    """添加服务器配置"""
    try:
        # 验证并规范化地址
        is_valid, result = validate_host(args.host)
        if not is_valid:
            print(f"错误: {result}", file=sys.stderr)
            return 1

        normalized_host = result
        if normalized_host != args.host:
            print(f"提示: 地址已规范化为 {normalized_host}")

        # 检查是否已存在
        config_info = get_config_info()
        existing_names = [s["name"] for s in config_info.get("servers", [])]

        if args.name in existing_names and not args.force:
            print(f"错误: 服务器 '{args.name}' 已存在,使用 --force 覆盖")
            return 1

        result = add_server(
            name=args.name,
            host=normalized_host,
            token=args.token,
            timeout=args.timeout,
            enabled=not args.disabled,
        )

        if result:
            print(f"✓ 已添加服务器: {args.name}")
            print(f"  地址: {normalized_host}")
            print(f"  超时: {args.timeout}ms")
            print(f"  状态: {'禁用' if args.disabled else '启用'}")
            print()
            print(f"配置文件: {GLOBAL_CONFIG_FILE}")
            return 0
        else:
            print("添加失败")
            return 1

    except ValueError as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_remove(args):
    """删除服务器配置"""
    try:
        result = remove_server(args.name)

        if result:
            print(f"✓ 已删除服务器: {args.name}")
            print(f"配置文件: {GLOBAL_CONFIG_FILE}")
            return 0
        else:
            print(f"未找到服务器: {args.name}")
            return 1

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_update(args):
    """更新服务器配置"""
    try:
        # 先删除再添加
        config_info = get_config_info()
        existing = None
        for s in config_info.get("servers", []):
            if s["name"] == args.name:
                existing = s
                break

        if not existing:
            print(f"未找到服务器: {args.name}")
            return 1

        # 合并参数
        new_host = args.host if args.host else existing["host"]
        new_token = args.token if args.token else existing.get("token", "")
        new_timeout = args.timeout if args.timeout else existing.get("timeout", 10000)
        new_enabled = not args.disabled if args.disabled is not None else existing.get("enabled", True)

        # 删除旧的,添加新的
        remove_server(args.name)
        add_server(
            name=args.name,
            host=new_host,
            token=new_token,
            timeout=new_timeout,
            enabled=new_enabled,
        )

        print(f"✓ 已更新服务器: {args.name}")
        print(f"  地址: {new_host}")
        print(f"  超时: {new_timeout}ms")
        print(f"  状态: {'禁用' if not new_enabled else '启用'}")
        return 0

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_threshold(args):
    """设置告警阈值"""
    try:
        result = update_thresholds(
            cpu=args.cpu,
            memory=args.memory,
            disk=args.disk,
        )

        if result:
            print("✓ 已更新告警阈值:")
            if args.cpu:
                print(f"  CPU: {args.cpu}%")
            if args.memory:
                print(f"  内存: {args.memory}%")
            if args.disk:
                print(f"  磁盘: {args.disk}%")
            print(f"配置文件: {GLOBAL_CONFIG_FILE}")
            return 0
        else:
            print("更新失败")
            return 1

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_init(args):
    """初始化配置文件"""
    try:
        config_path = create_default_global_config()
        print(f"✓ 已创建配置文件: {config_path}")
        print()
        print("请编辑配置文件添加服务器信息:")
        print(f"  {config_path}")
        print()
        print("或使用命令添加服务器:")
        print("  bt-config add --name <名称> --host <地址> --token <密钥>")
        return 0

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_show(args):
    """显示完整配置"""
    try:
        config_path = find_config_file()
        if not config_path:
            print("未找到配置文件")
            print("运行 'bt-config init' 创建配置文件")
            return 1

        config = load_config(config_path)

        if args.format == "json":
            print(json.dumps(config, ensure_ascii=False, indent=2))
        else:
            import yaml
            print(yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False))

        return 0

    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        return 1


def cmd_path(args):
    """显示配置文件路径"""
    config_path = find_config_file()
    print(f"全局配置: {GLOBAL_CONFIG_FILE}")
    print(f"全局配置存在: {'是' if GLOBAL_CONFIG_FILE.exists() else '否'}")
    if config_path:
        print(f"当前使用: {config_path}")
    else:
        print("当前使用: 未配置")
    print()
    print("配置优先级:")
    print("  1. BT_CONFIG_PATH 环境变量")
    print(f"  2. 全局配置: {GLOBAL_CONFIG_FILE}")
    print("  3. 本地配置: config/servers.local.yaml")
    print("  4. 默认配置: config/servers.yaml")
    return 0


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板配置管理工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 初始化配置文件
  bt-config init

  # 列出所有服务器
  bt-config list

  # 添加服务器
  bt-config add --name prod-01 --host https://panel.example.com:8888 --token YOUR_TOKEN

  # 更新服务器
  bt-config update prod-01 --host https://new.example.com:8888

  # 禁用服务器
  bt-config update prod-01 --disabled

  # 删除服务器
  bt-config remove prod-01

  # 设置告警阈值
  bt-config threshold --cpu 75 --memory 80

  # 显示配置文件路径
  bt-config path

  # 显示完整配置
  bt-config show
  bt-config show --format json
        """,
    )

    subparsers = parser.add_subparsers(dest="command", help="可用命令")

    # list 命令
    subparsers.add_parser("list", help="列出所有服务器配置")

    # add 命令
    add_parser = subparsers.add_parser("add", help="添加服务器配置")
    add_parser.add_argument("--name", "-n", required=True, help="服务器名称")
    add_parser.add_argument("--host", "-H", required=True, help="面板地址 (如 https://panel.example.com:8888)")
    add_parser.add_argument("--token", "-t", required=True, help="API Token")
    add_parser.add_argument("--timeout", type=int, default=10000, help="超时时间(毫秒),默认 10000")
    add_parser.add_argument("--disabled", action="store_true", help="禁用此服务器")
    add_parser.add_argument("--force", "-f", action="store_true", help="强制覆盖已存在的配置")

    # remove 命令
    remove_parser = subparsers.add_parser("remove", help="删除服务器配置")
    remove_parser.add_argument("name", help="服务器名称")

    # update 命令
    update_parser = subparsers.add_parser("update", help="更新服务器配置")
    update_parser.add_argument("name", help="服务器名称")
    update_parser.add_argument("--host", "-H", help="面板地址")
    update_parser.add_argument("--token", "-t", help="API Token")
    update_parser.add_argument("--timeout", type=int, help="超时时间(毫秒)")
    update_parser.add_argument("--disabled", type=lambda x: x.lower() in ("true", "1", "yes"), help="是否禁用 (true/false)")

    # threshold 命令
    threshold_parser = subparsers.add_parser("threshold", help="设置告警阈值")
    threshold_parser.add_argument("--cpu", type=int, help="CPU 使用率阈值(%)")
    threshold_parser.add_argument("--memory", type=int, help="内存使用率阈值(%)")
    threshold_parser.add_argument("--disk", type=int, help="磁盘使用率阈值(%)")

    # init 命令
    subparsers.add_parser("init", help="初始化配置文件")

    # show 命令
    show_parser = subparsers.add_parser("show", help="显示完整配置")
    show_parser.add_argument("--format", "-f", choices=["yaml", "json"], default="yaml", help="输出格式")

    # path 命令
    subparsers.add_parser("path", help="显示配置文件路径")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 0

    # 分发命令
    commands = {
        "list": cmd_list,
        "add": cmd_add,
        "remove": cmd_remove,
        "update": cmd_update,
        "threshold": cmd_threshold,
        "init": cmd_init,
        "show": cmd_show,
        "path": cmd_path,
    }

    handler = commands.get(args.command)
    if handler:
        return handler(args)
    else:
        parser.print_help()
        return 1


if __name__ == "__main__":
    sys.exit(main())

```

### scripts/crontab.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
计划任务检查脚本
检查宝塔面板的计划任务,重点关注备份任务
"""

import argparse
import json
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
_skill_root = Path(__file__).parent.parent

if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    load_config,
)


# 任务类型映射
TASK_TYPE_MAP = {
    "toShell": "Shell脚本",
    "site": "备份网站",
    "database": "备份数据库",
    "path": "备份目录",
    "sync_time": "同步时间",
    "log": "切割日志",
    "rememory": "释放内存",
    "access": "访问URL",
    "backup": "备份",
}

# 任务类型分类
BACKUP_TYPES = ["site", "database", "path"]


def parse_crontab_task(task: dict) -> dict:
    """
    解析计划任务数据

    Args:
        task: 原始任务数据

    Returns:
        解析后的任务信息
    """
    s_type = task.get("sType", "")
    task_type = TASK_TYPE_MAP.get(s_type, s_type or "其他")

    # 判断是否为备份任务
    is_backup = s_type in BACKUP_TYPES or "备份" in task.get("name", "")

    # 解析执行周期
    cycle = task.get("cycle", "") or task.get("type_zh", "")

    # 解析执行时间
    exec_time = ""
    if task.get("type") == "day":
        hour = task.get("where_hour", 0)
        minute = task.get("where_minute", 0)
        exec_time = f"每天 {hour:02d}:{minute:02d}"
    elif task.get("type") == "hour":
        minute = task.get("where_minute", 0)
        exec_time = f"每小时 {minute:02d}分"
    elif task.get("type") == "minute-n":
        interval = task.get("where1", "5")
        exec_time = f"每 {interval} 分钟"
    elif task.get("type") == "week":
        days = ["周日", "周一", "周二", "周三", "周四", "周五", "周六"]
        day_idx = int(task.get("where1", 0))
        hour = task.get("where_hour", 0)
        minute = task.get("where_minute", 0)
        exec_time = f"每{days[day_idx]} {hour:02d}:{minute:02d}"
    elif task.get("type") == "month":
        day = task.get("where1", 1)
        hour = task.get("where_hour", 0)
        minute = task.get("where_minute", 0)
        exec_time = f"每月{day}日 {hour:02d}:{minute:02d}"

    return {
        "id": task.get("id"),
        "name": task.get("name", "") or task.get("rname", ""),
        "type": task_type,
        "s_type": s_type,
        "is_backup": is_backup,
        "status": task.get("status", 0) == 1,
        "enabled": task.get("status", 0) == 1,
        "cycle": cycle,
        "exec_time": exec_time,
        "backup_target": task.get("sName", "") if is_backup else "",
        "backup_path": task.get("db_backup_path", "/www/backup"),
        "save_count": task.get("save", 0) if is_backup else None,
        "command": task.get("sBody", ""),
        "user": task.get("user", "root"),
        "addtime": task.get("addtime", ""),
        "type_name": task.get("type_name", ""),
        "result": task.get("result", 0),  # 0=未执行/失败, 1=成功
    }


def get_crontab_status(client: BtClient, page: int = 1, limit: int = 100) -> dict:
    """
    获取计划任务状态

    Args:
        client: 宝塔客户端
        page: 页码
        limit: 每页数量

    Returns:
        计划任务状态信息
    """
    result = {
        "server": client.name,
        "timestamp": datetime.now().isoformat(),
        "tasks": [],
        "summary": {
            "total": 0,
            "enabled": 0,
            "disabled": 0,
            "backup_tasks": 0,
            "shell_tasks": 0,
            "other_tasks": 0,
        },
        "backup_tasks": [],
        "alerts": [],
    }

    try:
        response = client.get_crontab_list(page=page, limit=limit)
        tasks = response.get("data", [])

        for task in tasks:
            parsed = parse_crontab_task(task)
            result["tasks"].append(parsed)

            # 统计
            result["summary"]["total"] += 1
            if parsed["enabled"]:
                result["summary"]["enabled"] += 1
            else:
                result["summary"]["disabled"] += 1
                result["alerts"].append({
                    "level": "warning",
                    "type": "crontab",
                    "message": f"任务 [{parsed['name']}] 已禁用",
                    "task_id": parsed["id"],
                })

            if parsed["is_backup"]:
                result["summary"]["backup_tasks"] += 1
                result["backup_tasks"].append(parsed)
            elif parsed["s_type"] == "toShell":
                result["summary"]["shell_tasks"] += 1
            else:
                result["summary"]["other_tasks"] += 1

    except Exception as e:
        result["error"] = str(e)
        result["alerts"].append({
            "level": "critical",
            "type": "connection",
            "message": f"获取计划任务失败: {e}",
        })

    return result


def get_backup_task_logs(client: BtClient, task_id: int, days: int = 7) -> dict:
    """
    获取备份任务日志

    Args:
        client: 宝塔客户端
        task_id: 任务ID
        days: 查询天数

    Returns:
        任务日志信息
    """
    result = {
        "server": client.name,
        "task_id": task_id,
        "timestamp": datetime.now().isoformat(),
        "logs": [],
        "last_status": None,
        "alerts": [],
    }

    try:
        # 计算时间范围
        end_timestamp = int(time.time())
        start_timestamp = end_timestamp - (days * 24 * 60 * 60)

        response = client.get_crontab_logs(
            task_id=task_id,
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
        )

        if response.get("status"):
            log_content = response.get("msg", "")
            result["logs"] = parse_backup_log(log_content)

            # 分析最后的执行状态
            if result["logs"]:
                last_log = result["logs"][-1]
                result["last_status"] = last_log.get("status")
                if last_log.get("status") == "failed":
                    result["alerts"].append({
                        "level": "warning",
                        "type": "backup",
                        "message": f"备份任务最后一次执行失败: {last_log.get('message', '')}",
                    })
        else:
            result["error"] = response.get("msg", "获取日志失败")

    except Exception as e:
        result["error"] = str(e)

    return result


def parse_backup_log(log_content: str) -> list:
    """
    解析备份日志

    Args:
        log_content: 日志内容

    Returns:
        解析后的日志列表
    """
    logs = []

    # 按执行块分割
    blocks = re.split(r"={10,}", log_content)

    for block in blocks:
        if not block.strip():
            continue

        log_entry = {
            "time": "",
            "status": "unknown",
            "message": "",
            "details": [],
        }

        # 提取时间
        time_match = re.search(r"开始备份\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]", block)
        if time_match:
            log_entry["time"] = time_match.group(1)

        # 提取状态
        if "Successful" in block:
            log_entry["status"] = "success"
        elif "Failed" in block or "失败" in block:
            log_entry["status"] = "failed"

        # 提取详细信息
        lines = block.strip().split("\n")
        for line in lines:
            line = line.strip()
            if line.startswith("|-"):
                log_entry["details"].append(line[2:])

        # 提取备份文件路径
        file_match = re.search(r"网站已备份到:(.+\.tar\.gz)", block)
        if file_match:
            log_entry["backup_file"] = file_match.group(1)

        if log_entry["time"] or log_entry["status"] != "unknown":
            logs.append(log_entry)

    return logs


def run_crontab_check(manager: BtClientManager, server: Optional[str] = None,
                      backup_only: bool = False) -> dict:
    """
    执行计划任务检查

    Args:
        manager: 客户端管理器
        server: 指定服务器名称
        backup_only: 只返回备份任务

    Returns:
        检查结果
    """
    # 单个服务器
    if server:
        client = manager.get_client(server)
        result = get_crontab_status(client)
        if backup_only:
            result["tasks"] = [t for t in result["tasks"] if t["is_backup"]]
        return result

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
        "summary": {
            "total_servers": 0,
            "total_tasks": 0,
            "total_backup_tasks": 0,
            "total_enabled": 0,
            "total_disabled": 0,
        },
        "alerts": [],
    }

    for name, client in all_clients.items():
        try:
            server_result = get_crontab_status(client)
            if backup_only:
                server_result["tasks"] = [t for t in server_result["tasks"] if t["is_backup"]]

            results["servers"].append(server_result)

            # 汇总
            summary = server_result.get("summary", {})
            results["summary"]["total_servers"] += 1
            results["summary"]["total_tasks"] += summary.get("total", 0)
            results["summary"]["total_backup_tasks"] += summary.get("backup_tasks", 0)
            results["summary"]["total_enabled"] += summary.get("enabled", 0)
            results["summary"]["total_disabled"] += summary.get("disabled", 0)

            # 收集告警
            for alert in server_result.get("alerts", []):
                alert["server"] = name
                results["alerts"].append(alert)

        except Exception as e:
            results["servers"].append({
                "server": name,
                "error": str(e),
                "alerts": [{"level": "critical", "type": "connection", "message": str(e)}],
            })

    return results


def print_crontab_table(results: dict, backup_only: bool = False):
    """打印表格格式输出"""
    try:
        from rich.console import Console
        from rich.table import Table
        from rich.panel import Panel

        console = Console()

        if "servers" in results and len(results["servers"]) > 1:
            # 多服务器模式
            for server_data in results["servers"]:
                if "error" in server_data:
                    console.print(f"[red]服务器 {server_data.get('server', 'Unknown')} 错误: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                summary = server_data.get("summary", {})

                console.print(f"\n[bold cyan]═══ {server_name} ═══[/bold cyan]")

                # 任务列表
                tasks = server_data.get("tasks", [])
                if tasks:
                    table = Table(show_header=True, header_style="bold")
                    table.add_column("名称", style="cyan", width=30)
                    table.add_column("类型", width=12)
                    table.add_column("状态", width=8)
                    table.add_column("执行时间", width=20)
                    table.add_column("备份目标", width=15)

                    for task in tasks:
                        # 状态
                        if task["enabled"]:
                            status_str = "[green]启用[/green]"
                        else:
                            status_str = "[red]禁用[/red]"

                        # 备份目标
                        backup_target = task.get("backup_target", "") or ""

                        table.add_row(
                            task.get("name", "-")[:30],
                            task.get("type", "-"),
                            status_str,
                            task.get("exec_time", "-")[:20],
                            backup_target[:15],
                        )

                    console.print(table)
                else:
                    console.print("[yellow]无计划任务[/yellow]")

                # 汇总
                console.print(f"\n[dim]汇总: "
                             f"总数 {summary.get('total', 0)}, "
                             f"[green]启用 {summary.get('enabled', 0)}[/green], "
                             f"[red]禁用 {summary.get('disabled', 0)}[/red], "
                             f"备份任务 {summary.get('backup_tasks', 0)}[/dim]")

                # 告警
                alerts = server_data.get("alerts", [])
                if alerts:
                    console.print("\n[yellow]告警:[/yellow]")
                    for alert in alerts[:5]:
                        level = alert.get("level", "warning")
                        color = "red" if level == "critical" else "yellow"
                        console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

            # 总汇总
            summary = results.get("summary", {})
            console.print(f"\n[bold]总汇总:[/bold] "
                         f"服务器: {summary.get('total_servers', 0)}, "
                         f"任务总数: {summary.get('total_tasks', 0)}, "
                         f"[green]启用: {summary.get('total_enabled', 0)}[/green], "
                         f"[red]禁用: {summary.get('total_disabled', 0)}[/red], "
                         f"备份任务: {summary.get('total_backup_tasks', 0)}")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")
            summary = results.get("summary", {})

            console.print(Panel(f"[bold]{server_name} - 计划任务[/bold]", title="服务器"))

            tasks = results.get("tasks", [])
            if tasks:
                table = Table(show_header=True, header_style="bold")
                table.add_column("ID", width=6)
                table.add_column("名称", style="cyan")
                table.add_column("类型")
                table.add_column("状态")
                table.add_column("执行时间")
                table.add_column("备份目标")
                table.add_column("保留数")

                for task in tasks:
                    # 状态
                    if task["enabled"]:
                        status_str = "[green]启用[/green]"
                    else:
                        status_str = "[red]禁用[/red]"

                    # 保留数
                    save_count = task.get("save_count")
                    save_str = str(save_count) if save_count is not None else "-"

                    table.add_row(
                        str(task.get("id", "-")),
                        task.get("name", "-")[:25],
                        task.get("type", "-"),
                        status_str,
                        task.get("exec_time", "-"),
                        task.get("backup_target", "")[:15] or "-",
                        save_str,
                    )

                console.print(table)
            else:
                console.print("[yellow]无计划任务[/yellow]")

            # 汇总
            console.print(f"\n[bold]汇总:[/bold]")
            console.print(f"  总数: {summary.get('total', 0)}")
            console.print(f"  [green]启用: {summary.get('enabled', 0)}[/green]")
            console.print(f"  [red]禁用: {summary.get('disabled', 0)}[/red]")
            console.print(f"  备份任务: {summary.get('backup_tasks', 0)}")
            console.print(f"  Shell任务: {summary.get('shell_tasks', 0)}")
            console.print(f"  其他任务: {summary.get('other_tasks', 0)}")

            # 告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print(f"\n[bold yellow]告警 ({len(alerts)}条):[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "warning")
                    color = "red" if level == "critical" else "yellow"
                    console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

    except ImportError:
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2))


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板计划任务检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 查看所有计划任务
  %(prog)s

  # 只查看备份任务
  %(prog)s --backup-only

  # 查看指定服务器
  %(prog)s --server prod-01

  # 查看备份任务日志
  %(prog)s --logs --task-id 11

  # JSON格式输出
  %(prog)s --format json
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--backup-only", action="store_true", help="只显示备份任务")
    parser.add_argument("--logs", action="store_true", help="查看任务日志")
    parser.add_argument("--task-id", type=int, help="任务ID(配合--logs使用)")
    parser.add_argument("--days", type=int, default=7, help="日志查询天数(默认7天)")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="table", help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--config", "-c", help="配置文件路径")

    args = parser.parse_args()

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请先配置服务器: bt-config.py add", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    try:
        if args.logs and args.task_id:
            # 查看任务日志
            if args.server:
                client = manager.get_client(args.server)
            else:
                # 获取第一个服务器
                client = list(manager.get_all_clients().values())[0]

            results = get_backup_task_logs(client, args.task_id, args.days)
        else:
            # 查看任务列表
            results = run_crontab_check(manager, args.server, args.backup_only)

    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"检查失败: {e}", file=sys.stderr)
        sys.exit(1)

    # 输出结果
    if args.format == "json":
        output = json.dumps(results, ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.output}")
        else:
            print(output)
    else:
        if args.logs:
            # 日志输出
            print(json.dumps(results, ensure_ascii=False, indent=2))
        else:
            print_crontab_table(results, args.backup_only)


if __name__ == "__main__":
    main()

```

### scripts/logs.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
日志读取脚本
读取服务器上的各种日志文件(Nginx/Apache/Redis/MySQL/PostgreSQL错误日志等)

注意事项:
- 只有已安装的服务才能获取日志
- MySQL 使用特殊接口获取日志,不是文件路径
- PostgreSQL 需要安装 pgsql_manager 插件
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
_skill_root = Path(__file__).parent.parent

if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    SERVICE_LOG_PATHS,
    SPECIAL_SERVICE_APIS,
    load_config,
)


# 支持的服务日志(文件路径 + 特殊接口)
SUPPORTED_LOG_SERVICES = list(SERVICE_LOG_PATHS.keys()) + list(SPECIAL_SERVICE_APIS.keys())


def check_service_installed(client: BtClient, service: str) -> tuple[bool, str]:
    """
    检查服务是否已安装

    Args:
        client: 宝塔客户端
        service: 服务名称

    Returns:
        (是否已安装, 状态信息)
    """
    try:
        status = client.get_service_status(service)
        installed = status.get("installed", False)
        running = status.get("status", False)

        if not installed:
            return False, "服务未安装"
        elif not running:
            return True, "服务已安装但未运行"
        else:
            return True, "服务运行中"
    except Exception as e:
        return False, f"检查状态失败: {str(e)}"


def get_service_log(client: BtClient, service: str, log_type: str = "error",
                    lines: int = 100, check_installed: bool = True) -> dict:
    """
    获取服务日志

    Args:
        client: 宝塔客户端
        service: 服务名称
        log_type: 日志类型 (error/slow)
        lines: 返回的最后N行
        check_installed: 是否检查服务安装状态

    Returns:
        日志内容
    """
    result = {
        "server": client.name,
        "service": service,
        "log_type": log_type,
        "timestamp": datetime.now().isoformat(),
        "path": None,
        "content": "",
        "size": 0,
        "installed": True,
        "running": True,
        "error": None,
    }

    try:
        # 检查服务是否支持
        if service not in SUPPORTED_LOG_SERVICES:
            result["error"] = f"不支持的服务: {service}。支持的服务: {', '.join(SUPPORTED_LOG_SERVICES)}"
            result["installed"] = False
            return result

        # 检查服务安装状态
        if check_installed:
            installed, status_msg = check_service_installed(client, service)
            result["installed"] = installed

            if not installed:
                result["error"] = f"无法获取日志: {status_msg}"
                return result

        # 特殊服务处理(pgsql、mysql)
        if service in SPECIAL_SERVICE_APIS:
            api_key = "log" if log_type == "error" else "slow_log"
            endpoint = SPECIAL_SERVICE_APIS[service].get(api_key)
            if not endpoint:
                result["error"] = f"不支持的日志类型: {log_type}"
                return result

            response = client.request(endpoint)
            if response.get("status"):
                # 日志可能是列表格式或字符串
                log_data = response.get("data", [])
                if isinstance(log_data, list):
                    result["content"] = "\n".join(str(line) for line in log_data)
                elif isinstance(log_data, str):
                    # MySQL 日志可能直接是字符串
                    result["content"] = log_data
                else:
                    result["content"] = str(log_data)
            else:
                result["error"] = response.get("msg", "获取日志失败")
            return result

        # 标准服务日志路径(nginx、apache、redis)
        if service not in SERVICE_LOG_PATHS:
            result["error"] = f"不支持的服务: {service}"
            return result

        log_path = SERVICE_LOG_PATHS[service]
        result["path"] = log_path

        # 读取文件内容
        response = client.get_file_body(log_path)
        if response.get("status"):
            content = response.get("data", "")
            result["size"] = response.get("size", 0)

            # 只返回最后N行
            if content:
                content_lines = content.split("\n")
                if len(content_lines) > lines:
                    content_lines = content_lines[-lines:]
                result["content"] = "\n".join(content_lines)
        else:
            result["error"] = response.get("msg", "读取日志文件失败")

    except Exception as e:
        result["error"] = str(e)

    return result


def run_log_check(manager: BtClientManager, server: Optional[str] = None,
                  log_type: str = "error", service: Optional[str] = None,
                  lines: int = 100) -> dict:
    """
    执行日志检查

    Args:
        manager: 客户端管理器
        server: 指定服务器名称
        log_type: 日志类型
        service: 服务名称
        lines: 返回的行数

    Returns:
        检查结果
    """
    # 单个服务器
    if server:
        client = manager.get_client(server)
        return get_service_log(client, service, log_type, lines)

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
    }

    for name, client in all_clients.items():
        try:
            log_result = get_service_log(client, service, log_type, lines)
            results["servers"].append(log_result)
        except Exception as e:
            results["servers"].append({
                "server": name,
                "error": str(e),
            })

    return results


def print_log_output(results: dict, format_type: str = "table"):
    """打印日志输出"""
    try:
        from rich.console import Console
        from rich.panel import Panel
        from rich.syntax import Syntax

        console = Console()

        if "servers" in results:
            # 多服务器模式
            for server_data in results["servers"]:
                if "error" in server_data and "content" not in server_data:
                    server_name = server_data.get("server", "Unknown")
                    installed = server_data.get("installed", True)
                    if not installed:
                        console.print(f"[yellow]服务器 {server_name}: {server_data['error']}[/yellow]")
                    else:
                        console.print(f"[red]服务器 {server_name} 错误: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                service = server_data.get("service", "unknown")
                content = server_data.get("content", "")

                console.print(f"\n[bold cyan]═══ {server_name} - {service} ═══[/bold cyan]")

                if isinstance(content, str):
                    # 日志内容
                    if content.strip():
                        # 尝试语法高亮
                        try:
                            syntax = Syntax(content, "log", theme="monokai", line_numbers=True)
                            console.print(syntax)
                        except Exception:
                            console.print(content)
                    else:
                        console.print("[yellow]日志为空[/yellow]")
                else:
                    console.print(str(content))

                if server_data.get("size"):
                    console.print(f"\n[dim]文件大小: {server_data['size']} 字节[/dim]")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")
            service = results.get("service", "unknown")
            content = results.get("content", "")
            error = results.get("error")
            installed = results.get("installed", True)

            if error:
                if not installed:
                    console.print(f"[yellow]跳过: {error}[/yellow]")
                else:
                    console.print(f"[red]错误: {error}[/red]")
                return

            console.print(Panel(f"[bold]{server_name} - {service}[/bold]", title="日志"))

            if isinstance(content, str):
                if content.strip():
                    try:
                        syntax = Syntax(content, "log", theme="monokai", line_numbers=True)
                        console.print(syntax)
                    except Exception:
                        console.print(content)
                else:
                    console.print("[yellow]日志为空[/yellow]")
            else:
                console.print(str(content))

            if results.get("size"):
                console.print(f"\n[dim]文件大小: {results['size']} 字节[/dim]")

    except ImportError:
        # 无rich库时使用简单输出
        if "servers" in results:
            for server_data in results["servers"]:
                print(f"\n=== {server_data.get('server', 'Unknown')} ===")
                content = server_data.get("content", "")
                print(content)
        else:
            content = results.get("content", "")
            print(content)


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板服务日志读取",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 查看Nginx错误日志
  %(prog)s --service nginx

  # 查看Redis日志
  %(prog)s --service redis

  # 查看Apache错误日志
  %(prog)s --service apache

  # 查看MySQL错误日志
  %(prog)s --service mysql

  # 查看MySQL慢查询日志
  %(prog)s --service mysql --log-type slow

  # 查看PostgreSQL日志(需要插件)
  %(prog)s --service pgsql

  # 查看PostgreSQL慢日志
  %(prog)s --service pgsql --log-type slow

  # 指定服务器和行数
  %(prog)s --server prod-01 --service nginx --lines 200

  # JSON格式输出
  %(prog)s --service nginx --format json

支持的服务: nginx, apache, redis, mysql, pgsql

注意事项:
  - 只有已安装的服务才能获取日志
  - MySQL 使用API接口获取日志,非文件路径
  - PostgreSQL 需要安装 pgsql_manager 插件
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--service", required=True,
                        help="服务名称 (nginx/apache/redis/mysql/pgsql)")
    parser.add_argument("--log-type", choices=["error", "slow"], default="error",
                        help="日志类型: error(错误日志), slow(慢日志,mysql/pgsql支持)")
    parser.add_argument("--lines", "-n", type=int, default=100,
                        help="返回最后N行日志 (默认: 100)")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="table",
                        help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--config", "-c", help="配置文件路径")
    parser.add_argument("--no-check", action="store_true",
                        help="跳过服务安装状态检查")

    args = parser.parse_args()

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请先配置服务器: bt-config.py add", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    # 执行日志读取
    try:
        results = run_log_check(
            manager,
            server=args.server,
            log_type=args.log_type,
            service=args.service,
            lines=args.lines,
        )
    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"读取日志失败: {e}", file=sys.stderr)
        sys.exit(1)

    # 输出结果
    if args.format == "json":
        output = json.dumps(results, ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.output}")
        else:
            print(output)
    else:
        print_log_output(results, args.format)


if __name__ == "__main__":
    main()

```

### scripts/monitor.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
系统资源监控脚本
监控CPU、内存、磁盘和网络使用情况
"""

import argparse
import json
import sys
from dataclasses import asdict
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
# 发布环境: bt_common/ (脚本在 scripts/)
# 开发环境: src/bt_common/ (脚本在 src/btpanel/scripts/)
_skill_root = Path(__file__).parent.parent  # 技能包根目录

# 优先尝试发布环境(技能包根目录),然后尝试开发环境
if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    check_thresholds,
    parse_system_monitor_data,
    load_config,
)


def get_server_system_status(client: BtClient, thresholds: dict) -> dict:
    """
    获取单个服务器的系统状态

    Args:
        client: 宝塔客户端
        thresholds: 告警阈值配置

    Returns:
        系统状态信息
    """
    # 获取系统状态(GetNetWork接口返回完整监控数据)
    status_data = client.get_system_status()

    # 解析数据
    formatted = parse_system_monitor_data(status_data, client.name)

    # 检查告警
    alerts = check_thresholds(formatted, thresholds)

    result = formatted
    result["alerts"] = [asdict(a) if hasattr(a, "__dataclass_fields__") else a for a in alerts]
    return result


def run_monitor(manager: BtClientManager, server: Optional[str] = None) -> dict:
    """
    执行系统监控

    Args:
        manager: 客户端管理器
        server: 指定服务器名称

    Returns:
        监控结果
    """
    from datetime import datetime

    thresholds = manager.get_global_config().get("thresholds", {"cpu": 80, "memory": 85, "disk": 90})

    # 单个服务器
    if server:
        client = manager.get_client(server)
        return get_server_system_status(client, thresholds)

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
        "summary": {"total": len(all_clients), "healthy": 0, "warning": 0, "critical": 0},
    }

    for name, client in all_clients.items():
        try:
            status = get_server_system_status(client, thresholds)
            results["servers"].append(status)

            # 统计健康状态
            alerts = status.get("alerts", [])
            if not alerts:
                results["summary"]["healthy"] += 1
            else:
                has_critical = any(a.get("level") == "critical" for a in alerts)
                if has_critical:
                    results["summary"]["critical"] += 1
                else:
                    results["summary"]["warning"] += 1

        except Exception as e:
            results["servers"].append(
                {
                    "server": name,
                    "error": str(e),
                    "alerts": [{"level": "critical", "type": "connection", "message": str(e)}],
                }
            )
            results["summary"]["critical"] += 1

    return results


def print_table_output(results: dict):
    """打印表格格式输出"""
    try:
        from rich.console import Console
        from rich.table import Table

        console = Console()

        if "servers" in results:
            # 多服务器模式
            table = Table(title="系统资源监控")
            table.add_column("服务器", style="cyan")
            table.add_column("系统", style="white")
            table.add_column("CPU", style="green")
            table.add_column("内存", style="yellow")
            table.add_column("磁盘", style="red")
            table.add_column("状态", style="bold")

            for server in results["servers"]:
                if "error" in server:
                    table.add_row(
                        server["server"],
                        "-",
                        "-",
                        "-",
                        "-",
                        "[red]连接失败[/red]",
                    )
                    continue

                cpu = server.get("cpu", {})
                memory = server.get("memory", {})
                disk = server.get("disk", {})

                # 确定状态颜色
                alerts = server.get("alerts", [])
                if not alerts:
                    status = "[green]正常[/green]"
                elif any(a.get("level") == "critical" for a in alerts):
                    status = "[red]异常[/red]"
                else:
                    status = "[yellow]警告[/yellow]"

                table.add_row(
                    server.get("server", "Unknown"),
                    server.get("simple_system", server.get("system", "-")),
                    f"{cpu.get('usage', 0):.1f}%",
                    f"{memory.get('percent', 0):.1f}%",
                    f"{disk.get('percent', 0):.1f}%",
                    status,
                )

            console.print(table)

            # 打印汇总
            summary = results.get("summary", {})
            console.print(
                f"\n汇总: [green]正常{summary.get('healthy', 0)}[/green], "
                f"[yellow]警告{summary.get('warning', 0)}[/yellow], "
                f"[red]异常{summary.get('critical', 0)}[/red]"
            )
        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")
            table = Table(title=f"服务器: {server_name}")
            table.add_column("指标", style="cyan")
            table.add_column("值", style="green")

            cpu = results.get("cpu", {})
            memory = results.get("memory", {})
            disk = results.get("disk", {})
            load = results.get("load", {})
            network = results.get("network", {})

            table.add_row("系统", results.get("system", "Unknown"))
            table.add_row("主机名", results.get("hostname", "Unknown"))
            table.add_row("运行时间", results.get("uptime", "Unknown"))
            table.add_row("面板版本", results.get("version", "Unknown"))
            table.add_row("", "")
            table.add_row("[bold]CPU[/bold]", "")
            table.add_row("  使用率", f"{cpu.get('usage', 0):.1f}%")
            table.add_row("  核心数", str(cpu.get("cores", 1)))
            table.add_row("  型号", str(cpu.get("model", "Unknown")))
            table.add_row("", "")
            table.add_row("[bold]内存[/bold]", "")
            table.add_row("  使用量", f"{memory.get('used_mb', 0)}/{memory.get('total_mb', 0)} MB")
            table.add_row("  使用率", f"{memory.get('percent', 0):.1f}%")
            table.add_row("  可用", f"{memory.get('available_mb', 0)} MB")
            table.add_row("", "")
            table.add_row("[bold]磁盘[/bold]", "")
            table.add_row("  使用量", f"{disk.get('used_human', '0')}/{disk.get('total_human', '0')}")
            table.add_row("  使用率", f"{disk.get('percent', 0):.1f}%")
            table.add_row("", "")
            table.add_row("[bold]负载[/bold]", "")
            table.add_row("  1分钟", f"{load.get('one_minute', 0):.2f}")
            table.add_row("  5分钟", f"{load.get('five_minute', 0):.2f}")
            table.add_row("  15分钟", f"{load.get('fifteen_minute', 0):.2f}")
            table.add_row("", "")
            table.add_row("[bold]网络[/bold]", "")
            table.add_row("  上行", f"{network.get('current_up', 0):.2f} KB/s")
            table.add_row("  下行", f"{network.get('current_down', 0):.2f} KB/s")
            table.add_row("  总上行", network.get("total_up", "0"))
            table.add_row("  总下行", network.get("total_down", "0"))
            table.add_row("", "")
            table.add_row("[bold]资源[/bold]", "")
            table.add_row("  网站", str(results.get("resources", {}).get("sites", 0)))
            table.add_row("  数据库", str(results.get("resources", {}).get("databases", 0)))

            console.print(table)

            # 打印磁盘分区
            disks = disk.get("disks", [])
            if disks:
                disk_table = Table(title="磁盘分区")
                disk_table.add_column("挂载点", style="cyan")
                disk_table.add_column("文件系统", style="white")
                disk_table.add_column("使用量", style="green")
                disk_table.add_column("使用率", style="yellow")

                for d in disks:
                    disk_table.add_row(
                        d.get("path", "/"),
                        d.get("filesystem", "-"),
                        f"{d.get('used_human', '0')}/{d.get('total_human', '0')}",
                        f"{d.get('percent', 0):.1f}%",
                    )
                console.print(disk_table)

            # 打印告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print("\n[bold yellow]告警:[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "warning")
                    color = "red" if level == "critical" else "yellow"
                    console.print(f"  [{color}]{alert.get('message', '')}[/{color}]")

    except ImportError:
        # 如果没有rich库,使用简单输出
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2))


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板系统资源监控",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 监控所有服务器
  %(prog)s

  # 监控指定服务器
  %(prog)s --server prod-01

  # JSON格式输出
  %(prog)s --format json

  # 输出到文件
  %(prog)s --output report.json
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="json", help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--config", "-c", help="配置文件路径")

    args = parser.parse_args()

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请设置 BT_CONFIG_PATH 环境变量或创建配置文件", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    # 执行监控
    try:
        results = run_monitor(manager, args.server)
    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"监控失败: {e}", file=sys.stderr)
        sys.exit(1)

    # 输出结果
    if args.format == "table":
        print_table_output(results)
    else:
        output = json.dumps(results, ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.output}")
        else:
            print(output)


if __name__ == "__main__":
    main()

```

### scripts/services.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
服务状态检查脚本
检查服务器上运行的服务状态(Nginx/Apache/PHP/Redis/Memcached等)
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
_skill_root = Path(__file__).parent.parent

if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    SOFTWARE_SERVICES,
    load_config,
)


def get_server_services(client: BtClient, services: Optional[list] = None) -> dict:
    """
    获取单个服务器的服务状态

    Args:
        client: 宝塔客户端
        services: 要查询的服务列表

    Returns:
        服务状态信息
    """
    # 获取所有服务状态
    service_list = client.get_all_services_status(services)

    # 统计
    total = len(service_list)
    running = sum(1 for s in service_list if s.get("status"))
    stopped = sum(1 for s in service_list if s.get("installed") and not s.get("status"))
    not_installed = sum(1 for s in service_list if not s.get("installed"))

    # 生成告警
    alerts = []
    for svc in service_list:
        if svc.get("installed") and not svc.get("status"):
            alerts.append({
                "level": "warning",
                "type": "service",
                "message": f"服务 {svc.get('title', svc.get('name'))} 已停止",
                "service": svc.get("name"),
            })
        elif svc.get("error"):
            alerts.append({
                "level": "warning",
                "type": "service",
                "message": f"服务 {svc.get('name')} 状态查询失败: {svc.get('error')}",
                "service": svc.get("name"),
            })

    return {
        "server": client.name,
        "timestamp": datetime.now().isoformat(),
        "services": service_list,
        "summary": {
            "total": total,
            "running": running,
            "stopped": stopped,
            "not_installed": not_installed,
        },
        "alerts": alerts,
    }


def run_services_check(manager: BtClientManager, server: Optional[str] = None,
                       services: Optional[list] = None) -> dict:
    """
    执行服务状态检查

    Args:
        manager: 客户端管理器
        server: 指定服务器名称
        services: 要查询的服务列表

    Returns:
        检查结果
    """
    # 单个服务器
    if server:
        client = manager.get_client(server)
        return get_server_services(client, services)

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
        "summary": {
            "total_servers": 0,
            "total_services": 0,
            "total_running": 0,
            "total_stopped": 0,
        },
        "alerts": [],
    }

    for name, client in all_clients.items():
        try:
            service_result = get_server_services(client, services)
            results["servers"].append(service_result)

            # 汇总统计
            summary = service_result.get("summary", {})
            results["summary"]["total_servers"] += 1
            results["summary"]["total_services"] += summary.get("total", 0)
            results["summary"]["total_running"] += summary.get("running", 0)
            results["summary"]["total_stopped"] += summary.get("stopped", 0)

            # 收集告警
            for alert in service_result.get("alerts", []):
                results["alerts"].append(alert)

        except Exception as e:
            results["servers"].append({
                "server": name,
                "error": str(e),
                "services": [],
                "alerts": [{"level": "critical", "type": "connection", "message": str(e)}],
            })

    return results


def print_services_table(results: dict):
    """打印表格格式输出"""
    try:
        from rich.console import Console
        from rich.table import Table
        from rich.panel import Panel

        console = Console()

        if "servers" in results and len(results["servers"]) > 1:
            # 多服务器模式 - 显示汇总
            for server_data in results["servers"]:
                if "error" in server_data:
                    console.print(f"[red]服务器 {server_data['server']} 连接失败: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                summary = server_data.get("summary", {})

                # 服务器标题
                console.print(f"\n[bold cyan]═══ {server_name} ═══[/bold cyan]")

                # 服务列表表格
                services = server_data.get("services", [])
                if services:
                    table = Table(show_header=True, header_style="bold")
                    table.add_column("服务", style="cyan", width=20)
                    table.add_column("版本", width=12)
                    table.add_column("状态", width=10)
                    table.add_column("安装", width=8)
                    table.add_column("PID", width=8)

                    for svc in services:
                        # 状态颜色
                        if not svc.get("installed", False):
                            status_str = "[dim]未安装[/dim]"
                        elif svc.get("status"):
                            status_str = "[green]运行中[/green]"
                        else:
                            status_str = "[red]已停止[/red]"

                        # 安装状态
                        installed_str = "✓" if svc.get("installed") else "-"

                        # PID
                        pid = svc.get("pid", 0) or 0
                        pid_str = str(pid) if pid > 0 else "-"

                        table.add_row(
                            svc.get("title", svc.get("name", "-"))[:20],
                            svc.get("version", "-")[:12],
                            status_str,
                            installed_str,
                            pid_str,
                        )

                    console.print(table)
                else:
                    console.print("[yellow]无服务信息[/yellow]")

                # 汇总
                console.print(f"\n[dim]汇总: "
                             f"总数 {summary.get('total', 0)}, "
                             f"[green]运行 {summary.get('running', 0)}[/green], "
                             f"[red]停止 {summary.get('stopped', 0)}[/red], "
                             f"[dim]未安装 {summary.get('not_installed', 0)}[/dim][/dim]")

                # 告警
                alerts = server_data.get("alerts", [])
                if alerts:
                    console.print("\n[yellow]告警:[/yellow]")
                    for alert in alerts[:5]:
                        level = alert.get("level", "warning")
                        color = "red" if level == "critical" else "yellow"
                        console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

            # 总汇总
            summary = results.get("summary", {})
            console.print(f"\n[bold]总汇总:[/bold] "
                         f"服务器: {summary.get('total_servers', 0)}, "
                         f"服务总数: {summary.get('total_services', 0)}, "
                         f"[green]运行: {summary.get('total_running', 0)}[/green], "
                         f"[red]停止: {summary.get('total_stopped', 0)}[/red]")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")

            # 基本信息
            console.print(Panel(f"[bold]{server_name}[/bold]", title="服务器"))

            services = results.get("services", [])
            if services:
                table = Table(show_header=True, header_style="bold")
                table.add_column("服务", style="cyan")
                table.add_column("版本")
                table.add_column("状态")
                table.add_column("安装")
                table.add_column("PID")

                for svc in services:
                    # 状态颜色
                    if not svc.get("installed", False):
                        status_str = "[dim]未安装[/dim]"
                    elif svc.get("status"):
                        status_str = "[green]运行中[/green]"
                    else:
                        status_str = "[red]已停止[/red]"

                    # 安装状态
                    installed_str = "✓" if svc.get("installed") else "-"

                    # PID
                    pid = svc.get("pid", 0) or 0
                    pid_str = str(pid) if pid > 0 else "-"

                    table.add_row(
                        svc.get("title", svc.get("name", "-")),
                        svc.get("version", "-"),
                        status_str,
                        installed_str,
                        pid_str,
                    )

                console.print(table)
            else:
                console.print("[yellow]无服务信息[/yellow]")

            # 汇总
            summary = results.get("summary", {})
            console.print(f"\n[bold]汇总:[/bold]")
            console.print(f"  总数: {summary.get('total', 0)}")
            console.print(f"  [green]运行: {summary.get('running', 0)}[/green]")
            console.print(f"  [red]停止: {summary.get('stopped', 0)}[/red]")
            console.print(f"  [dim]未安装: {summary.get('not_installed', 0)}[/dim]")

            # 告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print(f"\n[bold yellow]告警 ({len(alerts)}条):[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "warning")
                    color = "red" if level == "critical" else "yellow"
                    console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

    except ImportError:
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2))


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板服务状态检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 检查所有服务器的服务状态
  %(prog)s

  # 检查指定服务器
  %(prog)s --server prod-01

  # 只检查特定服务
  %(prog)s --service nginx --service redis

  # JSON格式输出
  %(prog)s --format json

  # 输出到文件
  %(prog)s --output services.json

支持的服务: nginx, apache, mysql, redis, memcached, pure-ftpd
PHP服务: 自动检测已安装的PHP版本(php-8.2, php-7.4等)
PostgreSQL: 需要安装pgsql_manager插件

字段说明:
  installed (setup): 服务是否已安装
  status: 服务是否正在运行(仅installed=true时有意义)
  version: 已安装的版本号
  pid: 主进程ID(运行中时)
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="table", help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--service", action="append", dest="services",
                        help="指定要检查的服务(可多次指定)")
    parser.add_argument("--config", "-c", help="配置文件路径")

    args = parser.parse_args()

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请先配置服务器: bt-config.py add", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    # 执行检查
    try:
        results = run_services_check(manager, args.server, args.services)
    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"检查失败: {e}", file=sys.stderr)
        sys.exit(1)

    # 输出结果
    if args.format == "table":
        print_services_table(results)
    else:
        output = json.dumps(results, ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.output}")
        else:
            print(output)


if __name__ == "__main__":
    main()

```

### scripts/sites.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
网站状态检查脚本
检查所有网站和项目的运行状态、SSL证书等
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
_skill_root = Path(__file__).parent.parent

if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    parse_all_sites,
    load_config,
)


def get_server_sites(client: BtClient) -> dict:
    """
    获取单个服务器的网站状态

    Args:
        client: 宝塔客户端

    Returns:
        网站状态信息
    """
    # 获取所有网站和项目
    sites_data = client.get_all_sites()

    # 解析数据
    return parse_all_sites(sites_data, client.name)


def run_sites_check(manager: BtClientManager, server: Optional[str] = None) -> dict:
    """
    执行网站状态检查

    Args:
        manager: 客户端管理器
        server: 指定服务器名称

    Returns:
        检查结果
    """
    # 单个服务器
    if server:
        client = manager.get_client(server)
        return get_server_sites(client)

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
        "summary": {
            "total": 0,
            "running": 0,
            "stopped": 0,
            "ssl_expired": 0,
            "ssl_expiring": 0,
        },
        "alerts": [],
    }

    for name, client in all_clients.items():
        try:
            site_result = get_server_sites(client)
            results["servers"].append(site_result)

            # 汇总统计
            summary = site_result.get("summary", {})
            results["summary"]["total"] += summary.get("total", 0)
            results["summary"]["running"] += summary.get("by_status", {}).get("running", 0)
            results["summary"]["stopped"] += summary.get("by_status", {}).get("stopped", 0)
            results["summary"]["ssl_expired"] += summary.get("ssl_expired", 0)
            results["summary"]["ssl_expiring"] += summary.get("ssl_expiring", 0)

            # 收集告警
            for alert in site_result.get("alerts", []):
                results["alerts"].append(alert)

        except Exception as e:
            results["servers"].append({
                "server": name,
                "error": str(e),
                "sites": [],
                "alerts": [{"level": "critical", "type": "connection", "message": str(e)}],
            })

    return results


def print_sites_table(results: dict):
    """打印表格格式输出"""
    try:
        from rich.console import Console
        from rich.table import Table
        from rich.panel import Panel

        console = Console()

        if "servers" in results and len(results["servers"]) > 1:
            # 多服务器模式 - 显示汇总
            for server_data in results["servers"]:
                if "error" in server_data:
                    console.print(f"[red]服务器 {server_data['server']} 连接失败: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                summary = server_data.get("summary", {})

                # 服务器标题
                console.print(f"\n[bold cyan]═══ {server_name} ═══[/bold cyan]")

                # 网站列表表格
                sites = server_data.get("sites", [])
                if sites:
                    table = Table(show_header=True, header_style="bold")
                    table.add_column("名称", style="cyan", width=25)
                    table.add_column("类型", width=8)
                    table.add_column("状态", width=8)
                    table.add_column("SSL", width=10)
                    table.add_column("PHP/端口", width=10)
                    table.add_column("备注", width=20)

                    for site in sites:
                        # 状态颜色
                        status = site.get("status", "unknown")
                        if status == "running":
                            status_str = "[green]运行[/green]"
                        elif status == "starting":
                            status_str = "[yellow]启动中[/yellow]"
                        else:
                            status_str = "[red]停止[/red]"

                        # SSL状态
                        ssl = site.get("ssl", {})
                        ssl_status = ssl.get("status", "none")
                        if ssl_status == "valid":
                            ssl_str = f"[green]{ssl.get('days_remaining', 0)}天[/green]"
                        elif ssl_status == "warning":
                            ssl_str = f"[yellow]{ssl.get('days_remaining', 0)}天[/yellow]"
                        elif ssl_status == "critical":
                            ssl_str = f"[red]{ssl.get('days_remaining', 0)}天[/red]"
                        elif ssl_status == "expired":
                            ssl_str = "[red]已过期[/red]"
                        else:
                            ssl_str = "-"

                        # PHP版本或端口
                        php_or_port = site.get("php_version") or str(site.get("port", "")) or "-"

                        table.add_row(
                            site.get("name", "-")[:25],
                            site.get("type", "-"),
                            status_str,
                            ssl_str,
                            php_or_port[:10],
                            (site.get("ps", "") or "")[:20],
                        )

                    console.print(table)
                else:
                    console.print("[yellow]无网站[/yellow]")

                # 告警
                alerts = server_data.get("alerts", [])
                if alerts:
                    console.print("\n[yellow]告警:[/yellow]")
                    for alert in alerts[:5]:
                        level = alert.get("level", "warning")
                        color = "red" if level == "critical" else "yellow"
                        console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

            # 总汇总
            summary = results.get("summary", {})
            console.print(f"\n[bold]总汇总:[/bold] "
                         f"网站总数: {summary.get('total', 0)}, "
                         f"[green]运行: {summary.get('running', 0)}[/green], "
                         f"[red]停止: {summary.get('stopped', 0)}[/red], "
                         f"[red]SSL过期: {summary.get('ssl_expired', 0)}[/red], "
                         f"[yellow]SSL即将过期: {summary.get('ssl_expiring', 0)}[/yellow]")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")

            # 基本信息
            console.print(Panel(f"[bold]{server_name}[/bold]", title="服务器"))

            sites = results.get("sites", [])
            if sites:
                table = Table(show_header=True, header_style="bold")
                table.add_column("名称", style="cyan")
                table.add_column("类型")
                table.add_column("状态")
                table.add_column("SSL")
                table.add_column("路径")
                table.add_column("备注")

                for site in sites:
                    status = site.get("status", "unknown")
                    if status == "running":
                        status_str = "[green]运行[/green]"
                    elif status == "starting":
                        status_str = "[yellow]启动中[/yellow]"
                    else:
                        status_str = "[red]停止[/red]"

                    ssl = site.get("ssl", {})
                    ssl_status = ssl.get("status", "none")
                    if ssl_status == "valid":
                        ssl_str = f"[green]有效({ssl.get('days_remaining', 0)}天)[/green]"
                    elif ssl_status == "expired":
                        ssl_str = "[red]已过期[/red]"
                    elif ssl_status in ["warning", "critical"]:
                        ssl_str = f"[yellow]{ssl.get('days_remaining', 0)}天后过期[/yellow]"
                    else:
                        ssl_str = "-"

                    table.add_row(
                        site.get("name", "-"),
                        site.get("type", "-"),
                        status_str,
                        ssl_str,
                        site.get("path", "-")[:40],
                        site.get("ps", "")[:20],
                    )

                console.print(table)
            else:
                console.print("[yellow]无网站[/yellow]")

            # 汇总
            summary = results.get("summary", {})
            console.print(f"\n[bold]汇总:[/bold]")
            console.print(f"  总数: {summary.get('total', 0)}")
            console.print(f"  按类型: {summary.get('by_type', {})}")
            console.print(f"  按状态: {summary.get('by_status', {})}")

            # 告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print(f"\n[bold yellow]告警 ({len(alerts)}条):[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "warning")
                    color = "red" if level == "critical" else "yellow"
                    console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

    except ImportError:
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2))


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板网站状态检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 检查所有服务器的网站状态
  %(prog)s

  # 检查指定服务器
  %(prog)s --server prod-01

  # 只显示停止的网站
  %(prog)s --filter stopped

  # 只显示SSL即将过期的网站
  %(prog)s --filter ssl-warning

  # 输出到文件
  %(prog)s --output sites.json
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="table", help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--filter", choices=["stopped", "ssl-warning", "ssl-expired"],
                        help="过滤条件: stopped(停止的), ssl-warning(SSL即将过期), ssl-expired(SSL已过期)")
    parser.add_argument("--config", "-c", help="配置文件路径")

    args = parser.parse_args()

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请先配置服务器: bt-config.py add", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    # 执行检查
    try:
        results = run_sites_check(manager, args.server)
    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"检查失败: {e}", file=sys.stderr)
        sys.exit(1)

    # 应用过滤
    if args.filter:
        results = apply_filter(results, args.filter)

    # 输出结果
    if args.format == "table":
        print_sites_table(results)
    else:
        output = json.dumps(results, ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.output}")
        else:
            print(output)


def apply_filter(results: dict, filter_type: str) -> dict:
    """应用过滤条件"""
    if "servers" in results:
        # 多服务器模式
        for server_data in results.get("servers", []):
            if "sites" in server_data:
                server_data["sites"] = filter_sites(server_data["sites"], filter_type)
    elif "sites" in results:
        # 单服务器模式
        results["sites"] = filter_sites(results["sites"], filter_type)

    return results


def filter_sites(sites: list, filter_type: str) -> list:
    """过滤网站列表"""
    filtered = []
    for site in sites:
        if filter_type == "stopped":
            if site.get("status") == "stopped":
                filtered.append(site)
        elif filter_type == "ssl-warning":
            ssl = site.get("ssl", {})
            if ssl.get("status") == "warning":
                filtered.append(site)
        elif filter_type == "ssl-expired":
            ssl = site.get("ssl", {})
            if ssl.get("status") == "expired":
                filtered.append(site)
    return filtered


if __name__ == "__main__":
    main()

```

### scripts/ssh.py

```python
#!/usr/bin/env python3
# /// script
# dependencies = [
#   "requests>=2.28",
#   "pyyaml>=6.0",
#   "rich>=13.0",
# ]
# ///
"""
SSH状态检查脚本
检查SSH服务状态和登录日志
"""

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

# 兼容开发环境和发布环境的导入
_skill_root = Path(__file__).parent.parent

if (_skill_root / "bt_common").exists():
    sys.path.insert(0, str(_skill_root))
else:
    sys.path.insert(0, str(_skill_root.parent / "src"))

from bt_common import (
    BtClient,
    BtClientManager,
    load_config,
)


def get_ssh_status(client: BtClient) -> dict:
    """
    获取SSH服务状态

    Args:
        client: 宝塔客户端

    Returns:
        SSH状态信息
    """
    result = {
        "server": client.name,
        "timestamp": datetime.now().isoformat(),
        "ssh": {},
        "alerts": [],
    }

    try:
        info = client.get_ssh_info()

        ssh_info = {
            "port": info.get("port", 22),
            "status": info.get("status", False),
            "status_text": info.get("status_text", "未知"),
            "ping_enabled": info.get("ping", False),
            "firewall_status": info.get("firewall_status", False),
            "fail2ban": {
                "status": info.get("fail2ban", {}).get("status", 0) == 1,
                "installed": info.get("fail2ban", {}).get("installed", 0) == 1,
            },
            "ban_cron_job": info.get("ban_cron_job", False),
        }

        result["ssh"] = ssh_info

        # 生成告警
        if not ssh_info["status"]:
            result["alerts"].append({
                "level": "critical",
                "type": "ssh",
                "message": "SSH服务已停止",
            })

        # 检查非标准端口
        if ssh_info["port"] != 22:
            result["alerts"].append({
                "level": "info",
                "type": "ssh",
                "message": f"SSH使用非标准端口: {ssh_info['port']}",
            })

    except Exception as e:
        result["error"] = str(e)
        result["alerts"].append({
            "level": "critical",
            "type": "connection",
            "message": f"获取SSH状态失败: {e}",
        })

    return result


def get_ssh_logs(client: BtClient, page: int = 1, limit: int = 50,
                 login_filter: str = "ALL", search: str = "") -> dict:
    """
    获取SSH登录日志

    Args:
        client: 宝塔客户端
        page: 页码
        limit: 每页数量
        login_filter: 过滤类型 (ALL/success/failed)
        search: 搜索关键字

    Returns:
        SSH登录日志
    """
    result = {
        "server": client.name,
        "timestamp": datetime.now().isoformat(),
        "logs": [],
        "summary": {
            "total": 0,
            "success": 0,
            "failed": 0,
            "unique_ips": set(),
        },
        "alerts": [],
    }

    try:
        response = client.get_ssh_logs(page=page, limit=limit, search=search)
        logs = response.get("data", [])

        # 解析日志
        parsed_logs = []
        for log in logs:
            parsed_log = {
                "time": log.get("time", ""),
                "timestamp": log.get("timestamp", 0),
                "type": log.get("type", "unknown"),  # success/failed
                "status": log.get("status", 0),
                "user": log.get("user", ""),
                "address": log.get("address", ""),
                "port": log.get("port", ""),
                "login_type": log.get("login_type", "password"),
                "area": log.get("area", {}).get("info", "未知"),
                "deny_status": log.get("deny_status", 0),
            }

            # 应用过滤
            if login_filter != "ALL":
                if login_filter == "success" and parsed_log["type"] != "success":
                    continue
                elif login_filter == "failed" and parsed_log["type"] != "failed":
                    continue

            parsed_logs.append(parsed_log)

            # 统计
            result["summary"]["total"] += 1
            if parsed_log["type"] == "success":
                result["summary"]["success"] += 1
            else:
                result["summary"]["failed"] += 1
            result["summary"]["unique_ips"].add(parsed_log["address"])

        result["logs"] = parsed_logs
        result["summary"]["unique_ips"] = len(result["summary"]["unique_ips"])

        # 生成告警 - 检测异常登录
        recent_failed = sum(1 for log in parsed_logs[:10] if log["type"] == "failed")
        if recent_failed >= 5:
            result["alerts"].append({
                "level": "warning",
                "type": "ssh",
                "message": f"最近10条日志中有{recent_failed}次登录失败",
            })

    except Exception as e:
        result["error"] = str(e)
        result["alerts"].append({
            "level": "critical",
            "type": "connection",
            "message": f"获取SSH日志失败: {e}",
        })

    return result


def run_ssh_check(manager: BtClientManager, server: Optional[str] = None,
                  check_type: str = "status") -> dict:
    """
    执行SSH检查

    Args:
        manager: 客户端管理器
        server: 指定服务器名称
        check_type: 检查类型 (status/logs)

    Returns:
        检查结果
    """
    # 单个服务器
    if server:
        client = manager.get_client(server)
        if check_type == "status":
            return get_ssh_status(client)
        else:
            return get_ssh_logs(client)

    # 所有服务器
    all_clients = manager.get_all_clients()
    results = {
        "timestamp": datetime.now().isoformat(),
        "servers": [],
    }

    for name, client in all_clients.items():
        try:
            if check_type == "status":
                result = get_ssh_status(client)
            else:
                result = get_ssh_logs(client)
            results["servers"].append(result)
        except Exception as e:
            results["servers"].append({
                "server": name,
                "error": str(e),
                "alerts": [{"level": "critical", "type": "connection", "message": str(e)}],
            })

    return results


def print_ssh_status(results: dict):
    """打印SSH状态输出"""
    try:
        from rich.console import Console
        from rich.table import Table
        from rich.panel import Panel

        console = Console()

        if "servers" in results:
            # 多服务器模式
            for server_data in results["servers"]:
                if "error" in server_data:
                    console.print(f"[red]服务器 {server_data.get('server', 'Unknown')} 错误: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                ssh_info = server_data.get("ssh", {})

                console.print(f"\n[bold cyan]═══ {server_name} ═══[/bold cyan]")

                # SSH状态表格
                table = Table(show_header=True, header_style="bold")
                table.add_column("项目", style="cyan", width=20)
                table.add_column("值", width=30)

                status_str = "[green]运行中[/green]" if ssh_info.get("status") else "[red]已停止[/red]"
                table.add_row("SSH服务", status_str)
                table.add_row("端口", str(ssh_info.get("port", 22)))
                table.add_row("Ping", "允许" if ssh_info.get("ping_enabled") else "禁止")
                table.add_row("防火墙", "开启" if ssh_info.get("firewall_status") else "关闭")

                fail2ban = ssh_info.get("fail2ban", {})
                fb_status = "已安装" if fail2ban.get("installed") else "未安装"
                if fail2ban.get("status"):
                    fb_status += " [green](运行中)[/green]"
                table.add_row("Fail2ban", fb_status)

                console.print(table)

                # 告警
                alerts = server_data.get("alerts", [])
                if alerts:
                    console.print("\n[yellow]提示:[/yellow]")
                    for alert in alerts:
                        level = alert.get("level", "info")
                        if level == "critical":
                            color = "red"
                        elif level == "warning":
                            color = "yellow"
                        else:
                            color = "blue"
                        console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")
            ssh_info = results.get("ssh", {})

            console.print(Panel(f"[bold]{server_name} - SSH状态[/bold]", title="服务器"))

            table = Table(show_header=True, header_style="bold")
            table.add_column("项目", style="cyan")
            table.add_column("值")

            status_str = "[green]运行中[/green]" if ssh_info.get("status") else "[red]已停止[/red]"
            table.add_row("SSH服务", status_str)
            table.add_row("端口", str(ssh_info.get("port", 22)))
            table.add_row("状态描述", ssh_info.get("status_text", "未知"))
            table.add_row("Ping", "允许" if ssh_info.get("ping_enabled") else "禁止")
            table.add_row("防火墙", "开启" if ssh_info.get("firewall_status") else "关闭")

            fail2ban = ssh_info.get("fail2ban", {})
            fb_status = "已安装" if fail2ban.get("installed") else "未安装"
            if fail2ban.get("status"):
                fb_status += " (运行中)"
            table.add_row("Fail2ban", fb_status)

            console.print(table)

            # 告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print(f"\n[bold yellow]告警 ({len(alerts)}条):[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "info")
                    if level == "critical":
                        color = "red"
                    elif level == "warning":
                        color = "yellow"
                    else:
                        color = "blue"
                    console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

    except ImportError:
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2, default=str))


def print_ssh_logs(results: dict):
    """打印SSH日志输出"""
    try:
        from rich.console import Console
        from rich.table import Table
        from rich.panel import Panel

        console = Console()

        if "servers" in results:
            # 多服务器模式
            for server_data in results["servers"]:
                if "error" in server_data:
                    console.print(f"[red]服务器 {server_data.get('server', 'Unknown')} 错误: {server_data['error']}[/red]")
                    continue

                server_name = server_data.get("server", "Unknown")
                logs = server_data.get("logs", [])
                summary = server_data.get("summary", {})

                console.print(f"\n[bold cyan]═══ {server_name} ═══[/bold cyan]")

                # 汇总
                console.print(f"[dim]总计: {summary.get('total', 0)} 条, "
                             f"[green]成功: {summary.get('success', 0)}[/green], "
                             f"[red]失败: {summary.get('failed', 0)}[/red], "
                             f"独立IP: {summary.get('unique_ips', 0)}[/dim]")

                if logs:
                    table = Table(show_header=True, header_style="bold")
                    table.add_column("时间", width=20)
                    table.add_column("类型", width=8)
                    table.add_column("用户", width=10)
                    table.add_column("IP地址", width=18)
                    table.add_column("地区", width=15)

                    for log in logs[:30]:
                        type_str = "[green]成功[/green]" if log["type"] == "success" else "[red]失败[/red]"
                        table.add_row(
                            log.get("time", "")[:19],
                            type_str,
                            log.get("user", "-"),
                            log.get("address", "-"),
                            log.get("area", "未知")[:15],
                        )

                    console.print(table)
                else:
                    console.print("[yellow]无登录日志[/yellow]")

                # 告警
                alerts = server_data.get("alerts", [])
                if alerts:
                    console.print("\n[yellow]告警:[/yellow]")
                    for alert in alerts:
                        level = alert.get("level", "warning")
                        color = "red" if level == "critical" else "yellow"
                        console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

        else:
            # 单服务器模式
            server_name = results.get("server", "Unknown")
            logs = results.get("logs", [])
            summary = results.get("summary", {})

            console.print(Panel(f"[bold]{server_name} - SSH登录日志[/bold]", title="服务器"))

            # 汇总
            console.print(f"[dim]总计: {summary.get('total', 0)} 条, "
                         f"[green]成功: {summary.get('success', 0)}[/green], "
                         f"[red]失败: {summary.get('failed', 0)}[/red], "
                         f"独立IP: {summary.get('unique_ips', 0)}[/dim]")

            if logs:
                table = Table(show_header=True, header_style="bold")
                table.add_column("时间")
                table.add_column("类型")
                table.add_column("用户")
                table.add_column("IP地址")
                table.add_column("端口")
                table.add_column("地区")

                for log in logs[:50]:
                    type_str = "[green]成功[/green]" if log["type"] == "success" else "[red]失败[/red]"
                    table.add_row(
                        log.get("time", "")[:19],
                        type_str,
                        log.get("user", "-"),
                        log.get("address", "-"),
                        log.get("port", "-"),
                        log.get("area", "未知"),
                    )

                console.print(table)
            else:
                console.print("[yellow]无登录日志[/yellow]")

            # 告警
            alerts = results.get("alerts", [])
            if alerts:
                console.print(f"\n[bold yellow]告警 ({len(alerts)}条):[/bold yellow]")
                for alert in alerts:
                    level = alert.get("level", "warning")
                    color = "red" if level == "critical" else "yellow"
                    console.print(f"  [{color}]• {alert.get('message', '')}[/{color}]")

    except ImportError:
        print("请安装rich库以使用表格输出: pip install rich")
        print(json.dumps(results, ensure_ascii=False, indent=2, default=str))


def main():
    """主函数"""
    parser = argparse.ArgumentParser(
        description="宝塔面板SSH状态和日志检查",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 查看SSH服务状态
  %(prog)s --status

  # 查看SSH登录日志
  %(prog)s --logs

  # 只查看失败的登录日志
  %(prog)s --logs --filter failed

  # 只查看成功的登录日志
  %(prog)s --logs --filter success

  # 搜索特定IP的登录记录
  %(prog)s --logs --search 192.168.1.1

  # 指定服务器
  %(prog)s --status --server prod-01

  # JSON格式输出
  %(prog)s --logs --format json
        """,
    )
    parser.add_argument("--server", "-s", help="指定服务器名称")
    parser.add_argument("--status", action="store_true", help="查看SSH服务状态")
    parser.add_argument("--logs", action="store_true", help="查看SSH登录日志")
    parser.add_argument("--filter", choices=["ALL", "success", "failed"], default="ALL",
                        help="日志过滤: ALL(全部), success(成功), failed(失败)")
    parser.add_argument("--search", help="搜索关键字(IP地址或用户名)")
    parser.add_argument("--limit", "-n", type=int, default=50,
                        help="返回日志条数 (默认: 50)")
    parser.add_argument("--format", "-f", choices=["json", "table"], default="table",
                        help="输出格式")
    parser.add_argument("--output", "-o", help="输出文件路径")
    parser.add_argument("--config", "-c", help="配置文件路径")

    args = parser.parse_args()

    # 默认显示状态
    if not args.status and not args.logs:
        args.status = True

    # 初始化客户端管理器
    manager = BtClientManager()

    try:
        manager.load_config(args.config)
    except FileNotFoundError as e:
        print(f"错误: {e}", file=sys.stderr)
        print("请先配置服务器: bt-config.py add", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"加载配置失败: {e}", file=sys.stderr)
        sys.exit(1)

    if not manager.get_all_clients():
        print("错误: 没有配置任何服务器", file=sys.stderr)
        sys.exit(1)

    # 执行检查
    try:
        if args.status:
            results = run_ssh_check(manager, args.server, "status")
            if args.format == "json":
                output = json.dumps(results, ensure_ascii=False, indent=2, default=str)
                if args.output:
                    with open(args.output, "w", encoding="utf-8") as f:
                        f.write(output)
                    print(f"结果已保存到: {args.output}")
                else:
                    print(output)
            else:
                print_ssh_status(results)

        if args.logs:
            results = run_ssh_check(manager, args.server, "logs")
            # 应用过滤
            if args.filter != "ALL" or args.search:
                if "servers" in results:
                    for server_data in results["servers"]:
                        if "logs" in server_data:
                            filtered_logs = []
                            for log in server_data["logs"]:
                                if args.filter != "ALL":
                                    if args.filter == "success" and log["type"] != "success":
                                        continue
                                    elif args.filter == "failed" and log["type"] != "failed":
                                        continue
                                if args.search:
                                    if args.search not in log.get("address", "") and args.search not in log.get("user", ""):
                                        continue
                                filtered_logs.append(log)
                            server_data["logs"] = filtered_logs
                            # 更新统计
                            server_data["summary"]["total"] = len(filtered_logs)
                            server_data["summary"]["success"] = sum(1 for l in filtered_logs if l["type"] == "success")
                            server_data["summary"]["failed"] = sum(1 for l in filtered_logs if l["type"] == "failed")
                            server_data["summary"]["unique_ips"] = len(set(l["address"] for l in filtered_logs))
                elif "logs" in results:
                    filtered_logs = []
                    for log in results["logs"]:
                        if args.filter != "ALL":
                            if args.filter == "success" and log["type"] != "success":
                                continue
                            elif args.filter == "failed" and log["type"] != "failed":
                                continue
                        if args.search:
                            if args.search not in log.get("address", "") and args.search not in log.get("user", ""):
                                continue
                        filtered_logs.append(log)
                    results["logs"] = filtered_logs
                    results["summary"]["total"] = len(filtered_logs)
                    results["summary"]["success"] = sum(1 for l in filtered_logs if l["type"] == "success")
                    results["summary"]["failed"] = sum(1 for l in filtered_logs if l["type"] == "failed")
                    results["summary"]["unique_ips"] = len(set(l["address"] for l in filtered_logs))

            if args.format == "json":
                output = json.dumps(results, ensure_ascii=False, indent=2, default=str)
                if args.output:
                    with open(args.output, "w", encoding="utf-8") as f:
                        f.write(output)
                    print(f"结果已保存到: {args.output}")
                else:
                    print(output)
            else:
                print_ssh_logs(results)

    except KeyError as e:
        print(f"错误: 未找到服务器 {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"检查失败: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()

```

btpanel | SkillHub