02 - Spark Console Log Analysis - spark-01 - Source Code Debugging
Source code

```scala
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Author: 刘文 (Liu Wen)
 * Email: [email protected]
 * Date: 2016-02-02 10:47 AM
 * Purpose: count the number of lines in the input data
 */
object RddCount extends App {

  /**
   * Point to the local Hadoop root directory; used for remote debugging.
   * Without the HADOOP_HOME environment variable set, an error is thrown, so this directory must be set.
   */
  System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0")

  val conf = new SparkConf()

  // Jar containing this application; shipped to the cluster so the executors can load the job classes
  val jar = "/opt/workspace/bigdata/all_frame_intellij/spark-maven-scala-idea/target/spark-maven-scala-idea-1.0-SNAPSHOT.jar"
  conf.setJars(Array(jar))

  // Master URL: spark://s0:7077 for the standalone cluster, or "local" for local mode
  conf.setAppName("internals-debugging").setMaster("spark://s0:7077")

  val sc = new SparkContext(conf)

  // Local path kept for reference; overridden with the HDFS path that is actually used
  var path = "/opt/workspace/gitoschina/opensourceteams/data/input/wordCount"
  path = "hdfs://s0:9000/library/wordcount/input/Data"

  val textFile = sc.textFile(path, 2)
  val count = textFile.count()

  println("Line count: " + count)
  println("Done")

  // Keep the driver alive for a while so the application can still be inspected while debugging
  Thread.sleep(1000 * 10000)
}
```
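Since the point of this program is to step through Spark's internals, it can also help to open a debug port on the executor JVMs. A minimal sketch, assuming the standard `spark.executor.extraJavaOptions` property and a free port 5005 on each worker node (both are assumptions, not part of the original program):

```scala
import org.apache.spark.SparkConf

// Illustrative helper (not part of the original program): ask each executor JVM to
// listen for a remote debugger. suspend=n lets the executor start normally; the IDE's
// remote-debug configuration can then attach to <worker-host>:5005.
object ExecutorDebugSketch {
  def withExecutorDebug(conf: SparkConf): SparkConf =
    conf.set("spark.executor.extraJavaOptions",
      "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005")
}
```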
AppClient
- ).org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def onStart() --> registerWithMaster --> tryRegisterAllMasters
--> registers the application with the Master (passing appDescription and self)
```scala
/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
```
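The pattern is worth pausing on: one registration attempt per master address is submitted to a dedicated thread pool, and the returned futures are kept so the attempts can be cancelled once any master answers. A toy sketch of that pattern (not Spark source; the names and the `println` logging are made up):

```scala
import java.util.concurrent.{Executors, Future => JFuture}
import java.util.concurrent.atomic.AtomicBoolean

// Toy illustration: submit one registration attempt per master address to a thread
// pool and hand the caller the Futures so the attempts can be cancelled later.
object RegistrationSketch {
  private val pool = Executors.newFixedThreadPool(4)
  private val registered = new AtomicBoolean(false) // a real client flips this when a master replies

  def tryRegisterAll(masters: Seq[String])(register: String => Unit): Seq[JFuture[_]] =
    for (master <- masters) yield pool.submit(new Runnable {
      override def run(): Unit = {
        if (!registered.get) { // stop early if another attempt already succeeded
          try register(master)
          catch { case e: Exception => println(s"Failed to connect to $master: ${e.getMessage}") }
        }
      }
    })
}
```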
SparkDeploySchedulerBackend
).org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend --> def start() (the app ID has been generated at this point)
```scala
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
  command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
```
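The resource numbers packed into the ApplicationDescription (maxCores, executorMemory, coresPerExecutor) ultimately come from the application's configuration. A minimal sketch of the standard keys involved in standalone mode (the values are made-up examples):

```scala
import org.apache.spark.SparkConf

// Sketch of the configuration keys behind the application description's resource fields.
val conf = new SparkConf()
  .set("spark.cores.max", "4")        // upper bound on cores for the whole application
  .set("spark.executor.memory", "2g") // memory per executor
  .set("spark.executor.cores", "2")   // cores per executor (optional in standalone mode)
```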
AppClient
).AppClient receives the resources allocated by the cluster (i.e. executors): org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def receive --> case ExecutorAdded
```scala
override def receive: PartialFunction[Any, Unit] = {
  case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
    // ...
```
).AppClient receives executor state changes from the cluster: org.apache.spark.deploy.client.AppClient$ClientEndpoint --> def receive --> case ExecutorUpdated
```scala
override def receive: PartialFunction[Any, Unit] = {
  case ExecutorUpdated(id, state, message, exitStatus) =>
    val fullId = appId + "/" + id
    val messageText = message.map(s => " (" + s + ")").getOrElse("")
    logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
    if (ExecutorState.isFinished(state)) {
      listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
    }
```
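Both handlers follow the same actor-style shape: `receive` is a PartialFunction over the messages the Master can send back, and each case just logs and forwards to a listener. A self-contained toy version of that shape (not Spark source; the case classes only mirror the fields shown above):

```scala
// Toy model of the message-driven handling above.
object ExecutorEventsSketch {
  sealed trait ClusterMessage
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) extends ClusterMessage
  case class ExecutorUpdated(id: Int, state: String, message: Option[String], exitStatus: Option[Int]) extends ClusterMessage

  def receive(appId: String): PartialFunction[ClusterMessage, Unit] = {
    case ExecutorAdded(id, workerId, hostPort, cores, memory) =>
      println(s"Executor added: $appId/$id on $workerId ($hostPort) with $cores cores, $memory MB")
    case ExecutorUpdated(id, state, message, exitStatus) =>
      println(s"Executor updated: $appId/$id is now $state" +
        message.map(m => s" ($m)").getOrElse("") +
        exitStatus.map(c => s", exit status $c").getOrElse(""))
  }
}
```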
- ).The job starts: val count = textFile.count()
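count() is an action, so this line is what actually triggers a job: roughly, a function that measures each partition is shipped to the executors via runJob, and the per-partition results are summed on the driver. A toy sketch of that idea (not the Spark implementation of count()):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CountSketch {
  // Run one function per partition on the executors, then combine the results on the driver.
  def countLike(sc: SparkContext, rdd: RDD[String]): Long =
    sc.runJob(rdd, (iter: Iterator[String]) => iter.size.toLong).sum
}
```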
DAGScheduler
).org.apache.spark.scheduler.DAGScheduler$DAGSchedulerEventProcessLoop --> private def doOnReceive(event: DAGSchedulerEvent): Unit
```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
```

--> private[scheduler] def handleJobSubmitted
--> /** Submits stage, but first recursively submits any missing parents. */
    private def submitStage(stage: Stage)
--> /** Called when stage's parents are available and we can now do its task. */
    private def submitMissingTasks(stage: Stage, jobId: Int)
--> taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
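The heart of handleJobSubmitted --> submitStage is the recursion on missing parent stages: a stage's tasks are only built once every parent stage is available. A toy model of that recursion (not Spark source; the real scheduler parks blocked stages in waitingStages instead of retrying synchronously):

```scala
object StageSubmissionSketch {
  case class Stage(id: Int, parents: List[Stage])

  // Submit parents first, then the stage itself, mirroring the submitStage recursion.
  def submitStage(stage: Stage, submitted: scala.collection.mutable.Set[Int]): Unit = {
    if (!submitted(stage.id)) {
      val missing = stage.parents.filterNot(p => submitted(p.id))
      if (missing.isEmpty) {
        // all parents are available: this is where submitMissingTasks would build a TaskSet
        println(s"submitting tasks for stage ${stage.id}")
        submitted += stage.id
      } else {
        missing.foreach(p => submitStage(p, submitted)) // first submit the missing parents
        submitStage(stage, submitted)                   // then retry this stage
      }
    }
  }
}
```

Running this on the final stage of a DAG submits the parent stages first, which is why parent stages appear in the driver logs before the result stage.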
TaskSchedulerImpl
- org.apache.spark.scheduler.TaskSchedulerImpl --> override def submitTasks(taskSet: TaskSet): Unit
  --> backend.reviveOffers()
- org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend --> override def reviveOffers(): Unit
- org.apache.spark.scheduler.TaskSchedulerImpl --> def resourceOffers
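resourceOffers is where free executor cores meet pending tasks. A toy model of the offer-based assignment (not Spark source; it ignores locality, task sets, and scheduling pools and simply hands out one core per task round-robin):

```scala
object ResourceOffersSketch {
  case class WorkerOffer(executorId: String, host: String, cores: Int)
  case class TaskDescription(taskId: Long, executorId: String)

  def resourceOffers(offers: Seq[WorkerOffer], pendingTasks: Seq[Long]): Seq[TaskDescription] = {
    val freeCores = scala.collection.mutable.Map(offers.map(o => o.executorId -> o.cores): _*)
    val assignments = scala.collection.mutable.ArrayBuffer[TaskDescription]()
    var tasks = pendingTasks
    // keep cycling over the offers while there are tasks left and at least one free core
    while (tasks.nonEmpty && freeCores.values.exists(_ > 0)) {
      for (offer <- offers if tasks.nonEmpty && freeCores(offer.executorId) > 0) {
        assignments += TaskDescription(tasks.head, offer.executorId)
        freeCores(offer.executorId) -= 1
        tasks = tasks.tail
      }
    }
    assignments.toSeq
  }
}
```

The real resourceOffers additionally shuffles the offers to spread load across workers and delegates locality-aware task selection to each TaskSetManager.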