06-spark-executor-source-code-analysis

How the Spark Executor Works

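  • Note in particular that when CoarseGrainedExecutorBackend starts and registers an Executor with the Driver, what actually gets registered is the ExecutorBackend instance; the registration has no direct relationship to the Executor instance.
  • CoarseGrainedExecutorBackend is the name of the process in which the Executor runs. The Executor is the object that actually processes Tasks, and it computes them internally on a thread pool.
  • CoarseGrainedExecutorBackend and Executor are in one-to-one correspondence.
  • CoarseGrainedExecutorBackend is a message-communication endpoint (it implements ThreadSafeRpcEndpoint).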
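
The relationship described in the bullets above can be sketched in a few lines of plain Scala. This is a simplified model, not Spark's code: MiniDriver, MiniBackend, and MiniExecutor are hypothetical names, and direct method calls stand in for RPC messages. It assumes that the backend creates its Executor only after the driver acknowledges the registration, which is why the driver-side registry holds backends rather than executors.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Executor: the object that actually runs tasks.
class MiniExecutor(val executorId: String) {
  val launched = mutable.ArrayBuffer.empty[Long]
  def launchTask(taskId: Long): Unit = launched += taskId
}

// Hypothetical stand-in for the driver: it only ever sees backends.
class MiniDriver {
  val registered = mutable.Map.empty[String, MiniBackend]

  def register(backend: MiniBackend): Unit = {
    registered(backend.executorId) = backend
    backend.onRegistered() // plays the role of a "registered" reply message
  }
}

// Hypothetical stand-in for CoarseGrainedExecutorBackend.
class MiniBackend(val executorId: String) {
  // One backend owns at most one executor (the one-to-one relationship).
  var executor: Option[MiniExecutor] = None

  def onStart(driver: MiniDriver): Unit = driver.register(this)

  def onRegistered(): Unit = {
    executor = Some(new MiniExecutor(executorId))
  }

  // The backend receives the message and delegates the work to the executor.
  def onLaunchTask(taskId: Long): Unit = executor match {
    case Some(e) => e.launchTask(taskId)
    case None    => sys.error("LaunchTask received before registration")
  }
}

object RegistrationSketch {
  def main(args: Array[String]): Unit = {
    val driver = new MiniDriver
    val backend = new MiniBackend("exec-1")
    backend.onStart(driver)
    backend.onLaunchTask(42L)
    println(driver.registered.keys.mkString(","))
    println(backend.executor.map(_.launched.toList))
  }
}
```

In this model the backend is the only object the driver can address, and the executor is reachable only through its backend.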

Executor workflow diagram

  • Source code

    // org.apache.spark.executor.Executor

    // Start worker thread pool; tasks submitted through launchTask run on threads from this pool
    private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
    
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }
    
    // org.apache.spark.executor.Executor$TaskRunner

    class TaskRunner(
        execBackend: ExecutorBackend,
        val taskId: Long,
        val attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer)
      extends Runnable {

      // --> def run() --> task.run (runs the task)
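
The launchTask / TaskRunner pattern in the excerpt above can be tried outside Spark using the JDK's own executor classes. The following is a minimal sketch, not Spark's implementation: MiniTaskExecutor and MiniTaskRunner are hypothetical names, Executors.newCachedThreadPool with a daemon thread factory stands in for ThreadUtils.newDaemonCachedThreadPool, and a plain function stands in for the deserialized task.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, TimeUnit}

class MiniTaskExecutor {
  // Daemon threads, so idle workers never keep the JVM alive.
  private val daemonFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "mini task launch worker")
      t.setDaemon(true)
      t
    }
  }
  private val threadPool = Executors.newCachedThreadPool(daemonFactory)

  // Tasks currently in flight, keyed by task id.
  val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()
  // Results of finished tasks (an illustration-only addition).
  val results = new ConcurrentHashMap[Long, Int]()

  // One Runnable per task; run() is what a pool thread executes.
  class MiniTaskRunner(val taskId: Long, body: () => Int) extends Runnable {
    override def run(): Unit = {
      try {
        results.put(taskId, body())
      } finally {
        runningTasks.remove(taskId)
      }
    }
  }

  // Same three steps as the excerpt: wrap, record, hand to the pool.
  def launchTask(taskId: Long, body: () => Int): Unit = {
    val tr = new MiniTaskRunner(taskId, body)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  // Stop accepting tasks and wait for submitted ones to finish.
  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

object LaunchTaskSketch {
  def main(args: Array[String]): Unit = {
    val executor = new MiniTaskExecutor
    (1L to 3L).foreach(id => executor.launchTask(id, () => (id * 10).toInt))
    executor.shutdownAndWait()
    println(executor.results)
  }
}
```

A cached pool creates threads on demand, so launchTask returns immediately and each task runs concurrently on its own worker thread. Recording the runner in runningTasks before calling execute keeps the task visible in the map for as long as it may still be running.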