消息监听器容器配置

有相当多的选项可以配置 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC),这些选项与事务和服务质量有关,其中一些选项相互作用。适用于 SMLC、DMLC 或 StreamListenerContainer (StLC) 的属性(请参阅 使用 RabbitMQ 流插件)通过在相应列中的复选标记表示。有关帮助您决定哪个容器适合您的应用程序的信息,请参阅 选择容器

下表显示了容器属性名称及其对应的属性名称(括号中),当使用命名空间配置 <rabbit:listener-container/> 时。该元素的 type 属性可以是 simple(默认)或 direct,分别指定 SMLCDMLC。一些属性未通过命名空间公开。这些属性用 N/A 表示。

表 1. 消息监听器容器的配置选项
属性(Attribute) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

当设置了 messagesPerAck 时,此超时用作发送确认的替代方案。当有新消息到达时,未确认消息的数量会与 messagesPerAck 进行比较,自上次确认以来的时间会与此值进行比较。如果任一条件为 true,则消息将被确认。当没有新消息到达且有未确认消息时,此超时是近似的,因为条件仅在每个 monitorInterval 检查一次。另请参阅此表中的 messagesPerAckmonitorInterval

tickmark

acknowledgeMode
(acknowledge)

  • NONE:不发送确认(与 channelTransacted=true 不兼容)。RabbitMQ 称之为“自动确认”,因为代理在消费者没有任何操作的情况下假定所有消息都已确认。

  • MANUAL:监听器必须通过调用 Channel.basicAck() 来确认所有消息。

  • AUTO:容器自动确认消息,除非 MessageListener 抛出异常。请注意,acknowledgeModechannelTransacted 是互补的——如果通道是事务性的,代理除了确认之外还需要提交通知。这是默认模式。另请参阅 batchSize

tickmark
tickmark

adviceChain
(advice-chain)

一个 AOP 建议数组,用于应用于监听器执行。这可以用于应用额外的横切关注点,例如在代理死亡时自动重试。请注意,在 AMQP 错误后简单的重新连接由 CachingConnectionFactory 处理,只要代理仍然存活。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

一个 MessagePostProcessor 实例数组,在调用监听器之前调用。后处理器可以实现 PriorityOrderedOrdered。数组按顺序排序,未排序的成员最后调用。如果后处理器返回 null,则消息将被丢弃(并酌情确认)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

当配置了事务管理器时,设置为 true 以在回滚时始终重新排队消息。

tickmark
tickmark

autoDeclare
(auto-declare)

当设置为 true(默认)时,如果容器在启动期间检测到至少有一个队列丢失(可能是因为它是一个 auto-delete 队列或一个已过期的队列),它会使用 RabbitAdmin 重新声明所有 AMQP 对象(队列、交换器、绑定),但如果队列因任何原因丢失,则重新声明会继续。要禁用此行为,请将此属性设置为 false。请注意,如果所有队列都丢失,容器将无法启动。

在 1.6 版本之前,如果上下文中存在多个管理员,容器会随机选择一个。如果没有管理员,它会在内部创建一个。在这两种情况下,都可能导致意外结果。从 1.6 版本开始,为了使 autoDeclare 工作,上下文中必须只有一个 RabbitAdmin,或者必须使用 rabbitAdmin 属性在容器上配置对特定实例的引用。
tickmark
tickmark

autoStartup
(auto-startup)

标志,指示容器是否应在 ApplicationContext 启动时(作为 SmartLifecycle 回调的一部分,发生在所有 bean 初始化之后)启动。默认为 true,但如果您的代理在启动时可能不可用,您可以将其设置为 false,并在您知道代理准备就绪后手动调用 start()

tickmark
tickmark
tickmark

batchSize
(transaction-size) (batch-size)

当与 acknowledgeMode 设置为 AUTO 结合使用时,容器会尝试处理最多此数量的消息,然后发送确认(等待每条消息直到接收超时设置)。这也是事务通道提交的时候。如果 prefetchCount 小于 batchSize,它将增加以匹配 batchSize

