在 Flink 中,“Exactly-Once” 等一致性语义不是自动全局生效的,需要分层理解并按端到端的链路来配置。下面从业务开发者的视角,说明不同模式的配置方法和业务保障手段。
一、Flink 的三种一致性语义
| 语义 | 含义 | 适用场景 |
|---|---|---|
| At-Most-Once (最多一次) | 故障时可能丢失数据,不重发 | 对丢数据不敏感的监控类指标 |
| At-Least-Once (至少一次) | 故障恢复后数据被重放,可能重复 | 可容忍重复、下游能去重的场景 |
| Exactly-Once (精确一次) | 故障恢复后,每条数据只影响最终结果一次 | 金融结算、实时计费、严密度量 |
内部状态的 Exactly-Once 可以通过 Flink 的 Checkpoint 机制天然实现,而端到端 Exactly-Once (即 source→Flink→sink 全链路) 还需要 source 可重放、sink 支持事务或幂等写入。
二、业务如何配置 Exactly-Once
1. 开启 Checkpoint(状态一致性基础)
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 5000ms 启动一次 Checkpoint,模式为 EXACTLY_ONCE (默认)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 必须的容错配置
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 至少一次/精确一次建议为 1
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
CheckpointingMode.EXACTLY_ONCE保证内部状态的精确一次。 -
若设为
AT_LEAST_ONCE,则状态可能重复,但 checkpoint 开销更小、延迟更低。
状态后端推荐使用 RocksDBStateBackend(适合大状态、生产环境),并配置到可靠的分布式文件系统上,如 HDFS、S3。
2. Source 端配置 —— 要求可重放
需要 source 连接器支持将读取偏移量(offset)保存在 Flink 的 Checkpoint 中,恢复时从该偏移量重新消费。
内置支持 Exactly-Once 的 Source:Kafka、Pulsar、FileSystem 等。
Kafka Source 示例(Flink 1.17+ 新 API):
java
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("brokers")
.setTopics("input-topic")
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("enable.auto.commit", "false") // 必须关闭自动提交
.setProperty("isolation.level", "read_committed") // 若 sink 用事务,需配合读已提交
.build();
-
关闭自动提交,偏移量由 Flink 在 checkpoint 完成时提交到 Kafka 或保存在状态中。
-
恢复时会从 Checkpoint 记录的 offset 开始回放,保证 source 到 Flink 不丢不重。
3. Sink 端配置 —— 实现端到端 Exactly-Once 的关键
根据下游系统,选用对应的事务性或幂等性 sink。
(1) 使用两阶段提交(2PC)的事务型 Sink
-
Kafka Sink (FlinkKafkaProducer 旧 API,或 KafkaSink 新 API):
java
// 旧 API (DataStream API 中) FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( "output-topic", new SimpleStringSchema(), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 或使用新 KafkaSink (推荐) KafkaSink<String> sink = KafkaSink.<String>builder() .setBootstrapServers("brokers") .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("output-topic") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("flink-tx-") .build();-
需要配置
transactional.id.prefix,Flink 利用 Kafka 事务将 checkpoint 和写入绑定。
-
-
FileSink(写入文件系统):
java
FileSink<String> sink = FileSink .forRowFormat(new Path("hdfs://output"), new SimpleStringEncoder<String>("UTF-8")) .withRollingPolicy(...) .build();-
FileSink 默认使用两阶段提交,写入临时文件,checkpoint 完成后原子性重命名。
-
(2) 幂等写入方案
对不支持事务的系统(如 Elasticsearch、Redis、部分 JDBC),可以利用主键去重实现幂等。
-
业务上需保证每条数据有唯一键(如订单ID),重复写入不会改变结果。
-
配合
At-Least-Once的 sink 也能达到业务级的 Exactly-Once。 -
例如,MySQL sink 可以使用
INSERT ... ON DUPLICATE KEY UPDATE,Elasticsearch 以文档 ID 写入。
(3) 自定义 Sink 实现两阶段提交
如果你的下游没有现成的事务连接器,需要继承 TwoPhaseCommitSinkFunction 实现 beginTransaction、preCommit、commit、abort 四个方法,Flink 会在 checkpoint 不同阶段调用它们。
java
public class MyTxSink extends TwoPhaseCommitSinkFunction<MyData, MyTransaction, Void> {
// 开启事务
protected MyTransaction beginTransaction() { ... }
// 写入数据
protected void invoke(MyTransaction tx, MyData value, Context ctx) { ... }
// 预提交
protected void preCommit(MyTransaction tx) { ... }
// 正式提交
protected void commit(MyTransaction tx) { ... }
// 丢弃
protected void abort(MyTransaction tx) { ... }
}
之后在流中使用:
java
dataStream.addSink(new MyTxSink());
这样开发出的 sink 就能参与 Flink 的 Exactly-Once 基础设施。
三、业务如何保证 Exactly-Once (编码与运维)
1. 检查点与恢复策略
-
启用外部持久化检查点:
enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION),保证作业取消后 Checkpoint 不删除,可用于手动重启。 -
配置重启策略:
比如失败重启固定次数且每次延时间隔:java
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
-
从 Savepoint / Checkpoint 恢复:
上线或迁移时指定:-s hdfs://path/to/savepoint。
2. 状态里避免非确定性操作
算子内部如果使用 System.currentTimeMillis()、随机数等,恢复时可能导致计算结果不一致,破坏 Exactly-Once 语义。
应将这些值存入状态,或使用 Flink 提供的 ProcessingTimeService 等确定性接口。
3. 保证 Exactly-Once 的代价与局限
-
延迟略高:需要等待 checkpoint 完成才能提交事务,可调小 checkpoint 间隔(如秒级)平衡时效。
-
下游隔离级别:若用 Kafka 事务,下游消费者需设置
isolation.level=read_committed,否则会读到未提交的脏数据。 -
确认下游支持事务:不是所有系统都支持 2PC,此时只能选用幂等写入或接受 At-Least-Once。
四、其他模式的配置速览
At-Least-Once(至少一次)
-
关闭 Checkpoint:不调用
env.enableCheckpointing(),或者 -
设置 Checkpoint 模式为
AT_LEAST_ONCE:java
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
-
Sink 不加事务(如普通 Redis/MySQL Sink),故障恢复时状态从上一个 checkpoint 重放,导致下游可能收到重复数据。
At-Most-Once(最多一次)
-
不启用 Checkpoint,也不配置重启策略。
-
Source 只消费一次(如 Kafka 提交偏移量在读取后立即自动提交),失败就不管了。业务很少这样使用,一般通过关闭 checkpoint 并不做任何重试来实现。
总结
| 层级 | Exactly-Once 如何配置 |
|---|---|
| Flink 内部状态 | env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE) |
| Source 端 | 使用可重放 Source(如 Kafka),关闭自动偏移提交,偏移存入 checkpoint |
| Sink 端 | 使用 DeliveryGuarantee.EXACTLY_ONCE 的 Sink,或 TwoPhaseCommitSinkFunction,或幂等写入 |
| 业务编码 | 避免非确定性操作,唯一键幂等,保证端到端事务语义闭环 |
| 运维恢复 | 外部检查点 + 适当重启策略 + Savepoint 升级 |
最简实践:Flink 程序通过 enableCheckpointing + Kafka Source(可重放) + KafkaSink(事务)/ FileSink,就能获得生产级端到端 Exactly-Once,无需额外编码。对于其他下游,能事务就配置事务 Sink,不能事务就设计幂等写入,同样可以保证业务级精确一次。
转载自CSDN-专业IT技术社区
原文链接:https://blog.csdn.net/INGNIGHT/article/details/161803521



