Fork me on GitHub

J.U.C之AQS源码分析

AQS简介

  Java的内置锁一直都是备受争议的,在JDK 1.6之前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略(JVM 锁优化),但是与Lock相比synchronized还是存在一些缺陷的:虽然synchronized提供了便捷性的隐式获取锁释放锁机制(基于JVM机制),但是它却缺少了获取锁与释放锁的可操作性,可中断、超时获取锁,且它为独占式在高并发场景下性能大打折扣。
  谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)!
  类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…。
  AQS,AbstractQueuedSynchronizer类,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock/CountDownLatch/Semaphore等),是JUC并发包中的核心基础组件。
  AQS实现同步器涉及的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
  在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
  AQS的主要使用方式是 继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
  AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

框架

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
static final class Node {
......
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
}

  
  它维护了一个state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的volatile是核心关键词,并发基础知识之volatile
  state的访问方式有三种, AQS可以确保对state的操作是安全的。:
  1.getState()
  2.setState()
  3.compareAndSetState()
  当state>0时表示已经获取了锁,当state = 0时表示释放了锁

  AQS定义两种资源共享方式:
  1.Exclusive(独占,只有一个线程能执行,如ReentrantLock)
  2.Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
  isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
  以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。


CLH同步队列

  AQS内部维护着一个FIFO队列,该队列就是CLH同步队列。
  CLH锁即Craig, Landin, and Hagersten (CLH) locks。CLH锁是一个自旋锁。能确保无饥饿性(饥饿基础)。提供先来先服务的公平性。
  CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
  
  CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
  在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
static final class Node {
/** 表示节点正处在共享模式下等待的标记 **/
static final Node SHARED = new Node();
/** 表示节点正在以独占模式等待的标记 */
static final Node EXCLUSIVE = null;
/**表示节点正在以独占模式等待的标记*/
static final int CANCELLED = 1;
/**表示节点正在以独占模式等待的标记*/
static final int SIGNAL = -1;
/** waitStatus值,表示线程正在等待条件 */
static final int CONDITION = -2;
/**waitStatus值指示下一个acquireShared应无条件传播*/
static final int PROPAGATE = -3;
/**
* 状态字段,仅接受值:
*
* SIGNAL:值为-1 ,后继节点的线程处于等待状态,
* 而当前节点的线程如果释放了同步状态或者被取消,
* 将会通知后继节点,使后继节点的线程得以运行。
*
* CANCELLED:值为1,由于在同步队列中等待的
* 线程等待超时或者被中断,需要从同步队列中取消等待,
* 节点进入该状态将不会变化
*
* CONDITION: 值为-2,节点在等待队列中,
* 节点线程等待在Condition上,当其他线程
* 对Condition调用了singal方法后,该节点
* 将会从等待队列中转移到同步队列中,加入到
* 对同步状态的获取中
*
* PROPAGATE: 值为-3,表示下一次共享模式同步
* 状态获取将会无条件地传播下去
*
* INITIAL: 初始状态值为0
*/
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

  CLH同步队列结构图如下:
  


独占式同步状态获取 acquire

  AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做.
  AQS定义两种资源共享方式:
  1.Exclusive(独占,只有一个线程能执行,同一时刻仅有一个线程持有同步状态)
  2.Share(共享,多个线程可同时执行)。

  acquire(int arg)方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且 整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。。代码如下:

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

  函数流程如下:
  1.tryAcquire:调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回,该方法必须要保证线程安全的获取同步状态。
  2.addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法 将该线程加入等待队列的尾部,并标记为独占模式。
  3.acquireQueued:使线程在等待队列中休息(自旋),有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4.selfInterrupt:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
  由于此函数是重中之重,用流程图总结一下(这也就是ReentrantLock.lock()的流程):
  

tryAcquire(int)

  tryAcquire(int)此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

  为什么直接throw异常?
  AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现, AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
  这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,还是站在开发者的角度,尽量减少不必要的工作量。

入列 addWaiter(Node)

  CLH队列入列是再简单不过了,无非就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。代码我们可以看看 addWaiter(Node node)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
private Node addWaiter(Node mode) {
//新建Node。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//快速尝试添加尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
// addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法将node加入队尾
private Node enq(final Node node) {
// CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else { // 正常流程,放入队尾
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

  在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”( CAS自旋volatile变量,是一种很经典的用法)的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

出列
  CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在 这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。过程图如下:

acquireQueued(Node, int)

  通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键
  acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final boolean acquireQueued(final Node node, int arg) {
// 标记 是否成功拿到资源
boolean failed = true;
try {
// 标记 等待过程中是否被中断过
boolean interrupted = false;
/*
* 自旋过程,其实就是一个死循环而已
*/
for (;;) {
//当前线程的前驱节点
final Node p = node.predecessor();
// 当前线程的前驱节点是头结点,且同步状态成功
// 如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //返回等待过程中是否被中断过
}
//如果自己可以休息了,就进入waiting状态,直到被unpark()
//获取失败,线程等待--具体在下面“阻塞和唤醒线程”那介绍
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// throw new InterruptedException(); 如果这里为这行代码,则会响应中断,详情是acquireInterruptibly(int arg)方法
// 在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。
}
} finally {
if (failed)
cancelAcquire(node);
}
}

  从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然 前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:
  1.保持FIFO同步队列原则。
  2.头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。
  acquire(int arg)方法流程图如下:
  

  AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。
  为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。
  AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它 除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。如下:

阻塞线程 shouldParkAfterFailedAcquire(Node, Node)

  在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued():

1
2
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;

  通过代码可以看到,在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态。
  
  检查状态的方法为shouldParkAfterFailedAcquire(Node pred, Node node)方法。此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态,如果线程状态转换不熟,可以参考Thread详解),万一队列前边的线程都放弃了只是瞎站着,那也说不定.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 方法主要 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱的状态
int ws = pred.waitStatus;
//状态为signal,表示当前线程处于等待状态,直接返回true
if (ws == Node.SIGNAL) // static final int SIGNAL = -1;
return true;
// static final int CANCELLED = 1;
// 前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

