登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flink1.12.x Table API 和 SQL:kafka读写数据,上传到flink集群,flink run执行jar

上接:flink1.12.x Table API 和 SQL:从kafka读取数据,执行查询,将结果写入到kafka中

准备 flink-kafka 相关 jar 包

hadoop1

将下面 jar 包上传到 hadoop1 的 flink的lib目录下,/program/flink-1.12.5/lib/

  • flink-connector-kafka_2.12-1.12.0.jar
  • kafka-clients-2.4.1.jar

同步到 hadoop2

rsync -av /program/flink-1.12.5/lib root@hadoop2:/program/flink-1.12.5/

同步到 hadoop3

rsync -av /program/flink-1.12.5/lib root@hadoop3:/program/flink-1.12.5/

重启

cd /program/flink-1.12.5/bin/
./stop-cluster.sh
./start-cluster.sh

启动 kafka

启动 hadoop1hadoop2hadoop3

启动zookeeper

登录 hadoop1,执行下面命令

cd /program/bin/
./zk.sh start

启动kafka

登录 hadoop1,执行下面命令

/program/bin/kafka.sh start

准备 kafka 主题

创建主题 user_order_input

flink 从该主题读取数据:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-input

注意: zookeeper 的主题名称不要有 ._,详见 链接

创建主题 user-order-output

flink 将结果 发送到该主题:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-output

启动 user-order-output 主题消费者

启动另一个窗口执行下面命令:

/program/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic user-order-output

用于查看数据

测试

上传 程序 jar 包

将程序 jar 包上传到 /program/ 目录

启动 jar 命令

/program/flink-1.12.5/bin/flink run -c test.SQLKafka  /program/flink_table_sql-1.0-SNAPSHOT.jar

启动生产者 user-order-input,发送数据

启动另一个窗口执行下面命令:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic user-order-input

在上面的环境中,输入下面内容,发送数据(可一条一条发,也可多条发):

{"name":"java从入门到精通","price":50,"username":"李雷"}
{"name":"spark从入门到精通","price":80,"username":"李雷"}
{"name":"hadoop从入门到精通","price":90,"username":"韩梅梅"}
{"name":"flink从入门到精通","price":70,"username":"韩梅梅"}
{"name":"mysql从入门到精通","price":100,"username":"李雷"}
{"name":"hive从入门到精通","price":100,"username":"lucy"}
{"name":"html从入门到精通","price":120,"username":"lucy"}

观察结果

user-order-output消费者窗口

输出下面内容

{"name":"hive从入门到精通","price":100.0}

原文出处:https://malaoshi.top/show_1IX23lFFOBHr.html