Kafka开发

Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流处理应用。在 Java 中使用 Kafka,通常涉及生产者、消费者、主题配置和集群管理等方面。

1. 安装和配置 Kafka

  1. 下载 Kafka 并解压。
  2. 启动 Zookeeper(Kafka 的依赖服务):
  3. 启动 Kafka Broker:

2. 引入 Maven 依赖

在你的 Java 项目的 pom.xml 中添加以下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version> <!-- 选择 Kafka 对应的版本 -->
</dependency>

3. 创建 Kafka 生产者

生产者负责将消息发送到 Kafka 主题。

代码示例:生产者

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka Broker 地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建 Kafka 生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Message sent: %s | Partition: %d | Offset: %d%n",
                            record.value(), metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }

        // 关闭生产者
        producer.close();
    }
}

4. 创建 Kafka 消费者

消费者从 Kafka 主题读取消息。

代码示例:消费者

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费

        // 创建 Kafka 消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 拉取消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: %s | Partition: %d | Offset: %d%n",
                            record.value(), record.partition(), record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

5. 流处理示例

Kafka Streams 是一个强大的流处理库,用于实时处理 Kafka 数据流。

Kafka Streams 示例

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        // 配置流处理属性
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");

        // 定义流拓扑
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream("input-topic");
        sourceStream.mapValues(value -> value.toUpperCase())
                    .to("output-topic");

        // 启动流处理应用
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

6. 常见优化建议

  1. 分区与副本: 根据需求设置分区和副本,提升吞吐量和容错能力。
  2. 幂等性生产者: 设置 enable.idempotence=true 确保消息的可靠投递。
  3. 消费并发: 配置多个消费者实例,提高消费性能。
  4. 监控: 使用 Kafka Manager 或 Prometheus 监控集群运行状态。
Kafka碎碎念 文章被收录于专栏

Kafka的一些碎碎念,哈哈哈哈哈

全部评论

相关推荐

2024-12-30 22:31
吉首大学 Web前端
工字钢写代码:改成吉林就OK了
点赞 评论 收藏
分享
评论
点赞
2
分享
牛客网
牛客企业服务