图解ReentrantLock的基石AQS源码-独占锁的获取与释放

作者:小牛呼噜噜 | https://xiaoniuhululu.com

计算机内功、源码解析、科技故事、项目实战、面试八股等更多硬核文章,首发于公众号「小牛呼噜噜

大家好,我是呼噜噜,我们之前聊过Java中以互斥同步的方式保证线程安全:Sychronized,这次我们来再聊聊另一种互斥同步的方式Lock,本文会介绍ReentrantLock及其它的基石AQS的源码解析,一个非常重要的同步框架

Lock接口

Sychronized利用JVM指令级别的monitor锁,来实现线程安全(详情可见:Synchronized关键字详解

不同的是,Lock接口实现线程安全则是代码级别实现的,Lock接口是 Java并发编程中很重要的一个接口,当程序发生异常时,Sychronized可以自动释放锁,但Lock必须需要手动解锁。与 Lock 关联密切的锁有 ReetrantLockReadWriteLock。我们以ReentrantLock切入,来看看其底层涉及到的原理。

初识ReentrantLock

ReentrantLock也叫重入锁,我们首先得了解ReentrantLock的一般使用方法:

1
2
3
4
5
6
7
Lock lock = new ReentrantLock(false);
lock.lock();
try{
//临界区,执行一些具体操作。。
}finally{
lock.unlock();
}

代码层次必须要手动解锁。

公平锁和非公平锁

查看ReentrantLock的源码可以发现:

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

当参数fair为true表示公平锁,创建的是FairSync类;false为非公平锁,创建的是NonfairSync类。如果该参数不填,则默认是非公平锁,调用另一个构造方法。

  1. 那什么是公平锁和非公平锁?
  • 公平锁:每个线程获取锁的顺序是按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁,遵循先来先得的规则。
  • 非公平锁:每个线程获取锁的顺序是随机的,并不会遵循先来先得的规则,所有线程会竞争获取锁

我们接下来举个例子来看看:首先创建公平锁,开启6个线程执行,分别加锁和释放锁并打印线程名的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class FairReentrantLockTest {
static Lock lock = new ReentrantLock(true);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 6; i++) {
new Thread(() -> {
lock.lock();
System.out.println("临界区的当前线程名称:" + Thread.currentThread().getName());
lock.unlock();

}).start();
}
}
}

结果:

1
2
3
4
5
6
临界区的当前线程名称:Thread-0
临界区的当前线程名称:Thread-1
临界区的当前线程名称:Thread-2
临界区的当前线程名称:Thread-3
临界区的当前线程名称:Thread-4
临界区的当前线程名称:Thread-5

如果我们把公平锁换成非公平锁的话,static Lock lock = new ReentrantLock(false),再执行一遍结果为:

1
2
3
4
5
6
临界区的当前线程名称:Thread-0
临界区的当前线程名称:Thread-5
临界区的当前线程名称:Thread-1
临界区的当前线程名称:Thread-2
临界区的当前线程名称:Thread-3
临界区的当前线程名称:Thread-4

我们可以发现:当使用公平锁,线程获取锁的话,线程进入”等待队列”的队尾,得排队,依次获取锁,先到先得。如果使用的是非公平锁,那就直接尝试竞争锁,竞争得到,就获得锁,获取锁的顺序是随机的。

  1. 公平锁和非公平锁的优缺点?
  • 公平锁,其优点:所有的线程都能得到资源,不会饿死在队列中;缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,线程 每次从阻塞恢复到运行状态 都需要从用户态转换成内核态,而这个状态的转换是比较慢的,因此公平锁的执行速度会比较慢,而且CPU唤醒阻塞线程的开销会很大。
  • 非公平锁,其优点:不遵守先到先得的原则,CPU不必取唤醒所有线程,会减少唤起线程的数量,可以减少CPU唤醒线程的开销,整体的吞吐效率会高点。缺点:但这样也可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致”饿死”。

