“The value of theory lies in guiding practice. In this installment we show how RisingWave performs in real business scenarios.”
During the Double 11 (Singles' Day) sale, an e-commerce platform needs to display core metrics in real time, including per-category sales, order volume, and user behavior.
Build the real-time data pipeline with RisingWave:

    -- Create the orders source table (assumes JSON-encoded Kafka messages)
    CREATE TABLE orders (
        order_id BIGINT PRIMARY KEY,
        user_id BIGINT,
        category VARCHAR,
        amount DECIMAL,
        created_at TIMESTAMP
    ) WITH (
        connector = 'kafka',
        topic = 'orders',
        properties.bootstrap.server = 'localhost:9092'
    ) FORMAT PLAIN ENCODE JSON;

    -- Real-time materialized view: hourly sales per category
    CREATE MATERIALIZED VIEW hourly_sales AS
    SELECT
        date_trunc('hour', created_at) AS hour,
        category,
        SUM(amount) AS total_amount,
        COUNT(*) AS order_count
    FROM orders
    GROUP BY date_trunc('hour', created_at), category;

    -- Cascading view: real-time top 10 categories over the last 24 hours
    CREATE MATERIALIZED VIEW top_categories AS
    SELECT * FROM (
        SELECT
            category,
            SUM(amount) AS total,
            ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) AS rank
        FROM orders
        WHERE created_at > NOW() - INTERVAL '24' HOUR
        GROUP BY category
    ) WHERE rank <= 10;
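To make the grouping semantics of `hourly_sales` and `top_categories` concrete, here is a minimal batch sketch in Python over a handful of in-memory rows. The sample data and the helper names are hypothetical, the 24-hour filter is omitted, and the sketch recomputes everything from scratch, whereas a materialized view is maintained incrementally.

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal

# Hypothetical sample rows shaped like the `orders` table:
# (category, amount, created_at).
SAMPLE_ORDERS = [
    ("phones", Decimal("100.00"), datetime(2024, 11, 11, 0, 5)),
    ("phones", Decimal("50.00"), datetime(2024, 11, 11, 0, 40)),
    ("books", Decimal("20.00"), datetime(2024, 11, 11, 0, 59)),
    ("phones", Decimal("70.00"), datetime(2024, 11, 11, 1, 10)),
]


def hourly_sales(orders):
    """Group orders by (hour bucket, category), like date_trunc('hour', created_at)."""
    totals = defaultdict(lambda: [Decimal("0"), 0])  # key -> [total_amount, order_count]
    for category, amount, created_at in orders:
        # Truncate the timestamp to the start of its hour.
        hour = created_at.replace(minute=0, second=0, microsecond=0)
        bucket = totals[(hour, category)]
        bucket[0] += amount
        bucket[1] += 1
    return {key: (total, count) for key, (total, count) in totals.items()}


def top_categories(orders, n=10):
    """Rank categories by total amount, descending, and keep the first n."""
    per_category = defaultdict(Decimal)  # Decimal() starts at 0
    for category, amount, _ in orders:
        per_category[category] += amount
    ranked = sorted(per_category.items(), key=lambda item: item[1], reverse=True)
    return ranked[:n]
```

Calling `hourly_sales(SAMPLE_ORDERS)` returns one entry per hour and category, and `top_categories(SAMPLE_ORDERS, n=1)` returns the single best-selling category with its total.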
A financial trading system needs to monitor anomalous transactions in real time and raise an alert as soon as a metric exceeds its threshold.

    -- Transaction monitoring table
    CREATE TABLE transactions (
        tx_id BIGINT,
        user_id BIGINT,
        amount DECIMAL,
        risk_score DECIMAL,
        created_at TIMESTAMP
    );

    -- Materialized view for high-risk transaction alerts
    CREATE MATERIALIZED VIEW high_risk_alerts AS
    SELECT
        user_id,
        COUNT(*) AS alert_count,
        MAX(risk_score) AS max_risk,
        SUM(amount) AS total_amount
    FROM transactions
    WHERE risk_score > 0.8
      AND created_at > NOW() - INTERVAL '5' MINUTE
    GROUP BY user_id
    HAVING COUNT(*) > 5 OR SUM(amount) > 100000;

    -- Window aggregation: 5-minute tumbling statistics
    CREATE MATERIALIZED VIEW tx_stats_5min AS
    SELECT
        window_start,
        window_end,
        COUNT(*) AS tx_count,
        AVG(amount) AS avg_amount,
        MAX(amount) AS max_amount,
        PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY amount) AS p99_amount
    FROM TUMBLE(transactions, created_at, INTERVAL '5' MINUTE)
    GROUP BY window_start, window_end;
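The 5-minute tumbling window in `tx_stats_5min` assigns every row to exactly one fixed-size, non-overlapping window. The following Python sketch illustrates that assignment. The function name `tumble_window` is hypothetical, and the sketch assumes windows are aligned to the Unix epoch and are half-open (start inclusive, end exclusive).

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size=timedelta(minutes=5)):
    """Return (window_start, window_end) of the fixed-size window containing ts."""
    elapsed = ts - EPOCH
    # Integer division counts how many whole windows fit before ts.
    window_start = EPOCH + (elapsed // size) * size
    return window_start, window_start + size
```

A timestamp that falls exactly on a boundary opens a new window, so no row is ever counted in two windows.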
Synchronize data in real time from multiple heterogeneous sources (MySQL, PostgreSQL, external APIs) into a data warehouse.

    -- MySQL order data via CDC
    CREATE SOURCE mysql_orders (...) WITH (connector = 'mysql-cdc', ...);

    -- PostgreSQL product catalog
    CREATE SOURCE pg_products (...) WITH (connector = 'postgres-cdc', ...);

    -- User data from an external API
    CREATE SOURCE api_users (...) WITH (connector = 'kafka', ...);

    -- Enriched orders view joining all three sources
    CREATE MATERIALIZED VIEW enriched_orders AS
    SELECT
        o.order_id,
        o.user_id,
        u.username,
        u.email,
        p.product_name,
        p.category,
        o.amount,
        o.created_at
    FROM mysql_orders o
    JOIN api_users u ON o.user_id = u.user_id
    JOIN pg_products p ON o.product_id = p.product_id;

    -- Sink to ClickHouse
    CREATE SINK enriched_orders_sink AS
    SELECT * FROM enriched_orders
    WITH (
        connector = 'clickhouse',
        clickhouse.url = 'clickhouse://localhost:9000',
        clickhouse.database = 'analytics'
    );
Compute user features in real time for recommendation systems and risk-control models.

    -- User behavior event stream
    CREATE SOURCE user_events (
        user_id BIGINT,
        event_type VARCHAR,
        product_id BIGINT,
        amount DECIMAL,
        created_at TIMESTAMP
    );

    -- Real-time user features
    CREATE MATERIALIZED VIEW user_features AS
    SELECT
        user_id,
        -- Statistical features
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount,
        AVG(amount) AS avg_amount,
        -- Behavioral features
        COUNT(DISTINCT event_type) AS behavior_diversity,
        COUNT(DISTINCT product_id) AS product_diversity,
        -- Temporal features
        MAX(created_at) AS last_active,
        -- Window feature (last hour)
        SUM(CASE WHEN created_at > NOW() - INTERVAL '1' HOUR THEN 1 ELSE 0 END) AS hourly_events
    FROM user_events
    GROUP BY user_id;
Combine this with a UDF server to implement more complex feature computation:

    -- Register a Python UDF for feature standardization
    CREATE FUNCTION normalize_features(features JSONB)
    RETURNS JSONB
    LANGUAGE python AS $$
    import json
    from sklearn.preprocessing import StandardScaler

    def normalize_features(features):
        data = json.loads(features)
        values = [[data['event_count'], data['total_amount'], data['avg_amount']]]
        # Fitting on a single row makes every standardized value 0.0
        scaler = StandardScaler()
        normalized = scaler.fit_transform(values)[0]
        return json.dumps({
            'event_count_norm': normalized[0],
            'total_amount_norm': normalized[1],
            'avg_amount_norm': normalized[2]
        })
    $$;
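Standardization (z-scoring) only carries information when it is computed across many rows of the same feature, since a single value has no spread. As a rough illustration of the arithmetic, here is a standard-library sketch. The function name `standardize` is hypothetical, and it uses the population standard deviation, which is also what scikit-learn's `StandardScaler` uses.

```python
from statistics import fmean, pstdev


def standardize(column):
    """Z-score a list of numbers using the population standard deviation."""
    mean = fmean(column)
    std = pstdev(column)
    if std == 0:
        # A constant column (including a single value) has no spread,
        # so every standardized value is zero.
        return [0.0 for _ in column]
    return [(value - mean) / std for value in column]
```

For example, `standardize([1.0, 2.0, 3.0])` centers the values around zero, while `standardize([5.0])` returns `[0.0]` because one sample cannot define a scale.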
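Business background:
During Double 11, core metrics such as site-wide sales, order volume, and unique visitors (UV) must be displayed in real time.
Traditional architecture:
Kafka → Flink → Redis → dashboard (latency of 30+ seconds)
Revamped architecture:
MySQL CDC → RisingWave → dashboard (latency < 1 second)
Results comparison: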
Business background:
Detect anomalous transactions in real time, with a millisecond-level latency requirement.
Technical architecture:
PostgreSQL → RisingWave → rule engine → alerts
Key SQL:

    CREATE MATERIALIZED VIEW fraud_detection AS
    SELECT
        user_id,
        card_id,
        SUM(amount) AS total,
        COUNT(*) AS tx_count,
        MAX(risk_score) AS max_risk
    FROM transactions
    WHERE created_at > NOW() - INTERVAL '10' MINUTE
    GROUP BY user_id, card_id
    HAVING SUM(amount) > 50000 OR COUNT(*) > 20 OR MAX(risk_score) > 0.9;
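On the rule-engine side, each row that appears in `fraud_detection` has already crossed at least one threshold, but the view does not say which one. A downstream step could re-evaluate the three conditions to label the alert. The sketch below is one possible way to do that; `FraudRow`, `triggered_rules`, and the rule labels are hypothetical names, and the thresholds are copied from the `HAVING` clause.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class FraudRow:
    """One row of fraud_detection; field names follow the view's columns."""
    user_id: int
    card_id: int
    total: Decimal
    tx_count: int
    max_risk: Decimal


def triggered_rules(row):
    """Return the labels of the HAVING conditions that this row satisfies."""
    reasons = []
    if row.total > Decimal("50000"):
        reasons.append("amount")
    if row.tx_count > 20:
        reasons.append("frequency")
    if row.max_risk > Decimal("0.9"):
        reasons.append("risk_score")
    return reasons
```

A row with a 10-minute total of 60000 and a maximum risk score of 0.95 would be labeled with both `amount` and `risk_score`, which lets the alerting layer route or prioritize it accordingly.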
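| Component | Integration | Purpose |
|---|---|---|
| Kafka | Source/Sink | Message queue |
| PostgreSQL | CDC Source | Data synchronization |
| MySQL | CDC Source | Data synchronization |
| ClickHouse | Sink | Analytics engine |
| S3 | Sink | Data lake |
| Grafana | Monitoring integration | Visualization |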
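Typical application scenarios for RisingWave:
Coming up next
With so many technical options available, how do you make the right choice? In the next installment we will compare them side by side:
Stay tuned: "Comparison: Selection Guide and Solution Comparison"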