04-spark-master-源码分析

master

接收应用程序注册,分配资源,启动workers上的Executors

  • org.apache.spark.deploy.master.Master --> override def receive
    -->

     /**
      * Handles an application registration request.
      *
      * When this master is not in STANDBY, the following steps run in order:
      *   1. create the application from the description and driver, and register it,
      *   2. add it to the persistence engine,
      *   3. send RegisteredApplication(app.id, self) back to the driver,
      *   4. call schedule() so resources can be assigned to the new application.
      *
      * In STANDBY the request is ignored on purpose and no response is sent.
      */
     case RegisterApplication(description, driver) =>
       // TODO Prevent repeated registrations from some driver
       if (state != RecoveryState.STANDBY) {
         logInfo("Registering app " + description.name)
         val newApp = createApplication(description, driver)
         registerApplication(newApp)
         logInfo("Registered app " + description.name + " with ID " + newApp.id)
         persistenceEngine.addApplication(newApp)
         driver.send(RegisteredApplication(newApp.id, self))
         schedule()
       }
    

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     *
     * Nothing happens unless `state` is ALIVE. Otherwise:
     *   1. Waiting drivers are placed first. Workers are visited in random order, and each
     *      ALIVE worker launches every waiting driver whose memory and core requirements fit
     *      within that worker's free memory and free cores.
     *   2. startExecutorsOnWorkers() is then called to launch executors.
     */
    private def schedule(): Unit = {
      if (state == RecoveryState.ALIVE) {
        // Drivers take strict precedence over executors
        val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
        for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
          // Iterate over a snapshot: launched drivers are removed from waitingDrivers inside
          // this loop, and removing from a collection while traversing it can skip elements
          // or fail. The snapshot is re-taken for every worker, so a driver that was already
          // launched is never offered to a later worker.
          for (driver <- waitingDrivers.toList) {
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              launchDriver(worker, driver)
              waitingDrivers -= driver
            }
          }
        }
        startExecutorsOnWorkers()
      }
    }

    /**
     * Schedule and launch executors on workers.
     *
     * Waiting apps are handled in queue order. For every app that still has cores left, the
     * ALIVE workers with enough free memory and cores for one executor are collected and
     * ordered by free cores, most first. scheduleExecutorsOnWorkers decides how many cores
     * each of those workers contributes, and every worker with a positive share is passed to
     * allocateWorkerResourceToExecutors.
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // A worker needs at least this many free cores to host one executor.
        val minCores = coresPerExecutor.getOrElse(1)

        // Keep only live workers that can fit one executor, largest free-core count first.
        val candidates = workers.toArray
          .filter { w =>
            w.state == WorkerState.ALIVE &&
              w.memoryFree >= app.desc.memoryPerExecutorMB &&
              w.coresFree >= minCores
          }
          .sortBy(_.coresFree)
          .reverse
        val coresByWorker = scheduleExecutorsOnWorkers(app, candidates, spreadOutApps)

        // The per-worker core counts are decided; now perform the actual allocation.
        candidates.indices.foreach { idx =>
          val granted = coresByWorker(idx)
          if (granted > 0) {
            allocateWorkerResourceToExecutors(app, granted, coresPerExecutor, candidates(idx))
          }
        }
      }
    }