
clickhouse-io

ClickHouse database schemas, query optimization, analytics, and data-engineering best practices for high-performance analytical workloads.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
167
Hot score
96
Updated
March 19, 2026
Overall rating
C (3.0)
Composite score
3.0
Best-practice grade
B (75.1)

Install command

npx @skill-hub/cli install xu-xiang-everything-claude-code-zh-clickhouse-io

Repository

xu-xiang/everything-claude-code-zh

Skill path: skills/clickhouse-io

ClickHouse database schemas, query optimization, analytics, and data-engineering best practices for high-performance analytical workloads.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: xu-xiang.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install clickhouse-io into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/xu-xiang/everything-claude-code-zh before adding clickhouse-io to shared team environments
  • Use clickhouse-io for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: clickhouse-io
description: ClickHouse database schemas, query optimization, analytics, and data-engineering best practices for high-performance analytical workloads.
---

# ClickHouse Analytics Patterns

ClickHouse-specific patterns for high-performance analytics and data engineering.

## Overview

ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over very large datasets.

**Core features:**
- Columnar storage
- Data compression (illustrated below)
- Parallel query execution
- Distributed queries
- Real-time analytics
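
Compression is easy to verify in practice. A quick inspection query against `system.columns` (assuming the `markets_analytics` table defined below) shows per-column compression ratios:

```sql
-- Compare on-disk (compressed) size with in-memory (uncompressed) size.
SELECT
    name,
    formatReadableSize(data_compressed_bytes) AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed,
    round(data_uncompressed_bytes / data_compressed_bytes, 2) AS ratio
FROM system.columns
WHERE table = 'markets_analytics'
ORDER BY data_compressed_bytes DESC;
```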

## Table Design Patterns

### MergeTree Engine (Most Common)

```sql
CREATE TABLE markets_analytics (
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
```

### ReplacingMergeTree (Deduplication)

```sql
-- For data that may contain duplicates (e.g., arriving from multiple sources).
-- Rows sharing the same ORDER BY key are collapsed during background merges;
-- the version column (timestamp) keeps the most recent row.
CREATE TABLE user_events (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = ReplacingMergeTree(timestamp)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id);
```

### AggregatingMergeTree (Pre-Aggregation)

```sql
-- For maintaining pre-aggregated metrics
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count),
    unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);

-- Query the aggregated data
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```

## Query Optimization Patterns

### Efficient Filtering

```sql
-- ✅ Good: filter on indexed (sort-key) columns first
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- ❌ Bad: filters lead with non-indexed columns
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';
```
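
To confirm that the primary index is actually being used, recent ClickHouse versions can annotate the query plan. A minimal sketch (syntax assumed for recent releases):

```sql
-- Shows which parts and granules the primary key prunes for this query.
EXPLAIN indexes = 1
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123';
```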

### Aggregations

```sql
-- ✅ Good: use ClickHouse-native aggregate functions
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- ✅ Use quantile() for percentiles (approximate and fast;
--    use quantileExact() when precision matters)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```
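
Several quantile levels can also be computed in a single pass with `quantiles`, which returns them as an array:

```sql
-- One pass over the data; array result: [p50, p95, p99].
SELECT quantiles(0.50, 0.95, 0.99)(trade_size) AS size_quantiles
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```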

### Window Functions

```sql
-- Running total per market
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```
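
The same window framing supports moving averages; a sketch of a 7-day moving average over the same table:

```sql
SELECT
    date,
    market_id,
    volume,
    avg(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS volume_ma7
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```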

## Data Insertion Patterns

### Bulk Insert (Recommended)

```typescript
import { ClickHouse } from 'clickhouse'

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})

// ✅ Bulk insert (efficient)
// NOTE: values are interpolated into the SQL string; escape or validate
// all inputs (or use your client's parameter binding) before production use.
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade => `(
    '${trade.id}',
    '${trade.market_id}',
    '${trade.user_id}',
    ${trade.amount},
    '${trade.timestamp.toISOString()}'
  )`).join(',')

  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}

// ❌ Row-by-row insert (slow)
async function insertTrade(trade: Trade) {
  // Never do this in a loop!
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}
```
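
When the client cannot batch effectively, recent ClickHouse versions (21.11+) can batch on the server side via asynchronous inserts instead. A minimal sketch with hypothetical values:

```sql
-- The server buffers small inserts and flushes them in batches.
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES ('t-1', 'market-123', 'u-1', 100, now());
```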

### Streaming Inserts

```typescript
// For continuous ingestion. Assumes `dataSource` is an async iterable of
// row batches; streaming APIs differ between ClickHouse client libraries,
// so check your client's documentation for the exact calls.

async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()

  for await (const batch of dataSource) {
    stream.write(batch)
  }

  await stream.end()
}
```

## Materialized Views

### Real-Time Aggregation

```sql
-- Materialized view that maintains hourly stats
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;

-- Query the aggregated target table
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
```
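
One caveat worth noting: a `TO`-target materialized view only transforms rows inserted after it is created. A minimal backfill sketch for pre-existing data, assuming the tables above:

```sql
-- One-off backfill of rows that existed before the MV was created.
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
```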

## Performance Monitoring

### Query Performance

```sql
-- Find slow queries
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```
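
Queries that are still running can be inspected the same way via `system.processes`:

```sql
-- Currently executing queries, longest-running first.
SELECT
    query_id,
    user,
    elapsed,
    read_rows,
    memory_usage,
    query
FROM system.processes
ORDER BY elapsed DESC;
```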

### Table Statistics

```sql
-- Check table sizes
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```

## Common Analytics Queries

### Time-Series Analysis

```sql
-- Daily active users
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;

-- Retention analysis
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT DISTINCT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
)
GROUP BY signup_date
ORDER BY signup_date DESC;
```

### Funnel Analysis

```sql
-- Conversion funnel (distinct sessions reaching each step)
SELECT
    uniqIf(session_id, step = 'viewed_market') AS viewed,
    uniqIf(session_id, step = 'clicked_trade') AS clicked,
    uniqIf(session_id, step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
```

### Cohort Analysis

```sql
-- User cohorts by signup month
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
```

## Data Pipeline Patterns

### ETL Pattern

```typescript
// Extract, Transform, Load. `extractFromPostgres` and
// `bulkInsertToClickHouse` are placeholder helpers: your own source
// reader and the bulk-insert function shown earlier.
async function etlPipeline() {
  // 1. Extract from the source
  const rawData = await extractFromPostgres()

  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count)
  }))

  // 3. Load into ClickHouse
  await bulkInsertToClickHouse(transformed)
}

// Run on a schedule
setInterval(etlPipeline, 60 * 60 * 1000)  // hourly
```

### Change Data Capture (CDC)

```typescript
// Listen for PostgreSQL changes and mirror them into ClickHouse.
// Assumes top-level await (ESM); otherwise wrap in an async function.
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })

await pgClient.connect()
await pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => {
  const update = JSON.parse(msg.payload)

  // The exact insert call depends on your ClickHouse client library.
  await clickhouse.insert('market_updates', [
    {
      market_id: update.id,
      event_type: update.operation,  // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data)
    }
  ])
})
```

## Best Practices

### 1. Partitioning Strategy
- Partition by time (usually by month or by day)
- Avoid creating too many partitions (it hurts performance)
- Use a Date-typed column for the partition key

### 2. Ordering Key
- Put the most frequently filtered columns first
- Consider cardinality (lower-cardinality columns usually come first)
- The sort order affects compression

### 3. Data Types
- Use the smallest type that fits (e.g., UInt32 instead of UInt64)
- Use LowCardinality for repetitive strings
- Use Enum for categorical data (see the DDL sketch below)
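
A minimal DDL sketch of these type choices, using a hypothetical `page_views` table:

```sql
CREATE TABLE page_views (
    date Date,
    -- Dictionary-encodes the repetitive string values.
    country LowCardinality(String),
    -- Fixed category set stored as small integers.
    device Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
    -- UInt32 is plenty for a per-day counter.
    views UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, country);
```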

### 4. Things to Avoid
- SELECT * (name the columns you need)
- FINAL (prefer merging or aggregating ahead of the query; see the sketch after this list)
- Heavy JOINs (denormalize for analytical workloads)
- Small, frequent inserts (batch them instead)
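
As an alternative to `FINAL` on a `ReplacingMergeTree`, the latest row per key can be selected with `argMax`; a sketch against the `user_events` table above:

```sql
-- Latest event_type per (user_id, event_id) without FINAL.
SELECT
    user_id,
    event_id,
    argMax(event_type, timestamp) AS event_type
FROM user_events
GROUP BY user_id, event_id;
```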

### 5. Monitoring
- Track query performance
- Monitor disk usage
- Watch merge activity
- Review the slow query log

**Remember**: ClickHouse excels at analytical workloads. Design tables around your query patterns, insert in batches, and use materialized views for real-time aggregation.