SpringBoot整合Kafka之消费者异常处理机制

依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.7</version>
</dependency>

@KafkaListener

注解参数

  • String id() default "";设置该监听器所属容器的id,如果不设置则自动生成。同一个容器中,id不能重复。
  • String[] topics() default {};消费者订阅的主题,可以是主题名、属性占位符${xiang.topics:test}、表达式。
  • String topicPattern() default "";消费者订阅的主题,可以是主题模式、配置属性的占位符、表达式。
  • String containerFactory() default "";设置创建监听器的工厂。
  • String errorHandler() default "";异常处理器, org.springframework.kafka.listener.KafkaListenerErrorHandler的实现类。
  • String groupId() default "";消费者所属群组ID,可以不配置。
  • boolean idIsGroup() default true;如果没有设置groupId,且idIsGroup设置为true,则使用id(如果设置了)作为群组ID;如果没有设置groupId,且idIsGroup设置为false,则使用消费者工厂提供的group.id作为群组ID。
  • TopicPartition[] topicPartitions() default {};指定主题与分区。
    @KafkaListener(topicPartitions = {  
        @TopicPartition(topic = "test", partitions = {"1", "3"}, partitionOffsets = {  
                @PartitionOffset(partition = "1", initialOffset = "12"),  
                @PartitionOffset(partition = "3", initialOffset = "23")  
        })  
    })
    
  • String concurrency() default "";消费者并发数,即创建的消费者客户端个数。

注解描述

该注解用于标记一个方法,在指定主题上作为Kafka消息监听器的目标方法(动态代理)。 containerFactory()用于指定构建Kafka监听器容器的 KafkaListenerContainerFactory。如果不通过containerFactory()设置,并且没有通过配置指定默认的容器工厂,则将使用名为kafkaListenerContainerFactory的默认容器工厂。

通过注册一个KafkaListenerAnnotationBeanPostProcessor来完成@KafkaListener注解的处理。可以手动注册,也可以通过@EnableKafka注解完成。被@KafkaListener标记的方法可以拥有灵活的方法签名:

  • ConsumerRecord:用于获取Kafka原始消息
  • Acknowledgment:用于手动 ack

当@KafkaListener标记在方法上时,会为每个方法创建一个监听器容器。MessageListener是MessagingMessageListenerAdapter,提供消息处理需要调用的方法的是MethodKafkaListenerEndpoint。

当@KafkaListener标记在类上时,会创建一个容器监听器,用于处理所有标记了@KafkaHandler注解的方法。被@KafkaHandler标记的所有方法不能造成任何歧义,以便能够为特定的消息解析一个方法。MessageListener是配置了MessagingMessageListenerAdapter,提供消息处理需要调用的方法的是MultiMethodKafkaListenerEndpoint。

KafkaListenerContainerFactory

该接口有两个默认实现:AbstractKafkaListenerContainerFactory、ConcurrentKafkaListenerContainerFactory,后者用于创建 ConcurrentMessageListenerContainer(并发消息监听器容器)。由于前者是抽象类,因此猜测默认容器工厂就是ConcurrentKafkaListenerContainerFactory。

在KafkaListenerAnnotationBeanPostProcessor的afterSingletonsInstantiated方法末尾,有一个注册所有监听器的地方:

// Actually register all listeners  
this.registrar.afterPropertiesSet();

进入该方法将来到如下方法,其中有个解析容器工厂的地方: alt

解析容器工厂方法: alt 可见,默认容器工厂的确是ConcurrentKafkaListenerContainerFactory。

ConcurrentKafkaListenerContainerFactory

该工厂创建的容器是ConcurrentMessageListenerContainer。其参数concurrency指定创建多少个KafkaMessageListenerContainer,默认为1。KafkaMessageListenerContainer的doStart()方法中会创建监听器消费者: alt 默认错误处理器为;SeekToCurrentErrorHandler。是在ListenerConsumer构造方法中设置的:

  • 获取错误处理器为null alt
  • 确认错误处理器 alt
  • 创建错误处理器,最大尝试次数为9 alt

SeekToCurrentErrorHandler

当消息处理出现错误时,查找还需处理的记录中每个主题当前的偏移量,用于倒回分区以便重新消费。出现错误时,会调用其handle方法,默认处理结束后确认消息(提交偏移量)。 alt SeekUtils.seekOrRecover: alt doSeeks: alt 判断是否恢复第一条消息,recovery.recovered方法:

