“Talk is cheap, show me the code。这可能是最简单易懂的流数据库入门教程,让你在 5 分钟内体验从零构建实时数据 pipeline 的快感。”
RisingWave 支持多种部署方式,从单机的 Docker 体验到生产级的 Kubernetes 集群,总有一款适合你。
最快的方式是用 Docker 运行 RisingWave:
# 启动一个单节点 RisingWave 实例docker run -it \--name risingwave \-p 4566:4566 \-p 5691:5691 \risingwavelabs/risingwave:latest \./target/release/risingwave \--backend-state-store=mem
参数说明:
4566:PostgreSQL 兼容协议端口(连接客户端用)5691:Meta 服务端口--backend-state-store=mem:使用内存存储(仅适合体验,不适合生产)注意: 如果你已经有 Docker Compose 的 RisingWave 环境,可以跳过上面这步。
RisingWave 兼容 PostgreSQL 协议,你可以用任何 PostgreSQL 客户端连接:
# 使用 psql 连接psql -h localhost -p 4566 -U root
# 使用 Python 连接import psycopg2conn = psycopg2.connect(host="localhost",port=4566,user="root",database="dev")cursor = conn.cursor()cursor.execute("SELECT version();")print(cursor.fetchone())
预期输出:
version----------------------------------------------------------------------------------------------------------------PostgreSQL 16.2 (RisingWave 1.5.0-dev (e5e2d66fc on 2024-04-01T10:00:00Z) compiled by gcc (Apple clang 15.0.0))(1 row)
恭喜你,RisingWave 已经成功运行!
如果你是开发者,想要在本地调试 RisingWave 源码:
# 克隆源码git clone https://github.com/risingwavelabs/risingwave.gitcd risingwave# 使用 risedev 启动本地集群./risedev d
./risedev d 会启动一个完整的开发集群(Frontend + Compute + Meta + MinIO),适合本地调试和功能验证。
生产环境推荐使用 Docker Compose:
# docker-compose.ymlversion: '3'services:meta:image: risingwavelabs/risingwave:latestcommand: meta-nodeports:- "5690:5690"volumes:- meta_data:/risingwave/meta_nodecompute:image: risingwavelabs/risingwave:latestcommand: compute-nodedepends_on:- metaports:- "5688:5688"environment:- META_ADDR=meta:5690frontend:image: risingwavelabs/risingwave:latestcommand: frontend-nodedepends_on:- metaports:- "4566:4566"environment:- META_ADDR=meta:5690minio:image: minio/minio:latestcommand: server /data --console-address ":9001"ports:- "9000:9000"- "9001:9001"volumes:meta_data:
启动集群:
docker-compose up -d
生产级部署推荐使用 Kubernetes,配合 Operator 实现自动化管理:
# 添加 Helm 仓库helm repo add risingwave https://charts.risingwave.comhelm repo update# 安装 RisingWavehelm install risingwave risingwave/risingwave \--set meta.nodeSelector.type=compute \--set compute.replicas=3 \--set storage.type=aws-s3
详细的 Kubernetes 配置请参考官方 K8s 部署文档。
让我们从零开始,构建你的第一个实时物化视图。
假设有一个订单表,实时接收订单数据,我们想要实时统计每分钟的订单总额。
RisingWave 支持多种数据源。我们先创建一个模拟的 Kafka Source:
-- 创建一个 Kafka SourceCREATE SOURCE orders (order_id BIGINT,user_id BIGINT,amount DECIMAL,created_at TIMESTAMP)WITH (connector = 'kafka',kafka.topic = 'orders',kafka.brokers = 'localhost:9092',kafka.scan.startup.mode = 'earliest')FORMAT PLAIN ENCODE JSON;
如果你没有真实的 Kafka,也可以用 Table Source 模拟:
-- 直接创建 Table,数据需要手动插入CREATE TABLE orders (order_id BIGINT PRIMARY KEY,user_id BIGINT,amount DECIMAL,created_at TIMESTAMP);
-- 向 orders 表插入测试数据INSERT INTO orders VALUES(1, 1001, 99.50, NOW()),(2, 1002, 150.00, NOW()),(3, 1001, 299.00, NOW()),(4, 1003, 75.25, NOW());
现在创建第一个物化视图,实时统计每分钟订单金额:
CREATE MATERIALIZED VIEW order_stats_by_minute ASSELECTdate_trunc('minute', created_at) AS minute,COUNT(*) AS order_count,SUM(amount) AS total_amount,AVG(amount) AS avg_amountFROM ordersGROUP BY date_trunc('minute', created_at);
-- 查询实时统计结果SELECT * FROM order_stats_by_minute;
预期输出:
minute | order_count | total_amount | avg_amount------------------------+-------------+---------------+-------------2026-04-01 10:00:00 | 4 | 623.75 | 155.9375(1 row)
现在我们再插入一条数据,观察物化视图如何实时更新:
-- 插入新订单INSERT INTO orders VALUES(5, 1004, 200.00, NOW());-- 再次查询SELECT * FROM order_stats_by_minute;
注意到 order_count 和 total_amount 已经自动更新了——这就是 RisingWave 增量计算的能力。
滚动窗口是固定大小、不重叠的窗口:
-- 每小时统计CREATE MATERIALIZED VIEW hourly_orders ASSELECTwindow_start,window_end,COUNT(*) AS order_count,SUM(amount) AS total_amountFROM TUMBLING(orders,INTERVAL '1' HOUR,created_at)GROUP BY window_start, window_end;
滑动窗口可以定义滑动步长:
-- 每 5 分钟统计一次最近 30 分钟的订单CREATE MATERIALIZED VIEW sliding_orders ASSELECTwindow_start,window_end,COUNT(*) AS order_count,SUM(amount) AS total_amountFROM SLIDING(orders,INTERVAL '30' MINUTE,INTERVAL '5' MINUTE,created_at)GROUP BY window_start, window_end;
RisingWave 支持多个流的 Join,这是流处理中最强大的能力之一:
-- 场景:订单流 + 用户信息流 = 带用户详情的订单流-- 创建用户信息 SourceCREATE SOURCE users (user_id BIGINT,username VARCHAR,email VARCHAR)WITH (connector = 'kafka',kafka.topic = 'users',kafka.brokers = 'localhost:9092')FORMAT PLAIN ENCODE JSON;-- 创建带用户信息的订单 MVCREATE MATERIALIZED VIEW orders_with_users ASSELECTo.order_id,o.amount,o.created_at,u.username,u.emailFROM orders oJOIN users u ON o.user_id = u.user_id;
处理乱序数据时,水位线是关键机制:
-- 定义水位线,允许最多 5 分钟的乱序CREATE SOURCE orders_with_watermark (order_id BIGINT,user_id BIGINT,amount DECIMAL,created_at TIMESTAMP,WATERMARK FOR created_at AS created_at - INTERVAL '5' MINUTE)WITH (connector = 'kafka',kafka.topic = 'orders',kafka.brokers = 'localhost:9092')FORMAT PLAIN ENCODE JSON;
# risingwave.yml# Compute 节点资源配置compute:# 每个 Actor 的线程数actor_runtime_worker_threads: 4# Meta 节点资源配置meta:# Meta 服务线程数meta_worker_threads: 4# 存储配置storage:# 共享 buffer 缓存大小(建议为机器内存的 1/4)share_buffer_capacity_mb: 4096
Hummock 使用对象存储(S3/MinIO),容量取决于:
估算公式:
存储容量 ≈ (基表数据量 × MV 数量 × 状态膨胀系数) + Checkpoint 缓冲区
通常状态膨胀系数为 2-5 倍。
RisingWave 暴露了丰富的 Prometheus 指标:
# prometheus.ymlscrape_configs:- job_name: 'risingwave'static_configs:- targets: ['risingwave:5691'] # Meta 节点
关键指标:
| 指标名称 | 说明 | 告警阈值 |
|---|---|---|
risingwave_meta_barrier_num |
Barrier 生成频率 | 低于 0.05/s |
risingwave_stream_actor_count |
活跃 Actor 数量 | 持续为 0 |
risingwave_iceberg_writer_uploader_quantity |
写入量异常 | 突增或突降 |
node_cpu_usage_seconds_total |
CPU 使用率 | > 80% 持续 5 分钟 |
# risingwave.ymllogging:# 日志级别level: info# 日志输出格式format: json# 日志保留天数retention_days: 30
# risingwave.ymlcompute:# 每个 Fragment 的副本数replicas: 3meta:# Meta 节点高可用max_retries: 3
当副本数为 3 时,系统可以容忍 1 个节点故障而不中断服务。
RisingWave 的故障转移是自动的:
整个过程对用户透明,无需人工干预。
-- 使用 EXPLAIN 查看查询计划EXPLAIN (VERBOSE) SELECT * FROM order_stats_by_minute;
-- 为高频查询列创建索引CREATE INDEX idx_orders_user_id ON orders(user_id);CREATE INDEX idx_orders_created_at ON orders(created_at);
对于窗口较大的查询,状态可能增长很快:
-- 设置窗口过期时间CREATE MATERIALIZED VIEW recent_orders ASSELECT *FROM ordersWHERE created_at > NOW() - INTERVAL '24' HOUR;
# risingwave.ymlstorage:# Checkpoint 间隔(默认 10 秒)checkpoint_interval_ms: 10000# Checkpoint 并行度checkpoint_concurrency: 8
建议:
可能原因:
解决方案:
-- 查看当前延迟SELECT * FROM rw_materialized_views;-- 优化:增加 Checkpoint 并行度-- 优化:增加 Compute 节点资源
可能原因:
解决方案:
-- 增加 Kafka 分区数(在 Kafka 端操作)-- 创建 Source 时指定并发度CREATE SOURCE orders (...)WITH (connector = 'kafka',kafka.topic = 'orders',kafka.brokers = 'localhost:9092',kafka.consumer_threads = 8 -- 增加并发)FORMAT PLAIN ENCODE JSON;
可能原因:
解决方案:
如果你对 RisingWave 的开发感兴趣:
# 克隆源码git clone https://github.com/risingwavelabs/risingwave.git# 查看贡献指南cat CONTRIBUTING.md# 搭建开发环境./risedev d
-- 查看所有表\dt-- 查看所有物化视图\dv-- 查看源SHOW SOURCES;-- 查看连接器SHOW CONNECTORS;-- 刷新物化视图(仅限静态 MV)REFRESH MATERIALIZED VIEW mv_name;
恭喜你完成了 RisingWave 入门之旅!让我们回顾今天学到的内容:
核心要点:
下一步建议:
恭喜你完成了 RisingWave 技术文档系列!
从流数据库的核心概念,到系统架构、核心特性、实战场景、选型对比,再到本篇的快速入门,你应该已经对 RisingWave 有了全面且深入的理解。
如果你还没有尝试过,现在就是最好的时机:
docker run -it --name risingwave -p 4566:4566 risingwavelabs/risingwave:latest ./target/release/risingwave --backend-state-store=mem
然后用 psql -h localhost -p 4566 -U root 连接,开始你的实时计算之旅吧!
想了解更多?