flink1.12.x Table API and SQL: read data from Kafka, run an aggregate query, and write the results to Elasticsearch

Continued from:

flink1.12.x Table API and SQL: read data from Kafka, run a query, and write the results to Kafka

flink1.12.x Table API and SQL: sink query results to Elasticsearch (aggregate query)

Overview

This article shows how Flink reads data from Kafka, runs an aggregate query, and writes the results to Elasticsearch.

pom.xml

Add the Elasticsearch connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

Full dependencies

<repositories>
    <!-- Maven repository provided by Cloudera (the company behind CDH) -->
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>

</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Java code

package test;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Kafka2Es {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// AUTOMATIC mode: Flink picks batch or streaming based on the sources

        // Build the Table environment
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, bsSettings);

        /*
        Connect directly to Kafka.
        Note: do not use SQL keywords such as "order" for table or column names.
         */
        TableResult table=stEnv.executeSql("CREATE TABLE t_order_input ( \n" +
                "  name STRING, \n" +
                "  price double, \n" +
                "  username STRING \n" +
                ") WITH (  \n" +
                "'connector' = 'kafka', \n"+// use the Kafka connector
                "'topic' = 'user-order-input', \n"+// topic to read from
                "'properties.bootstrap.servers' = 'hadoop1:9092', \n"+// Kafka brokers
                "'properties.group.id' = 'testGroup', \n"+// consumer group
//                "'scan.startup.mode' = 'earliest-offset', "+// consume from the earliest offset
                "'scan.startup.mode' = 'latest-offset', \n"+// consume from the latest offset
                "'format' = 'json' \n"+
                ")");


        // Run the aggregate query
        Table res=stEnv.sqlQuery("select username,avg(price) from t_order_input where price>60 group by username");

        System.out.println("Schema info:");
        // Print the schema
        res.printSchema();

        DataStream<Tuple2<Boolean, Row>> dsRes = stEnv.toRetractStream(res, Row.class);
        dsRes.print();


        /*
        Create the sink (result) table
         */
        TableResult sinkTable=stEnv.executeSql("CREATE TABLE t_order_output ( \n" +
                "  name STRING, \n" +
                "  price double, \n" +
                "  PRIMARY KEY (name) NOT ENFORCED \n" + // a primary key is required so the sink runs in upsert mode (i.e. updates and deletes)
                ") WITH (  \n" +
                "'connector' = 'elasticsearch-7', \n"+// use the Elasticsearch 7 connector
                "'hosts' = 'http://hadoop1:9200', \n"+// ES host
                "'index' = 'user_order' \n"+
                ")");


        stEnv.executeSql("insert into t_order_output select * from "+res);

        // executeSql() already submits the INSERT job; execute() is still needed for the DataStream print() job
        env.execute("kafka2es");
    }
}
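A note on the last insert: concatenating the Table object into the SQL string works because Table.toString() registers the table in the catalog under a generated name, but the intent is easier to read if the result is registered or inserted explicitly. A minimal alternative sketch, reusing the stEnv and res variables from the program above (the view name t_order_result is just an example):

// Option 1: register the result under an explicit name and reference it in SQL
stEnv.createTemporaryView("t_order_result", res);
stEnv.executeSql("insert into t_order_output select * from t_order_result");

// Option 2: submit the insert directly from the Table object
// res.executeInsert("t_order_output");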

Elasticsearch setup

Start Elasticsearch

Log in to hadoop1 and switch to the es user:

su es

Start it (the -d flag runs Elasticsearch as a daemon):

/program/elasticsearch-7.9.3/bin/elasticsearch -d

Create the index and specify the mapping

PUT /user_order
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}

The response is:

{
    "acknowledged": true,
    "shards_acknowledged": true,
    "index": "user_order"
}

This indicates the index was created successfully.
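If you prefer to create the index from Java instead of the REST console, the sketch below uses the Elasticsearch high-level REST client. This is only a convenience under extra assumptions: it needs the org.elasticsearch.client:elasticsearch-rest-high-level-client dependency (version matching your 7.9.3 cluster), which is not in the pom above, and it targets the hadoop1:9200 host from this article.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateUserOrderIndex {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("hadoop1", 9200, "http")))) {
            CreateIndexRequest request = new CreateIndexRequest("user_order");
            // same mapping as the PUT request above
            request.mapping("{\"properties\":{\"name\":{\"type\":\"text\"},\"price\":{\"type\":\"double\"}}}",
                    XContentType.JSON);
            boolean acknowledged = client.indices().create(request, RequestOptions.DEFAULT).isAcknowledged();
            System.out.println("acknowledged = " + acknowledged);
        }
    }
}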

Start the Kafka environment

Start hadoop1, hadoop2, and hadoop3.

Start ZooKeeper

Log in to hadoop1 and run the following command:

/program/bin/zk.sh start

Start Kafka

Log in to hadoop1 and run the following command:

/program/bin/kafka.sh start

Prepare the Kafka topics

Create the topic user-order-input

Flink reads data from this topic:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-input

Note: Kafka topic names should not contain . or _ characters; see the linked article for details.

Create the topic user-order-output

This topic receives the results in the Kafka-sink version of this example; the program above writes to Elasticsearch instead, so this step is optional here:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-output
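As an aside, the same two topics can also be created programmatically with the Kafka AdminClient. A rough sketch, assuming the kafka-clients library (pulled in transitively by flink-connector-kafka) and the hadoop1:9092 broker used in this article:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, same as the commands above
            admin.createTopics(Arrays.asList(
                    new NewTopic("user-order-input", 3, (short) 2),
                    new NewTopic("user-order-output", 3, (short) 2)
            )).all().get();
        }
    }
}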

Test

Start the Java program

Run the program from IDEA.

Start a producer for user-order-input and send data

Open another terminal window and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic user-order-input

In the console producer, enter the following records to send data (one at a time or several at once):

{"name":"java从入门到精通","price":50,"username":"李雷"}
{"name":"spark从入门到精通","price":80,"username":"李雷"}
{"name":"hadoop从入门到精通","price":90,"username":"韩梅梅"}
{"name":"flink从入门到精通","price":70,"username":"韩梅梅"}
{"name":"mysql从入门到精通","price":100,"username":"李雷"}
{"name":"hive从入门到精通","price":100,"username":"lucy"}
{"name":"html从入门到精通","price":120,"username":"lucy"}
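The console producer is the simplest way to send these records, but if you would rather send test data from Java, here is a minimal sketch (again assuming the kafka-clients library and the hadoop1:9092 broker):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendTestOrders {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // one of the JSON records shown above
            producer.send(new ProducerRecord<>("user-order-input",
                    "{\"name\":\"java从入门到精通\",\"price\":50,\"username\":\"李雷\"}"));
            producer.flush();
        }
    }
}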

Check the results

The IDEA console output is as follows:

Schema info:
root
 |-- name: STRING
 |-- price: DOUBLE

2> (true,spark从入门到精通,80.0)
1> (true,hadoop从入门到精通,90.0)
2> (true,flink从入门到精通,70.0)
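In this retract-stream output, the Boolean flag is true for rows being added and false for rows being retracted; as new orders arrive, each updated average is emitted as a retract of the old value followed by an add of the new one. If you only want the current "add" messages, a small sketch based on the dsRes stream above:

// keep only the "add" messages (f0 == true) and drop the Boolean flag
dsRes.filter(t -> t.f0)
        .map(t -> t.f1)
        .print();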

Check Elasticsearch

Run the following query:

GET /user_order/_search

The result is as follows (note that each document's _id is the primary key value, i.e. the username):

{
    "took":3,
    "timed_out":false,
    "_shards":{
        "total":1,
        "successful":1,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":{
            "value":3,
            "relation":"eq"
        },
        "max_score":1,
        "hits":[
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"韩梅梅",
                "_score":1,
                "_source":{
                    "name":"韩梅梅",
                    "price":80
                }
            },
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"李雷",
                "_score":1,
                "_source":{
                    "name":"李雷",
                    "price":100
                }
            },
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"lucy",
                "_score":1,
                "_source":{
                    "name":"lucy",
                    "price":110
                }
            }
        ]
    }
}

Original source: https://malaoshi.top/show_1IX24Ii9tyMF.html