Flink 1.12.x Savepoint Example - per-job cluster mode

Note

This test did not complete successfully, possibly due to insufficient cluster resources.

Example

To demonstrate savepoints, the Flink program must be packaged as a jar and submitted to run on the Flink cluster.

To keep the demo simple, only sockets are used (no Kafka): the job reads records from a socket source, keeps the running maximum per key, and writes the results out through a socket sink.

Flink program

Java

package savepoint;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S1socket {
    final static Logger log = LoggerFactory.getLogger(S1socket.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // source: read "key,value" lines from the socket server on hadoop1:9999
        DataStreamSource<String> dss = env.socketTextStream("hadoop1", 9999);

        SingleOutputStreamOperator<Tuple2<String, Double>> soso = dss
                // parse each line into (key, value); skip malformed records
                .flatMap(new FlatMapFunction<String, Tuple2<String, Double>>() {

                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Double>> out) throws Exception {
                        String[] arr = s.split(",");
                        if (arr.length == 2) {
                            double temp = Double.parseDouble(arr[1]);
                            out.collect(Tuple2.of(arr[0], temp));
                        } else {
                            log.error("malformed record: " + s);
                        }
                    }
                })
                // key by the first field so the maximum is tracked per key
                .keyBy(new KeySelector<Tuple2<String, Double>, String>() {

                    @Override
                    public String getKey(Tuple2<String, Double> value) throws Exception {
                        return value.f0;
                    }
                })
                // running maximum of the second field; this keyed state is
                // what the savepoint captures and restores
                .maxBy(1);

        SingleOutputStreamOperator<String> result = soso.map(new MapFunction<Tuple2<String, Double>, String>() {
            @Override
            public String map(Tuple2<String, Double> value) throws Exception {
                // append "\n" because SimpleStringSchema writes no record delimiter
                return value.f0 + "," + value.f1 + "\n";
            }
        });

        result.print("result::");

        // sink: connect as a client to the socket server listening on hadoop1:8888
        SocketClientSink<String> scs = new SocketClientSink<>("hadoop1", 8888, new SimpleStringSchema());
        result.addSink(scs);
        env.execute();
    }
}
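
A practical note not covered in the original post: the Flink documentation recommends giving every stateful operator a stable uid(), so that the state in a savepoint can still be matched to its operator if the job graph later changes. Below is a minimal, self-contained sketch of the same pipeline with IDs attached; the class name and uid strings are illustrative, not from the original program:

package savepoint;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S1socketWithUids {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("hadoop1", 9999)
                // parse "key,value"; returns() supplies the type lost to lambda erasure
                .map(s -> Tuple2.of(s.split(",")[0], Double.parseDouble(s.split(",")[1])))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .uid("parse-input")    // illustrative stable ID for the parsing operator
                .keyBy(t -> t.f0)
                .maxBy(1)
                .uid("running-max")    // savepoint state is matched back by this ID
                .print();

        env.execute();
    }
}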

Packaging

Run Maven's package goal.

Upload the resulting jar to the /program directory on hadoop1.
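
For reference, the build-and-upload sequence could look like the following (the scp step is an assumption; any transfer method works):

mvn clean package
scp target/kafka2flink-1.0-SNAPSHOT.jar hadoop1:/program/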

Start Hadoop

/program/bin/hadoop.sh start
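
The hadoop.sh script is a wrapper from the author's environment; on a stock Hadoop installation, the rough equivalent would be:

start-dfs.sh
start-yarn.sh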

Start the socket endpoints

Both endpoints use nc in listen mode (-l listens on a port, -k keeps accepting connections).

Start the source listener - the Flink job reads its input from here:

nc -lk 9999

Start the sink listener - the job's SocketClientSink connects to this port as a client, so it must be running before the job is submitted:

nc -lk 8888

Submit the jar to the YARN cluster

cd /program/flink-1.12.5
bin/flink run -m yarn-cluster --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar
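
The -m yarn-cluster flag launches a dedicated per-job YARN application for this job. Given the resource caveat noted at the top, Flink 1.12's standard CLI memory options can be added to size the cluster explicitly (the values below are illustrative):

bin/flink run -m yarn-cluster -yjm 1024m -ytm 1024m --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar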

Execution result

If the submission succeeds, the CLI prints the YARN application ID and the Flink job ID (shown as a screenshot in the original post).

Testing

Input test data

On the server running nc -lk 9999, enter the following records:

1,20
1,21
1,10
1,22

Check the output

On the server running nc -lk 8888, the following results appear:

1,20.0
1,21.0
1,21.0
1,22.0

Note that the third input, 1,10, still yields 1,21.0: maxBy emits the current per-key maximum for every incoming record, and 10 is below the running maximum of 21.

Manually trigger a savepoint

Command format

bin/flink savepoint <flink-job-id> <hdfs-path>

The job ID can be read from the window where the job was submitted (shown as a screenshot in the original post).

Run the command

bin/flink savepoint 2dab9339507283cab2d245f5a293fb30 hdfs://hadoop1:8020/flink_savepoint
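
When the job runs in a per-job YARN cluster, the CLI may also need the YARN application ID to locate the cluster. Flink's CLI accepts it via -yid; application_XXXX_YYYY below is a placeholder for the real ID shown by YARN. This is worth trying if the savepoint command cannot reach the job, and may be related to the failure noted at the top:

bin/flink savepoint 2dab9339507283cab2d245f5a293fb30 hdfs://hadoop1:8020/flink_savepoint -yid application_XXXX_YYYY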

Execution result

If the command succeeds, the CLI confirms completion and prints the path the savepoint was written to (shown as a screenshot in the original post).

Check the HDFS files

The savepoint directory now exists in HDFS (shown as a screenshot in the original post).
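
The same check can be done from the command line with the standard HDFS CLI:

hdfs dfs -ls hdfs://hadoop1:8020/flink_savepoint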

Stop the job - command line

Format

bin/flink cancel <flink-job-id>

Run the command

bin/flink cancel 2dab9339507283cab2d245f5a293fb30
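
As an alternative to the separate savepoint-then-cancel steps, Flink also provides flink stop, which takes a savepoint and stops the job in one command; in 1.12 the savepoint target directory is passed with -p (--savepointPath):

bin/flink stop -p hdfs://hadoop1:8020/flink_savepoint 2dab9339507283cab2d245f5a293fb30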

Stop the job - Flink web UI

Alternatively, open the job in the Flink web UI and click Cancel (shown as screenshots in the original post).

Start the job again and restore its state

Format

bin/flink run -s <savepoint-path> --class <main-class> <jar-path>

Look up the savepoint path

The exact savepoint path can be taken from the HDFS listing above (shown as a screenshot in the original post).

Run the command

bin/flink run -s hdfs://hadoop1:8020/flink_savepoint/savepoint-30566c-4a4291a5602e --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar
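
One observation of my own: unlike the original submission, this resume command omits -m yarn-cluster, so it goes to whatever execution target the CLI defaults to rather than a new per-job YARN cluster. Resubmitting the same way as the first run, optionally adding -n (--allowNonRestoredState) in case the job graph changed, may be what is intended:

bin/flink run -m yarn-cluster -s hdfs://hadoop1:8020/flink_savepoint/savepoint-30566c-4a4291a5602e --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar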

Test whether the state was restored

Input test data

On the server running nc -lk 9999, enter the following record:

1,10

Check the output

On the server running nc -lk 8888, the following result appears:

1,22.0

The 1,22.0 output shows that the state was restored from the savepoint: although the new input 1,10 is smaller, the job still emits the maximum of 22.0 recorded before the job was cancelled.


Original source (in Chinese): https://malaoshi.top/show_1IX49mx3d1UA.html