flink 1.12.x Savepoint example - session cluster mode

Author: Ma Yumin • 2022-10-02 13:09

# Example

To demonstrate savepoints, we package a Flink program and run it on a Flink cluster.

To keep the demo simple, plain sockets are used instead of Kafka: the job receives records from a socket server, keeps the maximum value per key, and sends the result to a socket client.

[![](/upload/0/0/1IX49dASChvZ.png)](/upload/0/0/1IX49dASChvZ.png)

# Flink program

### java

```
package savepoint;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S1socket {

    final static Logger log = LoggerFactory.getLogger(S1socket.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read "key,value" lines from the socket server
        DataStreamSource<String> dss = env.socketTextStream("hadoop1", 9999);

        SingleOutputStreamOperator<Tuple2<String, Double>> soso = dss
                .flatMap(new FlatMapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Double>> out) throws Exception {
                        String[] arr = s.split(",");
                        if (arr.length == 2) {
                            double temp = Double.parseDouble(arr[1]);
                            out.collect(Tuple2.of(arr[0], temp));
                        } else {
                            log.error("Malformed record: " + s);
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Double>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Double> value) throws Exception {
                        return value.f0;
                    }
                })
                // Keep the record with the maximum value (field 1) per key
                .maxBy(1);

        SingleOutputStreamOperator<String> result = soso.map(new MapFunction<Tuple2<String, Double>, String>() {
            @Override
            public String map(Tuple2<String, Double> value) throws Exception {
                return value.f0 + "," + value.f1 + "\n";
            }
        });

        result.print("result::");

        // Send results to the socket client
        SocketClientSink<String> scs = new SocketClientSink<>("hadoop1", 8888, new SimpleStringSchema());
        result.addSink(scs);

        env.execute();
    }
}
```

### Package

Run Maven's `package` goal.

### Upload the Flink jar

Upload the jar to the `/program` directory on `hadoop1`.

# Start Hadoop

```
/program/bin/hadoop.sh start
```

# Start the sockets

### Start the server

```
nc -lk 9999
```

### Start the client

```
nc -lk 8888
```

# Start the Flink cluster

```
cd /program/flink-1.12.5
```

```
bin/yarn-session.sh
```

or

```
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
```

### Flink web UI

The red box in the screenshot below shows the Flink web UI address:

[![](/upload/0/0/1IX49kQv9ya6.png)](/upload/0/0/1IX49kQv9ya6.png)

If you see the following, the session started successfully:

[![](http://65242847.gitee.io/pic/flink/Snipaste_2021-12-23_19-28-47.png)](http://65242847.gitee.io/pic/flink/Snipaste_2021-12-23_19-28-47.png)

# Submit the job

```
bin/flink run --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar
```

### Flink web UI

If you see the following, the job was submitted successfully:

[![](/upload/0/0/1IX49kPiYUF1.png)](/upload/0/0/1IX49kPiYUF1.png)

# Test

### Input test data

In the `nc -lk 9999` terminal, enter the following data:

```
1,20
1,21
1,10
1,22
```

### Check the output

In the `nc -lk 8888` terminal, you should see:

```
1,20.0
1,21.0
1,21.0
1,22.0
```

Note that `1,10` does not raise the running maximum, so `1,21.0` is emitted again.

# Trigger a savepoint manually

### Command format

```
bin/flink savepoint <jobId> <hdfs path>
```

### Find the job id - terminal

The terminal where the job was submitted shows it, as below:

[![](/upload/0/0/1IX49kZeYyHM.png)](/upload/0/0/1IX49kZeYyHM.png)

### Find the job id - Flink web UI

[![](/upload/0/0/1IX49kb36t9Q.png)](/upload/0/0/1IX49kb36t9Q.png)

### Run the command

```
bin/flink savepoint 30566c74f44ffc278b4d1355dba27d9c hdfs://hadoop1:8020/flink_savepoint
```

### Result

If you see the following, the savepoint succeeded, and its save path is printed:

[![](/upload/0/0/1IX49kk3yR3K.png)](/upload/0/0/1IX49kk3yR3K.png)

### Check the HDFS files

If you see the following, the savepoint files were created:

[![](/upload/0/0/1IX49kf46qU3.png)](/upload/0/0/1IX49kf46qU3.png)

# Cancel the job - command line

### Format

```
bin/flink cancel <jobId>
```

### Run the command

```
bin/flink cancel 30566c74f44ffc278b4d1355dba27d9c
```

# Cancel the job - Flink web UI

[![](/upload/0/0/1IX49krDMzCd.png)](/upload/0/0/1IX49krDMzCd.png)

# Restart the job and restore its state

### Format

```
bin/flink run -s <savepoint path> --class <main class> <jar path>
```

### Find the savepoint path

[![](/upload/0/0/1IX49kk3yR3K.png)](/upload/0/0/1IX49kk3yR3K.png)

### Run the command

```
bin/flink run -s hdfs://hadoop1:8020/flink_savepoint/savepoint-30566c-4a4291a5602e --class savepoint.S1socket /program/kafka2flink-1.0-SNAPSHOT.jar
```

# Verify that the state was restored

### Input test data

In the `nc -lk 9999` terminal, enter the following data:

```
1,10
```

### Check the output

In the `nc -lk 8888` terminal, you should see:

```
1,22.0
```

The output `1,22.0` shows that the state was restored from the savepoint: `1,10` is smaller than the saved maximum of 22.0, so the job still emits the old maximum.

Original source: http://malaoshi.top/show_1IX49c7m5IGz.html
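The restore behavior above can be modeled in plain Java to show why the job still emits `1,22.0` after a restart. This is a simplified sketch only: a `HashMap` stands in for Flink's keyed `maxBy` state, and the savepoint/restore cycle is simulated by copying the map; it is not how Flink's state backends actually persist state. The class and method names are illustrative.

```
import java.util.HashMap;
import java.util.Map;

// Simplified model of the job's keyed maxBy state: one running maximum per key.
// A "savepoint" is simulated by copying the map, and a "restore" by starting a
// fresh run initialized from that copy.
public class SavepointModel {

    // Process one "key,value" record against the state and return the emitted output,
    // mirroring keyBy(...).maxBy(1) followed by the map to "key,value" strings.
    static String process(Map<String, Double> state, String record) {
        String[] arr = record.split(",");
        String key = arr[0];
        double v = Double.parseDouble(arr[1]);
        // maxBy keeps the largest value seen so far for this key and emits it.
        double max = Math.max(state.getOrDefault(key, Double.NEGATIVE_INFINITY), v);
        state.put(key, max);
        return key + "," + max;
    }

    public static void main(String[] args) {
        Map<String, Double> state = new HashMap<>();
        // First run: same inputs as the test above.
        for (String r : new String[]{"1,20", "1,21", "1,10", "1,22"}) {
            System.out.println(process(state, r)); // 1,20.0  1,21.0  1,21.0  1,22.0
        }
        // "Savepoint": snapshot the state, then "cancel" the job.
        Map<String, Double> savepoint = new HashMap<>(state);
        // "Restore": a new run initialized from the snapshot.
        Map<String, Double> restored = new HashMap<>(savepoint);
        System.out.println(process(restored, "1,10")); // 1,22.0 -- the maximum survived the restart
    }
}
```

Without the snapshot, a restarted job would begin with empty state and `1,10` would produce `1,10.0`; the copy is what carries `22.0` across the simulated restart, just as the savepoint does for the real job.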