tickmark

batchingStrategy
(N/A)

消息解除批处理时使用的策略。默认 SimpleDebatchingStrategy。参见 批处理带有批处理的 @RabbitListener

tickmark
tickmark

channelTransacted
(channel-transacted)

布尔标志,指示所有消息都应在事务中确认(手动或自动)。

tickmark
tickmark

concurrency
(N/A)

m-n 每个监听器的并发消费者范围(最小,最大)。如果只提供了 n,则 n 是固定数量的消费者。参见 监听器并发

tickmark

concurrentConsumers
(concurrency)

为每个监听器最初启动的并发消费者数量。参见 监听器并发。对于 StLC,并发性通过重载的 superStream 方法控制;参见 使用单活动消费者消费超级流

tickmark
tickmark

connectionFactory
(connection-factory)

ConnectionFactory 的引用。当使用 XML 命名空间配置时,默认引用的 bean 名称是 rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

消费者连续接收消息的最小数量,且未发生接收超时,此时考虑启动新消费者。也受“batchSize”影响。参见 监听器并发。默认值:10。

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

消费者在考虑停止消费者之前必须经历的最小接收超时次数。也受“batchSize”影响。参见 监听器并发。默认值:10。

tickmark

consumerBatchEnabled
(batch-enabled)

如果 MessageListener 支持,将其设置为 true 会启用离散消息的批处理,最多达到 batchSize;如果在 receiveTimeout 内没有新消息到达或收集批处理消息的时间超过 batchReceiveTimeout,则会传递部分批处理。当此为 false 时,批处理仅支持由生产者创建的批处理;请参阅 批处理

tickmark

consumerCustomizer
(N/A)

一个 ConsumerCustomizer bean,用于修改容器创建的流消费者。

tickmark

consumerStartTimeout
(N/A)

等待消费者线程启动的时间(毫秒)。如果此时间过去,则会写入错误日志。例如,当配置的 taskExecutor 没有足够的线程来支持容器的 concurrentConsumers 时,可能会发生这种情况。

请参阅 线程和异步消费者。默认值:60000(一分钟)。

tickmark

consumerTagStrategy
(consumer-tag-strategy)

设置 ConsumerTagStrategy 的实现,启用为每个消费者创建唯一的标签。

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

为每个配置的队列创建的消费者数量。参见 监听器并发

tickmark

consumeDelay
(N/A)

当使用 RabbitMQ 分片插件concurrentConsumers > 1 时,存在一种竞态条件,可能会阻止消费者在分片之间均匀分布。使用此属性在消费者启动之间添加一个小的延迟以避免此竞态条件。您应该尝试不同的值以确定适合您的环境的延迟。

tickmark
tickmark

debatchingEnabled
(N/A)

当为 true 时,监听器容器将解除批处理消息,并用批处理中的每条消息调用监听器。从 2.2.7 版本开始,如果监听器是 BatchMessageListenerChannelAwareBatchMessageListener,则 生产者创建的批处理 将被解除批处理为 List<Message>。否则,批处理中的消息将逐个呈现。默认值为 true。请参阅 批处理带有批处理的 @RabbitListener

tickmark
tickmark

declarationRetries
(declaration-retries)

被动队列声明失败时的重试次数。被动队列声明发生在消费者启动时,或者当从多个队列消费时,如果并非所有队列在初始化期间都可用。当重试次数耗尽后,所有配置的队列都无法被被动声明(出于任何原因),容器行为由前面描述的 missingQueuesFatal 属性控制。默认值:三次重试(总共四次尝试)。

tickmark

defaultRequeueRejected
(requeue-rejected)

确定因监听器抛出异常而被拒绝的消息是否应重新排队。默认值:true

tickmark
tickmark

errorHandler
(error-handler)

