Kafka开发
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流处理应用。在 Java 中使用 Kafka,通常涉及生产者、消费者、主题配置和集群管理等方面。
1. 安装和配置 Kafka
- 下载 Kafka 并解压。
- 启动 Zookeeper(Kafka 的依赖服务):
- 启动 Kafka Broker:
2. 引入 Maven 依赖
在你的 Java 项目的 pom.xml
中添加以下依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> <!-- 选择 Kafka 对应的版本 --> </dependency>
3. 创建 Kafka 生产者
生产者负责将消息发送到 Kafka 主题。
代码示例:生产者
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // 配置生产者属性 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // Kafka Broker 地址 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建 Kafka 生产者 Producer<String, String> producer = new KafkaProducer<>(props); // 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i); producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("Message sent: %s | Partition: %d | Offset: %d%n", record.value(), metadata.partition(), metadata.offset()); } else { exception.printStackTrace(); } }); } // 关闭生产者 producer.close(); } }
4. 创建 Kafka 消费者
消费者从 Kafka 主题读取消息。
代码示例:消费者
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 配置消费者属性 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费 // 创建 Kafka 消费者 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Collections.singletonList("test-topic")); // 拉取消息 try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Consumed message: %s | Partition: %d | Offset: %d%n", record.value(), record.partition(), record.offset()); } } } finally { consumer.close(); } } }
5. 流处理示例
Kafka Streams 是一个强大的流处理库,用于实时处理 Kafka 数据流。
Kafka Streams 示例
import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import java.util.Properties; public class KafkaStreamsExample { public static void main(String[] args) { // 配置流处理属性 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); // 定义流拓扑 StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> sourceStream = builder.stream("input-topic"); sourceStream.mapValues(value -> value.toUpperCase()) .to("output-topic"); // 启动流处理应用 KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
6. 常见优化建议
- 分区与副本: 根据需求设置分区和副本,提升吞吐量和容错能力。
- 幂等性生产者: 设置
enable.idempotence=true
确保消息的可靠投递。 - 消费并发: 配置多个消费者实例,提高消费性能。
- 监控: 使用 Kafka Manager 或 Prometheus 监控集群运行状态。
Kafka碎碎念 文章被收录于专栏
Kafka的一些碎碎念,哈哈哈哈哈