我们这里贴一下公平锁和非公平锁的性能测试结果图,来源于《Java并发编程实战》:

从上述结果可以看出,使用非公平锁的性能(吞吐率)普遍比公平锁高很多。

可重入锁与非可重入锁

ReentrantLock顾名思义叫重入锁,是指当同一个线程在获取外层同步方法锁的时候,再进入该线程的内层同步方法会自动获取锁,其实就是递归调用前提锁对象得是同一个对象或者class对象,并不会因为之前已经获取过还没释放而阻塞,这样就可以有效避免死锁的产生,这个叫可重入锁Synchronized与本文的ReentrantLock都属于可重入锁

下面我们用几个例子来看看:

基于Synchronized实现可重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RLockTestBySynchronized {

public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (RLockTestBySynchronized.class) {
System.out.println("第1次获取锁");
int index = 1;
while (true) {
synchronized (RLockTestBySynchronized.class) {
System.out.println("第" + (++index) + "次获取该锁");
}
if (index == 6) {
break;
}
}
}
}
}).start();
}

}

结果:

1
2
3
4
5
6
第1次获取锁
第2次获取该锁
第3次获取该锁
第4次获取该锁
第5次获取该锁
第6次获取该锁

基于ReentrantLock实现可重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class RLockTestByReentrantLock2 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();

new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println("第1次获取锁");

int index = 1;
while (true) {
try {
lock.lock();
System.out.println("第" + (++index) + "次获取该锁");

try {
Thread.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {
e.printStackTrace();
}

if (index == 6) {
break;
}
} finally {
lock.unlock();
}

}

} finally {
lock.unlock();
}
}
}).start();
}

结果:

1
2
3
4
5
6
第1次获取锁
第2次获取该锁
第3次获取该锁
第4次获取该锁
第5次获取该锁
第6次获取该锁

需要注意的是:ReentrantLock的时候一定要手动释放锁,并且加锁次数和释放次数要一样,不然还是会导致死锁

基于wait/notify 实现不可重入锁

与其经常拿来比较的是:不可重入锁,与可重入锁相反的是:一个线程在获取到外层同步方法锁后,再进入该方法的内层同步方法无法获取到锁,即使锁是同一个对象,这样容易死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NoRLockTest {
private static boolean isLock = false;

public synchronized void lock() throws InterruptedException {
Thread thread = Thread.currentThread();
// 判断是否加锁
while (isLock){
wait();//阻塞
}
isLock = true;
System.out.println(thread.getName() + " 获得了锁");
}

public synchronized void unLock(){
isLock = false;
notify();//唤醒
}

public static void main(String[] args) throws Exception {
NoRLockTest tt = new NoRLockTest();
tt.lock();//第1次获取锁
tt.lock();//第2次获取锁
}

}

结果:

1
main 获得了锁

显然当程序第2次获取锁时,由于锁已被占有,就发生了死锁

通过上面的例子,我们可以发现ReetrantLock锁在使用上还是比较简单的,但内部的原理可一点都不简单,我们接下来着重解读一下ReetrantLock的内部实现原理

ReentrantLock源码解析

当我们去阅读ReentrantLock的源码时,发现有以下3个类:


其中: ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类,当然ReentrantLock类本身继承Lock接口

Lock

lock接口以下定义了并发中常用的5个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Lock {
// 获取锁
void lock();

// 获取锁,可中断,即在拿锁过程中可以中断interrupt,不同的是synchronized是不可中断锁。
void lockInterruptibly() throws InterruptedException;

// 尝试获取锁,锁在空闲的才能获取锁(未获得锁不会等待)
boolean tryLock();

// 在给定时间内尝试获取锁,成功返回true,失败返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 释放锁
void unlock();

// 等待与唤醒机制
Condition newCondition();
}

ReentrantLock继承了lock接口,这几个方法也会在ReentrantLock内部进行重写

Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = -5179523762034025860L;

// 获取锁
abstract void lock();

