Tuesday, March 11, 2014

Notes on Java AbstractQueuedSynchronizer

http://javaopensourcecode.blogspot.com/2012/10/abstractqueuedsynchronizer-aqs.html
http://gee.cs.oswego.edu/dl/papers/aqs.pdf
The basic algorithm for acquire is try acquire, if successful return else enqueue thread if it is not already queued and block the current thread. Similarly basic algorithm for release is try release, if successful, unblock the first thread in the queue else simply return.
The wait status of the header node will be set to SIGNAL so that as the owner thread releases the lock, it can signal the head node's successor to acquire the lock
http://ifeve.com/aqs-1/
http://ifeve.com/aqs-2/
http://ifeve.com/aqs-3/
但AbstractQueuedSynchronizer类也包含另一组方法(如acquireShared),它们的不同点在于tryAcquireShared和tryReleaseShared方法能够告知框架(通过它们的返回值)尚能接受更多的请求,最终框架会通过级联的signal(cascading signals)唤醒多个线程

ReentrantReadWriteLock类使用AQS同步状态中的16位来保存写锁持有的次数,剩下的16位用来保存读锁的持有次数。WriteLock的构建方式同ReentrantLock。ReadLock则通过使用acquireShared方法来支持同时允许多个读线程。

Semaphore类(计数信号量)使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireShared方法会减少计数,或当计数为非正值时阻塞线程;tryRelease方法会增加计数,可能在计数为正值时还要解除线程的阻塞。

CountDownLatch类使用AQS同步状态来表示计数。当该计数为0时,所有的acquire操作(译者注:acquire操作是从aqs的角度说的,对应到CountDownLatch中就是await方法)才能通过。

FutureTask类使用AQS同步状态来表示某个异步计算任务的运行状态(初始化、运行中、被取消和完成)。设置(译者注:FutureTask的set方法)或取消(译者注:FutureTask的cancel方法)一个FutureTask时会调用AQS的release操作,等待计算结果的线程的阻塞解除是通过AQS的acquire操作实现的。

SynchronousQueues类(一种CSP(Communicating Sequential Processes)形式的传递)使用了内部的等待节点,这些节点可以用于协调生产者和消费者。同时,它使用AQS同步状态来控制当某个消费者消费当前一项时,允许一个生产者继续生产,反之亦然。



LockSupport最主要的作用,便是通过一个许可(permit)状态,解决了这个问题。
http://whitesock.iteye.com/blog/1336409
正如每个Object都有一个锁, 每个Object也有一个等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法来操作。同时拥有锁和等待集合的实体,通常被成为监视器(monitor)。每个Object的等待集合是由JVM维护的。等待集合一直存放着那些因为调用对象的wait方法而被阻塞的线程。由于等待集合和锁之间的交互机制,只有获得目标对象的同步锁时,才可以调用它的wait、notify和notifyAll方法。这种要求通常无法靠编译来检查,如果条件不能满足,那么在运行的时候调用以上方法就会导致其抛出IllegalMonitorStateException。
LockSupport通过许可(permit)机制保证:如果当前线程拥有许可,那么park系列方法会消费掉该许可,并且立即返回(不会被阻塞)。

WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。
AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞.
Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法
http://whitesock.iteye.com/blog/1337374
shouldParkAfterFailedAcquire方法会确保每个线程在被阻塞之前,其对应WaitQueue中的节点的waitStatus被设置为Node.SIGNAL(-1),以便在release时避免不必要的unpark操作。此外shouldParkAfterFailedAcquire还会清理WaitQueue中已经超时或者取消的Node。需要注意的是,在某个线程最终被阻塞之前,tryAcquire可能会被多次调用。

release方法中,总是总head节点开始向后查找sucessor。只有当该sucessor的waitStatus被设置的情况下才会调用unparkSuccessor。unparkSuccessor方法中首先清除之前设置的Node.waitStatus,然后向后查找并且唤醒第一个需要被唤醒的sucessor。需要注意的是,if (s == null || s.waitStatus > 0)这个分支中,查找是从tail节点开始,根据prev引用向前进行。在Inside AbstractQueuedSynchronizer (2)   中提到过,Node.next为null并不一定意味着没有sucessor,虽然WaitQueue是个双向链表,但是根据next引用向后查找sucessor不靠谱,而根据prev引用向前查找predecessor总是靠谱。
Fairness
tryAcquire的调用顺序先于acquireQueued,也就是说后来的线程可能在等待中的线程之前acquire成功。这种场景被称为barging FIFO strategy,它能提供更高的吞吐量。
大多数AbstractQueuedSynchronizer的子类都同时提供了公平和非公平的实现,例如ReentrantLock提供了NonfairSync和FairSync
FairSync优先确保等待中线程先acquire成功。但是公平性也不是绝对的:在一个多线程并发的环境下,就算锁的获取是公平的,也不保证后续的其它处理过程的先后顺序。

