AQS源码解析(2)-共享锁的获取与释放

作者:小牛呼噜噜 | https://xiaoniuhululu.github.io

计算机内功、源码解析、科技故事、项目实战、面试八股等更多硬核文章,首发于公众号「小牛呼噜噜

大家好,我是呼噜噜,好久没聊关于Java并发,本文接着上一篇文章图解ReentrantLock的基石AQS源码-独占锁的获取与释放,将继续聊聊AQS共享锁获取与释放的一些细节

共享锁与独占锁的区别

首先我们先了解知道共享锁与独占锁的区别,他们主要的区别是:

排他锁,顾名思义,锁是线程独占的,锁在同一时刻只能有一个线程使用,同一时刻不能被多个线程一同占用,一个线程占用后其它线程只能等待。在AQS中常量EXCLUSIVE表示独占模式(独占锁)

共享锁,锁是线程共享的,即锁在同一时刻可以被多个线程共享使用,一个线程对资源加了共享锁后其它线程对资源也只能加共享锁。共享锁有着很好的读性能。在AQS中常量SHARED表示共享模式(共享锁)

共享锁的获取

在AQS中,我们通过acquireShared来获取共享锁,先来看看acquireShared的源码:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

acquireShared如果对应到独占锁的方法,其实就是acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquireShared与Semaphore子类的实现

我们可以发现AQS中没有实现tryAcquireShared,而是抛出了一个异常,那么就是由其子类实现

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

我们这里以Semaphore这个类来分析tryAcquireShared的具体实现。Semaphore也叫信号量,可以用来控制同时访问共享资源的线程数量,通过协调各个线程,以保证合理的使用资源。

在Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程如果要访问共享资源,需要先获得信号量:

  1. 如果信号量的计数器值大于 1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。
  2. 如果计数器值为0, 表示信号量(许可证)已分配完了,则线程将进入等待状态。直到某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之前进入等待状态的线程,将被唤醒并再次试图获得信号量。

Semaphore简单的使用:

1
2
3
4
5
Semaphore semaphore = new Semaphore(10,true);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();

