200字范文 > 多米诺骨牌效应Semphore


时间:2018-11-25 13:07:09





public Semaphore(int permits) {sync = new NonfairSync(permits);}


举例,共享锁的资源 permits = 3,后续都会依靠这个往下面举例说明

public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}


public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}

继续来看 nonfairTryAcquireShared 方法

final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

首先获取临界资源state的值,剩余资源 = 可用资源-请求资源,如果剩余资源大于等于0,才会进行CAS操作,更新主内存的值。


private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}


private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {// Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}


/*** Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}


此时队列中的排序是 B -> C -> D (线程B是head节点)

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}


protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}


private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;// loop on failed CAS}if (h == head) // loop if head changedbreak;}}


private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}


private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,*or was recorded (as h.waitStatus either before*or after setHead) by a previous operation*(note: this uses sign-check of waitStatus because*PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,*or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}

节点B将自己设置为head节点,由于B节点的waitStatus = -1 <0,所以满足条件,虽然propagate = 0,但条件是 “或” 关系,所以满足条件一个即可,

doReleaseShared方法我们刚刚分析过,归纳起来就是判断head节点的waitStatus是否为-1,如果满足,就唤醒后继节点,如果waitStatus = 0,则CAS更新为propagate状态(-3,新的状态,后面细谈它的作用),如果head没有发生改变则跳出循环。这样C会被B唤醒,同理,如果按照这样的发展,D也会被C唤醒。





private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,*or was recorded (as h.waitStatus either before*or after setHead) by a previous operation*(note: this uses sign-check of waitStatus because*PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,*or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}








```javaprivate void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;// loop on failed CAS}if (h == head) // loop if head changedbreak;}}

当代码执行到了unparkSuccessor唤醒了节点A,节点A成功将自己设置了新头时,发生了线程切换,继续切换到旧head的逻辑上,执行到if(h == head)这行代码上,因为head变化了,变成了节点A,所以会再次循环一次,这次会执行else if 的逻辑,因为旧head的waitStatus为0,所以进行一次CAS操作成功更新为PROPAGATE(-3),现在又切换到线程A继续执行,当前的节点状态如下图所示。

if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}

再看一下这个逻辑,旧head的判断逻辑是优先于新head的,现在旧head的waitStatus = -3 ,所以会成功调用doReleaseShared方法来唤醒线程B。


