Call flow of TaskSetManager's resourceOffer() method

In CoarseGrainedSchedulerBackend, the method override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] registers an executor, recording its executorId, hostPort and cores. The backend then builds a list of WorkerOffers from the active executors and launches tasks:

    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toSeq
    launchTasks(scheduler.resourceOffers(workOffers))

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        ...
      }
    }

resourceOffers() first shuffles all WorkerOffers into random order and sorts the TaskSets:

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

It then repeatedly calls its own resourceOfferSingleTaskSet() method for each TaskSet and each locality level, for as long as that call still manages to launch at least one task:

    /**
     * Called by cluster manager to offer resources on slaves. We respond by asking our active task
     * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
     * that tasks are balanced across the cluster.
     */
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      // Mark each slave as alive and remember its hostname
      // Also track if new executor is added
      var newExecAvail = false
      for (o <- offers) {
        ...
      }
      val shuffledOffers = Random.shuffle(offers)
      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
      val availableCpus = shuffledOffers.map(o => o.cores).toArray
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      ...
      var launchedTask = false
      for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
        } while (launchedTask)
      }
      if (tasks.size > 0) {
        hasLaunchedTask = true
      }
      return tasks
    }

    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
      var launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          try {
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              ...
              availableCpus(i) -= CPUS_PER_TASK
              assert(availableCpus(i) >= 0)
              launchedTask = true
            }
          } catch {
            case e: TaskNotSerializableException =>
              logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
              // Do not offer resources for this task, but don't throw an error to allow other
              // task sets to be submitted.
              return launchedTask
          }
        }
      }
      return launchedTask
    }

This function is where tasks get bound to an execId. For each WorkerOffer(executorId, host, cores), if the executor's available CPUs are at least the number of CPUs each task needs, it calls TaskSetManager's resourceOffer() method, which may return a task. The number of CPUs per task (CPUS_PER_TASK) is read from the configuration (spark.task.cpus) and defaults to 1. The executor's availableCpus is then reduced by CPUS_PER_TASK, and the code asserts that it is still >= 0. If the executor does not have enough free CPUs, the loop moves on to the next executor. After going through all offers, the method returns true if it launched at least one task and false otherwise.

TaskSetManager keeps a member variable, pendingTasksForExecutor, which holds the pending tasks of each executor:

    // Set of pending tasks for each executor. These collections are actually
    // treated as stacks, in which new tasks are added to the end of the
    // ArrayBuffer and removed from the end. This makes it faster to detect
    // tasks that repeatedly fail because whenever a task failed, it is put
    // back at the head of the stack. They are also only cleaned up lazily;
    // when a task is launched, it remains in all the pending lists except
    // the one that it was launched from, but gets removed from them later.
    private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

The resourceOffer() method:

    /**
     * Respond to an offer of a single executor from the scheduler by finding a task
     *
     * NOTE: this function is either called with a maxLocality which
     * would be adjusted by delay scheduling algorithm or it will be with a special
     * NO_PREF locality which will be not modified
     *
     * @param execId the executor Id of the offered resource
     * @param host the host Id of the offered resource
     * @param maxLocality the maximum locality we want to schedule the tasks at
     */
    @throws[TaskNotSerializableException]
    def resourceOffer(
        execId: String,
        host: String,
        maxLocality: TaskLocality.TaskLocality)
      : Option[TaskDescription] = {
      ...
    }

It calls the dequeueTask method to pop a task off the stack, and dequeueTask in turn calls dequeueTaskFromList:

    /**
     * Dequeue a pending task from the given list and return its index.
     * Return None if the list is empty.
     * This method also cleans up any tasks in the list that have already
     * been launched, since we want that to happen lazily.
     */
    private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
      var indexOffset = list.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = list(indexOffset)
        if (!executorIsBlacklisted(execId, index)) {
          // This should almost always be list.trimEnd(1) to remove tail
          list.remove(indexOffset)
          if (copiesRunning(index) == 0 && !successful(index)) {
            return Some(index)
          }
        }
      }
      None
    }

As the first three lines show, each call returns …
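To make the round-robin behaviour of resourceOffers and resourceOfferSingleTaskSet concrete, here is a minimal stand-alone sketch. It is a simplified model, not Spark code: Offer, RoundRobinSketch, assignRoundRobin and the integer task ids are hypothetical, there is no locality handling or TaskSet sorting, and the offers are assumed to be already shuffled. Each pass gives every offer that still has at least cpusPerTask free CPUs at most one task, and passes repeat while any task was launched.

```scala
import scala.collection.mutable.{ArrayBuffer, Queue}

// Hypothetical, simplified stand-in for a WorkerOffer (not the Spark class).
final case class Offer(executorId: String, host: String, cores: Int)

object RoundRobinSketch {
  // One pass over the offers, modeled on resourceOfferSingleTaskSet:
  // each offer with enough free CPUs receives at most one task per pass.
  private def onePass(
      offers: IndexedSeq[Offer],
      availableCpus: Array[Int],
      cpusPerTask: Int,
      nextTask: () => Option[Int],
      assigned: IndexedSeq[ArrayBuffer[Int]]): Boolean = {
    var launched = false
    for (i <- offers.indices) {
      if (availableCpus(i) >= cpusPerTask) {
        nextTask().foreach { task =>
          assigned(i) += task
          availableCpus(i) -= cpusPerTask
          assert(availableCpus(i) >= 0)
          launched = true
        }
      }
    }
    launched
  }

  // Repeat passes while the previous pass launched something; the flag starts
  // as true so the loop body runs at least once, like a do/while loop.
  def assignRoundRobin(
      offers: IndexedSeq[Offer],
      pendingTasks: Seq[Int],
      cpusPerTask: Int = 1): IndexedSeq[Seq[Int]] = {
    val queue = Queue.empty[Int] ++= pendingTasks
    val nextTask = () => if (queue.isEmpty) None else Some(queue.dequeue())
    val availableCpus = offers.map(_.cores).toArray
    val assigned = offers.map(_ => ArrayBuffer.empty[Int])
    var launched = true
    while (launched) {
      launched = onePass(offers, availableCpus, cpusPerTask, nextTask, assigned)
    }
    assigned.map(_.toList)
  }
}
```

With two 4-core offers and tasks 0 to 4, the first offer receives tasks 0, 2 and 4 and the second receives 1 and 3, which is the balancing effect the Spark comment describes. If each task needs 2 CPUs and the offers have 2 and 3 cores, each offer gets only one task before the loop stops. In the real scheduler the offers go through Random.shuffle first, so which executor is visited first changes between calls.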
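The stack-like behaviour of dequeueTaskFromList can be checked in isolation. The sketch below copies its control flow but replaces the TaskSetManager state with two hypothetical predicates: isBlacklisted stands in for executorIsBlacklisted(execId, index), and isRunningOrDone stands in for "copiesRunning(index) != 0 || successful(index)". PendingStackSketch and dequeueFromList are illustrative names, not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

object PendingStackSketch {
  def dequeueFromList(
      list: ArrayBuffer[Int],
      isBlacklisted: Int => Boolean,
      isRunningOrDone: Int => Boolean): Option[Int] = {
    var indexOffset = list.size
    // Walk from the tail: the buffer is used as a stack, so the most
    // recently added task is tried first.
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (!isBlacklisted(index)) {
        // Remove the entry even if it is no longer runnable; this is the
        // lazy cleanup of tasks already launched from another pending list.
        list.remove(indexOffset)
        if (!isRunningOrDone(index)) {
          return Some(index)
        }
      }
      // A task blacklisted for this executor is left in place, not removed.
    }
    None
  }
}
```

For a list (1, 2, 3) with nothing blacklisted and nothing running, the call returns 3 and leaves (1, 2). If task 3 is already running, it is dropped from the list and 2 is returned instead, leaving (1). If task 3 is blacklisted, it stays in the list and 2 is returned, leaving (1, 3).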
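The NOTE in resourceOffers lists the locality order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY, and each TaskSet is offered the nodes level by level in that order. A small sketch, assuming a plain Scala Enumeration whose declaration order matches that list, shows why walking the levels from tight to loose works: a cap of maxLocality admits its own level and every tighter one. LocalitySketch, isAllowed and levelsUpTo are hypothetical names used only for illustration.

```scala
// Hypothetical model of the locality order from the NOTE above.
object LocalitySketch extends Enumeration {
  type Locality = Value
  // Declaration order defines the ordering: PROCESS_LOCAL is tightest, ANY loosest.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A launch at `level` is acceptable under the cap `maxLocality`
  // when `level` is at least as local as the cap.
  def isAllowed(maxLocality: Locality, level: Locality): Boolean =
    level <= maxLocality

  // The levels admitted under a given cap, tightest first.
  def levelsUpTo(maxLocality: Locality): List[Locality] =
    values.toList.filter(_ <= maxLocality)
}
```

Under a NODE_LOCAL cap a PROCESS_LOCAL launch is allowed but a RACK_LOCAL one is not, while an ANY cap admits every level. The sketch leaves out delay scheduling, which, per the resourceOffer() doc comment, is what adjusts maxLocality, and it assumes every level is available to every TaskSet.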