Mina的线程池实现分析_第1页
Mina的线程池实现分析_第2页
Mina的线程池实现分析_第3页
Mina的线程池实现分析_第4页
Mina的线程池实现分析_第5页
已阅读5页,还剩5页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

本文格式为Word版,下载可任意编辑——Mina的线程池实现分析Mina的线程池实现分析(1)

文章分类:Java编程

线程池是并发应用中,为了减少每个任务调用的开销加强性能而经常使用的技术。在mina中大量的使用这一技术,除了Executors的工厂方法构建线程池之外,它还继承自ThreadPoolExecutor提供自己的线程池的实现OrderedThreadPoolExecutor和

UnorderedThreadPoolExecutor。这两者主要应用于ExecutorFilter过滤器。这个过滤器是mina内部实现的众多过滤器之一,其主要作用是把I/Oevents提交给线程池同时处理同一个IOSession的事件,其默认的线程池的构造是前者。这两个线程池的区别就在于同时处理I/O事件时,前者能够保证同一个Session的事件的处理顺序,而后者则不能保证,所以有可能出现sessionClosed事件在messageReceived事件之前被处理。下面试着从代码解密其怎么保证事件处理顺序的。

分析Mina的源码最大的感受就是其多线程应用的精细,每次从源码解决自己的疑问都有一种难以言喻的喜悦感。假使一般的框架的源码主要看设计结构的话,Mina的源码的精妙更在于具体的实现,虽然乍一看是繁杂又混乱,呵呵。先看看部分源码吧:

Java代码

