SparkContext 的初始化
SparkContext 的初始化
Spark Driver 用于提交用户应用程序,实际可以看作 Spark 的客户端。了解 Spark Driver 的初始化,有助于读者理解用户应用程序在客户端的处理过程。 Spark Driver 的初始化始终围绕着 SparkContext 的初始化。SparkContext 可以算得上是所 有 Spark 应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext 初始 化完毕,才能向 Spark 集群提交任务。在平坦的公路上,发动机只需以较低的转速、较低的 功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。 这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而 SparkContext 的配置参数则 第 3 章 SparkContext 的初始化  29 SparkConf 的构造很简单,主要是通过 ConcurrentHashMap 来维护各种 Spark 的配置属 由 SparkConf 负责,SparkConf 就是你的操作面板。 性。SparkConf 代码结构见代码清单 3-1。Spark 的配置属性都是以“spark.”开头的字符串。
* 代码清单3-1 SparkConf代码结构
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._
def this() = this(true)
private val settings = new ConcurrentHashMap[String, String]()
if (loadDefaults) {
// 加载任何以spark.开头的系统属性
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
}
} //其余代码省略
现在开始介绍 SparkContext。SparkContext 的初始化步骤如下:
- 1)创建 Spark 执行环境 SparkEnv;
- 2)创建 RDD 清理器 metadataCleaner;
- 3)创建并初始化 Spark UI;
- 4)Hadoop 相关配置及 Executor 环境变量的设置; 5)创建任务调度 TaskScheduler;
- 6)创建和启动 DAGScheduler;
- 7)TaskScheduler 的启动;
- 8)初始化块管理器 BlockManager(BlockManager 是存储体系的主要组件之一,将在第 4 章介绍);
- 9)启动测量系统 MetricsSystem;
- 10)创建和启动 Executor 分配管理器 ExecutorAllocationManager; 11)ContextCleaner * 的创建与启动;
- 12)Spark 环境更新;
- 13)创建 DAGSchedulerSource 和 BlockManagerSource;
- 14)将 SparkContext 标记为激活。
SparkContext 的主构造器参数为 SparkConf,其实现如下。
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { private val creationSite: CallSite = Utils.getCallSite()
上面代码中的 CallSite 存储了线程栈中最靠近栈顶的用户类及最靠近栈底的 Scala 或者 Spark 核心类信息。Utils.getCallSite 的详细信息见附录 A。SparkContext 默认只有一个实例(由 private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false) SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
30 核心设计篇
属性 spark.driver.allowMultipleContexts 来控制,用户需要多个 SparkContext 实例时,可以将 其设置为 true),方法 markPartiallyConstructed 用来确保实例的唯一性,并将当前 SparkContext 标记为正在构建中。 接下来会对 SparkConf 进行复制,然后对各种配置信息进行校验,代码如下。 private[spark] val conf = config.clone()
throw new SparkException("An application name must be set in your configuration") }conf.validateSettings() if (!conf.contains("spark.master")) { } throw new SparkException("A master URL must be set in your configuration") if (!conf.contains("spark.app.name")) {
- 从上面校验的代码看到必须指定属性 spark.master 和 spark.app.name,否则会抛出异常, 结束初始化过程。spark.master 用于设置部署模式,spark.app.name 用于指定应用程序名称。