Flink 1.12.x Table API and SQL: read data from Kafka, run a query, and write the results back to Kafka

Overview

This article shows how to read data from Kafka, query it, and write the results back to Kafka.

Official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html

Execution flow

  1. Flink reads data from the Kafka topic user-order-input
  2. The query is executed
  3. Flink writes the query results to the Kafka topic user-order-output (see the SQL sketch below)
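
The same three steps can also be run purely as Flink SQL (for example from the SQL client), without the Java wrapper. A minimal sketch, assuming the same topics, broker address, and field layout used by the Java code later in this article:

-- 1. source table reading JSON records from the user-order-input topic
CREATE TABLE t_order_input (
  name STRING,
  price DOUBLE,
  username STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-order-input',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- 2./3. sink table over the user-order-output topic, fed by the query
CREATE TABLE t_order_output (
  name STRING,
  price DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-order-output',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'sink.partitioner' = 'round-robin'
);

INSERT INTO t_order_output
SELECT name, price FROM t_order_input WHERE price > 60;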

pom.xml dependencies

Add the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Complete dependencies

<repositories>
    <!-- Maven repository provided by Cloudera, the company behind CDH -->
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>

</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Java code

package test;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SQLKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// automatic mode: batch or streaming is chosen based on the sources

        // build the Table environment
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, bsSettings);

        /*
        Connect directly to Kafka.
        Note: do not use SQL keywords (such as "order") in table names.
         */
        TableResult table=stEnv.executeSql("CREATE TABLE t_order_input ( \n" +
                "  name STRING, \n" +
                "  price double, \n" +
                "  username STRING \n" +
                ") WITH (  \n" +
                "'connector' = 'kafka', \n"+// use the Kafka connector
                "'topic' = 'user-order-input', \n"+// topic
                "'properties.bootstrap.servers' = 'hadoop1:9092', \n"+// broker address
                "'properties.group.id' = 'testGroup', \n"+// consumer group
//                "'scan.startup.mode' = 'earliest-offset', "+// consume from the earliest offset
                "'scan.startup.mode' = 'latest-offset', \n"+// consume from the latest offset
                "'format' = 'json' \n"+
                ")");
        /*
        Create the sink (result) table.
         */
        TableResult sinkTable=stEnv.executeSql("CREATE TABLE t_order_output ( \n" +
                "  name STRING, \n" +
                "  price double \n" +
                ") WITH (  \n" +
                "'connector' = 'kafka', \n"+// use the Kafka connector
                "'topic' = 'user-order-output', \n"+// topic
                "'properties.bootstrap.servers' = 'hadoop1:9092', \n"+// broker address
                "'properties.group.id' = 'testGroup', \n"+// consumer group (only used when reading)
                "'format' = 'json', \n"+
                "'sink.partitioner' = 'round-robin' \n"+// round-robin partitioning
                ")");


        // run the query
        Table res=stEnv.sqlQuery("select name,price from t_order_input where price>60");
        System.out.println("Schema:");
        // print the schema
        res.printSchema();

        DataStream<Tuple2<Boolean, Row>> dsRes = stEnv.toRetractStream(res, Row.class);
        dsRes.print();

        // inlining the Table into the SQL string registers it under a generated unique name, so this reads from res
        stEnv.executeSql("insert into t_order_output select * from "+res);

        env.execute();
    }
}
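
The comment in the source DDL above warns against using SQL keywords (such as order) as table names. If such a name is unavoidable, Flink SQL accepts backtick-quoted identifiers; a purely illustrative sketch (the table name here is hypothetical, the options mirror the source table above):

-- backticks escape the reserved word so it can be used as a table name
CREATE TABLE `order` (
  name STRING,
  price DOUBLE,
  username STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-order-input',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);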

Note: Kafka topic names (registered in ZooKeeper) should not contain "." or "_"; see the linked post for details.

Start the Kafka environment

Start hadoop1, hadoop2, and hadoop3.

Start ZooKeeper

Log in to hadoop1 and run the following commands:

cd /program/bin/
./zk.sh start

Start Kafka

Log in to hadoop1 and run the following command:

/program/bin/kafka.sh start

Prepare the Kafka topics

Create the topic user-order-input

Flink reads data from this topic:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-input


Create the topic user-order-output

Flink sends the results to this topic:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-output

Start a consumer for the user-order-output topic

Open another terminal window and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic user-order-output

This consumer is used to view the output data.

Test

Start the Java program

Start the Java program from IDEA.

Start a producer for user-order-input and send data

Open another terminal window and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic user-order-input

In the producer console, enter the following records to send data (one at a time or several at once):

{"name":"java从入门到精通","price":50,"username":"李雷"}
{"name":"spark从入门到精通","price":80,"username":"李雷"}
{"name":"hadoop从入门到精通","price":90,"username":"韩梅梅"}
{"name":"flink从入门到精通","price":70,"username":"韩梅梅"}
{"name":"mysql从入门到精通","price":100,"username":"李雷"}
{"name":"hive从入门到精通","price":100,"username":"lucy"}
{"name":"html从入门到精通","price":120,"username":"lucy"}

Check the results

The IDEA console output is as follows (the leading true flag in each row marks an insert in the retract stream):

Schema:
root
 |-- name: STRING
 |-- price: DOUBLE

2> (true,spark从入门到精通,80.0)
1> (true,hadoop从入门到精通,90.0)
2> (true,flink从入门到精通,70.0)

user-order-output consumer window

The following output appears:

{"name":"hive从入门到精通","price":100.0}

Original source: https://malaoshi.top/show_1IX23f177SJ5.html