RocketMQ的安装配置与使用
RocketMQ的安装配置与使用
一、关于RocketMQ
关于RocketMQ的信息以及和其他MQ的对比,请看官网
本文章只谈及一些关于RocketMQ的知识,其他关于MQ的是什么和好处不谈及,请看其他大佬文章,谢谢!
二、RocketMQ的安装和配置
关于RocketMQ的安装和配置,官网已经给出了Linux和Windows的安装和配置,大家可以自行前往了解,这里以Windows为例演示。
1.下载RocketMQ
官网地址 4.8.0 版本
<mark>进去以后 , 点击第一个下载</mark>
<mark>下载完成进行解压</mark>
2. 配置环境变量
以上图为例,配置环境变量 (windows10)
此电脑右键–》属性–》高级系统设置–》高级选项卡–》环境变量
下方系统变量,点击新建
配置如下
配置好以上两个环境变量后,点击确认即可
3. 启动服务
<mark>请先确保您的电脑安装了JDK1.8+并且配置好了环境变量</mark>
进入到安装目录下的bin目录
打开cmd命令行窗口
启动namesrv
D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>mqnamesrv.cmd
新的窗口启动broker
D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>mqbroker.cmd
显示如上结果表示已经安装成功
4. 简单测试一下
新起一个命令行窗口 Producer 发送消息
命令
D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>tools.cmd org.apache.rocketmq.example.quickstart.Producer
新起一个命令行窗口 Consumer 消费消息
D:\dev_tools\rocketmq-all-4.8.0-bin-release\bin>tools.cmd org.apache.rocketmq.example.quickstart.Consumer
回车
消费者接收消息
三、Java客户端 rocketmq-client
<mark>上面启动的服务别关哦</mark>
1. 创建maven工程,配置 pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2.编码,消息消费者和生产者,发布订阅模式
这里演示同步发送和异步发送
1)编写消费者(Consumer)
/** * 消费者 */
public class Consumer {
public static void main(String[] args) throws Exception{
// 实例化一个消息消费者,并指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// 设置名称服务器地址
consumer.setNamesrvAddr("localhost:9876");
// 设置订阅topic 以及 tag
consumer.subscribe("scheduleMsg", "*");
// 设置消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
/** * 消费消息 * @param msgs * @param context * @return */
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 打印当前是哪个现场消费的消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(),msgs);
System.out.println();
msgs.forEach(messageExt -> {
// 消费消息
System.out.println(new String(messageExt.getBody()));
});
// 返回消息状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者启动
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2)编写生产者(Producer)<mark>同步</mark>
/** * 同步发送者 */
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,并指定组名
DefaultMQProducer producer = new DefaultMQProducer("first_group");
producer.setNamesrvAddr("localhost:9876");
// 生产者启动
producer.start();
// 循环发送100条消息
for (int i = 0; i < 100; i++) {
// 创建消息实例,设置Topic , Tag , 消息内容 body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ lzx" +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容主体 */
);
// 消息发送,并且获取返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 发送消息完成,关闭生产者
producer.shutdown();
}
}
3)编写生产者(Producer)<mark>异步</mark>
/** * 异步发送者 */
public class ASyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("first_group");
producer.setNamesrvAddr("localhost:9876");
/** * 启动 */
producer.start();
/** * 发送失败重试时间 */
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("TopicTest","TagA",
"Hello world lzx".getBytes(RemotingHelper.DEFAULT_CHARSET));
/** * 设置回调方法实现异步 */
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
3. 启动main测试
先启动Consumer
再启动Producer
这里就不贴图了
效果可以拷贝代码试试
四、RocketMQ的其他模型
这种发布订阅的模型是最基本的模型
RocketMQ也支持顺序消息,延迟消息,批量消息,事务消息等。
RocketMQ的github中给出了这些模式的样例,大家可以自行前往学习
样例(GitHub)