// 如果最大重试次数大于1,则noRetries为false。
if (this.noRetries) {  
    attemptRecovery(record, exception, null, consumer);  
    return true;
}
...
// (主题+分区)只有唯一一个FailedRecord对象,存在Map里。
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
...
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();

alt 如果重试次数达到最大值(9次)则返回-1,最终recovery.recovered方法返回true,则跳过第当前消息。如果不跳过则一起seekPartitions:重置下一次拉取消息的偏移量,并返回false。然后抛出异常。 alt 当一条消息处理失败——>重置偏移量——>主动抛出异常——>打印错误日志。重新拉取消息处理,如果处理失败——>重置偏移量——>主动抛出异常——>打印错误日志,最大重试9次,同一条消息最大处理10次。然后判断是否提交偏移量: alt

重新拉取or重新消费

  • 重新拉取:重新拉取失败的消息并消费。
  • 重新消费:不重新拉取失败的消息,而是直接尝试消费。

Kafka消费者在出现异常时,会重试9次。那么重试是重新拉取还是重新消费?通过阅读代码发现:出现异常时,只是重置了偏移量,因此猜测是重新拉取。

测试方法

设置拉取间隔,观察出现异常时重试间隔是否与拉取间隔一致。如果一致,则说明是重新拉取;如果不一致,则说明是重新消费。

具体操作

  • 设置拉取间隔:默认值:0L。
@Configuration  
public class KafkaConfig {  
    @Bean  
    public ConcurrentKafkaListenerContainerFactory<?, ?> factory(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {  
        factory.getContainerProperties().setIdleBetweenPolls(3000L);  
        return factory;  
    }  
}
  • 消息处理抛出异常,查看重试间隔
    • 间隔3秒: alt

    • 间隔7秒: alt

小结

通过观察不同拉取间隔和重试间隔,发现重试间隔与拉取间隔基本一致,因此可以确定:Spring-Kafka(2.7.7)默认消费异常处理为重新拉取9次,即一条消息默认最大10次处理机会

MAX_POLL_INTERVAL_MS_CONFIG

查看消费者配置能看到还有一个类似拉取间隔的配置:ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG。该配置表示消费者最大拉取间隔:默认5分钟。决定了Kafka消费者最大拉取空闲时间,如果超过这个时间broker就认为该消费者可能不属于该消费者组了,触发一次再均衡。

  • 设置消费者最大拉取间隔
kafka:  
  ...
  consumer:  
	...
    properties:  
      max:
	    poll:  
		  interval:  
		    ms: 1000
  • 消息处理过程中sleep5秒:当消费者处理时长超过消费者最大拉取间隔时,被认为不活跃了并触发了再均衡。 alt 并且一直反复:消费->再均衡->消费->再均衡: alt

拉取间隔与消费者最大拉取间隔之间的联系

不同间隔时长测试

  • 拉取间隔设置为10秒,消费者最大拉取间隔设置为3秒:工厂拉取间隔失效。 alt
  • 拉取间隔设置为3秒,消费者最大拉取间隔设置为3秒:工厂拉取间隔失效。 alt
  • 拉取间隔设置为3秒,消费者最大拉取间隔设置为50秒:工厂拉取间隔生效。 alt

源码探究

  • ConcurrentMessageListenerContainer最终创建ConcurrentMessageListenerContainer
  • ConcurrentMessageListenerContainer将根据topic等配置创建对应的KafkaMessageListenerContainer alt
  • KafkaMessageListenerContainer内部有个内部类:ListenerConsumer。ListenerConsumer实现了Runnable接口,在run()方法中调用了pollAndInvoke()。 alt
  • pollAndInvoke()方法中判断是否需要在下一次拉取前等待一段时间 alt
  • idleBetweenPollIfNecessary()方法中 alt 从上图可知,实际拉取间隔为:消费者最大拉取间隔-距离上一次拉取的时间-5秒,与拉取间隔之间的最小值。因此只有当消费者最大拉取间隔大于5秒时,拉取间隔才可能生效。

小结

  • 如果消费者处理时长较长且不可避免,则应该将消费者最大拉取间隔设置为一个较大的值。
  • 如果希望工厂拉取间隔(idleBetweenPools)生效,则消费者最大拉取间隔(maxPollInterval)最小应该设置为:2 x idleBetweenPools + 5000。否则,等待时间将动态变化,可能存在拉取间隔为0的情况。
全部评论

相关推荐

11-07 13:31
怀化学院 Java
勇敢牛牛不怕难:又疯一个
点赞 评论 收藏
分享
1 收藏 评论
分享
牛客网
牛客企业服务