Fork me on GitHub

J.U.C之阻塞队列:ArrayBlockingQueue源码分析

  
  ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用FIFO的原则对元素进行排序添加的。
  ArrayBlockingQueue为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。
  ArrayBlockingQueue支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

先看ArrayBlockingQueue的定义:

1
2
3
4
5
6
7
8
9
10
11
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
private static final long serialVersionUID = -817911632652898426L;
final Object[] items; // 一个定长数组,维护ArrayBlockingQueue的元素
int takeIndex; // ArrayBlockingQueue队首位置
int putIndex; // ArrayBlockingQueue队尾位置
int count; // 元素个数
final ReentrantLock lock; // 重入锁, ArrayBlockingQueue出列入列都必须获取该锁,两个步骤公用一个锁
private final Condition notEmpty; // 出列条件
private final Condition notFull; // 入列条件
transient ArrayBlockingQueue.Itrs itrs;
}

  可以清楚地看到ArrayBlockingQueue继承AbstractQueue,实现BlockingQueue接口。看过java.util包源码的同学应该都认识AbstractQueue,改类在Queue接口中扮演着非常重要的作用,该类提供了对queue操作的骨干实现(具体内容移驾其源码)。BlockingQueue继承java.util.Queue为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作,作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由BlockingQueue来完成。
  ArrayBlockingQueue内部 使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作

  下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:

1
2
3
4
5
6
7
8
9
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

  Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
  notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。
  同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。


入列

  ArrayBlockingQueue提供了诸多方法,可以将元素加入队列尾部。
  add(E e) :将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException
  offer(E e) :将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false
  offer(E e, long timeout, TimeUnit unit) :将指定的元素插入此队列的尾部,如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。
  put(E e) :将指定的元素插入此队列的尾部,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入
  方法较多,我们就分析一个方法:add(E e):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
// 如果此队列已满,则抛出 IllegalStateException
public boolean add(E e) {
return super.add(e);
}
// 如果此队列已满,则返回 false
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 该方法就是在putIndex(对尾)为知处添加元素。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 通知阻塞在出列的线程(如果队列为空,则进行出列操作是会阻塞)!!!!
}
}
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
}


出列

  ArrayBlockingQueue提供的出队方法如下:
  poll() :获取并移除此队列的头,如果此队列为空,则返回 null
  poll(long timeout, TimeUnit unit) :在指定的时间内队列仍然为空则阻塞,超过指定时间返回null;队列不空直接调用dequeue()方法返回元素值
  remove(Object o) :从此队列中移除指定元素的单个实例(如果存在)
  take() :获取并移除此队列的头部,如果此时队列为空,则阻塞等待;否则调用dequeue()方法返回元素值
  take()与poll()存在一个区别就是count == 0 时的处理,poll()直接返回null,而take()则是在notEmpty上面等待直到被入列的线程唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 该方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器itrs不为null,则需要维护下该迭代器。
// 最后调用notFull.signal()唤醒入列线程。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}


查找元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 调用peek()方法查找,如果元素存在,则返回,否则抛出异常。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列为空,则返回null,否则调用itemAt()方法获取元素
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return this.<E>cast(items[i]);
}

这个类还继承了AbstractQueue中的一个element()方法,如下:

1
2
3
4
5
6
7
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}


有一个构造函数为何需要加锁?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion //----(1)
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

  第五行代码获取互斥锁,官方解释为 锁的目的不是为了互斥,而是为了保证可见性
  保证可见性?保证哪个可见性?我们知道 ArrayBlockingQueue 操作的其实就是一个 items 数组,这个数组是不具备线程安全的,所以保证可见性就是保证 items 的可见性。如果不加锁为什么就没法保证 items 的可见性呢?这其实是 指令重排序的问题
  什么是指令重排序?编译器或运行时环境为了优化程序性能而采取的对指令进行重新排序执行的一种手段。也就是说程序运行的顺序与我们所想的顺序是不一致的。虽然它遵循 as-if-serial 语义,但是还是无法保证多线程环境下的数据安全。更多请参考Java内存模型之重排序/)
  为什么说指令重排序会影响 items 的可见性呢?创建一个对象要分为三个步骤:
  1.分配内存空间  
  2.初始化对象
  3.将内存空间的地址赋值给对应的引用

  但是由于指令重排序的问题,步骤 2 和步骤 3 是可能发生重排序的,如下:
  1.分配内存空间
  2.将内存空间的地址赋值给对应的引用
  3.初始化对象
  这个过程就会对上面产生影响。假如我们两个线程:线程 A,负责 ArrayBlockingQueue 的实例化工作,线程 B,负责入队、出队操作。线程 A 优先执行。当它执行第 2 行代码,也就是 this(capacity, fair);,如下:

1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

  这个时候 items 是已经完成了初始化工作的,也就是说我们可以对其进行操作了。如果在线程 A 实例化对象过程中,步骤 2 和步骤 3 重排序了,那么对于线程 B 而言,ArrayBlockingQueue 是已经完成初始化工作了也就是可以使用了。其实线程 A 可能还正在执行构造函数中的某一个行代码。两个线程在不加锁的情况对一个不具备线程安全的数组同时操作,很有可能会引发线程安全问题。

  还有一种解释:缓存一致性。为了解决CPU处理速度以及读写主存速度不一致的问题,引入了 CPU 高速缓存。虽然解决了速度问题,但是也带来了缓存一致性的问题。在不加锁的前提下,线程 A 在构造函数中 items 进行操作,线程 B 通过入队、出队的方式对 items 进行操作,这个过程对 items 的操作结果有可能只存在各自线程的缓存中,并没有写入主存,这样肯定会造成数据不一致的情况。


本文参考:
  【死磕Java并发】—–J.U.C之阻塞队列:ArrayBlockingQueue
  http://blog.csdn.net/mazhimazh/article/details/19239033

-----------------本文结束,感谢您的阅读-----------------