flink:SocketWindowWordCount源代码 作者:马育民 • 2021-12-22 09:33 • 阅读:10062 # 源码 ``` public class SocketWindowWordCount { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream text = env.socketTextStream("localhost", 9000, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream windowCounts = text .flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // Data type for words with count public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } } ``` 原文出处:http://malaoshi.top/show_1IX2SAb7RsHl.html