`

AbstractQueuedSynchronizer(4)

阅读更多
Condition是一个条件功能的class,必须放在Lock代码块内,如同wait,notify方法放在synchronized块一样。

Condition的(await,signal)与object的(wait,notify)相比,提供了更为通用和灵活的解决方案,可以让多种条件的线程之间相互通信。


Condition的定义:

public interface Condition{  
     void await() throws InterruptedException;  
     void awaitUninterruptibly();  
     long awaitNanos(long nanosTimeout) throws InterruptedException;  
     boolean await(long time, TimeUnit unit) throws InterruptedException;  
     boolean awaitUntil(Date deadline) throws InterruptedException;  
     void signal();  
     void signalAll();  
}

Lock接口的newCondition方法是实现Condition的一种方式,这样Condition就可以和Lock合作得亲密无间。


在AbstractQueuedSynchronizer中也有Condition的实现ConditionObject:

1. ConditionObject.addConditionWaiter 方法

添加一个waiter到等待队列中

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

如果上一个waiter被取消了,则调用unlinkCancelledWaiters方法去掉队列里面的无效waiters。新建一个waiter并把它放到队列的尾部。


2. ConditionObject.unlinkCancelledWaiters方法

从condition队列中把取消的等待节点删除,目的是为了减少内存占用,提高效率。

private void unlinkCancelledWaiters() {
	Node t = firstWaiter;
	Node trail = null;
	while (t != null) {
		Node next = t.nextWaiter;
		if (t.waitStatus != Node.CONDITION) {
			t.nextWaiter = null;
			if (trail == null)
				firstWaiter = next;
			else
				trail.nextWaiter = next;
			if (next == null)
				lastWaiter = trail;
		}
		else
			trail = t;
		t = next;
	}
}

从起始节点firstWaiter出发,遍历到结束节点lastWaiter。

值得一提的是trail变量,保存的是上一个有效节点:

a. 如果trail == null,说明还没有有效的节点,所以把next节点暂且作为firstWaiter

b. 如果当前节点t无效,则把t剔除出列(t.nextWaiter = null),并把trail和next暂且关联起来(trail.nextWaiter = next)

c. 当next == null时,说明已经遍历完整个队列了,把trail作为结束节点


3. ConditionObject.doSignal方法

从开始节点开始,向后搜索直到找到一个不为null的等待节点并把它转移到同步队列上。

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }



4. ConditionObject.signal方法

把等待最久的condition节点移到锁的等待队列中。

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

拿到第一个waiter,如果不为空,则调用doSignal方法。


5. ConditionObject.awaitUninterruptibly方法

        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

首先把当前线程加入到condition的等待队列中,然后试着去释放当前等待节点。循环监测节点是否在锁的等待队列中,如果没有,就park当前线程。


6. ConditionObject.await方法

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

和awaitUninterruptibly类似,这里允许出现Interrupt。


7. ConditionObject.awaitNanos方法

        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return nanosTimeout - (System.nanoTime() - lastTime);
        }

实现指定时间的等待。
和awaitUninterruptibly类似。每次循环都会检查
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics