高并发技术之java线程池源码解析_第1页
高并发技术之java线程池源码解析_第2页
高并发技术之java线程池源码解析_第3页
高并发技术之java线程池源码解析_第4页
高并发技术之java线程池源码解析_第5页
已阅读5页,还剩11页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

高并发技术之Java线程池源目题 问高并发技术之Java线程池源目题 问 线程池框 第一层结 接口简 实现 辅助 完成服 源码原...........................................................................................................................线程池执行原 调度线程池原 异步结果源码分 1.线程池的模块结2.示例&原1.线程池包含哪些东西2.1.线程池包含哪些东西2.线程池原理3.调度线程池原理4.调度线程池怎么实现FixRate,FixDelay?和他们之间的区别5.怎么取消的第一层结 (启动一个线程行 (more执行器,直接执行 (启动一个线程行 (more执行器,直接执行 (线性执行器java.util.concurrent.ExecutorService执行器服务java.util.concurrent.Executor(执行器,执行方法java.util.concurrent.ExecutorService(执行服务)包含服务的生命周java.util.concurrent.ScheduledExecutorService(调度相关的服务 (普通的的线程池实现类oolExecutor(调度实现类 channel–AIO辅助完成服线程池辅助完成服线程池执行原构造数量,任务队列容器,存活时间,线程工厂,处理器//固定线程ExecutorServiceexecutorService= executorService.execute(newRunnable()初始化构造publicThre longTimeUnitBlockingQueue<Runnable>ThreadFactoryif(corePoolSize<0keepAliveTime<thrownewif(workQueue==null||threadFactory==thrownewthis.acc=System.初始化构造publicThre longTimeUnitBlockingQueue<Runnable>ThreadFactoryif(corePoolSize<0keepAliveTime<thrownewif(workQueue==null||threadFactory==thrownewthis.acc=System.getSecurityManager()==nullnullActhis.corePoolSize= umPoolSize this.workQueue=this.keepAliveTime=this.threadFactory=this.handler=}publicvoidrun()}});//runnable接 ubmit(newCallable<String>()publicStringcall()throwsExceptionreturn}});//callable接if(compareAndIncrementWorkerCount(cworkreturnfalsewc>=(core?corePoolSize if(wc>=CAPACITYwc=for(;;)if(compareAndIncrementWorkerCount(cworkreturnfalsewc>=(core?corePoolSize if(wc>=CAPACITYwc=for(;;)returnfalse1如果非运行状态2(nullworkQueue!Task==null!(rs==SHUTDOWNif(rs>=SHUTDOWN//Checkifqueueemptyonlyif rs= c=for(;;)c= if(workerCountOf(c)<corePoolSize){if(addWorker( d,true))cctl.get}if(isRunning(c)&&workQueue.offer( d)){recheck=ctl.get();if(!isRunning(recheck)&&remove( elseif(workerCountOf(recheck)00nullworker。addWorker(null,false);}elseif(!addWorker( d,false))//直接增加worker如果不成功直接reject thrownewNullPoif d==if(!}finally}}workerStarted=if(workerAdded)workstart}}finally}workerAdded=largestPoolSize=if(s>s=workers.add(w);thrownewIllegalThreadSif(t.isAlive())// ttis(rs==if(!}finally}}workerStarted=if(workerAdded)workstart}}finally}workerAdded=largestPoolSize=if(s>s=workers.add(w);thrownewIllegalThreadSif(t.isAlive())// ttis(rs==SHUTDOWN Task==null))if(rs<SHUTDOWNrs=runS//shutdownbeforelockacquired锁定后并重新检查下是否 //BackoutonThreadFactoryfailureor//RecheckwhileholdingtryfinalReentrantLockmainLock=if(tnullfinalThreadt=wnew tryWorkerw=workerAdded=workerStarted=}}//elseCASfailedduetoworkerCountchange;retryinnercontinueif eOf(c)!=cctl.get();Re-readctl增加workbreaktask=}finally}afterExecute(task,thrown}finallythrown=x;thrownew}catch(Throwablex)thrown=x;throw}catch(Errorx)thrown=x;throw}catch(RuntimeExceptiontask=}finally}afterExecute(task,thrown}finallythrown=x;thrownew}catch(Throwablex)thrown=x;throw}catch(Errorx)thrown=x;throw}catch(RuntimeExceptionx)tryThrowablethrown=beforeExecute(wt,task);trywt.errupt();//!wt.isif((runSeeast(ctl.get(),STOP)//shutdownNowracewhile //requiresarecheckinsecondcasetodeal//ifnot,ensurethreadis errupted//Ifpoolis ,ensurethread while(tasknull||taskgetTask(nulltrycompletedAbruptly=w.unlock();// Task=Runnabletask= ThreadwtThread.currentThread();//1.return}if(min==0&&!min=if(workerCountOf(c)>=return;//replacementnot}addWorker(null,}if(!completedAbruptly)min=allowCoreThreadTimeOut?0:if nif(min==0&&!min=if(workerCountOf(c)>=return;//replacementnot}addWorker(null,}if(!completedAbruptly)min=allowCoreThreadTimeOut?0:if n(cSTOPif(completedAbruptly)//Ifabrupt,thenworkerCountwasn'tfinalReentrantLockmainLock=trycompletedTaskCount workers.remove(w);//移除}finally}c=}}}completedAbruptly=}finallyprosWorkerExit(w,调度线程池原调构造DelayedWorkQueue延迟队通过DelayedWorkQueue延迟队列实现offer获取对象的延if(x==throw调度线程池原调构造DelayedWorkQueue延迟队通过DelayedWorkQueue延迟队列实现offer获取对象的延if(x==thrownewNullPoRunnableScheduledFuture<?>eRunnableScheduledFuture<?>)x;对finalReentrantLocklock=tryi=if(iqueue.length扩ifelsesuper.getQueue().add(task);//增加任if(isShutdown()!canRunInCurrentRunSe(task.isPeriodic())}publicScheduledThre corePoolSize) eger.MAX_VALUE,0,new}oolExecutor.DelayedWorkQueue#siftUp(二堆算法)保证相同if(other==this)//comparezeroifsamereturnif(otherinstanceofScheduledFutureTask)ScheduledFutureTask<?>x=longdifftimex.time判断if(diff<oolExecutor.DelayedWorkQueue#siftUp(二堆算法)保证相同if(other==this)//comparezeroifsamereturnif(otherinstanceofScheduledFutureTask)ScheduledFutureTask<?>x=longdifftimex.time判断if(diff<return-while(k>0)parent=(k-1)>>>RunnableScheduledFuture<?>e=if pareTo(e)>=queue[k]=setIndex(e,k=}queue[k]=setIndex(key,size=i+if(i==0)queue[0]=setIndex(e0第一个直接设置索引和下标}elsesiftUp(i,e);选到}if(queue[0]==e)leader=available.signal();//唤醒所有的被挤压的wait线}}finally}return确保有work执work运行的时候调用queue的take方finalReentrantLocklock=lock.locktryfor(;;) =queue确保有work执work运行的时候调用queue的take方finalReentrantLocklock=lock.locktryfor(;;) =queue[0];//获取第一个对if ==elselongdelay .getDelay(NANOSECONDS);//延迟时if(delay0)//到时间return =null;//don'tretainrefwhileif(leader!=available.await();//因为没有执行线程初始化,所以等等什么时候有了自己被他人唤elsewc=if(wc<addWorker(null,elseif(wc==addWorker(null,elseif(diff>returnelseif(sequenceNumber<return-return}longdiff=getDelay(NANOSECONDS)-return(diff<0)?-1:(diff>0)?1:怎么实现固定率的periodic=if elseif怎么实现固定率的periodic=if elseifelseif(ScheduledFutureTask.super.runAndReset()){//有periodreturns=--RunnableScheduledFuture<?>xqueue[s];//重排序队queue[s]=if(s!=siftDown(0,setIndex(f,-ThreadthisThread=Thread.currentThread();leader=thisThread;tryavailable.awaitNanos(delay);//各condition}finallyif(leader==thisThread)leader=null;}}}}}finallyif(leader==null&&queue[0]!=null)}怎么拿到的异步任务结果finallongdeadline=timed?System.nanoTime()+nanos:WaitNodeq=怎么拿到的异步任务结果finallongdeadline=timed?System.nanoTime()+nanos:WaitNodeq=queued=for(;;)if(Thread.errupted/check中throw }s=sif(sCOMPLETING){//判断是否完if(q!=q.thread=returnlongp=if(p>timep;//假如延迟了这个时间早过了,+当前时候肯定还是过的timetriggerTime(-p);前的任务延scheduleAtFixedRatescheduleWithFixedDelay有什么区别吗}什么时候回填的结果那if(se!=NEWpareAndSwapObject(this,nullThread.currentThread态不是new者runner置不成功直接退tryCallable<V>c=if(c!=null&&什么时候回填的结果那if(se!=NEWpareAndSwapObject(this,nullThread.currentThread态不是new者runner置不成功直接退tryCallable<V>c=if(c!=null&&se==NEW)Vtryresultc.call();//运行ok时候返回ran=}catch(Throwableex)result=ran=}if(ran)//正常成功setresult}elseif(s==COMPLETING)//cannottimeoutelseif(q==q=newWaitNode();//生成一个 对els

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论