关注

Flink 的三种一致性语义

在 Flink 中,“Exactly-Once” 等一致性语义不是自动全局生效的,需要分层理解按端到端的链路来配置。下面从业务开发者的视角,说明不同模式的配置方法和业务保障手段。


一、Flink 的三种一致性语义

语义含义适用场景
At-Most-Once (最多一次)故障时可能丢失数据,不重发对丢数据不敏感的监控类指标
At-Least-Once (至少一次)故障恢复后数据被重放,可能重复可容忍重复、下游能去重的场景
Exactly-Once (精确一次)故障恢复后,每条数据只影响最终结果一次金融结算、实时计费、严密度量

内部状态的 Exactly-Once 可以通过 Flink 的 Checkpoint 机制天然实现,而端到端 Exactly-Once (即 source→Flink→sink 全链路) 还需要 source 可重放、sink 支持事务或幂等写入。


二、业务如何配置 Exactly-Once

1. 开启 Checkpoint(状态一致性基础)

java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 5000ms 启动一次 Checkpoint,模式为 EXACTLY_ONCE (默认)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// 必须的容错配置
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 至少一次/精确一次建议为 1
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • CheckpointingMode.EXACTLY_ONCE 保证内部状态的精确一次。

  • 若设为 AT_LEAST_ONCE,则状态可能重复,但 checkpoint 开销更小、延迟更低。

状态后端推荐使用 RocksDBStateBackend(适合大状态、生产环境),并配置到可靠的分布式文件系统上,如 HDFS、S3。

2. Source 端配置 —— 要求可重放

需要 source 连接器支持将读取偏移量(offset)保存在 Flink 的 Checkpoint 中,恢复时从该偏移量重新消费。
内置支持 Exactly-Once 的 Source:Kafka、Pulsar、FileSystem 等。

Kafka Source 示例(Flink 1.17+ 新 API)

java

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("brokers")
    .setTopics("input-topic")
    .setGroupId("flink-group")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setProperty("enable.auto.commit", "false")   // 必须关闭自动提交
    .setProperty("isolation.level", "read_committed") // 若 sink 用事务,需配合读已提交
    .build();
  • 关闭自动提交,偏移量由 Flink 在 checkpoint 完成时提交到 Kafka 或保存在状态中。

  • 恢复时会从 Checkpoint 记录的 offset 开始回放,保证 source 到 Flink 不丢不重。

3. Sink 端配置 —— 实现端到端 Exactly-Once 的关键

根据下游系统,选用对应的事务性或幂等性 sink。

(1) 使用两阶段提交(2PC)的事务型 Sink
  • Kafka Sink (FlinkKafkaProducer 旧 API,或 KafkaSink 新 API):

    java

    // 旧 API (DataStream API 中)
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    // 或使用新 KafkaSink (推荐)
    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("brokers")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-tx-")
        .build();
    • 需要配置 transactional.id.prefix,Flink 利用 Kafka 事务将 checkpoint 和写入绑定。

  • FileSink(写入文件系统):

    java

    FileSink<String> sink = FileSink
        .forRowFormat(new Path("hdfs://output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(...)
        .build();
    • FileSink 默认使用两阶段提交,写入临时文件,checkpoint 完成后原子性重命名。

(2) 幂等写入方案

对不支持事务的系统(如 Elasticsearch、Redis、部分 JDBC),可以利用主键去重实现幂等。

  • 业务上需保证每条数据有唯一键(如订单ID),重复写入不会改变结果。

  • 配合 At-Least-Once 的 sink 也能达到业务级的 Exactly-Once。

  • 例如,MySQL sink 可以使用 INSERT ... ON DUPLICATE KEY UPDATE,Elasticsearch 以文档 ID 写入。

(3) 自定义 Sink 实现两阶段提交

如果你的下游没有现成的事务连接器,需要继承 TwoPhaseCommitSinkFunction 实现 beginTransaction、preCommit、commit、abort 四个方法,Flink 会在 checkpoint 不同阶段调用它们。

java

public class MyTxSink extends TwoPhaseCommitSinkFunction<MyData, MyTransaction, Void> {
    // 开启事务
    protected MyTransaction beginTransaction() { ... }
    // 写入数据
    protected void invoke(MyTransaction tx, MyData value, Context ctx) { ... }
    // 预提交
    protected void preCommit(MyTransaction tx) { ... }
    // 正式提交
    protected void commit(MyTransaction tx) { ... }
    // 丢弃
    protected void abort(MyTransaction tx) { ... }
}

之后在流中使用:

java

dataStream.addSink(new MyTxSink());

这样开发出的 sink 就能参与 Flink 的 Exactly-Once 基础设施。


三、业务如何保证 Exactly-Once (编码与运维)

1. 检查点与恢复策略

  • 启用外部持久化检查点
    enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION),保证作业取消后 Checkpoint 不删除,可用于手动重启。

  • 配置重启策略
    比如失败重启固定次数且每次延时间隔:

    java

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
  • 从 Savepoint / Checkpoint 恢复
    上线或迁移时指定:-s hdfs://path/to/savepoint

2. 状态里避免非确定性操作

算子内部如果使用 System.currentTimeMillis()、随机数等,恢复时可能导致计算结果不一致,破坏 Exactly-Once 语义。
应将这些值存入状态,或使用 Flink 提供的 ProcessingTimeService 等确定性接口。

3. 保证 Exactly-Once 的代价与局限

  • 延迟略高:需要等待 checkpoint 完成才能提交事务,可调小 checkpoint 间隔(如秒级)平衡时效。

  • 下游隔离级别:若用 Kafka 事务,下游消费者需设置 isolation.level=read_committed,否则会读到未提交的脏数据。

  • 确认下游支持事务:不是所有系统都支持 2PC,此时只能选用幂等写入或接受 At-Least-Once。


四、其他模式的配置速览

At-Least-Once(至少一次)

  • 关闭 Checkpoint:不调用 env.enableCheckpointing(),或者

  • 设置 Checkpoint 模式为 AT_LEAST_ONCE

    java

    env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
  • Sink 不加事务(如普通 Redis/MySQL Sink),故障恢复时状态从上一个 checkpoint 重放,导致下游可能收到重复数据。

At-Most-Once(最多一次)

  • 不启用 Checkpoint,也不配置重启策略。

  • Source 只消费一次(如 Kafka 提交偏移量在读取后立即自动提交),失败就不管了。业务很少这样使用,一般通过关闭 checkpoint 并不做任何重试来实现。


总结

层级Exactly-Once 如何配置
Flink 内部状态env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
Source 端使用可重放 Source(如 Kafka),关闭自动偏移提交,偏移存入 checkpoint
Sink 端使用 DeliveryGuarantee.EXACTLY_ONCE 的 Sink,或 TwoPhaseCommitSinkFunction,或幂等写入
业务编码避免非确定性操作,唯一键幂等,保证端到端事务语义闭环
运维恢复外部检查点 + 适当重启策略 + Savepoint 升级

最简实践:Flink 程序通过 enableCheckpointing + Kafka Source(可重放) + KafkaSink(事务)/ FileSink,就能获得生产级端到端 Exactly-Once,无需额外编码。对于其他下游,能事务就配置事务 Sink,不能事务就设计幂等写入,同样可以保证业务级精确一次。

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/INGNIGHT/article/details/161803521

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--