200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 多米诺骨牌效应Semphore

多米诺骨牌效应Semphore

时间:2018-11-25 13:07:09

相关推荐

多米诺骨牌效应Semphore

生活中其实都充满了共享锁的例子,例如坐校车,它的容量比如说只能装50人,排队等候,当公交车到来时,排队一个接一个的上车,当进去的人数差不多时,司机师傅可能会清点一下座位,并告诉下面排队的人还有多少剩余座位,如果换成Semphore的世界来讲的话,那就是司机师傅告诉排队的大队长,还有剩余的座位,大队长上车,第二位荣幸升为大队长,老队长告诉新队长是否还有余量,又或者老队长帮新队长通知后面的人,向前一步看看是否还有多余的座位。

联想一下生活的例子,会增添不同的色彩,下面我们就一起来看看Semphore是如何实现共享锁的吧!

public Semaphore(int permits) {sync = new NonfairSync(permits);}

默认是采用非公平的实现方式。

举例,共享锁的资源 permits = 3,后续都会依靠这个往下面举例说明

public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}

如果请求的资源数量小于0,则抛出异常,主要来看acquireSharedInterruptibly方法是如何获取锁的。

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}

继续来看 nonfairTryAcquireShared 方法

final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

首先获取临界资源state的值,剩余资源 = 可用资源-请求资源,如果剩余资源大于等于0,才会进行CAS操作,更新主内存的值。

A线程请求资源数量3,成功拿到了3个资源(一共3个),之后B,C,D线程都请求1个资源,但是因为剩余量小于0,获取锁失败,调用addWaiter函数进入队列。

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

下面两个函数是入队操作

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {// Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

这两个方法和ReentrantLock一模一样,是AQS提供,不太了解的朋友先看下ReentrantLock作为基础篇,这里不多介绍。

/*** Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

因为B线程的前驱是head节点,所以可以尝试一次获取锁,但是并没有可用资源,所以调用shouldParkAfterFailedAcquire将前驱节点的waitStatus置为-1,线程C和线程D的前驱节点是B,并不是head节点,所以直接调用shouldParkAfterFailedAcquire将节点B的waitStatus置为-1,之后线程B,C,D都调用parkAndCheckInterrupt进行阻塞。

此时队列中的排序是 B -> C -> D (线程B是head节点)

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

此时线程A释放1个资源,先执行tryReleaseShared方法

protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}

将临界资源state加1,现在剩余资源为1,成功CAS操作更新主内存的值,并返回true,接下来调用doReleaseShared方法。

private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;// loop on failed CAS}if (h == head) // loop if head changedbreak;}}

首先队列是存在的,并且队列元素是大于1的,所以满足第一个if的条件,接下来获取head的waitStatus是SIGNAL(-1),CAS操作将head节点的waitStatus更新成0(中间状态),之后调用unparkSuccessor方法来唤醒head节点的后继(节点B),接着我们来跟踪线程B的走向。

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

线程B被唤醒之后,继续尝试获取自己的前驱节点,发现它的前驱就是head节点,尝试获取资源,调用tryAcquireShared方法,线程B尝试获取的资源数量是1,而刚刚线程A释放的资源也是1,线程B抢占资源成功,剩余资源数量0,tryAcquireShared返回0,满足条件,调用setHeadAndPropagate方法。

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,*or was recorded (as h.waitStatus either before*or after setHead) by a previous operation*(note: this uses sign-check of waitStatus because*PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,*or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}

节点B将自己设置为head节点,由于B节点的waitStatus = -1 <0,所以满足条件,虽然propagate = 0,但条件是 “或” 关系,所以满足条件一个即可,

doReleaseShared方法我们刚刚分析过,归纳起来就是判断head节点的waitStatus是否为-1,如果满足,就唤醒后继节点,如果waitStatus = 0,则CAS更新为propagate状态(-3,新的状态,后面细谈它的作用),如果head没有发生改变则跳出循环。这样C会被B唤醒,同理,如果按照这样的发展,D也会被C唤醒。

下面我们总结一下疑问点:

1、setHeadAndPropagate方法中的if为什么会有那么多的判断条件?为什么还要判断旧head的waitStatus?

2、doReleaseShared方法中的PROPAGATE的作用在哪里?为什么要判断head是否改变才跳出循环?

下面我们把这两个方法放在一起分析

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,*or was recorded (as h.waitStatus either before*or after setHead) by a previous operation*(note: this uses sign-check of waitStatus because*PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,*or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}

setHeadAndPropagate方法中的逻辑包含设置新head,调用doReleaseShared,他们不是原子操作,会不会发生线程安全性问题?在执行了if的判断条件后会不会发生线程切换?会不会有一种场景,线程A是尾结点,此时的waitStatus是0,当执行了if中的5个条件后都不满足时,发生了线程切换,节点B成功入队,并且把节点A的waitStatus成功更新成了-1,节点B安心的去阻塞,等待节点A的唤醒,但是A节点已经判断过了if中的条件,均不满足,不可以调用doReleaseShared来唤醒后继节点B,只能等节点A释放资源去调用doReleaseShared才可以唤醒节点B,不能充分的利用资源,如果当时资源剩余,唤醒B节点就可以获取资源,执行线程B的业务逻辑,这样就白白等待一个线程A的业务逻辑的时间。

节点A刚刚入队,并把前驱waitStatus更新-1,线程A进入阻塞

head节点唤醒节点A,head自身的waitStatus更新为0

节点B加入队列,并把前驱waitStatus更新-1,成功阻塞

但是节点B的加入队列有些慢,在if判断条件之后,所以并没有唤醒B。

既然新head的waitStatus仍然是0,不能唤醒后继节点B,那么看看旧head的waitStatus的值仍然为0,因为它唤醒了A。

目光再回到doReleaseShared身上

```javaprivate void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;// loop on failed CAS}if (h == head) // loop if head changedbreak;}}

当代码执行到了unparkSuccessor唤醒了节点A,节点A成功将自己设置了新头时,发生了线程切换,继续切换到旧head的逻辑上,执行到if(h == head)这行代码上,因为head变化了,变成了节点A,所以会再次循环一次,这次会执行else if 的逻辑,因为旧head的waitStatus为0,所以进行一次CAS操作成功更新为PROPAGATE(-3),现在又切换到线程A继续执行,当前的节点状态如下图所示。

if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}

再看一下这个逻辑,旧head的判断逻辑是优先于新head的,现在旧head的waitStatus = -3 ,所以会成功调用doReleaseShared方法来唤醒线程B。

总结

PROPAGATE的真正意义就在于它可以把waitStatus<0,让它符合唤醒后继的条件而已,它不属于SIGNAL,因为它的后继已经被唤醒了,是一种全新的状态–传播,整个唤醒流程就像多米诺骨牌效应一样,不断的去唤醒后继节点。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。