我们继续回到Semaphore这个类实现的tryAcquireShared源码处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);//permits是代表许可证的数量
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {//自旋(死循环)

int available = getState();// 获取state状态变量的值(即:许可证的数量)
int remaining = available - acquires;//计算 剩余许可证的数量
//如果remaining<0直接返回
//或者如果使用CAS设置许可证数量成功,也返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}


/**
* 非公平锁tryAcquireShared实现
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);//调用父类Sync的构造方法
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

/**
* 公平锁tryAcquireShared实现
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {//自旋(死循环)

// 判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程
// 如果阻塞队列不为空直接返回-1
if (hasQueuedPredecessors())
return -1;
//得到当前state的值
int available = getState();
int remaining = available - acquires;
// 得到资源后state的值小于0直接返回state的值,否则CAS更新state的值再返回state的值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

可以很容易发现,Semaphore中tryAcquireShared公平和非公平锁的逻辑,主要区别就是,公平锁里面每次循环都会先调用hasQueuedPredecessors(),用来判断AQS队列中头部结点的后继节点是否为空,或者是判断头部结点的后继节点是否不是当前线程,这样就保证了先进来的线程会先执行,也就是实现了公平锁的逻辑

1
2
3
4
5
6
7
8
9
10
11
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;

Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

所以tryAcquireShared()顾名思义就是当前线程尝试获取共享锁,它的返回值有3种情况:

  1. 如果返回值小于0,表示当前线程获取共享锁失败
  2. 如果返回值大于0,表示当前线程获取共享锁成功,后继线程尝试获取共享锁,可能会成功
  3. 如果返回值大于0,也表示当前线程获取共享锁成功,但后继线程尝试获取共享锁,不会成功,这点需要格外注意

doAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//线程封装成Node,并根据给定的模式(独占或者共享)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
for (;;) {
// 获取node的前继节点
final Node p = node.predecessor();
if (p == head) {// 若node的前继节点为head节点,则执行tryAcquireShared方法尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) { // 若返回值>=0,表明获取锁成功
// 将当前节点设置为head节点,并唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC

if (interrupted)//如果中断标志位true
selfInterrupt();//调用selfInterrupt,给当前线程补上一个中断标志,让当前线程自己知道自己被中断过,同时也唤醒当前线程。
failed = false;
return;
}
}
//通过Node的状态来判断,线程竞争锁失败以后是否应该被挂起
if (shouldParkAfterFailedAcquire(p, node) &&
//将线程挂起,重新被唤醒后,去检查阻塞期间是否被中断过
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireShared与之对应的是独占锁的acquireQueued,逻辑类似,需要注意的是,线程获取锁失败后入队列并不会立刻阻塞,而是判断是否应该阻塞shouldParkAfterFailedAcquire,如果前继是head,会再给一次机会获取锁,一切都是为了尽快唤醒其他等待线程

与之对应的是独占锁的acquireQueued的主要区别,我们接着一一到来:

addWaiter(Node.SHARED)

不同于独占锁调用addWaiter(Node.EXCLUSIVE)SHARED表示共享模式,EXCLUSIVE表示独占模式,我们先来看一下addWaiter(Node.SHARED)方法相关的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private Node addWaiter(Node mode) {
//线程封装成Node,并根据给定的模式(独占或者共享)!!!
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//尝试添加尾节点,如果是第一个结点加入肯定为空,跳过
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//没有一次成功的话,就会去多次尝试
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {//自旋,也就是死循环
Node t = tail;
if (t == null) { // Must initialize
//CAS 设置队列头,新建一个空的Node节点作为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//CAS 设置队列尾,存储当前线程的节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

需要注意的是,head结点本身不存在任何数据,是一个虚节点,它只是作为一个牵头结点,如果队列不为null,tail则永远指向尾部结点

采用虚节点当头结点,主要是因为每个节点都需要设置前置节点的 ws 状态(这个状态是为了保证数据一致性),如果只有一个线程竞争锁时,只有一个结点,其是没有前置节点的,所以需要创建一个虚拟节点,这样就能兼容临界情况当只有一个线程竞争锁时,无需初始化生成同步队列,直接获取同步锁即可

setHeadAndPropagate

另一个不同的是,独占锁直接调用了setHead(node),而当共享锁获取锁之后,调用的是setHeadAndPropagate(node, r),共享锁的传播性由setHeadAndPropagate完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将当前节点设置为head
setHead(node);
/*
* propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一,表示剩余可用许可数
* h 表示旧的head节点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
// 获取node的后继节点
Node s = node.next;
if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
doReleaseShared();
}
}

propagate这里是tryAcquireShared的返回值,如果propagate > 0,说明tryAcquireShared还有剩余共享锁可以获取,会去唤醒下一个线程;如果propagate=0,说明tryAcquireShared没有剩余共享锁可以获取

setHeadAndPropagate其内部不仅仅是setHead(node)还会在一定条件唤醒head后继,这是为啥呢?

这是由于在共享锁模式下,顾名思义,锁可以被多个线程所共同持有,如果当前线程已获取到锁了,那么后继节点(线程),也可以拿到该锁,所以当符合一定条件,可以唤醒head后继节点,在这里调用doReleaseShared能更灵敏的唤醒阻塞线程

doReleaseShared方法,我们先暂时不分析,等下午讲共享锁释放的时候再一起讲

selfInterrupt

还有一个区别就是,如果中断标志位truedoAcquireShared会在方法内部调用selfInterrupt,直接将中断响应掉,而我们知道acquireQueued只是返回中断标志,会在其外层方法调用selfInterrupt响应中断

共享锁的释放

releaseShared

AQS中,我们通过releaseShared来释放共享锁,先来看看releaseShared的源码:

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared与Semaphore子类的实现

我们可以发现AQS中也没有实现tryReleaseShared,而是抛出了一个异常,那么就是由其子类实现

1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

我们来看看Semaphore子类的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryReleaseShared(int releases) {
for (;;) {//自旋
// 获取当前state的值
int current = getState();
//释放资源state值要增加
int next = current + releases;

if (next < current) // next < current说明传入参数非法
throw new Error("Maximum permit count exceeded");

if (compareAndSetState(current, next))//尝试CAS更新state的值
return true;
}
}

首先通过tryReleaseShared() 去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则会通过 doReleaseShared() 去释放共享锁

doReleaseShared

doReleaseShared是AQS共享锁核心释放锁的方法,我们先来看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doReleaseShared() {
*/
for (;;) {//自旋
Node h = head;
//若head不为null且不是tail节点
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
//若head节点状态为SIGNAL(-1),则通过CAS将head节点的状态设置为0
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒head结点的后继节点
unparkSuccessor(h);
}
//若head节点状态为0,自旋CAS设置节点状态PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果head节点在自旋期间未change的话,则跳出自旋
if (h == head)
break;
}
}

doReleaseShared源码中,有许多Node结点的状态,我们先回顾一下图解ReentrantLock的基石AQS源码-独占锁的获取与释放 中AQS同步队列模型:

