07-spark-stage source code analysis
Stage division process
Basic concepts
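- A stage with no parent stage can be submitted for computation directly, because it has no dependencies.
- A stage with parent stages cannot be submitted directly: it depends on their output, so it must wait until the parent stages have finished.
- In org.apache.spark.scheduler.DAGScheduler, handleMapStageSubmitted calls submitStage(finalStage), passing the final stage as the argument. Note that besides submitStage(finalStage), handleMapStageSubmitted also calls submitWaitingStages().
- submitStage(finalStage) submits the stages that have no missing parent stage and, while recursing through the parents, adds every stage that still has parents to compute to the waitingStages set.
- submitWaitingStages loops over that set and resubmits each stage. The stages in the set depend on one another and have to run in order, because later stages consume the results of earlier ones.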
Stage diagram
Stage submission source analysis
submitStage source analysis
    /**
     * Submits stage, but first recursively submits any missing parents.
     * (That is, walk up through the parent stages and start submitting from the topmost one.)
     */
    private def submitStage(stage: Stage) {
      val jobId = activeJobForStage(stage)
      if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          // Get the direct parent stages that are still missing (there may be one or more)
          val missing = getMissingParentStages(stage).sortBy(_.id)
          logDebug("missing: " + missing)
          if (missing.isEmpty) {
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            // Submit the TaskSet of this stage
            submitMissingTasks(stage, jobId.get)
          } else {
            for (parent <- missing) {
              submitStage(parent)
            }
            /**
             * This stage has parent stages, so it cannot be computed directly; put it into the
             * waiting set. Order does not matter here, because the waiting stages are sorted
             * by firstJobId when they are resubmitted.
             */
            waitingStages += stage
          }
        }
      } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }
submitWaitingStages source analysis
    /**
     * Check for waiting or failed stages which are now eligible for resubmission.
     * Ordinarily run on every iteration of the event loop.
     */
    private def submitWaitingStages() {
      // TODO: We might want to run this less often, when we are sure that something has become
      // runnable that wasn't before.
      logTrace("Checking for newly runnable parent stages")
      logTrace("running: " + runningStages)
      logTrace("waiting: " + waitingStages)
      logTrace("failed: " + failedStages)
      val waitingStagesCopy = waitingStages.toArray
      waitingStages.clear()
      for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
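To make the interplay between the two methods easier to follow, here is a minimal sketch of the same idea on a toy DAG. `ToyStage`, `ToyScheduler`, and `completeRunningAndResubmit` are hypothetical names invented for this illustration and are not part of Spark; the sketch also assumes that every running stage succeeds, which the real scheduler of course cannot assume.

```scala
import scala.collection.mutable

// Toy stand-in for Spark's Stage: an id, a job id, and the direct parent stages.
final case class ToyStage(id: Int, firstJobId: Int, parents: List[ToyStage])

// Hypothetical miniature scheduler; it only mimics the control flow discussed above.
class ToyScheduler {
  val waiting = mutable.LinkedHashSet.empty[ToyStage]
  val running = mutable.LinkedHashSet.empty[ToyStage]
  val finished = mutable.LinkedHashSet.empty[ToyStage]
  val submissionOrder = mutable.ArrayBuffer.empty[Int]

  // Parents whose output is not available yet, lowest id first.
  private def missingParents(s: ToyStage): List[ToyStage] =
    s.parents.filterNot(finished.contains).sortBy(_.id)

  def submitStage(s: ToyStage): Unit =
    if (!waiting(s) && !running(s) && !finished(s)) {
      val missing = missingParents(s)
      if (missing.isEmpty) {
        submissionOrder += s.id // stands in for submitMissingTasks
        running += s
      } else {
        missing.foreach(submitStage) // parents first
        waiting += s                 // retry this stage later
      }
    }

  // Assumption: every running stage completes successfully, then waiting stages are retried.
  def completeRunningAndResubmit(): Unit = {
    finished ++= running
    running.clear()
    val copy = waiting.toArray
    waiting.clear()
    copy.sortBy(_.firstJobId).foreach(submitStage)
  }
}

object ToySchedulerDemo {
  def run(): List[Int] = {
    val s0 = ToyStage(0, 0, Nil)
    val s1 = ToyStage(1, 0, Nil)
    val s2 = ToyStage(2, 0, List(s0, s1))
    val s3 = ToyStage(3, 0, List(s2))
    val sched = new ToyScheduler
    sched.submitStage(s3) // submits 0 and 1; 2 and 3 go to the waiting set
    while (sched.waiting.nonEmpty) sched.completeRunningAndResubmit()
    sched.submissionOrder.toList // List(0, 1, 2, 3)
  }
}
```

Calling `ToySchedulerDemo.run()` shows that the final stage is requested first but actually submitted last: the recursion pushes the parentless stages out immediately, and each later round releases the stages whose parents have finished.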
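Stage division
- 1. A Spark application can trigger many jobs through different actions, so one application can contain many jobs. Each job consists of one or more stages, and later stages depend on earlier ones: a stage only runs after the stages it depends on have finished computing.
- 2. Dependencies between RDDs are either wide or narrow, and stage boundaries fall at the wide ones. Wide dependencies come from shuffle operations such as reduceByKey and groupByKey.
- 3. An action (for example collect) causes SparkContext.runJob to run, which eventually leads to submitJob in DAGScheduler. At its core, submitJob sends a JobSubmitted case class object to eventProcessLoop.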
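The way wide dependencies cut a job into stages can be sketched with a toy lineage. Everything in the block below (`StageCut`, `ToyRdd`, `Narrow`, `Shuffle`) is a hypothetical model made up for this note, not Spark's API, and it assumes a tree-shaped lineage with no RDD shared between branches.

```scala
object StageCut {
  sealed trait Dep { def parent: ToyRdd }
  final case class Narrow(parent: ToyRdd) extends Dep  // e.g. map, flatMap, filter
  final case class Shuffle(parent: ToyRdd) extends Dep // e.g. reduceByKey, groupByKey
  final case class ToyRdd(name: String, deps: List[Dep] = Nil)

  // Walk back from `last` through narrow dependencies only.
  // Returns the RDD names in this stage and the RDDs that end the parent stages.
  def stageOf(last: ToyRdd): (List[String], List[ToyRdd]) = {
    var members = List.empty[String]
    var parentEnds = List.empty[ToyRdd]
    def visit(r: ToyRdd): Unit = {
      members ::= r.name
      r.deps.foreach {
        case Narrow(p)  => visit(p)        // same stage
        case Shuffle(p) => parentEnds ::= p // stage boundary
      }
    }
    visit(last)
    (members, parentEnds.reverse)
  }

  // All stages of the job that ends at `last`, parent stages first.
  def stages(last: ToyRdd): List[List[String]] = {
    val (members, parentEnds) = stageOf(last)
    parentEnds.flatMap(stages) :+ members
  }

  // A word-count style lineage: textFile -> flatMap -> map -> reduceByKey
  def wordCountStages: List[List[String]] = {
    val lines  = ToyRdd("textFile")
    val words  = ToyRdd("flatMap", List(Narrow(lines)))
    val pairs  = ToyRdd("map", List(Narrow(words)))
    val counts = ToyRdd("reduceByKey", List(Shuffle(pairs)))
    stages(counts)
  }
}
```

In this model `StageCut.wordCountStages` yields two stages: the narrow chain up to `map` stays together, and `reduceByKey` starts a new stage because it sits behind a shuffle.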
AppClient --> registerWithMaster -->
master: registers the application, persists the application, reports the registration status back to the driver, and schedules (allocates) the resources available to the application
--> DAGScheduler
--> textFile.count()
-->sc.runJob
-->dagScheduler.runJob
-->submitJob
-->eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
--> EventLoop
    /**
     * A message loop is a thread that keeps draining a message queue, so callers can keep
     * posting messages (for example, submitting new jobs) and they keep getting processed.
     * The thread blocks while the queue is empty and only does work when a message arrives.
     *
     * An event loop to receive events from the caller and process all events in the event thread. It
     * will start an exclusive event thread to process all events.
     *
     * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
     * handle events in time to avoid the potential OOM.
     */
    private[spark] abstract class EventLoop[E](name: String) extends Logging {

      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

      private val stopped = new AtomicBoolean(false)

      private val eventThread = new Thread(name) {
        setDaemon(true)

        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              try {
                onReceive(event)
              } catch {
                case NonFatal(e) => {
                  try {
                    onError(e)
                  } catch {
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                  }
                }
              }
            }
          } catch {
            case ie: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }

      def start(): Unit = {
        if (stopped.get) {
          throw new IllegalStateException(name + " has already been stopped")
        }
        // Call onStart before starting the event thread to make sure it happens before onReceive
        onStart()
        eventThread.start()
      }
      // ... (the excerpt stops here; the rest of the class is not shown)
    }
-->calls back into onReceive of DAGSchedulerEventProcessLoop
-->case JobSubmitted
-->def handleJobSubmitted
--> newResultStage (the resulting stage contains its parent stages)
--> submitStage
-->submitMissingTasks
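The post-then-callback step in the chain above can be tried out in isolation. The block below is a stripped-down sketch modeled on the EventLoop excerpt: `MiniEventLoop`, `MiniEventLoopDemo`, and the one-field `JobSubmitted` event are hypothetical stand-ins written for this note, and logging and the onStart/onStop hooks are left out.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.util.control.NonFatal

// Simplified event loop: one daemon thread takes events from a blocking queue.
abstract class MiniEventLoop[E](name: String) {
  private val queue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks while the queue is empty
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupted the blocking take()
      }
  }

  def start(): Unit = thread.start()

  def post(event: E): Unit = queue.put(event)

  // Must not be called from the event thread itself in this sketch (join would never return).
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      thread.interrupt()
      thread.join()
    }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object MiniEventLoopDemo {
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event

  def run(): List[Int] = {
    val seen = mutable.ArrayBuffer.empty[Int] // written only by the event thread
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[Event]("mini-loop") {
      override protected def onReceive(event: Event): Unit = event match {
        case JobSubmitted(id) =>
          seen += id
          done.countDown()
      }
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()
    (1 to 3).foreach(i => loop.post(JobSubmitted(i)))
    done.await(5, TimeUnit.SECONDS) // wait until all three events were handled
    loop.stop()                     // join() makes the buffer safe to read here
    seen.toList
  }
}
```

Because a single thread consumes a FIFO queue, `MiniEventLoopDemo.run()` sees the job ids in the order they were posted. The caller never runs `onReceive` itself; it only enqueues, which is why submitting a job returns before the job is handled.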
Implementation of the task locality algorithm
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }
-->getPreferredLocs
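As far as I understand it, getPreferredLocs resolves a partition's locations in three steps: cached locations first, then the RDD's own placement preferences, then the first narrow parent that reports any. The sketch below models only that lookup order. `LocalitySketch` and its `ToyRdd` are hypothetical, host names are made up, and the assumption that a narrow parent uses the same partition index is a simplification.

```scala
object LocalitySketch {
  // Hypothetical toy RDD: per-partition input locations, cached locations,
  // and narrow parents that share the same partition index (a simplification).
  final case class ToyRdd(
      name: String,
      preferred: Map[Int, Seq[String]] = Map.empty,
      cached: Map[Int, Seq[String]] = Map.empty,
      narrowParents: List[ToyRdd] = Nil)

  def preferredLocs(rdd: ToyRdd, partition: Int): Seq[String] = {
    val cached = rdd.cached.getOrElse(partition, Nil)
    val own = rdd.preferred.getOrElse(partition, Nil)
    if (cached.nonEmpty) cached   // 1. the partition is cached somewhere
    else if (own.nonEmpty) own    // 2. the RDD itself has placement preferences
    else
      rdd.narrowParents.iterator  // 3. first narrow parent with any preference
        .map(p => preferredLocs(p, partition))
        .find(_.nonEmpty)
        .getOrElse(Nil)
  }

  // Example lineage: an input RDD with block locations and two children.
  val input: ToyRdd =
    ToyRdd("textFile", preferred = Map(0 -> Seq("host1", "host2"), 1 -> Seq("host3")))
  val mapped: ToyRdd = ToyRdd("map", narrowParents = List(input))
  val persisted: ToyRdd =
    ToyRdd("persisted", cached = Map(0 -> Seq("host9")), narrowParents = List(input))
}
```

In this model `preferredLocs(mapped, 0)` falls through to the input RDD's block locations, while `preferredLocs(persisted, 0)` stops at the cached copy. A partition with no information anywhere in the narrow lineage gets an empty list, meaning the task has no locality preference.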
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop
    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      // ... (the excerpt stops here; the remaining cases are not shown)
    }
-> private[scheduler] def handleJobSubmitted -> private def submitStage(stage: Stage)
    /**
     * Submits stage, but first recursively submits any missing parents.
     * (That is, the missing parent stages are submitted recursively before the stage itself.)
     */
    private def submitStage(stage: Stage) {
      val jobId = activeJobForStage(stage)
      if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
          val missing = getMissingParentStages(stage).sortBy(_.id)
          logDebug("missing: " + missing)
          if (missing.isEmpty) {
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            // Submit the missing tasks
            submitMissingTasks(stage, jobId.get)
          } else {
            for (parent <- missing) {
              submitStage(parent)
            }
            waitingStages += stage
          }
        }
      } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
      }
    }
--> org.apache.spark.scheduler.DAGScheduler --> private def submitMissingTasks
-->
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())