RocketMQ的安装配置与使用

一、关于RocketMQ

关于RocketMQ的信息以及和其他MQ的对比,请看官网
本文章只谈及一些关于RocketMQ的知识,其他关于MQ的是什么和好处不谈及,请看其他大佬文章,谢谢!

二、RocketMQ的安装和配置

关于RocketMQ的安装和配置,官网已经给出了Linux和Windows的安装和配置,大家可以自行前往了解,这里以Windows为例演示。

1.下载RocketMQ

官网地址 4.8.0 版本
<mark>进去以后 , 点击第一个下载</mark>

<mark>下载完成进行解压</mark>

2. 配置环境变量

以上图为例,配置环境变量 (windows10)
此电脑右键–》属性–》高级系统设置–》高级选项卡–》环境变量
下方系统变量,点击新建
配置如下


配置好以上两个环境变量后,点击确认即可

3. 启动服务

<mark>请先确保您的电脑安装了JDK1.8+并且配置好了环境变量</mark>

进入到安装目录下的bin目录

打开cmd命令行窗口
启动namesrv

D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>mqnamesrv.cmd


新的窗口启动broker

D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>mqbroker.cmd


显示如上结果表示已经安装成功

4. 简单测试一下

新起一个命令行窗口 Producer 发送消息
命令

D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>tools.cmd org.apache.rocketmq.example.quickstart.Producer


新起一个命令行窗口 Consumer 消费消息

D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>tools.cmd org.apache.rocketmq.example.quickstart.Consumer


回车
消费者接收消息

三、Java客户端 rocketmq-client

<mark>上面启动的服务别关哦</mark>

1. 创建maven工程,配置 pom.xml

 		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

2.编码,消息消费者和生产者,发布订阅模式

这里演示同步发送和异步发送

1)编写消费者(Consumer)

/** * 消费者 */
public class Consumer {
   

    public static void main(String[] args) throws Exception{
   
        // 实例化一个消息消费者,并指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
        // 设置名称服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        // 设置订阅topic 以及 tag
        consumer.subscribe("scheduleMsg", "*");
        // 设置消息监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   
            /** * 消费消息 * @param msgs * @param context * @return */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
   
                // 打印当前是哪个现场消费的消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(),msgs);
                System.out.println();
                msgs.forEach(messageExt -> {
   
                    // 消费消息
                    System.out.println(new String(messageExt.getBody()));
                });
                // 返回消息状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 消费者启动
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}
2)编写生产者(Producer)<mark>同步</mark>
/** * 同步发送者 */
public class SyncProducer {
   

    public static void main(String[] args) throws Exception {
   
        // 实例化消息生产者,并指定组名
        DefaultMQProducer producer = new DefaultMQProducer("first_group");

        producer.setNamesrvAddr("localhost:9876");

        // 生产者启动
        producer.start();

        // 循环发送100条消息
        for (int i = 0; i < 100; i++) {
   
            // 创建消息实例,设置Topic , Tag , 消息内容 body
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ lzx" +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容主体 */
            );
            // 消息发送,并且获取返回结果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 发送消息完成,关闭生产者
        producer.shutdown();
    }

}
3)编写生产者(Producer)<mark>异步</mark>
/** * 异步发送者 */
public class ASyncProducer {
   

    public static void main(String[] args) throws MQClientException, InterruptedException {
   
        DefaultMQProducer producer = new DefaultMQProducer("first_group");
        producer.setNamesrvAddr("localhost:9876");
        /** * 启动 */
        producer.start();
        /** * 发送失败重试时间 */
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
   
            try {
   
                final int index = i;
                Message msg = new Message("TopicTest","TagA",
                        "Hello world lzx".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /** * 设置回调方法实现异步 */
                producer.send(msg, new SendCallback() {
   
                    @Override
                    public void onSuccess(SendResult sendResult) {
   
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
   
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
   
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();

    }

}

3. 启动main测试

先启动Consumer
再启动Producer
这里就不贴图了
效果可以拷贝代码试试

四、RocketMQ的其他模型

这种发布订阅的模型是最基本的模型
RocketMQ也支持顺序消息,延迟消息,批量消息,事务消息等。

RocketMQ的github中给出了这些模式的样例,大家可以自行前往学习
样例(GitHub)

全部评论

相关推荐

不愿透露姓名的神秘牛友
11-27 10:48
点赞 评论 收藏
分享
10-09 09:39
门头沟学院 C++
HHHHaos:这也太虚了,工资就一半是真的
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务