这篇文章将为大家详细讲解有关使用BlockingQueue怎么实现阻塞队列,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。





package com.shi.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 阻塞队列
* @author shiye
*
*/
public class TestBlockQueue {
public static void main(String[] args) throws InterruptedException {
//定义一个长度为3的阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println("----------------抛出异常的 情况----------------------");
// blockingQueue.add("aa");
// blockingQueue.add("bb");
// blockingQueue.add("cc");
//blockingQueue.add("dd");//如果队列满了 Exception java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.element());//检查队列头的信息 : aa
// blockingQueue.remove();
// blockingQueue.remove();
// blockingQueue.remove();
//blockingQueue.remove();//如果队列为空 Exception java.util.NoSuchElementException
//System.out.println(blockingQueue.element());//如果队列为空 Exception java.util.NoSuchElementException
System.out.println("----------------返回true/false----------------------");
// System.out.println(blockingQueue.offer("11"));//插入队列 true
// System.out.println(blockingQueue.offer("22"));//插入队列 true
// System.out.println(blockingQueue.offer("33"));//插入队列 true
// System.out.println(blockingQueue.offer("44"));//插入队列 false
//
// System.out.println(blockingQueue.peek());//检查队列头元素 11
//
// System.out.println(blockingQueue.poll());//输出队列 11
// System.out.println(blockingQueue.poll());//输出队列 22
// System.out.println(blockingQueue.poll());//输出队列 33
// System.out.println(blockingQueue.poll());//输出队列 null
System.out.println("----------------队列阻塞等待----------------------");
// blockingQueue.put("zhangsan");
// blockingQueue.put("lisi");
// blockingQueue.put("wangwu");
// //blockingQueue.put("shiye");//线程一直等待无法关闭
//
// blockingQueue.take();
// blockingQueue.take();
// blockingQueue.take();
//blockingQueue.take();//线程一直等待 无法响应
System.out.println("----------------队列等待一定时间之后就退出----------------------");
System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//false 等待2s钟之后就退出
}
}
SynchronousQueue
package com.shi.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* 不存储数据的队列,即生产一个消费一个的队列
* @author shiye
*
*结果:
AA 存放1 ...
BB get 1
AA 存放2 ...
BB get 2
AA 存放3 ...
BB get 3
*/
public class TestSynchroniousQueue {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()-> {
try {
System.out.println(Thread.currentThread().getName()+ "\t 存放1 ..." );
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+ "\t 存放2 ..." );
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+ "\t 存放3 ..." );
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AA").start();
new Thread(()-> {
try {
Thread.sleep(5000);//睡眠5秒
System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());
Thread.sleep(5000);//睡眠5秒
System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());
Thread.sleep(5000);//睡眠5秒
System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BB").start();
}
}
综合案例(使用阻塞队列实现生产者消费者问题)
package com.shi.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 通过阻塞队列的方式 实现 生产者 消费者 问题
* @author shiye
* 使用到的技术:
* countDownLatch:闭锁
* volatile 自旋锁
* AtomicInteger 原子整型
* BlockingQueue 阻塞队列
*
*/
public class TestProducterAndConsumterByQueue {
public static void main(String[] args) throws InterruptedException {
//闭锁
final CountDownLatch countDownLatch = new CountDownLatch(11);
Check check = new Check(new ArrayBlockingQueue<>(3));
//创建线程生产 (启动10个线程去生产)
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t 生产者启动...");
try {
check.productor("aaa");
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();//线程数量减一
},"AA-"+i).start();
}
//创建1 个线程消费
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "\t 消费者启动...");
try {
check.consumter();
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();//线程数量减一
},"BB").start();
Thread.sleep(5000);//等待5秒 停止
check.stop();
countDownLatch.await();//等待上面的线程全部执行完毕,才检查产品数量
System.out.println("5s之后线程停止,总共生产了:"+ check.getTotle() +"件产品");
}
}
//店员
class Check{
private volatile boolean FLAG = true;//标志位
private AtomicInteger atomicInteger = new AtomicInteger();//统计总数的变量
private BlockingQueue<Object> blockingQueue = null;//定义一个阻塞队列
public Check(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println("创建一个 "+blockingQueue.getClass().getName()+" 实例");
}
//生产者
public void productor(String num) throws InterruptedException {
while(FLAG) {
System.out.println( Thread.currentThread().getName() + "\t 生产者生产数据:" + num + "到队列中...");
blockingQueue.offer(num,2l,TimeUnit.SECONDS); //延迟2s插入数据到队列中。。
Thread.sleep(1000);//线程睡眠1s
atomicInteger.getAndIncrement();//让总数自加1
}
}
//消费者
public void consumter() throws InterruptedException {
while(FLAG) {
Object object = blockingQueue.poll(2, TimeUnit.SECONDS);//最多消费延迟2s
if(object != null) {
System.out.println( Thread.currentThread().getName() + "\t 消费者消费数据:" + object);
}
}
}
//停止
public void stop() {
FLAG = false;
}
public int getTotle() {
return atomicInteger.get();
}
}
关于使用BlockingQueue怎么实现阻塞队列就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。