【有书共读】《Spring源码深度解读》第13章MQ
Spring整合ActiveMQ
Java消息服务(JavaMessageService,JMS)应用程序接口是一一个 Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。Java消息服务是一一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
Java消息服务的规范包括两种消息模式,点对点和发布者/订阅者。许多提供商支持这一通用框架。因此,程序员可以在他们的分布式软件中实现面向消息的操作,这些操作将具有不同面向消息中间件产品的可移植性。
Java消息服务支持同步和异步的消息处理,在某些场景下,异步消息是必要的,而且比同步消息操作更加便利。
Java消息服务支持面向事件的方法接收消息,事件驱动的程序设计现在被广泛认为是一种富有成效的程序设计范例,程序员们都相当熟悉。
在应用系统开发时,Java消息服务可以推迟选择面对消息中间件产品,也可以在不同的面对消息中间件切换。
本章以Java消息服务的开源实现产品ActiveMQ为例来进行Spring整合消息服务功能的实现分析。
(1) Spring配置文件。
配置文件是Spring 的核心,Spring 整合消息服务的使用也从配置文件配置开始。类似于数据库操作,Spring也将ActiveMQ中的操作统封装至JmsTemplate中,以方便我们统使用。所以,在Spring的核心配置文件中首先要注册JmsTemplate类型的bean。当然,ActiveMQConnectionFactory用于连接消息服务器,是消息服务的基础,也要注册。ActiveMQQueue 则用于指定消息的目的地。
(2)发送端。
有了以上的配置,Spring 就可以根据配置信息简化我们的工作量。Spring 中使用发送消息到消息服务器,省去了冗余的Connection以及Session 等的创建与销毁过程,简化了工作量。
(3)接收端。
到这里我们已经完成了Spring消息的发送与接收操作。但是,如HelloWorldReciver中所示的代码,使用jmsTemplate.receive(destination)方 法只能接收一一次消息,如果未接收到消息,则会-一直等待,当然用户可以通过设置timeout属性来控制等待时间,但是一旦接收到消息本次接收任务就会结束,虽然用户可以通过while(true)的方式来实现循环监听消息服务器上的消息,还有一种更好的解决办法:创建消息***。消息***的使用方式如下。
(1)创建消息***。
用于监听消息,一旦有新消息Spring会将消息引导至消息***以方便用户进行相应的逻辑处理。
(2)修改配置文件。
为了使用消息***,需要在配置文件中注册消息容器,并将消息***注人到容器中。
源码分析
尽管消息接收可以使用消息***的方式替代模版方法,但是在发送的时候是无法替代的,在Spring中必须要使用JmsTemplate提供的方法来进行发送操作,可见JmsTemplate类的重要性,那么我们对于Spring整合消息服务的分析就从JmsTemplate开始。
首先还是按照一贯的分析套路, 提取我们感兴趣的接口InitializingBean, 接口方法实现是在JmsAccessor类中,如下:
public void afterPropertiesSet() {
if (getConnectionFactory() == nu1l)
throw new IllegalArgumentExcept ion ("Property 'connectionFactory' is required");
发现函数中只是一一个验证的功能,并没有逻辑实现。丢掉这个线索,我们转向实例代码的分析。首先以发送为例,在Spring中发送消息可以通过JmsTemplate中提供的方法来实现。
public void send(final Destination destination, final MessageCreator messageCreator)throws JmsExcept ion
使用方式如下:
jmsTemplate, send (destination, new MessageCreator ()
public Message createMessage (Session session) throws JMSException {
return session. createTextMessage("大家好这个是测试! ") ;
});
我们就跟着程序流,进人函数send查看其源代码:
public void send(final Destination destination, final MessageCreator messageCreator)throws JmsExcept ion{
execute (new SessionCallback<object>(){
public object doInJms (Session session) throws JMSException {
doSend (session, destination, messageCreator) ;return nul1;
},false) ;
public <T> T execute (SessionCallback<T> action, boolean startConnection) throws JmsException {
2.发送消息的实现
有了基类辅助实现,使Spring更加专注于个性的处理,也就是说Spring使用execute方法中封装了冗余代码,而将个性化的代码实现放在了回调函数doInJms函数中。在发送消息的功能中回调函数通过局部类实现。