flink dimension table join / translation dictionary - hot-storage dimension table - querying HBase with AsyncDataStream and RichAsyncFunction

Author: 马育民 • 2023-01-17 12:39 • Views: 10035

Continued from: [flink 维表关联、翻译字典](https://www.malaoshi.top/show_1IX4AqCDMezt.html)

# Introduction

Store the dimension data in an external system such as Redis, HBase or MySQL; every time Flink receives a record, it looks the dimension value up in the external store in real time.

As explained in [使用AsyncDataStream和AsyncFunction、RichAsyncFunction 进行异步处理](https://www.malaoshi.top/show_1IX4cEJSQSey.html), Flink should use asynchronous I/O when reading from or writing to external systems.

### Pros

The amount of dimension data is not limited by memory, so very large dimension tables can be stored.

### Cons

Every record costs a round trip to the external store. For frequently accessed data, the query results should be cached locally for better performance (a sketch of such a cache follows the main example below).

# Example

**Note:** this approach requires the external system's client to support asynchronous I/O; otherwise you have to **implement the asynchronous I/O yourself**, e.g. by running the synchronous calls in a thread pool, as the code below does.

Using asynchronous access raises three issues:

- Timeout: if a query times out, it is treated as a failed read/write and must be handled as a failure.
- Concurrency: if too many requests are in flight at once, Flink's backpressure mechanism kicks in to throttle the upstream.
- Out-of-order results: how reordering is handled depends on the use case; Flink supports two modes: allow out-of-order results, or preserve the input order.

### pom.xml dependencies

```
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.9</version>
    </dependency>
</dependencies>
```

### Java code

```
package flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class 异步IO_hbase {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Built-in connector: create the FlinkKafka consumer with topic, deserializer and Kafka config
        FlinkKafkaConsumer<String> fkc = new FlinkKafkaConsumer<>("kafka2flink", new SimpleStringSchema(), pro);
        DataStreamSource<String> dss = env.addSource(fkc);

        // Ordered async I/O: results are emitted in input order, with a 5 second timeout
        SingleOutputStreamOperator<List<CellData>> soso = AsyncDataStream.orderedWait(
                dss,
                new MyRichAsyncFunction(),
                5, TimeUnit.SECONDS);
        soso.print("soso--");

        env.execute();
    }

    static class MyRichAsyncFunction extends RichAsyncFunction<String, List<CellData>> {

        // ZooKeeper quorum hosts
        static final String ADDR = "hadoop1,hadoop2,hadoop3";
        // ZooKeeper client port
        static final String PORT = "2181";

        // HBase helper class (from an earlier article)
        HbaseUtils utils;

        // thread pool that runs the blocking HBase lookups
        private ThreadPoolExecutor threadPoolExecutor;

        @Override
        public void open(Configuration parameters) throws Exception {
            utils = new HbaseUtils();
            utils.connect(ADDR, PORT);

            // initialize the thread pool
            threadPoolExecutor = new ThreadPoolExecutor(
                    10, 10, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000));
        }

        /**
         * Called once for every incoming record.
         * @param id the input record (an HBase row key)
         * @param resultFuture used to emit the lookup result
         * @throws Exception
         */
        @Override
        public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        List<CellData> res = utils.get("book", id);
                        // emit the result through the ResultFuture
                        resultFuture.complete(Collections.singleton(res));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        /**
         * Called when the request does not complete within the timeout.
         * @param id element coming from an upstream task
         * @param resultFuture to be completed with the result data
         * @throws Exception
         */
        @Override
        public void timeout(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
            resultFuture.complete(Collections.singleton(new ArrayList<>()));
        }

        @Override
        public void close() throws Exception {
            // close the HBase connection
            if (utils != null) {
                utils.close();
            }
            // shut down the thread pool
            if (threadPoolExecutor != null) {
                threadPoolExecutor.shutdown();
            }
        }
    }
}
```
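### Helper classes HbaseUtils and CellData (sketch)

The code above uses `HbaseUtils` and `CellData`, which come from the author's earlier HBase articles and are not reproduced in this post. The following is only a minimal sketch of what they might look like, inferred from how they are called (`connect(ADDR, PORT)`, `get("book", id)`, `close()`) and from the printed output; it is not the original implementation. It assumes the standard synchronous hbase-client API already declared in pom.xml, and in a real project the two classes would live in separate files.

```
package flink;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the HbaseUtils class referenced above (not the original implementation).
public class HbaseUtils {

    private Connection connection;

    // Open a connection against the given ZooKeeper quorum and client port.
    public void connect(String zkQuorum, String zkPort) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", zkPort);
        connection = ConnectionFactory.createConnection(conf);
    }

    // Read one row and convert every cell into a CellData value object.
    public List<CellData> get(String tableName, String rowKey) throws Exception {
        List<CellData> cells = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            if (result.listCells() != null) {
                for (Cell cell : result.listCells()) {
                    cells.add(new CellData(
                            Bytes.toString(CellUtil.cloneRow(cell)),
                            Bytes.toString(CellUtil.cloneFamily(cell)),
                            Bytes.toString(CellUtil.cloneQualifier(cell)),
                            cell.getTimestamp(),
                            Bytes.toString(CellUtil.cloneValue(cell))));
                }
            }
        }
        return cells;
    }

    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

// Hypothetical value object; the field names are inferred from the printed output below.
class CellData {

    private final String rowkey;
    private final String family;
    private final String qualifier;
    private final long timestamp;
    private final String value;

    public CellData(String rowkey, String family, String qualifier, long timestamp, String value) {
        this.rowkey = rowkey;
        this.family = family;
        this.qualifier = qualifier;
        this.timestamp = timestamp;
        this.value = value;
    }

    @Override
    public String toString() {
        return "CellData{rowkey='" + rowkey + "', family='" + family + "', qualifier='" + qualifier
                + "', timestamp=" + timestamp + ", value=" + value + '}';
    }
}
```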
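### Caching hot keys (sketch)

As noted under "Cons", querying HBase for every record is wasteful when the same keys repeat, so hot results should be cached locally. The original code does not do this. Below is a minimal, illustrative sketch of the one field and two methods of `MyRichAsyncFunction` that would change if a small LRU cache were placed in front of the lookup; the bounded, access-ordered `LinkedHashMap` and the 10000-entry limit are arbitrary choices, not part of the original article.

```
// Sketch only: shows just the parts of MyRichAsyncFunction that change.
// Requires java.util.Map, java.util.LinkedHashMap and java.util.Collections imports.

// local cache of lookup results, keyed by row key
private Map<String, List<CellData>> cache;

@Override
public void open(Configuration parameters) throws Exception {
    // ... HbaseUtils connection and thread pool setup as in the original open() ...

    // bounded, access-ordered LinkedHashMap used as a simple LRU cache
    cache = Collections.synchronizedMap(new LinkedHashMap<String, List<CellData>>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, List<CellData>> eldest) {
            return size() > 10000;
        }
    });
}

@Override
public void asyncInvoke(String id, ResultFuture<List<CellData>> resultFuture) throws Exception {
    List<CellData> cached = cache.get(id);
    if (cached != null) {
        // cache hit: answer immediately without querying HBase
        resultFuture.complete(Collections.singleton(cached));
        return;
    }
    threadPoolExecutor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                List<CellData> res = utils.get("book", id);
                // remember the result so later records with the same key skip HBase
                cache.put(id, res);
                resultFuture.complete(Collections.singleton(res));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}
```

The rest of the class stays as shown above. A production job would also need some expiry policy so that updates to the dimension table are eventually picked up.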
### Start ZooKeeper

Log in to hadoop1 and run:

```
/program/bin/zk.sh start
```

### Start Hadoop

Log in to hadoop1 and run:

```
/program/bin/hadoop.sh start
```

### Start HBase

Log in to hadoop1 and run:

```
start-hbase.sh
```

##### Create the table

```
create 'book','c1'
```

##### Insert test data

```
put 'book','1020','c1:author','韩梅梅'
put 'book','1020','c1:price','121.90'
put 'book','1020','c1:title','scala从入门到精通'
```

### Start Kafka

Log in to hadoop1 and run:

```
/program/bin/kafka.sh start
```

### Start Flink

Run the Flink program in IDEA.

### Start a console producer for the topic

Log in to hadoop1 and run:

```
/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink
```

Then type:

```
1020
10211
```

### Result

The IDEA console prints the following (the second result is empty because row key 10211 does not exist in HBase):

```
soso--> [CellData{rowkey='1020', family='c1', qualifier='author', timestamp=1669335450913, value=韩梅梅}, CellData{rowkey='1020', family='c1', qualifier='price', timestamp=1669335450913, value=121.90}, CellData{rowkey='1020', family='c1', qualifier='title', timestamp=1669335450913, value=scala从入门到精通}]
soso--> []
```

Reference: https://blog.csdn.net/Yuan_CSDF/article/details/117486259

Original post: http://malaoshi.top/show_1IX4nPUb1tKI.html