既然默认情况下使用的都是NonfairSync,那么FairSync适合什么样的场景呢?如果被锁所保护的代码段的执行时间比较长,而应用又不能接受线程饥饿(NonfairSync可能会导致虽然某个线程长时间排队,但是仍然无法获得锁的情况)的场景下可以考虑使用FairSync。对于ReentrantLock,在其构造函数中传入true,即可构造一把公平锁。

http://whitesock.iteye.com/blog/1337539
3.6 ConditionObject
AbstractQueuedSynchronizer的内部类ConditionObject实现了Condition接口。Condition接口提供了跟Java语言内置的monitor机制类似的接口:await()/signal()/signalAll(),以及一些支持超时和回退的await版本。可以将任意个数的ConcitionObject关联到对应的synchronizer,例如通过调用ReentrantLock.newCondition()方法即可构造一个ConditionObject实例。每个ConditionObject实例内部都维护一个ConditionQueue,该队列的元素跟AbstractQueuedSynchronizer的WaitQueue一样,都是Node对象。

http://ifeve.com/introduce-abstractqueuedsynchronizer/
该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。
同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。

队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。
Node nextWaiter 存储condition队列中的后继节点。
setExclusiveOwnerThread(Thread.currentThread());
// 仅需要将操作代理到Sync上即可

1. 尝试获取(调用tryAcquire更改状态,需要保证原子性);
在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队。
2. 如果获取不到,将当前线程构造成节点Node并加入sync队列;
进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。
3. 再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态
final boolean acquireQueued(final Node node, int arg) {
1. 获取当前节点的前驱节点;
2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;
如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
3. 否则进入等待状态。
public final boolean release(int arg) {
1. 尝试释放状态;
2. 唤醒当前节点的后继节点所包含的线程。
public final void acquireShared(int arg) {
4. 获取共享状态成功;
在退出队列的条件上,和独占锁之间的主要区别在于获取共享状态成功之后的行为,而如果共享状态获取成功之后会判断后继节点是否是共享模式,如果是共享模式,那么就直接对其进行唤醒操作,也就是同时激发多个线程并发的运行。
http://my.oschina.net/lifany/blog/173019
在 AQS 中,有两个重要的数据结构,一个是 volatile int state,另一个是 class Node 组成的双向链表。
int state
顾名思义,这个变量是用来表示 AQS 的状态的,例如 ReentrantLock 的锁的状态和重入次数、FutureTask 中任务的状态、CountDownLatch 中的 count 计数等等。这个值的更新都是由 AQS compareAndSetState 方法来实现的,而这个方法则是通过 Compare and Swap 算法实现
Node 双向链表

State
在 AQS 中有一个 int 类型的 volatile 变量 state,使用 AQS 的类可以自定义 state 对其的含义。例如,ReentrantLock 用 0 表示没有线程获取锁,大于 0 则表示重入锁的重入次数;Semaphore 用来表示许可数量;FutureTask 用来表示任务状态,例如运行中、已完成等。

在扩展 AQS 时,子类需要根据自己的需求在诸如 tryAcquire 方法中使用 compareAndSetState 方法设置相应的状态值。
以 CountDownLatch 为例,它的 tryAcquireShared 的实现如下:
ryAcquire 和 tryAcquireShared 的方法定义存在一个巨大的不同,就是返回值的不同。tryAcquire 返回的是 boolean 类型,其分别表示 acquire 成功或失败,而 tryAcquireShared 返回的却是 int 类型,负、零、正代表三种含义:失败、独占获取、共享获取。这就是 AQS 文档中对独占模式和共享模式描述中的那段“可能(但不是一定)”的原因。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;

No comments:

Post a Comment