SpringBoot整合ActiveMQ 实现消息生产者消费者的点对点通信

/ 中间件 / 874浏览

准备工作

增加ActiveMQ的相关依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
</dependency>

然后配置application.yml:

spring:
  activemq:
    broker-url: tcp://192.168.1.121:61616
    #mq账户密码
    user: admin
    password: pass123

ActiveMQ自定义配置

由于在生产环境下,一般都需要并发的进行消费,所以需要一个配置类进行自定义配置:
1、配置消息监听器连接工厂(分别配置了点对点以及发布订阅各自的工厂类对象,可避免冲突)
2、配置全局的订阅目的地对象(可避免重复创建对象)

package cn.zealon.activemq.config;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;

/**
 * ActiveMQ 配置
 * @auther: Zealon
 * @Date: 2018-01-20 16:41
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    @Bean
    public Topic topic(){
        return new ActiveMQTopic("order.topic");
    }

    //消息监听器连接工厂
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        //JMS监听者并发线程数范围
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        //应答模式
        factory.setSessionAcknowledgeMode(2);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    //订阅监听器连接工厂
    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        //JMS监听者并发线程数范围
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        //支持发布/订阅
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

这里需要说明一下消息的应答模式:
也就是factory.setSessionAcknowledgeMode(int number)属性,我们可以去看一下源码:
点击set进入AbstractJmsListenerContainerFactory类源码:

/**
 * @see AbstractMessageListenerContainer#setSessionAcknowledgeMode(int)
 */
public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) {
	this.sessionAcknowledgeMode = sessionAcknowledgeMode;
}

它提示see setSessionAcknowledgeMode,我们点击进去进一步观察一下,进入了JmsAccessor类如下:

/**
 * Set the JMS acknowledgement mode that is used when creating a JMS
 * {@link Session} to send a message.
 * <p>Default is {@link Session#AUTO_ACKNOWLEDGE}.
 * <p>Vendor-specific extensions to the acknowledgment mode can be set here as well.
 * <p>Note that inside an EJB, the parameters to the
 * {@code create(Queue/Topic)Session(boolean transacted, int acknowledgeMode)} method
 * are not taken into account. Depending on the transaction context in the EJB,
 * the container makes its own decisions on these values. See section 17.3.5
 * of the EJB spec.
 * @param sessionAcknowledgeMode the acknowledgement mode constant
 * @see javax.jms.Session#AUTO_ACKNOWLEDGE
 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
 * @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
 * @see javax.jms.Connection#createSession(boolean, int)
 */
public void setSessionAcknowledgeMode(int sessionAcknowledgeMode) {
	this.sessionAcknowledgeMode = sessionAcknowledgeMode;
}

很明显,可以看到注释中还要继续看AUTO_ACKNOWLEAGE...等等,再点击进去观察:

/**
 * @version $Rev: 467553 $ $Date: 2006-10-25 06:01:51 +0200 (Wed, 25 Oct 2006) $
 */
public interface Session extends Runnable {
    static final int AUTO_ACKNOWLEDGE = 1;

    static final int CLIENT_ACKNOWLEDGE = 2;

    static final int DUPS_OK_ACKNOWLEDGE = 3;

    static final int SESSION_TRANSACTED = 0;

    ...
}

所以应答模式分别对应的值就很清楚了,可以根据项目的实际需要来定义应答模式。

应答模式定义
自动确认AUTO_ACKNOWLEDGE = 1
客户端手动确认CLIENT_ACKNOWLEDGE = 2
自动批量确认DUPS_OK_ACKNOWLEDGE = 3
事务提交并确认SESSION_TRANSACTED = 0

创建生产者

这里我们以Web形式生产消息,所以先创建了一个服务类OrderProducerService:

package cn.zealon.activemq.p2p.service;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Destination;

//消息生产者服务类
@Service
public class OrderProducerService {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 发送消息
     * @param message 消息内容
     *
     */
    public void sendTextMessage(String message){
        jmsMessagingTemplate.convertAndSend("order.queue",message);
    }

    /**
     * 发送消息
     * @param destinationName 发送的队列
     * @param message 消息内容
     *
     */
    public void sendTextMessage(String destinationName,String message){
        //队列
        Destination destination = new ActiveMQQueue(destinationName);
        jmsMessagingTemplate.convertAndSend(destination,message);
    }
}

然后需要一个控制器 OrderController,供web请求生产消息:

package cn.zealon.activemq.p2p.controller;

import cn.zealon.activemq.p2p.service.OrderProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 生产者控制器类
 */
@RestController
@RequestMapping("order")
public class OrderController {

    @Autowired
    private OrderProducerService producerService;

    /**
     * 新增订单
     * @param queue 队列名称
     * @param message 消息内容
     * @param messageNum 数量
     * @return
     */
    @PostMapping("add")
    public Object addOrder(String queue,String message,int messageNum){
        if(messageNum==0){
            messageNum = 1;
        }

        for(int i=0;i<messageNum;i++) {
            producerService.sendTextMessage(queue, message+i);
        }

        return "ok";
    }
}

创建消费者

由于SpringBoot中把ActiveMQ的监听封装成了注解,所以这里直接使用@JmsListener来进行配置消费者。注解中指定了消息的目的地,以及监听器连接工厂的对象:

package cn.zealon.activemq.p2p.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * 订单消费服务
 */
@Component
public class OrderConsumer {

    //定义消费者监听
    @JmsListener(destination = "order.queue",containerFactory = "myFactory")
    public void receiveQueue(final TextMessage message, Session session) throws JMSException {
        try {
            String text = message.getText();
            System.out.println(Thread.currentThread().getName() + " 消费消息:" + text);
            //消费确认
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
            //异常重试
            session.recover();
        }
    }
}

测试

这里使用postman工具,进行Post请求生产消息: alt 生产了99个大面包,然后看一下控制台,已进行了消费,由于配置了1-10个线程,所以输出的信息都是不同的线程随机的进行消费: alt