// 获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) { // 表示没有线程正在竞争该锁
//通过CAS尝试拿到锁,状态0表示锁没有被占用 !!!
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占,也就是设置当前线程排他锁
setExclusiveOwnerThread(current);
return true; // 成功
}
}
//如果是已上锁状态,就进一步判断当前线程拥有该锁(即可重入锁)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置状态
setState(nextc);
// 成功
return true;
}
// 失败
return false;
}

// 尝试释放资源
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程不为独占线程
throw new IllegalMonitorStateException();
// 释放标识
boolean free = false;
if (c == 0) {
free = true;
// 已经释放,清空独占
setExclusiveOwnerThread(null);
}
// 设置标识
setState(c);
return free;
}

// 判断资源是否被当前线程占有
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

// 新生一个条件
final ConditionObject newCondition() {
return new ConditionObject();
}

// 获取持有锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 返回状态
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

// 是否上锁状态
final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
// 自定义反序列化逻辑
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}  

Sync是一个抽象类,那必然有继承它的类。在ReentrantLock中有两个Sync的实现,分别为非公平锁NonfairSync与公平锁FairSync

NonfairSync 与 FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 使用CAS加锁(如果state等于0则设置为1返回true,否则返回false),
if (compareAndSetState(0, 1))
// 加锁成功则设置独占线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 加锁失败则调用AbstractQueuedSynchronizer类的acquire方法
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// 调用父类Sync的nonfairTryAcquire方法
return nonfairTryAcquire(acquires);
}

}

我们可以看出,在非公平锁里,如果当前线程锁占用状态为0的话,会直接进行CAS尝试获取锁,不需要加入队列,然后等待队列头线程唤醒再获取锁这一步骤,所以效率相比于公平锁会较快。ReentrantLock默认是非公平锁

我们接着看FairSync的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//未上锁状态
//先判断没有等待节点时,才会开启CAS去拿锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

我们可以看出,在公平锁里,如果当前线程锁占用状态为0的话,会先去同步队列中是否有在等待的线程,如果没有才会去进行拿锁操作,这样就遵循FIFO的原则,先到先得。所以效率相较于非公平锁较慢

在ReentrantLock内部,无论NonfairSync、FairSync、Sync类其实归根结底都继承AbstractQueuedSynchronizer,这也是非常重要的部分,我们下面一起重点来看看

AQS

AbstractQueuedSynchronizer,一般简称AQS,也叫抽象队列同步器,AbstractQueuedSynchronizer是Java并发工具包JUC基石,它是一个同步框架,为Java的各种同步器,锁等提供了并发抽象,是由大名鼎鼎的Doug Lea完成。

CLH队列

我们先来看下它的源码,一上来就是CLH队列定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
protected AbstractQueuedSynchronizer() { }

static final class Node {
...
// 记录状态用
volatile int waitStatus;
// 标识哪个线程
volatile Thread thread;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
...
}

// 同步队列的头结点
private transient volatile Node head;
// 同步队列的尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;

//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
...


}

CLH队列也叫AQS同步队列模型,该队列的设计是构建AQS的关键,多个线程对共享资源的竞争,以及线程阻塞等待以及被唤醒时锁分配的机制,都是基于AQS同步队列

Doug Lea参考了CLH的设计, 保留了基本的设计,由前驱节点做阻塞与唤醒的控制,但是在队列的选择上做出了改变,AQS选择双向链表来实现”虚拟的双向队列”,节点中添加了prev和next指针,添加prev指针主要是为了实现取消功能,而next指针的加入可以方便的实现唤醒后继节点

CLH 锁其实也是对自旋锁的一种改进,当多进程竞争资源时,无法直接获取不到锁的线程,会进入该队列。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配;同时还依赖state来控制同步状态,当state=0时,则说明共享资源未被上锁,当state=1时,则说明该共享资源被上锁了,其他线程必须加入同步队列进行等待

AQS同步队列模型:

waitStatus

