阻塞队列介绍
阻塞队列BlockingQueue,是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 插入和移除的4种处理方式:
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时抛出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除犯法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | -- | -- |
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用与生产者和消费者的场景,这里使用SpringBoot实现一个简单的生产消费用例。
首先创建任务类,Task:
package cn.zealon.pac;
import java.util.Date;
/**
* 任务
* @auther: Zealon
* @Date: 2018-06-29 10:48
*/
public class Task {
private String id;
private String jsonData;
private int state;
private Date createTime;
private Date finishTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getJsonData() {
return jsonData;
}
public void setJsonData(String jsonData) {
this.jsonData = jsonData;
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getFinishTime() {
return finishTime;
}
public void setFinishTime(Date finishTime) {
this.finishTime = finishTime;
}
@Override
public String toString() {
return "Task{" +
"id='" + id + '\'' +
", jsonData='" + jsonData + '\'' +
", state=" + state +
", createTime=" + createTime +
", finishTime=" + finishTime +
'}';
}
}
公用队列、线程池配置
然后需要一个公用配置类,供放入Spring容器中:
package cn.zealon.pac.config;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
/**
* 阻塞队列、线程池配置类
* @auther: Zealon
* @Date: 2018-06-29 11:39
*/
public class QueueBean {
private BlockingQueue<Runnable> queue;
private ExecutorService executorService;
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public BlockingQueue<Runnable> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
}
然后需要配置阻塞队列,需要定义在Spring的Bean容器中,使得生产者消费者公用同一个队列。这里直接配置在Boot的启动类Application中:
package cn.zealon;
import cn.zealon.pac.config.QueueBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.*;
@SpringBootApplication
public class Application {
@Bean
public QueueBean queueBean(){
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
QueueBean queueBean = new QueueBean();
queueBean.setQueue(blockingQueue);
ExecutorService executorService = new ThreadPoolExecutor(
5, //core
10, //max
120L, //两分钟
TimeUnit.SECONDS, //单位
blockingQueue);
queueBean.setExecutorService(executorService);
return queueBean;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
创建生产者
队列和线程池准备好了,这是开始创建生产者,是一个服务类TaskService:
package cn.zealon.pac.service;
import cn.zealon.pac.config.QueueBean;
import cn.zealon.pac.consumer.TaskConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 任务生产者服务
* @auther: Zealon
* @Date: 2018-06-29 11:26
*/
@Component
public class TaskService {
@Autowired
private QueueBean queueBean;
public void insert(TaskConsumer task){
try {
queueBean.getQueue().put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package cn.zealon.pac.controller;
import cn.zealon.pac.Task;
import cn.zealon.pac.config.QueueBean;
import cn.zealon.pac.consumer.TaskConsumer;
import cn.zealon.pac.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
import java.util.UUID;
以及对应的控制器TaskController:
/**
* 生产者控制器
* @auther: Zealon
* @Date: 2018-06-29 11:28
*/
@Controller
@RequestMapping("task")
public class TaskController {
@Autowired
private TaskService taskService;
@Autowired
private QueueBean queueBean;
@ResponseBody
@PostMapping("insert")
public Object insert(int taskNum){
for (int i=0;i<taskNum;i++){
TaskConsumer taskConsumer = new TaskConsumer(queueBean.getQueue());
Task task = new Task();
task.setId(UUID.randomUUID().toString());
task.setJsonData("{"type":"任务类型","data":"数据"}");
task.setState(0);
task.setCreateTime(new Date());
taskConsumer.setTask(task);
taskService.insert(taskConsumer);
}
return "ok";
}
}
创建消费者
最后需要一个消费者:
package cn.zealon.pac.consumer;
import cn.zealon.pac.Task;
import java.util.concurrent.BlockingQueue;
/**
* 消费者
* @auther: Zealon
* @Date: 2018-06-29 15:02
*/
public class TaskConsumer implements Runnable {
private Task task;
private BlockingQueue<Runnable> queue;
public TaskConsumer(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while(true){
try {
TaskConsumer taskConsumer = (TaskConsumer) queue.take();
System.out.println(Thread.currentThread().getName()+"消费:"+taskConsumer.getTask().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Task getTask() {
return task;
}
public void setTask(Task task) {
this.task = task;
}
}
由于生产者消费者的用例是基于web场景,这里需要在SpringBoot启动后,自动的创建消费者线程,这里使用了InitializingBean来实现:
package cn.zealon.pac.config;
import cn.zealon.pac.consumer.TaskConsumer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 启动消费者线程
* @auther: Zealon
* @Date: 2018-06-29 14:30
*/
@Component
public class ConsumerTaskEventSubscribe implements InitializingBean {
@Autowired
private QueueBean queueBean;
@Override
public void afterPropertiesSet() throws Exception {
TaskConsumer consumer = new TaskConsumer(queueBean.getQueue());
for(int i=0;i<5;i++){
queueBean.getExecutorService().execute(new Thread(consumer));
}
System.out.println("consumer started...");
}
}
用例测试
最后进行用例测试,启动SpringBoot:
可以看到,消费者线程已经成功启动(consumer started...)。
这时候我们访问生产者服务进行生产任务,生产者调用的资源信息为:
请求地址:/task/insert
请求方式:post
请求参数:taskNum(输入具体数值来模拟任务生产总数)
使用工具开始请求: 生产了100个任务,并且返回结果没有异常,这时来看下控制台,消费者线程池已经成功进行了消费。
作者: Zealon
崇尚简单,一切简单自然的事物都是美好的。
为啥我消费执行业务逻辑代码一直报空指针异常呢