登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flink1.12.x Table API 和 SQL:更新模式:追加Append、撤回Retract、更新插入Upsert

上接:flink1.12.x Table API 和 SQL:Table 转换成 DataStream

说明

对于流式查询,需要声明如何在表和外部连接器之间执行转换

与外部系统交换的消息类型,由更新模式(Update Mode)指定

有以下3种模式

追加(Append)模式

表只做追加操作,比如:普通 select 查询

撤回(Retract)模式(常用)

表和外部连接器交换添加(Add)和撤回(Retract)消息

  • 插入操作(Insert)编码为 Add 消息;
  • 删除(Delete)编码为 Retract 消息;
  • 更新(Update)编码为:上一条的 Retract 和下一条的 Add 消息

应用场景

任何场景均适合,如:聚合查询、删除、更新

例子

flink1.12.x Table API 和 SQL:读取csv文件,实现分组聚合查询

使用下面代码打印时:

DataStream<Tuple2<Boolean, Row>> dsRes = stEnv.toRetractStream(res, Row.class);
dsRes.print("===");

显示结果如下:

2> (true,lucy,120.0)
4> (true,韩梅梅,90.0)
2> (true,李雷,80.0)
2> (false,李雷,80.0)
2> (true,李雷,90.0)
4> (false,韩梅梅,90.0)
4> (true,韩梅梅,80.0)
2> (false,lucy,120.0)
2> (true,lucy,110.0)

true 表示 add 操作
false 表示 Retract 操作

更新插入(Upsert)模式

  • 更新和插入都被编码为 Upsert 消息
  • 删除编码为 Delete 消息

需要指定 key,常配合 kafka 使用


原文出处:https://malaoshi.top/show_1IX23OQz4MpW.html