?publicclassOrderedThreadPoolExecutorextendsThreadPoolExecutor{?...

?privatestaticfinalIoSessionEXIT_SIGNAL=newDummySession();?

?privatefinalAttributeKeyTASKS_QUEUE=newAttributeKey(getClass(),\?

?privatefinalBlockingQueuewaitingSessions=newLinkedBlockingQueue();?

?privatefinalSetworkers=newHashSet();??

??privatefinalAtomicIntegeridleWorkers=newAtomicInteger();??

??privatelongcompletedTaskCount;??privatevolatilebooleanshutdown;??...

??privateSessionTasksQueuegetSessionTasksQueue(IoSessionsession){

??SessionTasksQueuequeue=(SessionTasksQueue)session.getAttribute(TASKS_QUEUE);??if(queue==null){

??queue=newSessionTasksQueue();??SessionTasksQueueoldQueue=

??(SessionTasksQueue)session.setAttributeIfAbsent(TASKS_QUEUE,queue);??

??if(oldQueue!=null){??queue=oldQueue;??}??}??

??returnqueue;??}??

??privatevoidaddWorker(){??synchronized(workers){

??if(workers.size()>=super.getMaximumPoolSize()){??return;??}??

??//Createanewworker,andaddittothethreadpool??Workerworker=newWorker();

??Threadthread=getThreadFactory().newThread(worker);??

??//Aswehaveaddedanewthread,it'sconsideredasidle.??idleWorkers.incrementAndGet();??

??//Now,wecanstartit.??thread.start();??workers.add(worker);??

??if(workers.size()>largestPoolSize){??largestPoolSize=workers.size();??}??}??}??

??privateclassSessionTasksQueue{

??privatefinalQueuetasksQueue=newConcurrentLinkedQueue();??

??privatebooleanprocessingCompleted=true;??}??...??}

首先看到的是OrderedThreadPoolExecutor的部分实现,其除了含有ThreadPoolExecutor的一些静态常量之外,这里列出了自身特有的几个变量。EXIT_SIGNAL是代表空的IOSession,假使此线程池得到的全部都是EXIT_SIGNAL,那么处理也就终止了。waitingSessions是存储可用I/O会话的队列,正是这个队列在后面多线程处搭理话的I/O事件起到了有序的作用,这个数据机构在

UnorderedThreadPoolExecutor内是没有的,后面可以看到其对待事件的粒度要比前者大。workers是一个Worker的集合,每一个Worker都实现了Runnable接口,线程池管理这些Worker线程执行并发的事件处理。很形象的命名,这些Worker说白了就是线程池内的打工仔。idleWorkers则负责及时的统计空闲的工人以便进一步的剥削,由于仅仅是数字在多线程下的增减所以使用atomic包的实现无疑是上佳的选择。最终就是SessionTasksQueue这个内部类了,这个内部的数据结构其实就是一个队列,负责每一个IOSession所对应的I/O事件的处理。

这样通过waitingSessions区分IOSession,通过SessionTasksQueue区分每个IOSession的I/O事件这个两层结构就可以为有序处理提供了数据结构的保证。对比UnorderedThreadPoolExecutor,其仅仅提供I/O事件的存取队列LinkedBlockingQueue的实例,而不对IOSession进行区分。从上述的源码可以看到getSessionTasksQueue()方法会试图取特定IOSession相关联的事件队列,假使没有则为IOSession添加事件队列的属性。addWorker()则是启动新的Worker线程参与线程池的管理,这些操作不管是有序还是无序的线程池实现都基本一致。最能反映两者区别的就是线程池的execute()方法和其内部的Worker的run()的实现,通过使用不同的数据结构进行I/O事件

的存取处理也就表达了两者的区别。先来看看有序的实现:

Java代码

??publicvoidexecute(Runnabletask){??...

??IoEventevent=(IoEvent)task;??

??IoSessionsession=event.getSession();??

??SessionTasksQueuesessionTasksQueue=getSessionTasksQueue(session);??QueuetasksQueue=sessionTasksQueue.tasksQueue;??

??booleanofferSession;??

??booleanofferEvent=eventQueueHandler.accept(this,event);??

??if(offerEvent){

??synchronized(tasksQueue){

??//*********************************1**********************************??tasksQueue.offer(event);??

??if(sessionTasksQcessingCompleted){??sessionTasksQcessingCompleted=false;??offerSession=true;??}else{

??offerSession=false;??}??...

??//*********************************end********************************??}??}else{

??offerSession=false;??}??

??if(offerSession){

??waitingSessions.offer(session);??}??

??addWorkerIfNecessary();??...??}

这个方法可以简单的理解为提交I/O事件给线程池处理,也就是I/O事件的存储。前面说到有序的线程池的实现是采用了两层结构,所

以代码很明了,首先是找I/O事件对应的IOSession,然后找IOSession的事件队列属性(没有就创立)把事件添加到队列里面去。由于要保证代码段1的操作的原子性,所以使用了synchronized的锁机制。offerSession在这里的作用就是保证waitingSessions内的非EXIT_SIGNAL的IOSession是唯一的。addWorkerIfNecessary()则是在没有空闲工人处理事件的状况下添加新的人手(线程),最终还是调用addWorker()。再来看看无序线程池的实现:

Java代码

??publicvoidexecute(Runnabletask){???...

???IoEvente=(IoEvent)task;

???booleanofferedEvent=queueHandler.accept(this,e);???if(offeredEvent){???getQueue().offer(e);???}???

???addWorkerIfNecessary();???...???}

这个就简单明白了,getQueue()获得是类的构造器提供的LinkedBlockingQueue的实例。offer()方法我之前分析过,属于非阻塞的入队实现,所以不管队列满不满都不会阻塞当前的线程。所以这个I/O事件的存储实际上不牵涉到IOSession的划分:同一个IOSession的多个I/O事件存于同一个队列里,不同的IOSession的I/O事件也都在这个队列里;而有序线程池的实现则是:同一个IOSession的多个I/O事件只存于同一个队列里,每一个IOSession都对应有自己的事件队列。

分析了I/O事件的存储,下面看看多个Worker同时工作时I/O事件的取得过程。首先看看有序的Worker的实现:

Java代码

?privateclassWorkerimplementsRunnable{?

?privatevolatilelongcompletedTaskCount;?privateThreadthread;?

?publicvoidrun(){

?thread=Thread.currentThread();?

?try{??for(;;){

??IoSessionsession=fetchSession();??

??idleWorkers.decrementAndGet();??

??if(session==null){

??synchronized(workers){

??if(workers.size()>getCorePoolSize()){??//Removenowtopreventduplicateexit.??workers.remove(this);??break;??}??}??}??

??if(session==EXIT_SIGNAL){??break;??}??

??try{

??if(session!=null){

??runTasks(getSessionTasksQueue(session));??}??}finally{

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论