RabbitMQ 常用模式快速上手

注意配置

连接工厂基本配置

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
/**
 * @author yezixun
 * @date 2019/11/7 14:35
 */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost 
        //默认账号为
        factory.setVirtualHost("testhost"); 
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 通过工厂获取连接
        return factory.newConnection();
    }
}

Queue

发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 14:38
 */
public class Send {
    //队列名称
    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 14:48
 */
public class Recv {
    //队列名称
    private final static String QUEUE_NAME = "q_test_01"; 

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

work

发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:26
 */
public class Send {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送0-99的数字
        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            //线程沉睡 数字越大睡得越久
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:25
 */
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        // true 自动确认:无论是否成功 都认为成功消费
        // false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒  模拟数据量 接收间隔比Recv1长 模拟性能(压力)弱的服务器
            Thread.sleep(1000);

            //下面这行注释掉表示使用自动确认模式 开启表示手动
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        // true 自动确认:无论是否成功 都认为成功消费
        // false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");
            //休眠 模拟数据量  接收间隔比Recv2短 模拟性能(压力)强的服务器
            Thread.sleep(10);

            // 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

发布订阅模式

发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * 发布订阅模式
 * @author yezixun
 * @date 2019/11/7 15:54
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:55
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:55
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

路由模式

发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * 路由模式
 * @author yezixun
 * @date 2019/11/7 16:26
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.exchangeDeclare(EXCHANGE_NAME,  "direct");


            // 消息内容
            String message = "添加";
            channel.basicPublish(EXCHANGE_NAME, "select",null, message.getBytes());
            //String message = "删除";
            //channel.basicPublish(EXCHANGE_NAME, "delete",null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");


        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv {
    private final static String QUEUE_NAME = "test_queue_direct_1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //绑定队列到交换机 删除和修改
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        //定义队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        // true 自动确认:无论是否成功 都认为成功消费
        // false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");

            // 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 15:23
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //绑定队列到交换机 查询和增加
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        //定义队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        // true 自动确认:无论是否成功 都认为成功消费
        // false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");

            // 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

主题模式

发送

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:45
 */
public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

接收

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:46
 */
public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;

/**
 * @author yezixun
 * @date 2019/11/7 16:46
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_topic_work_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

————————————————

版权声明:本文为CSDN博主「niaobirdfly」的原创 详细信息参考原作者

原文链接:https://blog.csdn.net/hellozpc/article/details/81436980

全部评论

相关推荐

Noob1024:一笔传三代,人走笔还在
点赞 评论 收藏
分享
吃不饱的肱二头肌很想退休:tnnd 我以为选妹子呢,亏我兴高采烈的冲进来😠
投递快手等公司10个岗位
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务