其中waitStatus是表示当前被封装成Node结点的状态,默认为0,表示初始化状态,还有4种状态:CANCELLED、SIGNAL、CONDITION、PROPAGATE,分别是:

  1. CANCELLED: 1, 表示该节点的线程被取消,当同步队列中的线程超时或中断,会将此节点取消。该节点永远不会再发生变化,需要注意的是当前节点的线程为取消状态时,再也不会被阻塞
  2. SIGNAL:-1, 当其prev结点释放了同步锁 或者 被取消后,立即通知处于SIGNAL状态的next节点的线程执行
  3. CONDITION:-2,表示节点处于条件队列等待,调用了await方法后处于等待状态的线程节点会被标记为此种状态,当调用了Condition的singal()方法后,CONDITION状态会变为SIGNAL状态,并且会在适当的时机从等待队列转移到同步队列中。
  4. PROPAGATE:-3,这种状态与共享模式有关,在共享模式下,表示节点处于可运行状态

独占模式和共享模式

AQS作为并发包基石,定义两种资源共享方式:独占模式和共享模式

  1. 共享模式,即共享锁:锁在同一时刻可以被多个线程共享使用,一个线程对资源加了共享锁后其它线程对资源也只能加共享锁。共享锁有着很好的读性能。ReentrantReadWriteLock的读锁就是一种共享锁的实现。在AQS中常量SHARED表示共享模式
  2. 独占模式,即排他锁:锁在同一时刻只能有一个线程使用,同一时刻不能被多个线程一同占用,一个线程占用后其它线程只能等待。ReentrantLock、Synchronized、ReentrantReadWriteLock的写锁等都是排他锁的实现。在AQS中常量EXCLUSIVE表示独占模式
  3. 无论是共享模式还是独占模式的实现类,比如ReentrantLock,其内部都是基于AQS实现的,也都维持着一个同步队列,当请求锁的线程超过现有模式的限制时,会将线程包装成Node结点并将线程当前必要的信息存储到node结点中,然后加入同步队列等会获取锁,而这系列操作都间接调用AQS完成的

volatile关键字

在阅读完AQS的源码后,我们可以发现里面充斥着大量volatile关键字:

1
2
3
4
5
6
7
8
9
10
...

// 同步队列的头结点
private transient volatile Node head;
// 同步队列的尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;

...

那什么是volatile呢?又有什么用呢?

volatile是Java中用于修饰变量的关键字,其可以保证该变量的可见性以及有序性,但是无法保证原子性。更准确地说是volatile关键字只能保证单操作的原子性,比如 x=1,但是无法保证复合操作的原子性,比如x++

其为Java提供了一种轻量级的同步机制:保证被volatile修饰的共享变量对所有线程总是可见的,也就是当一个线程修改了一个被volatile修饰共享变量的值,新值总是可以被其他线程立即得知

相比于synchronized关键字(synchronized通常称为重量级锁),volatile更轻量级,开销低,因为它不会引起线程上下文的切换和调度。

大家感兴趣的,可以去看看笔者之前的文章volatile关键字在并发中有哪些作用?

CAS

我们知道并发的三大特性:除了可见性,有序性,还有一个原子性,很不幸volatile关键字无法保证原子性,那么Doug Lea大师写AQS的时候是怎么保证原子性的呢?

当我们仔细去阅读源码时,会发现出现大量compareAndSwap相关方法,也叫CAS,顾名思义:比较并交换

CAS机制的可以保证一个共享变量的原子操作问题,比如对变量i”先读后写”这2步操作,可以封装成一个原子操作,这样就能保证并发的安全性,那其工作原理是什么呢?

在CAS机制中,包含三个核心操作数 – 内存位置(V)、预期原值(A)和新值(B)

如果内存位置V处的值 与预期原值A,相匹配,那么处理器会自动将该位置V处值更新为新值B。那如果不一样。说明A的值已经被别的线程修改过了,所以不会更新内存位置V处的值,更新失败后,线程会重新获取此时内存位置V处的值。其实这就是一种乐观锁,然后就是不断重复这一系列的操作,叫做自旋,这是高情商的说法,其实就是死循环~~

