AQS学习

AQS的全称为(AbstractQueuedSynchronizer),这个类也是在java.util.concurrent.locks下面, 我们把ReentrantLock里面的公平锁和非公平锁作为入口来学习。

final void lock() {
    acquire(1);
}
public final void acquire(int arg) {
    //1.尝试获取锁
    //2.没有获取到,挂起当前线程添加到阻塞队列中去
    if (!tryAcquire(arg) &&
        acquireQueued(
            addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }    
}
//锁的状态,0代表没有被占用,大于0表示被占用(锁可以重入可能大于1)
private volatile int state;
protected final boolean tryAcquire(int acquires) {
    //当前线程
    final Thread current = Thread.currentThread();
    //获取锁的状态
    int c = getState();
    if (c == 0) {
        //锁没有被占用
        if (!hasQueuedPredecessors()//当前线程前没有其他排队的线程
            //并且同时没有其他线程来抢这个锁
            &&compareAndSetState(0, acquires)){

            //设置独占
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;

}
/**
* 这个方法作用是判断当前节点前面还有没有其它排队的节点
* true表示有,false表示没有
*/
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    //我们看下false的情况
    //1.队列是空的
    //2.当前节点在队列头
    return h != t
           && ((s = h.next) == null
               || s.thread != Thread.currentThread());
    }

公平锁:

  • state = 0 表示无其他线程占用锁
    • 判断队列前面有没有其他在等待的节点
      • 如果有, 则继续排队, return false
      • 如果没有其他等待的并且 CAS 成功(同一刻其他线程可能也在尝试获取锁)
        • 获取锁成功了,把当前线程设置为独占线程, return true
  • state > 0 (锁存在重入的情况, 表示锁已经被其它线程占用)
    • 表示锁重入了, state 加 1, return true
public final void acquire(int arg) {
    //1.尝试获取锁
    //2.没有获取到,挂起当前线程添加到阻塞队列中去
    if (!tryAcquire(arg) &&
        acquireQueued(
            addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }    
}

经过上面尝试获取锁的操作

  • 成功
  • 失败(挂起当前线程,放到阻塞队列中去)

关键我们要学习下怎么包装当前线程放到队列尾部的

 /**
  * Creates and enqueues node for current thread and given mode.
  *
  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  * @return the new node
  */
private Node addWaiter(Node mode) {
    //把当前线程包装成Node节点,并设为独占模式
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        //当前节点设为尾节点的前驱节点(CLH是双向链表)
        node.prev = pred;
        //CAS设置当前节点为尾节点
        if (compareAndSetTail(pred, node)) {
            //设置成功
            pred.next = node;
            return node;
        }
    }
    //tail节点为null或者CAS失败(其它线程竞争入队列尾部)
    enq(node);
    return node;
}

接下来就是如何处理上面的node节点,tail节点为null或者CAS失败(其它线程竞争入队列尾部)


/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    //死循环,其实是为了自旋
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //tail节点确实为null,这时候需要初始化, 设置头结点
            if (compareAndSetHead(new Node()))
                //如果头结点设置成功了,那么tail这个时候等于head,没有return继续自旋
                tail = head;
        } else {
            //和上面类似,把当前线程包装后的节点放到队列尾部,把node节点设为tail节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面的步骤结束之后,就意味着已经成功的把当前的线程包装成Node节点,放到队列的尾部去了。

TODO



/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
文章目录