首页
解决方案
技术服务
专业数据库维保服务 大数据维保服务
一体机
Oracle数据库一体机 PolarDB数据库一体机 瀚高数据库一体机 崖山数据库一体机 海扬数据库一体机 高斯数据库一体机 金仓数据库一体机
产品
CLup乘数云统一平台 CData高性能数据库云一体机 CPDA高性能双子星数据库机 CBackup数据库备份恢复云平台 CMiner: PostgreSQL中的CDC CSYun超融合虚拟机产品 ZQPool数据库连接池 ConshGuard数据保护产品 APCC: Greenplum管理平台
文档
文章
客户及伙伴
中启开源
关于我们
登录
×
修改密码

RisingWave 技术文档(六)

上手篇:5 分钟入门 RisingWave

“Talk is cheap, show me the code。这可能是最简单易懂的流数据库入门教程,让你在 5 分钟内体验从零构建实时数据 pipeline 的快感。”


6.1 安装与部署

RisingWave 支持多种部署方式,从单机的 Docker 体验到生产级的 Kubernetes 集群,总有一款适合你。

Docker 快速体验(5 分钟上手)

最快的方式是用 Docker 运行 RisingWave:

  1. # 启动一个单节点 RisingWave 实例
  2. docker run -it \
  3. --name risingwave \
  4. -p 4566:4566 \
  5. -p 5691:5691 \
  6. risingwavelabs/risingwave:latest \
  7. ./target/release/risingwave \
  8. --backend-state-store=mem

参数说明:

注意: 如果你已经有 Docker Compose 的 RisingWave 环境,可以跳过上面这步。

连接 RisingWave

RisingWave 兼容 PostgreSQL 协议,你可以用任何 PostgreSQL 客户端连接:

  1. # 使用 psql 连接
  2. psql -h localhost -p 4566 -U root
  1. # 使用 Python 连接
  2. import psycopg2
  3. conn = psycopg2.connect(
  4. host="localhost",
  5. port=4566,
  6. user="root",
  7. database="dev"
  8. )
  9. cursor = conn.cursor()
  10. cursor.execute("SELECT version();")
  11. print(cursor.fetchone())

预期输出:

  1. version
  2. ----------------------------------------------------------------------------------------------------------------
  3. PostgreSQL 16.2 (RisingWave 1.5.0-dev (e5e2d66fc on 2024-04-01T10:00:00Z) compiled by gcc (Apple clang 15.0.0))
  4. (1 row)

恭喜你,RisingWave 已经成功运行!

单机调试模式

如果你是开发者,想要在本地调试 RisingWave 源码:

  1. # 克隆源码
  2. git clone https://github.com/risingwavelabs/risingwave.git
  3. cd risingwave
  4. # 使用 risedev 启动本地集群
  5. ./risedev d

./risedev d 会启动一个完整的开发集群(Frontend + Compute + Meta + MinIO),适合本地调试和功能验证。

Docker Compose 集群部署

生产环境推荐使用 Docker Compose:

  1. # docker-compose.yml
  2. version: '3'
  3. services:
  4. meta:
  5. image: risingwavelabs/risingwave:latest
  6. command: meta-node
  7. ports:
  8. - "5690:5690"
  9. volumes:
  10. - meta_data:/risingwave/meta_node
  11. compute:
  12. image: risingwavelabs/risingwave:latest
  13. command: compute-node
  14. depends_on:
  15. - meta
  16. ports:
  17. - "5688:5688"
  18. environment:
  19. - META_ADDR=meta:5690
  20. frontend:
  21. image: risingwavelabs/risingwave:latest
  22. command: frontend-node
  23. depends_on:
  24. - meta
  25. ports:
  26. - "4566:4566"
  27. environment:
  28. - META_ADDR=meta:5690
  29. minio:
  30. image: minio/minio:latest
  31. command: server /data --console-address ":9001"
  32. ports:
  33. - "9000:9000"
  34. - "9001:9001"
  35. volumes:
  36. meta_data:

启动集群:

  1. docker-compose up -d

Kubernetes 部署

