AbstractQueuedSynchronizer详解(二)——CountDownLatch源码分析_第1页
AbstractQueuedSynchronizer详解(二)——CountDownLatch源码分析_第2页
AbstractQueuedSynchronizer详解(二)——CountDownLatch源码分析_第3页
AbstractQueuedSynchronizer详解(二)——CountDownLatch源码分析_第4页
AbstractQueuedSynchronizer详解(二)——CountDownLatch源码分析_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、AbstractQueuedSynchronizer详解(二)CountDownLatch源码分析CountDownLatch的使用CountDownLatch是一个工具类,用于使一个或多个线程等待另一系列线程完成操作,也就是说一些线程在另外一些完成之后才能继续执行,类似线性。 初始CountDownLatch时需要提供一个count参数,await方法将会使线程阻塞,直到这个值变为0,countDown()方法可以使count值减1,一旦到达0后,调用await阻塞的线程将会得到释放。 CountDown一般有两个用法。一个线程等待多个线程一个线程等待多个线程完成后再执行,比如一个不限时的考

2、试,10个学生考试,交卷时间不同,当10个学生都交卷后,老师才认为考试结束。下面是该例子:public class ExamDemo public static class Student implements Runnable private int num;/学号 private CountDownLatch countDownLatch; public Student(int num, CountDownLatch countDownLatch) this.num = um; this.countDownLatch = countDownLatch; Override public vo

3、id run() Random random = new Random(); try TimeUnit.SECONDS.sleep(random.nextInt(10) + 1); catch (InterruptedException e) e.printStackTrace(); System.out.println("学生" + num + "交卷了"); /完成工作,将count-1 countDownLatch.countDown(); public static class Teacher implements Runnable privat

4、e CountDownLatch countDownLatch; public Teacher(CountDownLatch countDownLatch) this.countDownLatch = countDownLatch; Override public void run() System.out.println("考试开始,不限时间!"); try /等待count变为0 countDownLatch.await(); catch (InterruptedException e) e.printStackTrace(); System.out.println(&

5、quot;考试结束"); public static void main(String args) Executor executor = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(10); executor.execute(new Teacher(countDownLatch); for (int i = 1; i <= 10; i+) executor.execute(new Student(i, countDownLatch); 多个线程等待一个线

6、程再举个例子,比如几个司机在等红灯,而一旦变成了绿灯,几个司机就可以都通过了。public class TrafficDemo public static class Driver implements Runnable private int num; private CountDownLatch countDownLatch; public Driver(int num, CountDownLatch countDownLatch) this.num = num; this.countDownLatch = countDownLatch; Override public void run(

7、) System.out.println("司机" + num + "于红灯前等待"); try countDownLatch.await(); catch (InterruptedException e) e.printStackTrace(); System.out.println("司机" + num + "于绿灯通过"); public static class TrafficLight implements Runnable private CountDownLatch countDownLatch; p

8、ublic TrafficLight(CountDownLatch countDownLatch) this.countDownLatch = countDownLatch; Override public void run() try TimeUnit.SECONDS.sleep(5); catch (InterruptedException e) e.printStackTrace(); System.out.println("绿灯行"); countDownLatch.countDown(); public static void main(String args)

9、Executor executor = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(1); for (nt i = 1; i < 10; i+) executor.execute(new Driver(i, countDownLatch); executor.execute(new TrafficLight(countDownLatch); CountDownLatch源码解析CountDownLatch初始化CountDownLatch只有一个构造方法,如下: p

10、ublic CountDownLatch(int count) if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); 我们很容易知道Sync是一个继承自AQS的内部类,它负责CountDownLatch的同步事件。下面是Sync的定义:/* * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private sta

11、tic final class Sync extends AbstractQueuedSynchronizer private static final long serialVersionUID = 4982264981922014374L; /AQS的资源即设定的count Sync(int count) setState(count); int getCount() return getState(); protected int tryAcquireShared(int acquires) return (getState() = 0) ? 1 : -1; protected bool

12、ean tryReleaseShared(int releases) / Decrement count; signal when transition to zero for (;) int c = getState(); if (c = 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc) return nextc = 0; await方法await方法用于将线程阻塞,实现如下:public void await() throws InterruptedException sync.acquireSharedI

13、nterruptibly(1); 可以看到调用了acquireSharedInterruptibly方法,该方法在AQS中,实现如下:public final void acquireSharedInterruptibly(int arg) throws InterruptedException /线程被interrupt了,抛出异常 if (Terrupted() throw new InterruptedException(); /尝试获取arg个资源,如果小于0表示获取失败 if (tryAcquireShared(arg) < 0) doAcquireShare

14、dInterruptibly(arg); 从上面的方法可以看到,首先尝试获取资源,如果有资源,那么调用doAcquireSharedInterruptibly方法,tryAcquireShared方法是在Sync中的。从上面可以看到,如果getState为0则该线程不需要加入到等待队列中,而如果不等于0,则需要将该线程加入到等待队里中。 如果该线程需要加入到等待队列中,那么就调用doAcquireSharedInterruptibly方法,如下:private void doAcquireSharedInterruptibly(int arg) throws IuptedException /

15、将该线程加入到等待队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try for (;) /得到前驱节点 final Node p = node.predecessor(); /如果前驱节点是头节点 if (p = head) /尝试获取资源 int r = tryAcquireShared(arg); /getState为0,说明count为0了,等待的线程可以执行了 if (r >= 0) setHeadAndPropagate(node, r); p.next = null; / help G

16、C failed = false; return; /检查是否需要将当前线程挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() throw new InterruptedException(); finally if (failed) cancelAcquire(node); 队列的结构如下:其中Head是个空节点,所以一旦前驱节点是头节点,并且要是count变为了0,那么就调用setHeadAndPropagate方法,然后释放头节点。setHeadAndPropagate方法主要完成

17、两件事,第一更改头节点,第二由于此时等待线程可以执行了,将该事件传播给后面的节点,实现如下:private void setHeadAndPropagate(Node node, int propagate) Node h = head; / Record old head for check below /更改头节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either bef

18、ore * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause *

19、unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ /如果需要传播 if (propagate > 0 | h = null | h.waitStatus < 0 | (h = head) = null | h.waitStatus < 0) Node s = de.next; if (s = null | s.isShared() doReleaseShared(); 可

20、以看到如果需要传播的话,将会调用doReleasShared方法,方法如下:private void doReleaseShared() for (;) Node h = head; if (h != null && h != tail) int ws = h.waitStatus; if (ws = Node.SIGNAL) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0) /重置状态,成功后,将后继节点唤醒 continue; / loop to recheck cases unparkSuccessor(h); else if (ws = 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE) continue; / loop on failed CAS if (h = head) / loop if head changed break; 至此,await方法就使线程挂起了,下面再分析countDown方法。countDown方法countDown方法用于将count值减1,如果count变为0则释放所有等待线程,方法的实现如下:public void countDown() sync.relea

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论