重试和死信处理

默认情况下,当你在消费者绑定中配置重试(例如 `maxAttempts`)和 `enableDlq` 时,这些功能是在 binder 内部执行的,listener container 或 Kafka consumer 不参与其中。

在某些情况下,将此功能移至 listener container 中会更好,例如

  • 重试和延迟的总和会超过消费者 `max.poll.interval.ms` 属性的值,这可能会导致分区重平衡。

  • 你希望将死信发布到不同的 Kafka 集群。

  • 你希望为错误处理器添加重试监听器。

  • ...​

要配置将此功能从 binder 移至 container,请定义一个类型为 `ListenerContainerWithDlqAndRetryCustomizer` 的 `@Bean`。此接口包含以下方法

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

目标解析器 (destination resolver) 和 `BackOff` 是根据绑定属性(如果已配置)创建的。`KafkaTemplate` 使用 `spring.kafka…​.` 属性中的配置。然后,你可以使用这些来创建自定义的错误处理器和死信发布器;例如

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

现在,只需要将单个重试延迟设置得大于消费者 `max.poll.interval.ms` 属性的值即可。

当使用多个 binder 时,'ListenerContainerWithDlqAndRetryCustomizer' bean 会被 'DefaultBinderFactory' 覆盖。要使该 bean 生效,你需要使用 'BinderCustomizer' 来设置 container customizer(参见 [binder-customizer]

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}