【< AQS>】 灰太狼 2023-03-06 05:05 14阅读 0赞 ### 【< AQS>】 ### * * * * * \[1\] 什么是AQS * \[2\] LCK队列源码及其实现 * \[3\] 独占式同步状态获取 * \[4\] 共享式同步状态获取与释放 * \[5\] 独占式超时获取同步状态 ##### \[1\] 什么是AQS ##### [AQS][] 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。 **AQS分为独占式同步状态获取与释放,共享式同步状态获取与释放,独占式超时获取同步状态。接下来我先简单介绍下第一种:独占式同步状态获取与释放。AQS从数据结构上说,通过一个int成员变量state来控制同步状态,内部的同步队列(FIFO的双向链表队列)完成 对同步状态(state)的管理。** > **state**:当state = 0时,说明没有任何线程占有共享资源的锁; state = 1时,则说明有线程目前正在使用共享变量 **具体实现为同步器的acquire方法,该方法对中断不敏感:** 1. **首先调用自定义同步器实现的tryAcquire(int arg)方法,尝试获取同步状态**。 > **tryAcquire(int arg)方法**,以独占式获取同步状态,实现该方法需要查询当前状态state,然后再使用CAS设置当前状态。**无参方法tryAcquire()的作用是尝试的获得1个许可,如果getstate!=0,则说明,此时已经被占用无法获取,返回false,如果getstate==0,则说明,此时共享变量是空闲的,使用CAS(判断当前状态是否为0)设置锁状态为1,并返回ture。** 2. **如果获取成功,则直接退出,如果同步状态获取失败,则构造同步节点()并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,并阻塞当前线程**。 > **同步节点**:独占式node,同一时刻只有一个线程可以获得同步状态; > > **addWaiter(Node node)**:首先使用CAS尝试快速在队尾插入,如果入队失败了,则调用enq(),[link][] > > **compareAndSetTail(pred, node)**:确保节点可以被线程安全的添加。如果多个线程同时使用添加节点,最终可能导致节点数量和顺序变化。compareAndSetTail会比较pred和tail是否指向同一个节点,如果是,才将tail更新为node。虽然当前线程在声明pred时,为pred赋值了tail,但tail可能会被其他线程改变,而当前线程的本地变量pred是不会感知到这个改变的。 > > **enq(final Node node)方法**,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。 3. **然后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则节点中的线程则会被阻塞,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。** > 在acquireQueued(final Node node,int arg)方法中,当前\*\*线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态。\*\*首先是维护了先进先出原则,再就是后面节点需要“死循环”来判断自己是否是头结点。 ![img][] **AQS核心思想:** * **如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。内部通过一个int成员变量state来控制同步状态**。 * 当state = 0时,说明没有任何线程占有共享资源的锁; * state = 1时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待,当然state也 可以继续执行+1操作,比如可重入锁。 * **AQS同步器的实现依赖于内部的同步队列(FIFO的双向链表队列)完成 对同步状态(state)的管理**。 * 当前线程获取锁(同步状态)失败时,AQS会将该线程以及相关等待信息包装成 一个节点(Node)并将其加入同步队列,同时会阻塞当前线程 * 当同步状态释放时,会将头结点head中的线程唤醒,让其尝试获取同步状态。简单来说,就是同步状态。 **AQS实现:** * getState - 获取 state 状态 * setState - 设置 state 状态 * compareAndSetState - cas 机制设置 state 状态 * 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 * 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList * 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet **AQS功能:** * 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire * 获取锁超时机制 * 通过打断取消机制 * 独占机制及共享机制 * 条件不满足时的等待机制 **子类主要实现这样一些方法**(默认抛出 UnsupportedOperationException) * tryAcquire * tryRelease * tryAcquireShared * tryReleaseShared * isHeldExclusively ##### \[2\] LCK队列源码及其实现 ##### 我们来看看AbstractQueuedSynchronizer类中的acquire方法实现 public final void acquire(int arg) { //尝试获取锁 if (!tryAcquire(arg) && //获取不到,则进入等待队列,返回是否中断 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果返回中断,则调用当前线程的interrupt()方法 selfInterrupt(); } **1. 入队** :如果线程调用tryAcquire(其最终实现是调用上面分析过的nonfairTryAcquire方法)获取锁失败。调用addWaiter(Node.EXCLUSIVE)方法,将自己加入CLH队列的尾部。 * **tryAcquire(int arg)方法**,以独占式获取同步状态,实现该方法需要查询当前状态state,然后再使用CAS设置当前状态。**无参方法tryAcquire()的作用是尝试的获得1个许可,如果getstate!=0,则说明,此时已经被占用无法获取,返回false,如果getstate==0,则说明,此时共享变量是空闲的,使用CAS(判断当前状态是否为0)设置锁状态为1,并返回ture。** * **同步节点**:独占式node,同一时刻只有一个线程可以获得同步状态; * **addWaiter(Node node)**:首先使用CAS尝试快速在队尾插入,如果入队失败了,则调用enq(),[link][] * **compareAndSetTail(pred, node)**:确保节点可以被线程安全的添加。如果多个线程同时使用添加节点,最终可能导致节点数量和顺序变化。compareAndSetTail会比较pred和tail是否指向同一个节点,如果是,才将tail更新为node。虽然当前线程在声明pred时,为pred赋值了tail,但tail可能会被其他线程改变,而当前线程的本地变量pred是不会感知到这个改变的。 * **enq(final Node node)方法**,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } private Node addWaiter(Node mode) { //线程对应的Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //尾节点不为空 if (pred != null) { //当前node的前驱指向尾节点 node.prev = pred; //将当前node设置为新的尾节点 //如果cas操作失败,说明线程竞争 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //lockfree的方式插入队尾 enq(node); return node; } private Node enq(final Node node) { //经典的lockfree算法:循环+CAS for (;;) { Node t = tail; //尾节点为空 if (t == null) { // Must initialize //初始化头节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 入队过程,入下图所示 1 T0持有锁时,其CLH队列的头尾指针为NULL ![img][img 1] 2 线程T1,此时请求锁,由于锁被T0占有。因此加入队列尾部。具体过程如下所示: (1) 初始化头节点 (2) 初始化T1节点,入队,尾指针指向T1 ![img][img 2] 3 此时如果有一个T10线程先于T1入队,则T1执行compareAndSetTail(t, node)会失败,然后回到for循环开始处,重新入队。 ![img][img 3] image.png **2. 由自旋到阻塞** 入队后,调用acquireQueued方法,时而自旋,时而阻塞,直到获取锁(或被取消)。 * 在acquireQueued(final Node node,int arg)方法中,当前\*\*线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态。\*\*首先是维护了先进先出原则,再就是后面节点需要“死循环”来判断自己是否是头结点。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //其前驱是头结点,并且再次调用tryAcquire成功获取锁 if (p == head && tryAcquire(arg)) { //将自己设为头结点 setHead(node); p.next = null; // help GC failed = false; //成功获取锁,返回 return interrupted; } //没有得到锁时: //shouldParkAfterFailedAcquire方法:返回是否需要阻塞当前线程 //parkAndCheckInterrupt方法:阻塞当前线程,当线程再次唤醒时,返回是否被中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //修改中断标志位 interrupted = true; } } finally { if (failed) //获取锁失败,则将此线程对应的node的waitStatus改为CANCEL cancelAcquire(node); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * 获取锁失败时,检查并更新node的waitStatus。 * 如果线程需要阻塞,返回true。 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱节点的waitStatus是SIGNAL。 if (ws == Node.SIGNAL) /* * SIGNAL状态的节点,释放锁后,会唤醒其后继节点。 * 因此,此线程可以安全的阻塞(前驱节点释放锁时,会唤醒此线程)。 */ return true; //前驱节点对应的线程被取消 if (ws > 0) { do { //跳过此前驱节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* 此时,需要将前驱节点的状态设置为SIGNAL。 * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //当shouldParkAfterFailedAcquire方法返回true,则调用parkAndCheckInterrupt方法阻塞当前线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 自旋过程,入下图所示 ![img][img 4] image.png ![img][img 5] image.png ![img][img 6] image.png ![img][img 7] image.png ![img][img 8] image.png 然后线程T2,加入了请求锁的队列,尾指针后移。 ![img][img 9] image.png ![img][img 10] image.png ![img][img 11] image.png 终上所述,每个得不到锁的线程,都会讲自己封装成Node,加入队尾,或自旋或阻塞,直到获取锁(为简化问题,不考虑取消的情况) **3. 锁的释放** 前文提到,T1,T2在阻塞之前,都回去修改其前驱节点的waitStatus=-1。这是为什么? 我们看下锁释放的代码,便一目了然 public final boolean release(int arg) { //修改锁计数器,如果计数器为0,说明锁被释放 if (tryRelease(arg)) { Node h = head; //head节点的waitStatus不等于0,说明head节点的后继节点对应的线程,正在阻塞,等待被唤醒 if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //后继节点 Node s = node.next; //如果s被取消,跳过被取消节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒后继节点 LockSupport.unpark(s.thread); } 如代码所示,waitStatus=-1的作用,主要是告诉释放锁的线程:后面还有排队等待获取锁的线程,请唤醒他! 释放锁的过程,如图所示: ![img][img 12] ![img][img 13] **总结** 实现锁的关键在于: 1. 通过CAS操作与volatile变量互相配合,线程安全的修改锁标志位 2. 基于CLH队列,实现锁的排队策略 ##### \[3\] 独占式同步状态获取 ##### 通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,该方法代码如下: \[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Hu1634Tt-1596598683765)(X:\\Users\\xu\\AppData\\Roaming\\Typora\\typora-user-images\\image-20200716092220731.png)\] 上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:**首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式node,同一时刻只有一个线程可以获得同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。** 我们看下同步器的**addWaiter**和enq的实现代码如下: ![image-20200716092454978][] 上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。 **在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。** **节点进入同步队列之后,就进入了一个自旋的过程(acquireQueued方法),每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)**,如代码所示: ![img][img 14] 在acquireQueued(final Node node,int arg)方法中,当前\*\*线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态。\*\*首先是维护了先进先出原则,再就是后面节点需要“死循环”来判断自己是否是头结点。 ![img][img 15] 独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图所示: ![img][] 下面看下如何释放释放同步状态的: ![img][img 16] -------------------- 该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport(这里不做介绍)来唤醒处于等待状态的线程。 **适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。** ##### \[4\] 共享式同步状态获取与释放 ##### **共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。** 通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如下: ![img][img 17] 在acquireShared(int arg)方法中,**同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出.** 与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态,该方法代码: ![img][img 18] 该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程 ##### \[5\] 独占式超时获取同步状态 ##### 通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。 超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时.具体代码这里就不贴出来了。大致的流程图如下: ![img][img 19] 独占式超时获取同步状态doAcquireNanos(int arg,long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。**acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg,long nanosTimeout)会使当前线程等待nanosTimeout纳秒,如果当前线程在nanosTimeout纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。** [AQS]: https://www.jianshu.com/p/0f876ead2846 [link]: https://www.jianshu.com/p/c806dd7f60bc [img]: /images/20230208/ac2fe48f570044c99fd94d543e0bd1be.png [img 1]: /images/20230208/7c44170c635a411bb937ae1416148747.png [img 2]: /images/20230208/ec1e42ba57824c099fd28cb26fb4b816.png [img 3]: /images/20230208/dde0bf6dbfa54f6eb8078bc17cda6ad8.png [img 4]: /images/20230208/319a314125764cb29a358e5f070b8734.png [img 5]: /images/20230208/734a4e395e474677bd7cb8b7b16ed520.png [img 6]: /images/20230208/0a8b2f17f7f046daaa4cc7cc1849f000.png [img 7]: /images/20230208/6a0b77ff64db4241a476c476715e1dc1.png [img 8]: /images/20230208/0b2a22f8c49346a997e7e56c781b6849.png [img 9]: /images/20230208/7be86ad18617443f853699501ce0f4f9.png [img 10]: /images/20230208/493dc175811a4f6daab6b3e7fa6c13fb.png [img 11]: /images/20230208/8d43291c91544c6aa68003992f828b0e.png [img 12]: /images/20230208/7025f44deb9d4d0aa53c06eb0d3c60f8.png [img 13]: /images/20230208/2d5e155c680243199abb35df46138640.png [image-20200716092454978]: [img 14]: /images/20230208/2388d0ea4adc48e4ad6ac6e953ba5019.png [img 15]: /images/20230208/e05e2cd85ce74c8ab71b270a68fecebf.png [img 16]: /images/20230208/5ca811ef148447c6a5fd758ae719185c.png [img 17]: /images/20230208/5a7ccfcd3fea46839d674afc7027ca7d.png [img 18]: /images/20230208/a43f270af3ec48f1a4018dd2d02c33a2.png [img 19]: /images/20230208/53bde120ad034f719ff9fea80af33a17.png
相关 AQS 照马士兵写的一个AQS类 package com.cocurrent.aqs; import java.util.concurren 小咪咪/ 2023年06月06日 12:23/ 0 赞/ 64 阅读
相关 AQS ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ub 骑猪看日落/ 2022年12月07日 01:53/ 0 赞/ 220 阅读
相关 AQS 1 AQS抽象的队列同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountD ﹏ヽ暗。殇╰゛Y/ 2022年11月09日 03:55/ 0 赞/ 260 阅读
相关 AQS简介 AQS简介 > AQS,AbstractQueuedSynchronizer,即队列同步器 > AQS即是AbstractQueuedSynchronizer,一个用 末蓝、/ 2022年05月25日 00:16/ 0 赞/ 245 阅读
相关 AQS AQS 继承AOS(含独占线程) volatile int state 组合使用 可重写tryAcquire tryRelease tryAcquireShared tryR àì夳堔傛蜴生んèń/ 2022年04月14日 03:37/ 0 赞/ 294 阅读
相关 AQS详解 AQS的介绍 AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。 ![在这里插 电玩女神/ 2022年02月28日 11:20/ 0 赞/ 372 阅读
相关 AQS笔记 AQS笔记 概述 AQS 是 `AbstractQueueSynchronized` 抽象同步队列的简称,它是实现同步器的基础组件,并发包中锁的底层就是使 我不是女神ヾ/ 2022年02月21日 15:36/ 0 赞/ 285 阅读
相关 AQS架构 AQS,全称是AbstractQueuedSynchronizer,中文译为抽象队列式同步器 AQS架构: ![watermark_type_ZmFuZ3poZW5 水深无声/ 2021年12月10日 01:07/ 0 赞/ 379 阅读
还没有评论,来说两句吧...