# flink1.12.x Table API and SQL: read from Kafka, run an aggregate query, write to Elasticsearch

Author: 马育民 • 2021-10-19 06:32

Continued from:

[flink1.12.x Table API and SQL: read data from Kafka, run a query, write the result to Kafka](https://www.malaoshi.top/show_1IX23f177SJ5.html "flink1.12.x Table API and SQL: read data from Kafka, run a query, write the result to Kafka")

[flink1.12.x Table API and SQL: sink query results to Elasticsearch (aggregate query)](https://www.malaoshi.top/show_1IX24GpPBCJo.html "flink1.12.x Table API and SQL: sink query results to Elasticsearch (aggregate query)")

# Overview

This article shows how Flink reads data from Kafka, runs an aggregate query, and writes the result to Elasticsearch.

[![](https://www.malaoshi.top/upload/pic/flink/QQ20211019084006.png)](https://www.malaoshi.top/upload/pic/flink/QQ20211019084006.png)

# pom.xml: add the dependency

```
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
```

### Full dependency list

```
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

# Java code

```
package test;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // automatic: pick batch or streaming based on the sources

        // build the Table environment
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, bsSettings);

        /*
        Source table connected directly to Kafka.
        Avoid SQL keywords (e.g. "order") in table names.
         */
        TableResult table = stEnv.executeSql("CREATE TABLE t_order_input ( \n" +
                " name STRING, \n" +
                " price DOUBLE, \n" +
                " username STRING \n" +
                ") WITH ( \n" +
                "'connector' = 'kafka', \n" +                           // Kafka connector
                "'topic' = 'user-order-input', \n" +                    // topic
                "'properties.bootstrap.servers' = 'hadoop1:9092', \n" + // brokers
                "'properties.group.id' = 'testGroup', \n" +             // consumer group
//                "'scan.startup.mode' = 'earliest-offset', " +        // consume from the earliest offset
                "'scan.startup.mode' = 'latest-offset', \n" +           // consume from the latest offset
                "'format' = 'json' \n" +
                ")");

        // run the aggregate query
        Table res = stEnv.sqlQuery("select username,avg(price) from t_order_input where price>60 group by username");

        System.out.println("Schema:");
        // print the schema
        res.printSchema();

        // an aggregate result is continuously updated, so convert it as a retract stream
        DataStream<Tuple2<Boolean, Row>> dsRes = stEnv.toRetractStream(res, Row.class);
        dsRes.print();

        /*
        Sink table.
         */
        TableResult sinkTable = stEnv.executeSql("CREATE TABLE t_order_output ( \n" +
                " name STRING, \n" +
                " price DOUBLE, \n" +
                // a primary key is required to run in upsert mode (updates/deletes)
                " PRIMARY KEY (name) NOT ENFORCED \n" +
                ") WITH ( \n" +
                "'connector' = 'elasticsearch-7', \n" + // Elasticsearch 7 connector
                "'hosts' = 'http://hadoop1:9200', \n" + // server
                "'index' = 'user_order' \n" +           // target index
                ")");

        // INSERT maps columns by position, so (username, avg) land in (name, price)
        stEnv.executeSql("insert into t_order_output select * from " + res);

        env.execute("kafka2es");
    }
}
```
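A note on the final `INSERT` statement: concatenating the `Table` object into the SQL string works because `Table.toString()` registers the table in the catalog under a generated name. If you prefer to avoid that implicit registration, the same write can be expressed with `Table.executeInsert` (available since Flink 1.11); a minimal sketch against the tables defined above:

```
// Equivalent to: stEnv.executeSql("insert into t_order_output select * from " + res);
// executeInsert submits an INSERT INTO job for this Table into the registered
// sink table and returns a TableResult immediately.
TableResult insertResult = res.executeInsert("t_order_output");

// The Table API job is submitted on its own; env.execute("kafka2es") is still
// required here, but only to drive the DataStream print() shown above.
```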
# ES preparation

### Start Elasticsearch

Log in to `hadoop1` and switch to the `es` user:

```
su es
```

Start it as a daemon:

```
/program/elasticsearch-7.9.3/bin/elasticsearch -d
```

### Create the index and specify its mapping

```
PUT /user_order
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}
```

The response:

```
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "user_order"
}
```

indicates the index was created successfully.

# Start the Kafka environment

Start `hadoop1`, `hadoop2`, and `hadoop3`.

### Start ZooKeeper

Log in to `hadoop1` and run:

```
/program/bin/zk.sh start
```

### Start Kafka

Log in to `hadoop1` and run:

```
/program/bin/kafka.sh start
```

# Prepare the Kafka topics

### Create the topic `user-order-input`

Flink reads its input from this topic:

```
/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-input
```

**Note:** Kafka topic names should not contain `.` or `_`; see [this page](https://www.malaoshi.top/show_1IXNH632WG5.html "this page") for details.

### Create the topic `user-order-output`

Flink sends results to this topic in the Kafka-sink variant linked above; the present example writes to Elasticsearch instead, so this step is optional here:

```
/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-output
```

# Testing

### Start the Java program

Run the Java program from IDEA.

### Start a producer for `user-order-input` and send data

Open another terminal window and run:

```
/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic user-order-input
```

In the producer console, enter the records below (they can be sent one at a time or several at once):

```
{"name":"java从入门到精通","price":50,"username":"李雷"}
{"name":"spark从入门到精通","price":80,"username":"李雷"}
{"name":"hadoop从入门到精通","price":90,"username":"韩梅梅"}
{"name":"flink从入门到精通","price":70,"username":"韩梅梅"}
{"name":"mysql从入门到精通","price":100,"username":"李雷"}
{"name":"hive从入门到精通","price":100,"username":"lucy"}
{"name":"html从入门到精通","price":120,"username":"lucy"}
```

# Observing the results

### IDEA console output

```
Schema:
root
 |-- name: STRING
 |-- price: DOUBLE

2> (true,spark从入门到精通,80.0)
1> (true,hadoop从入门到精通,90.0)
2> (true,flink从入门到精通,70.0)
```

### Check ES

Run the following query:

```
GET /user_order/_search
```

The result:

```
{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "user_order",
        "_type": "_doc",
        "_id": "韩梅梅",
        "_score": 1,
        "_source": {
          "name": "韩梅梅",
          "price": 80
        }
      },
      {
        "_index": "user_order",
        "_type": "_doc",
        "_id": "李雷",
        "_score": 1,
        "_source": {
          "name": "李雷",
          "price": 100
        }
      },
      {
        "_index": "user_order",
        "_type": "_doc",
        "_id": "lucy",
        "_score": 1,
        "_source": {
          "name": "lucy",
          "price": 110
        }
      }
    ]
  }
}
```

Original article: http://malaoshi.top/show_1IX24Ii9tyMF.html
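For completeness, the same search can also be run from a plain shell instead of Kibana's Dev Tools; a minimal `curl` sketch, assuming the `hadoop1:9200` host used throughout:

```
# search the whole index, pretty-printed; output should match the JSON above
curl -X GET "http://hadoop1:9200/user_order/_search?pretty"

# fetch one aggregated document by _id (the PRIMARY KEY column, i.e. the username)
curl -X GET "http://hadoop1:9200/user_order/_doc/lucy?pretty"
```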