flink 维表关联、翻译字典-热存储维表-使用AsyncDataStream、RichAsyncFunction 查询Hbase,并缓存到guava cache中

上接:flink 维表关联、翻译字典

介绍

flink 维表关联、翻译字典-热存储维表-使用AsyncDataStream、RichAsyncFunction 查询Hbase 文中,掌握了Flink异步查询hbase

提出问题

但有缺点:对于查询频繁、且改动较少的数据,每次都去查询 HBase,显然影响性能

解决

将查询的数据缓存到 guava cache

缺点

使用 Cache 也有缺点,可能会导致维度数据更新不及时

例子

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <!--java-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- kafka Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>20.0</version>
    </dependency>
    <!-- hbase的java客户端 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.9</version>
    </dependency>
</dependencies>

java

package flink;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class 异步IO_hbase_guavacatch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration.
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        // Built-in connector: Flink Kafka consumer needs the topic, a deserializer and the config.
        FlinkKafkaConsumer<String> fkc = new FlinkKafkaConsumer<>("kafka2flink", new SimpleStringSchema(), pro);
        DataStreamSource<String> dss = env.addSource(fkc);

        // Async lookup with ordered emission; elements not completed within 5s go to timeout().
        SingleOutputStreamOperator<List<CellData>> soso = AsyncDataStream.orderedWait(
                dss,
                new MyRichAsyncFunction(),
                5,
                TimeUnit.SECONDS);

        soso.print("soso--");
        env.execute();
    }

    /**
     * Async HBase dimension-table lookup with a Guava cache in front of it.
     * Cache hits are served on the calling thread; misses are executed on a
     * dedicated thread pool so the blocking HBase read never stalls the Flink
     * task thread. Cached entries expire 10s after write, which bounds how
     * stale dimension data can get.
     */
    static class MyRichAsyncFunction extends RichAsyncFunction<String,List<CellData>> {
        // ZooKeeper quorum hosts.
        static final String ADDR = "hadoop1,hadoop2,hadoop3";
        // ZooKeeper client port.
        static final String PORT = "2181";

        // HBase helper (project-local utility class).
        private HbaseUtils utils;
        // Pool that runs the blocking HBase reads off the Flink task thread.
        private ThreadPoolExecutor threadPoolExecutor;
        // rowkey -> cells; entries are evicted 10 seconds after being written.
        private Cache<String, List<CellData>> cache;

        @Override
        public void open(Configuration parameters) throws Exception {
            utils = new HbaseUtils();
            utils.connect(ADDR, PORT);
            // Fixed-size pool with a bounded queue so a slow HBase cannot queue
            // unbounded work. FIX: queue is now parameterized (was a raw type).
            threadPoolExecutor = new ThreadPoolExecutor(
                    10,
                    10,
                    0,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000));

            cache = CacheBuilder.newBuilder()
                    // Initial capacity of the cache.
                    .initialCapacity(1000)
                    // Upper bound on the number of cached entries.
                    .maximumSize(1000)
                    // Drop an entry 10 seconds after it was written.
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    // Concurrency level for internal segment locking.
                    .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                    .build();
        }

        /**
         * Invoked once per input element.
         *
         * @param id           rowkey to look up
         * @param resultFuture completed with the cells found for this rowkey
         * @throws Exception never thrown directly; lookup errors are reported
         *                   via {@code resultFuture.completeExceptionally}
         */
        @Override
        public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            // Serve from the cache when possible; otherwise query HBase asynchronously.
            List<CellData> list = cache.getIfPresent(id);
            if (list != null) {
                System.out.println("--从缓存中取值,key:"+id);
                resultFuture.complete(Collections.singleton(list));
            } else {
                threadPoolExecutor.submit(() -> {
                    try {
                        List<CellData> listOfHbase = utils.get("book", id);
                        System.out.println("--查询hbase,key:"+id);
                        // FIX: populate the cache BEFORE completing the future, so the
                        // entry is visible by the time downstream processing resumes.
                        cache.put(id, listOfHbase);
                        System.out.println("--放入到缓存中,key:"+id);
                        resultFuture.complete(Collections.singleton(listOfHbase));
                    } catch (Exception e) {
                        // FIX: surface the failure to Flink instead of swallowing it.
                        // Previously the future was never completed on error, so every
                        // HBase failure silently turned into a 5-second timeout.
                        resultFuture.completeExceptionally(e);
                    }
                });
            }
        }

        /**
         * Called when an element was not completed within the orderedWait timeout;
         * emits an empty result instead of failing the job.
         *
         * @param id           element coming from an upstream task
         * @param resultFuture to be completed with the result data
         */
        @Override
        public void timeout(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            resultFuture.complete(Collections.singleton(new ArrayList<>()));
        }

        @Override
        public void close() throws Exception {
            // Close the HBase connection.
            if (utils != null) {
                utils.close();
            }
            // Shut down the lookup thread pool (comment fixed: this is a worker
            // pool, not a scheduled task).
            if (threadPoolExecutor != null) {
                threadPoolExecutor.shutdown();
            }
        }
    }
}

启动zk

登录 hadoop1,执行下面命令:

/program/bin/zk.sh start

启动hadoop

登录 hadoop1,执行下面命令:

/program/bin/hadoop.sh start

启动hbase

登录 hadoop1,执行下面命令:

start-hbase.sh
建表
create 'book','c1'
添加测试数据
put 'book','1020','c1:author','韩梅梅'
put 'book','1020','c1:price','121.90'
put 'book','1020','c1:title','scala从入门到精通'

启动kafka

登录 hadoop1,执行下面命令:

/program/bin/kafka.sh start

在 idea 中运行 flink 程序

启动生产者、指定主题

登录 hadoop1,执行下面命令:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink

第一次执行

在kafka中输入:

1020

执行结果,在 idea 控制台中打印结果如下:

--查询hbase,key:1020
--放入到缓存中,key:1020
soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]

解释:第一次执行,缓存中没有数据,所以查询hbase数据库,并放入到缓存中

第二次执行

在10秒内,在kafka中输入:

1020

执行结果,在 idea 控制台中打印结果如下:

--从缓存中取值,key:1020
soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]

解释:第二次执行,由于在 10秒内 执行,缓存中有数据,所以直接从缓存中取数据

第三次执行

在10秒后,在kafka中输入:

1020

执行结果,在 idea 控制台中打印结果如下:

--查询hbase,key:1020
--放入到缓存中,key:1020
soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]

解释:第三次执行,由于在 10秒后 执行,缓存中的数据 超时清除,所以查询hbase数据库,并放入到缓存中


原文出处:http://malaoshi.top/show_1IX4nTNeKzad.html