ErrorHandler 策略的引用,用于处理在 MessageListener 执行期间可能发生的任何未捕获异常。默认值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(exclusive)

确定此容器中的单个消费者是否对队列具有独占访问权限。当此为 true 时,容器的并发性必须为 1。如果另一个消费者具有独占访问权限,容器会根据 recovery-intervalrecovery-back-off 尝试恢复消费者。当使用命名空间时,此属性与队列名称一起出现在 <rabbit:listener/> 元素上。默认值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

一个异常记录器,用于当独占消费者无法访问队列时。默认情况下,此信息以 WARN 级别记录。

tickmark
tickmark

failedDeclarationRetryInterval
(failed-declaration -retry-interval)

被动队列声明重试尝试之间的间隔。被动队列声明发生在消费者启动时,或者当从多个队列消费时,如果并非所有队列在初始化期间都可用。默认值:5000(五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消费者在 shutdownTimeout 内没有响应关机,如果此值为 true,通道将被关闭,导致任何未确认的消息重新排队。自 2.0 版本起默认为 true。您可以将其设置为 false 以恢复到以前的行为。

tickmark
tickmark

forceStop
(N/A)

设置为 true 表示在处理完当前记录后停止(当容器停止时);导致所有预取的消息重新排队。默认情况下,容器将取消消费者并在停止前处理所有预取的消息。从 2.4.14、3.0.6 版本开始,默认为 false

tickmark
tickmark

globalQos
(global-qos)

当为 true 时,prefetchCount 全局应用于通道,而不是通道上的每个消费者。有关更多信息,请参阅 basicQos.global

tickmark
tickmark

(group)

此仅在使用命名空间时可用。指定时,将以该名称注册一个类型为 Collection<MessageListenerContainer> 的 bean,并且为每个 <listener/> 元素的容器将添加到该集合中。这允许,例如,通过迭代集合来启动和停止容器组。如果多个 <listener-container/> 元素具有相同的 group 值,则集合中的容器构成所有指定容器的聚合。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

请参阅 检测空闲异步消费者

tickmark
tickmark

javaLangErrorHandler
(N/A)

一个 AbstractMessageListenerContainer.JavaLangErrorHandler 实现,在容器线程捕获到 Error 时调用。默认实现调用 System.exit(99);要恢复到以前的行为(不执行任何操作),请添加一个空操作处理器。

tickmark
tickmark

maxConcurrentConsumers
(max-concurrency)

根据需要启动的最大并发消费者数量。必须大于或等于 'concurrentConsumers'。参见 监听器并发

tickmark

messagesPerAck
(N/A)

两次确认之间接收的消息数量。使用此属性可减少发送给代理的确认数量(代价是增加消息重新传递的可能性)。通常,您应该只在大量监听器容器上设置此属性。如果设置了此属性并且消息被拒绝(抛出异常),则未决的确认将被确认,并且失败的消息将被拒绝。事务通道不允许使用此属性。如果 prefetchCount 小于 messagesPerAck,它将增加以匹配 messagesPerAck。默认值:每条消息都确认。另请参阅此表中的 ackTimeout

tickmark

mismatchedQueuesFatal
(mismatched-queues-fatal)

当容器启动时,如果此属性为 true(默认值:false),容器将检查上下文中声明的所有队列是否与代理上已存在的队列兼容。如果存在不匹配的属性(例如 auto-delete)或参数(例如 x-message-ttl),容器(和应用程序上下文)将因致命异常而无法启动。

如果在恢复期间检测到问题(例如,连接丢失后),容器将停止。

应用程序上下文中必须只有一个 RabbitAdmin(或者通过 rabbitAdmin 属性专门在容器上配置一个)。否则,此属性必须为 false

如果在初始启动期间代理不可用,容器将启动并在建立连接时检查条件。
检查是针对上下文中所有队列进行的,而不仅仅是特定监听器配置使用的队列。如果您希望将检查限制在容器使用的那些队列,您应该为容器配置一个单独的 RabbitAdmin,并使用 rabbitAdmin 属性提供对其的引用。有关更多信息,请参阅 条件声明
当启动标记为 @Lazy 的 bean 中的 @RabbitListener 的容器时,将禁用不匹配队列参数的检测。这是为了避免潜在的死锁,这可能会将此类容器的启动延迟长达 60 秒。使用懒惰监听器 bean 的应用程序应在获取对懒惰 bean 的引用之前检查队列参数。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

当设置为 true(默认值)时,如果代理上没有任何配置的队列可用,则被认为是致命的。这将导致应用程序上下文在启动期间初始化失败。此外,当队列在容器运行时被删除时,默认情况下,消费者会尝试三次连接到队列(以五秒间隔),如果这些尝试失败,则停止容器。

在以前的版本中,此功能不可配置。

当设置为 false 时,在进行三次重试后,容器将进入恢复模式,就像其他问题一样,例如代理宕机。容器将根据 recoveryInterval 属性尝试恢复。在每次恢复尝试期间,每个消费者将再次尝试四次以被动声明队列,间隔为五秒。此过程将无限期地继续。

您还可以使用属性 bean 全局设置所有容器的属性,如下所示

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全局属性不应用于任何已设置显式 missingQueuesFatal 属性的容器。

默认重试属性(三次重试,间隔五秒)可以通过设置以下属性来覆盖。

当启动标记为 @Lazy 的 bean 中的 @RabbitListener 的容器时,将禁用缺少队列的检测。这是为了避免潜在的死锁,这可能会将此类容器的启动延迟长达 60 秒。使用懒惰监听器 bean 的应用程序应在获取对懒惰 bean 的引用之前检查队列。
tickmark
tickmark

monitorInterval
(monitor-interval)

对于 DMLC,在此间隔内调度一个任务以监视消费者的状态并恢复任何已失败的消费者。

tickmark

noLocal
(N/A)

设置为 true 可禁用从服务器向在同一通道连接上发布消息的消费者递送消息。

tickmark
tickmark

phase
(phase)

autoStartuptrue 时,此容器应在其内部启动和停止的生命周期阶段。值越低,此容器启动越早,停止越晚。默认值为 Integer.MAX_VALUE,这意味着容器尽可能晚启动,尽可能早停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

当设置为 true (SMLC 的默认值)时,如果在连接期间抛出 PossibleAuthenticationFailureException,则认为它是致命的。这将导致应用程序上下文在启动期间初始化失败 (如果容器配置了自动启动)。

版本 2.0 起。

DirectMessageListenerContainer

当设置为 false(默认值)时,每个消费者将根据 monitorInterval 尝试重新连接。

SimpleMessageListenerContainer

当设置为 false 时,在 3 次重试之后,容器将进入恢复模式,就像其他问题一样,例如代理宕机。容器将尝试根据 recoveryInterval 属性进行恢复。在每次恢复尝试期间,每个消费者将再次尝试 4 次启动。此过程将无限期地继续。

您还可以使用属性 bean 全局设置所有容器的属性,如下所示

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全局属性不适用于任何已设置显式 missingQueuesFatal 属性的容器。

默认重试属性(3 次重试,间隔 5 秒)可以使用此属性之后的属性进行覆盖。

tickmark
tickmark

prefetchCount
(prefetch)

每个消费者可以未确认的未决消息数量。此值越高,消息递送速度越快,但非顺序处理的风险也越高。如果 acknowledgeModeNONE,则忽略此值。如有必要,此值会增加以匹配 batchSizemessagePerAck。自 2.0 版本起默认为 250。您可以将其设置为 1 以恢复到以前的行为。

在某些情况下,预取值应该较低——例如,对于大型消息,尤其是处理速度较慢时(消息可能在客户端进程中累积大量内存),以及如果需要严格的消息顺序(在这种情况下,预取值应设置为 1)。此外,在消息量低且有多个消费者(包括单个监听器容器实例中的并发)的情况下,您可能希望减少预取以获得更均匀的消息分布。

另请参阅 globalQos

tickmark
tickmark

rabbitAdmin
(admin)

当监听器容器监听至少一个自动删除队列,并且在启动期间发现该队列丢失时,容器会使用 RabbitAdmin 声明该队列以及任何相关的绑定和交换器。如果这些元素配置为使用条件声明(请参阅 条件声明),则容器必须使用配置为声明这些元素的管理员。在此处指定该管理员。仅在使用带有条件声明的自动删除队列时才需要。如果您不希望自动删除队列在容器启动之前声明,请将管理员上的 auto-startup 设置为 false。默认为声明所有非条件元素的 RabbitAdmin

tickmark
tickmark

receiveTimeout
(receive-timeout)

等待每条消息的最大时间。如果 acknowledgeMode=NONE,这几乎没有效果——容器会循环并请求另一条消息。对于 batchSize > 1 的事务性 Channel,其影响最大,因为它可能导致已消费的消息在超时到期之前不被确认。当 consumerBatchEnabled 为 true 时,如果此超时在批处理完成之前发生,则会传递部分批处理。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批处理消息的超时时间(毫秒)。它限制了等待填充批处理大小的时间。当 batchSize > 1 且收集批处理消息的时间大于 batchReceiveTime 时,将递送批处理。默认值为 0(无超时)。

tickmark

recoveryBackOff
(recovery-back-off)

指定在消费者因非致命原因启动失败时,尝试启动消费者之间的时间间隔的 BackOff。默认是 FixedBackOff,以五秒间隔无限重试。与 recoveryInterval 互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

确定消费者因非致命原因启动失败时,尝试启动消费者之间的时间间隔(毫秒)。默认值:5000。与 recoveryBackOff 互斥。

tickmark
tickmark

retryDeclarationInterval
(missing-queue- retry-interval)

如果在消费者初始化期间一部分配置的队列可用,则消费者将开始从这些队列消费。消费者将使用此间隔尝试被动声明丢失的队列。当此间隔过去后,将再次使用 'declarationRetries' 和 'failedDeclarationRetryInterval'。如果仍然存在丢失的队列,消费者将再次等待此间隔,然后再次尝试。此过程将无限期地持续,直到所有队列都可用。默认值:60000(一分钟)。

tickmark

shutdownTimeout
(N/A)

当容器关闭时(例如,其包含的 ApplicationContext 关闭时),它会在处理完正在处理的消息之前等待,直到达到此限制。默认值为五秒。

tickmark
tickmark

startConsumerMinInterval
(min-start-interval)

每个新消费者按需启动之前必须经过的时间(毫秒)。参见 监听器并发。默认值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (N/A)

当使用有状态重试建议时,如果收到缺少 messageId 属性的消息,默认情况下这被认为是致命的(它将停止消费者)。将其设置为 false 以丢弃(或路由到死信队列)此类消息。

tickmark
tickmark

stopConsumerMinInterval
(min-stop-interval)

当检测到空闲消费者时,自上次消费者停止以来,新的消费者停止前必须经过的时间(毫秒)。参见 监听器并发。默认值:60000(一分钟)。

tickmark

streamConverter
(N/A)

一个 StreamMessageConverter,用于将原生流消息转换为 Spring AMQP 消息。

tickmark

taskExecutor
(task-executor)

对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)的引用,用于执行监听器调用器。默认是 SimpleAsyncTaskExecutor,使用内部管理的线程。

tickmark
tickmark

taskScheduler
(task-scheduler)

对于 DMLC,用于在“monitorInterval”运行监视任务的调度程序。

tickmark

transactionManager
(transaction-manager)

监听器操作的外部事务管理器。也与 channelTransacted 互补——如果 Channel 是事务性的,其事务将与外部事务同步。

tickmark
tickmark
© . This site is unofficial and not affiliated with VMware.