生产级部署推荐使用 Kubernetes,配合 Operator 实现自动化管理:

  1. # 添加 Helm 仓库
  2. helm repo add risingwave https://charts.risingwave.com
  3. helm repo update
  4. # 安装 RisingWave
  5. helm install risingwave risingwave/risingwave \
  6. --set meta.nodeSelector.type=compute \
  7. --set compute.replicas=3 \
  8. --set storage.type=aws-s3

详细的 Kubernetes 配置请参考官方 K8s 部署文档。


6.2 第一个物化视图

让我们从零开始,构建你的第一个实时物化视图。

场景:实时统计每分钟订单金额

假设有一个订单表,实时接收订单数据,我们想要实时统计每分钟的订单总额。

步骤一:创建数据源

RisingWave 支持多种数据源。我们先创建一个模拟的 Kafka Source:

  1. -- 创建一个 Kafka Source
  2. CREATE SOURCE orders (
  3. order_id BIGINT,
  4. user_id BIGINT,
  5. amount DECIMAL,
  6. created_at TIMESTAMP
  7. )
  8. WITH (
  9. connector = 'kafka',
  10. kafka.topic = 'orders',
  11. kafka.brokers = 'localhost:9092',
  12. kafka.scan.startup.mode = 'earliest'
  13. )
  14. FORMAT PLAIN ENCODE JSON;

如果你没有真实的 Kafka,也可以用 Table Source 模拟:

  1. -- 直接创建 Table,数据需要手动插入
  2. CREATE TABLE orders (
  3. order_id BIGINT PRIMARY KEY,
  4. user_id BIGINT,
  5. amount DECIMAL,
  6. created_at TIMESTAMP
  7. );

步骤二:插入测试数据

  1. -- orders 表插入测试数据
  2. INSERT INTO orders VALUES
  3. (1, 1001, 99.50, NOW()),
  4. (2, 1002, 150.00, NOW()),
  5. (3, 1001, 299.00, NOW()),
  6. (4, 1003, 75.25, NOW());

步骤三:创建物化视图

现在创建第一个物化视图,实时统计每分钟订单金额:

  1. CREATE MATERIALIZED VIEW order_stats_by_minute AS
  2. SELECT
  3. date_trunc('minute', created_at) AS minute,
  4. COUNT(*) AS order_count,
  5. SUM(amount) AS total_amount,
  6. AVG(amount) AS avg_amount
  7. FROM orders
  8. GROUP BY date_trunc('minute', created_at);

步骤四:查询物化视图

  1. -- 查询实时统计结果
  2. SELECT * FROM order_stats_by_minute;

预期输出:

  1. minute | order_count | total_amount | avg_amount
  2. ------------------------+-------------+---------------+-------------
  3. 2026-04-01 10:00:00 | 4 | 623.75 | 155.9375
  4. (1 row)

步骤五:观察增量更新

现在我们再插入一条数据,观察物化视图如何实时更新:

  1. -- 插入新订单
  2. INSERT INTO orders VALUES
  3. (5, 1004, 200.00, NOW());
  4. -- 再次查询
  5. SELECT * FROM order_stats_by_minute;

注意到 order_counttotal_amount 已经自动更新了——这就是 RisingWave 增量计算的能力。


6.3 进阶操作

时间窗口

Tumbling Window(滚动窗口)

滚动窗口是固定大小、不重叠的窗口:

  1. -- 每小时统计
  2. CREATE MATERIALIZED VIEW hourly_orders AS
  3. SELECT
  4. window_start,
  5. window_end,
  6. COUNT(*) AS order_count,
  7. SUM(amount) AS total_amount
  8. FROM TUMBLING(
  9. orders,
  10. INTERVAL '1' HOUR,
  11. created_at
  12. )
  13. GROUP BY window_start, window_end;

Sliding Window(滑动窗口)

滑动窗口可以定义滑动步长:

  1. -- 5 分钟统计一次最近 30 分钟的订单
  2. CREATE MATERIALIZED VIEW sliding_orders AS
  3. SELECT
  4. window_start,
  5. window_end,
  6. COUNT(*) AS order_count,
  7. SUM(amount) AS total_amount
  8. FROM SLIDING(
  9. orders,
  10. INTERVAL '30' MINUTE,
  11. INTERVAL '5' MINUTE,
  12. created_at
  13. )
  14. GROUP BY window_start, window_end;

