




The call flow around the TaskSetManager.resourceOffer() method is as follows.

CoarseGrainedSchedulerBackend

The method override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] registers each executor, recording its executorId, hostPort and cores. When resources are offered, the active executors are turned into a list of WorkerOffers and the tasks returned by the scheduler are launched:

val workOffers = activeExecutors.map { case (id, executorData) =>
  new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
launchTasks(scheduler.resourceOffers(workOffers))

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    ...
  }
}

TaskSchedulerImpl.resourceOffers() first performs a random shuffle of all the WorkerOffers and sorts the TaskSets, then keeps calling its own resourceOfferSingleTaskSet() method until no more tasks can be launched from the TaskSet:

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
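Before reading resourceOfferSingleTaskSet() itself, the round-robin placement and CPU accounting that these two methods perform together can be illustrated with a small self-contained sketch. This is my own simplification, not the Spark source: the WorkerOffer case class, the ResourceOfferSketch object, the pendingTasks counter and the fixed CPUS_PER_TASK value are assumptions made for the example, and locality levels, TaskSets and the delay-scheduling logic are left out.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ResourceOfferSketch {
  // Simplified stand-in for Spark's WorkerOffer (executorId, host, free cores).
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  val CPUS_PER_TASK = 1  // plays the role of spark.task.cpus, which defaults to 1

  // Shuffle the offers, then make repeated passes over them, placing one task per
  // executor per pass while that executor still has at least CPUS_PER_TASK free cores.
  def resourceOffers(offers: Seq[WorkerOffer], pendingTasks: Int): Seq[ArrayBuffer[String]] = {
    val shuffledOffers = Random.shuffle(offers)          // avoid always favouring the same workers
    val tasks = shuffledOffers.map(_ => ArrayBuffer.empty[String])
    val availableCpus = shuffledOffers.map(_.cores).toArray
    var remaining = pendingTasks

    var launchedTask = true
    while (launchedTask && remaining > 0) {              // keep going while a pass makes progress
      launchedTask = false
      for (i <- shuffledOffers.indices if remaining > 0) {
        if (availableCpus(i) >= CPUS_PER_TASK) {         // only offer executors with spare cores
          tasks(i) += s"task-${pendingTasks - remaining} on ${shuffledOffers(i).executorId}"
          availableCpus(i) -= CPUS_PER_TASK
          remaining -= 1
          launchedTask = true
        }
      }
    }
    tasks
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(WorkerOffer("exec-1", "host-a", 2), WorkerOffer("exec-2", "host-b", 3))
    resourceOffers(offers, pendingTasks = 4).foreach(println)
  }
}

Running the sketch places two tasks on each executor: work is spread across the offers pass by pass instead of filling one executor first, which is the balancing behaviour the comment in resourceOffers() describes.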
The real implementation is resourceOfferSingleTaskSet():

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}

This function is where an execId is assigned to a task. For every WorkerOffer(executorId, host, cores), if the number of CPUs still available on that executor is at least the number of CPUs a task needs, TaskSetManager.resourceOffer() is called and may return a task. The number of CPUs required per task (CPUS_PER_TASK) is read from the configuration and defaults to 1. After a task is placed, the executor's availableCpus is decreased by CPUS_PER_TASK and asserted to remain non-negative; otherwise the loop simply moves on to the next executor. The method returns true if at least one task was launched in this pass (the caller keeps invoking it until a pass launches nothing) and false otherwise.

The TaskSetManager class keeps a member variable pendingTasksForExecutor, one stack-like list of pending task indices per executor:

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

The resourceOffer() method:

/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 *
 * NOTE: this function is either called with a maxLocality which
 * would be adjusted by delay scheduling algorithm or it will be with a special
 * NO_PREF locality which will be not modified
 *
 * @param execId the executor Id of the offered resource
 * @param host the host Id of the offered resource
 * @param maxLocality the maximum locality we want to schedule the tasks at
 */
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] = {
  ...
}

It calls dequeueTask() to pop a task index off these stacks, and dequeueTask() in turn calls dequeueTaskFromList():

/**
 * Dequeue a pending task from the given list and return its index.
 * Return None if the list is empty.
 * This method also cleans up any tasks in the list that have already
 * been launched, since we want that to happen lazily.
 */
private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
  var indexOffset = list.size
  while (indexOffset > 0) {
    indexOffset -= 1
    val index = list(indexOffset)
    if (!executorIsBlacklisted(execId, index)) {
      // This should almost always be list.trimEnd(1) to remove tail
      list.remove(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return Some(index)
      }
    }
  }
  None
}

The first few lines of the loop show that the scan starts from the tail of the list, so the most recently added index is returned first. dequeueTask() itself walks the candidate lists in order of locality priority and returns the first runnable index it finds.
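The stack behaviour of these per-executor lists can be reproduced in isolation. The sketch below is a simplified model, not the Spark source: it keeps only the copiesRunning/successful bookkeeping, drops the executorIsBlacklisted() check, and the PendingTaskStackSketch object and its sample data are invented for the example.

import scala.collection.mutable.ArrayBuffer

object PendingTaskStackSketch {
  // Scan the pending list from the tail, lazily discarding entries that are
  // already running or finished, and return the first index that still needs a copy.
  def dequeueTaskFromList(
      list: ArrayBuffer[Int],
      copiesRunning: Array[Int],
      successful: Array[Boolean]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      list.remove(indexOffset)                 // lazy cleanup of the entry just inspected
      if (copiesRunning(index) == 0 && !successful(index)) {
        return Some(index)                     // the most recently added task wins
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val pending = ArrayBuffer(0, 1, 2)         // task 2 was added last, so it is offered first
    val copiesRunning = Array(0, 1, 0)         // task 1 already has a running copy
    val successful = Array(false, false, false)
    println(dequeueTaskFromList(pending, copiesRunning, successful))  // Some(2)
    println(dequeueTaskFromList(pending, copiesRunning, successful))  // Some(0), task 1 is skipped
  }
}

The second call skips task 1 because a copy of it is already running, which is exactly the lazy cleanup the comment on pendingTasksForExecutor describes, and a failed task that is appended back onto the list would be retried before the older entries.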
The Executor class

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

threadPool is of type ThreadPoolExecutor. In the words of the JDK documentation, a ThreadPoolExecutor is "an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks."

/**
 * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
 * unique, sequentially assigned integer.
 */
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

/**
 * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
 */
def namedThreadFactory(prefix: String): ThreadFactory = {
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
}

The launchTask() method hands each task to this pool through threadPool.execute(); TaskRunner implements the Runnable interface:

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  logInfo("In Executor.launchTa
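The excerpt breaks off inside launchTask(), but the pattern it relies on (a cached thread pool of daemon threads with a naming prefix, executing one Runnable per task) can be sketched with plain java.util.concurrent. This is an illustration under my own assumptions, not Spark's ThreadUtils: Spark builds the factory with Guava's ThreadFactoryBuilder, whereas the hand-rolled ThreadFactory, the DaemonPoolSketch object and the sample Runnable below are made up for the example.

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // Cached thread pool whose threads are daemons named "<prefix>-<n>",
  // mirroring what ThreadUtils.newDaemonCachedThreadPool produces.
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)                      // daemon threads never keep the JVM alive on their own
        t
      }
    }
    Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
  }

  def main(args: Array[String]): Unit = {
    val pool = newDaemonCachedThreadPool("Executor task launch worker")
    // launchTask() follows the same shape: wrap the work in a Runnable
    // (Spark's TaskRunner) and hand it to the pool with execute().
    pool.execute(new Runnable {
      override def run(): Unit = println(s"running on ${Thread.currentThread().getName}")
    })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}

The "%d" placeholder in the real namedThreadFactory() is what produces thread names such as Executor task launch worker-0 in thread dumps, and because the threads are daemons they do not block JVM shutdown.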