10-spark-task-locality-algorithm-implementation

  • org.apache.spark.scheduler.DAGScheduler --> getPreferredLocs
  • In submitMissingTasks, the DAGScheduler obtains each task's locality information by calling the code below (an inspection sketch using the public API follows the snippet):

      val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
        stage match {
          case s: ShuffleMapStage =>
            partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
          case s: ResultStage =>
            val job = s.activeJob.get
            partitionsToCompute.map { id =>
              val p = s.partitions(id)
              (id, getPreferredLocs(stage.rdd, p))
            }.toMap
        }
      } catch {
        case NonFatal(e) =>
          stage.makeNewStageAttempt(partitionsToCompute.size)
          listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
          abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
          runningStages -= stage
          return
      }
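
  • For intuition about what those per-partition preferences look like, they can be inspected from user code through the public rdd.preferredLocations API. The HDFS path below is purely illustrative, and the master is assumed to be supplied by spark-submit:

    import org.apache.spark.{SparkConf, SparkContext}

    // Inspection sketch (hypothetical input path): for an HDFS-backed RDD,
    // preferredLocations reports the hosts holding each partition's blocks;
    // this is the same information DAGScheduler wraps into TaskLocation objects.
    val sc = new SparkContext(new SparkConf().setAppName("preferred-locs-demo"))
    val rdd = sc.textFile("hdfs:///path/to/input")
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
    }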
    
  • The per-partition data-locality computation is done by the code below, which determines the hosts on which the blocks backing each partition are stored:

    /**
     * Gets the locality information associated with a partition of a particular RDD.
     *
     * This method is thread-safe and is called from both DAGScheduler and SparkContext.
     *
     * @param rdd whose partitions are to be looked at
     * @param partition to lookup locality information for
     * @return list of machines that are preferred by the partition
     */
    private[spark]
    def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
      getPreferredLocsInternal(rdd, partition, new HashSet)
    }
    
  • In the implementation, the lookup first checks the DAGScheduler's in-memory cache (cacheLocs) for the current partition's locality information and returns it directly if present; otherwise it calls the RDD's own preferred-locations method (rdd.preferredLocations, which delegates to getPreferredLocations), and if that reports nothing it falls back to recursively querying the parent partitions of the RDD's narrow dependencies. A simplified sketch of this lookup follows below.
  • This is why, if you want Spark to run on HBase or on a data store that is not yet directly supported, you need to write a custom RDD: the key to preserving data locality for the resulting tasks is to override the RDD's getPreferredLocations. A minimal custom-RDD sketch is also given below.
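  • The sketch below is a simplified, standalone paraphrase of that lookup order (visited set, cached locations, the RDD's own preference, then narrow-dependency parents), not the DAGScheduler source: the cachedHosts map stands in for the scheduler's internal cache, and plain host strings stand in for TaskLocation.

    import scala.collection.mutable

    import org.apache.spark.NarrowDependency
    import org.apache.spark.rdd.RDD

    object PreferredHostsSketch {
      // Simplified lookup: cached locations win, then the RDD's own preference,
      // then the parents of a narrow dependency are consulted recursively.
      def preferredHosts(
          rdd: RDD[_],
          partition: Int,
          cachedHosts: Map[(RDD[_], Int), Seq[String]],
          visited: mutable.HashSet[(RDD[_], Int)] = mutable.HashSet.empty): Seq[String] = {
        if (!visited.add((rdd, partition))) return Nil  // already examined: avoid revisiting
        cachedHosts.get((rdd, partition)).filter(_.nonEmpty).getOrElse {
          val own = rdd.preferredLocations(rdd.partitions(partition))
          if (own.nonEmpty) own
          else rdd.dependencies.collectFirst {
            case n: NarrowDependency[_] =>
              n.getParents(partition).flatMap(p => preferredHosts(n.rdd, p, cachedHosts, visited))
          }.getOrElse(Nil)
        }
      }
    }

  • And a minimal custom-RDD sketch, assuming a hypothetical external store whose partitions know which hosts hold their data (RegionPartition, RegionRDD and the string payload are illustrative names, not Spark APIs). Overriding getPreferredLocations is what feeds locality information back into getPreferredLocs above:

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical partition that records which hosts hold its data.
    class RegionPartition(override val index: Int, val hosts: Seq[String]) extends Partition

    // Custom RDD over a hypothetical external store (e.g. HBase regions).
    class RegionRDD(sc: SparkContext, regions: Seq[Seq[String]])
      extends RDD[String](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        regions.zipWithIndex.map { case (hosts, i) =>
          new RegionPartition(i, hosts)
        }.toArray[Partition]

      // Reported to the DAGScheduler via rdd.preferredLocations, so tasks for this
      // partition are preferably scheduled on one of these hosts.
      override protected def getPreferredLocations(split: Partition): Seq[String] =
        split.asInstanceOf[RegionPartition].hosts

      // Illustrative compute body; a real connector would read from the store here.
      override def compute(split: Partition, context: TaskContext): Iterator[String] =
        Iterator(s"data of partition ${split.index}")
    }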
  • tasks: each task is created with the preferred locations computed above, so that it can be dispatched to a machine where its RDD partition's data resides (a note on the locality-wait settings that govern the actual placement follows at the end of this section):

    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
    }
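
  • Note that the preferred locations computed here are hints, not guarantees: once the TaskSet reaches the TaskScheduler, how long it holds out for a data-local slot before downgrading the locality level is governed by the spark.locality.wait settings. A hypothetical tuning sketch (real configuration keys, illustrative values):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.locality.wait controls how long the scheduler waits for a more local
    // slot before falling back to a less local one; the per-level keys override it.
    val conf = new SparkConf()
      .setAppName("locality-wait-demo")
      .set("spark.locality.wait", "3s")
      .set("spark.locality.wait.node", "3s")
      .set("spark.locality.wait.rack", "3s")
    val sc = new SparkContext(conf)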