flink1.12.x sink - sending data to elasticsearch7

Author: 马育民 • 2023-01-19 22:23

# Introduction

After Flink has processed data, the results often need to be saved to Elasticsearch 7.

Flink's connector supports this; see the [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html "documentation").

# Simple example

Flink reads the following data from Kafka:

```
{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}
```

and saves it to the `book` index in Elasticsearch.

### pom.xml dependencies

Add the following dependencies:

- flink-connector-elasticsearch7_2.12
- log4j-core: flink-connector-elasticsearch7_2.12 uses log4j2, so this dependency is required
- commons-lang: used later in the advanced example

```
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>
```

### log4j.properties

```
# INFO level
log4j.rootLogger=INFO, stdout

# stdout appender: log to the console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
```

### log4j2.xml

flink-connector-elasticsearch7_2.12 uses log4j2, so this configuration file is also needed. A minimal console configuration, mirroring the log4j.properties above, is enough:

```
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %p [%c] - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
```

### Java code

```
package std;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

public class Std2_简单实现 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Built-in connector: create the Flink Kafka consumer with the topic,
        // the deserialization schema and the Kafka configuration
        FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
                "kafka2flink",
                new JSONKeyValueDeserializationSchema(false),
                pro);

        DataStreamSource<ObjectNode> dss = env.addSource(fkc);

        /* ==================================================
            Build the ElasticsearchSink
         */
        // Elasticsearch cluster hosts (ip and port)
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop2", 9200));

        /*
        Build the ElasticsearchSink.
        First argument: the Elasticsearch cluster hosts.
        Second argument: an ElasticsearchSinkFunction implementation that
        transforms each element and issues the request to Elasticsearch.
         */
        ElasticsearchSink.Builder<ObjectNode> sinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new MyElasticsearchSinkFunction()
        );

        /* XXXXXXX important XXXXXXXXXX
        Flush after every single element; otherwise elements are buffered.
        Without this (or one of the other flush settings) nothing is ever
        committed (see "Flush configuration" below).
         */
        sinkBuilder.setBulkFlushMaxActions(1);

        // add the sink
        dss.addSink(sinkBuilder.build());
        dss.print();
        env.execute();
    }

    static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode> {
        /**
         * @param s              input element
         * @param runtimeContext runtime context
         * @param requestIndexer add the IndexRequest (which wraps the target index,
         *                       document id and payload) to this indexer and the
         *                       request is sent to Elasticsearch automatically
         */
        @Override
        public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            // transform the element
            JsonNode data = s.get("value");
            Map<String, String> map = new HashMap<>();
            Iterator<String> it = data.fieldNames();
            while (it.hasNext()) {
                String name = it.next();
                if ("id".equals(name)) {
                    continue;
                }
                map.put(name, data.get(name).asText());
            }
            // build the IndexRequest wrapping the target index, document id and payload
            IndexRequest ir = Requests.indexRequest()
                    .index("book")                  // target index
                    .id(data.get("id").asText())    // document id
                    .source(map);                   // payload

            // add the IndexRequest to the indexer; the request is sent automatically
            requestIndexer.add(ir);
        }
    }
}
```
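For reference: `JSONKeyValueDeserializationSchema` wraps each Kafka record in an `ObjectNode`. With the metadata flag set to `false` and no message key (the console producer used below sends none), the first input line arrives in `process()` roughly as follows, which is why the payload is read with `s.get("value")`:

```
{"value":{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}}
```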
# Start ZooKeeper

Log in to hadoop1 and run:

```
/program/bin/zk.sh start
```

# Start Kafka

Log in to hadoop1 and run:

```
/program/bin/kafka.sh start
```

# Start Elasticsearch 7

```
cd /program/elasticsearch-7.9.3/bin/
```

### Switch to the es user

**Note:** Elasticsearch cannot be started as the `root` user.

```
su es
```

### Start in the foreground

```
./elasticsearch
```

### Warning

The following warning appears at startup:

```
future versions of Elasticsearch will require Java 11; your Java version from [/program/jdk1.8.0_202/jre] does not meet this requirement
future versions of Elasticsearch will require Java 11; your Java version from [/program/jdk1.8.0_202/jre] does not meet this requirement
```

Elasticsearch 7 expects `jdk11`, but this machine is configured with `jdk8`, hence the warning; it generally does not affect usage.

### Start in the background

```
./elasticsearch -d
```

### Exit the es user

```
exit
```

# Start Kibana 7.9.x

See: https://www.malaoshi.top/show_1IX2PIWOumlt.html

### Create the book index

```
PUT /book
{
  "mappings": {
    "properties": {
      "title":  { "type": "text" },
      "author": { "type": "keyword" },
      "price":  { "type": "float" }
    }
  }
}
```

### Query the data in the book index

```
GET /book/_search
```

# Run the Flink job

Run the Flink program in IDEA.

### Start a producer on the topic

Log in to hadoop1 and run:

```
/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink
```

Then enter:

```
{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}
```

### Result

Query the `book` index in Kibana 7.9.x:

```
GET /book/_search
```

The two newly added documents are visible.

# Extended configuration

### Flush configuration (important)

This is critical: if none of these three options is set, nothing is ever committed to Elasticsearch.

- bulk.flush.max.actions: flush after buffering this many records
- bulk.flush.max.size.mb: flush after buffering this many MB of data
- bulk.flush.interval.ms: flush on this interval, in milliseconds, regardless of the two size-based settings above

Retry (back-off) configuration for failed bulk requests:

- bulk.flush.backoff.enable: whether to retry at all
- bulk.flush.backoff.type: retry strategy, either EXPONENTIAL (the delay between retries grows exponentially) or CONSTANT (the delay between retries is a fixed constant)
- bulk.flush.backoff.delay: the delay between retries
- bulk.flush.backoff.retries: the number of retries

(A combined sketch of these builder settings follows the failure-handler list below.)

# The FailureHandler

Several situations can cause writes to Elasticsearch to fail:

- The Elasticsearch bulk queue is full. Its default capacity is about 200; if the number of concurrent sink requests exceeds that, writes can fail because the queue is full.
- A node in the cluster is down.
- A node's disk is full.

To keep writing to Elasticsearch after such failures, specify a `FailureHandler`.

### Making the failure handler take effect

The sink's failure retry mechanism only works when checkpointing is enabled. Turn it on with:

```
env.enableCheckpointing(5000); // checkpoint interval in ms
```

Without checkpointing, **the failure retry strategy has no effect**.

### Failure handler implementations

- Built-in `IgnoringFailureHandler`: ignores failures and does nothing
- Built-in `NoOpFailureHandler`: logs the error and rethrows it; **the default handler**
- Built-in `RetryRejectedExecutionFailureHandler`: retries when an `EsRejectedExecutionException` is thrown
- A custom `ActionRequestFailureHandler` implementation
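A minimal sketch of how these settings map onto the `sinkBuilder` from the simple example above; the threshold and back-off values are illustrative placeholders, not recommendations:

```
// assumes: import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
//          import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// flush thresholds: the bulk request is sent when any one of them is hit
sinkBuilder.setBulkFlushMaxActions(100);    // bulk.flush.max.actions
sinkBuilder.setBulkFlushMaxSizeMb(5);       // bulk.flush.max.size.mb
sinkBuilder.setBulkFlushInterval(10000L);   // bulk.flush.interval.ms

// back-off for failed bulk requests
sinkBuilder.setBulkFlushBackoff(true);                                                    // bulk.flush.backoff.enable
sinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL); // bulk.flush.backoff.type
sinkBuilder.setBulkFlushBackoffDelay(1000L);                                              // bulk.flush.backoff.delay (ms)
sinkBuilder.setBulkFlushBackoffRetries(3);                                                // bulk.flush.backoff.retries

// built-in handler: retry requests that Elasticsearch rejected because its queue was full
sinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
```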
### Custom example

For example:

```
DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));
```

# Setting the Elasticsearch 7 user name and password

```
// set the user name and password
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials("elastic", "admin"));
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
    }
);
```

# Advanced example (wrapper classes)

### ESSinkUtil

The ES sink utility class, and the core class of this example:

```
package std;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class ESSinkUtil {

    /**
     * @param addresses           Elasticsearch hosts
     * @param bulkFlushMaxActions flush after buffering this many records
     * @param bulkFlushMaxSizeMb  flush after buffering this many MB of data
     * @param bulkFlushInterval   flush interval in milliseconds
     * @param func                sink function
     * @param <T>                 element type
     * @return the built sink
     */
    public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
                                                     int bulkFlushMaxActions,
                                                     int bulkFlushMaxSizeMb,
                                                     long bulkFlushInterval,
                                                     ElasticsearchSinkFunction<T> func) {
        return buildSink(addresses, null, null,
                bulkFlushMaxActions, bulkFlushMaxSizeMb, bulkFlushInterval,
                func, null);
    }

    /**
     * @param addresses           Elasticsearch hosts
     * @param userName            user name
     * @param passwd              password
     * @param bulkFlushMaxActions flush after buffering this many records
     * @param bulkFlushMaxSizeMb  flush after buffering this many MB of data
     * @param bulkFlushInterval   flush interval in milliseconds
     * @param func                sink function
     * @param failureHandler      failure strategy, e.g. EsSinkFailureHandler
     * @param <T>                 element type
     * @return the built sink
     */
    public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
                                                     String userName,
                                                     String passwd,
                                                     int bulkFlushMaxActions,
                                                     int bulkFlushMaxSizeMb,
                                                     long bulkFlushInterval,
                                                     ElasticsearchSinkFunction<T> func,
                                                     ActionRequestFailureHandler failureHandler) {
        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(addresses, func);

        // authentication, for clusters that require a password
        if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(passwd)) {
            esSinkBuilder.setRestClientFactory(new EsSinkRestClientFactory(userName, passwd));
        }

        // failure handling strategy (setFailureHandler rejects null, so only set it when given)
        if (failureHandler != null) {
            esSinkBuilder.setFailureHandler(failureHandler);
        }

        // flush thresholds; without at least one of these, elements are buffered and never committed
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);

        return esSinkBuilder.build();
    }

    /**
     * Parse the configured Elasticsearch hosts.
     *
     * @param hosts Elasticsearch addresses, cluster addresses supported:
     *              format 1: http://192.168.1.100:9200,http://192.168.1.101:9200
     *              format 2: 192.168.1.100:9200,192.168.1.101:9200
     * @return the host list
     * @throws MalformedURLException on an invalid address
     */
    public static List<HttpHost> parseAddresses(String hosts) throws MalformedURLException {
        String[] hostList = hosts.split(",");
        List<HttpHost> addresses = new ArrayList<>();
        for (String host : hostList) {
            if (host.startsWith("http")) {
                URL url = new URL(host);
                addresses.add(new HttpHost(url.getHost(), url.getPort()));
            } else {
                String[] parts = host.split(":", 2);
                if (parts.length > 1) {
                    addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
                } else {
                    throw new MalformedURLException("invalid elasticsearch hosts format");
                }
            }
        }
        return addresses;
    }
}
```
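A quick usage sketch for `parseAddresses`; the demo class name and IP addresses are hypothetical placeholders, and both supported formats yield the same `HttpHost` list:

```
package std;

import org.apache.http.HttpHost;
import java.net.MalformedURLException;
import java.util.List;

public class ParseAddressesDemo {
    public static void main(String[] args) throws MalformedURLException {
        // format 1: full URLs with a scheme
        List<HttpHost> withScheme =
                ESSinkUtil.parseAddresses("http://192.168.1.100:9200,http://192.168.1.101:9200");
        // format 2: bare host:port pairs
        List<HttpHost> hostPort =
                ESSinkUtil.parseAddresses("192.168.1.100:9200,192.168.1.101:9200");
        System.out.println(withScheme);
        System.out.println(hostPort);
    }
}
```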
### EsSinkRestClientFactory

This class implements the user name and password authentication Flink needs to connect to Elasticsearch:

```
package std;

import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;

public class EsSinkRestClientFactory implements RestClientFactory {

    private String userName;
    private String password;

    public EsSinkRestClientFactory(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // credentials for basic authentication
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(userName, password));
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
                builder.setConnectTimeout(5000);           // connect timeout, ms
                builder.setSocketTimeout(60000);           // socket timeout, ms
                builder.setConnectionRequestTimeout(2000); // time to obtain a pooled connection, ms
                return builder;
            }
        });
    }
}
```

### EsSinkFailureHandler

The failure handling strategy; this example does not actually use it:

```
package std;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class EsSinkFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(
            ActionRequest actionRequest,
            Throwable throwable,
            int restStatusCode,
            RequestIndexer requestIndexer
    ) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // Elasticsearch rejected the request (e.g. full queue): retry it
            requestIndexer.add(actionRequest);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // malformed document: drop it without failing the sink
        } else {
            // any other failure: fail the sink
            throw throwable;
        }
    }
}
```
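With all three wrapper classes in place, the full `buildSink` overload wires authentication and failure handling together. A hedged sketch, assuming it runs inside a `main` that declares `throws Exception`; the credentials and flush thresholds are placeholders, and `MyElasticsearchSinkFunction` is the sink function defined in the Flink class below:

```
// parse the cluster addresses, then build a fully configured sink
List<HttpHost> hosts = ESSinkUtil.parseAddresses("hadoop2:9200");

ElasticsearchSink<ObjectNode> sink = ESSinkUtil.buildSink(
        hosts,
        "elastic", "admin",                 // user name / password (placeholders)
        10,                                 // bulk.flush.max.actions
        5,                                  // bulk.flush.max.size.mb
        10000L,                             // bulk.flush.interval.ms
        new MyElasticsearchSinkFunction(),
        new EsSinkFailureHandler());        // retry rejected requests, drop malformed ones

dss.addSink(sink);
```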
### The Flink class

```
package std;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

public class Std3_封装 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Built-in connector: create the Flink Kafka consumer with the topic,
        // the deserialization schema and the Kafka configuration
        FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
                "kafka2flink",
                new JSONKeyValueDeserializationSchema(false),
                pro);

        DataStreamSource<ObjectNode> dss = env.addSource(fkc);

        /* ==================================================
            Build the ElasticsearchSink
         */
        List<HttpHost> httpHosts = ESSinkUtil.parseAddresses("hadoop2:9200");
        ElasticsearchSink<ObjectNode> elasticsearchSink = ESSinkUtil.buildSink(
                httpHosts,
                10,       // flush after 10 records
                5,        // or after 5 MB
                10000,    // or every 10 seconds
                new MyElasticsearchSinkFunction()
        );

        dss.print();
        dss.addSink(elasticsearchSink);
        env.execute();
    }

    static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode> {
        /**
         * @param s              input element
         * @param runtimeContext runtime context
         * @param requestIndexer add the IndexRequest (which wraps the target index,
         *                       document id and payload) to this indexer and the
         *                       request is sent to Elasticsearch automatically
         */
        @Override
        public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            // transform the element
            JsonNode data = s.get("value");
            Map<String, String> map = new HashMap<>();
            Iterator<String> it = data.fieldNames();
            while (it.hasNext()) {
                String name = it.next();
                if ("id".equals(name)) {
                    continue;
                }
                map.put(name, data.get(name).asText());
            }
            // build the IndexRequest wrapping the target index, document id and payload
            IndexRequest ir = Requests.indexRequest()
                    .index("book")                  // target index
                    .id(data.get("id").asText())    // document id
                    .source(map);                   // payload

            // add the IndexRequest to the indexer; the request is sent automatically
            requestIndexer.add(ir);
        }
    }
}
```

### Send data

In the Kafka producer, enter:

```
{"id":1002,"title":"hadoop从入门到精通","price":129,"author":"lucy"}
{"id":1003,"title":"hive从入门到精通","price":50,"author":"lili"}
```

### Result

Query the `book` index in Kibana 7.9.x:

```
GET /book/_search
```

The two newly added documents are visible.

Original article: http://malaoshi.top/show_1IX4oFZ2ydza.html