flink1.12.x sink-发送数据到elasticsearch7

介绍

Flink 处理数据后,可能要保存到 elasticsearch7 中

Flink 的 connector 就支持该功能,参见 链接

简单案例

Flink 从 kafka 读取数据,数据如下:

{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}

将这些数据保存到 elasticsearch 的 book 索引中

pom.xml 依赖

需要添加相关依赖:

  • flink-connector-elasticsearch7_2.12
  • log4j-core:flink-connector-elasticsearch7_2.12 依赖使用的是 log4j2,所以要加此依赖
  • commons-lang:后面的高级案例中,用到此依赖
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
</properties>

<dependencies>
    <!--java-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- kafka Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Elasticsearch 7 connector; version follows the shared flink.version property
         so it cannot drift from the other Flink artifacts -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- flink-connector-elasticsearch7_2.12 使用log4j2 -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

log4j.properties

# Root logger: INFO level, sending events to the "stdout" appender
log4j.rootLogger=INFO, stdout
# Configure the "stdout" appender so that it prints to the console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Line format: date, level, [logger name], message, newline
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j2.xml

flink-connector-elasticsearch7_2.12 依赖使用的是 log4j2,所以要加此配置文件

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="error">
    <!-- Declare all appenders first -->
    <appenders>
        <!-- Console appender writing to standard output -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- Accept events at the configured level or above (onMatch); reject everything else (onMismatch) -->
            <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
            <!-- Output format of each log line -->
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </Console>
    </appenders>
    <!-- Declare loggers; an appender only takes effect once a logger references it -->
    <loggers>
        <!-- Default root logger -->
        <root level="trace">
            <appender-ref ref="Console"/>
        </root>
    </loggers>
</configuration>

java 代码

package std;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

/**
 * Minimal example: reads JSON records from the Kafka topic {@code kafka2flink}
 * and indexes each one into the Elasticsearch index {@code book}.
 */
public class Std2_简单实现 {
    /**
     * Builds the Kafka source and the Elasticsearch sink, then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer settings
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Built-in connector: the consumer needs the topic, a deserialization schema and the Kafka settings.
        // Each record arrives as an ObjectNode whose "value" field holds the JSON payload.
        FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
                "kafka2flink",
                new JSONKeyValueDeserializationSchema(false),
                pro);
        DataStreamSource<ObjectNode> dss = env.addSource(fkc);

        /*
         ==================================================
         Build the ElasticsearchSink
         */
        // Host and port of the Elasticsearch cluster
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop2", 9200));

        /*
         Builder arguments:
         1st: hosts of the Elasticsearch cluster
         2nd: ElasticsearchSinkFunction implementation that turns each element into requests
         */
        ElasticsearchSink.Builder<ObjectNode> sinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new MyElasticsearchSinkFunction()
        );
        /*
         Important: flush after every single action.
         Without a flush setting the requests are buffered by the bulk processor, so the
         few records of this demo would not show up in Elasticsearch promptly.
         */
        sinkBuilder.setBulkFlushMaxActions(1);
        // Attach the sink, and also print the stream for debugging
        dss.addSink(sinkBuilder.build());
        dss.print();
        env.execute();
    }

    /**
     * Converts one Kafka record into an {@link IndexRequest} for the {@code book} index.
     */
    static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode>{
        /**
         * Builds the index request for one record and hands it to the indexer.
         *
         * @param s              incoming record; its "value" field is the JSON payload
         * @param runtimeContext runtime context (not used)
         * @param requestIndexer collects the requests that the sink sends to Elasticsearch
         */
        @Override
        public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            JsonNode data = s.get("value");
            // Records without a JSON object payload cannot be indexed; skip them
            // instead of failing the whole job with a NullPointerException.
            if (data == null || !data.isObject()) {
                return;
            }

            // Copy every field except "id" into the document source (values as text)
            Map<String, Object> map = new HashMap<>();
            Iterator<Map.Entry<String, JsonNode>> it = data.fields();
            while (it.hasNext()) {
                Map.Entry<String, JsonNode> field = it.next();
                if (!"id".equals(field.getKey())) {
                    map.put(field.getKey(), field.getValue().asText());
                }
            }

            // The request carries the target index, the document id and the document source
            IndexRequest ir = Requests.indexRequest()
                    .index("book")
                    .source(map);
            // Use the payload's "id" as document id; when it is absent the id stays
            // unset and Elasticsearch assigns one.
            JsonNode id = data.get("id");
            if (id != null && !id.isNull()) {
                ir.id(id.asText());
            }
            // Requests added to the indexer are sent to Elasticsearch by the sink
            requestIndexer.add(ir);
        }
    }
}

启动zk

登录 hadoop1,执行下面命令:

/program/bin/zk.sh start

启动kafka

