RabbitMQ

资料

基本概念

  • ConnectionFactory(连接工厂): 生产Connection的的工厂

  • Connection(连接):是RabbitMQ的socket的长链接,它封装了socket协议相关部分逻辑

  • Channel(频道|信道): 是建立在Connection连接之上的一种轻量级的连接,我们大部分的业务操作是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、

  • 队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等。

  • Producer(生产者):生产者用于发布消息

  • Exchange(交换机):生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去

  • Routing Key(路由键):一个String值,用于定义路由规则,在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列。

  • Queue(队列):用于存储消息的容器,可以看成一个有序的数组,生产者生产的消息会发送到交换机中,最终交换机将消息存储到某个或某些队列中,队列可被消费者订阅,消费者从订阅的队列中获取消息。

  • Binding(绑定): Binding并不是一个概念,而是一种操作,RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind方法将Exchange、Routing Key、Queue绑定起来

  • Consumer(消费者):用于从队列中获取消息,消费者只需关注队列即可,不需要关注交换机和路由键,消费者可以通过basicConsume(订阅模式可以从队列中一直持续的自动的接收消息)或者basicGet(先订阅消息,然后获取单条消息,再然后取消订阅,也就是说basicGet一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息

  • vhost(虚拟主机): RabbitMQ 通过虚拟主机(virtual host)来分发消息, 拥有自己独立的权限控制,不同的vhost之间是隔离的,单独的。vhost是权限控制的基本单位,用户只能访问与之绑定的vhost,默认vhost:”/” ,默认用户”guest” 密码“guest”,来访问默认的vhost。

RabbitMQ

  • 端口
    5672:amqp专用端口,开发必用
    15672:管理界面专用
    25672:集群专用

设置开机启动

  • 命令行
    chkconfig rabbitmq-server on

启动关闭RabbbitMQ-Server

  • 命令行
    启动 rabbitmq-server start &
    停止 rabbitmqctl app_stop

    管理插件:rabbitmq-plugins enable rabbitmq_management

    访问地址:http://127.0.0.1:15672/

    管理员帐号:guest/guest
    lsof –i:5672 检查是否启动,可以看到rabbitmq使用的amqp协议

安装管理插件、启动、登录

  • 安装并启动UI插件的命令
    rabbitmq-plugins.bat enable rabbitmq_management
    图片说明
  • 启动服务器的命令
    rabbitmq-server.bat
  • 访问地址
    http://127.0.0.1:15672/
  • 管理员帐号
    guest/guest
    图片说明

RabbitMQ基础开发

Maven添加依RabbitMQ的jar包依赖

  • maven代码

    配置Maven,添加依赖。把下面的依赖包加入到dependency
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
    </dependency>
  • 连接代码

          // 创建连接工厂
          ConnectionFactory connectionFactory = new ConnectionFactory();
          connectionFactory.setHost("127.0.0.1");
          connectionFactory.setPort(5672);
          //这里的用户密码需要在rabbitmq中添加
          connectionFactory.setUsername("guest");
          connectionFactory.setPassword("guest");
    // 高速公路
          Connection connection = connectionFactory.newConnection();
    //画定双车道
          Channel channel = connection.createChannel();

    图片说明

  • 添加数据

    public class Producer {
      public static void main(String[] args) throws IOException, TimeoutException {
          // 创建连接工厂
          ConnectionFactory connectionFactory = new ConnectionFactory();
          connectionFactory.setHost("127.0.0.1");
          connectionFactory.setPort(5672);
          //这里的用户密码需要在rabbitmq中添加
          connectionFactory.setUsername("guest");
          connectionFactory.setPassword("guest");
          Connection connection = connectionFactory.newConnection();
          Channel channel = connection.createChannel();
          // 声明一个消息队列,(存放小米手机的货架子)
          //第一个参数:queue 名字
          //第二个参数:durable 重启之后继续生存
          //第三个参数:exclusive 只有自己才能使用 防止大家一起用
          //第四个参数:autoDelete 自动删除 当队列被用过以后 没有任何连接 就会删除
          //第五个参数:arguments Map一系列的参数
          channel.queueDeclare("QueueXiaoMi", true, false, false, null);
          //使用信道发送数据
          String msg = "a phone";
          /*
           * @param exchange: 交换机
           * @param routingKey:路由键 消息的路由名
           * @param props:附加信息
           * @param body:消息信息
           */
          channel.basicPublish("", "PhoneQueue", null, msg.getBytes());
          channel.close();
          connection.close();

    图片说明

  • 取出数据

    public class Consumer {
      public static void main(String[] args) throws IOException, TimeoutException {
          ConnectionFactory connectionFactory = new ConnectionFactory();
          connectionFactory.setHost("127.0.0.1");
          connectionFactory.setPort(5672);
          connectionFactory.setUsername("guest");
          connectionFactory.setPassword("guest");
          Connection connection = connectionFactory.newConnection();
          Channel channel = connection.createChannel();
          //接收数据
          DefaultConsumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String data = new String(body);
                  System.out.println("---->" + data);
                  super.handleDelivery(consumerTag, envelope, properties, body);
              }
          };
          //回调
          channel.basicConsume("QueueXiaoMi", true, consumer);
          channel.close();
          connection.close();
      }
    }
    手动确认 √ 
          channel.basicConsume("QueueXiaoMi", consumer);
                      channel.basicAck(envelope.getDeliveryTag(), false);

    图片说明
    效果
    图片说明

队列声明三个参数含义

  • 队列的自动删除
    boolean autoDelete
    在第一次使用后,如果没有连接,并且队列里没有数据,队列就会自动删除。
  • durable队列持久化
    消息持久化
    channel.basicPublish("", "routingkey",
    deliveryMode 1=内存,2=序列化
  • exclusive 队列的独占
    boolean exclusive
    同一时刻,只能有一个程序连接队列。
    独占队列必定是自动删除的队列,给自己用的,无法实现进程间的通信。

给队列消息加上这个附加信息

  • 在消息上添加额外的信息 和取出方法
          String msg = "a phone~~~~~~";
          Map<String, Object> headers = new HashMap<String, Object>();
          headers.put("name", "cznczai");
    //注意这里的new情况 一系列的new().build()....
          AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().contentEncoding("UTF-8").deliveryMode(1).headers(headers).build();
          channel.basicPublish("", "QueueXiaoMi", properties, msg.getBytes());
          if (properties.getHeaders() != null) {
              Iterator<Map.Entry<String, Object>> it = properties.getHeaders().entrySet().iterator();
              while (it.hasNext()) {
                  Map.Entry<String, Object> ent = (Map.Entry<String, Object>) it.next();
                  System.out.println(ent.getKey() + " = " + ent.getValue());
              }
          }
全部评论

相关推荐

Noob1024:一笔传三代,人走笔还在
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务