Kafka 消费者

Kafka 消费者(Consumer)负责从 Kafka 的主题(Topic)中读取消息。消费者是 Kafka 数据流的接收端,能够根据一定的配置和策略来获取数据并进行处理。

1. Kafka 消费者的基本原理

Kafka 消费者有以下主要特性:

  1. 消费组(Consumer Group): 消费者可以通过加入消费组来实现消息的分发,确保消息的高效消费和负载均衡。
  2. 偏移量(Offset): Kafka 消费者在每个分区中都有一个偏移量,指示消息的消费进度。偏移量由 Kafka 维护,也可以由消费者手动控制。
  3. 消息顺序: Kafka 保证每个分区内的消息顺序,但不同分区的消息不保证顺序。
  4. 重复消费与幂等性: 消费者可以通过配置和设计来避免消息的重复消费。

2. Kafka 消费者配置

Kafka 消费者的配置决定了其行为,最常用的配置如下:

bootstrap.servers

Kafka 集群地址,用于初始化消费者客户端。

localhost:9092

group.id

消费者所属的消费组 ID,Kafka 根据消费组来协调消息的消费。

my-consumer-group

key.deserializer

消息键的反序列化类。

org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

消息值的反序列化类。

org.apache.kafka.common.serialization.StringDeserializer

auto.offset.reset

消费者如何处理偏移量,如果没有偏移量或偏移量无效时的处理方式。

earliest, latest, none

enable.auto.commit

是否自动提交偏移量。

true, false

auto.commit.interval.ms

自动提交偏移量的时间间隔。

5000

session.timeout.ms

消费者与 Kafka 集群之间的会话超时。

10000

max.poll.records

每次拉取的最大消息数量。

500

3. Kafka 消费者代码实现

基础示例:简单消费者

这是一个简单的 Kafka 消费者示例,它会从 test-topic 主题中消费消息并输出:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 集群地址
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组 ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息键反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息值反序列化类
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 如果没有偏移量,自动从最早消息开始消费

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 拉取消息并处理
        try {
            while (true) {
                consumer.poll(100).forEach(record -> {
                    System.out.printf("Consumed record: Key=%s, Value=%s, Offset=%d%n", record.key(), record.value(), record.offset());
                });
            }
        } finally {
            consumer.close();
        }
    }
}

带手动偏移量管理的消费者

手动提交偏移量,不依赖 Kafka 自动提交。这可以通过 enable.auto.commit = false 来实现,并使用 commitSync()commitAsync() 方法手动提交偏移量。

import org.apache.kafka.clients.consumer.*;

import java.util.Collections;
import java.util.Properties;

public class ManualOffsetConsumer {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 禁用自动提交

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record: Key=%s, Value=%s, Offset=%d%n", record.key(), record.value(), record.offset());
                }

                // 手动提交偏移量
                consumer.commitSync(); // 确保提交
            }
        } finally {
            consumer.close();
        }
    }
}

4. Kafka 消费者的高级特性

消费者组与负载均衡

Kafka 消费者通过 消费组(Consumer Group) 实现消息的并行处理。当多个消费者属于同一个消费组时,Kafka 会将消息分配给消费组中的每个消费者处理,每个消费者处理不同分区的数据。消费者组中的每个消费者只会消费自己分配到的分区。

  • 每个分区只能被同一消费组内的一个消费者消费。
  • 如果消费组中的消费者数量超过分区数量,部分消费者将空闲。
  • 如果消费者故障,Kafka 会将分区重新分配给其他消费者。

动态重新平衡

Kafka 会动态地重新平衡消费者组中的消费者和分区分配。消费者可以监听消费组的变化,并及时响应。

consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 当分区被撤销时执行
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 当分区被分配时执行
    }
});

5. Kafka 消费者的性能优化

配置优化

  1. fetch.min.bytes 最小拉取消息的字节数。设置该参数可以提高性能,但会增加延迟。
  2. fetch.max.wait.ms 拉取请求的最大等待时间。合理配置可以减少请求的数量,提升吞吐量。
  3. max.poll.records 每次拉取的最大记录数。适当设置可以避免消费者被过多消息阻塞。

批量处理

通过批量拉取消息并批量处理,可以减少消费的延迟并提高吞吐量。

consumer.poll(100).forEach(record -> {
    // 批量处理逻辑
});

消费者并发

可以通过启动多个消费者实例来并行消费消息,增加消息处理的吞吐量。

6. 消费者常见问题

  1. 重复消费: 如果 enable.auto.commit = false 或出现消费者崩溃并重新启动,可能会导致消息重复消费。可以通过存储偏移量来避免重复消费。
  2. 偏移量丢失: 如果消费者的偏移量丢失或无法找到,可能会导致消息丢失。可以通过设置合理的 auto.offset.reset 参数来控制这种情况。
Kafka碎碎念 文章被收录于专栏

Kafka的一些碎碎念,哈哈哈哈哈

全部评论

相关推荐

评论
1
1
分享

创作者周榜

更多
牛客网
牛客企业服务