SpringBoot整合ActiveMQ 实现消息的发布订阅

/ 中间件 / 515浏览

准备工作

增加ActiveMQ的相关依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
</dependency>

然后配置application.yml:

spring:
  activemq:
    broker-url: tcp://192.168.1.121:61616
    #mq账户密码
    user: admin
    password: pass123

ActiveMQ自定义配置

由于在生产环境下,一般都需要并发的进行消费,所以需要一个配置类进行自定义配置:
1、配置消息监听器连接工厂(分别配置了点对点以及发布订阅各自的工厂类对象,可避免冲突)
2、配置全局的订阅目的地对象(可避免重复创建对象)

package cn.zealon.activemq.config;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;

/**
 * ActiveMQ 配置
 * @auther: Zealon
 * @Date: 2018-01-20 16:41
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    @Bean
    public Topic topic(){
        return new ActiveMQTopic("order.topic");
    }

    //消息监听器连接工厂
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        //JMS监听者并发线程数范围
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        //应答模式
        factory.setSessionAcknowledgeMode(2);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    //订阅监听器连接工厂
    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        //JMS监听者并发线程数范围
        factory.setConcurrency("1-1");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        //支持发布/订阅
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

⭐ 需要注意的是,发布订阅的场景,并发线程数需要控制在1个。不然会有重复消费数据的情况。

创建发布者

需要一个发布者服务类,这个类中使用了全局的消息Topic对象(可以避免重复创建对象):

package cn.zealon.activemq.pubsub.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Topic;

/**
 * 发布者服务类
 */
@Service
public class PublishService {

    @Autowired
    private Topic topic;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void publish(String message){
        jmsMessagingTemplate.convertAndSend(topic,message);
    }
}

然后创建一个发布者控制器,供web调用:

package cn.zealon.activemq.pubsub.controller;

import cn.zealon.activemq.pubsub.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 发布者控制器
 */
@RestController
@RequestMapping("pub")
public class PublishController {

    @Autowired
    private PublishService publishService;

    @RequestMapping("add")
    public Object publish(String message){
        publishService.publish(message);
        return "ok";
    }
}

创建订阅者

这里直接使用@JmsListener注解来创建订阅者。
需要注意的是,如果一个SpringBoot工程里面有既有生产者消费者,又有发布订阅时,需要定义多个JmsListenerContainerFactory的Bean对象,来分别给订阅者和消费者使用。
这里定义了2个订阅者A、B:

package cn.zealon.activemq.pubsub.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;

@Component
public class SubscribeA {

    @JmsListener(destination = "order.topic",containerFactory = "topicFactory")
    public void receive(final TextMessage message){
        String text = null;
        try {
            text = message.getText();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+
                " SubscribeA订阅消息:"+text);
    }
}

订阅者B:

package cn.zealon.activemq.pubsub.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;

@Component
public class SubscribeB {

    @JmsListener(destination = "order.topic",containerFactory = "topicFactory")
    public void receive(final TextMessage message){
        String text = null;
        try {
            text = message.getText();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+
                " SubscribeB订阅消息:"+text);
    }
}

测试

使用postman工具,发布消息: alt 观察控制台,已经A、B两个订阅者已经成功进行了消息订阅。 alt