面试官:Kafka中的key有什么用?

我们在使用 Kafka 时,最简单、最常用的方式是只设置 topic(主题)和 value(消息体),如下所示:

alt

这样的话获取消息的代码也很简单,如下所示:

@KafkaListener(topics = "mytopic", groupId = "my-group")
public void listen(String data) {
    System.out.println("监听到消息:" + data);
}

但是,除了我们可以设置和传递 topic 和 value 之外,我们还可以传递 key,如下图所示:

alt

那问题来了,发送消息时设置这个 key 有什么用呢?

key的作用

发送消息时,设置 key 的作用如下:

alt

1.决定分区

当生产者发送消息时,如果指定了 key,Kafka 会根据 key 的 hash 值来决定这条消息应该发送到哪个分区。

如果没有指定 key,Kafka 会采用轮询(早期版本)或随机(最新版本)的方式将消息分配到其他分区中。

分区的具体实现源码在 DefaultPartitioner 中 partition 方法中体现,核心源码如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}

指定 key 之后的分区实现代码如下:

public static int partitionForKey(byte[] serializedKey, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

以上源码的大概含义是:使用 MurmurHash2 算法对字节数组 serializedKey 进行哈希运算,并将其结果转换为正数,然后对 numPartitions 取模,以确定键在分区中的位置,返回值表示键所在的分区编号。

所以,从上述源码可以看出,发送消息如果设置了 key 之后,会将相同 key 放到同一个分区中。

2.保证消息顺序

在 Kafka 中,同一个分区中的消息是有序的。而相同的 key,根据上面的分区算法可知,它们会存放到同一个分区,这样就能保证消息的有序性了。

3.消息过滤

对于某些应用场景,消费者可以根据消息的键来进行过滤或聚合操作。例如,在实时数据分析场景中,可能需要对具有相同键的消息进行分组处理。

Kafka 设置了 key 之后,可以通过以下方式实现消息过滤,如下代码所示:

@KafkaListener(topics = "topicName", groupId = "groupId")
public void listen(String message, ConsumerRecord<?,?> record) {
    Object key = record.key();
    if (key instanceof String && ((String) key).matches("regexPattern")) {
        // 处理满足正则表达式条件的消息
    }
}

也就是,我们在接收到消息之后,通过对 key 的正则匹配实现消息的过滤和聚合等操作。

课后思考

保证 Kafka 消息的有序性,除了设置 key 之外,还有哪些实现手段?

#java##八股#
全部评论

相关推荐

09-12 21:23
中南大学 Java
1、短链接创建之后是先写数据库还是先写入缓存我答的先写数据库再写缓存,面试官问写缓存这个过程失败了导致请求都落到数据库中怎么办?解决思路:①先写缓存再写数据库(我答的这个,面试官说这个)&nbsp;②写缓存失败,可以用重试机制(基于消息队列的方案,基于canal的方案)2、HashMap什么时候红黑树转链表红黑树大小减到6转链表3、我看你限流用的阿里的sentinel组件,阿里的一些组件也有bug,如果只给你redis,你怎么实现限流呢答:还是使用滑动窗口算法,使用redis的有序集合来存放每个小窗口内的请求数,score存时间戳面试官说你的方案数据存储和计算都在redis里面,有没有更快一点的他说可以数据存redis,计算时把数据加载到本地内存算,会快一些4、线程池设置的这个存活时间,这个时间过了之后,线程会销毁吗核心线程不会销毁,只有非核心线程才会被销毁反问核心线程也可以设置超时时间,你知道吗,不知道~查了下是allowCoreThreadTimeOut(true),核心线程在超出&nbsp;keepAliveTime&nbsp;后可以被销毁,5、线程池怎么销毁核心线程setcorepoolsize方法动态调整核心线程数、关闭线程池、设置allowCoreThreadTimeOut(true)6、为什么Java已经有synchronized,还要Lock接口Lock接口有更多的功能,可设置为公平锁,可中断,可设置超时时间,支持多条件变量
点赞 评论 收藏
分享
2 3 评论
分享
牛客网
牛客企业服务