09-spark-task-源码分析

  • org.apache.spark.scheduler.DAGScheduler --> doOnReceive --> handleJobSubmitted --> submitStage --> submitMissingTasks --> submitTasks (TaskSchedulerImpl)
  • org.apache.spark.scheduler.TaskSchedulerImpl --> submitTasks

    /**
     * Accepts a task set for scheduling.
     *
     * While holding this scheduler's lock, the method:
     *   1. builds a manager for the task set via `createTaskSetManager`,
     *   2. stores it in `taskSetsByStageIdAndAttempt` under the task set's stage
     *      id and stage attempt id,
     *   3. fails if any other non-zombie task set is recorded for the same stage,
     *   4. hands the manager to `schedulableBuilder`,
     *   5. if not running locally and `hasReceivedTask` is still false, schedules a
     *      repeating timer task that logs a warning on every tick until
     *      `hasLaunchedTask` becomes true, at which point the timer task cancels itself,
     *   6. sets `hasReceivedTask` to true.
     *
     * After releasing the lock it calls `backend.reviveOffers()`.
     *
     * @param taskSet the task set to schedule
     * @throws IllegalStateException if a different, non-zombie task set is already
     *                               recorded for the same stage
     */
    override def submitTasks(taskSet: TaskSet): Unit = {
      logInfo(s"Adding task set ${taskSet.id} with ${taskSet.tasks.length} tasks")
      this.synchronized {
        val taskSetManager = createTaskSetManager(taskSet, maxTaskFailures)
        val stageId = taskSet.stageId
        val attemptsForStage =
          taskSetsByStageIdAndAttempt.getOrElseUpdate(stageId, new HashMap[Int, TaskSetManager])

        // The manager is registered before the conflict check, so it stays in the
        // map even when the check below throws.
        attemptsForStage(taskSet.stageAttemptId) = taskSetManager

        val hasActiveConflict = attemptsForStage.values.exists { other =>
          other.taskSet != taskSet && !other.isZombie
        }
        if (hasActiveConflict) {
          val recordedIds = attemptsForStage.toSeq.map(_._2.taskSet.id).mkString(",")
          throw new IllegalStateException(
            s"more than one active taskSet for stage $stageId: $recordedIds")
        }

        schedulableBuilder.addTaskSetManager(taskSetManager, taskSetManager.taskSet.properties)

        if (!(isLocal || hasReceivedTask)) {
          val starvationCheck = new TimerTask() {
            override def run(): Unit = {
              if (hasLaunchedTask) {
                // `this` is the TimerTask here: stop ticking once a task has launched.
                this.cancel()
              } else {
                logWarning("Initial job has not accepted any resources; " +
                  "check your cluster UI to ensure that workers are registered " +
                  "and have sufficient resources")
              }
            }
          }
          starvationTimer.scheduleAtFixedRate(
            starvationCheck, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
        }
        hasReceivedTask = true
      }
      // Invoked outside the synchronized block, as in the original ordering.
      backend.reviveOffers()
    }