Teacher Ma Yumin's Blog


QQ:65242847

Flink 1.12.x window functions: the full-window WindowFunction

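Overview

WindowFunction receives every element of a window at once, so it can compute statistics such as a median or an average that need the complete data set before a result can be produced.

Interface definition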
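To see why a median needs all of the data, here is a minimal plain-Java sketch with no Flink dependency. The class name `MedianSketch` and the method `median` are hypothetical names chosen for illustration; the `Iterable` argument stands in for the window contents that a WindowFunction receives.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MedianSketch {
    /** Returns the median of all values; every value must be seen before it can answer. */
    static double median(Iterable<Double> values) {
        List<Double> sorted = new ArrayList<>();
        for (Double v : values) {
            sorted.add(v);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("median of an empty collection is undefined");
        }
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        // Odd count: the middle element; even count: mean of the two middle elements.
        return sorted.size() % 2 == 1
                ? sorted.get(mid)
                : (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
    }
}
```

The same loop-then-compute shape is what the body of an `apply` implementation would typically contain.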

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY var1, W var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
}

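Parameters of the apply method:

  • KEY var1: the key of the current group

  • W var2: the current window object

  • Iterable<IN> var3: all elements received in the current window

  • Collector<OUT> var4: the collector used to emit results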

Example

Compute the average order price per year-month.

Data

2021-10-01,java从入门到入坟,80
2021-10-21,mysql从入门到入坟,50
2021-9-05,hadoop从入门到放弃,90
2021-9-15,zookeeper从入门到放弃,40
2021-9-25,flink从入门到放弃,30

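Approach

  1. Split each input line and wrap the fields in an object
  2. Group with keyBy() on the year-month part of the date
  3. Implement apply() to compute the average order price for each year-month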
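The three steps can be sketched in plain Java, without Flink, to show the result you would get if all five sample records were processed together. `AverageByMonthSketch` and `averageByYearMonth` are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AverageByMonthSketch {
    /** Parse each line, group by year-month, then average the prices per group. */
    static Map<String, Double> averageByYearMonth(List<String> lines) {
        Map<String, double[]> sumAndCount = new LinkedHashMap<>(); // key -> {sum, count}
        for (String line : lines) {
            String[] fields = line.split(",");                       // step 1: split
            String date = fields[0];
            double price = Double.parseDouble(fields[2]);
            String yearMonth = date.substring(0, date.lastIndexOf('-')); // step 2: key
            double[] acc = sumAndCount.computeIfAbsent(yearMonth, k -> new double[2]);
            acc[0] += price;
            acc[1] += 1;
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]); // step 3: average
        }
        return averages;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2021-10-01,java从入门到入坟,80",
                "2021-10-21,mysql从入门到入坟,50",
                "2021-9-05,hadoop从入门到放弃,90",
                "2021-9-15,zookeeper从入门到放弃,40",
                "2021-9-25,flink从入门到放弃,30");
        System.out.println(averageByYearMonth(lines));
    }
}
```

With the sample data, 2021-10 averages to 65.0 and 2021-9 to 160/3 (about 53.33). In the Flink job the averages are computed per window, so the numbers can differ when records of one key arrive in different windows.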

java

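import java.util.Iterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowApply全量窗口统计订单平均价 {
    /**
     * Input format:
     * date,product title,price
2021-10-01,java从入门到入坟,80
2021-10-21,mysql从入门到入坟,50
2021-9-05,hadoop从入门到放弃,90
2021-9-15,zookeeper从入门到放弃,40
2021-9-25,flink从入门到放弃,30
     *
     *
     * @param args
     * @throws Exception
     */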
    public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// automatic mode: Flink picks batch or streaming based on whether all sources are bounded
    DataStream<String> dataStream = env.socketTextStream("localhost", 9999);

    SingleOutputStreamOperator<Goods> soso=dataStream.map(new MapFunction<String, Goods>() {

        @Override
        public Goods map(String s) throws Exception {
            String[] arr=s.split(",");
            Goods goods=new Goods();
            goods.setDate(arr[0]);
            goods.setTitle(arr[1]);
            goods.setPrice(Double.parseDouble(arr[2]));
            return goods;
        }
    });
    KeyedStream<Goods,String> ks=soso.keyBy(new KeySelector<Goods,String>(){
        /**
         * Uses the year-month part of the order date as the key.
         *
         * @param goods the order record
         * @return the year-month key, for example 2021-10
         * @throws Exception
         */
        @Override
        public String getKey(Goods goods) throws Exception {
            String date=goods.getDate();
            int index=date.lastIndexOf("-");
            String yearMonth=date.substring(0,index);
            return yearMonth;
        }
    });// grouping in stream processing

    // compute the average price per key in each 15-second processing-time window
    SingleOutputStreamOperator<Result> agg = ks.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
            .apply(new WindowFunction<Goods, Result, String, TimeWindow>() {
                /**
                 *
                 * @param s the key (year-month)
                 * @param timeWindow the current window
                 * @param iterable all elements received in the current window
                 * @param collector collector used to emit the result
                 * @throws Exception
                 */
                @Override
                public void apply(String s, TimeWindow timeWindow, Iterable<Goods> iterable, Collector<Result> collector) throws Exception {
                    Iterator<Goods> iterator = iterable.iterator();
                    int count =0;
                    double totalPrice = 0.0;
                    while (iterator.hasNext()) {
                        Goods goods = iterator.next();
                        totalPrice += goods.getPrice();
                        count++;
                    }
                    if (count != 0) {
                        double avg=totalPrice/count;
                        Result result=new Result(s,avg);
                        collector.collect(result);
                    }
                }
            });

    agg.print();
    // start the job and wait for it to finish; a job name can be passed in
    env.execute();
}

static class Goods {
        String date;
        String title;
        Double price;


    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}
static class Result{
        String date;
        Double avg;

    public Result(String date, Double avg) {
        this.date = date;
        this.avg = avg;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public Double getAvg() {
        return avg;
    }

    public void setAvg(Double avg) {
        this.avg = avg;
    }

    @Override
    public String toString() {
        return "Result{" +
                "date='" + date + '\'' +
                ", avg=" + avg +
                '}';
    }
}
}
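The key rule used in getKey in the listing above can be checked in isolation. The following is a minimal sketch, assuming dates always contain a day part after the last `-`; `YearMonthKeySketch` and `yearMonth` are hypothetical names, and the explicit check for a missing `-` is an addition that the original listing does not have.

```java
final class YearMonthKeySketch {
    /** Same rule as getKey: drop everything from the last '-' onward. */
    static String yearMonth(String date) {
        int index = date.lastIndexOf('-');
        if (index < 0) {
            // Without this check, substring(0, -1) would throw StringIndexOutOfBoundsException.
            throw new IllegalArgumentException("expected a date like 2021-10-01 but got: " + date);
        }
        return date.substring(0, index);
    }
}
```

Because the key is a plain string, `2021-9` and `2021-09` would be treated as two different groups, so the input should use one month format consistently.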

Test

Run the following command on Linux:

nc -lk 9999

Start the Java program.

Enter the following in the nc window:

2021-10-01,java从入门到入坟,80
2021-10-21,mysql从入门到入坟,50
2021-9-05,hadoop从入门到放弃,90
2021-9-15,zookeeper从入门到放弃,40
2021-9-25,flink从入门到放弃,30

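Watch the IDEA console. Each output line is the average price for one year-month key within one 15-second window. The windows are based on processing time, so the result depends on when each line reaches Flink, and a key can appear more than once when its records land in different windows (as 2021-9 does below, where 90 was averaged alone and 40 and 30 were averaged together).

Sample output: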

2> Result{date='2021-10', avg=65.0}
4> Result{date='2021-9', avg=90.0}
4> Result{date='2021-9', avg=35.0}
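Tumbling windows are aligned to fixed boundaries rather than to the arrival of the first record, so records entered a moment apart can still fall on opposite sides of a boundary. The sketch below illustrates that alignment in plain Java. It is not Flink's own code; it assumes non-negative millisecond timestamps and no window offset, and `TumblingWindowSketch` and `windowStart` are hypothetical names.

```java
final class TumblingWindowSketch {
    /** Start (inclusive) of the fixed-size window that contains the given timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // Round down to the nearest multiple of the window size.
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15 seconds, as in the example job
        System.out.println(windowStart(14_999L, size)); // last millisecond of the first window
        System.out.println(windowStart(15_000L, size)); // first millisecond of the next window
    }
}
```

Two records only one millisecond apart (14999 and 15000) are assigned to different windows here, which is the kind of split that produces two separate results for the same key.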

Original source: https://malaoshi.top/show_1IX22eWkSqBb.html