首页
解决方案
技术服务
专业数据库维保服务 大数据维保服务
一体机
Oracle数据库一体机 PolarDB数据库一体机 瀚高数据库一体机 崖山数据库一体机 海扬数据库一体机 高斯数据库一体机 金仓数据库一体机
产品
CLup乘数云统一平台 CData高性能数据库云一体机 CPDA高性能双子星数据库机 CBackup数据库备份恢复云平台 CMiner: PostgreSQL中的CDC CSYun超融合虚拟机产品 ZQPool数据库连接池 ConshGuard数据保护产品 APCC: Greenplum管理平台
文档
文章
客户及伙伴
中启开源
关于我们
登录
×
修改密码

RisingWave 技术文档(四)

应用篇:实战场景与案例分析

“理论的价值在于指导实践。这一篇我们将展示 RisingWave 如何在实际业务场景中大显身手。”


4.1 实时分析报表

场景描述

电商平台需要在双十一大促期间,实时展示各品类销售额、订单量、用户行为等核心指标。

技术方案

使用 RisingWave 构建实时数据管道:

  1. -- 创建订单源表
  2. CREATE TABLE orders (
  3. order_id BIGINT PRIMARY KEY,
  4. user_id BIGINT,
  5. category VARCHAR,
  6. amount DECIMAL,
  7. created_at TIMESTAMP
  8. ) WITH (
  9. connector = 'kafka',
  10. kafka.topic = 'orders',
  11. kafka.brokers = 'localhost:9092'
  12. );
  13. -- 创建实时物化视图:每小时销售额
  14. CREATE MATERIALIZED VIEW hourly_sales AS
  15. SELECT
  16. date_trunc('hour', created_at) AS hour,
  17. category,
  18. SUM(amount) AS total_amount,
  19. COUNT(*) AS order_count
  20. FROM orders
  21. GROUP BY date_trunc('hour', created_at), category;
  22. -- 创建级联视图:实时 TOP 10 品类
  23. CREATE MATERIALIZED VIEW top_categories AS
  24. SELECT * FROM (
  25. SELECT
  26. category,
  27. SUM(amount) AS total,
  28. ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) AS rank
  29. FROM orders
  30. WHERE created_at > NOW() - INTERVAL '24' HOUR
  31. GROUP BY category
  32. ) WHERE rank <= 10;

4.2 监控报警系统

场景描述

金融交易系统需要实时监控异常交易,当某项指标超过阈值时立即报警。

技术方案

  1. -- 交易监控表
  2. CREATE TABLE transactions (
  3. tx_id BIGINT,
  4. user_id BIGINT,
  5. amount DECIMAL,
  6. risk_score DECIMAL,
  7. created_at TIMESTAMP
  8. );
  9. -- 高风险交易告警物化视图
  10. CREATE MATERIALIZED VIEW high_risk_alerts AS
  11. SELECT
  12. user_id,
  13. COUNT(*) AS alert_count,
  14. MAX(risk_score) AS max_risk,
  15. SUM(amount) AS total_amount
  16. FROM transactions
  17. WHERE risk_score > 0.8
  18. AND created_at > NOW() - INTERVAL '5' MINUTE
  19. GROUP BY user_id
  20. HAVING COUNT(*) > 5 OR SUM(amount) > 100000;
  21. -- 窗口聚合:5分钟滚动统计
  22. CREATE MATERIALIZED VIEW tx_stats_5min AS
  23. SELECT
  24. window_start,
  25. window_end,
  26. COUNT(*) AS tx_count,
  27. AVG(amount) AS avg_amount,
  28. MAX(amount) AS max_amount,
  29. PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY amount) AS p99_amount
  30. FROM TUMBLING(
  31. transactions,
  32. INTERVAL '5' MINUTE,
  33. created_at
  34. )
  35. GROUP BY window_start, window_end;

4.3 流式 ETL

场景描述

从多个异构数据源(MySQL、PostgreSQL、外部 API)实时同步数据到数据仓库。

