SpringBoot整合Kafka之消费者异常处理机制
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.7</version>
</dependency>
@KafkaListener
注解参数
- String id() default "";设置该监听器所属容器的id,如果不设置则自动生成。同一个容器中,id不能重复。
- String[] topics() default {};消费者订阅的主题,可以是主题名、属性占位符
${xiang.topics:test}
、表达式。 - String topicPattern() default "";消费者订阅的主题,可以是主题模式、配置属性的占位符、表达式。
- String containerFactory() default "";设置创建监听器的工厂。
- String errorHandler() default "";异常处理器, org.springframework.kafka.listener.KafkaListenerErrorHandler的实现类。
- String groupId() default "";消费者所属群组ID,可以不配置。
- boolean idIsGroup() default true;如果没有设置groupId,且idIsGroup设置为true,则使用id(如果设置了)作为群组ID;如果没有设置groupId,且idIsGroup设置为false,则使用消费者工厂提供的group.id作为群组ID。
- TopicPartition[] topicPartitions() default {};指定主题与分区。
@KafkaListener(topicPartitions = { @TopicPartition(topic = "test", partitions = {"1", "3"}, partitionOffsets = { @PartitionOffset(partition = "1", initialOffset = "12"), @PartitionOffset(partition = "3", initialOffset = "23") }) })
- String concurrency() default "";消费者并发数,即创建的消费者客户端个数。
注解描述
该注解用于标记一个方法,在指定主题上作为Kafka消息监听器的目标方法(动态代理)。 containerFactory()用于指定构建Kafka监听器容器的 KafkaListenerContainerFactory。如果不通过containerFactory()设置,并且没有通过配置指定默认的容器工厂,则将使用名为kafkaListenerContainerFactory的默认容器工厂。
通过注册一个KafkaListenerAnnotationBeanPostProcessor来完成@KafkaListener注解的处理。可以手动注册,也可以通过@EnableKafka注解完成。被@KafkaListener标记的方法可以拥有灵活的方法签名:
- ConsumerRecord:用于获取Kafka原始消息
- Acknowledgment:用于手动 ack
- 等
当@KafkaListener标记在方法上时,会为每个方法创建一个监听器容器。MessageListener是MessagingMessageListenerAdapter,提供消息处理需要调用的方法的是MethodKafkaListenerEndpoint。
当@KafkaListener标记在类上时,会创建一个容器监听器,用于处理所有标记了@KafkaHandler注解的方法。被@KafkaHandler标记的所有方法不能造成任何歧义,以便能够为特定的消息解析一个方法。MessageListener是配置了MessagingMessageListenerAdapter,提供消息处理需要调用的方法的是MultiMethodKafkaListenerEndpoint。
KafkaListenerContainerFactory
该接口有两个默认实现:AbstractKafkaListenerContainerFactory、ConcurrentKafkaListenerContainerFactory,后者用于创建 ConcurrentMessageListenerContainer(并发消息监听器容器)。由于前者是抽象类,因此猜测默认容器工厂就是ConcurrentKafkaListenerContainerFactory。
在KafkaListenerAnnotationBeanPostProcessor的afterSingletonsInstantiated
方法末尾,有一个注册所有监听器的地方:
// Actually register all listeners
this.registrar.afterPropertiesSet();
进入该方法将来到如下方法,其中有个解析容器工厂的地方:
解析容器工厂方法: 可见,默认容器工厂的确是ConcurrentKafkaListenerContainerFactory。
ConcurrentKafkaListenerContainerFactory
该工厂创建的容器是ConcurrentMessageListenerContainer。其参数concurrency指定创建多少个KafkaMessageListenerContainer,默认为1。KafkaMessageListenerContainer的doStart()方法中会创建监听器消费者: 默认错误处理器为;SeekToCurrentErrorHandler。是在ListenerConsumer构造方法中设置的:
- 获取错误处理器为null
- 确认错误处理器
- 创建错误处理器,最大尝试次数为9
SeekToCurrentErrorHandler
当消息处理出现错误时,查找还需处理的记录中每个主题当前的偏移量,用于倒回分区以便重新消费。出现错误时,会调用其handle方法,默认处理结束后确认消息(提交偏移量)。 SeekUtils.seekOrRecover: doSeeks: 判断是否恢复第一条消息,recovery.recovered方法:
// 如果最大重试次数大于1,则noRetries为false。
if (this.noRetries) {
attemptRecovery(record, exception, null, consumer);
return true;
}
...
// (主题+分区)只有唯一一个FailedRecord对象,存在Map里。
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
...
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
如果重试次数达到最大值(9次)则返回-1,最终recovery.recovered方法返回true,则跳过第当前消息。如果不跳过则一起seekPartitions:重置下一次拉取消息的偏移量,并返回false。然后抛出异常。 当一条消息处理失败——>重置偏移量——>主动抛出异常——>打印错误日志。重新拉取消息处理,如果处理失败——>重置偏移量——>主动抛出异常——>打印错误日志,最大重试9次,同一条消息最大处理10次。然后判断是否提交偏移量:
重新拉取or重新消费
- 重新拉取:重新拉取失败的消息并消费。
- 重新消费:不重新拉取失败的消息,而是直接尝试消费。
Kafka消费者在出现异常时,会重试9次。那么重试是重新拉取还是重新消费?通过阅读代码发现:出现异常时,只是重置了偏移量,因此猜测是重新拉取。
测试方法
设置拉取间隔,观察出现异常时重试间隔是否与拉取间隔一致。如果一致,则说明是重新拉取;如果不一致,则说明是重新消费。
具体操作
- 设置拉取间隔:默认值:0L。
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> factory(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
factory.getContainerProperties().setIdleBetweenPolls(3000L);
return factory;
}
}
- 消息处理抛出异常,查看重试间隔
-
间隔3秒:
-
间隔7秒:
-
小结
通过观察不同拉取间隔和重试间隔,发现重试间隔与拉取间隔基本一致,因此可以确定:Spring-Kafka(2.7.7)默认消费异常处理为重新拉取9次,即一条消息默认最大10次处理机会。
附
MAX_POLL_INTERVAL_MS_CONFIG
查看消费者配置能看到还有一个类似拉取间隔的配置:ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
。该配置表示消费者最大拉取间隔:默认5分钟。决定了Kafka消费者最大拉取空闲时间,如果超过这个时间broker就认为该消费者可能不属于该消费者组了,触发一次再均衡。
- 设置消费者最大拉取间隔
kafka:
...
consumer:
...
properties:
max:
poll:
interval:
ms: 1000
- 消息处理过程中sleep5秒:当消费者处理时长超过消费者最大拉取间隔时,被认为不活跃了并触发了再均衡。 并且一直反复:消费->再均衡->消费->再均衡:
拉取间隔与消费者最大拉取间隔之间的联系
不同间隔时长测试
- 拉取间隔设置为10秒,消费者最大拉取间隔设置为3秒:工厂拉取间隔失效。
- 拉取间隔设置为3秒,消费者最大拉取间隔设置为3秒:工厂拉取间隔失效。
- 拉取间隔设置为3秒,消费者最大拉取间隔设置为50秒:工厂拉取间隔生效。
源码探究
- ConcurrentMessageListenerContainer最终创建ConcurrentMessageListenerContainer
- ConcurrentMessageListenerContainer将根据topic等配置创建对应的KafkaMessageListenerContainer
- KafkaMessageListenerContainer内部有个内部类:
ListenerConsumer
。ListenerConsumer实现了Runnable接口,在run()方法中调用了pollAndInvoke()。 - pollAndInvoke()方法中判断是否需要在下一次拉取前等待一段时间
- idleBetweenPollIfNecessary()方法中 从上图可知,实际拉取间隔为:消费者最大拉取间隔-距离上一次拉取的时间-5秒,与拉取间隔之间的最小值。因此只有当消费者最大拉取间隔大于5秒时,拉取间隔才可能生效。
小结
- 如果消费者处理时长较长且不可避免,则应该将消费者最大拉取间隔设置为一个较大的值。
- 如果希望工厂拉取间隔(
idleBetweenPools
)生效,则消费者最大拉取间隔(maxPollInterval
)最小应该设置为:2 x idleBetweenPools + 5000。否则,等待时间将动态变化,可能存在拉取间隔为0的情况。