Kafka Topic
Kafka Topic(主题) 是 Kafka 中数据存储和管理的核心单元。生产者将消息发送到主题,消费者从主题中读取消息。Kafka 的主题设计是分布式、高吞吐量和高可靠性的基础。
1. 什么是 Kafka Topic?
Kafka 的 主题 是一个逻辑分类,用于存储和组织消息。每个主题可以分为多个 分区(Partitions),每个分区中的消息是有序的。主题提供了以下核心特性:
- 分区(Partitioning): 一个主题由多个分区组成,每个分区是一个有序的消息队列。
- 复制(Replication): 分区可以被复制到多个 Kafka Broker 上,提供容错能力。
- 日志结构(Log): 消息以追加方式存储到分区的日志中,每条消息都有唯一的 偏移量(Offset)。
2. Kafka Topic 的架构
- 主题和分区:每个主题可以分为多个分区。分区内的消息是有序的,但不同分区之间没有顺序保证。
- 偏移量(Offset):消息在分区中的唯一编号,用于标识消息的顺序。消费者使用偏移量跟踪消费进度。
- 复制(Replication):每个分区可以有一个或多个副本。一个副本为主分区(Leader),其他副本为副本分区(Follower)。生产者和消费者只与主分区交互。
3. Kafka Topic 的管理
Kafka 提供了多种命令行工具和 API 来管理主题,包括创建、查看、删除和修改主题的配置。
3.1 创建主题
使用 kafka-topics.sh
创建主题:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
参数说明:
--topic
: 主题名称。--partitions
: 分区数,决定消息的分布。--replication-factor
: 副本因子,决定每个分区的副本数量。
3.2 查看主题
查看所有主题列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看特定主题的详细信息:
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
输出示例:
Topic: my-topic PartitionCount: 3 ReplicationFactor: 2 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
字段解释:
- Leader: 负责处理读写请求的主分区。
- Replicas: 分区的所有副本。
- Isr (In-Sync Replicas): 当前与 Leader 同步的副本。
3.3 删除主题
删除主题:
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
4. Kafka Topic 的配置
Kafka 主题支持多种配置,可以通过 CLI 或 API 设置。
4.1 常见配置项
| 设置日志清理策略:
或
。 |
|
| 消息的保留时间(毫秒)。 |
(7天) |
| 消息的最大存储容量(字节)。 | 无限制 |
| 单个日志分段的最大大小(字节)。 |
(1GB) |
| 日志分段的最长保留时间(毫秒)。 | 无限制 |
| 必须同步的副本数量,用于控制写入的可靠性。 |
|
| 单条消息的最大大小(字节)。 |
(1MB) |
4.2 修改主题配置
可以使用 kafka-configs.sh
修改主题配置:
bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --add-config retention.ms=259200000
查看主题配置:
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
5. Kafka Topic 的日志管理
Kafka 的日志管理主要通过以下方式控制:
5.1 日志清理策略
- 删除(Delete):默认清理策略,超过保留时间或大小的消息将被删除。配置:cleanup.policy=delete
- 压缩(Compact):基于键的清理策略,只保留每个键的最新值。配置:cleanup.policy=compact
5.2 消息保留策略
- 基于时间:使用 retention.ms 设置消息的保留时间。示例:保留 3 天的消息。
- 基于大小:使用 retention.bytes 设置日志的最大存储大小。示例:限制主题存储为 1GB。
6. Kafka Topic 的分区管理
分区是 Kafka 提高并发性和扩展性的关键。
6.1 增加分区
可以动态增加主题的分区数:
bin/kafka-topics.sh --alter --topic my-topic --bootstrap-server localhost:9092 --partitions 5
注意:增加分区可能导致消息顺序被打乱,因为 Kafka 不会自动将现有数据重新分配到新分区。
6.2 分区的分配策略
Kafka 的分区可以通过以下方式分配:
- 轮询(Round Robin): 默认策略,消息均匀分布到所有分区。
- 基于键的分区(Key-based Partitioning): 使用消息键的哈希值选择分区。
- 自定义分区器(Custom Partitioner): 自定义分区逻辑。
7. Kafka Topic 的监控
Kafka 提供多种工具和指标,用于监控主题的状态和性能。
7.1 使用 CLI 工具
查看主题的分区状态:
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
7.2 使用 JMX 监控
通过 JMX 暴露 Kafka 的主题指标,例如:
UnderReplicatedPartitions
: 未同步的分区数。MessagesInPerSec
: 每秒写入的消息数。BytesInPerSec
: 每秒写入的字节数。BytesOutPerSec
: 每秒读取的字节数。
7.3 使用 Prometheus 和 Grafana
通过 Kafka Exporter 和 JMX Exporter 集成 Prometheus 和 Grafana,监控主题的性能。
8. Kafka Topic 的设计与优化
8.1 分区数的选择
- 根据生产者和消费者的并发需求选择分区数。
- 分区过少可能导致性能瓶颈,分区过多会增加管理开销。
8.2 副本因子的选择
- 通常设置为 2 或 3,以确保高可用性。
- 副本因子不应超过 Kafka Broker 的总数。
8.3 消息清理策略
- 使用删除策略(
delete
)处理历史数据。 - 使用压缩策略(
compact
)处理需要保留最新状态的场景。
Kafka的一些碎碎念,哈哈哈哈哈