
Stage division process


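  • A stage with no parent stage can be submitted for computation directly, because it has no dependencies.
  • A stage with parent stages cannot be submitted directly: it depends on them, so it must wait until its parent stages have finished before it can run.
  • org.apache.spark.scheduler.DAGScheduler --> handleMapStageSubmitted calls submitStage(finalStage), passing the final stage as the argument. Note that besides submitStage(finalStage), handleMapStageSubmitted also calls submitWaitingStages().
  • submitStage(finalStage) submits the stages that have no missing parent stage and, recursing through the parents, adds every stage that still has missing parents to the waitingStages set.
  • submitWaitingStages loops over the set and submits the stages in it. All stages in the set have dependencies at this point, and they run in order, because a later stage depends on the results of the earlier ones.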
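
The submission order described above can be tried out on a toy model. The sketch below is only an illustration under simplifying assumptions: ToyStage and ToyScheduler are hypothetical names, not Spark classes, and a stage "finishes" the moment it is submitted, whereas the real scheduler waits for task completion events.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark's Stage class: an id plus the parent stages.
final case class ToyStage(id: Int, parents: List[ToyStage])

class ToyScheduler {
  private val waitingStages = mutable.LinkedHashSet.empty[ToyStage]
  private val finishedStages = mutable.Set.empty[ToyStage]
  private val submitted = mutable.ArrayBuffer.empty[Int] // ids in submission order

  // Parents that have not finished yet, lowest id first.
  private def missingParents(stage: ToyStage): List[ToyStage] =
    stage.parents.filterNot(finishedStages.contains).sortBy(_.id)

  // Run the stage if no parent is missing; otherwise recurse into the
  // missing parents first and park the stage in waitingStages.
  def submitStage(stage: ToyStage): Unit =
    if (!waitingStages(stage) && !finishedStages(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        submitted += stage.id
        finishedStages += stage // assumption: the toy stage completes instantly
      } else {
        missing.foreach(submitStage)
        waitingStages += stage
      }
    }

  // Retry every parked stage, working on a copy so the set can be refilled.
  def submitWaitingStages(): Unit = {
    val copy = waitingStages.toList
    waitingStages.clear()
    copy.sortBy(_.id).foreach(submitStage)
  }

  // Submit the final stage, then keep retrying until nothing is waiting.
  def run(finalStage: ToyStage): List[Int] = {
    submitStage(finalStage)
    while (waitingStages.nonEmpty) submitWaitingStages()
    submitted.toList
  }
}
```

With stages 0 and 1 as parents of stage 2, and stage 2 as the parent of stage 3, calling run on stage 3 submits the two root stages first and the final stage last.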

    Stage diagram

    Stage submission source code analysis

  • submitStage source code analysis

       /**
        * Submits stage, but first recursively submits any missing parents.
        * (That is, the parent stages are walked recursively, and submission
        * starts from the topmost ancestor stage.)
        */
       private def submitStage(stage: Stage) {
         val jobId = activeJobForStage(stage)
         if (jobId.isDefined) {
           logDebug("submitStage(" + stage + ")")
           // Skip stages that are already waiting, running, or failed
           if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
             val missing = getMissingParentStages(stage).sortBy(_.id)
             logDebug("missing: " + missing)
             if (missing.isEmpty) {
               // No missing parents: the tasks of this stage can be submitted now
               logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
               submitMissingTasks(stage, jobId.get)
             } else {
               // Submit the missing parents first, then park this stage
               for (parent <- missing) {
                 submitStage(parent)
               }
               waitingStages += stage
             }
           }
         } else {
           abortStage(stage, "No active job for stage " + stage.id, None)
         }
       }

  • submitWaitingStages source code analysis

    /**
     * Check for waiting or failed stages which are now eligible for resubmission.
     * Ordinarily run on every iteration of the event loop.
     */
    private def submitWaitingStages() {
      // TODO: We might want to run this less often, when we are sure that something has become
      // runnable that wasn't before.
      logTrace("Checking for newly runnable parent stages")
      logTrace("running: " + runningStages)
      logTrace("waiting: " + waitingStages)
      logTrace("failed: " + failedStages)
      // Work on a copy: submitStage may put stages back into waitingStages
      val waitingStagesCopy = waitingStages.toArray
      waitingStages.clear()
      for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }

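Stage division

  • 1, Different Actions in a Spark Application can each trigger a Job, so one Application can contain many Jobs. Each Job consists of one or more Stages, and a later Stage depends on the earlier ones: it runs only after the Stages it depends on have finished computing.
  • 2, Stages are divided according to the dependencies between RDDs, which are either wide or narrow; a wide (shuffle) dependency marks the boundary between two Stages. What produces a wide dependency? Operations such as reduceByKey and groupByKey do.
  • 3, An Action (for example collect) causes SparkContext.runJob to run, which eventually leads to submitJob in DAGScheduler. At its core, submitJob sends a JobSubmitted case class object to eventProcessLoop.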
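
To make point 2 concrete, here is a small sketch that counts the stages of a lineage by counting its wide dependencies. Node and StageCount are hypothetical names used only for illustration, and the sketch assumes a tree-shaped lineage (no RDD is reused by two children), which real lineages do not guarantee.

```scala
// Hypothetical toy lineage node, not Spark's RDD class.
// `narrow` holds parents reached through narrow dependencies (e.g. map, filter),
// `wide` holds parents reached through wide dependencies (e.g. reduceByKey).
final case class Node(name: String, narrow: List[Node] = Nil, wide: List[Node] = Nil)

object StageCount {
  // Number of wide (shuffle) edges reachable from this node.
  def wideEdges(node: Node): Int =
    node.wide.map(p => 1 + wideEdges(p)).sum + node.narrow.map(wideEdges).sum

  // Each wide edge closes one stage and opens another, so a tree-shaped
  // lineage has one final stage plus one stage per wide edge.
  def stages(finalNode: Node): Int = 1 + wideEdges(finalNode)
}
```

For a lineage shaped like textFile, then map, then reduceByKey, StageCount.stages gives 2: the map side runs in one stage and the reduce side in another.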

  • AppClient --> registerWithMaster -->

    register the application
    persist the application

    --> DAGScheduler
    --> textFile.count()

          eventProcessLoop.post(JobSubmitted(
            jobId, rdd, func2, partitions.toArray, callSite, waiter,
            SerializationUtils.clone(properties)))


      /**
       * An event loop to receive events from the caller and process all events in the event thread. It
       * will start an exclusive event thread to process all events.
       *
       * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
       * handle events in time to avoid the potential OOM.
       */
      private[spark] abstract class EventLoop[E](name: String) extends Logging {

        private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

        private val stopped = new AtomicBoolean(false)

        private val eventThread = new Thread(name) {
          setDaemon(true)

          override def run(): Unit = {
            try {
              while (!stopped.get) {
                // Blocks until an event has been posted
                val event = eventQueue.take()
                try {
                  onReceive(event)
                } catch {
                  case NonFatal(e) => {
                    try {
                      onError(e)
                    } catch {
                      case NonFatal(e) => logError("Unexpected error in " + name, e)
                    }
                  }
                }
              }
            } catch {
              case ie: InterruptedException => // exit even if eventQueue is not empty
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }

        def start(): Unit = {
          if (stopped.get) {
            throw new IllegalStateException(name + " has already been stopped")
          }
          // Call onStart before starting the event thread to make sure it happens before onReceive
          onStart()
          eventThread.start()
        }

        // ... (remaining members omitted)
      }
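
The pattern behind EventLoop (one blocking queue, one dedicated consumer thread, handlers invoked in posting order) can be tried in isolation. The following is a minimal sketch, not Spark's implementation: MiniEventLoop and MiniEventLoopDemo are hypothetical names, logging and the onStart/onStop hooks are left out, and stop() must not be called from the event thread itself.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for an event loop of this kind.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = eventThread.start()

  def post(event: E): Unit = eventQueue.put(event)

  // Assumption: called from a thread other than the event thread.
  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    eventThread.interrupt()
    eventThread.join()
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object MiniEventLoopDemo {
  // Posts three events and returns them in the order the event thread handled them.
  def run(): List[String] = {
    val handled = mutable.ArrayBuffer.empty[String] // written only by the event thread
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[String]("demo-loop") {
      protected def onReceive(event: String): Unit = { handled += event; done.countDown() }
      protected def onError(e: Throwable): Unit = ()
    }
    loop.start()
    List("JobSubmitted", "StageCompleted", "JobFinished").foreach(loop.post)
    done.await(5, TimeUnit.SECONDS)
    loop.stop() // join() makes the event thread's writes visible to the caller
    handled.toList
  }
}
```

Because a single thread drains a FIFO queue, the events are handled in exactly the order they were posted, which is why the scheduler's handlers do not need to guard against each other.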

--> calls back the onReceive method (DAGSchedulerEventProcessLoop, defined in DAGScheduler.scala)
--> case JobSubmitted
--> def handleJobSubmitted
--> newResultStage (the new stage holds its parent stages)
--> submitStage


    // Excerpt from submitMissingTasks: preferred locations for every task to launch
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }
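
The difference between the two branches is easy to miss: for a shuffle map stage the task id is the RDD partition index, while a result stage first maps the task id through its own partitions sequence. The sketch below isolates that lookup. All names (StageKind, ShuffleMapKind, ResultKind, PreferredLocs, locsOf) are hypothetical, and host names stand in for TaskLocation.

```scala
// Hypothetical stand-ins for the two stage types.
sealed trait StageKind
case object ShuffleMapKind extends StageKind
// partitions(i) is the RDD partition computed by task i of a result stage.
final case class ResultKind(partitions: Vector[Int]) extends StageKind

object PreferredLocs {
  // locsOf: RDD partition index -> hosts that hold the data (an assumed lookup).
  def taskIdToLocations(
      kind: StageKind,
      partitionsToCompute: Seq[Int],
      locsOf: Int => Seq[String]): Map[Int, Seq[String]] =
    kind match {
      case ShuffleMapKind =>
        // Task id and RDD partition index coincide.
        partitionsToCompute.map(id => (id, locsOf(id))).toMap
      case ResultKind(partitions) =>
        // Task id is first translated to the RDD partition it computes.
        partitionsToCompute.map(id => (id, locsOf(partitions(id)))).toMap
    }
}
```

The indirection matters for jobs that only touch some partitions of an RDD: task 0 of such a result stage may compute partition 2, so its preferred hosts are those of partition 2.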


  • org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    -> private[scheduler] def handleJobSubmitted -> private def submitStage(stage: Stage)
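
The dispatch style of doOnReceive (events as case classes, one match case per event type) can be sketched in a few lines. The event and handler names below are hypothetical and only loosely modeled on the scheduler's events; the handlers are replaced by strings so the routing is visible.

```scala
// Hypothetical event hierarchy for illustration only.
sealed trait ToyEvent
final case class ToyJobSubmitted(jobId: Int, partitions: Seq[Int]) extends ToyEvent
final case class ToyStageCancelled(stageId: Int) extends ToyEvent
case object ToyResubmitFailedStages extends ToyEvent

object ToyDispatcher {
  // One case per event type; the case class fields are unpacked by the pattern.
  // Because ToyEvent is sealed, the compiler can warn about a forgotten case.
  def doOnReceive(event: ToyEvent): String = event match {
    case ToyJobSubmitted(jobId, partitions) =>
      s"handleJobSubmitted(job=$jobId, partitions=${partitions.size})"
    case ToyStageCancelled(stageId) =>
      s"handleStageCancellation(stage=$stageId)"
    case ToyResubmitFailedStages =>
      "resubmitFailedStages()"
  }
}
```

Keeping the event a plain immutable value is what lets the posting thread hand it to the event thread without further synchronization.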

    /**
     * Submits stage, but first recursively submits any missing parents.
     */
    private def submitStage(stage: Stage) {
      val jobId = activeJobForStage(stage)
      if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          val missing = getMissingParentStages(stage).sortBy(_.id)
          logDebug("missing: " + missing)
          if (missing.isEmpty) {
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            submitMissingTasks(stage, jobId.get)
          } else {
            for (parent <- missing) {
              submitStage(parent)
            }
            waitingStages += stage
          }
        }
      } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }

    org.apache.spark.scheduler.DAGScheduler  --> private def submitMissingTasks
    taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())