RocketMQ 重复消费

  1. Tag过滤和Key
    • Tag过滤
      • 原理:在RocketMQ中,Tag是对消息的再分类。生产者在发送消息时可以为消息指定Tag,消费者在订阅时可以通过指定Tag来只接收特定类型的消息。这有助于在同一个Topic下对不同子类型的消息进行区分和处理。
      • 示例代码
        • 生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TagProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 发送带有TagA的消息
        Message msg1 = new Message("TagTopic",
                "TagA",
                "Message with TagA".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult1 = producer.send(msg1);
        System.out.printf("%s%n", sendResult1);

        // 发送带有TagB的消息
        Message msg2 = new Message("TagTopic",
                "TagB",
                "Message with TagB".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult2 = producer.send(msg2);
        System.out.printf("%s%n", sendResult2);

        producer.shutdown();
    }
}
   - **消费者**:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TagConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        // 只订阅TagA的消息
        consumer.subscribe("TagTopic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started.");
    }
}
  • Key
    • 原理:消息的Key是消息的一个属性,它主要用于消息的查询和定位。通过为消息设置唯一的Key,在排查问题或需要精确查找某条消息时,可以通过Key在RocketMQ的管理控制台或相关工具中快速定位到该消息。
    • 示例代码(生产者设置Key)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class KeyProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("KeyProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("KeyTopic",
                "TagA",
                "Message with Key".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 设置消息的Key
        msg.setKeys("unique - key - 123");
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}
  1. 消息重复消费问题
    • 产生原因
      • 网络波动:在消息传递过程中,由于网络不稳定,生产者可能会收到Broker未成功接收消息的错误,但实际上消息已成功到达Broker,导致生产者重新发送消息,造成重复。
      • 消费者确认机制:消费者消费消息后,在向Broker发送确认消息时,如果网络出现问题,Broker未收到确认,会认为消息未被成功消费,从而再次投递该消息。
  2. 重复消费解决方案
    • 幂等性处理
      • 原理:幂等性是指对同一操作的多次请求应该产生相同的结果。在消息消费场景中,通过设计消费逻辑,使得无论消息被消费多少次,对业务的影响是一致的。
      • 示例:在电商系统中,如果消息是用于更新订单状态,消费逻辑可以先根据订单ID查询当前订单状态,如果已经是目标状态,则不进行重复操作;如果不是目标状态,则进行状态更新。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class IdempotencyConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IdempotencyConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("IdempotencyTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 假设消息体是订单ID
                    String orderId = new String(msg.getBody());
                    // 检查订单状态
                    boolean isTargetStatus = checkOrderStatus(orderId);
                    if (!isTargetStatus) {
                        // 更新订单状态
                        updateOrderStatus(orderId);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started.");
    }

    private static boolean checkOrderStatus(String orderId) {
        // 实际逻辑:查询数据库中订单状态
        return false;
    }

    private static void updateOrderStatus(String orderId) {
        // 实际逻辑:更新数据库中订单状态
    }
}
  • 消息去重表
    • 原理:维护一张消息去重表,在消费消息前,先根据消息的唯一标识(如Message ID或自定义的业务唯一标识)查询去重表。如果该消息已被消费过,则直接丢弃;否则,将消息标识插入去重表并进行消息消费。
    • 示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DuplicateTableConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DuplicateTableConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("DuplicateTableTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String msgId = msg.getMsgId();
                    boolean isConsumed = checkDuplicateTable(msgId);
                    if (!isConsumed) {
                        insertIntoDuplicateTable(msgId);
                        // 处理消息
                        processMessage(msg);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started.");
    }

    private static boolean checkDuplicateTable(String msgId) {
        // 实际逻辑:查询数据库或缓存中的去重表
        return false;
    }

    private static void insertIntoDuplicateTable(String msgId) {
        // 实际逻辑:向数据库或缓存中的去重表插入记录
    }

    private static void processMessage(MessageExt msg) {
        // 实际逻辑:处理消息
    }
}
  1. 消息重试和死信消息解决方案
    • 消息重试
      • 原理:当消费者消费消息失败时,RocketMQ会自动进行消息重试。默认情况下,消息会重试16次,每次重试的间隔时间会逐渐变长。消费者可以通过实现MessageListenerConcurrently接口并返回ConsumeConcurrentlyStatus.RECONSUME_LATER来触发重试。
      • 示例代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RetryConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RetryConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("RetryTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        // 模拟消费失败
                        throw new RuntimeException("Simulated consumption failure");
                    } catch (Exception e) {
                        // 返回RECONSUME_LATER触发重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started.");
    }
}
  • 死信消息
    • 原理:当消息重试达到最大次数后仍然消费失败,该消息会被发送到死信队列(DLQ,Dead - Letter Queue)。死信队列是一个特殊的队列,用于存储这些无法正常消费的消息,方便后续分析和处理。
    • 处理方式
      • 人工干预:运维人员或开发人员可以定期查看死信队列中的消息,分析消费失败的原因,修复问题后手动重新发送这些消息。
      • 自动处理:可以编写一个专门的死信消息处理程序,根据死信消息的内容,自动进行一些处理,如调整消费逻辑、修正数据等,然后重新发送到原队列或其他合适的队列进行消费。

以上代码示例基于RocketMQ 4.x版本,实际应用中可能需要根据具体的RocketMQ版本和业务需求进行调整。同时,确保RocketMQ服务已正确安装和启动,并根据实际情况调整NameServer地址等配置。 RocketMQ

#找工作如何保持松弛感?##牛客创作赏金赛#
全部评论

相关推荐

评论
点赞
2
分享

创作者周榜

更多
牛客网
牛客企业服务