版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、.实战java Concurrent时间:2020-09-24 08:09:55来源:网络 作者:未知 点击:1686次编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。讲到Java多线程,大多数人脑海中
2、跳出来的是Thread、Runnable、synchronized这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。java.util.concurrent包分成了三个部分,分别是java.util.concurrent、 java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。为了便于理解,本文使用一个
3、例子来做说明,交代一下它的场景:假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。1、Executors通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的 ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到 ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了 C
4、allable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用 ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。Java代码 package service; import java.util.ArrayList; import java.util.List;
5、60; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurre
6、nt.TimeUnit; /* * 线程池服务类 * * author DigitalSonic */ public class ThreadPoolService /*
7、0; * 默认线程池大小 */ public static final int DEFAULT_POOL_SIZE = 5 ;
8、60; /* * 默认一个任务的超时时间,单位为毫秒 */ public static final long DEFAULT_TASK_TIMEOUT = 1000 ;
9、 private int poolSize = DEFAULT_POOL_SIZE; private ExecutorService
10、 executorService; /* * 根据给定大小创建线程池 */ public ThreadPoolService( int poolSize)
11、 setPoolSize(poolSize); /* * 使用线程池中的线程来执行任务 */ public
12、 void execute(Runnable task) executorService.execute(task); /* * 在线程池中执行所有给定的任务并取回运行结果,使
13、用默认超时时间 * * see #invokeAll(List, long) */ public List<Node> invokeAll(List<ValidationTask> tasks)
14、0; return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size(); /* * 在线程池中执行所有给定的任务并取回运行结果
15、; * * param timeout 以毫秒为单位的超时时间,小于0表示不设定超时 * see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection) */
16、 public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) List<Node> nodes = new ArrayList<Node>(tasks.size();
17、160; try List<Future<Node>> futures = null ; if (timeout < 0 )
18、; futures = executorService.invokeAll(tasks); else
19、60; futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS); for (Future<Node> fut
20、ure : futures) try nodes.add(future.get();
21、0; catch (ExecutionException e) e.printStackTrace(); &
22、#160; catch (InterruptedException e)
23、60; e.printStackTrace(); return nodes;
24、160; /* * 关闭当前ExecutorService * * param timeout 以毫秒为单位的超时时间 */ public
25、60; void destoryExecutorService( long timeout) if (executorService != null && !executorService.isShutdown() tr
26、y executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS); catch (InterruptedException
27、 e) e.printStackTrace();
28、0; executorService.shutdown(); /* * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService &
29、#160; */ public void createExecutorService() destoryExecutorService(1000 ); executorService = Executo
30、rs.newFixedThreadPool(poolSize); /* * 调整线程池大小 * see #createExecutorService() */
31、0; public void setPoolSize( int poolSize) this .poolSize = poolSize; createExecutorService();
32、160; package service;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurr
33、ent.TimeUnit;/* * 线程池服务类 * * author DigitalSonic */public class ThreadPoolService /* * 默认线程池大小 */ public static final int DEFAULT_POOL_SIZE = 5; /*
34、 * 默认一个任务的超时时间,单位为毫秒 */ public static final long DEFAULT_TASK_TIMEOUT = 1000; private int poolSize
35、0; = DEFAULT_POOL_SIZE; private ExecutorService executorService; /* * 根据给定大小创建线程池 */ public ThreadPoolService(int poolSize)
36、160; setPoolSize(poolSize); /* * 使用线程池中的线程来执行任务 */ public void execute(Runnable task) executorService.execute(task);
37、/* * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间 * * see #invokeAll(List, long) */ public List<Node> invokeAll(List<ValidationTask> tasks)
38、 return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size(); /* * 在线程池中执行所有给定的任务并取回运行结果 * * param timeout 以毫秒为单位的超时时间,小于0表示不设定超时 * see java.util.concurrent.Exec
39、utorService#invokeAll(java.util.Collection) */ public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) List<Node> nodes = new ArrayList<Node>(tasks.size();
40、0; try List<Future<Node>> futures = null; if (timeout < 0)
41、60; futures = executorService.invokeAll(tasks); else futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
42、60; for (Future<Node> future : futures) try
43、160; nodes.add(future.get(); catch (ExecutionException e)
44、160; e.printStackTrace(); catch (InterruptedException e) &
45、#160; e.printStackTrace(); return nodes; /* * 关闭当前ExecutorService *
46、0; * param timeout 以毫秒为单位的超时时间 */ public void destoryExecutorService(long timeout) if (executorService != null && !executorService.isShutdown()
47、60; try executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS); catch (InterruptedException e)
48、160; e.printStackTrace(); executorService.shutdown();
49、0; /* * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService */ public void createExecutorService() destoryExecutorService(1000); executorService = Ex
50、ecutors.newFixedThreadPool(poolSize); /* * 调整线程池大小 * see #createExecutorService() */ public void setPoolSize(int poolSize) this.po
51、olSize = poolSize; createExecutorService(); 这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用 Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值
52、就是执行结果。还有一个比较诡异的地方 本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改为 List<Callable<Node>>。造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是 invokeAll(Collection<? extends Cal
53、lable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,
54、没有执行的任务作为返回值返回。2、Lock多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是 java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock()。使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:Java代码 Lock l = .; l.lock(); &
55、#160;try / 执行操作 finally l.unlock(); Lock l = .; l.lock();try / 执行操作 finally l.unlock();java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是
56、ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:Java代码 package service; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /* * 节
57、点类 * * author DigitalSonic */ public class Node private String name; private String wsdl;
58、0; private String result = "PASS" ; private String dependencies = new String ; private Lock lock = new ReentrantLock();
59、60; /* * 默认构造方法 */ public Node() /*
60、; * 构造节点对象,设置名称及WSDL */ public Node(String name, String wsdl) this .name = name;
61、60; this .wsdl = wsdl; /* * 返回包含节点名称、WSDL以及验证结果的字符串 */ Override
62、160; public String toString() String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result; &
63、#160; return toString; / Getter & Setter public String getName()
64、160; return name; public void setName(String name) this .name = name;
65、 public String getWsdl() return wsdl; public void setWsdl(String
66、 wsdl) this .wsdl = wsdl; public String getResult() return result;
67、60; public void setResult(String result) this .result = result;
68、 public String getDependencies() return dependencies; public void setDependencies(String dependencies)
69、60; this .dependencies = dependencies; public Lock getLock() return lock;
70、60; package service;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/* * 节点类 * * author DigitalSonic */public class Node private String name; priv
71、ate String wsdl; private String result = "PASS" private String dependencies = new String ; private Lock lock = new ReentrantLock(); /* * 默认构造方法 */ public Node() /* * 构造节点对象,设置名称及WSDL */ public Node(String name, String
72、wsdl) = name; this.wsdl = wsdl; /* * 返回包含节点名称、WSDL以及验证结果的字符串 */ Override public String toString() String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result; return
73、 toString; / Getter & Setter public String getName() return name; public void setName(String name) = name; public String getWsdl() return wsdl; public void setWsdl(String wsdl) this.w
74、sdl = wsdl; public String getResult() return result; public void setResult(String result) this.result = result; public String getDependencies() return dependencies; public void setDependencies(String dependencies)
75、 this.dependencies = dependencies; public Lock getLock() return lock; Java代码 package service; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock;
76、60;import java.util.logging.Logger; import service.mock.MockNodeValidator; /* * 执行验证的任务类 * * author DigitalSonic */
77、; public class ValidationTask implements Callable<Node> private static Logger logger = Logger.getLogger( "ValidationTask" ); pri
78、vate String wsdl; /* * 构造方法,传入节点的WSDL */ public ValidationTask(Str
79、ing wsdl) this .wsdl = wsdl; /* * 执行针对某个节点的验证<br/> * 如果正有别
80、的线程在执行同一节点的验证则等待其结果,不重复执行验证 */ Override public Node call() throws Exception Node node = Validat
81、ionService.NODE_MAP.get(wsdl); Lock lock = null ; ("开始验证节点:" + wsdl); if (node !=
82、 null ) lock = node.getLock(); if (lock.tryLock()
83、60; / 当前没有其他线程验证该节点 ("当前没有其他线程验证节点" + node.getName() + "" + wsdl + "" ); &
84、#160; try Node result = MockNodeValidator.validateNode(wsdl);
85、0; mergeNode(result, node); finally
86、 lock.unlock();
87、160; else / 当前有别的线程正在验证该节点,等待结果 ("当前有别的线程正在验证节点"
88、160; + node.getName() + "" + wsdl + ",等待结果" ); lock.lock();
89、; lock.unlock(); else / 从未进行过验证,这种情况应该只出现在系统启动初期
90、0; / 这时是在做初始化,不应该有冲突发生 ("首次验证节点:" + wsdl);
91、60; node = MockNodeValidator.validateNode(wsdl); ValidationService.NODE_MAP.put(wsdl, node);
92、60; ("节点" + node.getName() + "" + wsdl + "验证结束,验证结果:" + node.getResult(); return node;
93、0; /* * 将src的内容合并进dest节点中,不进行深度拷贝 */ private Node mergeNode(Node src, Node dest)
94、60; dest.setName(src.getName(); dest.setWsdl(src.getWsdl(); dest.setDependencies(src.getDependencies(); dest.setResult(s
95、rc.getResult(); return dest; package service;import java.util.concurrent.Callable;import java.util.concurrent.locks.Lock;import java.util.logging.Logger;import se
96、rvice.mock.MockNodeValidator;/* * 执行验证的任务类 * * author DigitalSonic */public class ValidationTask implements Callable<Node> private static Logger logger = Logger.getLogger("ValidationTask"); private String
97、; wsdl; /* * 构造方法,传入节点的WSDL */ public ValidationTask(String wsdl) this.wsdl = wsdl; /* *
98、 执行针对某个节点的验证<br/> * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证 */ Override public Node call() throws Exception Node node = ValidationService.NODE_MAP.get(wsdl);
99、; Lock lock = null; ("开始验证节点:" + wsdl); if (node != null) lock = node.getLock();
100、 if (lock.tryLock() / 当前没有其他线程验证该节点 ("当前没有其他线程验证节点" + no
101、de.getName() + "" + wsdl + ""); try Node result = MockNodeValidator.validat
102、eNode(wsdl); mergeNode(result, node); finally
103、160; lock.unlock(); else
104、0; / 当前有别的线程正在验证该节点,等待结果 ("当前有别的线程正在验证节点" + node.getName() + "" + wsdl + ",等待结果");
105、60; lock.lock(); lock.unlock(); else
106、0; / 从未进行过验证,这种情况应该只出现在系统启动初期 / 这时是在做初始化,不应该有冲突发生 ("首次验证节点:" + wsdl);
107、 node = MockNodeValidator.validateNode(wsdl); ValidationService.NODE_MAP.put(wsdl, node); ("节点" + node.getName
108、() + "" + wsdl + "验证结束,验证结果:" + node.getResult(); return node; /* * 将src的内容合并进dest节点中,不进行深度拷贝 */ private Node mergeNode(Node src, Node
109、dest) dest.setName(src.getName(); dest.setWsdl(src.getWsdl(); dest.setDependencies(src.getDependencies(); dest.setResult(src.getRes
110、ult(); return dest; 请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、 notify()和notifyAll()方法(C
111、ondition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的 Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:Java代码 class BoundedBuffer final Lock lock = new ReentrantLock();
112、60; final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object items = new Object 100 ; int put
113、ptr, takeptr, count; public void put(Object x) throws InterruptedException lock.lock(); try while
114、60; (count = items.length) notFull.await(); itemsputptr = x; if (+putptr = items.length) putptr = 0 ; &
115、#160; +count; notEmpty.signal(); finally lock.unlock(); public Object take() throws InterruptedException lock.lock(); try while (count = 0 )
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年车位产权买卖协议格式
- 2024年防水施工劳务协议规范化文件
- 2024新疆企业劳动协议规范化样本
- 2024受托代理事务协议样本
- 2024年专业运营车辆租赁协议模板
- DB11∕T 1514-2018 低效果园改造技术规范
- 单位广告策划与制作服务协议范例
- 2024年公司文秘职务聘用协议模板
- 2024年企业员工全日制劳动协议模板
- 文书模板-《厂房光伏租赁合同》
- 2024年10月时政100题(附答案)
- 学生校外托管协议书
- 建筑幕墙施工方案
- 第二章 地图(考点串讲课件)七年级地理上学期期中考点大串讲(人教版2024)
- 2024年健身房管理制度(六篇)
- 期中测试卷(1-4单元)(试题)-2024-2025学年人教版数学六年级上册
- 车辆绿本抵押借款合同
- 意识形态分析研判制度
- GB/T 18029.6-2024轮椅车第6 部分:电动轮椅车最大速度的测定
- 2024至2030年中国学前教育(幼儿园)行业研究报告
- 统编版(2024新版)七年级上册《道德与法治》第1-13课全册教材“活动课”参考答案
评论
0/150
提交评论