多流 Join

RisingWave 支持多个流的 Join,这是流处理中最强大的能力之一:

  1. -- 场景:订单流 + 用户信息流 = 带用户详情的订单流
  2. -- 创建用户信息 Source
  3. CREATE SOURCE users (
  4. user_id BIGINT,
  5. username VARCHAR,
  6. email VARCHAR
  7. )
  8. WITH (
  9. connector = 'kafka',
  10. kafka.topic = 'users',
  11. kafka.brokers = 'localhost:9092'
  12. )
  13. FORMAT PLAIN ENCODE JSON;
  14. -- 创建带用户信息的订单 MV
  15. CREATE MATERIALIZED VIEW orders_with_users AS
  16. SELECT
  17. o.order_id,
  18. o.amount,
  19. o.created_at,
  20. u.username,
  21. u.email
  22. FROM orders o
  23. JOIN users u ON o.user_id = u.user_id;

水位线(Watermark)

处理乱序数据时,水位线是关键机制:

  1. -- 定义水位线,允许最多 5 分钟的乱序
  2. CREATE SOURCE orders_with_watermark (
  3. order_id BIGINT,
  4. user_id BIGINT,
  5. amount DECIMAL,
  6. created_at TIMESTAMP,
  7. WATERMARK FOR created_at AS created_at - INTERVAL '5' MINUTE
  8. )
  9. WITH (
  10. connector = 'kafka',
  11. kafka.topic = 'orders',
  12. kafka.brokers = 'localhost:9092'
  13. )
  14. FORMAT PLAIN ENCODE JSON;

6.4 生产环境配置

资源规划

CPU 和内存配置

  1. # risingwave.yml
  2. # Compute 节点资源配置
  3. compute:
  4. # 每个 Actor 的线程数
  5. actor_runtime_worker_threads: 4
  6. # Meta 节点资源配置
  7. meta:
  8. # Meta 服务线程数
  9. meta_worker_threads: 4
  10. # 存储配置
  11. storage:
  12. # 共享 buffer 缓存大小(建议为机器内存的 1/4)
  13. share_buffer_capacity_mb: 4096

存储容量规划

Hummock 使用对象存储(S3/MinIO),容量取决于:

估算公式:

  1. 存储容量 (基表数据量 × MV 数量 × 状态膨胀系数) + Checkpoint 缓冲区

通常状态膨胀系数为 2-5 倍。

监控与运维

关键监控指标

RisingWave 暴露了丰富的 Prometheus 指标:

  1. # prometheus.yml
  2. scrape_configs:
  3. - job_name: 'risingwave'
  4. static_configs:
  5. - targets: ['risingwave:5691'] # Meta 节点

关键指标:

指标名称 说明 告警阈值
risingwave_meta_barrier_num Barrier 生成频率 低于 0.05/s
risingwave_stream_actor_count 活跃 Actor 数量 持续为 0
risingwave_iceberg_writer_uploader_quantity 写入量异常 突增或突降
node_cpu_usage_seconds_total CPU 使用率 > 80% 持续 5 分钟

日志管理

  1. # risingwave.yml
  2. logging:
  3. # 日志级别
  4. level: info
  5. # 日志输出格式
  6. format: json
  7. # 日志保留天数
  8. retention_days: 30

高可用配置

多副本配置

  1. # risingwave.yml
  2. compute:
  3. # 每个 Fragment 的副本数
  4. replicas: 3
  5. meta:
  6. # Meta 节点高可用
  7. max_retries: 3

当副本数为 3 时,系统可以容忍 1 个节点故障而不中断服务。

故障转移

RisingWave 的故障转移是自动的:

  1. Meta 节点检测到 Compute 节点心跳超时
  2. 将该节点的任务调度到其他节点
  3. 新节点从 Checkpoint 恢复状态
  4. 物化视图「追上」最新状态