其中waitStatus是表示当前被封装成Node结点的状态,默认为0,表示初始化状态,还有4种状态:CANCELLED、SIGNAL、CONDITION、PROPAGATE,分别是:

  1. CANCELLED: 1, 表示该节点的线程被取消,当同步队列中的线程超时或中断,会将此节点取消。该节点永远不会再发生变化,需要注意的是当前节点的线程为取消状态时,再也不会被阻塞
  2. SIGNAL:-1, 当其prev结点释放了同步锁 或者 被取消后,立即通知处于SIGNAL状态的next节点的线程执行
  3. CONDITION:-2,表示节点处于条件队列等待,调用了await方法后处于等待状态的线程节点会被标记为此种状态,当调用了Condition的singal()方法后,CONDITION状态会变为SIGNAL状态,并且会在适当的时机从等待队列转移到同步队列中。
  4. PROPAGATE:-3,这种状态与共享模式有关,在共享模式下,表示节点处于可运行状态

我们来梳理一下doReleaseShared的主要流程:

  1. 如果若head节点不为null且不是tail节点(这个其实是保证了同步队列初始话完成,防止未初始化,直接去调用doReleaseShared),首先去获取节点的waitStatus状态,如果head节点的状态等于SIGNAL(-1),则通过CAS将head节点的状态设置为0

  2. 如果状态设置成功则调用 unparkSuccessor唤醒head结点的后继节点,此时当头节点发生变化时(即被唤醒后继节点已经成为了新的头节点),会继续回到循环中,继续唤醒head节点的后继节点,直到符合跳出循环的条件(见第4点)

  3. 还有种特殊情况,当AQS同步队列的最后一个节点成为了头节点,由于后面没有新的节点了,也就不会出现新节点将自己的前驱节点的修改成SIGNAL,最终此时head节点状态为0,会进入else if,通过CAS将head 的状态从0再次修改为 PROPAGATE(-3);如果CAS操作失败的话,就说明有新的节点入队, 就ws的值被修改为SIGNAL,继续循环,等待被唤醒

  4. 若head节点的状态不是SIGNAL或者0,会去判断head节点在自旋期间是否发生改变的,也就是h == head,若如果未发生改变,则跳出自旋(死循环); 这种情况h == head一般发生于某个被唤醒的线程因为获取不到锁(资源被用尽)执行shouldParkAfterFailedAcquire方法被阻塞挂起,head节点没有发生改变

那这里为什么要设置PROPAGATE(-3)状态?

PROPAGETE的作用是,保证唤醒的传播,避免线程无法会唤醒的窘境

因为当多个线程并发执行releaseShared时,有可能出现在AQS同步队列等待的节点,比如前一个线程完成了释放唤醒,同时后一个线程获取锁,但还未执行setHeadAndPropagate进行共享锁传播,也就是未设置好head,也就是说此时读取老的head状态为0(也就是这种else if (ws == 0情况),会导致释放但不唤醒,此时会将头结点的状态置为 PROPAGATE状态,让获取锁的线程任然能够进行共享锁传播,唤醒下一个线程。

doReleaseShared在AQS中,有2处地方被调用,一处就是在这里,当线程释放共享锁的时候调用;另一处就是我们上面讲的setHeadAndPropagate方法中,当线程成功获取到共享锁后,在一定条件下调用该方法

如果设置了PROPAGETE,传播行为抽象出来,不仅仅简单地依赖unparkSuccessor,再配合setHeadAndPropagate处的逻辑

1
2
3
4
5
6
7
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {//若propagate>0或者原head节点为null或原head节点的状态值<0
// 获取node的后继节点
Node s = node.next;
if (s == null || s.isShared())// 若后继节点为null或者为共享节点,则执行doReleaseShared方法继续传递唤醒操作
doReleaseShared();
}

若后继节点为null或者为共享节点,持锁线程会去调用doReleaseShared来唤醒该线程,这样也在某种意义上加快了唤醒后继节点的速度

小结

共享锁加锁的逻辑和独占锁类似,最主要的区别就是共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。

共享锁的释放,doReleaseShared()中只要head发生改变,会不断地循环唤醒head的后继节点,尝试唤醒尽可能多且可以唤醒的节点,而新的线程一旦获取到锁,会加入到唤醒head后继节点的循环中,尽可能提高唤醒等待线程的速度

我们可以发现AQS共享锁head节点的各个状态,转化的过程,还是非常绕的,需要耐心地阅读源码,一点点地抽丝剥茧。


参考资料:

https://www.cnblogs.com/micrari/p/6937995.html


全文完,感谢您的阅读,如果我的文章对你有所帮助的话,还请点个免费的,你的支持会激励我输出更高质量的文章,感谢!

计算机内功、源码解析、科技故事、项目实战、面试八股等更多硬核文章,首发于公众号「小牛呼噜噜」,我们下期再见!