SpringBoot —— 基于BlockingQueue实现的生产者消费者用例

/ SpringBoot / 4919浏览

阻塞队列介绍

阻塞队列BlockingQueue,是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 插入和移除的4种处理方式:

方法抛出异常返回特殊值一直阻塞超时抛出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除犯法remove()poll()take()poll(time,unit)
检查方法element()peek()----

阻塞队列常用与生产者和消费者的场景,这里使用SpringBoot实现一个简单的生产消费用例。
首先创建任务类,Task:

package cn.zealon.pac;

import java.util.Date;

/**
 * 任务
 * @auther: Zealon
 * @Date: 2018-06-29 10:48
 */
public class Task {

    private String id;
    private String jsonData;
    private int state;
    private Date createTime;
    private Date finishTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getJsonData() {
        return jsonData;
    }

    public void setJsonData(String jsonData) {
        this.jsonData = jsonData;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getFinishTime() {
        return finishTime;
    }

    public void setFinishTime(Date finishTime) {
        this.finishTime = finishTime;
    }

    @Override
    public String toString() {
        return "Task{" +
                "id='" + id + '\'' +
                ", jsonData='" + jsonData + '\'' +
                ", state=" + state +
                ", createTime=" + createTime +
                ", finishTime=" + finishTime +
                '}';
    }
}

公用队列、线程池配置

然后需要一个公用配置类,供放入Spring容器中:

package cn.zealon.pac.config;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

/**
 * 阻塞队列、线程池配置类
 * @auther: Zealon
 * @Date: 2018-06-29 11:39
 */
public class QueueBean {

    private BlockingQueue<Runnable> queue;

    private ExecutorService executorService;

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public BlockingQueue<Runnable> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
}

然后需要配置阻塞队列,需要定义在Spring的Bean容器中,使得生产者消费者公用同一个队列。这里直接配置在Boot的启动类Application中:

package cn.zealon;

import cn.zealon.pac.config.QueueBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.*;

@SpringBootApplication
public class Application {

    @Bean
    public QueueBean queueBean(){
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
        QueueBean queueBean = new QueueBean();
        queueBean.setQueue(blockingQueue);
        ExecutorService executorService = new ThreadPoolExecutor(
                5,          //core
                10,     //max
                120L,      //两分钟
                TimeUnit.SECONDS,       //单位
                blockingQueue);

        queueBean.setExecutorService(executorService);
        return queueBean;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

创建生产者

队列和线程池准备好了,这是开始创建生产者,是一个服务类TaskService:

package cn.zealon.pac.service;

import cn.zealon.pac.config.QueueBean;
import cn.zealon.pac.consumer.TaskConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 任务生产者服务
 * @auther: Zealon
 * @Date: 2018-06-29 11:26
 */
@Component
public class TaskService {

    @Autowired
    private QueueBean queueBean;

    public void insert(TaskConsumer task){
        try {
            queueBean.getQueue().put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

package cn.zealon.pac.controller;

import cn.zealon.pac.Task;
import cn.zealon.pac.config.QueueBean;
import cn.zealon.pac.consumer.TaskConsumer;
import cn.zealon.pac.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
import java.util.UUID;

以及对应的控制器TaskController:

/**
 * 生产者控制器
 * @auther: Zealon
 * @Date: 2018-06-29 11:28
 */
@Controller
@RequestMapping("task")
public class TaskController {

    @Autowired
    private TaskService taskService;

    @Autowired
    private QueueBean queueBean;

    @ResponseBody
    @PostMapping("insert")
    public Object insert(int taskNum){
        for (int i=0;i<taskNum;i++){
            TaskConsumer taskConsumer = new TaskConsumer(queueBean.getQueue());
            Task task = new Task();
            task.setId(UUID.randomUUID().toString());
            task.setJsonData("{"type":"任务类型","data":"数据"}");
            task.setState(0);
            task.setCreateTime(new Date());
            taskConsumer.setTask(task);
            taskService.insert(taskConsumer);
        }
        return "ok";
    }
}

创建消费者

最后需要一个消费者:

package cn.zealon.pac.consumer;

import cn.zealon.pac.Task;
import java.util.concurrent.BlockingQueue;

/**
 * 消费者
 * @auther: Zealon
 * @Date: 2018-06-29 15:02
 */
public class TaskConsumer implements Runnable {

    private Task task;

    private BlockingQueue<Runnable> queue;

    public TaskConsumer(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true){
            try {
                TaskConsumer taskConsumer = (TaskConsumer) queue.take();

                System.out.println(Thread.currentThread().getName()+"消费:"+taskConsumer.getTask().getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public Task getTask() {
        return task;
    }

    public void setTask(Task task) {
        this.task = task;
    }
}

由于生产者消费者的用例是基于web场景,这里需要在SpringBoot启动后,自动的创建消费者线程,这里使用了InitializingBean来实现:

package cn.zealon.pac.config;

import cn.zealon.pac.consumer.TaskConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 启动消费者线程
 * @auther: Zealon
 * @Date: 2018-06-29 14:30
 */
@Component
public class ConsumerTaskEventSubscribe implements InitializingBean {

    @Autowired
    private QueueBean queueBean;

    @Override
    public void afterPropertiesSet() throws Exception {
        TaskConsumer consumer = new TaskConsumer(queueBean.getQueue());
        for(int i=0;i<5;i++){
            queueBean.getExecutorService().execute(new Thread(consumer));
        }
        System.out.println("consumer started...");
    }
}

用例测试

最后进行用例测试,启动SpringBoot: alt 可以看到,消费者线程已经成功启动(consumer started...)。
这时候我们访问生产者服务进行生产任务,生产者调用的资源信息为:

请求地址:/task/insert
请求方式:post
请求参数:taskNum(输入具体数值来模拟任务生产总数)

使用工具开始请求: alt 生产了100个任务,并且返回结果没有异常,这时来看下控制台,消费者线程池已经成功进行了消费。 alt

  1. 为啥我消费执行业务逻辑代码一直报空指针异常呢

    回复