用户下单成功,但积分没加上,消息也丢了——这就是分布式事务难题。本文将揭秘Outbox模式如何用一张数据库表,优雅解决"业务操作与消息发送"的一致性难题,让系统可靠性提升100倍。

文章目录
一、场景引入:一次下单后的积分丢失
1.1 真实案例
某电商平台,用户下单后需要:扣库存、加积分、发消息。
时间线:
T+0ms:用户点击下单
T+10ms:订单服务扣减库存成功
T+20ms:订单服务发送MQ消息(通知积分服务加积分)
T+30ms:订单服务创建订单成功,返回用户"下单成功"
T+50ms:MQ Broker宕机,消息丢失
T+1s:积分服务未收到消息,用户积分未增加
T+1天:用户投诉:"我下单了为什么没积分?"
问题根源:业务操作(创建订单)和消息发送是两个独立操作,无法保证原子性。
1.2 传统方案的痛点
// 传统方案1:先操作数据库,再发消息
@Transactional
public void createOrder(OrderDTO dto) {
// 1. 扣减库存
stockService.deduct(dto.getSkuId(), dto.getQuantity());
// 2. 创建订单
orderMapper.insert(order);
// 3. 发送MQ消息(问题在这里!)
// 如果这里失败,订单已创建但积分没加
kafkaTemplate.send("order-created", order);
}
// 传统方案2:先发消息,再操作数据库
@Transactional
public void createOrder(OrderDTO dto) {
// 1. 发送MQ消息
kafkaTemplate.send("order-created", order);
// 2. 扣减库存
stockService.deduct(dto.getSkuId(), dto.getQuantity());
// 3. 创建订单(如果这里失败,消息已发出但订单未创建)
orderMapper.insert(order);
}
传统方案的问题:
| 方案 | 问题 | 后果 |
|---|---|---|
| 先DB后发消息 | 消息发送失败 | 业务完成但通知丢失 |
| 先发消息后DB | DB操作失败 | 通知发出但业务未完成 |
| 事务消息 | 实现复杂,依赖MQ特性 | 代码侵入性强 |
二、解决方案:Outbox事件驱动模式
2.1 什么是Outbox模式?
Outbox模式的核心思想:将业务操作和事件写入放在同一个数据库事务中,由独立进程读取Outbox表并投递到MQ。
Outbox模式架构:
┌─────────────────────────────────────────────────────────────────────┐
│ 业务服务(订单服务) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 业务操作(事务内) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 扣减库存 │ │ 创建订单 │ │ 写入Outbox │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ stock表 │ │ order表 │ │ outbox表 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ 同一个数据库事务,保证原子性 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 独立进程(消息投递器) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 轮询Outbox │───→│ 发送MQ消息 │───→│ 标记已发送 │ │ │
│ │ │ 表 │ │ │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 消息队列 (MQ) │ │
│ │ Kafka / RocketMQ / RabbitMQ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2.2 核心优势
| 优势 | 说明 |
|---|---|
| 原子性保证 | 业务操作和事件写入在同一个事务中 |
| 消息不丢失 | 事件先持久化到数据库,再异步发送 |
| 业务零侵入 | 业务代码只操作数据库,不直接发消息 |
| 支持重试 | 发送失败可自动重试,保证最终投递 |
| 顺序保证 | 按事件产生顺序投递,避免乱序 |
三、实战代码:从零实现Outbox模式
3.1 Outbox表设计
-- Outbox事件表
CREATE TABLE `outbox` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`aggregate_type` varchar(100) NOT NULL COMMENT '聚合根类型(如:order)',
`aggregate_id` varchar(100) NOT NULL COMMENT '聚合根ID(如:订单ID)',
`event_type` varchar(100) NOT NULL COMMENT '事件类型(如:OrderCreated)',
`event_data` json NOT NULL COMMENT '事件数据(JSON格式)',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '状态:0-待发送 1-已发送 2-发送失败',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`sent_at` datetime DEFAULT NULL COMMENT '发送时间',
PRIMARY KEY (`id`),
KEY `idx_status_created_at` (`status`, `created_at`),
KEY `idx_aggregate` (`aggregate_type`, `aggregate_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Outbox事件表';
3.2 Outbox实体和Mapper
/**
* Outbox事件实体
*/
@Data
@TableName("outbox")
public class Outbox {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 聚合根类型
*/
private String aggregateType;
/**
* 聚合根ID
*/
private String aggregateId;
/**
* 事件类型
*/
private String eventType;
/**
* 事件数据(JSON)
*/
private String eventData;
/**
* 状态:0-待发送 1-已发送 2-发送失败
*/
private Integer status;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 创建时间
*/
private LocalDateTime createdAt;
/**
* 发送时间
*/
private LocalDateTime sentAt;
}
/**
* Outbox Mapper
*/
@Mapper
public interface OutboxMapper extends BaseMapper<Outbox> {
/**
* 查询待发送的事件
*/
@Select("SELECT * FROM outbox WHERE status = 0 ORDER BY created_at LIMIT #{limit}")
List<Outbox> selectPending(@Param("limit") int limit);
/**
* 查询发送失败需要重试的事件
*/
@Select("SELECT * FROM outbox WHERE status = 2 AND retry_count < #{maxRetry} ORDER BY created_at LIMIT #{limit}")
List<Outbox> selectFailed(@Param("maxRetry") int maxRetry, @Param("limit") int limit);
}
3.3 Outbox事件发布器
/**
* Outbox事件发布器
* 业务代码通过此接口发布事件
*/
@Component
@Slf4j
public class OutboxEventPublisher {
@Autowired
private OutboxMapper outboxMapper;
/**
* 发布事件(在业务事务中调用)
*
* @param aggregateType 聚合根类型(如:order)
* @param aggregateId 聚合根ID
* @param eventType 事件类型
* @param eventData 事件数据对象
*/
public void publish(String aggregateType, String aggregateId,
String eventType, Object eventData) {
Outbox outbox = new Outbox();
outbox.setAggregateType(aggregateType);
outbox.setAggregateId(aggregateId);
outbox.setEventType(eventType);
outbox.setEventData(JSON.toJSONString(eventData));
outbox.setStatus(0); // 待发送
outbox.setRetryCount(0);
outbox.setCreatedAt(LocalDateTime.now());
outboxMapper.insert(outbox);
log.info("📦 Outbox事件已记录: aggregateType={}, aggregateId={}, eventType={}",
aggregateType, aggregateId, eventType);
}
/**
* 发布事件(简化版,自动从事件中提取信息)
*/
public void publish(DomainEvent event) {
publish(
event.getAggregateType(),
event.getAggregateId(),
event.getEventType(),
event
);
}
}
/**
* 领域事件基类
*/
@Data
public abstract class DomainEvent {
/**
* 事件ID
*/
private String eventId = UUID.randomUUID().toString();
/**
* 事件发生时间
*/
private LocalDateTime eventTime = LocalDateTime.now();
/**
* 聚合根类型
*/
public abstract String getAggregateType();
/**
* 聚合根ID
*/
public abstract String getAggregateId();
/**
* 事件类型
*/
public abstract String getEventType();
}
/**
* 订单创建事件
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderCreatedEvent extends DomainEvent {
private Long orderId;
private Long userId;
private Long skuId;
private Integer quantity;
private BigDecimal amount;
@Override
public String getAggregateType() {
return "order";
}
@Override
public String getAggregateId() {
return String.valueOf(orderId);
}
@Override
public String getEventType() {
return "OrderCreated";
}
}
3.4 业务层使用Outbox
/**
* 订单服务(使用Outbox模式)
*/
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockService stockService;
@Autowired
private OutboxEventPublisher outboxPublisher;
/**
* 创建订单(Outbox模式)
*
* 关键:业务操作和事件写入在同一个事务中
*/
@Transactional
public OrderVO createOrder(CreateOrderDTO dto) {
// 1. 扣减库存
boolean stockDeducted = stockService.deduct(dto.getSkuId(), dto.getQuantity());
if (!stockDeducted) {
throw new BizException("库存不足");
}
// 2. 创建订单
Order order = new Order();
order.setUserId(dto.getUserId());
order.setSkuId(dto.getSkuId());
order.setQuantity(dto.getQuantity());
order.setAmount(dto.getAmount());
order.setStatus(OrderStatus.PENDING_PAYMENT);
order.setCreateTime(LocalDateTime.now());
orderMapper.insert(order);
// 3. 写入Outbox事件(同一个事务!)
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(order.getId());
event.setUserId(order.getUserId());
event.setSkuId(order.getSkuId());
event.setQuantity(order.getQuantity());
event.setAmount(order.getAmount());
outboxPublisher.publish(event);
log.info("✅ 订单创建成功: orderId={}, userId={}", order.getId(), order.getUserId());
return convertToVO(order);
}
}
3.5 Outbox消息投递器
/**
* Outbox消息投递器
* 独立进程,轮询Outbox表并投递到MQ
*/
@Component
@Slf4j
public class OutboxMessagePoller {
@Autowired
private OutboxMapper outboxMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private RedissonClient redissonClient;
private static final int BATCH_SIZE = 100;
private static final int MAX_RETRY = 3;
private static final String LOCK_KEY = "lock:outbox:poller";
/**
* 定时轮询(每5秒执行一次)
*/
@Scheduled(fixedRate = 5000)
public void poll() {
// 分布式锁,防止多实例重复投递
RLock lock = redissonClient.getLock(LOCK_KEY);
try {
if (!lock.tryLock(0, 30, TimeUnit.SECONDS)) {
return; // 其他实例正在执行
}
try {
// 1. 处理待发送的事件
processPendingMessages();
// 2. 处理发送失败需要重试的事件
processFailedMessages();
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("❌ Outbox轮询被中断", e);
}
}
/**
* 处理待发送的消息
*/
private void processPendingMessages() {
List<Outbox> pendingList = outboxMapper.selectPending(BATCH_SIZE);
if (CollUtil.isEmpty(pendingList)) {
return;
}
log.info("📤 开始投递Outbox消息,待发送: {} 条", pendingList.size());
for (Outbox outbox : pendingList) {
try {
sendToKafka(outbox);
markAsSent(outbox);
} catch (Exception e) {
log.error("❌ Outbox消息投递失败: id={}, eventType={}",
outbox.getId(), outbox.getEventType(), e);
markAsFailed(outbox);
}
}
}
/**
* 处理发送失败需要重试的消息
*/
private void processFailedMessages() {
List<Outbox> failedList = outboxMapper.selectFailed(MAX_RETRY, BATCH_SIZE);
if (CollUtil.isEmpty(failedList)) {
return;
}
log.info("🔄 开始重试Outbox消息,待重试: {} 条", failedList.size());
for (Outbox outbox : failedList) {
try {
// 指数退避:重试间隔随次数增加
long backoffMillis = (long) Math.pow(2, outbox.getRetryCount()) * 1000;
if (Duration.between(outbox.getCreatedAt(), LocalDateTime.now()).toMillis() < backoffMillis) {
continue; // 还未到重试时间
}
sendToKafka(outbox);
markAsSent(outbox);
} catch (Exception e) {
log.error("❌ Outbox消息重试失败: id={}, retryCount={}",
outbox.getId(), outbox.getRetryCount(), e);
markAsFailed(outbox);
}
}
}
/**
* 发送到Kafka
*/
private void sendToKafka(Outbox outbox) {
String topic = buildTopic(outbox.getAggregateType());
String key = outbox.getAggregateId();
String message = buildMessage(outbox);
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
// 同步等待发送结果(确保发送成功)
future.get(5, TimeUnit.SECONDS);
log.debug("📨 消息已发送到Kafka: topic={}, key={}, eventType={}",
topic, key, outbox.getEventType());
}
/**
* 构建Kafka Topic
*/
private String buildTopic(String aggregateType) {
return "outbox." + aggregateType;
}
/**
* 构建消息体
*/
private String buildMessage(Outbox outbox) {
Map<String, Object> message = new HashMap<>();
message.put("eventId", outbox.getId());
message.put("eventType", outbox.getEventType());
message.put("aggregateType", outbox.getAggregateType());
message.put("aggregateId", outbox.getAggregateId());
message.put("eventData", JSON.parseObject(outbox.getEventData()));
message.put("eventTime", outbox.getCreatedAt());
return JSON.toJSONString(message);
}
/**
* 标记为已发送
*/
private void markAsSent(Outbox outbox) {
Outbox update = new Outbox();
update.setId(outbox.getId());
update.setStatus(1); // 已发送
update.setSentAt(LocalDateTime.now());
outboxMapper.updateById(update);
}
/**
* 标记为发送失败
*/
private void markAsFailed(Outbox outbox) {
Outbox update = new Outbox();
update.setId(outbox.getId());
update.setStatus(2); // 发送失败
update.setRetryCount(outbox.getRetryCount() + 1);
outboxMapper.updateById(update);
}
}
3.6 消息消费者
/**
* 订单事件消费者
*/
@Component
@Slf4j
public class OrderEventConsumer {
@Autowired
private PointsService pointsService;
@Autowired
private NotificationService notificationService;
@Autowired
private InventoryService inventoryService;
/**
* 消费订单创建事件
*/
@KafkaListener(topics = "outbox.order", groupId = "order-consumer-group")
public void onOrderCreated(ConsumerRecord<String, String> record) {
String message = record.value();
try {
OutboxMessage outboxMessage = JSON.parseObject(message, OutboxMessage.class);
if (!"OrderCreated".equals(outboxMessage.getEventType())) {
return;
}
OrderCreatedEvent event = JSON.parseObject(
JSON.toJSONString(outboxMessage.getEventData()),
OrderCreatedEvent.class
);
log.info("📥 收到订单创建事件: orderId={}, userId={}",
event.getOrderId(), event.getUserId());
// 1. 增加用户积分
pointsService.addPoints(event.getUserId(), event.getAmount().intValue());
// 2. 发送通知
notificationService.sendOrderNotification(event.getUserId(), event.getOrderId());
// 3. 预留库存(如果之前只是预扣)
inventoryService.confirmDeduct(event.getSkuId(), event.getQuantity());
log.info("✅ 订单创建事件处理完成: orderId={}", event.getOrderId());
} catch (Exception e) {
log.error("❌ 订单创建事件处理失败: {}", message, e);
throw e; // 抛出异常,让Kafka重试
}
}
}
四、高级进阶:Outbox模式增强
4.1 消息顺序保证
/**
* 顺序消息投递器
* 保证同一聚合根的事件按顺序投递
*/
@Component
@Slf4j
public class OrderedOutboxPoller {
@Autowired
private OutboxMapper outboxMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 按聚合根ID分组,保证顺序投递
*/
public void pollOrdered() {
List<Outbox> pendingList = outboxMapper.selectPending(1000);
if (CollUtil.isEmpty(pendingList)) {
return;
}
// 按聚合根ID分组
Map<String, List<Outbox>> grouped = pendingList.stream()
.collect(Collectors.groupingBy(
o -> o.getAggregateType() + ":" + o.getAggregateId()
));
// 对每个分组按ID排序后投递(保证顺序)
grouped.forEach((key, events) -> {
events.sort(Comparator.comparing(Outbox::getId));
for (Outbox event : events) {
try {
// 使用聚合根ID作为Kafka分区Key,保证同一聚合根的消息进入同一分区
kafkaTemplate.send(
buildTopic(event.getAggregateType()),
event.getAggregateId(), // 分区Key
buildMessage(event)
).get(5, TimeUnit.SECONDS);
markAsSent(event);
} catch (Exception e) {
log.error("顺序投递失败: {}", event.getId(), e);
markAsFailed(event);
break; // 一个失败,后续不再投递(保证顺序)
}
}
});
}
}
4.2 消息幂等性处理
/**
* 消息幂等性处理器
* 防止重复消费
*/
@Component
@Slf4j
public class IdempotencyHandler {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String IDEMPOTENCY_KEY_PREFIX = "idempotency:";
private static final long IDEMPOTENCY_EXPIRE = 7 * 24 * 3600; // 7天
/**
* 检查是否已处理
*/
public boolean isProcessed(String eventId) {
String key = IDEMPOTENCY_KEY_PREFIX + eventId;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
/**
* 标记为已处理
*/
public void markAsProcessed(String eventId) {
String key = IDEMPOTENCY_KEY_PREFIX + eventId;
redisTemplate.opsForValue().set(key, "1", IDEMPOTENCY_EXPIRE, TimeUnit.SECONDS);
}
/**
* 幂等性消费包装器
*/
public void idempotentConsume(String eventId, Runnable consumer) {
if (isProcessed(eventId)) {
log.info("⏭️ 事件已处理,跳过: eventId={}", eventId);
return;
}
consumer.run();
markAsProcessed(eventId);
}
}
// 使用示例
@Component
public class OrderEventConsumer {
@Autowired
private IdempotencyHandler idempotencyHandler;
@KafkaListener(topics = "outbox.order")
public void consume(ConsumerRecord<String, String> record) {
OutboxMessage message = JSON.parseObject(record.value(), OutboxMessage.class);
idempotencyHandler.idempotentConsume(
message.getEventId(),
() -> {
// 处理业务逻辑
processOrderCreated(message);
}
);
}
}
4.3 Outbox表清理策略
/**
* Outbox表定时清理
* 防止表数据无限增长
*/
@Component
@Slf4j
public class OutboxCleanupScheduler {
@Autowired
private OutboxMapper outboxMapper;
/**
* 每天凌晨3点清理已发送的Outbox记录
*/
@Scheduled(cron = "0 0 3 * * ?")
public void cleanup() {
log.info("🧹 开始清理Outbox表...");
// 删除7天前已发送的记录
LocalDateTime beforeTime = LocalDateTime.now().minusDays(7);
int deleted = outboxMapper.delete(
new LambdaQueryWrapper<Outbox>()
.eq(Outbox::getStatus, 1) // 已发送
.lt(Outbox::getSentAt, beforeTime)
);
log.info("✅ Outbox表清理完成,删除 {} 条记录", deleted);
}
}
五、预判问题与解答
Q1:Outbox模式和事务消息(RocketMQ)有什么区别?
A:
| 特性 | Outbox模式 | RocketMQ事务消息 |
|---|---|---|
| 依赖 | 只需数据库 | 依赖RocketMQ |
| 通用性 | 通用,支持任意MQ | 仅RocketMQ支持 |
| 复杂度 | 需要额外表和轮询进程 | 需要实现事务监听器 |
| 性能 | 稍低(需要轮询) | 稍高 |
| 可靠性 | 高(数据库持久化) | 高 |
建议:如果已使用RocketMQ,可直接用事务消息;如果需要跨MQ或更通用,用Outbox模式。
Q2:Outbox消息投递延迟多久?
A:延迟取决于轮询间隔:
默认配置:
- 轮询间隔:5秒
- 批量大小:100条
- 实际延迟:5秒 + 处理时间
优化方案:
- 缩短轮询间隔到1秒
- 使用数据库变更通知(如MySQL触发器+MQ)
- 使用Debezium等CDC工具替代轮询
Q3:如果消息一直发送失败怎么办?
A:需要多层保障:
1. 重试机制:
- 指数退避重试
- 最大重试3-5次
2. 死信队列:
- 超过重试次数进入死信队列
- 人工介入处理
3. 监控告警:
- 发送失败率超过阈值告警
- 死信队列堆积告警
4. 补偿机制:
- 定时任务扫描长时间未发送的消息
- 人工补偿接口
Q4:Outbox表会不会成为性能瓶颈?
A:合理设计不会:
优化策略:
1. 索引优化:
- status + created_at 联合索引
- aggregate_type + aggregate_id 索引
2. 分区表:
- 按时间分区,方便清理
3. 读写分离:
- 写入走主库
- 轮询查询走从库
4. 定期清理:
- 已发送记录保留7天后删除
- 控制表大小在百万级以内
Q5:如何保证消息顺序性?
A:
1. 数据库层面:
- 按ID顺序读取
- 同一聚合根的事件按创建时间排序
2. MQ层面:
- 使用聚合根ID作为分区Key
- 保证同一聚合根的消息进入同一分区
3. 消费层面:
- 单线程消费(或按Key分区消费)
- 处理失败不跳过,保证顺序
六、面试高频考点
考点1:Outbox模式解决了什么问题?
参考答案:
Outbox模式解决了"业务操作与消息发送"的一致性问题:
问题场景:
- 先操作DB再发消息:消息可能发送失败
- 先发消息再操作DB:DB可能操作失败
解决方案:
1. 将事件写入Outbox表(与业务操作同一个事务)
2. 独立进程读取Outbox表并发送消息
3. 保证业务操作和事件记录的原子性
优势:
- 消息不丢失
- 业务代码零侵入
- 支持任意消息队列
考点2:Outbox模式和Saga模式有什么区别?
参考答案:
| 特性 | Outbox模式 | Saga模式 |
|---|---|---|
| 解决的问题 | 消息发送可靠性 | 长事务一致性 |
| 实现方式 | 本地事件表+异步发送 | 事务+补偿操作 |
| 复杂度 | 较低 | 较高 |
| 适用场景 | 事件通知 | 分布式事务 |
| 关系 | Saga可用Outbox实现消息投递 | - |
考点3:如何保证消息至少被消费一次?
参考答案:
1. 发送端:
- Outbox持久化保证消息不丢失
- 重试机制保证消息最终发送成功
2. MQ端:
- 开启消息持久化
- 配置副本机制
3. 消费端:
- 手动确认消费
- 消费失败不确认,触发重试
- 幂等性处理防止重复消费
4. 监控:
- 消息堆积监控
- 消费延迟监控
- 死信队列监控
考点4:如果数据库事务回滚了,Outbox记录会怎样?
参考答案:
Outbox记录会随着事务回滚而回滚:
因为Outbox表的写入是在业务事务中:
@Transactional
public void createOrder() {
orderMapper.insert(order); // 业务操作
outboxMapper.insert(outbox); // Outbox记录
// 同一事务,要么都成功,要么都回滚
}
如果事务回滚:
- 订单记录回滚
- Outbox记录也回滚
- 消息不会被发送
这正是Outbox模式的优势:保证了业务操作和事件记录的原子性。
七、总结与最佳实践
7.1 核心要点回顾
Outbox事件驱动模式核心流程:
┌─────────────────────────────────────────────────────────────┐
│ 1. 业务操作(事务内) │
│ ├── 执行业务逻辑(如:创建订单) │
│ └── 写入Outbox事件表 │
│ └── 同一事务,保证原子性 │
│ │
│ 2. 消息投递(独立进程) │
│ ├── 轮询Outbox表 │
│ ├── 发送消息到MQ │
│ └── 标记为已发送 │
│ │
│ 3. 消息消费 │
│ ├── 消费MQ消息 │
│ ├── 幂等性检查 │
│ └── 执行业务逻辑 │
│ │
│ 4. 兜底保障 │
│ ├── 重试机制 │
│ ├── 死信队列 │
│ └── 定时清理 │
└─────────────────────────────────────────────────────────────┘
7.2 适用场景
| 场景 | 是否推荐 | 说明 |
|---|---|---|
| 订单状态变更通知 | ✅ 强烈推荐 | 保证通知不丢失 |
| 用户注册后初始化 | ✅ 强烈推荐 | 积分、优惠券发放 |
| 数据变更同步 | ✅ 推荐 | 缓存同步、搜索索引更新 |
| 实时性要求极高 | ⚠️ 谨慎使用 | 有轮询延迟 |
| 简单CRUD | ❌ 不推荐 | 过度设计 |
7.3 性能提升数据
某电商平台实测数据:
| 指标 | 传统方式 | Outbox模式 | 提升 |
|---|---|---|---|
| 消息丢失率 | 0.1% | 0% | 100%↓ |
| 系统可用性 | 99.9% | 99.99% | 0.09%↑ |
| 开发效率 | 低 | 高 | 大幅提升 |
| 维护成本 | 高 | 低 | 大幅降低 |
八、参考与拓展
互动讨论:你在项目中是如何保证消息可靠投递的?有没有用过Outbox模式?欢迎在评论区分享!
如果本文对你有帮助,欢迎点赞👍、收藏⭐、关注🔔,持续获取更多Java后端技术干货!
转载自CSDN-专业IT技术社区
原文链接:https://blog.csdn.net/2501_90715893/article/details/161718849



