Introduction
After Flink has processed data, you may want to save it to Elasticsearch 7.
Flink's connector supports this; see the linked documentation.
Simple example
Flink reads data from Kafka; the data looks like this:
{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}
Save this data into the book index in Elasticsearch.
pom.xml dependencies
The following dependencies need to be added:
- flink-connector-elasticsearch7_2.12
- log4j-core: the flink-connector-elasticsearch7_2.12 connector uses log4j2, so this dependency is required
- commons-lang: used in the advanced example later
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<!--java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- kafka Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<!-- flink-connector-elasticsearch7_2.12 uses log4j2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
log4j.properties
# INFO level
log4j.rootLogger=INFO, stdout
# Configure stdout: write to the console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j2.xml
The flink-connector-elasticsearch7_2.12 connector uses log4j2, so this configuration file is also required.
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="error">
<!-- First define all appenders -->
<appenders>
<!-- Configuration for console output -->
<Console name="Console" target="SYSTEM_OUT">
<!-- The console only outputs messages at this level or above (onMatch); everything else is denied (onMismatch) -->
<ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
<!-- The log output pattern -->
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
</Console>
</appenders>
<!-- Then define the loggers; an appender only takes effect when a logger references it -->
<loggers>
<!-- Create a default root logger -->
<root level="trace">
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>
Java code
package std;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.*;
public class Std2_简单实现 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka consumer configuration
Properties pro = new Properties();
pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Built-in connector: create the FlinkKafkaConsumer with the topic, deserialization schema, and Kafka properties
FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
"kafka2flink",
new JSONKeyValueDeserializationSchema(false),
pro);
DataStreamSource<ObjectNode> dss = env.addSource(fkc);
/*
==================================================
Build the ElasticsearchSink
*/
// the Elasticsearch cluster host(s) and port(s)
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("hadoop2", 9200));
/*
Build the ElasticsearchSink.Builder
First argument: the Elasticsearch cluster hosts and ports
Second argument: an ElasticsearchSinkFunction implementation that transforms each element and issues the request to Elasticsearch
*/
ElasticsearchSink.Builder<ObjectNode> sinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new MyElasticsearchSinkFunction()
);
/*
XXXXXXX IMPORTANT XXXXXXXXXX
Flush as soon as a single element is received; otherwise elements are only buffered.
This line (or one of the other flush settings, see "Flush configuration" below) is required, otherwise nothing is committed.
*/
sinkBuilder.setBulkFlushMaxActions(1);
// add the sink
dss.addSink(sinkBuilder.build());
dss.print();
env.execute();
}
static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode>{
/**
*
* @param s the input element
* @param runtimeContext the runtime context
* @param requestIndexer add the IndexRequest (which carries the target index, document id, and source data)
* to this indexer and the request is sent to Elasticsearch automatically
*/
@Override
public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
// transform the data
JsonNode data = s.get("value");
Map<String, Object> map = new HashMap<>();
Iterator<String> it = data.fieldNames();
while(it.hasNext()) {
String name = it.next();
if("id".equals(name)){
continue;
}
map.put(name,data.get(name).asText());
}
// build the IndexRequest, which carries the target index, document id, and source data
IndexRequest ir = Requests.indexRequest()
.index("book") // the target index
.id(data.get("id").asText()) // the document id
.source(map); // the source data
// add the IndexRequest to the RequestIndexer; the request is sent to Elasticsearch automatically
requestIndexer.add(ir);
}
}
}
Start ZooKeeper
Log in to hadoop1 and run:
/program/bin/zk.sh start
Start Kafka
Log in to hadoop1 and run:
/program/bin/kafka.sh start
Start Elasticsearch 7
cd /program/elasticsearch-7.9.3/bin/
Switch to the es user
Note: Elasticsearch cannot be started as root.
su es
Start in the foreground
./elasticsearch
Warning
The following warning is shown on startup:
future versions of Elasticsearch will require Java 11; your Java version from [/program/jdk1.8.0_202/jre] does not meet this requirement
Elasticsearch 7 expects JDK 11, while this machine is configured with JDK 8, hence the warning. It generally does not affect use.
Start in the background
./elasticsearch -d
Exit the es user:
exit
Start Kibana 7.9.x
For details, see:
https://www.malaoshi.top/show_1IX2PIWOumlt.html
Create the book index
PUT /book
{
"mappings": {
"properties": {
"title": { "type": "text" },
"author": { "type": "keyword" },
"price": { "type": "float" }
}
}
}
Query the data in the book index
GET /book/_search
Start Flink
Run the Flink program in IDEA
Start a Kafka producer for the topic
Log in to hadoop1 and run:
/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink
Then enter:
{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}
Result
Query the book index in Kibana 7.9.x:
GET /book/_search
You can see the two newly added documents.
Additional configuration
Flush configuration (important)
Important: if none of these parameters is set (any one of the three is enough), nothing will be flushed to Elasticsearch. The corresponding builder setters are sketched after this list.
bulk.flush.max.actions: flush after this many actions have been buffered
bulk.flush.max.size.mb: flush after this many MB of data have been buffered
bulk.flush.interval.ms: the flush interval in milliseconds; flushes happen on this interval regardless of the two settings above
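These three settings correspond to setters on ElasticsearchSink.Builder. A minimal sketch, assuming the sinkBuilder from the simple example above (the values are illustrative only):
// flush once 100 actions are buffered ...
sinkBuilder.setBulkFlushMaxActions(100);
// ... or once 5 MB of data is buffered ...
sinkBuilder.setBulkFlushMaxSizeMb(5);
// ... or at the latest every 10 seconds, whichever comes first
sinkBuilder.setBulkFlushInterval(10000L);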
Retry-on-failure configuration (the matching builder setters are sketched below):
bulk.flush.backoff.enable: whether to enable the retry (backoff) mechanism
bulk.flush.backoff.type: the retry strategy, either EXPONENTIAL (the delay between retries grows exponentially) or CONSTANT (the delay between retries is a fixed constant)
bulk.flush.backoff.delay: the delay between retries
bulk.flush.backoff.retries: the number of retries
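These retry settings also map to setters on the builder. A minimal sketch, assuming the Builder API of flink-connector-elasticsearch7 1.12 (the values are illustrative only):
// import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
sinkBuilder.setBulkFlushBackoff(true); // enable retries for failed bulk requests
sinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
sinkBuilder.setBulkFlushBackoffDelay(1000L); // delay between retries, in ms
sinkBuilder.setBulkFlushBackoffRetries(3); // give up after 3 retries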
FailureHandler (failure handler)
Writes to Elasticsearch can fail in situations such as:
- The Elasticsearch cluster queue is full. The queue capacity defaults to roughly 200; if the number of concurrent sink writes exceeds that, writes can fail because the Elasticsearch queue is full.
- A node in the Elasticsearch cluster is down
- The disk of a node in the Elasticsearch cluster is full
If you want Elasticsearch writes to continue after a failure, you need to specify a FailureHandler (failure handler).
Making the failure handler take effect
To use the Elasticsearch sink's failure-retry mechanism, checkpointing must be enabled. Enable checkpoint support for the Flink job with the following code:
env.enableCheckpointing(5000) // for example, checkpoint every 5 seconds
If checkpointing is not enabled, the retry-on-failure strategy cannot take effect.
Failure handler implementations
- IgnoringFailureHandler (built-in): ignores the failure and does nothing
- NoOpFailureHandler (built-in): prints the error to the console and then rethrows it; this is the default handler
- RetryRejectedExecutionFailureHandler (built-in): retries the request when an EsRejectedExecutionException is thrown (see the sketch after this list)
- Custom: implement the ActionRequestFailureHandler interface yourself
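A built-in handler is wired in through the builder. A minimal sketch, assuming the sinkBuilder from the simple example above (the handler choice is illustrative):
// import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
// retry requests that Elasticsearch rejected because its queue was full
sinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());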
Custom example
For example:
DataStream<String> input = ...;
List<HttpHost> httpHosts = ...;
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {...});
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
@Override
public void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throws Throwable {
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add the document for indexing
indexer.add(action);
} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
// malformed document; simply drop the request without failing the sink
} else {
// for all other failures, fail the sink;
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure;
}
}
});
input.addSink(esSinkBuilder.build());
Setting the Elasticsearch 7 username and password
// set the username and password
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","admin"));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
);
Advanced example (wrapper classes)
ESSinkUtil
The Elasticsearch sink utility class, and the core of this example.
package std;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
public class ESSinkUtil {
/**
*
* @param addresses the Elasticsearch addresses
* @param bulkFlushMaxActions flush after this many actions have been buffered
* @param bulkFlushMaxSizeMb flush after this many MB of data have been buffered
* @param bulkFlushInterval the flush interval, in ms (milliseconds)
* @param func the sink function
* @return
* @param <T>
*/
public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
int bulkFlushMaxActions,
int bulkFlushMaxSizeMb,
long bulkFlushInterval,
ElasticsearchSinkFunction<T> func
) {
return buildSink(
addresses,
null,
null,
bulkFlushMaxActions,
bulkFlushMaxSizeMb,
bulkFlushInterval,
func,
null);
}
/**
*
* @param addresses the Elasticsearch addresses
* @param userName the user name
* @param passwd the password
* @param bulkFlushMaxActions flush after this many actions have been buffered
* @param bulkFlushMaxSizeMb flush after this many MB of data have been buffered
* @param bulkFlushInterval the flush interval, in ms (milliseconds)
* @param func the sink function
* @param failureHandler the failure strategy, e.g. EsSinkFailureHandler
* @return
* @param <T>
*/
public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
String userName,
String passwd,
int bulkFlushMaxActions,
int bulkFlushMaxSizeMb,
long bulkFlushInterval,
ElasticsearchSinkFunction<T> func,
ActionRequestFailureHandler failureHandler
) {
ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(addresses, func);
// authentication, for the case where writing to Elasticsearch requires a password
if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(passwd)) {
esSinkBuilder.setRestClientFactory(new EsSinkRestClientFactory(userName, passwd));
}
// failure handling strategy; only set it when one is provided (the 5-argument overload passes null)
if (failureHandler != null) {
esSinkBuilder.setFailureHandler(failureHandler);
}
// flush settings: max buffered actions, max buffered MB, and flush interval
esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
return esSinkBuilder.build();
}
/**
* Parse the Elasticsearch hosts from configuration.
*
* @param hosts Elasticsearch addresses; a cluster of addresses is supported:
* format 1: http://192.168.1.100:9200,http://192.168.1.101:9200
* format 2: 192.168.1.100:9200,192.168.1.101:9200
* @return
* @throws MalformedURLException
*/
public static List<HttpHost> parseAddresses(String hosts) throws MalformedURLException {
String[] hostList = hosts.split(",");
List<HttpHost> addresses = new ArrayList<>();
for (String host : hostList) {
if (host.startsWith("http")) {
URL url = new URL(host);
addresses.add(new HttpHost(url.getHost(), url.getPort()));
} else {
String[] parts = host.split(":", 2);
if (parts.length > 1) {
addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
} else {
throw new MalformedURLException("invalid elasticsearch hosts format");
}
}
}
return addresses;
}
}
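A quick usage sketch of the utility (the host string is the one used in this article; MyElasticsearchSinkFunction is the sink function defined in the Flink class below):
// parseAddresses accepts both address formats described in the javadoc
List<HttpHost> hosts = ESSinkUtil.parseAddresses("http://hadoop2:9200");
// flush after 10 buffered actions, 5 MB of buffered data, or 10 seconds, whichever comes first
ElasticsearchSink<ObjectNode> sink = ESSinkUtil.buildSink(hosts, 10, 5, 10000, new MyElasticsearchSinkFunction());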
EsSinkRestClientFactory
This class provides the username and password Flink needs to connect to Elasticsearch.
package std;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;
public class EsSinkRestClientFactory implements RestClientFactory {
private String userName;
private String password;
public EsSinkRestClientFactory(String userName, String password) {
this.userName = userName;
this.password = password;
}
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
builder.setConnectTimeout(5000);
builder.setSocketTimeout(60000);
builder.setConnectionRequestTimeout(2000);
return builder;
}
});
}
}
EsSinkFailureHandler
The failure handling strategy; this example does not use this class.
package std;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
public class EsSinkFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(
ActionRequest actionRequest,
Throwable throwable,
int restStatusCode,
RequestIndexer requestIndexer
) throws Throwable {
if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
// Elasticsearch rejected the request (e.g. its queue is full): retry it
requestIndexer.add(actionRequest);
} else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
// malformed document: drop it without failing the sink
} else {
throw throwable;
}
}
}
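Since this example does not wire EsSinkFailureHandler in, here is a minimal sketch of how it could be combined with ESSinkUtil (the credentials and flush values are illustrative; checkpointing must be enabled for the retry behaviour to take effect):
env.enableCheckpointing(5000); // e.g. checkpoint every 5 seconds
ElasticsearchSink<ObjectNode> sink = ESSinkUtil.buildSink(
httpHosts,
"elastic", "admin", // username/password, only needed when security is enabled
10, 5, 10000, // flush after 10 actions, 5 MB, or 10 seconds
new MyElasticsearchSinkFunction(),
new EsSinkFailureHandler());
dss.addSink(sink);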
Flink class
package std;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.*;
public class Std3_封装 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka consumer configuration
Properties pro = new Properties();
pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// Built-in connector: create the FlinkKafkaConsumer with the topic, deserialization schema, and Kafka properties
FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
"kafka2flink",
new JSONKeyValueDeserializationSchema(false),
pro);
DataStreamSource<ObjectNode> dss = env.addSource(fkc);
/*
==================================================
Build the ElasticsearchSink
*/
List<HttpHost> httpHosts = ESSinkUtil.parseAddresses("hadoop2:9200");
ElasticsearchSink elasticsearchSink = ESSinkUtil.buildSink(
httpHosts,
10, // flush after 10 buffered actions
5, // or after 5 MB of buffered data
10000, // or at the latest every 10 seconds
new MyElasticsearchSinkFunction()
);
dss.print();
dss.addSink(elasticsearchSink);
env.execute();
}
static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode>{
/**
*
* @param s the input element
* @param runtimeContext the runtime context
* @param requestIndexer add the IndexRequest (which carries the target index, document id, and source data)
* to this indexer and the request is sent to Elasticsearch automatically
*/
@Override
public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
// transform the data
JsonNode data = s.get("value");
Map<String, Object> map = new HashMap<>();
Iterator<String> it = data.fieldNames();
while(it.hasNext()) {
String name = it.next();
if("id".equals(name)){
continue;
}
map.put(name,data.get(name).asText());
}
// build the IndexRequest, which carries the target index, document id, and source data
IndexRequest ir = Requests.indexRequest()
.index("book") // the target index
.id(data.get("id").asText()) // the document id
.source(map); // the source data
// add the IndexRequest to the RequestIndexer; the request is sent to Elasticsearch automatically
requestIndexer.add(ir);
}
}
}
Send data
In the Kafka producer, enter:
{"id":1002,"title":"hadoop从入门到精通","price":129,"author":"lucy"}
{"id":1003,"title":"hive从入门到精通","price":50,"author":"lili"}
Result
Query the book index in Kibana 7.9.x:
GET /book/_search
You can see the two newly added documents.
References:
https://xie.infoq.cn/article/318acc75866d2e6eb5bfdbfc1
https://blog.csdn.net/xiaoc100200/article/details/111401744
https://blog.csdn.net/Vector97/article/details/118217813
https://www.modb.pro/db/377574
https://cloud.tencent.com/developer/article/1979189