登录    关于
马育民老师的博客

马育民的博客

QQ:65242847

flink1.12.x 容错机制:状态后端-全局配置、代码中配置

全局配置

全局生效,一般不用

修改 flink-conf.yaml 配置文件

找到下面配置,进行修改:

# state.backend: filesystem

# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

默认的就是保存的 hdfs

在代码中配置

只配置当前任务

官网:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/checkpointing.html

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动模式,根据数据源自行判断]
// 必设置,每隔 1000 ms 执行一次checkpoint。
env.enableCheckpointing(5000);//一般秒级、分钟级
//设置保存文件路径,生产环境中应该是 HDFS
env.setStateBackend(new FsStateBackend("file:///D:/bigdata/flink_java/state/fs"));
//        env.setStateBackend(new RocksDBStateBackend("file:///D:/bigdata/flink_java/state/RocksDB",true));
// 设置2个checkpoint之间最少等待时间
// 这里至少要等到500ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoint容忍的错误数量,达到此数量,就让该任务失败,默认值是:0
//这里容忍10个错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
//取消任务时,是否清理checkpoint
// true:取消任务时,会删除外部checkpoint,默认值
// false:取消任务时,保留外部checkpoint
//这里设置不删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置checkpoint的执行模式:EXACTLY_ONCE,默认
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置checkpoint的超时时间,如果在60s内尚未完成说明本次checkpoint失败,则丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);//这里设置10分钟
// 设置同一时间有多少个checkpoint可以同时执行。与 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 可能冲突
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1

原文出处:https://malaoshi.top/show_1IX22CilCA1w.html