eclipse 新建 Spark项目并发布到集群中
准备工作
- 操作系统 ubuntu 15 桌面版
- 安装好hadoop集群
- 安装好spark集群
- scala 安装包 scala-2.10.6.tgz
- spark 安装包 spark-1.6.0-bin-hadoop2.6.tgz
准备scala开发工具
linux 64版 scala-SDK-4.3.0-vfinal-2.11-linux.gtk.x86_64.tar.gz
下载地址:http://scala-ide.org/download/sdk.html
操做步骤
新建自定义库 scala-2.10.6
window ==>Preferences==>Java==>User Libraries==>New==>输入自定义库名称(scala-2.10.6)==>OK==>选中scala-2.10.6==>Add External JARS ==> 找到 scala-2.10.6.tgz 安装目录lib下的(scala-actors.jar,scala-library.jar scala-reflect.jar)==>OK
新建自定义库 spark-1.6.0-bin-hadoop2.6 window ==>Preferences==>Java==>User Libraries==>New==>输入自定义库名称(spark-1.6.0-bin-hadoop2.6)==>OK==>选中spark-1.6.0-bin-hadoop2.6==>Add External JARS ==> 找到 spark-1.6.0-bin-hadoop2.6.tgz 安装目录lib下的(spark-assembly-1.6.0-hadoop2.6.0.jar)==>OK
- 打开scala idea 新建scala项目
File ==> new ==> Scala Project==>
项目名称:WordCount
JRE:use default JRE(如果不是JDK1.8的要换成1.8以上)
==>Next
此时选中Libraries 选中 **Add Library ==>User Library ==>选择scala-2.10.6,spark-1.6.0-bin-hadoop2.6(删除原来的scala-2.11.7)==>OK 为项目WordCount更改scala编译环境 选中项目WordCount ==>右键 ==>Propertites ==>Scala Compiler ==> 选中Use Project Setting ==>ScalaInstallation 选择 Latest 2.10 bundle(dynamic) ==>OK
此时项目新建好了,可以开始进行spark开发了
本地 spark 程序开发
新建 object WordCount.scala
说明:项目根目录下新建文件/input/wordCount/wordCount.txt,/input/wordCount/wordCount2.txt 可以输入任意的数据,用空格格开就行, 下载地址:http://pan.baidu.com/s/1qXbv4CW
package com.opensourceteam.bigdata.spark.quickstart import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author 刘文 * email::[email protected] * 更多详情:https://opensourceteam.gitbooks.io/bigdata/content */ object WordCount { def main(args: Array[String]): Unit = { /** * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序 * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差 */ val conf = new SparkConf /** * appname: 应用的名称 * master:主机地址 * 本地,不运行在集群值:local * 运行在集群:spark://s0:7077 */ conf.setAppName("WordCount").setMaster("local") /** * 创建SparkContext对象 * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等 * 同时还会负则Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ val sc =new SparkContext(conf) /** * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD) * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作 * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴 */ val path = "input/wordCount" val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行) val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果 val mapMap = mapArray.map { x => (x,1) } val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加 result.foreach(x => println("value:"+ x)) result.foreach(x => println("key:"+ x._1 + " value" + x._2)) result.saveAsTextFile("output") sc.stop() } }
右键运行
选中项目 ==> 右键届新==>此时会多出一个文件夹叫output ,计算的结果就在该文件中
集群 spark 程序开发
新建scala Object WordCountCluster.scala
package com.opensourceteam.bigdata.spark.quickstart import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author 刘文 * email::[email protected] * 更多详情:https://opensourceteam.gitbooks.io/bigdata/content */ object WordCountCluster { def main(args: Array[String]): Unit = { /** * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序 * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差 */ val conf = new SparkConf /** * appname: 应用的名称 * master:主机地址 * 本地,不运行在集群值:local * 运行在集群:spark://s0:7077 */ conf.setAppName("WordCountCluster").setMaster("spark://s0:7077") /** * 创建SparkContext对象 * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等 * 同时还会负则Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ val sc =new SparkContext(conf) /** * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD) * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作 * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴 */ val path = "hdfs://s0:9000/library/wordcount/input/Data" val lines = sc.textFile(path, 3) //读取文 件,并设置为一个Patitions (相当于几个Task去执行) val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果 val mapMap = mapArray.map { x => (x,1) } val result = mapMap.reduceByKey(_+_) //对相同的Key进行累加 result.collect().foreach(x => println("key:"+ x._1 + " value" + x._2)) result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_01") sc.stop() } }
集群 spark 程序打包
- 将项目打成jar包 选中项目右键 ==> export ==> Java ==> JAR file ==> 找到该项目,只选src,其它的都不要选 ==> JAR file 选择项目打成jar包后要保存的jar路径 ==> finish
集群 spark 程序提交
submit.sh
#!/bin/bash SPARK_HOME=/opt/modules/bigdata/spark/spark-1.6.0-bin-hadoop2.6 $SPARK_HOME/bin/spark-submit --class com.opensourceteam.bigdata.spark.quickstart.WordCountCluster --master spark://s0:7077 /home/hadoop/workspaces/eclipse/bigdata/spark/spark-1.6.0-bin-hadoop2.6-practice/wordCount.jar