Fork me on GitHub

J.U.C之阻塞队列:SynchronousQueue源码分析

  作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:
  1.SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  2.因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  3.SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  4.若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。
  SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。

属性

  与其他BlockingQueue一样,SynchronousQueue同样继承AbstractQueue和实现BlockingQueue接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// SynchronousQueue的内部类
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.!!!
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通过 fair 值来决定公平性和非公平性
// 公平性使用TransferQueue,非公平性采用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
// 非公平性
static final class TransferStack<E> extends Transferer<E> {
... ...
}
// 公平性
static final class TransferQueue<E> extends Transferer<E> {
... ...
}
}

  SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着非常重要的作用,SynchronousQueue的put、take操作都是委托这两个类来实现的。
  由于SynchronousQueue的put、take操作都是调用Transfer的transfer()方法,只不过是传递的参数不同而已,put传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take操作传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// put操作 增加操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// take操作
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

TransferQueue

  TransferQueue是实现公平性策略的核心类,其节点为QNode,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class TransferQueue<E> extends Transferer<E> {
/** 头节点 */
transient volatile QNode head;
/** 尾节点 */
transient volatile QNode tail;
// 指向一个取消的结点
//当一个节点中最后一个插入时,它被取消了但是可能还没有离开队列
transient volatile QNode cleanMe;
// 需要注意的是,其队列永远都存在一个dummy node,在构造时创建
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy(假的) node.
head = h;
tail = h;
}
/**
* 省略很多代码O(∩_∩)O
*/
}

  在TransferQueue中除了头、尾节点外还存在一个cleanMe节点。该节点主要用于标记,当删除的节点是尾节点时则需要使用该节点。

  在TransferQueue中定义了QNode类来表示队列中的节点,QNode节点定义如下:
  下面代码没啥好看的,需要注意的一点就是isData,该属性在进行数据交换起到关键性作用,两个线程进行数据交换的时候,必须要两者的模式保持一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
// 等待线程,用于park/unpark
volatile Thread waiter; // to control park/unpark
// 模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
/**
* CAS next域,在TransferQueue中用于向next推进
*/
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 取消本结点,将item域设置为自身
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 是否被取消
* 与tryCancel相照应只需要判断item释放等于自身即可
*/
boolean isCancelled() {
return item == this;
}
/**
* Returns true if this node is known to be off the queue
* because its next pointer has been forgotten due to
* an advanceHead operation.
*/
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}

TransferStack

  TransferStack用于实现非公平性,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class TransferStack<E> extends Transferer<E> {
// 表示消费数据的消费者
static final int REQUEST = 0;
// 表示生产数据的生产者
static final int DATA = 1;
// 表示匹配另一个生产者或消费者
static final int FULFILLING = 2;
volatile SNode head;
/**
* 省略一堆代码 O(∩_∩)O~
*/
}

  TransferStack中定义了三个状态:
  REQUEST表示消费数据的消费者,
  DATA表示生产数据的生产者,
  FULFILLING,表示匹配另一个生产者或消费者。
  任何线程对TransferStack的操作都属于上述3种状态中的一种(对应着SNode节点的mode)。
  同时还包含一个head域,表示头结点。内部节点SNode定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static final class SNode {
// next 域
volatile SNode next;
// 相匹配的节点
volatile SNode match;
// 等待的线程
volatile Thread waiter;
// item 域
Object item; // data; or null for REQUESTs
// 模型
int mode;
/**
* item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读
*/
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 将s结点与本结点进行匹配,匹配成功,则unpark等待线程
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}


参考文章:
  【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

-----------------本文结束,感谢您的阅读-----------------