RabbitMQ基本常识
1 消息队列概述
1.1 消息队列MQ
MQ 全称为 Message Queue,消息队列是应用程序和应用程序之间的通信方法。
- 为什么使用 MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
- 消息队列应用场景
应用解耦、异步处理、流量削峰、日志处理、纯粹通信
1.2 AMQP和JMS
MQ 是消息通信的模型;实现 MQ 的大致有两种主流方式:AMQP、JMS。
- AMQP
AMQP 高级消息队列协议,是一个进程间传递异步消息的网络协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。
- JMS
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
- AMQP和JMS区别
JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。JMS 规定了两种消息模式(B2B、发布订阅);而 AMQP 的消息模式更加丰富
1.3 消息队列产品
- kafka
Apache 下的一个子项目,使用 scala 实现的一个高性能分布式 Publish/Subscribe 消息队列系统。
1.快速持久化:通过磁盘顺序读写与零拷贝机制,可以在 O(1)的系统开销下进行消息持久化;
2.高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率;
3.高堆积:支持 topic 下消费者较长时间离线,消息堆积量大;
4.完全的分布式系统:Broker、Producer、Consumer 都原生自动支持分布式,依赖zookeeper 自动实现负载均衡;
5.支持 Hadoop 数据并行加载:对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
- RocketMQ
RocketMQ 的前身是 Metaq,当 Metaq3.0 发布时,产品名称改为 RocketMQ。RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点 :
1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的订阅者水平扩展能力
4.实时的消息订阅机制
5.支持事务消息
6.亿级消息堆积能力
- RabbitMQ
使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了 Broker 架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的 ESB 整合。
1.4 rabbitMQj介绍
RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ 提供了 6 种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics 主题模式,RPC 远程调用模式
2 RabbitMQ入门(简单模式)
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
- 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rabbitMQ01</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
</project>
- 创建连接工具类
public class RabbitMQConUtil {
private static final int PORT = 5672;
private static final String HOST = "localhost";
private static final String USER_NAME = "guest";
private static final String PWD = "guest";
private ConnectionFactory factory;
private Connection connection;
public RabbitMQConUtil(String virtualHost) {
//创建链接工厂
factory = new ConnectionFactory();
//参数设置
factory.setHost(HOST);//IP
factory.setPort(PORT);//端口号
factory.setUsername(USER_NAME);//用户
factory.setPassword(PWD);//密码
factory.setVirtualHost(virtualHost);//虚拟主机,相当于mysql中db
}
public RabbitMQConUtil(String host,int port,String userName,String pwd,String virtualHost){
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(userName);
factory.setPassword(pwd);
factory.setVirtualHost(virtualHost);
}
public Connection getConnection() throws Exception {
return factory.newConnection();
}
}
- 创建简单生产者
public class SimpleProduct {
public static void main(String[] args) throws Exception {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
//创建频道-channel = connection.createChannel()
Channel channel = connection.createChannel();
//声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
channel.queueDeclare("simple_queue", true, false, false, null);
//创建消息-String m = xxx
String message = "欢迎来到我的世界";
//消息发送-channel.basicPublish(交换机[默认 Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)
channel.basicPublish("", "simple_queue", null, message.getBytes("utf-8"));
//关闭资源-channel.close();connection.close()
channel.close();
connection.close();
}
}
- 结果
- 创建消费者
public class SimpleConsumer {
public static void main(String[] args) throws Exception{
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由的 key
String routingKey = envelope.getRoutingKey();
//获取交换机信息
String exchange = envelope.getExchange();
//获取消息 ID
long deliveryTag = envelope.getDeliveryTag();
//获取消息信息
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("simple_queue",true,consumer);
}
}
3 RabbitMQ进阶学习
3.1 work queues工作队列模式
Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。 应用场景:对于任务过重或任务较多情况,使用工作队列模式使用多个消费者可以提高任务处理的速度。
- 代码实现
生产者
public class WorkProduct {
public static void main(String[] args) throws Exception {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
channel.queueDeclare("work_queue",true,false,false,null);
String mes = "welcome to my house:" + i;
channel.basicPublish("","work_queue",null,mes.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
两个或多个消费者
public class WorkConsume1 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queue", true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("work_queue",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class RunClass {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
WorkConsume1 consume1 = new WorkConsume1();
WorkConsume1 consume2 = new WorkConsume1();
service.execute(consume1);
service.execute(consume2);
}
}
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
3.2 publish/subscribe发布订阅模式
在发布订阅模型中,多了一个 x(exchange)角色,而且过程略有变化。
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。Exchange有常见以下 3 种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定 routing key 的队列
Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
- 代码实现
生产者,此时生产者连接的是交换机(exchange),不是队列
public class Product {
public static void main(String[] args) throws Exception {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
Channel channel = connection.createChannel();
//声明exchange
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
for (int i = 0; i < 10; i++) {
String mes = "hello," + i + " welcome to China";
channel.basicPublish("fanout_exchange", "", null, mes.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
消费者,声明队列,队列绑定交换机
public class Consume1 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue1", true, false, false, null);
channel.queueBind("publish_queue1","fanout_exchange","key1");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue1",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Consume2 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue2", true, false, false, null);
channel.queueBind("publish_queue2","fanout_exchange","key2");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue2",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class RunConsume {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,60, TimeUnit.MINUTES,
new LinkedBlockingDeque<Runnable>());
Consume1 consume1 = new Consume1();
Consume2 consume2 = new Consume2();
executor.execute(consume1);
executor.execute(consume2);
}
}
- 发布订阅模式与 work 队列模式的区别
1、work 队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,work 队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式的消费者需要设置队列和交换机的绑定,work 队列模式不需要设置,实际上work队列模式会将队列绑 定到默认的交换机 。
3.3 Routing 路由模式
路由模式特点:
1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
2.消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey。
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息
P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
C1:消费者,其所在队列指定了需要 routing key 为 log.error 的消息
C2:消费者,其所在队列指定了需要 routing key 为 log.info、log.error、log.warning 的消息
- 代码实现
生产者,创建生产者向三个routing key发信息,在发布步骤添加routing key
public class Product {
public static void main(String[] args) throws Exception {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT);
for (int i = 0; i < 3; i++) {
String routing_key = "";
String mes = "hello:"+i;
switch (i){
case 0:
routing_key = "log.info";
break;
case 1:
routing_key = "log.error";
break;
case 2:
routing_key = "log.warning";
break;
}
channel.basicPublish("direct_exchange",routing_key,null,mes.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
消费者,队列绑定时声明routing key
//消费者1 routing key为log.error
public class Consume1 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue1", true, false, false, null);
channel.queueBind("publish_queue1","direct_exchange","log.error");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue1",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//消费者2
public class Consume2 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue2", true, false, false, null);
channel.queueBind("publish_queue2","direct_exchange","log.info");
channel.queueBind("publish_queue2","direct_exchange","log.error");
channel.queueBind("publish_queue2","direct_exchange","log.warning");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue2",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合routing key 的队列。
3.4 Topics通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange可以让队列在绑定 Routing key 的时候使用通配符! Routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例如:item.insert
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好 1 个词
举例:
item.#:能够匹配 item.insert.abc 或者 item.insert
item.*:只能匹配 item.insert
生产者,与路由模式类似
public class Product {
public static void main(String[] args) throws Exception {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection = util.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
for (int i = 0; i < 5; i++) {
String routing_key = "";
String mes = "hello:"+i;
switch (i){
case 0:
routing_key = "log.info";
break;
case 1:
routing_key = "log.error";
break;
case 2:
routing_key = "log.warning";
break;
case 3:
routing_key = "log.info.bat";
break;
case 4:
routing_key = "log.info.txt";
break;
}
channel.basicPublish("topic_exchange",routing_key,null,mes.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
消费者
//消费者1,log.*
public class Consume1 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue1", true, false, false, null);
channel.queueBind("publish_queue1","topic_exchange","log.*");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue1",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//消费者2 log.#
public class Consume2 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue2", true, false, false, null);
channel.queueBind("publish_queue2","topic_exchange","log.#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue2",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//消费者3 log.info.*
public class Consume3 implements Runnable {
public void run() {
RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
Connection connection;
try {
connection = util.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("publish_queue3", true, false, false, null);
channel.queueBind("publish_queue3","topic_exchange","log.info.*");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
}
};
channel.basicConsume("publish_queue3",true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Topic 主题模式可以实现 Publish/Subscribe 发布订阅模式 和 Routing 路由模式的双重功能;只是Topic在配置 routing key 的时候可以使用通配符,显得更加灵活。
3.5 总结
- 1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
- 2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
- 3、发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
- 4、路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
- 5、通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
4 SpringBoot整合RabbitMQ
4.1 生产者
生成交换机、队列,绑定交换机与队列,发送消息到队列中
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/firstVirtualhost
生成交换机、队列,绑定交换机与队列
@Configuration
public class ProductConfig {
//创建交换机
@Bean(name = "topicExchange")
public TopicExchange topicExchange(){
return new TopicExchange("topic_exchange_springboot");
}
//创建队列
@Bean(name = "topicQueue")
public Queue queue(){
return QueueBuilder.durable("topic_queue_springboot").build();
}
//绑定交换机与队列
@Bean
public Binding binding(@Qualifier("topicQueue") Queue queue,
@Qualifier("topicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("log.#").noargs();
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProductTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend("topic_exchange_springboot","log.info","发送了info消息");
rabbitTemplate.convertAndSend("topic_exchange_springboot","log.error","发送了error消息");
rabbitTemplate.convertAndSend("topic_exchange_springboot","log.warning","发送了warning消息");
}
}
4.2 消费者
重新开启一个session
@Component
public class ConsumerConfig {
@RabbitListener(queues = "topic_queue_springboot")
public void listener(String msg){
System.out.println(msg);
}
}
5 RabbitMQ的重要问题
1、什么是rabbitmq
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端的消息队列。
2、为什么要使用rabbitmq
- 在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
- 拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
- 实现消费者和生产者之间的解耦。
- 对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
- 可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
3、使用rabbitmq的场景
- 服务间异步通信
- 顺序消费
- 定时任务
- 请求削峰
4、如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
发送方确认模式 将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
接收方确认机制 接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。 这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况: 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重) 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
5、如何避免消息重复投递或重复消费?
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列; 在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重的依据,避免同一条消息被重复消费。
6、消息基于什么传输?
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
7、消息如何分发?
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能
8、消息怎么路由?
消息提供方->路由->一至多个队列,消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。 消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则); 常用的交换器主要分为一下三种: fanout:如果交换器收到消息,将会广播到所有绑定的队列上 direct:如果路由键完全匹配,消息就被投递到相应的队列 topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符
9、如何确保消息不丢失?
消息持久化,当然前提是队列必须持久化。RabbitMQ确保持久性消息能从服务器重启中恢复的方式是 ,将它们写入磁盘上的一个持久化日志文件, 当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。
10、使用RabbitMQ有什么好处?
- 服务间高度解耦
- 异步通信性能高
- 流量削峰
11、RabbitMQ的集群
Rabbitmq 集群
集群目的就是为了实现rabbitmq的高可用性,集群分为2种
普通集群:主备架构,只是实现主备方案,不至于主节点宕机导致整个服务无法使用 镜像集群:同步结构,基于普通集群实现的队列同步
普通集群
slave节点复制master节点的所有数据和状态,除了队列数据,队列数据只存在master节点,但是Rabbitmq slave节点可以实现队列的转发,也就是说消息消费者可以连接到slave节点,但是slave需要连接到master节点转发队列,由此说明只能保证了服务可以用,无法达到高可用 slave节点队列可以查看到,但是不会同步数据
镜像集群 基于普通集群实现队列的集群主从,消息会在集群中同步(至少三个节点)
12、mq的缺点
系统可用性降低 **系统引入的外部依赖越多,越容易挂掉,**本来你就是A系统调用BCD三个系统的接口就好了,本来ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整?MQ挂了,整套系统崩溃了,你不就完了么。
系统复杂性提高 硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题 A系统处理完了直接返回成功了,本来都以为你这个请求就成功了;但是问题是,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。