关于spring:RabbitMQ监听器在MessageListener抛出异常时停止监听消息

RabbitMQ listener stops listening messages when MessageListener throws exception

我正在使用 Spring AMQP 来处理 RabbitMQ 中的消息。下面是问题:
1.(比方说)RabbitMQ 内部有 3 条消息处于就绪状态
2. 第一个被 MessageListener 拾取并开始处理。 (比如说)它最终抛出一个异常
3. 在这种情况下,容器保持运行状态,但在我重新启动容器之前,不会处理剩余的 2 条消息。第一条消息也处于未确认状态。

这是预期的行为吗?如果没有,如何确保其他 2 条消息将被处理,而不管第一个处理失败?

MQ 配置:

1
2
3
4
5
6
7
8
9
10
<rabbit:connection-factory host="localhost" username="guest" password="guest" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container
    connection-factory="connectionFactory"
    concurrency="1"
    acknowledge="auto">
    <rabbit:listener queue-names="testQueue" ref="myProcessorListener" />
</rabbit:listener-container>

MessageListener 类:

1
2
3
4
5
6
7
8
9
10
public class MyProcessorListener implements MessageListener{
....
    @Override
public void onMessage(Message message) {
try{
...Some logic...

} catch (Exception e) {
  throw new RuntimeException(e.getMessage(), e);
}


消息被一遍又一遍地重新传递;为了拒绝它(并丢弃或路由到死信队列),您需要抛出 AmqpRejectAndDontRequeueException 或将容器的 requeue-rejected 属性设置为 false。使用 Java 配置时,它是 defaultRequeueRejected.

您还可以使用自定义错误处理程序。

这在参考手册中都有解释。

相关讨论

  • 消息重新传递是我所期望的。但我担心的是,当一条消息被拒绝时,为什么侦听器没有处理队列中准备好的其他消息。
  • RabbitMQ 将被拒绝的消息重新排入队列头部,因此如果您只有 concurrency="1",它将被重新处理(并首先被拒绝)。您还可以从默认值 (1) 增加 prefetch,以便在收到拒绝之前将其他消息发送给消费者。
  • 谢谢回复。我在 MessageListener [在 onMessage() 方法中] 放置了一个断点。当接收到第一条消息时,将执行断点。之后,我收到失败消息,消息仍处于"未确认"状态[我在 RabbitMQ 控制台上检查]。如果您说消息在队列的头部重新排队并再次得到处理,那么我的断点应该再次执行,消息应该从"未确认"状态移动到"就绪"状态。我的代码似乎有问题。你能让我知道我做错了什么吗?
  • 通常,此类情况是由线程卡在用户代码中引起的。在大多数情况下,为 org.springframewrk.amqp 启用 DEBUG 或 TRACE 日志记录将显示大量调试消息。如果您没有看到此类消息,则表示线程卡在某个地方(通常在用户代码中)。在这种情况下,请进行线程转储以查看发生了什么。如果您仍然无法弄清楚,请发布调试日志并在某个地方完成线程转储(如 github gist 或 pastebin)。
  • 感谢加里的帮助。如果发生阻塞它的异常,我没有释放锁。它现在工作正常。

以上是关于spring:RabbitMQ监听器在MessageListener抛出异常时停止监听消息的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>