04-spark-master-源码分析
master
接收应用程序注册,分配资源,启动workers上的Executors
org.apache.spark.deploy.master --> override def receive
-->/** * 注册 application * 持久化 application * 发送给driver,报告注册状态 * 为应用程序可用资源调度(分配资源) */ case RegisterApplication(description, driver) => { // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) val app = createApplication(description, driver) registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) persistenceEngine.addApplication(app) driver.send(RegisteredApplication(app.id, self)) schedule() } }
/**
*启动driver *调用在 开启workers上的Executors *Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) { return } // Drivers take strict precedence over executors val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver } } } startExecutorsOnWorkers()
}
/**
* Schedule and launch executors on workers */
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. for (app <- waitingApps if app.coresLeft > 0) { val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // Filter out workers that don't have enough resources to launch an executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // Now that we've decided how many cores to allocate on each worker, let's allocate them for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { allocateWorkerResourceToExecutors( app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } }
}