eclipse 新建 Spark项目并发布到集群中

准备工作

  • 操作系统 ubuntu 15 桌面版
  • 安装好hadoop集群
  • 安装好spark集群
  • scala 安装包 scala-2.10.6.tgz
  • spark 安装包 spark-1.6.0-bin-hadoop2.6.tgz
  • 准备scala开发工具

    linux 64版 scala-SDK-4.3.0-vfinal-2.11-linux.gtk.x86_64.tar.gz
    下载地址:http://scala-ide.org/download/sdk.html

操做步骤

  • 新建自定义库 scala-2.10.6

    window ==>Preferences==>Java==>User Libraries==>New==>输入自定义库名称(scala-2.10.6)==>OK==>选中scala-2.10.6==>Add External JARS ==> 找到 scala-2.10.6.tgz 安装目录lib下的(scala-actors.jar,scala-library.jar scala-reflect.jar)==>OK

  • 新建自定义库 spark-1.6.0-bin-hadoop2.6 window ==>Preferences==>Java==>User Libraries==>New==>输入自定义库名称(spark-1.6.0-bin-hadoop2.6)==>OK==>选中spark-1.6.0-bin-hadoop2.6==>Add External JARS ==> 找到 spark-1.6.0-bin-hadoop2.6.tgz 安装目录lib下的(spark-assembly-1.6.0-hadoop2.6.0.jar)==>OK

  • 打开scala idea 新建scala项目 File ==> new ==> Scala Project==>
    项目名称:WordCount
    JRE:use default JRE(如果不是JDK1.8的要换成1.8以上)
    ==>Next
    此时选中Libraries 选中 **Add Library ==>User Library ==>选择scala-2.10.6,spark-1.6.0-bin-hadoop2.6(删除原来的scala-2.11.7)==>OK
  • 为项目WordCount更改scala编译环境 选中项目WordCount ==>右键 ==>Propertites ==>Scala Compiler ==> 选中Use Project Setting ==>ScalaInstallation 选择 Latest 2.10 bundle(dynamic) ==>OK

  • 此时项目新建好了,可以开始进行spark开发了

本地 spark 程序开发

  • 新建 object WordCount.scala

    说明:项目根目录下新建文件/input/wordCount/wordCount.txt,/input/wordCount/wordCount2.txt 可以输入任意的数据,用空格格开就行, 下载地址:http://pan.baidu.com/s/1qXbv4CW

      package com.opensourceteam.bigdata.spark.quickstart
    
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
    
      /**
       * @author 刘文
       * email::[email protected]
       * 更多详情:https://opensourceteam.gitbooks.io/bigdata/content
       */
      object WordCount {
        def main(args: Array[String]): Unit = {
         /**
          * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序
          * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差
          */
          val conf = new SparkConf
          /**
           * appname: 应用的名称
           * master:主机地址     
           * 本地,不运行在集群值:local  
           * 运行在集群:spark://s0:7077
           */
          conf.setAppName("WordCount").setMaster("local") 
    
          /**
           * 创建SparkContext对象
           * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等
           * 同时还会负则Spark程序往Master注册程序等
           * SparkContext是整个Spark应用程序中最为至关重要的一个对象
           */
          val sc =new  SparkContext(conf)
          /**
           * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD)
           * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作
           * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴
           */
          val path = "input/wordCount"
          val lines = sc.textFile(path, 3)  //读取文 件,并设置为一个Patitions (相当于几个Task去执行)
    
          val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果
          val mapMap = mapArray.map { x => (x,1) }
    
          val result  =  mapMap.reduceByKey(_+_) //对相同的Key进行累加
          result.foreach(x => println("value:"+ x))
           result.foreach(x => println("key:"+ x._1 + " value"  + x._2))
          result.saveAsTextFile("output")
    
          sc.stop()
        }
      } 
    
  • 右键运行

    选中项目 ==> 右键届新==>此时会多出一个文件夹叫output ,计算的结果就在该文件中

集群 spark 程序开发

  • 新建scala Object WordCountCluster.scala

      package com.opensourceteam.bigdata.spark.quickstart
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
    
      /**
       * @author 刘文
       * email::[email protected]
       * 更多详情:https://opensourceteam.gitbooks.io/bigdata/content
       */
      object WordCountCluster {
          def main(args: Array[String]): Unit = {
                  /**
          * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序
          * 要连接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行,特别适合机器配置条件非常差
          */
          val conf = new SparkConf
          /**
           * appname: 应用的名称
           * master:主机地址     
           * 本地,不运行在集群值:local  
           * 运行在集群:spark://s0:7077
           */
          conf.setAppName("WordCountCluster").setMaster("spark://s0:7077") 
    
          /**
           * 创建SparkContext对象
           * SparkContext 是Spark程序的所有功能的唯一入口,无论是采用Scal,Java,Python,R等
           * 同时还会负则Spark程序往Master注册程序等
           * SparkContext是整个Spark应用程序中最为至关重要的一个对象
           */
          val sc =new  SparkContext(conf)
          /**
           * 根据具体的数据来源(HDFS、Hbse、local Fs、DB、S3等通过SparkContext来创建RDD)
           * RDD创建有三种方式:根据外部的数据来源例 如HDFS、根据Scala集合、由其它的RDD操作
           * 数据会被RDD划分成为一系列的Patitions,分配到每个Patition的数据属于一个Task的处理范畴
           */
          val path = "hdfs://s0:9000/library/wordcount/input/Data"
          val lines = sc.textFile(path, 3)  //读取文 件,并设置为一个Patitions (相当于几个Task去执行)
    
          val mapArray = lines.flatMap { x => x.split(" ") } //对每一行的字符串进行单词拆分并把把有行的拆分结果通过flat合并成为一个结果
          val mapMap = mapArray.map { x => (x,1) }
    
          val result  =  mapMap.reduceByKey(_+_) //对相同的Key进行累加
           result.collect().foreach(x => println("key:"+ x._1 + " value"  + x._2))
          result.saveAsTextFile("hdfs://s0:9000/library/wordcount/output/wordcount_jar_01")
    
          sc.stop()
          }
    
      } 
    

集群 spark 程序打包

  • 将项目打成jar包 选中项目右键 ==> export ==> Java ==> JAR file ==> 找到该项目,只选src,其它的都不要选 ==> JAR file 选择项目打成jar包后要保存的jar路径 ==> finish

集群 spark 程序提交

  • submit.sh

      #!/bin/bash
      SPARK_HOME=/opt/modules/bigdata/spark/spark-1.6.0-bin-hadoop2.6
       $SPARK_HOME/bin/spark-submit  --class  com.opensourceteam.bigdata.spark.quickstart.WordCountCluster  --master spark://s0:7077  /home/hadoop/workspaces/eclipse/bigdata/spark/spark-1.6.0-bin-hadoop2.6-practice/wordCount.jar