登录 hadoop1,执行下面命令:

/program/bin/kafka.sh start

启动 es7

cd /program/elasticsearch-7.9.3/bin/

切换 es 用户

注意:不能用 root 用户启动es

su es

前台启动

./elasticsearch

警告

启动时显示下面警告:

future versions of Elasticsearch will require Java 11; your Java version from [/program/jdk1.8.0_202/jre] does not meet this requirement
future versions of Elasticsearch will require Java 11; your Java version from [/program/jdk1.8.0_202/jre] does not meet this requirement

es7 推荐使用 jdk11,而本机中配置的是 jdk8,所以提示警告信息,一般不影响使用

后台启动

./elasticsearch -d

退出 es 用户:

exit

启动 kibana7.9.x

详见链接:
https://www.malaoshi.top/show_1IX2PIWOumlt.html

创建 book 索引

PUT /book
{
  "mappings": {
    "properties": {
      "title":    { "type": "text" },  
      "author":   { "type": "keyword"  },
      "price":   { "type": "float"  }
    }
  }
}

查询 book 索引中的数据

get /book/_search

启动 flink

在 idea 中运行 flink 程序

启动生产者、指定主题

登录 hadoop1,执行下面命令:

/program/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic kafka2flink

然后输入:

{"id":1000,"title":"elasticsearch从入门到精通5","price":199,"author":"李雷"}
{"id":1001,"title":"Flink从入门到精通5","price":32,"author":"韩梅梅"}

执行结果

在 kibana7.9.x 查询 book 索引:

get /book/_search

可以看到新增加的2条数据

扩展配置

提交配置(关键)

关键:如果不配置下面的参数(三个之一即可),数据会先缓存在批量处理器中,达到默认的批量阈值后才提交;本例数据量很小,因此在 es 中迟迟查不到数据

  • bulk.flush.max.actions: 缓存多少个数据,才提交

  • bulk.flush.max.size.mb: 缓存多少 MB 的数据,才提交

  • bulk.flush.interval.ms: 提交的时间间隔,单位 ms(毫秒),无视上面的两个批量写入配置

失败的配置:

  • bulk.flush.backoff.enable 用来表示是否开启重试机制

  • bulk.flush.backoff.type 重试策略,有两种:EXPONENTIAL 指数型(表示多次重试之间的时间间隔按照指数方式进行增长)、CONSTANT 常数型(表示多次重试之间的时间间隔为固定常数)

  • bulk.flush.backoff.delay 进行重试的时间间隔

  • bulk.flush.backoff.retries 失败重试的次数

FailureHandler 失败处理器

写入 ES 的时候会有这些情况会导致写入 ES 失败:

  • ES 集群队列满了。es 队列容量默认大约是 200,并发 sink 数量 大于这个 200 ,那么就可能出现因为 es 队列满了而写入失败。
  • ES 集群某个节点挂了
  • ES 集群某个节点的磁盘满了

如果失败后,想继续让 es 写入的话就需要指定 FailureHandler 失败处理器

让失败处理器生效

使用EsSink的失败重试机制,则需要通过 Checkpoint 开启,执行下面代码来开启Flink任务对checkpoint的支持:

// Trigger a checkpoint every 5000 ms (the interval is in milliseconds; tune as needed)
env.enableCheckpointing(5000);

如果没有开启checkpoint机制的话,则失败重试策略是无法生效的

失败处理器实现

  • 自带 IgnoringFailureHandler:忽略失败,什么也不操作

  • 自带 NoOpFailureHandler:将错误打印到控制台,然后抛出。默认处理器

  • 自带 RetryRejectedExecutionFailureHandler:如果抛出 EsRejectedExecutionException 异常,就重试

  • 自定义 ActionRequestFailureHandler 实现类

自定义例子

如下:

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throw Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));

设置 es7 用户名密码

// Register basic-auth credentials on the HTTP client used by the sink
esSinkBuilder.setRestClientFactory(restClientBuilder ->
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            CredentialsProvider provider = new BasicCredentialsProvider();
            provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","admin"));
            return httpClientBuilder.setDefaultCredentialsProvider(provider);
        })
);

高级案例(封装类)

ESSinkUtil

es sink 工具类,也是核心类

package std;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;


import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/**
 * Helper methods for building an {@link ElasticsearchSink} and for parsing
 * Elasticsearch host lists.
 */
public class ESSinkUtil {