技术方案

  1. -- MySQL 订单数据 CDC
  2. CREATE SOURCE mysql_orders (...) WITH (connector = 'mysql_cdc', ...);
  3. -- PostgreSQL 产品目录
  4. CREATE SOURCE pg_products (...) WITH (connector = 'postgres_cdc', ...);
  5. -- 外部 API 用户数据
  6. CREATE SOURCE api_users (...) WITH (connector = 'kafka', ...);
  7. -- 合并订单视图
  8. CREATE MATERIALIZED VIEW enriched_orders AS
  9. SELECT
  10. o.order_id,
  11. o.user_id,
  12. u.username,
  13. u.email,
  14. p.product_name,
  15. p.category,
  16. o.amount,
  17. o.created_at
  18. FROM mysql_orders o
  19. JOIN api_users u ON o.user_id = u.user_id
  20. JOIN pg_products p ON o.product_id = p.product_id;
  21. -- Sink ClickHouse
  22. CREATE SINK enriched_orders_sink AS
  23. SELECT * FROM enriched_orders
  24. WITH (
  25. connector = 'clickhouse',
  26. clickhouse.url = 'clickhouse://localhost:9000',
  27. clickhouse.database = 'analytics'
  28. );

4.4 机器学习特征工程

场景描述

实时计算用户特征,用于推荐系统和风控模型。

技术方案

  1. -- 用户行为事件流
  2. CREATE SOURCE user_events (
  3. user_id BIGINT,
  4. event_type VARCHAR,
  5. product_id BIGINT,
  6. amount DECIMAL,
  7. created_at TIMESTAMP
  8. );
  9. -- 实时用户特征
  10. CREATE MATERIALIZED VIEW user_features AS
  11. SELECT
  12. user_id,
  13. -- 统计特征
  14. COUNT(*) AS event_count,
  15. SUM(amount) AS total_amount,
  16. AVG(amount) AS avg_amount,
  17. -- 行为特征
  18. COUNT(DISTINCT event_type) AS behavior_diversity,
  19. COUNT(DISTINCT product_id) AS product_diversity,
  20. -- 时间特征
  21. MAX(created_at) AS last_active,
  22. -- 窗口特征(最近1小时)
  23. SUM(CASE WHEN created_at > NOW() - INTERVAL '1' HOUR THEN 1 ELSE 0 END) AS hourly_events
  24. FROM user_events
  25. GROUP BY user_id;

结合 UDF Server 实现复杂特征计算:

  1. -- 注册 Python UDF 进行特征标准化
  2. CREATE FUNCTION normalize_features(features JSONB)
  3. RETURNS JSONB
  4. LANGUAGE python AS $$
  5. import json
  6. from sklearn.preprocessing import StandardScaler
  7. def normalize_features(features):
  8. data = json.loads(features)
  9. values = [[d['event_count'], d['total_amount'], d['avg_amount']]]
  10. scaler = StandardScaler()
  11. normalized = scaler.fit_transform(values)[0]
  12. return json.dumps({
  13. 'event_count_norm': normalized[0],
  14. 'total_amount_norm': normalized[1],
  15. 'avg_amount_norm': normalized[2]
  16. })
  17. $$;

4.5 案例分析

案例一:某电商平台实时大屏

业务背景:
双十一期间需要实时展示全站销售额、订单量、UV 等核心指标。

传统架构:
Kafka → Flink → Redis → 大屏(延迟 30 秒+)

改造后架构:
MySQL CDC → RisingWave → 大屏(延迟 < 1 秒)

效果对比:

案例二:某金融公司风控系统

业务背景:
实时检测异常交易,延迟要求毫秒级。

技术架构:
PostgreSQL → RisingWave → 规则引擎 → 告警

关键 SQL:

  1. CREATE MATERIALIZED VIEW fraud_detection AS
  2. SELECT
  3. user_id,
  4. card_id,
  5. SUM(amount) AS total,
  6. COUNT(*) AS tx_count,
  7. MAX(risk_score) AS max_risk
  8. FROM transactions
  9. WHERE created_at > NOW() - INTERVAL '10' MINUTE
  10. GROUP BY user_id, card_id
  11. HAVING SUM(amount) > 50000 OR COUNT(*) > 20 OR MAX(risk_score) > 0.9;

4.6 生态集成

组件 集成方式 用途
Kafka Source/Sink 消息队列
PostgreSQL CDC Source 数据同步
MySQL CDC Source 数据同步
ClickHouse Sink 分析引擎
S3 Sink 数据湖
Grafana 监控集成 可视化

4.7 小结

RisingWave 的典型应用场景:

  1. 实时分析报表:电商大屏、运营监控
  2. 监控报警:金融风控、异常检测
  3. 流式 ETL:数据同步、格式转换
  4. 特征工程:推荐系统、风控模型
  5. 实时 BI:用户行为分析

下一篇预告

面对众多技术方案,如何做出正确选择?下一篇我们将进行横向对比:

敬请期待:《对比篇:选型指南与方案对比》


想了解更多?