Spring Cloud Stream整合Rabbit之重复投递

SpringCloudStream 整合Rabbit 时,消费端在处理失败时,如果需要进行重试,可以有如下几种重试机制:

方法1(默认):

当消费端在处理消息时抛出异常,那么默认会在当前线程的3次的Retry。该方法是默认的,可以通过修改配置文件,指定channel下的参数,例如:

1
2
3
4
5
6
7
8
9
10
spring:
cloud:
stream:
bindings:
input-test-event:
destination: test-event
group: test-group
binder: rabbit
consumer:
max-attempts: 1

其中:

  • max-attempts 如果等于1,就是不重试;
  • max-attempts 如果大于1,其值就是重试次数。

当消息重试超过最大次数,如果未配置启用DLQ ,消息将会被丢弃。该方法默认是无法设置重试的时间间隔的。

方法2:

方法1是在当前线程进行重试,相当于阻塞了后面的消息,有时我们不想阻塞,则可以利用死信队列(Dead Letter Queue, 缩写DLQ ),进行异步重试。

先看一下DLQ 的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
stream:
bindings:
input-test-event:
destination: test-event
group: test-group
binder: rabbit
rabbit:
bindings:
input-test-event:
consumer:
autoBindDlq: true

设置spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq 参数为true,将自动创建对应channel的DLQ ,绑定死信交换机(Dead Letter Exchange, 缩写DLX )。默认该queue的名字就是其对应destination.group 后追加.dlq ,同时,该进入该queue 的消息的routingKey 即为原destination

image-20220125105048438

按上面的配置,消息进入DLQ 以后,因为没有任何的消费者,消息会一直存储于DLQ 中,可以添加dlqTtl 参数设置消息在DLQ中生存的时间,在无消费者的情况下,默认到期后会删除该消息。

如果想指定DLQ的名称,可以用deadLetterQueueName 参数指定。

重试的逻辑其实就是利用DLQ ,给其设置一个默认的exchange ,在TTL 时间到期后,消息会再度转到指定的exchange 对应的queue 中。

为了实现该逻辑,需要配置三个参数:

  • autoBindDql 设置为true,启用DLQ
  • dlqTtl 设置一个死信消息超时时间,变相实现了重试的间隔时间
  • dlqDeadLetterExchange 增加该参数后,留空即为设置默认值。在默认值情况下,DLQ 中的消息将会按照其routingKey的值(也可由deadLetterRoutingKey参数指定),将消息投递到给名称对应该值的quque ,实现消息的重新消费。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
cloud:
stream:
bindings:
input-test-event:
destination: test-event
group: test-group
binder: rabbit
rabbit:
bindings:
input-test-event:
consumer:
autoBindDlq: true
dlqTtl: 5000
dlqDeadLetterExchange:

注意:

因为在配置中,设置了group 这个参数,当该参数使用时,默认rabbit的durable 参数是启用的,即该channel的exhangequeue 是持久化的,应用退出后,不会自动清除,并且保存创建时的参数。所以当改变了channel的参数后,需要将queue删除,让其自动重建,否则新改的不会生效,则无法实现自动重试。

按照上面配置,删除旧queue 后重新启动应用,创建queue 信息如下:

image-20220125105048438

消息重新投递后,在其header 里,会增加一些重试的信息,如下图所示:

image-20220125105048438

  • deliveryAttempt 值代表在当前线程的重试次数,即方法一的重试逻辑
  • x-death 头记录了重试循环的一些详细信息,尤其是值count 记录了经由DLQ 异步重试的次数。

但有时,我们想知道上一次错误的具体异常,此时可以增加republishToDlq 参数,当设置为true时,会在消息头里增加详细的异常和异常堆栈信息。

image-20220125105048438

注:

republishToDlq 设置为不同值时,routingKey 的取值逻辑不同。当为false时,取的是x-death 头中第一个的routing-keys 值;当为true时,取得是X_ORIGINAL_ROUTING_KEY_HEADER 这个Header的值。

此时,该消息将不断重复queue -> DLQ -> queue的循环(假设消费端一直拒绝或抛异常)。如果我们想设置重试次数大于3就不再重试,可以抛出ImmediateAcknowledgeAmqpException 这个异常,则该消息被丢弃,不再进入DLQ

关于消息的拒绝

前面对于消息的拒绝,都是采用抛异常,但是这个异常不能乱抛。不同的异常,框架处理的方式不同:

  • 普通的异常,等同于AmqpRejectAndDontRequeueException ,会导致消息重试
  • ImmediateAcknowledgeAmqpException 这个异常,会导致消息被丢弃不触发重试

有时候,我们不期望在生产的日志中出现重试的ERROR,可以考虑用下面的方案:

  1. 将消费端的acknowledgeMode 从默认的自动改为手动,即 acknowledgeMode: MANUAL
  2. 将channel注入到消费端,手动处理,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@EnableBinding(TestSink.class)
public class TestConsumer {

private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
private static final Long MAX_RETRY = Long.valueOf(3L);
@StreamListener(TestSink.INPUT)
public void consume(Message message,
@Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag,
@Header(name = "x-death", required = false) Map<?,?> death) throws IOException {
logger.info("收到消息:{}", message);

if(death!=null && death.get("count")!=null && Long.valueOf(death.get("count").toString()).compareTo(MAX_RETRY)>=0){
logger.error("放弃该消息");
channel.basicAck(deliveryTag, false);
return;
}
//c
channel.basicReject(deliveryTag, false);
}
}
本文由 EdisonXu - 徐焱飞 创作,采用 CC BY 4.0 CN协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出处。
本文链接为http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html
如果您觉得文章不错,可以请我喝一杯咖啡!
MQ, Rabbit, Spring, Spring Cloud Stream