    /**
     * Builds a sink without authentication and without a custom failure handler.
     *
     * @param addresses           Elasticsearch hosts
     * @param bulkFlushMaxActions number of buffered actions that triggers a flush
     * @param bulkFlushMaxSizeMb  size of buffered data, in MB, that triggers a flush
     * @param bulkFlushInterval   flush interval in milliseconds
     * @param func                sink function that turns elements into requests
     * @param <T>                 element type of the stream
     * @return the configured sink
     */
    public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
                                                     int bulkFlushMaxActions,
                                                     int bulkFlushMaxSizeMb,
                                                     long bulkFlushInterval,
                                                     ElasticsearchSinkFunction<T> func
    ) {

        return buildSink(
                addresses,
                null,
                null,
                bulkFlushMaxActions,
                bulkFlushMaxSizeMb,
                bulkFlushInterval,
                func,
                null);
    }

    /**
     * Builds a sink, optionally with basic authentication and a custom failure handler.
     *
     * @param addresses           Elasticsearch hosts
     * @param userName            user name; authentication is only configured when both
     *                            user name and password are non-empty
     * @param passwd              password
     * @param bulkFlushMaxActions number of buffered actions that triggers a flush
     * @param bulkFlushMaxSizeMb  size of buffered data, in MB, that triggers a flush
     * @param bulkFlushInterval   flush interval in milliseconds
     * @param func                sink function that turns elements into requests
     * @param failureHandler      failure strategy, e.g. EsSinkFailureHandler; may be
     *                            {@code null} to keep the builder's default handler
     * @param <T>                 element type of the stream
     * @return the configured sink
     */
    public static <T> ElasticsearchSink<T> buildSink(List<HttpHost> addresses,
                                                     String userName,
                                                     String passwd,
                                                     int bulkFlushMaxActions,
                                                     int bulkFlushMaxSizeMb,
                                                     long bulkFlushInterval,
                                                     ElasticsearchSinkFunction<T> func,
                                                     ActionRequestFailureHandler failureHandler
    ) {

        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(addresses, func);

        // Authentication, for clusters that require a user name and password
        if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(passwd)) {
            esSinkBuilder.setRestClientFactory(new EsSinkRestClientFactory(userName, passwd));
        }

        // Failure strategy: only override the builder's default when a handler is given.
        // The builder null-checks this argument, so passing null (as the short overload
        // of buildSink does) must not reach it.
        if (failureHandler != null) {
            esSinkBuilder.setFailureHandler(failureHandler);
        }

        // Flush thresholds: elements are buffered until one of them is reached
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);

        return esSinkBuilder.build();
    }


    /**
     * Parses a comma-separated list of Elasticsearch hosts.
     *
     * <p>Supported formats (whitespace around entries is ignored):
     * <ul>
     *   <li>{@code http://192.168.1.100:9200,https://192.168.1.101:9200} - the scheme is kept</li>
     *   <li>{@code 192.168.1.100:9200,192.168.1.101:9200}</li>
     * </ul>
     *
     * @param hosts comma-separated host list
     * @return the parsed hosts, in input order
     * @throws MalformedURLException if an entry is neither a valid URL nor {@code host:port}
     * @throws NumberFormatException if the port of a {@code host:port} entry is not a number
     */

    public static List<HttpHost> parseAddresses(String hosts) throws MalformedURLException {
        String[] hostList = hosts.split(",");
        List<HttpHost> addresses = new ArrayList<>();
        for (String rawHost : hostList) {
            String host = rawHost.trim();
            // Only treat the entry as a URL when it really carries an http(s) scheme;
            // a plain host name such as "httpd-es:9200" must go through the host:port branch.
            if (host.startsWith("http://") || host.startsWith("https://")) {
                URL url = new URL(host);
                // Keep the scheme so that https endpoints are not silently downgraded to http
                addresses.add(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()));
            } else {
                String[] parts = host.split(":", 2);
                if (parts.length > 1) {
                    addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1].trim())));
                } else {
                    throw new MalformedURLException("invalid elasticsearch hosts format");
                }
            }
        }

        return addresses;

    }
}

EsSinkRestClientFactory

该类实现:Flink 连接 es 需要指定用户名、密码

package std;

import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;

/**
 * {@link RestClientFactory} that configures basic authentication and request
 * timeouts on the REST client used by the Elasticsearch sink.
 */
public class EsSinkRestClientFactory implements RestClientFactory {

    private final String userName;

    private final String password;

    /**
     * @param userName user name sent to Elasticsearch
     * @param password password sent to Elasticsearch
     */
    public EsSinkRestClientFactory(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    /**
     * Registers the credentials and the timeouts on the given builder.
     *
     * @param restClientBuilder builder of the REST client
     */
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // Credentials apply to any authentication scope
                    CredentialsProvider provider = new BasicCredentialsProvider();
                    provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
                    return httpClientBuilder.setDefaultCredentialsProvider(provider);
                })
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5000)
                        .setSocketTimeout(60000)
                        .setConnectionRequestTimeout(2000));
    }
}

EsSinkFailureHandler

失败的处理方式,本案例未使用该类

