SpringBoot整合ActiveMQ并使用MySQL持久化
ActiveMQ(windows 环境)
一、配置MySQL作为持久化数据库
1. 在lib目录下引入jar包
由于我本机数据库版本为MySQL 8,所以引入druid-1.1.23.jar和mysql-connector-java-8.0.19.jar两个包,如图所示:
2. 修改activemq.xml
配置文件在conf目录下,编辑并添加druid连接池配置信息:
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close"> <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&useSSL=false&serverTimezone=GMT%2B8"/> <property name="username" value="root"/> <property name="password" value="12345"/> <property name="initialSize" value="1" /> <property name="minIdle" value="1" /> <property name="maxActive" value="10" /> <property name="poolPreparedStatements" value="true"/> </bean>
之后修改原来的数据库配置:
<!-- <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" /> </persistenceAdapter>
3. 启动
首先新建数据库--activemq,之后在bin/win64目录下找到activemq.bat双击运行,可以在数据库里发现多了三个表,证明配置成功了。
访问web console平台:
二、SpringBoot整合
1. 新建SpringBoot项目
以maven模式创建,并在pom文件引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2. 配置ActiveMQ
(1) 在启动类添加注解@EnableJms。
(2) 修改yml文件:
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin close-timeout: 15s in-memory: true non-blocking-redelivery: false send-timeout: 0 queue-name: active.queue topic-name: active.topic.name.model pool: enabled: true max-connections: 10 idle-timeout: 30000
(3) 添加配置类:
注意Queue类和Topic类要导入javax.jms下的包
@Configuration public class ActivemqConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Value("${spring.activemq.user}") private String username; @Value("${spring.activemq.topic-name}") private String password; @Value("${spring.activemq.queue-name}") private String queueName; @Value("${spring.activemq.topic-name}") private String topicName; @Bean(name = "queue") public Queue queue() { return new ActiveMQQueue(queueName); } @Bean(name = "topic") public Topic topic() { return new ActiveMQTopic(topicName); } @Bean public ConnectionFactory connectionFactory(){ return new ActiveMQConnectionFactory(username, password, brokerUrl); } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListener") public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
3. 生产者
在添加@RestController注解之前要引入spring-boot-starter-web依赖。
@RestController public class ProviderController { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; @GetMapping("/queue") public String sendQueue(@RequestParam String msg) { this.sendMessage(this.queue, msg); return "success"; } @GetMapping("/topic") public String sendTopic(@RequestParam String msg) { this.sendMessage(this.topic, msg); return "success"; } /** * 发送消息 * @param destination 模式 * @param message 消息 */ private void sendMessage(Destination destination, final String message){ jmsMessagingTemplate.convertAndSend(destination, message); } }
4. 消费者
(1) queue模式
@Component @Slf4j public class QueueConsumer { @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener") public void queueReceiveA(String message) { log.info("queue A receive --> {}",message); } @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener") public void queueReceiveB(String message) { log.info("queue B receive --> {}",message); } }
配置了两个消费者,用于区分queue和topic两种不同模式的特点。为使用@Slf4j注解应引入lombok依赖。
(2) topic模式
@Component @Slf4j public class TopicConsumer { @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener") public void topicReceiveA(String message) { log.info("topic A receive --> {}",message); } @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener") public void topicReceiveB(String message) { log.info("topic B receive --> {}",message); } @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener") public void topicReceiveC(String message) { log.info("topic C receive --> {}",message); } }
三、运行结果及解释
1. queue(点对点)模式
在浏览器地址栏输入http://localhost:8080/queue?msg=hello
可以在控制台输出中发现有一个消费者接收到了这个消息:
再一次再浏览器地址栏输入http://localhost:8080/queue?msg=world
发现还是只有一个消费者接收到了消息,但是消费者变成了queue B:
多次发送消息,会发现一直是A和B轮流接受消息:
2. topic(发布/订阅)模式
在浏览器地址栏输入http://localhost:8080/topic?msg=hello%20world
此时控制台同时输出了三条内容,说明三个订阅者同时接到了消息:
3. 两种模式的区别
(1) queue模式:
通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。
(2) topic模式:
通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。