整个过程对用户透明,无需人工干预。


6.5 性能调优

查询优化

查看执行计划

  1. -- 使用 EXPLAIN 查看查询计划
  2. EXPLAIN (VERBOSE) SELECT * FROM order_stats_by_minute;

使用索引优化查询

  1. -- 为高频查询列创建索引
  2. CREATE INDEX idx_orders_user_id ON orders(user_id);
  3. CREATE INDEX idx_orders_created_at ON orders(created_at);

状态管理

控制状态大小

对于窗口较大的查询,状态可能增长很快:

  1. -- 设置窗口过期时间
  2. CREATE MATERIALIZED VIEW recent_orders AS
  3. SELECT *
  4. FROM orders
  5. WHERE created_at > NOW() - INTERVAL '24' HOUR;

Checkpoint 调优

  1. # risingwave.yml
  2. storage:
  3. # Checkpoint 间隔(默认 10 秒)
  4. checkpoint_interval_ms: 10000
  5. # Checkpoint 并行度
  6. checkpoint_concurrency: 8

建议:


6.6 常见问题与解决方案

Q1: 物化视图更新延迟高

可能原因:

解决方案:

  1. -- 查看当前延迟
  2. SELECT * FROM rw_materialized_views;
  3. -- 优化:增加 Checkpoint 并行度
  4. -- 优化:增加 Compute 节点资源

Q2: Kafka Source 消费慢

可能原因:

解决方案:

  1. -- 增加 Kafka 分区数(在 Kafka 端操作)
  2. -- 创建 Source 时指定并发度
  3. CREATE SOURCE orders (...)
  4. WITH (
  5. connector = 'kafka',
  6. kafka.topic = 'orders',
  7. kafka.brokers = 'localhost:9092',
  8. kafka.consumer_threads = 8 -- 增加并发
  9. )
  10. FORMAT PLAIN ENCODE JSON;

Q3: 多流 Join 状态爆炸

可能原因:

解决方案:


6.7 社区资源

官方文档

GitHub

社区交流

贡献指南

如果你对 RisingWave 的开发感兴趣:

  1. # 克隆源码
  2. git clone https://github.com/risingwavelabs/risingwave.git
  3. # 查看贡献指南
  4. cat CONTRIBUTING.md
  5. # 搭建开发环境
  6. ./risedev d

快速参考

常用 SQL 命令

  1. -- 查看所有表
  2. \dt
  3. -- 查看所有物化视图
  4. \dv
  5. -- 查看源
  6. SHOW SOURCES;
  7. -- 查看连接器
  8. SHOW CONNECTORS;
  9. -- 刷新物化视图(仅限静态 MV
  10. REFRESH MATERIALIZED VIEW mv_name;

6.8 小结

恭喜你完成了 RisingWave 入门之旅!让我们回顾今天学到的内容:

核心要点:

  1. 安装部署:Docker 是最快的体验方式,Docker Compose 适合开发,Kubernetes 适合生产
  2. 物化视图:一条 SQL 即可创建持续更新的实时视图
  3. 时间窗口:Tumbling Window 和 Sliding Window 满足不同分析需求
  4. 多流 Join:支持 10-20 个流的同时 Join
  5. 水位线:优雅处理乱序和延迟数据
  6. 生产配置:合理的资源规划和监控是高可用的基础

下一步建议:

  1. 用 Docker 快速体验核心功能
  2. 尝试连接真实的 Kafka 或 PostgreSQL CDC
  3. 构建一个属于你的实时分析 Pipeline
  4. 加入 Slack 社区,与开发者直接交流

恭喜你完成了 RisingWave 技术文档系列!

从流数据库的核心概念,到系统架构、核心特性、实战场景、选型对比,再到本篇的快速入门,你应该已经对 RisingWave 有了全面且深入的理解。

如果你还没有尝试过,现在就是最好的时机:

  1. docker run -it --name risingwave -p 4566:4566 risingwavelabs/risingwave:latest ./target/release/risingwave --backend-state-store=mem

然后用 psql -h localhost -p 4566 -U root 连接,开始你的实时计算之旅吧!


想了解更多?