flink1.12.x WaterMarker水位线处理乱序消息(老版api) 作者:马育民 • 2021-12-24 13:54 • 阅读:10070 # 说明 [![](https://www.malaoshi.top/upload/pic/flink/Snipaste_2021-12-22_15-12-24.png)](https://www.malaoshi.top/upload/pic/flink/Snipaste_2021-12-22_15-12-24.png) 1. 启动 tcp服务器 2. 启动 flink 代码,连接 tcp服务器 4. tcp服务器输入单词 和 时间戳(表示事件时间) 如下: ``` 111,1640261670000 111,1640261669000 111,1640261671000 111,1640261675000 111,1640261673000 111,1640261674000 111,1640261672000 111,1640261680000 111,1640261660000 111,1640261690000 ``` **解释:** - `111`:表示数据,任何字符串都行,由于代码中会执行 `keyBy()`,所以最好相同 - `1640261670000`:表示时间戳,可通过下面代码查看时间: `System.out.println(new Date(1640261670000l))`,显示时间为: `Thu Dec 23 20:14:30 CST 2021` # 代码 ``` import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import javax.annotation.Nullable; import java.text.SimpleDateFormat; import java.util.*; public class Paixu { public static void main(String[] args) throws Exception { //定义socket的端口号 int port = 9090; //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置使用eventtime,默认是使用processtime // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置并行度为1,默认并行度是当前机器的cpu数量 env.setParallelism(1); //连接socket获取输入的数据 DataStream text = env.socketTextStream("hadoop1", port, "\n"); //解析输入的数据 DataStream> inputMap = text.map(new MapFunction>() { @Override public Tuple2 map(String value) throws Exception { String[] arr = value.split(","); return new Tuple2<>(arr[0], Long.parseLong(arr[1])); } }); //抽取timestamp和生成watermark DataStream> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() { Long currentMaxTimestamp = 0L; final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s Long currentWaterMark=0L; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); /** * 定义生成watermark的逻辑 * 默认100ms被调用一次 */ @Nullable @Override public Watermark getCurrentWatermark() { currentWaterMark = currentMaxTimestamp - maxOutOfOrderness; System.out.println("当前水位线:" + currentWaterMark); return new Watermark(currentWaterMark); } //定义如何提取timestamp @Override public long extractTimestamp(Tuple2 element, long previousElementTimestamp) { long eventTime = element.f1; currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp); System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]"); return eventTime; } }); //保存被丢弃的数据 OutputTag> outputTag = new OutputTag>("late-data"){}; //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。 SingleOutputStreamOperator window = waterMarkStream.keyBy(t->t.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样 //.allowedLateness(Time.seconds(2))//允许数据迟到2秒 .sideOutputLateData(outputTag) .apply(new WindowFunction, String, String, TimeWindow>() { /** * 对window内的数据进行排序,保证数据的顺序 * @param key * @param window * @param input * @param out * @throws Exception */ @Override public void apply(String key, TimeWindow window, Iterable> input, Collector out) throws Exception { List> list = new ArrayList(); Iterator> it = input.iterator(); while (it.hasNext()) { list.add(it.next()); } list.sort(new Comparator>() { @Override public int compare(Tuple2 o1, Tuple2 o2) { int ret=0; if(o2.f1 > o1.f1){ ret=-1; }else if(o2.f1 < o1.f1){ ret=1; }else{//等于 ret=0; } return ret; } }); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); String result = key + "," + list.size() + "," + sdf.format(list.get(0).f1) + "," + sdf.format(list.get(list.size() - 1).f1) + ",window start:" + sdf.format(window.getStart()) + ",window end:" + sdf.format(window.getEnd()); System.out.println("list:"); for(Tuple2 t:list){ System.out.println(t.toString()); } System.out.println("--"); out.collect(result); } }); //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中 DataStream> sideOutput = window.getSideOutput(outputTag); sideOutput.print("sideOutput:::"); //测试-把结果打印到控制台 window.print("window:::"); //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行 env.execute("eventtime-watermark"); } } ``` 原文出处:http://malaoshi.top/show_1IX2Stf5qhqP.html