  整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
  这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
  1.如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
  2.如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
  3.如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false

  如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程(如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。)。
  parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 调用park()使线程进入waiting状态
return Thread.interrupted();
}

  park()会让当前线程进入waiting状态。
  在此状态下,有两种途径可以唤醒该线程:
  1)被unpark();
  2)被interrupt()。
  需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。


独占式同步状态释放 release(int)

  也就是唤醒线程:当线程释放同步状态后,则需要唤醒该线程的后继节点:
  此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。
  当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS提供了release(int arg)方法释放同步状态:

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

  逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点
  该方法同样是先 调用自定义同步器自定义的tryRelease(int arg)方法来释放同步状态,释放成功后,会调用unparkSuccessor(Node node)方法唤醒后继节点

tryRelease(int)

  此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

  跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了! 所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

unparkSuccessor(Node)

  此方法用于唤醒等待队列中下一个线程。
  调用unparkSuccessor(Node node)唤醒后继节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
//当前节点状态
int ws = node.waitStatus;
// 置零当前线程所在的结点状态,允许失败
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//当前节点的后继节点
Node s = node.next;
//后继节点为null或者其状态 > 0 (超时或者被中断了)
if (s == null || s.waitStatus > 0) {
s = null;
//从tail节点来找可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}

  可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。
  这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了

这里稍微总结下:
  在AQS中维护着一个FIFO的同步队列,当线程获取同步状态失败后,则会加入到这个CLH同步队列的对尾并一直保持着自旋。
  在CLH同步队列中的线程在自旋时会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出CLH同步队列。
  当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。


LockSupport

  从上面我可以看到,当需要阻塞或者唤醒一个线程的时候,AQS都是使用LockSupport这个工具类来完成的。
  LockSupport是用来创建锁和其他同步类的基本线程阻塞原语
  每个使用LockSupport的线程都会与一个许可关联,如果该许可可用,并且可在进程中使用,则调用park()将会立即返回,否则可能阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。但是注意许可不可重入,也就是说只能调用一次park()方法,否则会一直阻塞。
  LockSupport定义了一系列以park开头的方法来阻塞当前线程,unpark(Thread thread)方法来唤醒一个被阻塞的线程。如下:
  

  park(Object blocker)方法的blocker参数,主要是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控。
  park方法和unpark(Thread thread)都是成对出现的,同时unpark必须要在park执行之后执行,当然并不是说没有不调用unpark线程就会一直阻塞,park有一个方法,它带了时间戳(parkNanos(long nanos):为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用)。
  park()方法的源码如下:

1
2
3
public static void park() {
UNSAFE.park(false, 0L);
}

  unpark(Thread thread)方法源码如下:

1
2
3
4
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

  从上面可以看出,其内部的实现都是通过UNSAFE(sun.misc.Unsafe UNSAFE)来实现的,其定义如下:

1
2
public native void park(boolean var1, long var2);
public native void unpark(Object var1);

  两个都是native本地方法。Unsafe 是一个比较危险的类,主要是用于执行低级别、不安全的方法集合。尽管这个类和所有的方法都是公开的(public),但是这个类的使用仍然受限,你无法在自己的java程序中直接使用该类,因为只有授信的代码才能获得该类的实例。


共享式同步状态获取 acquireShared(int)

  共享式与独占式的最主要区别在于
  .独占式在同一时刻只能有一个线程获取同步状态,
  .而共享式在同一时刻可以有多个线程获取同步状态。
  例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

  此方法是共享模式下线程获取共享资源的顶层入口。
  它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
  AQS提供acquireShared(int arg)方法共享式获取同步状态:
  其实跟acquire()的流程大同小异,只不过多了个 自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)

1
2
3
4
5
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//获取失败,自旋获取同步状态
doAcquireShared(arg);
}

  这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
  所以这里acquireShared()的流程就是:
  1.tryAcquireShared()尝试获取资源,成功则直接返回;
  2.失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

doAcquireShared(int)

  此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
// 共享式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//前驱节点
final Node p = node.predecessor();
//如果其前驱节点,获取同步状态
if (p == head) {
//尝试获取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

  tryAcquireShared(int arg)方法尝试获取同步状态,返回值为int,当其 >= 0 时,表示能够获取到同步状态,这个时候就可以从自旋过程中退出。
  有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。
  跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

  acquireShared(int arg)方法不响应中断,与独占式相似,AQS也提供了响应中断、超时的方法,分别是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),这里就不做解释了。

setHeadAndPropagate(Node, int)

  此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!
  doReleaseShared()我们留着下一小节的releaseShared()里来讲。

1
2
3
4
5
6
7
8
9
10
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

共享式同步状态释放 releaseShared()

  释放掉资源后,唤醒后继
  获取同步状态后,需要调用release(int arg)方法释放同步状态,方法如下:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放资源
doReleaseShared(); // 唤醒后继结点
return true;
}
return false;
}

  因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。


参考文章
  死磕Java
  Java并发之AQS详解

-----------------本文结束,感谢您的阅读-----------------