
flink1.12.x Table API and SQL: reading data from Kafka, writing it to Elasticsearch, uploading the jar to a Flink cluster, and running it with flink run

Continued from: flink1.12.x Table API and SQL: reading data from Kafka, running an aggregate query, and writing the results to Elasticsearch
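
For reference, the job that this article packages and submits looks roughly like the sketch below. This is a minimal reconstruction, not the original class from the previous post: the topic, broker address, and index name come from the commands later in this article, while the field types, consumer group id, Elasticsearch host (hadoop1:9200), and the avg(price) aggregation are assumptions inferred from the sample data and output.

package test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode (matches the flink-table-planner-blink dependency)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Kafka source table over JSON records such as
        // {"name":"java从入门到精通","price":50,"username":"李雷"}
        tEnv.executeSql(
                "CREATE TABLE user_order_input (\n" +
                "  name STRING,\n" +
                "  price INT,\n" +
                "  username STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'user-order-input',\n" +
                "  'properties.bootstrap.servers' = 'hadoop1:9092',\n" +
                "  'properties.group.id' = 'user_order_group',\n" +      // assumed group id
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Elasticsearch sink table; the primary key becomes the document _id
        tEnv.executeSql(
                "CREATE TABLE user_order (\n" +
                "  name STRING,\n" +
                "  price INT,\n" +
                "  PRIMARY KEY (name) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://hadoop1:9200',\n" +                 // assumed ES host
                "  'index' = 'user_order'\n" +
                ")");

        // Aggregate per user and upsert into Elasticsearch. executeSql submits the job,
        // so no env.execute() call is needed. The exact aggregate in the previous post
        // may differ from avg(price).
        tEnv.executeSql(
                "INSERT INTO user_order\n" +
                "SELECT username AS name, avg(price) AS price\n" +
                "FROM user_order_input\n" +
                "GROUP BY username");
    }
}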

Packaging

Before packaging, the flink-table-planner-blink_2.12 dependency must be changed to <scope>provided</scope>, because the Blink planner is already shipped in the lib directory of the Flink distribution on the cluster.

The complete dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Otherwise the job throws an error when it runs on the cluster.

Debugging in IDEA

For debugging in IDEA, the <scope>provided</scope> line must be removed from the flink-table-planner-blink_2.12 dependency (alternatively, recent IDEA versions can keep the scope and enable "Include dependencies with 'Provided' scope" in the run configuration).

The complete dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

Otherwise the program fails with an error, because dependencies marked provided are not on the runtime classpath when running inside the IDE.

Complete pom.xml

    <repositories>
        <!-- Maven repository provided by Cloudera, the company behind CDH -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.12.0</flink.version>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--    <dependency>-->
        <!--        <groupId>org.apache.flink</groupId>-->
        <!--        <artifactId>flink-table-api-java</artifactId>-->
        <!--        <version>${flink.version}</version>-->
        <!--    </dependency>-->

        <!--    <dependency>-->
        <!--        <groupId>org.apache.flink</groupId>-->
        <!--        <artifactId>flink-streaming-scala_2.12</artifactId>-->
        <!--        <version>${flink.version}</version>-->
        <!--        <scope>provided</scope>-->
        <!--    </dependency>-->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
            <!--        <scope>provided</scope>-->
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Java development -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <!--        <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
            <!--        <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- kafka Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- file formats Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- tools Dependency -->
        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
            <scope>runtime</scope>
        </dependency>


    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- packaging plugin (bundles all dependencies into the jar) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- set the jar's entry class (optional) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Build the jar

In IDEA's Maven tool window, double-click package (or run mvn clean package) to build the jar.

Set up the Flink and Kafka servers

Upload the required jar (important)

Log in to hadoop1.

Upload the following jar to the Flink lib directory on hadoop1 (/program/flink-1.12.5/lib/):

  • flink-sql-connector-kafka_2.12-1.12.0.jar

Modify the configuration file (important)

Log in to hadoop1.

Run the following command:

vim /program/flink-1.12.5/conf/flink-conf.yaml

Find classloader.resolve-order and add the following line below it:

classloader.resolve-order: parent-first

With parent-first, Flink resolves classes from its own lib directory before the user jar, which avoids class conflicts with the SQL Kafka connector jar uploaded above.

Sync to hadoop2:

rsync -av /program/flink-1.12.5/lib root@hadoop2:/program/flink-1.12.5/
rsync -av /program/flink-1.12.5/conf root@hadoop2:/program/flink-1.12.5/

Sync to hadoop3:

rsync -av /program/flink-1.12.5/lib root@hadoop3:/program/flink-1.12.5/
rsync -av /program/flink-1.12.5/conf root@hadoop3:/program/flink-1.12.5/

Restart the Flink cluster

cd /program/flink-1.12.5/bin/
./stop-cluster.sh
./start-cluster.sh

Start the Kafka environment

Start hadoop1, hadoop2, and hadoop3.

Start ZooKeeper

Log in to hadoop1 and run the following command:

/program/bin/zk.sh start

Start Kafka

Log in to hadoop1 and run the following command:

/program/bin/kafka.sh start

Prepare the Kafka topics

Create the topic user-order-input

Flink reads data from this topic:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-input

Note: the topic name should not contain . or _ characters; see link.

Create the topic user-order-output

Flink sends the results to this topic:

/program/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic user-order-output

Start a consumer on the user-order-output topic

Open another terminal window and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic user-order-output

Use this console to watch the output data.

Testing

Upload the program jar

Upload the program jar to the /program/ directory.

Run the jar with the following command:

/program/flink-1.12.5/bin/flink run -c test.Kafka2Es  /program/flink_table_sql-1.0-SNAPSHOT.jar

Start a producer on user-order-input and send data

Open another terminal window and run the following command:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic user-order-input

In the producer console, enter the following records to send data, either one at a time or several at once (a Java alternative to the console producer is sketched after the list):

{"name":"java从入门到精通","price":50,"username":"李雷"}
{"name":"spark从入门到精通","price":80,"username":"李雷"}
{"name":"hadoop从入门到精通","price":90,"username":"韩梅梅"}
{"name":"flink从入门到精通","price":70,"username":"韩梅梅"}
{"name":"mysql从入门到精通","price":100,"username":"李雷"}
{"name":"hive从入门到精通","price":100,"username":"lucy"}
{"name":"html从入门到精通","price":120,"username":"lucy"}

Check Elasticsearch

Run the following query (for example in Kibana Dev Tools; a Java alternative is sketched after the sample result):

GET /user_order/_search

The result looks like this:

{
    "took":3,
    "timed_out":false,
    "_shards":{
        "total":1,
        "successful":1,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":{
            "value":3,
            "relation":"eq"
        },
        "max_score":1,
        "hits":[
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"韩梅梅",
                "_score":1,
                "_source":{
                    "name":"韩梅梅",
                    "price":80
                }
            },
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"李雷",
                "_score":1,
                "_source":{
                    "name":"李雷",
                    "price":100
                }
            },
            {
                "_index":"user_order",
                "_type":"_doc",
                "_id":"lucy",
                "_score":1,
                "_source":{
                    "name":"lucy",
                    "price":110
                }
            }
        ]
    }
}
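
The same search can also be issued from Java with the Elasticsearch high-level REST client, which the flink-connector-elasticsearch7_2.12 dependency above should pull in transitively. A minimal sketch, assuming Elasticsearch is reachable at hadoop1:9200 (adjust the host to your cluster):

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class QueryUserOrder {
    public static void main(String[] args) throws Exception {
        // assumption: Elasticsearch runs on hadoop1:9200
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("hadoop1", 9200, "http")))) {
            // an empty SearchRequest on the index is equivalent to GET /user_order/_search
            SearchRequest request = new SearchRequest("user_order");
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            response.getHits().forEach(hit ->
                    System.out.println(hit.getId() + " -> " + hit.getSourceAsString()));
        }
    }
}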

Thanks to:
https://blog.csdn.net/DH2442897094/article/details/120220852


Original article: https://malaoshi.top/show_1IX24XVDG1Bf.html