flink 维表关联、翻译字典-热存储维表-使用AsyncDataStream、RichAsyncFunction 查询Hbase,并缓存到guava cache中 作者:马育民 • 2023-01-17 21:19 • 阅读:10033 上接:[flink 维表关联、翻译字典](https://www.malaoshi.top/show_1IX4AqCDMezt.html "flink 维表关联、翻译字典") # 介绍 从 [flink 维表关联、翻译字典-热存储维表-使用AsyncDataStream、RichAsyncFunction 查询Hbase](https://www.malaoshi.top/show_1IX4nPUb1tKI.html "flink 维表关联、翻译字典-热存储维表-使用AsyncDataStream、RichAsyncFunction 查询Hbase") 文中,掌握了Flink异步查询hbase ## 提出问题 但有缺点,对于频繁查询的数据,且改动较少,那么每次查询,显然影响性能 ### 解决 将查询的数据缓存到 [guava cache](https://www.malaoshi.top/show_1IX4mscf2Mjt.html "guava cache") 中 ### 缺点 使用 Cache 也有缺点,可能会导致维度数据更新不及时 # 例子 ### pom.xml ``` UTF-8 1.8 1.8 1.12.0 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_2.12 ${flink.version} org.apache.flink flink-clients_2.12 ${flink.version} org.slf4j slf4j-api 1.7.30 org.slf4j slf4j-log4j12 1.7.30 log4j log4j 1.2.17 org.apache.flink flink-connector-kafka_2.12 ${flink.version} com.google.guava guava 20.0 org.apache.hbase hbase-client 2.1.9 ``` ### java ``` package flink; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class 异步IO_hbase_guavacatch { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 定义kafka相关配置项 Properties pro = new Properties(); pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092"); pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumer-group"); pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 内置连接器,创建FlinkKafka消费者,需要指定主题、反序列化、kafka配置 FlinkKafkaConsumer fkc = new FlinkKafkaConsumer<>("kafka2flink", new SimpleStringSchema(), pro); DataStreamSource dss = env.addSource(fkc); SingleOutputStreamOperator> soso = AsyncDataStream.orderedWait( dss, new MyRichAsyncFunction() , 5, TimeUnit.SECONDS); soso.print("soso--"); env.execute(); } static class MyRichAsyncFunction extends RichAsyncFunction> { // zookeeper的集群ip static final String ADDR = "hadoop1,hadoop2,hadoop3"; // zookeeper 的端口号 static final String PORT = "2181"; //hbase工具类 private HbaseUtils utils; // 创建线程池 private ThreadPoolExecutor threadPoolExecutor; private Cache> cache; @Override public void open(Configuration parameters) throws Exception { utils=new HbaseUtils(); utils.connect(ADDR,PORT); //初始化线程池 threadPoolExecutor = new ThreadPoolExecutor( 10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(1000)); cache = CacheBuilder.newBuilder() // 缓存的初始数量 .initialCapacity(1000) // 设置缓存最大数量 .maximumSize(1000) // 放入数据后,超过10秒钟,就清除该数据 .expireAfterWrite(10, TimeUnit.SECONDS) // 并行级别 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) // 提供缓存加载器 .build(); } /** * 每来一条数据,就执行一次该方法 * @param id 输入数据 * @param resultFuture 通过该对象收集数据 * @throws Exception */ @Override public void asyncInvoke(String id, ResultFuture> resultFuture) throws Exception { // 查询时,先从cache中取值,如果有就收集,没有再查询 List list = cache.getIfPresent(id); if(list !=null){ System.out.println("--从缓存中取值,key:"+id); // 通过下面方法收集数据 resultFuture.complete(Collections.singleton(list)); }else { threadPoolExecutor.submit(new Runnable() { @Override public void run() { try { List listOfHbase = utils.get("book", id); System.out.println("--查询hbase,key:"+id); // 通过下面方法收集数据 resultFuture.complete(Collections.singleton(listOfHbase)); cache.put(id, listOfHbase); System.out.println("--放入到缓存中,key:"+id); } catch (Exception e) { e.printStackTrace(); } } }); } } /** * 如果执行超时,就执行该方法 * @param id element coming from an upstream task * @param resultFuture to be completed with the result data * @throws Exception */ @Override public void timeout(String id, ResultFuture> resultFuture) throws Exception { resultFuture.complete(Collections.singleton(new ArrayList<>())); } @Override public void close() throws Exception { // 关闭数据库连接 if(utils!=null) { utils.close(); } // 终止定时任务 if(threadPoolExecutor!=null){ threadPoolExecutor.shutdown(); } } } } ``` ### 启动zk 登录 hadoop1,执行下面命令: ``` /program/bin/zk.sh start ``` ### 启动hadoop 登录 hadoop1,执行下面命令: ``` /program/bin/hadoop.sh start ``` ### 启动hbase 登录 hadoop1,执行下面命令: ``` start-hbase.sh ``` ##### 建表 ``` create "book","c1" ``` ##### 添加测试数据 ``` put 'book','1020','c1:author','韩梅梅' put 'book','1020','c1:price','121.90' put 'book','1020','c1:title','scala从入门到精通' ``` ### 启动kafka 登录 hadoop1,执行下面命令: ``` /program/bin/kafka.sh start ``` ### 启动 flink 在 idea 中运行 flink 程序 ### 启动生产者、指定主题 登录 hadoop1,执行下面命令: ``` /program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink ``` ### 第一次执行 在kafka中输入: ``` 1020 ``` 执行结果,在 idea 控制台中打印结果如下: ``` --查询hbase,key:1020 --放入到缓存中,key:1020 soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}] ``` **解释:**第一次执行,缓存中没有数据,所以查询hbase数据库,并放入到缓存中 ### 第二次执行 在10秒内,在kafka中输入: ``` 1020 ``` 执行结果,在 idea 控制台中打印结果如下: ``` --从缓存中取值,key:1020 soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}] ``` **解释:**第二次执行,由于在 **10秒内** 执行,缓存中有数据,所以直接从缓存中取数据 ### 第三次执行 在10秒后,在kafka中输入: ``` 1020 ``` 执行结果,在 idea 控制台中打印结果如下: ``` --查询hbase,key:1020 --放入到缓存中,key:1020 soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}] ``` **解释:**第三次执行,由于在 **10秒后** 执行,缓存中的数据 **超时清除**,所以查询hbase数据库,并放入到缓存中 原文出处:http://malaoshi.top/show_1IX4nTNeKzad.html