Description
Start a TCP server.
Start the Flink job, which connects to the TCP server.
Feed words and timestamps (the timestamp is the event time) into the TCP server, for example:
111,1640261670000
111,1640261669000
111,1640261671000
111,1640261675000
111,1640261673000
111,1640261674000
111,1640261672000
111,1640261680000
111,1640261660000
111,1640261690000
Explanation:
111: the data payload; any string will do, but since the code calls keyBy(), it is best to use the same key for every record.
1640261670000: the event-time timestamp in epoch milliseconds. You can check the wall-clock time it represents with System.out.println(new Date(1640261670000L)), which prints: Thu Dec 23 20:14:30 CST 2021
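Since the job below groups these records into 10-second tumbling event-time windows, it helps to see which window each sample timestamp falls into. The standalone sketch below is not part of the original code (the class name WindowPreview is made up for illustration); it computes window boundaries the same way Flink aligns tumbling windows without an offset, i.e. to the epoch:

import java.text.SimpleDateFormat;
import java.util.Date;

public class WindowPreview {
    public static void main(String[] args) {
        long windowSizeMs = 10_000L; //matches Time.seconds(10) in the job below
        long[] samples = {1640261670000L, 1640261675000L, 1640261680000L, 1640261660000L, 1640261690000L};
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        for (long ts : samples) {
            //tumbling windows with no offset are aligned to the epoch:
            //window start = ts - (ts % size), window end = start + size (exclusive)
            long start = ts - (ts % windowSizeMs);
            long end = start + windowSizeMs;
            System.out.println(sdf.format(new Date(ts)) + " -> window [" + sdf.format(new Date(start)) + ", " + sdf.format(new Date(end)) + ")");
        }
    }
}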
Code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.*;
public class Paixu {
public static void main(String[] args) throws Exception {
//socket port to connect to
int port = 9090;
//get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//use event time instead of the default processing time (the call below is commented out; on Flink 1.12+ event time is already the default)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//set parallelism to 1 (the default parallelism is the number of CPU cores on the machine)
env.setParallelism(1);
//connect to the socket and read the input data
DataStream<String> text = env.socketTextStream("hadoop1", port, "\n");
//parse each input line into (word, timestamp)
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//extract the event-time timestamp and generate watermarks
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// maximum allowed out-of-orderness: 10s
Long currentWaterMark=0L;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* Defines the logic for generating watermarks.
* Called periodically (every 200 ms by default).
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
System.out.println("当前水位线:" + currentWaterMark);
return new Watermark(currentWaterMark);
}
//defines how the timestamp is extracted from each element
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long eventTime = element.f1;
currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
return eventTime;
}
});
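//Note: with maxOutOfOrderness = 10s, a 10-second tumbling window [start, end) only fires once the
//watermark (max event time seen - 10s) passes end - 1 ms, the window's max timestamp. With the sample
//input above, the record with timestamp 1640261690000 advances the watermark (at the next periodic
//emission) to 1640261680000, which is what triggers the window [1640261670000, 1640261680000).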
//side-output tag for late data that would otherwise be dropped
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
//Note: getSideOutput() is specific to SingleOutputStreamOperator, so this variable cannot be declared with the parent type DataStream.
SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(t->t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))//assign windows by the event time of each record; same effect as the timeWindow(Time.seconds(10)) shortcut
//.allowedLateness(Time.seconds(2))//allow data to be up to 2 seconds late
.sideOutputLateData(outputTag)
.apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
/**
* Sorts the elements inside the window so they are emitted in event-time order.
* @param key
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
List<Tuple2<String, Long>> list = new ArrayList();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
list.add(it.next());
}
list.sort(new Comparator<Tuple2<String, Long>>() {
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
//ascending by event time
return Long.compare(o1.f1, o2.f1);
}
});
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = key + "," + list.size() + "," + sdf.format(list.get(0).f1) + ","
+ sdf.format(list.get(list.size() - 1).f1)
+ ",window start:" + sdf.format(window.getStart()) + ",window end:" + sdf.format(window.getEnd());
System.out.println("list:");
for(Tuple2<String, Long> t:list){
System.out.println(t.toString());
}
System.out.println("--");
out.collect(result);
}
});
//for now, print the late data to the console; in practice it could be written to some other storage
DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
sideOutput.print("sideOutput:::");
//for testing: print the window results to the console
window.print("window:::");
//Note: Flink builds the job lazily, so execute() must be called for the pipeline above to actually run
env.execute("eventtime-watermark");
}
}
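Note: AssignerWithPeriodicWatermarks is deprecated in recent Flink releases. As a rough equivalent, Flink 1.11+ provides WatermarkStrategy.forBoundedOutOfOrderness, and the timestamp/watermark step above could be written as the sketch below (a minimal illustration assuming Flink 1.12+, not part of the original article; the rest of the job, keyBy/window/apply, stays unchanged):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

//replaces the assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<...>(){...}) call above
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))//max out-of-orderness: 10s
                .withTimestampAssigner((element, recordTimestamp) -> element.f1));//event time is the second field of the tuple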