package std;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
 * Failure strategy for the Elasticsearch sink: retry rejected requests, drop
 * malformed documents and rethrow every other failure.
 */
public class EsSinkFailureHandler implements ActionRequestFailureHandler {
    /**
     * Decides what to do with a failed request.
     *
     * @param actionRequest  the request that failed
     * @param throwable      the failure
     * @param restStatusCode REST status code of the failure (not used)
     * @param requestIndexer indexer used to re-queue the request
     * @throws Throwable the original failure when it is neither a rejection nor a parse error
     */
    @Override
    public void onFailure(
            ActionRequest actionRequest,
            Throwable throwable,
            int restStatusCode,
            RequestIndexer requestIndexer
    ) throws Throwable {
        boolean rejected =
                ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent();
        if (rejected) {
            // Rejected by Elasticsearch: queue the request again
            requestIndexer.add(actionRequest);
            return;
        }

        boolean malformed =
                ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent();
        if (malformed) {
            // Malformed document: drop it without failing the sink
            return;
        }

        throw throwable;
    }
}
package std;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

/**
 * Same job as the simple example, but the sink is built through {@code ESSinkUtil}:
 * reads JSON records from the Kafka topic {@code kafka2flink} and indexes them
 * into the Elasticsearch index {@code book}.
 */
public class Std3_封装 {
    /**
     * Builds the Kafka source and the Elasticsearch sink, then runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer settings
        Properties pro = new Properties();
        pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        pro.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Built-in connector: the consumer needs the topic, a deserialization schema and the Kafka settings.
        // Each record arrives as an ObjectNode whose "value" field holds the JSON payload.
        FlinkKafkaConsumer<ObjectNode> fkc = new FlinkKafkaConsumer<>(
                "kafka2flink",
                new JSONKeyValueDeserializationSchema(false),
                pro);
        DataStreamSource<ObjectNode> dss = env.addSource(fkc);

        /*
         ==================================================
         Build the ElasticsearchSink
         */
        List<HttpHost> httpHosts = ESSinkUtil.parseAddresses("hadoop2:9200");
        // Flush after 10 actions, 5 MB of buffered data, or every 10000 ms
        ElasticsearchSink<ObjectNode> elasticsearchSink = ESSinkUtil.buildSink(
                httpHosts,
                10,
                5,
                10000,
                new MyElasticsearchSinkFunction()
        );

        // Print the stream for debugging and attach the sink
        dss.print();
        dss.addSink(elasticsearchSink);
        env.execute();
    }

    /**
     * Converts one Kafka record into an {@link IndexRequest} for the {@code book} index.
     */
    static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<ObjectNode>{
        /**
         * Builds the index request for one record and hands it to the indexer.
         *
         * @param s              incoming record; its "value" field is the JSON payload
         * @param runtimeContext runtime context (not used)
         * @param requestIndexer collects the requests that the sink sends to Elasticsearch
         */
        @Override
        public void process(ObjectNode s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            JsonNode data = s.get("value");
            // Records without a JSON object payload cannot be indexed; skip them
            // instead of failing the whole job with a NullPointerException.
            if (data == null || !data.isObject()) {
                return;
            }

            // Copy every field except "id" into the document source (values as text)
            Map<String, Object> map = new HashMap<>();
            Iterator<Map.Entry<String, JsonNode>> it = data.fields();
            while (it.hasNext()) {
                Map.Entry<String, JsonNode> field = it.next();
                if (!"id".equals(field.getKey())) {
                    map.put(field.getKey(), field.getValue().asText());
                }
            }

            // The request carries the target index, the document id and the document source
            IndexRequest ir = Requests.indexRequest()
                    .index("book")
                    .source(map);
            // Use the payload's "id" as document id; when it is absent the id stays
            // unset and Elasticsearch assigns one.
            JsonNode id = data.get("id");
            if (id != null && !id.isNull()) {
                ir.id(id.asText());
            }
            // Requests added to the indexer are sent to Elasticsearch by the sink
            requestIndexer.add(ir);
        }
    }

}

发送数据

在 kafka 生产者输入:

{"id":1002,"title":"hadoop从入门到精通","price":129,"author":"lucy"}
{"id":1003,"title":"hive从入门到精通","price":50,"author":"lili"}

执行结果

在 kibana7.9.x 查询 book 索引:

get /book/_search

可以看到新增加的2条数据

参考:
https://xie.infoq.cn/article/318acc75866d2e6eb5bfdbfc1
https://blog.csdn.net/xiaoc100200/article/details/111401744
https://blog.csdn.net/Vector97/article/details/118217813
https://www.modb.pro/db/377574
https://cloud.tencent.com/developer/article/1979189


原文出处:https://malaoshi.top/show_1IX4oFZ2ydza.html