那当然自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销

需要注意的是这比较和提交操作都是原子性的,来源于底层硬件层的,现在的 CPU 中,为这两个动作专门提供了一个指令,CAH ,由 CPU 来保证这两个操作一定是原子的。在Java语言层,调用UnSafe类的CAS方法,JVM会帮我实现CAS汇编指令,UnSafe类Java无法直接访问,需要通过本地(native)方法来访问

1
2
3
4
//将同步状态值设置为给定值update(CAS,原子性)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

然后我们接着依次看看AQS里面的重要的方法,这里主要是独占模式下的相关一系列方法

AQS独占模式获取锁

acquire与tryAcquire

1
2
3
4
5
6
public final void acquire(int arg) {
//尝试获取资源
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

在AQS的acquire方法中首先调用了tryAcquire,而AQS中没有实现tryAcquire,而是抛出了一个异常,那么就是由其子类实现

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

我们本文是以ReentrantLock为例,所以我们去找ReentrantLock中tryAcquire的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//公平锁tryAcquire实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁同步状态
int c = getState();
if (c == 0) {// 0表示无锁状态
//判断有没有别的线程排在了当前线程的前面,这个方法我们后续再讲
if (!hasQueuedPredecessors() &&
//cas竞争锁,将state的值改为1
compareAndSetState(0, acquires)) {
// 保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加冲入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;// 增加重入次数
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}


//非公平tryAcquire实现

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 0表示无锁状态

//cas竞争锁,将state的值改为1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加冲入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

理想的情况是:当前线程直接通过tryAcquire方法直接拿到了锁。但是如果没有拿到锁该怎么办呢?

我们回到acquire源码处:

1
2
3
4
5
6
7
public final void acquire(int arg) {
//尝试获取资源
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();
}

我们可以发现在tryAcquire返回false的时候,会接着又调用了addWaiter方法将其加入到了同步队列。acquireQueued的职责是线程进入队列之后的操作,尝试获取锁,不然就挂起,让线程变成阻塞状态

addWaiter

我们先来看一下addWaiter方法相关的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private Node addWaiter(Node mode) {
//线程封装成Node,并根据给定的模式(独占或者共享)!!!
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//尝试添加尾节点,如果是第一个结点加入肯定为空,跳过
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//没有一次成功的话,就会去多次尝试
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {//自旋,也就是死循环
Node t = tail;
if (t == null) { // Must initialize
//CAS 设置队列头,新建一个空的Node节点作为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//CAS 设置队列尾,存储当前线程的节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

需要注意的是,head结点本身不存在任何数据,是一个虚节点,它只是作为一个牵头结点,如果队列不为null,tail则永远指向尾部结点

采用虚节点当头结点,主要是因为每个节点都需要设置前置节点的 ws 状态(这个状态是为了保证数据一致性),如果只有一个线程竞争锁时,只有一个结点,其是没有前置节点的,所以需要创建一个虚拟节点,这样就能兼容临界情况当只有一个线程竞争锁时,无需初始化生成同步队列,直接获取同步锁即可

在aquire方法中调用addWaiter方法时,会标记模式,SHARED表示共享模式,EXCLUSIVE表示独占模式

acquireQueued与hasQueuedPredecessors

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))我们阅读看下acquireQueued的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋(死循环)
for (;;) {
final Node p = node.predecessor();////获得该node的前置节点
// 当前线程的前驱节点是头结点,即该节点是第二个节点,且获取锁成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // // help GC,将前置节点移出队列,这样就没有指针指向它,可以被gc回收
failed = false;
//返回false表示不能被打断,即没有被挂起,也就是获得到了锁
return interrupted;
}
//如果node的前驱节点不是头结点,那么则调用shouldParkAfterFailedAcquire方法判断是否要将线程挂起。如果是则调用parkAndCheckInterrupt将线程挂起。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中只要被中断过,就将interrupted标记为true
}
} finally {
if (failed)
//如果失败就取消尝试获取锁
cancelAcquire(node);
}
}

