Fork me on GitHub

J.U.C之阻塞队列:前期准备

  要实现一个线程安全的队列有两种方式:阻塞和非阻塞。阻塞队列就是锁的应用,而非阻塞则是CAS算法的应用
为什么要使用阻塞队列?
  在常见的情况下,生产者消费者模式需要用到队列,生产者线程生产数据,放进队列,然后消费从队列中获取数据,这个在单线程的情况下没有问题。但是当多线程的情况下,某个特定时间下,(峰值高并发)出现消费者速度远大于生产者速度,消费者必须阻塞来等待生产者,以保证生产者能够生产出新的数据;当生产者速度远大于消费者速度时,同样也是一个道理。这些情况都要程序员自己控制阻塞,同时又要线程安全和运行效率。
  阻塞队列的出现使得程序员不需要关心这些细节,比如什么时候阻塞线程,什么时候唤醒线程,这些都由阻塞队列完成了

  
  Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Queue<E> extends Collection<E> {
// 两者都是往队列尾部插入元素
// 区别:当超出队列界限的时候,add()方法是抛出异常让你处理,而offer()方法是直接返回false
boolean add(E e);
boolean offer(E e);
// 都从队列中删除第一个元素。
// 区别:remove() 在空集合调用时抛出异常;而poll()返回null。
E remove();
E poll();
// 获取队列的头部查询元素。
// 区别:在队列为空时, element()抛出一个异常,而 peek()返回null
E element();
E peek();
}

  BlockingQueue类继承了如上的接口,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

  
  抛出异常:抛出一个异常;
  特殊值:返回一个特殊值(null或false,视情况而定)
  阻塞:在成功操作之前,一直阻塞线程
  超时:放弃前只在最大的时间内阻塞


BlockingQueue

  BlockingQueue接口实现Queue接口,它支持两个附加操作:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。相对于同一操作他提供了四种机制:抛出异常、返回特殊值、阻塞等待、超时:
  BlockingQueue常用于生产者和消费者场景
  

  JDK 8 中提供了七个阻塞队列可供使用(上图的DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类):
  1.ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  2.LinkedBlockingQueue :一个由链表结构组成的无界阻塞队列。
  3.PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  4.SynchronousQueue:一个不存储元素的阻塞队列。
  5.DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  6.LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  7.LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

  基于数组的阻塞队列,ArrayBlockingQueue内部维护这一个定长数组,阻塞队列的大小在初始化时就已经确定了,其后无法更改。
  采用可重入锁ReentrantLock来保证线程安全性,但是生产者和消费者是共用同一个锁对象,这样势必会导致降低一定的吞吐量。当然ArrayBlockingQueue完全可以采用分离锁来实现生产者和消费者的并行操作,但是我认为这样做只会给代码带来额外的复杂性,对于性能而言应该不会有太大的提升,因为 基于数组的ArrayBlockingQueue在数据的写入和读取操作已经非常轻巧了
  ArrayBlockingQueue支持公平性和非公平性,默认采用非公平模式,可以通过构造函数设置为公平访问策略(true)。

LinkedBlockingQueue

  LinkedBlockingQueue是基于链表的阻塞队列,内部维持的数据缓冲队列是由链表组成的,也是按照先进先出的原则。
  如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小(Integer.Max_VALUE)的容量,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。
  LinkedBlockingQueue之所以能够高效的处理并发数据,是因为take()方法和put(E param)方法使用了不同的可重入锁,分别为private final ReentrantLock putLock和private final ReentrantLock takeLock,这也意味着在 高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
两者对比:
  1.ArrayBlockingQueue在put,take操作使用了同一个锁,两者操作不能同时进行,而LinkedBlockingQueue使用了不同的锁,put操作和take操作可同时进行。
  2.ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

PriorityBlockingQueue

  PriorityBlockingQueue是支持优先级的无界队列。默认情况下采用自然顺序排序,当然也可以通过自定义Comparator来指定元素的排序顺序。
  PriorityBlockingQueue内部采用二叉堆的实现方式,整个处理过程并不是特别复杂。添加操作则是不断“上冒”,而删除操作则是不断“下掉”。

DelayQueue

  DelayQueue是一个支持延时操作的无界阻塞队列。列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期满时才能够从队列中去元素。
  它主要运用于如下场景:
  缓存系统的设计:缓存是有一定的时效性的,可以用DelayQueue保存缓存的有效期,然后利用一个线程查询DelayQueue,如果取到元素就证明该缓存已经失效了。
  定时任务的调度:DelayQueue保存当天将要执行的任务和执行时间,一旦取到元素(任务),就执行该任务。
  DelayQueue采用支持优先级的PriorityQueue来实现,但是队列中的元素必须要实现Delayed接口,Delayed接口用来标记那些应该在给定延迟时间之后执行的对象,该接口提供了getDelay()方法返回元素节点的剩余时间。同时,元素也必须要实现compareTo()方法,compareTo()方法需要提供与getDelay()方法一致的排序。

SynchronousQueue

  SynchronousQueue是一个神奇的队列,他是一个不存储元素的阻塞队列,也就是说他的 每一个put操作都需要等待一个take操作,否则就不能继续添加元素了,有点儿像Exchanger,类似于生产者和消费者进行交换。
  队列本身不存储任何元素,所以非常适用于传递性场景,两者直接进行对接。其吞吐量会高于ArrayBlockingQueue和LinkedBlockingQueue。
  SynchronousQueue支持公平和非公平的访问策略,在默认情况下采用非公平性,也可以通过构造函数来设置为公平性。
  SynchronousQueue的实现核心为Transferer接口,该接口有TransferQueue和TransferStack两个实现类,分别对应着公平策略和非公平策略。接口Transferer有一个tranfer()方法,该方法定义了转移数据,如果e != null,相当于将一个数据交给消费者,如果e == null,则相当于从一个生产者接收一个消费者交出的数据。

LinkedTransferQueue

  LinkedTransferQueue是一个由链表组成的的无界阻塞队列,该队列是一个相当牛逼的队列:它是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。
  与其他BlockingQueue相比,他多实现了一个接口TransferQueue,该接口是对BlockingQueue的一种补充,多了tryTranfer()和transfer()两类方法:
  tranfer():若当前存在一个正在等待获取的消费者线程,即立刻移交之。 否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素
  tryTranfer(): 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作

LinkedBlockingDeque

  LinkedBlockingDeque是一个有链表组成的双向阻塞队列,与前面的阻塞队列相比它支持从两端插入和移出元素。以first结尾的表示从对头操作,以last结尾的表示从对尾操作。
  在初始化LinkedBlockingDeque时可以初始化队列的容量,用来防止其再扩容时过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。


参考文章【死磕Java并发】—–J.U.C之阻塞队列:BlockingQueue总结

-----------------本文结束,感谢您的阅读-----------------