登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flink1.12.x 窗口函数(window function)-增量 AggregateFunction

说明

ReduceFunction 的通用版本

接口定义

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    //创建初始累加器
    ACC createAccumulator();

    ACC add(IN var1, ACC var2);

    //将输入元素,进行增量聚合
    OUT getResult(ACC var1);
    //两个累加器合并为一个累加器以及从累加器提取输出(OUT类型)的方法
    ACC merge(ACC var1, ACC var2);
}

泛型具有三种类型:

  • 输入类型(IN)
  • 累加器类型(ACC)
  • 输出类型(OUT)

这样设计主要是为了解决:输入类型累加器类型输出类型 不一致的问题

同时 累加器类型 可以自定义,我们可以在 累加器类型 里构建我们想要的数据结构。
比如我们要计算一个窗口内某个字段的平均值,那么ACC中要保存总和以及个数

案例

统计各书出现的次数

数据

111,java从入门到入坟
222,mysql从入门到入坟
333,hadoop从入门到放弃
333,hadoop从入门到放弃
444,flink从入门到放弃

思路

  1. 根据字符串,进行拆分,并封装到对象中
  2. keyBy() 根据书名进行分组
  3. 实现 aggregate(),统计各本书出现的次数

代码

/**
数据格式:
id,文章标题
111,java从入门到入坟
222,mysql从入门到入坟
333,hadoop从入门到放弃
333,hadoop从入门到放弃
444,flink从入门到放弃
     *
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动模式,根据数据源自行判断
    DataStream<String> dataStream = env.socketTextStream("hadoop1", 9999);

    SingleOutputStreamOperator<Article> soso=dataStream.map(new MapFunction<String, Article>() {

        @Override
        public Article map(String s) throws Exception {
            String[] arr=s.split(",");
            Article article=new Article();
            article.setId(arr[0]);
            article.setTitle(arr[1]);
            return article;
        }
    });
    KeyedStream<Article,String> ks=soso.keyBy(new KeySelector<Article,String>(){
        /**
         *
         * @param t:传入的元祖,格式:(广告id,点击数量)
         * @return
         * @throws Exception
         */
        @Override
        public String getKey(Article t) throws Exception {
            return t.getTitle();
        }
    });//流处理中的分组

    //统计各个key出现的次数
    SingleOutputStreamOperator<Result> agg = ks.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .aggregate(new AggregateFunction<Article, Integer, Result>() {

                private String key;

                //累加器初始化为0
                @Override
                public Integer createAccumulator() {
                    return 0;
                }

                /**
                 *
                 * @param article 传入的元素
                 * @param integer 累加器
                 * @return
                 */
                @Override
                public Integer add(Article article, Integer integer) {
                    key = article.getTitle();

                    return integer + 1;
                }

                /**
                 * 返回结果
                 * @param integer 累加器
                 * @return 格式:(key,出现的次数)
                 */
                @Override
                public Result getResult(Integer integer) {
                    return new Result(key, integer);
                }

                /**
                 * 合并累加器,一般用不上
                 * @param integer
                 * @param acc1
                 * @return
                 */
                @Override
                public Integer merge(Integer integer, Integer acc1) {
                    return integer+acc1;
                }
            });

    agg.print();
    //启动并等待程序结束,可传入任务名
    env.execute();
}

//文章封装类
static class Article{
        String id;
        String title;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }
}


//结果封装类
static class Result{
        String key;
        Integer num;

    public Result(String key, Integer num) {
        this.key = key;
        this.num = num;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "Result{" +
                "key='" + key + '\'' +
                ", num=" + num +
                '}';
    }
}

测试

在linux 执行下面命令:

nc -lk 9999

启动java程序

nc 窗口输入下面内容:

111,java从入门到入坟
222,mysql从入门到入坟
333,hadoop从入门到放弃
333,hadoop从入门到放弃
444,flink从入门到放弃

观察 idea 控制台,显示各本书出现的次数


原文出处:https://malaoshi.top/show_1IX22Z8RyX5K.html