线程系列四AQS

1、什么是aqs

aqs是一个FIFO的双向链表队列。aqs将等待获取锁的线程封装成结点,放在队列中。

我们可以将aqs的作用理解为在多线程的环境下保证线程等待获取锁(添加进入队列)以及线程获取锁,并队列中出去都是线程安全的。

更简单的可以理解为aqs为了保证在多线程的环境下入队列出队列线程安全性提供了一个基本功能框架。


2、aqs是如何做到线程安全的

aqs主要是通过cas + 死循环以及state状态值,来做到线程安全。


3、aqs为什么会被设计为FIFO双向链表队列(以下是个人理解)
①aqs的锁实现,包含公平锁和非公平锁。为了实现公平锁,必须使用队列来保证获取锁的顺序(入队列的顺序)

②用链表的方式,主要是因为,操作更多是删除与增加。链表时间复杂度O(1)的效率会比数组O(n)的低。

③用双向队列的原因是,aqs的设计思想,或则说为了解决羊群效应(为了争夺锁,大量线程同时被唤醒)。每个结点(线程)只需要关心自己的前一个结点的状态(后续会说),线程唤醒也只唤醒队头等待线程

请参考 http://www.importnew.com/2400...


4、aqs是如何提供一个基础框架的

aqs 通过模板设计进行提供的,实现类只需实现特定的方法即可。

以下是aqs的模板方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

。。。 其他的省略了

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire(int arg)tryRelease(int arg) 是我们要实现的模板方法,当然还有分享锁的,这里只介绍了独占锁的。


5、从源码角度剖析aqs。aqs是如何通过双向链表队列,cas,state状态值,以及结点状态来保证入队列出队列的线程安全的!

注:以下只介绍独占式的不公平锁

①aqs 如何获取锁?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) 内部调用了nonfairTryAcquire(int acquires)

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {  // 锁未被获取
         // cas(自旋) 获取锁,并修改state 状态值
        if (compareAndSetState(0, acquires)) {
         // 设置当前占有的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }  //  重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

解释:利用cas自旋式的获取锁。


②aqs 获取锁失败,如何处理?

在看代码前,先解释一下:将当前线程包装成Node结点,并插入同步队列中,并用CAS形式尝试获取锁,获取失败,则挂起当前线程(以上只是说了大概)

先看第1个方法(将当前线程包装成Node结点,并插入同步队列)

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {  // 尾节点不为空
        node.prev = pred;
        // 用 CAS 将当前线程插入队尾
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点为空,说明当前队列还是空的,需要初始化
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 死循环
    for (;;) {
        // 初始化
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 这里主要是担心有多个线程同时进到enq(final Node node) 方法
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

解释:队列若为空,先初始化,不为空,用 CAS 将当前结点插入到队尾

再看第二个方法final boolean acquireQueued(final Node node, int arg);

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前置结点
            final Node p = node.predecessor();
            // 前置结点是头结点,则尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 前置结点不是头结点 或者 前置结点是头结点但是尝试获取锁失败
            // 则,应当将当前线程挂起(毕竟不能一直死循环获取吧~)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 当前线程的前置节点的状态!!!
    // 第waitStatus 初始化值为0,
    // 也因此当第1次进到这个方法时,会将前置结点的状态置为 Node.SIGNAL。
    // 第 2次进来的时候,前置节点的waitStatus的状态就为 Node.SIGNAL)。
    // 也就是说。aqs 只会让你尝试2次,都失败后,就会被挂起
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 线程被挂起调用该方法!!
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

让我们总结一下,以及再回顾一下,为什么aqs会被设计为双向链表队列。

aqs为了保证结点(即线程)的入队列的安全。采用了CAS 以及死循环的方式(从代码中可看到,处处使用CAS)。
上面有说到,一个线程是否该被唤醒或者其他操作,只需要看前置结点的状态即可。从shouldParkAfterFailedAcquire() 方法就可以看出这个设计。当前线程该做什么操作,是看前置结点的状态的。


③aqs如何释放锁

看代码前,先解释一下,aqs是如何做的。aqs的做法就是,释放当前锁,然后唤醒头结点的后继结点,如果后继结点为空,或者是被取消的,则从尾节点向前寻找一个未被取消的结点

public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后继结点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

①ReentratLock 是如何实现锁的释放的

注:这里看的是ReentrantLock的实现

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

解释:设置 state 的状态,如果 state == 0, 那么说明锁被释放了。否则锁还未被释放(锁重入!)


②aqs 如何唤醒其他结点

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
     // 清除状态,还记得等待的线程会把前置节点的状态置为 Node.SIGNAL(-1)吗
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
     // 正常情况下,下一个结点就是被唤醒的节点。
     // 但是如果下一个结点为null, 或者是被取消的
     // 那么从尾节点向前查找一个未被取消的节点唤醒。
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒
        LockSupport.unpark(s.thread);
}

release的释放比较简单。还是可以看到,aqs被设计成双向链表队列的好处!!!

看源代码,不能一下子就扎进去看,要先明白个大概,为什么看源代码?还不是为了学习作者是如何设计的。细节无论谁都记不清,最主要的是知道一个整体的流程,关键的代码!毕竟优秀的开源项目这么多,难道每行代码都看??

相关推荐