RocketMQ 重复消费
- Tag过滤和Key
- Tag过滤:
- 原理:在RocketMQ中,Tag是对消息的再分类。生产者在发送消息时可以为消息指定Tag,消费者在订阅时可以通过指定Tag来只接收特定类型的消息。这有助于在同一个Topic下对不同子类型的消息进行区分和处理。
- 示例代码:
- 生产者:
- Tag过滤:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 发送带有TagA的消息
Message msg1 = new Message("TagTopic",
"TagA",
"Message with TagA".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult1 = producer.send(msg1);
System.out.printf("%s%n", sendResult1);
// 发送带有TagB的消息
Message msg2 = new Message("TagTopic",
"TagB",
"Message with TagB".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult2 = producer.send(msg2);
System.out.printf("%s%n", sendResult2);
producer.shutdown();
}
}
- **消费者**:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TagConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 只订阅TagA的消息
consumer.subscribe("TagTopic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
}
- Key:
- 原理:消息的Key是消息的一个属性,它主要用于消息的查询和定位。通过为消息设置唯一的Key,在排查问题或需要精确查找某条消息时,可以通过Key在RocketMQ的管理控制台或相关工具中快速定位到该消息。
- 示例代码(生产者设置Key):
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class KeyProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("KeyProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("KeyTopic",
"TagA",
"Message with Key".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置消息的Key
msg.setKeys("unique - key - 123");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
- 消息重复消费问题
- 产生原因:
- 网络波动:在消息传递过程中,由于网络不稳定,生产者可能会收到Broker未成功接收消息的错误,但实际上消息已成功到达Broker,导致生产者重新发送消息,造成重复。
- 消费者确认机制:消费者消费消息后,在向Broker发送确认消息时,如果网络出现问题,Broker未收到确认,会认为消息未被成功消费,从而再次投递该消息。
- 产生原因:
- 重复消费解决方案
- 幂等性处理:
- 原理:幂等性是指对同一操作的多次请求应该产生相同的结果。在消息消费场景中,通过设计消费逻辑,使得无论消息被消费多少次,对业务的影响是一致的。
- 示例:在电商系统中,如果消息是用于更新订单状态,消费逻辑可以先根据订单ID查询当前订单状态,如果已经是目标状态,则不进行重复操作;如果不是目标状态,则进行状态更新。
- 幂等性处理:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class IdempotencyConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IdempotencyConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("IdempotencyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 假设消息体是订单ID
String orderId = new String(msg.getBody());
// 检查订单状态
boolean isTargetStatus = checkOrderStatus(orderId);
if (!isTargetStatus) {
// 更新订单状态
updateOrderStatus(orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
private static boolean checkOrderStatus(String orderId) {
// 实际逻辑:查询数据库中订单状态
return false;
}
private static void updateOrderStatus(String orderId) {
// 实际逻辑:更新数据库中订单状态
}
}
- 消息去重表:
- 原理:维护一张消息去重表,在消费消息前,先根据消息的唯一标识(如Message ID或自定义的业务唯一标识)查询去重表。如果该消息已被消费过,则直接丢弃;否则,将消息标识插入去重表并进行消息消费。
- 示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DuplicateTableConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DuplicateTableConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("DuplicateTableTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
boolean isConsumed = checkDuplicateTable(msgId);
if (!isConsumed) {
insertIntoDuplicateTable(msgId);
// 处理消息
processMessage(msg);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
private static boolean checkDuplicateTable(String msgId) {
// 实际逻辑:查询数据库或缓存中的去重表
return false;
}
private static void insertIntoDuplicateTable(String msgId) {
// 实际逻辑:向数据库或缓存中的去重表插入记录
}
private static void processMessage(MessageExt msg) {
// 实际逻辑:处理消息
}
}
- 消息重试和死信消息解决方案
- 消息重试:
- 原理:当消费者消费消息失败时,RocketMQ会自动进行消息重试。默认情况下,消息会重试16次,每次重试的间隔时间会逐渐变长。消费者可以通过实现
MessageListenerConcurrently
接口并返回ConsumeConcurrentlyStatus.RECONSUME_LATER
来触发重试。 - 示例代码:
- 原理:当消费者消费消息失败时,RocketMQ会自动进行消息重试。默认情况下,消息会重试16次,每次重试的间隔时间会逐渐变长。消费者可以通过实现
- 消息重试:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RetryConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RetryConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("RetryTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 模拟消费失败
throw new RuntimeException("Simulated consumption failure");
} catch (Exception e) {
// 返回RECONSUME_LATER触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
}
- 死信消息:
- 原理:当消息重试达到最大次数后仍然消费失败,该消息会被发送到死信队列(DLQ,Dead - Letter Queue)。死信队列是一个特殊的队列,用于存储这些无法正常消费的消息,方便后续分析和处理。
- 处理方式:
- 人工干预:运维人员或开发人员可以定期查看死信队列中的消息,分析消费失败的原因,修复问题后手动重新发送这些消息。
- 自动处理:可以编写一个专门的死信消息处理程序,根据死信消息的内容,自动进行一些处理,如调整消费逻辑、修正数据等,然后重新发送到原队列或其他合适的队列进行消费。
以上代码示例基于RocketMQ 4.x版本,实际应用中可能需要根据具体的RocketMQ版本和业务需求进行调整。同时,确保RocketMQ服务已正确安装和启动,并根据实际情况调整NameServer地址等配置。 RocketMQ
#找工作如何保持松弛感?##牛客创作赏金赛#