<span>RabbitMQ rabbitmq的用法</span>
RPC 模式代码:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCServer.java
其他五种模式的源码和思维导图:
https://github.com/crossyourheart/rabbitmq.git
RabbitMQ是一个消息代理:它接收、存储和转发二进制的数据消息。RabbitMQ和消息传递通常使用一些术语。
发送消息的程序是生产者producer:
虽然消息通过RabbitMQ和应用程序流动,但它们只能存储在队列中。队列只受主机内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,而许多消费者可以尝试从一个队列接收数据。
队列queue:**
消费和接收有着相似的含义。使用者是一个主要等待接收消息的程序:
消费者consumer:
请注意,生产者、消费者和代理不必驻留在同一个主机上;事实上,在大多数应用程序中,它们都不是这样的。应用程序可以同时是生产者和消费者。
1."Hello World" 简单模式
package com.rabbitmaq.helloworld;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
* @author wang
* @description
* rabbitmq 的服务器是蹭的阿里云 免费试用的1个月服务器 部署的。
* 使用的docker 一次性装了 很多中间件用于练习
* @create 2020/12/16 23:57
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
//create a connection to the server
try (//该连接抽象了套接字连接,并为我们处理协议版本协商和身份验证等。
Connection connection = factory.newConnection();
//接下来,我们创建一个通道,用于完成工作的大多数API驻留在此。
//注意,我们可以使用带有资源的try-with-resources语句,因为连接和通道都实现了java.io.Closeable。这样我们就不需要在代码中显式地关闭它们。
Channel channel = connection.createChannel()) {
//要发送,我们必须声明一个队列供我们发送到;然后我们可以在try-with-resources语句中向队列发布消息
//声明一个队列是幂等的——它只会在队列不存在的情况下被创建。消息内容是一个字节数组,因此您可以在这里对任何内容进行编码。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!!11!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
---------------------------------------------------------------------------------------
package com.rabbitmaq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author wang
* @description
* 消费者监听来自RabbitMQ的消息,所以不像发布者只发布一条消息,
* 我们会让消费者一直运行来监听消息并将其打印出来。
* @create 2020/12/17 0:12
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//发布者相同;我们打开一个连接和一个通道,并声明我们将从中消费的队列。注意,这与发送publish到的队列匹配(队列名hello)。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
//为什么我们不使用try-with-resource语句来自动关闭通道和连接?
//这样做,我们可以简单地让程序继续,关闭所有内容,然后退出!
//但是这样做很笨拙,因为我们希望在使用者异步侦听消息到达时 ,进程保持活动状态。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//请注意,我们在这里也声明了队列。
// 因为我们可能会在发布服务器之前启动消费者,所以我们希望在尝试使用来自队列的消息之前确保队列已经存在。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//我们将告诉服务器从队列中传递消息。
// 因为它将异步推送消息给我们,所以我们提供一个对象形式的回调,它将缓冲消息,直到我们准备好使用它们。
// 这就是DeliverCallback的子类所做的事情。
DeliverCallback deliverCallback = new DeliverCallback() {
//接口不能实例化,接口的实现类才可以。new接口其实new的是接口的实现类,所以必须写上相关方法。
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
/* java 8 的lambda表达式
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
2.Work Queues
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待它完成。相反,我们把任务安排在以后去做。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将取出任务并最终执行该任务。当您运行多个工作者时,任务将在它们之间共享。
这个概念在web应用程序中特别有用,因为web应用程序不可能在短时间的HTTP请求窗口内处理复杂任务。
package com.rabbitmaq.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
/**
* @author wang
* @description
* 我们将稍微修改前面示例中的Send.java代码,以允许从命令行发送任意消息。
* 这个程序将把任务添加到我们的工作队列中,所以我们将它命名为NewTask.java
* @create 2020/12/17 23:05
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in);
String message = null;
do{
System.out.println("请输入一个字符串:");
message = scanner.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}while (scanner.hasNextLine());
}
}
}
---------------------------------------------------------------------------------------
package com.rabbitmaq.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author wang
* @description
* 我们旧的Recv.java程序也需要做一些更改:它需要为消息体中的每个点假做一秒钟的工作。
* 它将处理传递的消息并执行任务,因此我们将其称为Worker.java
* @create 2020/12/17 23:12
*/
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 收到 '" + message + "'");
long l1 = 0,l2 = 0;
try {
l1 = System.currentTimeMillis();
doWork(message);
l2 = System.currentTimeMillis();
} finally {
long l = l2 - l1;
System.out.println("花费"+(l2 - l1)+"s");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/**
* 在前一部分中,我们发送了一个包含“Hello World!”的消息。现在我们将发送字符串来代表复杂的任务。
* 我们没有现实生活中的任务,比如要调整图像大小或要渲染pdf文件,所以让我们假装自己很忙——通过使用Thread.sleep()函数。
* 我们用字符串中点的数量表示它的复杂度;每一个点将占“工作”的一秒钟。例如,一个由"Hello..."描述的伪任务… 需要三秒钟。
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
//模拟执行时间
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
2.1轮询调度 Round-robin dispatching
使用任务队列的优点之一是能够轻松地将工作并行化。如果我们正在积累积压的工作,我们可以增加更多的工人,通过这种方式,很容易扩大规模。首先,让我们尝试同时运行两个worker实例。它们都将从队列中获取消息,但具体如何获取呢?让我们来看看。你需要打开三个控制台。两个将运行worker程序。这些控制台将是我们的两个消费者——C1和C2。
默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将得到相同数量的消息。这种分发消息的方式称为轮询。用三个或更多的工人来试一试。
2.2 Message acknowledgment 消息确认
完成一项任务可能需要几秒钟。您可能想知道,如果一个消费者开始一项很长的任务,只完成了一部分就死了,会发生什么。在我们现在的代码中,一旦RabbitMQ向消费者发送了一条消息,它就会立即标记要删除。在这种情况下,如果你杀死了一个worker,我们将丢失它正在处理的消息。我们还将丢失已发送给这个特定worker但尚未处理的所有消息。
但我们不想失去任何任务。如果一个工人死了,我们想把任务交给另一个工人。
为了确保消息不会丢失,RabbitMQ支持消息确认。消费者向RabbitMQ发送回一个确认消息,告诉RabbitMQ某个消息已经被接收、处理,并且RabbitMQ可以自由删除该消息。
如果一个消费者在没有发送ack的情况下死亡(它的通道关闭,连接关闭,或者TCP连接丢失),RabbitMQ将会知道消息没有被完全处理,并重新排队。如果同时有其他消费者在线,它就会迅速将商品重新交付给另一个消费者。这样,即使工人偶尔死亡,你也可以确保没有信息丢失。
没有任何消息超时;当消费者挂掉时,RabbitMQ会重新发送消息。即使处理一条消息需要很长很长的时间,也没关系。
默认情况下,手动消息确认 是打开的。在前面的例子中,我们通过autoAck=true标志显式地关闭了它们。当我们完成一个任务时,是时候将这个标志设置为false并从worker发送一个适当的确认。
然后worker2 worker3 收到消息后,立刻停止服务。
最终worker1 只收到了 一条,原来发给woker2的消息。
还有一点是,如果不关闭work2和work3 。 work3 也只能收到一次 消息。之后 就是在work1和work2直接轮询了。
总结 autoAck = false 时,一个消费者挂掉 且有未处理完消息,rabbitMQ 会重新发送该消息 给另一个消费者。
确认必须在接收到交付的同一通道上发送。尝试确认使用不同的通道将导致通道级协议异常。请参阅确认文件指南以了解更多信息。
2.3 Message durability 消息持久化
我们已经学会了如何确保即使用户死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要声明它为持久的:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管这个命令本身是正确的,但在我们目前的设置中它不会工作。这是因为我们在上一部分已经定义了一个名为hello的队列,它不是持久的,但仍然在rabbitMQ中。RabbitMQ不允许你用不同的参数重新定义一个现有的队列,任何试图这样做的程序都会返回一个错误。只能再声明一个不同名称的队列(或者删除RabbitMQ中的hello队列):
boolean durable = true;
//温馨提示:消费者和生产者 声明的队列名要一致!
channel.queueDeclare("task_queue", durable, false, false, null);
我们可以确定即使RabbitMQ重启,任务队列队列也不会丢失。现在我们需要将消息标记为persistent 持久的—通过将MessageProperties(它实现了BasicProperties)设置为persistent TEXT PLAIN的值。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
2.4 Fair dispatch 公平分派
您可能已经注意到,分派仍然不能完全按照我们的要求工作。例如,在有两个工人的情况下,当所有奇数消息是重的,偶数消息是轻的,一个工人将一直很忙,而另一个工人几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。这是因为当消息进入队列时,RabbitMQ只会发送一条消息。它不查看消费者未确认的消息的数量。它只是盲目地将每n条消息分派给第n个消费者。
为了克服这个问题,我们可以使用prefetchCount = 1设置的basicQos方法。这告诉RabbitMQ一次给一个worker的消息不要超过一条。或者,换句话说,在worker处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分配给下一个不太忙的员工。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3.Publish/Subscribe 发布订阅模式
在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务确切地交付给一个工人。在本部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这个模式即是 “发布/订阅”模式。
为了说明这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将发出日志消息,第二个将接收并打印它们。
在我们的日志系统中,每个正在运行的接收程序副本都将获得消息。通过这种方式,我们将能够运行一个接收器并将日志引导到磁盘;同时,我们可以运行另一个接收器,在屏幕上看到日志。本质上,已发布的日志消息将被广播给所有的接收者。
3.1 Exchanges 交换机
在本教程的前几部分中,我们向队列发送和接收消息。现在是时候介绍Rabbit中的完整消息传递模型了。
让我们快速回顾一下我们在之前的教程中所涵盖的内容:
-生产者producer 是发送消息的用户应用程序。
-队列queue 是存储消息的缓冲区。
-消费者consumer 是接收消息的用户应用程序。
RabbitMQ消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,很多时候生产者甚至根本不知道消息是否将被传递到任何队列。
相反,生产者只能向交换机发送消息。交换是一件很简单的事情。它一边接收来自生产者的消息,另一边将消息推送到队列。交换机必须确切知道如何处理它接收到的消息。它应该被附加到一个特定的队列吗?它应该被附加到多个队列吗?或者它应该被丢弃。它的规则由交换类型定义。
有几种exchange类型可用:direct、topic、headers和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换,并将其称为日志:
//fanout扇形交换非常简单。正如您可能从名称猜到的那样,它只是将它接收到的所有消息广播到它知道的所有队列。这正是我们记录器所需要的
channel.exchangeDeclare("logs", "fanout");
//发布消息到交换机
channel.basicPublish( "logs", "", null, message.getBytes());
3.2 Temporary queues临时队列
您可能还记得,我们以前使用的队列有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将worker指向相同的队列。当您希望在生产者和消费者之间共享队列时,给队列起一个名字很重要。
但我们的logger日志记录器却不是这样。我们希望了解所有日志消息,而不仅仅是其中的一个子集。我们还只对当前流动的消息感兴趣,而不是在旧的消息中。要解决这个问题,我们需要做两件事。
首先,当我们连接到Rabbit时,我们需要一个新的空队列。要做到这一点,我们可以创建一个具有随机名称的队列,或者更好——让服务器为我们选择一个随机队列名称。其次,一旦我们断开了消费者的连接,队列应该被自动删除。
//在Java客户机中,当我们不向queueDeclare()提供参数时,我们创建一个非持久的、排他的、自动删除的随机名称队列
String queueName = channel.queueDeclare().getQueue();
//queueName包含一个随机队列名。例如,amq.gen-JzTY20BRgKO-HjmUJj0wLg
You can learn more about the exclusive flag and other queue properties in the guide on queues.
3.3 Bindings 通信绑定
我们已经创建了一个fanout交换器和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。exchange和队列之间的这种关系称为绑定。
channel.queueBind(queueName, "logs", "");
//从现在开始,logs交换机将向我们的队列追加消息。
3.4 Putting it all together 总览
package com.rabbitmaq.publishsubscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生产者程序发出日志消息,看起来与前一教程没有太大区别。
* 最重要的变化是,我们现在希望向logs日志交换机发布消息,而不是向无名的日志交换机发布消息。
* 我们需要在发送时提供一个routingKey,但是它的值在扇形交换机中被忽略。
* @author wang
* @description
* @create 2020/12/23 22:05
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//建立连接之后,声明了一个fanout类型交换机 logs
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
//如果还没有队列绑定到exchange,消息就会丢失,但这对我们来说没问题;
//如果没有消费者正在收听,我们可以安全地丢弃消息。
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
------------------------------------------------------------------------------------
package com.rabbitmaq.publishsubscribe;
/**
* @author wang
* @description
* @create 2020/12/23 22:10
*/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机,因为不知道生产者和消费者哪个先启动,都一起声明。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//创建临时队列,返回随机的队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列,fanout类型的交换机 routingKey会被忽略
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
4.Routing 路由模式
在前面,我们构建了一个简单的日志系统。我们能够将日志消息广播给许多接收者。在本教程中,我们将向它添加一个特性——我们将使只订阅消息的子集成为可能。例如,我们将能够只将关键的错误消息指向日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
4.1 Bindings
//在前面的示例中,我们已经创建了绑定。您可能会回忆起类似的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
//绑定可以接受额外的路由键routingKey参数。为了避免与基本发布参数混淆,我们将其称为绑定键binding key。这就是我们如何创建一个带有键的绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
//绑定键的含义取决于交换类型。我们以前使用的扇形交换器完全忽略了它的值。
4.2 Direct exchange 直连交换机
上一教程中的日志系统将所有消息广播给所有消费者。我们希望对其进行扩展,允许根据消息的严重程度对其进行过滤。例如,我们可能希望一个将日志消息写入磁盘的程序只接收关键错误,而不会在警告或信息日志消息上浪费磁盘空间。
我们使用的是fanout交换器,它没有给我们太多的灵活性——它只能进行无意识的广播。我们将使用直连交换机代替。直接交换背后的路由算法很简单—将消息发送到其绑定键与消息的路由键完全匹配的队列。
为了说明这一点,考虑下面的设置:
在这个设置中,我们可以看到direct exchange X与两个绑定到它的队列。第一个队列绑定绑定键为orange,第二个队列有两个绑定,一个绑定键为black,另一个绑定键为green。在这种设置中,发布到exchange的带有routing key orange的消息将被路由到队列Q1。带有black或green路由键的消息将转到Q2。所有其他消息将被丢弃。
4.3 Multiple bindings 多重绑定
使用相同绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,直接交换器的行为将类似于fanout,并将消息广播给所有匹配的队列。带有routing key black的消息将被传递到Q1和Q2。
4.4 Emitting logs 发布日志
我们将在我们的日志系统中使用这个模式。而不是fanout,我们将发送消息到一个直接交换机。我们将提供日志严重程度作为路由键。这样,接收程序将能够选择它想接收的严重程度。让我们首先关注发送日志。
//首先需要创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//准备好发送一条消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
//为了简化,我们假设路由键RoutingKey ‘severity’可以是‘info’,‘warning’,‘error’中的一个。
4.5 Subscribing 订阅
//接收消息的工作方式与上一课类似,不同的是——我们将为每个严重程度创建一个绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
4.6 总览
package com.rabbitmaq.helloworld.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;
/**
* @author wang
* @description
* @create 2020/12/23 23:00
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity();
String message = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
private static String getSeverity() {
int i = (int) (Math.random() * 3);
System.out.println(i);
return String.valueOf(i);
}
}
---------------------------------------------------------------------------------------
package com.rabbitmaq.helloworld.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author wang
* @description
* @create 2020/12/23 23:06
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
System.out.println(severity);
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
5. Topics 主题模式
在前面的教程中,我们改进了日志系统。我们使用直接交换机代替只能进行虚拟广播的扇形交换机,从而获得了有选择地接收日志的可能性。
虽然使用直接交换改善了我们的系统,它仍然有局限性-它不能做路由的基础上的多个标准。
在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的源订阅日志。您可能从syslog unix工具中了解了这个概念,他的路由日志同时基于日志级别和设备。
这将给我们带来很大的灵活性——我们可能只想监听来自'cron'的关键错误,但也想监听来自'kern'的所有日志。
要在我们的日志系统中实现这一点,我们需要了解一个更复杂的topic交换机。
5.1 Topic exchange 主题交换机
发送到主题交换的消息不能有任意的路由键—它必须是由点分隔的单词列表。这些词可以是任何东西,但通常它们指定了与信息相关的一些特征。一些有效的路由关键的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
路由键中可以有任意多的字,最多255个字节的限制。
绑定键也必须采用相同的形式。主题交换背后的逻辑类似于直接的逻辑—使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况:
'' 星号能够替代一个词。
'#'井号可以替代0个或者多个单词。
一个例子:
在这个例子中,我们将发送所有描述动物的消息。消息将通过由三个单词(两个点)组成的路由键发送。路由关键字的第一个字将描述速度,第二个字是颜色,第三个字是品种。
<speed> . <colour> . <species>
这些绑定可以总结为:Q1对所有橙色动物感兴趣。Q2想知道所有关于兔子和懒惰动物的事情。
routing key设置为"quick.orange.rabbit"的消息将被送到两个队列。
lazy.orange.elephant 也回被送到两个队列。quick.orange.fox 只送到Q1
lazy.brown.fox 只送到Q2 lazy.pink.rabbit 制备送到Q2
quick.brown.fox 会被丢弃
如果违反规则,消息路由键设置为 orange 或者 quick.orange.male.rabbit 消息将不会匹配到任何绑定键,最后会被丢弃。
lazy.orange.male.rabbit 会发送到Q2
当一个队列被绑定到“#”绑定键时——它将接收所有的消息,不管路由键是什么。当没有在绑定中使用特殊字符“ </species> </colour> </speed>”(星号)和“#”时,主题交换机的行为将像直接交换机一样。
5.2 总览
package com.rabbitmaq.topic;
/**
* @author wang
* @description
* @create 2020/12/23 23:40
*/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRoutingKey();
String message = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
private static String getRoutingKey() {
int i = (int) (Math.random() * 3);
String key = null;
if (i==0){
key = "1.1.1" ;
}
else {
key = "2.3";
}
return key;
}
}
------------------------------------------------------------------------------
package com.rabbitmaq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author wang
* @description
* @create 2020/12/23 23:41
*/
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.99.68.32");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
6. RPC 模式
Remote procedure call (RPC 远程过程调用)
在第二篇教程中,我们学习了如何使用工作队列将耗时的任务分配给多个工作人员。但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?那就另当别论了。这种模式通常称为远程过程调用(Remote Procedure Call)或RPC。在本教程中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务器。由于我们没有任何值得分配的耗时任务,所以我们将创建一个返回斐波那契数的虚拟RPC服务。
6.1 Client interface
//为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法,该方法发送RPC请求并阻塞,直到接收到答案为止
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
尽管RPC是计算中相当常见的模式,但它经常受到批评。当程序员不知道一个函数调用是本地调用还是缓慢的RPC时,问题就出现了。这样的混淆会导致不可预测的系统,并给调试增加不必要的复杂性。误用RPC不仅不会简化软件,反而会导致难以维护的混乱代码。记住这一点,考虑以下建议:
-确保哪个函数调用是本地的,哪个函数调用是远程的很明显。
-明确组件之间的依赖关系。
-处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?
当有疑问时,避免使用RPC。如果可以的话,你应该使用异步管道,而不是rpc之类的阻塞,结果将异步推送到下一个计算阶段。
6.2 Callback queue
//一般来说,通过RabbitMQ做RPC是很容易的。客户机发送请求消息,服务器用响应消息响应。为了接收响应,我们需要发送一个“回调”队列地址和请求。我们可以使用默认队列(在Java客户机中是独占的)。让我们试一试:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
6.2 Correlation Id 关联Id
在上面提出的方法中,我们建议为每个RPC请求创建回调队列。这是非常低效的,但幸运的是有一个更好的方法——让我们为每个客户机创建一个回调队列。
这就产生了一个新问题,在该队列中接收到响应后,不清楚响应属于哪个请求。这就是使用correlationId属性的时候。我们将为每个请求设置一个唯一的值。稍后,当我们在回调队列中收到消息时,我们将查看这个属性,并基于此,我们将能够匹配响应与请求。如果我们看到一个未知的correlationId值,我们可以安全地丢弃消息——它不属于我们的请求。
您可能会问,为什么要忽略回调队列中的未知消息,而不是出现错误而失败呢?这是由于服务器端可能存在竞争条件。尽管不太可能,但RPC服务器也有可能在向我们发送响应之后、但在为请求发送确认消息之前死亡。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户机上我们必须优雅地处理重复的响应,而RPC在理想情况下应该是幂等的。
6.3 总结
我们的RPC将像这样工作:
-对于RPC请求,客户端发送带有两个属性的消息:replyTo,它被设置为专门为请求创建的匿名排他队列.correlationId,它被设置为每个请求的唯一值。
-请求被发送到rpc队列。
-RPC工作人员(也就是服务器)正在等待那个队列上的请求。当出现请求时,它将执行该工作,并使用replyTo字段中的队列将带有结果的消息发送回客户机。
-客户端等待应答队列上的数据。当消息出现时,它检查correlationId属性。如果与请求中的值匹配,则将响应返回给应用程序。
RPC 模式代码:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCServer.java
其他五种模式的源码和思维导图:
https://github.com/crossyourheart/rabbitmq.git