需要注意的是,不管是非公平锁还是公平锁,只要你没获取到锁,就都得去同步队列中排队,然后出队抢锁!

这里可能就有小伙伴就要问了:非公平锁怎么还要排队啊?那还是非公平锁吗?没得灵魂

公平锁与非公平锁的主要区别,主要是tryAcquire()方法,我们上面已经贴出ReentrantLocktryAcquire()公平锁和非公平锁具体实现的实现源码,可以发现主要区别就是公平锁多一个hasQueuedPredecessors这个方法

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
//读取头节点
Node t = tail;
//读取尾节点
Node h = head;
//s是首节点h的后继节点
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedPredecessors源码很简短,就是判断有没有别的线程排在了当前线程的前面,如果有的话,返回true表示线程需要排队,没有则返回false则表示线程无需排队,也就是为公平锁判断线程需不需要排队

换句话说就是,ReentrantLock这里公平与非公平锁的区别具体体现在:

  1. 公平锁,会先判断同步队列是否存在结点,如果存在必须先执行完同步队列中的线程结点,也就是说没入队的线程就不能参与抢锁
  2. 非公平锁,不管同步队列是否存在线程结点,直接尝试去抢锁,** **这样后到的线程就有可能先抢到锁

这里和公平与非公平锁一般意义上的定义有所区别,在一般情况下,我们更倾向于效率较高的非公平锁。

上述源码结合AQS同步队列示意图,能够更好地理解:

上图的具体参数,上文已经阐述,这里就不再赘述

如果node的前驱节点不是头结点,那么则调用shouldParkAfterFailedAcquire方法,判断是否要将线程挂起。如果是则调用parkAndCheckInterrupt将线程挂起。我们马上就来看这2部分源码

shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire主要是判断一个线程是否阻塞,这里涉及到Node类中waitStatus的两个状态属性

  1. CANCELLED等于1,表示节点被取消,即结束状态。
  2. SIGNAL等于-1,表示当前节点需要去唤醒下一个节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前驱节点的等待状态
    int ws = pred.waitStatus;
    //如果如果前驱节点处于等待状态(SIGNAL),则返回true
    if (ws == Node.SIGNAL)
    return true;
    //如果ws>0 则说明是结束状态
    if (ws > 0) {
    //遍历前驱结点,直到找到最近一个不是结束状态的node,然后插个队!,排在它的后边
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    //如果ws小于0又不是SIGNAL状态,
    //则将其设置为SIGNAL状态,当前节点被挂起,等待唤醒。。
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

parkAndCheckInterrupt

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
//将当前线程挂起.进入等待唤醒状态
LockSupport.park(this);
//获取线程中断状态,也就是检测线程是否被中断,同时会清除当前线程的 中断标志!!!
return Thread.interrupted();
}

需要注意的是:park()会让当前线程进入等待唤醒状态waiting,一般可以通过unpark或者interrupt去唤醒。

不知道大家有没有对这里感到一丝丝的奇怪:LockSupport.park(this)你不是说是将当前线程挂起嘛!!!,那怎么还能继续去执行下面的Thread.interrupted()???,博主你是不是打自己脸啊

嗐,首先这代码不是博主写的(先撇清关系),这个是大师Doug Lea写的,另外遇到问题我们不能先质疑别人,先反思自己

咳咳,我们来看下其中的奥秘,interrupt是Thread类的的API,park是Unsafe类的API,两者是有区别的

一般情况下,如果线程A调用LockSupport.park()后,会停在那,直到其他线程调用LockSupport.unpark(),线程A才能继续执行

但是我们之前讲了LockSupport.park(),还有一种方法唤醒,就是interrupt()它的作用就是给线程打一个中断标志,也就是说,当线程有中断标志时,线程A调用LockSupport.park()后,不会停,会接着执行Thread.interrupted(),检测线程是否被中断,同时会清除当前线程的中断标志,会返回true。

