准备工作
增加ActiveMQ的相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
然后配置application.yml:
spring:
activemq:
broker-url: tcp://192.168.1.121:61616
#mq账户密码
user: admin
password: pass123
ActiveMQ自定义配置
由于在生产环境下,一般都需要并发的进行消费,所以需要一个配置类进行自定义配置:
1、配置消息监听器连接工厂(分别配置了点对点以及发布订阅各自的工厂类对象,可避免冲突)
2、配置全局的订阅目的地对象(可避免重复创建对象)
package cn.zealon.activemq.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
/**
* ActiveMQ 配置
* @auther: Zealon
* @Date: 2018-01-20 16:41
*/
@Configuration
@EnableJms
public class ActiveMQConfig {
@Bean
public Topic topic(){
return new ActiveMQTopic("order.topic");
}
//消息监听器连接工厂
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//JMS监听者并发线程数范围
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
//应答模式
factory.setSessionAcknowledgeMode(2);
configurer.configure(factory, connectionFactory);
return factory;
}
//订阅监听器连接工厂
@Bean
public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//JMS监听者并发线程数范围
factory.setConcurrency("1-1");
//重连间隔时间
factory.setRecoveryInterval(1000L);
//支持发布/订阅
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
⭐ 需要注意的是,发布订阅的场景,并发线程数需要控制在1个。不然会有重复消费数据的情况。
创建发布者
需要一个发布者服务类,这个类中使用了全局的消息Topic对象(可以避免重复创建对象):
package cn.zealon.activemq.pubsub.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Topic;
/**
* 发布者服务类
*/
@Service
public class PublishService {
@Autowired
private Topic topic;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void publish(String message){
jmsMessagingTemplate.convertAndSend(topic,message);
}
}
然后创建一个发布者控制器,供web调用:
package cn.zealon.activemq.pubsub.controller;
import cn.zealon.activemq.pubsub.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布者控制器
*/
@RestController
@RequestMapping("pub")
public class PublishController {
@Autowired
private PublishService publishService;
@RequestMapping("add")
public Object publish(String message){
publishService.publish(message);
return "ok";
}
}
创建订阅者
这里直接使用@JmsListener注解来创建订阅者。
需要注意的是,如果一个SpringBoot工程里面有既有生产者消费者,又有发布订阅时,需要定义多个JmsListenerContainerFactory的Bean对象,来分别给订阅者和消费者使用。
这里定义了2个订阅者A、B:
package cn.zealon.activemq.pubsub.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Component
public class SubscribeA {
@JmsListener(destination = "order.topic",containerFactory = "topicFactory")
public void receive(final TextMessage message){
String text = null;
try {
text = message.getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+
" SubscribeA订阅消息:"+text);
}
}
订阅者B:
package cn.zealon.activemq.pubsub.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Component
public class SubscribeB {
@JmsListener(destination = "order.topic",containerFactory = "topicFactory")
public void receive(final TextMessage message){
String text = null;
try {
text = message.getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+
" SubscribeB订阅消息:"+text);
}
}
测试
使用postman工具,发布消息:
观察控制台,已经A、B两个订阅者已经成功进行了消息订阅。
作者: Zealon
崇尚简单,一切简单自然的事物都是美好的。