Classic Example: WordCount

Requirements

  • Analysis of the classic wordCount example (with the data-computation reasoning for each step)

    Preparation

    • HDFS data files

      wordCount.txt, wordCount2.txt

       hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -ls -R /library/wordcount/input/Data
       -rw-r--r--   2 hadoop supergroup         50 2016-01-18 05:15 /library/wordcount/input/Data/wordCount.txt
       -rw-r--r--   2 hadoop supergroup         76 2016-01-18 18:34 /library/wordcount/input/Data/wordCount2.txt
       hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/input/Data/wordCount.txt
       Hadoop Scala Scala Spark
       Spark Hadoop Scala
       Hello
       hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/input/Data/wordCount2.txt
       Hello Spark Java Eclipse Subline You
       English Eclipse Subline
       Intellij Hello
      

Steps

  • flatten collapses a nested structure into a single-level collection

      scala> val result = List(List(1, 2), List(3, 4)).flatten
      result: List[Int] = List(1, 2, 3, 4)
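
    flatten works on any collection whose elements are themselves traversable; for example, flattening a List of Options drops the None entries (a separate REPL illustration, not from the original session):

      scala> val opts = List(Some(1), None, Some(3)).flatten
      opts: List[Int] = List(1, 3)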
    
  • flatMap is a commonly used combinator that combines mapping and flattening. It takes a function that works on the nested lists and then concatenates the results.

      scala> val nestedNumbers = List(List(1, 2), List(3, 4))
      nestedNumbers: List[List[Int]] = List(List(1, 2), List(3, 4))
    
      scala> nestedNumbers.flatMap(x => x.map(_ * 2))
      res0: List[Int] = List(2, 4, 6, 8)
    
      Think of it as shorthand for "map, then flatten":
    
      scala> nestedNumbers.map((x: List[Int]) => x.map(_ * 2)).flatten
      res1: List[Int] = List(2, 4, 6, 8)
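
    The same pattern splits lines of text into words, which is exactly what the RDD pipeline below does (a local-collection preview, not from the original session):

      scala> val words = List("Hadoop Scala Scala Spark", "Spark Hadoop Scala").flatMap(_.split(" "))
      words: List[String] = List(Hadoop, Scala, Scala, Spark, Spark, Hadoop, Scala)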
    
  • Read the data files from HDFS into an RDD

      // At this point the RDD variable data is effectively a collection of data splits;
      // each element of the collection corresponds to one line in the file system
      val data = sc.textFile("/library/wordcount/input/Data")

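    In spark-shell the SparkContext sc is already available. In a standalone application you would create it yourself; a minimal sketch (the app name and local[*] master are illustrative assumptions, not part of the original session):

      import org.apache.spark.{SparkConf, SparkContext}

      object WordCount {
        def main(args: Array[String]): Unit = {
          // placeholder configuration; in spark-shell, sc is predefined
          val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
          val sc = new SparkContext(conf)
          val data = sc.textFile("/library/wordcount/input/Data")
          data.collect().foreach(x => println("file contents: " + x))
          sc.stop()
        }
      }
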
  • Iterate over each element of data

      data.collect().foreach(x => println("file contents: " + x))
      ==========================================================================================
      Result:
      file contents: Hadoop Scala Scala Spark
      file contents: Spark Hadoop Scala
      file contents: Hello
      file contents: Hello Spark Java Eclipse Subline You
      file contents: English Eclipse Subline
      file contents: Intellij Hello
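
    collect() pulls the entire RDD back to the driver, which is fine for this small dataset but can exhaust driver memory on large ones; take(n) fetches only the first n elements (a hedged tip, not part of the original session):

      data.take(3).foreach(x => println("file contents: " + x))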
    
  • Data structure after the RDD map operation

      // Split each element of data on spaces; this returns a new collection (a new RDD)
      // in which every element is stored as an array
      val dataSplit = data.map(_.split(" "))
    
  • Iterate over each element of dataSplit

      dataSplit.collect().foreach(x => println("file contents: " + x))
      ==========================================================================================
      file contents: [Ljava.lang.String;@1e4e76b2
      file contents: [Ljava.lang.String;@3a87209c
      file contents: [Ljava.lang.String;@57291216
      file contents: [Ljava.lang.String;@13d88528
      file contents: [Ljava.lang.String;@7ece9a07
      file contents: [Ljava.lang.String;@7dcec682
    
      dataSplit.collect().foreach(x => println("file contents: " + x(0)))
      ==========================================================================================
      file contents: Hadoop
      file contents: Spark
      file contents: Hello
      file contents: Hello
      file contents: English
      file contents: Intellij
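
    The [Ljava.lang.String;@... lines are the default toString of a Java array. Besides indexing into the array as above, mkString joins the elements readably (illustrative):

      dataSplit.collect().foreach(x => println("file contents: " + x.mkString(" ")))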
    
  • Data structure after the RDD flatMap operation

      // dataflatMap is the new collection obtained by mapping first and then flattening
      val dataflatMap = data.flatMap(_.split(" "))
    
  • Iterate over each element of dataflatMap

      dataflatMap.collect.foreach(x => println("data:" + x))
      ==========================================================================================
      Result:
      data:Hadoop
      data:Scala
      data:Scala
      data:Spark
      data:Spark
      data:Hadoop
      data:Scala
      data:Hello
      data:Hello
      data:Spark
      data:Java
      data:Eclipse
      data:Subline
      data:You
      data:English
      data:Eclipse
      data:Subline
      data:Intellij
      data:Hello
    
  • Build the map of (word, 1) pairs for counting

       val map = dataflatMap.map(x => (x,1)) 
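
    Each word is paired with a count of 1 so that identical words can be summed later. For small result sets, countByValue() would compute the same tallies directly on the driver (an alternative shortcut, not the approach used below):

      dataflatMap.countByValue().foreach(println)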
    
  • Save map to HDFS and inspect each element's data

      map.saveAsTextFile("/library/wordcount/output/spark_word_map")
      ==========================================================================================
    
      /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_map/part-00000
      (Hadoop,1)
      (Scala,1)
      (Scala,1)
      (Spark,1)
      (Spark,1)
      (Hadoop,1)
      (Scala,1)
      (Hello,1)
      /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_map/part-00001
      (Hello,1)
      (Spark,1)
      (Java,1)
      (Eclipse,1)
      (Subline,1)
      (You,1)
      (English,1)
      (Eclipse,1)
      (Subline,1)
      (Intellij,1)
      (Hello,1)
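
      Two part files appear because textFile created one partition per (small) input file, and map keeps the same number of partitions. The count can be confirmed from the shell (a quick check, assuming the session above):

      map.partitions.length   // 2 in this session: one partition per input file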
    
  • Sum the values of identical keys in map, and save the result to HDFS

      val result = map.reduceByKey(_+_)
      result.saveAsTextFile("/library/wordcount/output/spark_word_count_1")
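
    reduceByKey merges the values of each key pairwise with the supplied function, so summing the 1s yields each word's count. A plain-Scala analogy of the same computation (illustrative only):

      // local equivalent of reduceByKey(_+_) on (word, 1) pairs
      List("Hadoop", "Scala", "Hadoop")
        .map(x => (x, 1))
        .groupBy(_._1)
        .mapValues(_.map(_._2).sum)   // Map(Hadoop -> 2, Scala -> 1)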
    
  • Inspect the output

      hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -ls -R /library/wordcount/output/spark_word_count_1
      -rw-r--r--   2 hadoop supergroup          0 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/_SUCCESS
      -rw-r--r--   2 hadoop supergroup         35 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00000
      -rw-r--r--   2 hadoop supergroup         22 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00001
      -rw-r--r--   2 hadoop supergroup         50 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00002
      hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/_SUCCESS
      hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00000
      (Spark,3)
      (Intellij,1)
      (English,1)
      hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00001
      (Eclipse,2)
      (Scala,3)
      hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00002
      (Hello,3)
      (You,1)
      (Java,1)
      (Subline,2)
      (Hadoop,2)
    
  • Merge the output into a single file

     sc.textFile("/library/wordcount/input/Data").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).repartition(1).saveAsTextFile("/library/wordcount/output/spark_word_count")
    
  • Sort the output by count

      sc.textFile("/library/wordcount/input/Data").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).saveAsTextFile("/library/wordcount/output/spark_word_count")