当返回true后,外层代码会执行 interrupted = true;再次记录其实当前线程是被中断过的,因为在parkAndCheckInterrupt中的Thread.interrupted()已经把当前线程的中断标志给清除了,所以当前线程它自己不知道自己已经被中断过了

然后acquireQueued()这个方法会返回true,来提醒当前线程被中断过。最后调用selfInterrupt,给当前线程补上一个中断标志,让当前线程自己知道自己被中断过,同时也唤醒当前线程。

如果你需要在线程发生中断时结束获取锁,那么可以考虑使用lockInterruptibly()来获取锁。

让我们再次回到acquire源码处:

1
2
3
4
5
6
7
8
9
10
11
12
public final void acquire(int arg) {
//尝试获取资源
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();
}

static void selfInterrupt() {
//给当前线程打一个中断标志,唤醒当前线程
Thread.currentThread().interrupt();
}

至此AQS通过lock拿锁的流程结束!这同样也是ReentrantLock.lock()拿锁的流程, 笔者画了张图,让我们把AQS中lock拿锁的流程给串起来

AQS独占模式释放锁

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//释放锁成功后,唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease

tryAcquire()一样,这个方法是需要独占模式的ReentrantLock去实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryRelease(int releases) {
//针对可重入锁的情况下, c可能大于1,将当前持有锁的线程个数减1
int c = getState() - releases;
// 确保释放锁的线程,当前必须是持有锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁
boolean free = false;
// 如果c==0,表明没有嵌套锁了,可以释放了,不然还不能释放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

需要注意的是,与tryAcquire()不同的是:这里并没有使用任何CAS操作,因为当前线程已经持有了锁,才会去释放锁呀~~,所以肯定线程安全

unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
///当前线程所在的结点状态设置为0,允许失败
compareAndSetWaitStatus(node, ws, 0);

//唤醒后继节点的线程,若为空,从tail往前遍历找一个距离head最近的正常的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)// 从后向前找!!!
if (t.waitStatus <= 0)
//找到的正常节点后,并没有返回,而是继续往前找
s = t;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}

需要注意的是,从后向前查找正常的节点,是为了兼容在addWaiter方法中刚入队列的节点,由于节点入队不是一个原子操作,有3步:

  1. 设置node的前驱节点为当前的尾节点:node.prev = t(不成功也会自旋,直到成功)
  2. 修改tail属性,使它指向当前节点
  3. 修改原来的尾节点,使它的next指向当前节点(这步依赖于第2步,第2步不执行,这步也不会执行)

当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其他线程只能完成第1步,也就是说会出现t.next的值还没有被设置成node导致next链可能中间断开了的情况,而每个线程都能完成第1步,也就是node.prev = pred保证了prev链是连续且唯一的,所以如果从tail往前遍历,新加的节点都能遍历到,能够将整个队列完整地走一遍

小结

AQS不愧是Doug Lea大神的闭关修炼下的力作,其利用CAS + 自旋 + volatile变量,最终实现多个线程访问共享资源的功能,写的真的很精妙,里面细节满满

AQS的实现中,并不是后继节点“监听”前驱节点的状态,来决定自身是否持有锁,而是通过前驱节点释放锁,并主动唤醒后继节点来实现排队的

本文着重解读了AQS独占锁的获取与释放,由于篇幅有限,而AQS的细节实在太多,呼噜噜后续有空会继续更新共享锁,可中断,等待队列等很重要的特性~


参考资料:

《Java并发编程实战》

《Java并发编程的艺术》

https://www.cnblogs.com/dennyzhangdd/p/7218510.html


全文完,感谢您的阅读,如果我的文章对你有所帮助的话,还请点个免费的,你的支持会激励我输出更高质量的文章,感谢!

计算机内功、源码解析、科技故事、项目实战、面试八股等更多硬核文章,首发于公众号「小牛呼噜噜」,我们下期再见!