SpringBoot整合ActiveMQ并使用MySQL持久化
ActiveMQ(windows 环境)
一、配置MySQL作为持久化数据库
1. 在lib目录下引入jar包
由于我本机数据库版本为MySQL 8,所以引入druid-1.1.23.jar和mysql-connector-java-8.0.19.jar两个包,如图所示:
2. 修改activemq.xml
配置文件在conf目录下,编辑并添加druid连接池配置信息:
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&useSSL=false&serverTimezone=GMT%2B8"/>
<property name="username" value="root"/>
<property name="password" value="12345"/>
<property name="initialSize" value="1" />
<property name="minIdle" value="1" />
<property name="maxActive" value="10" />
<property name="poolPreparedStatements" value="true"/>
</bean> 之后修改原来的数据库配置:
<!-- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter> 3. 启动
首先新建数据库--activemq,之后在bin/win64目录下找到activemq.bat双击运行,可以在数据库里发现多了三个表,证明配置成功了。
访问web console平台:
二、SpringBoot整合
1. 新建SpringBoot项目
以maven模式创建,并在pom文件引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency> 2. 配置ActiveMQ
(1) 在启动类添加注解@EnableJms。
(2) 修改yml文件:
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
close-timeout: 15s
in-memory: true
non-blocking-redelivery: false
send-timeout: 0
queue-name: active.queue
topic-name: active.topic.name.model
pool:
enabled: true
max-connections: 10
idle-timeout: 30000 (3) 添加配置类:
注意Queue类和Topic类要导入javax.jms下的包
@Configuration
public class ActivemqConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.topic-name}")
private String password;
@Value("${spring.activemq.queue-name}")
private String queueName;
@Value("${spring.activemq.topic-name}")
private String topicName;
@Bean(name = "queue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean(name = "topic")
public Topic topic() {
return new ActiveMQTopic(topicName);
}
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(username, password, brokerUrl);
}
@Bean
public JmsMessagingTemplate jmsMessageTemplate(){
return new JmsMessagingTemplate(connectionFactory());
}
@Bean("queueListener")
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
@Bean("topicListener")
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
} 3. 生产者
在添加@RestController注解之前要引入spring-boot-starter-web依赖。
@RestController
public class ProviderController {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@GetMapping("/queue")
public String sendQueue(@RequestParam String msg) {
this.sendMessage(this.queue, msg);
return "success";
}
@GetMapping("/topic")
public String sendTopic(@RequestParam String msg) {
this.sendMessage(this.topic, msg);
return "success";
}
/**
* 发送消息
* @param destination 模式
* @param message 消息
*/
private void sendMessage(Destination destination, final String message){
jmsMessagingTemplate.convertAndSend(destination, message);
}
} 4. 消费者
(1) queue模式
@Component
@Slf4j
public class QueueConsumer {
@JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
public void queueReceiveA(String message) {
log.info("queue A receive --> {}",message);
}
@JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
public void queueReceiveB(String message) {
log.info("queue B receive --> {}",message);
}
} 配置了两个消费者,用于区分queue和topic两种不同模式的特点。为使用@Slf4j注解应引入lombok依赖。
(2) topic模式
@Component
@Slf4j
public class TopicConsumer {
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void topicReceiveA(String message) {
log.info("topic A receive --> {}",message);
}
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void topicReceiveB(String message) {
log.info("topic B receive --> {}",message);
}
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void topicReceiveC(String message) {
log.info("topic C receive --> {}",message);
}
} 三、运行结果及解释
1. queue(点对点)模式
在浏览器地址栏输入http://localhost:8080/queue?msg=hello
可以在控制台输出中发现有一个消费者接收到了这个消息:
再一次再浏览器地址栏输入http://localhost:8080/queue?msg=world
发现还是只有一个消费者接收到了消息,但是消费者变成了queue B:
多次发送消息,会发现一直是A和B轮流接受消息:
2. topic(发布/订阅)模式
在浏览器地址栏输入http://localhost:8080/topic?msg=hello%20world
此时控制台同时输出了三条内容,说明三个订阅者同时接到了消息:
3. 两种模式的区别
(1) queue模式:
通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。
(2) topic模式:
通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。
