SpringBoot整合ActiveMQ并使用MySQL持久化

ActiveMQ(windows 环境)

一、配置MySQL作为持久化数据库

1. 在lib目录下引入jar包

由于我本机数据库版本为MySQL 8,所以引入druid-1.1.23.jar和mysql-connector-java-8.0.19.jar两个包,如图所示:

2. 修改activemq.xml

配置文件在conf目录下,编辑并添加druid连接池配置信息:

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&amp;useSSL=false&amp;serverTimezone=GMT%2B8"/>
    <property name="username" value="root"/>
    <property name="password" value="12345"/>
    <property name="initialSize" value="1" /> 
    <property name="minIdle" value="1" /> 
    <property name="maxActive" value="10" />
    <property name="poolPreparedStatements" value="true"/>
</bean>

之后修改原来的数据库配置:

<!-- <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>

3. 启动

首先新建数据库--activemq,之后在bin/win64目录下找到activemq.bat双击运行,可以在数据库里发现多了三个表,证明配置成功了。

访问web console平台:

二、SpringBoot整合

1. 新建SpringBoot项目

以maven模式创建,并在pom文件引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

2. 配置ActiveMQ

(1) 在启动类添加注解@EnableJms。

(2) 修改yml文件:

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    close-timeout: 15s
    in-memory: true
    non-blocking-redelivery: false
    send-timeout: 0
    queue-name: active.queue
    topic-name: active.topic.name.model
  pool:
    enabled: true
    max-connections: 10
    idle-timeout: 30000

(3) 添加配置类:
注意Queue类和Topic类要导入javax.jms下的包

@Configuration
public class ActivemqConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.topic-name}")
    private String password;

    @Value("${spring.activemq.queue-name}")
    private String queueName;

    @Value("${spring.activemq.topic-name}")
    private String topicName;

    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean(name = "topic")
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(username, password, brokerUrl);
    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(){
        return new JmsMessagingTemplate(connectionFactory());
    }

    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

3. 生产者

在添加@RestController注解之前要引入spring-boot-starter-web依赖。

@RestController
public class ProviderController {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @GetMapping("/queue")
    public String sendQueue(@RequestParam String msg) {
        this.sendMessage(this.queue, msg);
        return "success";
    }

    @GetMapping("/topic")
    public String sendTopic(@RequestParam String msg) {
        this.sendMessage(this.topic, msg);
        return "success";
    }

    /**
     * 发送消息
     * @param destination 模式
     * @param message 消息
     */
    private void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

4. 消费者

(1) queue模式

@Component
@Slf4j
public class QueueConsumer {
    @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
    public void queueReceiveA(String message) {
        log.info("queue A receive --> {}",message);
    }

    @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
    public void queueReceiveB(String message) {
        log.info("queue B receive --> {}",message);
    }
}

配置了两个消费者,用于区分queue和topic两种不同模式的特点。为使用@Slf4j注解应引入lombok依赖。

(2) topic模式

@Component
@Slf4j
public class TopicConsumer {
    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void topicReceiveA(String message) {
        log.info("topic A receive --> {}",message);
    }

    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void topicReceiveB(String message) {
        log.info("topic B receive --> {}",message);
    }

    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void topicReceiveC(String message) {
        log.info("topic C receive --> {}",message);
    }
}

三、运行结果及解释

1. queue(点对点)模式

在浏览器地址栏输入http://localhost:8080/queue?msg=hello

可以在控制台输出中发现有一个消费者接收到了这个消息:

再一次再浏览器地址栏输入http://localhost:8080/queue?msg=world

发现还是只有一个消费者接收到了消息,但是消费者变成了queue B:

多次发送消息,会发现一直是A和B轮流接受消息:

2. topic(发布/订阅)模式

在浏览器地址栏输入http://localhost:8080/topic?msg=hello%20world

此时控制台同时输出了三条内容,说明三个订阅者同时接到了消息:

3. 两种模式的区别

(1) queue模式:

通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。

(2) topic模式:

通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。

全部评论

相关推荐

我已成为0offer的糕手:别惯着,胆子都是练出来的,这里认怂了,那以后被裁应届被拖工资还敢抗争?
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务