02 - Spark Console Log Analysis -- spark-01 -- Source Code Debugging

Source code

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Author: Liu Wen
      * Email: [email protected]
      * Date: 2016-02-02, 10:47 AM
      * Purpose: count the number of lines in a data set
      */
    object RddCount extends App {
      /**
        * Point to a local Hadoop installation; used for remote debugging.
        * Without it (and without the HADOOP_HOME environment variable) startup fails,
        * so this property has to be set.
        */
      System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0")

      val conf = new SparkConf()
      // Ship the application jar so the executors can load our classes
      val jar = "/opt/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar"
      conf.setJars(Array(jar))

      // Master URL: spark://s0:7077 for the standalone cluster, or "local" for local mode
      conf.setAppName("internals-debugging").setMaster("spark://s0:7077")

      val sc = new SparkContext(conf)
      // local path for local runs; immediately overridden by the HDFS path for the cluster
      var path = "/opt/workspace/gitoschina/opensourceteams/data/input/wordCount"
      path = "hdfs://s0:9000/library/wordcount/input/Data"
      val textFile = sc.textFile(path, 2)
      val count = textFile.count()
      println("Line count: " + count)

      println("Done")
      // Keep the driver alive so the console logs and web UI can be inspected
      Thread.sleep(1000 * 10000)

    }
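
For stepping through the scheduler without a standalone cluster, the same job can also be run entirely in local mode. This is a minimal sketch under assumed paths (the input directory is a placeholder); `local[2]` runs the driver and executor threads in a single JVM, which is usually the easiest setup for breakpoints.

    import org.apache.spark.{SparkConf, SparkContext}

    /** Local-mode variant of the line-count job above (sketch; input path is a placeholder). */
    object RddCountLocal {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("internals-debugging-local")
          .setMaster("local[2]")            // driver and executors share one JVM, 2 worker threads
        val sc = new SparkContext(conf)

        val textFile = sc.textFile("/tmp/wordcount/input", 2)  // placeholder input path
        println("Line count: " + textFile.count())

        sc.stop()                           // release the SparkContext cleanly
      }
    }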

AppClient

  • org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def onStart() --> registerWithMaster --> tryRegisterAllMasters --> registers the application with the master (passing appDescription and self); a standalone sketch of this registration pattern follows the excerpt below
       /**
       *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
       */
      private def tryRegisterAllMasters(): Array[JFuture[_]] = {
        for (masterAddress <- masterRpcAddresses) yield {
          registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = try {
              if (registered.get) {
                return
              }
              logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
              val masterRef =
                rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
              masterRef.send(RegisterApplication(appDescription, self))
            } catch {
              case ie: InterruptedException => // Cancelled
              case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
            }
          })
        }
      }
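
The idea is to fire one registration attempt per configured master on a small thread pool, stop as soon as any attempt has succeeded, and keep the returned `Future`s so the remaining attempts can be cancelled. The standalone sketch below reproduces just that shape with hypothetical master addresses and a print in place of the real RPC call; it is not Spark code.

    import java.util.concurrent.{Executors, Future => JFuture}
    import java.util.concurrent.atomic.AtomicBoolean

    object RegisterAllMastersSketch extends App {
      val masterAddresses = Seq("spark://s0:7077", "spark://s1:7077") // hypothetical masters
      val registered = new AtomicBoolean(false)
      val pool = Executors.newFixedThreadPool(masterAddresses.size)

      // One asynchronous attempt per master; the Futures allow cancelling the losers later.
      val attempts: Seq[JFuture[_]] = for (address <- masterAddresses) yield {
        pool.submit(new Runnable {
          override def run(): Unit = {
            if (registered.get) return              // another attempt already won the race
            println(s"Connecting to master $address ...")
            // a real client would set up an RPC endpoint ref here and send RegisterApplication
            registered.compareAndSet(false, true)   // stand-in for the master's positive reply
          }
        })
      }

      // Once one attempt has succeeded, cancel the rest (AppClient does this when registered).
      while (!registered.get) Thread.sleep(10)
      attempts.foreach(_.cancel(true))
      pool.shutdown()
    }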
    

SparkDeploySchedulerBackend

  • org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend --> def start(): builds the ApplicationDescription and starts the AppClient; by the time start() returns, the application ID has been generated (see the sketch after the excerpt below)

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)

    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
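
start() blocks in waitForRegistration() until the AppClient's registration callback fires, and only then flips the launcher state from SUBMITTED to RUNNING. A minimal sketch of that handshake with a plain Semaphore and hypothetical names (not the actual Spark fields):

    import java.util.concurrent.Semaphore

    /** Sketch of the submit-then-wait handshake (hypothetical names, not Spark's internals). */
    class RegistrationBarrierSketch {
      private val barrier = new Semaphore(0)

      /** Runs on the thread calling start(): blocks until the cluster has accepted the app. */
      def waitForRegistration(): Unit = barrier.acquire()

      /** Runs on the RPC thread when the master answers the registration request. */
      def onRegistered(appId: String): Unit = {
        println(s"Registered as $appId")
        barrier.release()
      }
    }

    object RegistrationBarrierDemo extends App {
      val backend = new RegistrationBarrierSketch
      new Thread(new Runnable {               // stand-in for the AppClient callback thread
        override def run(): Unit = {
          Thread.sleep(100)
          backend.onRegistered("app-20160202-0001")  // made-up application ID
        }
      }).start()
      backend.waitForRegistration()           // state: SUBMITTED -> (blocked here) -> RUNNING
      println("state = RUNNING")
    }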

AppClient

  • AppClient receives the resources allocated by the cluster (i.e. executors): org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def receive --> case ExecutorAdded

     override def receive: PartialFunction[Any, Unit] = {

       case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
    
  • AppClient is notified when an allocated executor changes state: org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def receive --> case ExecutorUpdated (a standalone sketch of this message-dispatch pattern follows the excerpt)

     override def receive: PartialFunction[Any, Unit] = {
    
           case ExecutorUpdated(id, state, message, exitStatus) =>
             val fullId = appId + "/" + id
             val messageText = message.map(s => " (" + s + ")").getOrElse("")
             logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
             if (ExecutorState.isFinished(state)) {
               listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
             } 
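
The receive handler is simply a PartialFunction matched against incoming case-class messages. The self-contained sketch below imitates that dispatch style with hypothetical message types standing in for Spark's ExecutorAdded and ExecutorUpdated.

    /** Hypothetical stand-ins for the cluster messages matched in AppClient's receive. */
    sealed trait ClusterMessage
    case class ExecutorAddedMsg(id: Int, workerId: String, hostPort: String,
                                cores: Int, memory: Int) extends ClusterMessage
    case class ExecutorUpdatedMsg(id: Int, state: String, message: Option[String],
                                  exitStatus: Option[Int]) extends ClusterMessage

    object ReceiveSketch extends App {
      val appId = "app-0001"  // placeholder

      // Same shape as ClientEndpoint.receive: a PartialFunction over incoming messages.
      val receive: PartialFunction[ClusterMessage, Unit] = {
        case ExecutorAddedMsg(id, workerId, hostPort, cores, memory) =>
          println(s"Executor added: $appId/$id on $workerId ($hostPort) with $cores cores, $memory MB")

        case ExecutorUpdatedMsg(id, state, message, exitStatus) =>
          val messageText = message.map(s => " (" + s + ")").getOrElse("")
          println(s"Executor updated: $appId/$id is now $state$messageText")
      }

      receive(ExecutorAddedMsg(0, "worker-1", "s1:45678", 2, 1024))
      receive(ExecutorUpdatedMsg(0, "RUNNING", None, None))
    }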
    
  • The job starts: val count = textFile.count() -- count() is an action, so this line is what actually submits a job (see the sketch below)
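
Reusing sc and textFile from the program above, count() boils down to one job with one task per partition, whose per-partition sizes are summed on the driver. A rough public-API approximation (not the exact implementation):

    // Roughly what textFile.count() amounts to: run one function per partition,
    // collect the per-partition results on the driver, and sum them.
    val perPartitionCounts: Array[Long] =
      sc.runJob(textFile, (iter: Iterator[String]) => iter.size.toLong)
    println("Line count: " + perPartitionCounts.sum)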

DAGScheduler

  • org.apache.spark.scheduler.DAGScheduler$DAGSchedulerEventProcessLoop --> private def doOnReceive(event: DAGSchedulerEvent): Unit

        private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
          case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
          case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
            dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    
          case StageCancelled(stageId) =>
            dagScheduler.handleStageCancellation(stageId)
    
          case JobCancelled(jobId) =>
            dagScheduler.handleJobCancellation(jobId)
    
          case JobGroupCancelled(groupId) =>
            dagScheduler.handleJobGroupCancelled(groupId)
    
          case AllJobsCancelled =>
            dagScheduler.doCancelAllJobs()
    
          case ExecutorAdded(execId, host) =>
            dagScheduler.handleExecutorAdded(execId, host)
    
          case ExecutorLost(execId) =>
            dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    
          case BeginEvent(task, taskInfo) =>
            dagScheduler.handleBeginEvent(task, taskInfo)
    
          case GettingResultEvent(taskInfo) =>
            dagScheduler.handleGetTaskResult(taskInfo)
    
          case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
            dagScheduler.handleTaskCompletion(completion)
    
          case TaskSetFailed(taskSet, reason, exception) =>
            dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    
          case ResubmitFailedStages =>
            dagScheduler.resubmitFailedStages()
        }
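
These handlers are not called directly: events such as JobSubmitted are posted onto DAGSchedulerEventProcessLoop, a single daemon thread that drains a queue and dispatches each event through doOnReceive as shown above. A simplified, standalone sketch of that event-loop pattern (hypothetical class, not Spark's real EventLoop):

    import java.util.concurrent.LinkedBlockingDeque

    /** Minimal single-threaded event loop in the style of DAGSchedulerEventProcessLoop (sketch only). */
    abstract class EventLoopSketch[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()
      @volatile private var stopped = false

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          while (!stopped) {
            try {
              val event = eventQueue.take()   // blocks until an event is posted
              onReceive(event)                // dispatch, e.g. by pattern matching on the event type
            } catch {
              case _: InterruptedException => // stop() interrupts a blocked take()
            }
          }
        }
      }

      def start(): Unit = eventThread.start()
      def stop(): Unit = { stopped = true; eventThread.interrupt() }
      def post(event: E): Unit = eventQueue.put(event)

      /** Subclasses dispatch here, the way doOnReceive matches on DAGSchedulerEvent. */
      protected def onReceive(event: E): Unit
    }

The real loop additionally routes exceptions thrown by onReceive to an error handler; that is left out here to keep the shape visible.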
    
      --> private[scheduler] def handleJobSubmitted
      -->
      /** Submits stage, but first recursively submits any missing parents. */
      private def submitStage(stage: Stage) {
      -->
      /** Called when stage's parents are available and we can now do its task. */
      private def submitMissingTasks(stage: Stage, jobId: Int) {
      -->
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
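
submitStage walks the stage graph bottom-up: a stage with missing parent stages causes those parents to be submitted first, and only a stage whose parents are available hands its tasks to the TaskScheduler. A toy sketch of that recursion over a hypothetical stage graph (simplified: the real DAGScheduler waits for parent completion rather than retrying immediately):

    /** Toy stage with explicit parents (hypothetical, for illustrating submitStage's recursion). */
    case class StageSketch(id: Int, parents: Seq[StageSketch])

    object SubmitStageSketch extends App {
      val submitted = scala.collection.mutable.Set[Int]()

      /** Submit a stage, but first recursively submit any missing (not yet submitted) parents. */
      def submitStage(stage: StageSketch): Unit = {
        val missing = stage.parents.filterNot(p => submitted(p.id))
        if (missing.isEmpty) {
          submitMissingTasks(stage)          // parents are available: run this stage's tasks
        } else {
          missing.foreach(submitStage)       // depth-first: parents go first
          submitStage(stage)                 // then retry this stage (simplified; the real
        }                                    // DAGScheduler re-submits on parent completion)
      }

      def submitMissingTasks(stage: StageSketch): Unit = {
        submitted += stage.id
        println(s"submitting tasks for stage ${stage.id}")
      }

      // shuffle map stages 0 and 1 feed result stage 2
      val s0 = StageSketch(0, Nil)
      val s1 = StageSketch(1, Nil)
      submitStage(StageSketch(2, Seq(s0, s1)))   // prints stages 0, 1, then 2
    }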
    

TaskSchedulerImpl

  • org.apache.spark.scheduler.TaskSchedulerImpl --> override def submitTasks(taskSet: TaskSet): Unit
    -->
    backend.reviveOffers()
    -->
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend --> override def reviveOffers(): Unit
  • org.apache.spark.scheduler.TaskSchedulerImpl --> def resourceOffers --> matches the pending tasks against the free cores offered by the executors (a toy sketch follows)
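
resourceOffers is where the actual matching happens: the backend reports how many free cores each executor has, and the scheduler fills those offers with pending tasks, charging each task the configured number of CPUs. A toy, round-robin version of that idea (hypothetical types and names, not TaskSchedulerImpl's real signature):

    import scala.collection.mutable.ArrayBuffer

    /** Hypothetical free-core offer from one executor, in the spirit of a worker offer. */
    case class OfferSketch(executorId: String, host: String, cores: Int)

    object ResourceOffersSketch extends App {
      val cpusPerTask = 1                     // assumed, mirrors the default spark.task.cpus = 1
      val pendingTasks = (0 until 5).toList   // five task indices waiting to run

      /** Assign pending tasks to offers round-robin until cores or tasks run out. */
      def resourceOffers(offers: Seq[OfferSketch]): Seq[(Int, String)] = {
        val freeCores = Array(offers.map(_.cores): _*)
        val assignments = ArrayBuffer.empty[(Int, String)]
        var remaining = pendingTasks
        var i = 0
        while (remaining.nonEmpty && freeCores.exists(_ >= cpusPerTask)) {
          if (freeCores(i) >= cpusPerTask) {
            assignments += ((remaining.head, offers(i).executorId))
            freeCores(i) -= cpusPerTask
            remaining = remaining.tail
          }
          i = (i + 1) % offers.size           // round-robin over executors to spread tasks
        }
        assignments.toSeq
      }

      val offers = Seq(OfferSketch("exec-0", "s1", 2), OfferSketch("exec-1", "s2", 2))
      resourceOffers(offers).foreach { case (task, exec) => println(s"task $task -> $exec") }
    }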