准备工作
增加ActiveMQ的相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
然后配置application.yml:
spring:
activemq:
broker-url: tcp://192.168.1.121:61616
#mq账户密码
user: admin
password: pass123
ActiveMQ自定义配置
由于在生产环境下,一般都需要并发的进行消费,所以需要一个配置类进行自定义配置:
1、配置消息监听器连接工厂(分别配置了点对点以及发布订阅各自的工厂类对象,可避免冲突)
2、配置全局的订阅目的地对象(可避免重复创建对象)
package cn.zealon.activemq.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
/**
* ActiveMQ 配置
* @auther: Zealon
* @Date: 2018-01-20 16:41
*/
@Configuration
@EnableJms
public class ActiveMQConfig {
@Bean
public Topic topic(){
return new ActiveMQTopic("order.topic");
}
//消息监听器连接工厂
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//JMS监听者并发线程数范围
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
//应答模式
factory.setSessionAcknowledgeMode(2);
configurer.configure(factory, connectionFactory);
return factory;
}
//订阅监听器连接工厂
@Bean
public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//JMS监听者并发线程数范围
factory.setConcurrency("1-10");
//重连间隔时间
factory.setRecoveryInterval(1000L);
//支持发布/订阅
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
这里需要说明一下消息的应答模式:
也就是factory.setSessionAcknowledgeMode(int number)
属性,我们可以去看一下源码:
点击set进入AbstractJmsListenerContainerFactory类源码:
/**
* @see AbstractMessageListenerContainer#setSessionAcknowledgeMode(int)
*/
public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
}
它提示see setSessionAcknowledgeMode,我们点击进去进一步观察一下,进入了JmsAccessor类如下:
/**
* Set the JMS acknowledgement mode that is used when creating a JMS
* {@link Session} to send a message.
* <p>Default is {@link Session#AUTO_ACKNOWLEDGE}.
* <p>Vendor-specific extensions to the acknowledgment mode can be set here as well.
* <p>Note that inside an EJB, the parameters to the
* {@code create(Queue/Topic)Session(boolean transacted, int acknowledgeMode)} method
* are not taken into account. Depending on the transaction context in the EJB,
* the container makes its own decisions on these values. See section 17.3.5
* of the EJB spec.
* @param sessionAcknowledgeMode the acknowledgement mode constant
* @see javax.jms.Session#AUTO_ACKNOWLEDGE
* @see javax.jms.Session#CLIENT_ACKNOWLEDGE
* @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
* @see javax.jms.Connection#createSession(boolean, int)
*/
public void setSessionAcknowledgeMode(int sessionAcknowledgeMode) {
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
}
很明显,可以看到注释中还要继续看AUTO_ACKNOWLEAGE...等等,再点击进去观察:
/**
* @version $Rev: 467553 $ $Date: 2006-10-25 06:01:51 +0200 (Wed, 25 Oct 2006) $
*/
public interface Session extends Runnable {
static final int AUTO_ACKNOWLEDGE = 1;
static final int CLIENT_ACKNOWLEDGE = 2;
static final int DUPS_OK_ACKNOWLEDGE = 3;
static final int SESSION_TRANSACTED = 0;
...
}
所以应答模式分别对应的值就很清楚了,可以根据项目的实际需要来定义应答模式。
应答模式 | 定义 |
---|---|
自动确认 | AUTO_ACKNOWLEDGE = 1 |
客户端手动确认 | CLIENT_ACKNOWLEDGE = 2 |
自动批量确认 | DUPS_OK_ACKNOWLEDGE = 3 |
事务提交并确认 | SESSION_TRANSACTED = 0 |
创建生产者
这里我们以Web形式生产消息,所以先创建了一个服务类OrderProducerService:
package cn.zealon.activemq.p2p.service;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Destination;
//消息生产者服务类
@Service
public class OrderProducerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 发送消息
* @param message 消息内容
*
*/
public void sendTextMessage(String message){
jmsMessagingTemplate.convertAndSend("order.queue",message);
}
/**
* 发送消息
* @param destinationName 发送的队列
* @param message 消息内容
*
*/
public void sendTextMessage(String destinationName,String message){
//队列
Destination destination = new ActiveMQQueue(destinationName);
jmsMessagingTemplate.convertAndSend(destination,message);
}
}
然后需要一个控制器 OrderController,供web请求生产消息:
package cn.zealon.activemq.p2p.controller;
import cn.zealon.activemq.p2p.service.OrderProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 生产者控制器类
*/
@RestController
@RequestMapping("order")
public class OrderController {
@Autowired
private OrderProducerService producerService;
/**
* 新增订单
* @param queue 队列名称
* @param message 消息内容
* @param messageNum 数量
* @return
*/
@PostMapping("add")
public Object addOrder(String queue,String message,int messageNum){
if(messageNum==0){
messageNum = 1;
}
for(int i=0;i<messageNum;i++) {
producerService.sendTextMessage(queue, message+i);
}
return "ok";
}
}
创建消费者
由于SpringBoot中把ActiveMQ的监听封装成了注解,所以这里直接使用@JmsListener来进行配置消费者。注解中指定了消息的目的地,以及监听器连接工厂的对象:
package cn.zealon.activemq.p2p.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 订单消费服务
*/
@Component
public class OrderConsumer {
//定义消费者监听
@JmsListener(destination = "order.queue",containerFactory = "myFactory")
public void receiveQueue(final TextMessage message, Session session) throws JMSException {
try {
String text = message.getText();
System.out.println(Thread.currentThread().getName() + " 消费消息:" + text);
//消费确认
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
//异常重试
session.recover();
}
}
}
测试
这里使用postman工具,进行Post请求生产消息: 生产了99个大面包,然后看一下控制台,已进行了消费,由于配置了1-10个线程,所以输出的信息都是不同的线程随机的进行消费:
作者: Zealon
崇尚简单,一切简单自然的事物都是美好的。