Classic Example: WordCount
Requirements
Analysis of the classic wordCount example (with the data-computation reasoning at each step)
Preparation
HDFS data files
wordCount.txt,wordCount2.txt
hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -ls -R /library/input/wordCount/Data
ls: `/library/input/wordCount/Data': No such file or directory
hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -ls -R /library/wordcount/input/Data
-rw-r--r--   2 hadoop supergroup   50 2016-01-18 05:15 /library/wordcount/input/Data/wordCount.txt
-rw-r--r--   2 hadoop supergroup   76 2016-01-18 18:34 /library/wordcount/input/Data/wordCount2.txt
hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/input/Data/wordCount.txt
Hadoop Scala Scala Spark
Spark Hadoop Scala
Hello
hadoop@s0:~/temp$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/input/Data/wordCount2.txt
Hello Spark Java Eclipse Subline You
English Eclipse Subline
Intellij Hello
Steps
flatten flattens a nested structure into a single-level collection
scala> val result = List(List(1, 2), List(3, 4)).flatten
result: List[Int] = List(1, 2, 3, 4)
flatMap is a commonly used combinator that combines mapping and flattening. flatMap takes a function that works on the nested lists and then concatenates the results.
scala> val nestedNumbers = List(List(1, 2), List(3, 4))
nestedNumbers: List[List[Int]] = List(List(1, 2), List(3, 4))
scala> nestedNumbers.flatMap(x => x.map(_ * 2))
res0: List[Int] = List(2, 4, 6, 8)
You can think of it as a shortcut for "map first, then flatten":
scala> nestedNumbers.map((x: List[Int]) => x.map(_ * 2)).flatten
res1: List[Int] = List(2, 4, 6, 8)
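As a bridge to the WordCount steps below, here is a minimal local-collection sketch (plain Scala, no Spark; the sample lines are made up for illustration) of the same map-then-flatten idea applied to splitting lines into words:
// Hypothetical sample lines, standing in for lines read from a file
val lines = List("Hadoop Scala Scala Spark", "Spark Hadoop Scala")
// flatMap = map(_.split(" ")) followed by flatten:
// List(Hadoop, Scala, Scala, Spark, Spark, Hadoop, Scala)
val words = lines.flatMap(_.split(" "))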
Read the data files from the HDFS file system into an RDD
// At this point the RDD variable data is the collection of data splits; each element of the collection corresponds to one line of the files
val data = sc.textFile("/library/wordcount/input/Data")
Iterate over every element of data
data.collect().foreach(x => println("Line read from file: " + x))
==========================================================================================
The result is:
Line read from file: Hadoop Scala Scala Spark
Line read from file: Spark Hadoop Scala
Line read from file: Hello
Line read from file: Hello Spark Java Eclipse Subline You
Line read from file: English Eclipse Subline
Line read from file: Intellij Hello
Analysis of the data structure after the RDD map operation
// Split every element of data on spaces; the result is a new collection (a new RDD) in which each element is an Array[String]
val dataSplit = data.map(_.split(" "))
Iterate over every element of dataSplit
dataSplit.collect().foreach(x => println("Line read from file: " + x))
==========================================================================================
Line read from file: [Ljava.lang.String;@1e4e76b2
Line read from file: [Ljava.lang.String;@3a87209c
Line read from file: [Ljava.lang.String;@57291216
Line read from file: [Ljava.lang.String;@13d88528
Line read from file: [Ljava.lang.String;@7ece9a07
Line read from file: [Ljava.lang.String;@7dcec682

dataSplit.collect().foreach(x => println("Line read from file: " + x(0)))
==========================================================================================
Line read from file: Hadoop
Line read from file: Spark
Line read from file: Hello
Line read from file: Hello
Line read from file: English
Line read from file: Intellij
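The first call prints each array's object reference ([Ljava.lang.String;@...) rather than its contents. A small sketch, not part of the original session, that renders each array with mkString:
// mkString joins the words of each array back into one readable string;
// with this data it reprints each original line, e.g. "Hadoop Scala Scala Spark"
dataSplit.collect().foreach(x => println("Line read from file: " + x.mkString(" ")))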
Analysis of the data structure after the RDD flatMap operation
// dataflatMap is the new collection obtained by mapping first and then flattening
val dataflatMap = data.flatMap(_.split(" "))
Iterate over every element of dataflatMap
dataflatMap.collect.foreach(x => println("data:" + x))
==========================================================================================
The result is:
data:Hadoop
data:Scala
data:Scala
data:Spark
data:Spark
data:Hadoop
data:Scala
data:Hello
data:Hello
data:Spark
data:Java
data:Eclipse
data:Subline
data:You
data:English
data:Eclipse
data:Subline
data:Intellij
data:Hello
Build the (word, 1) counting pairs (the map RDD)
val map = dataflatMap.map(x => (x,1))
Inspect every element of map
map.saveAsTextFile("/library/wordcount/output/spark_word_map")
==========================================================================================
/opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_map/part-00000
(Hadoop,1)
(Scala,1)
(Scala,1)
(Spark,1)
(Spark,1)
(Hadoop,1)
(Scala,1)
(Hello,1)
/opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_map/part-00001
(Hello,1)
(Spark,1)
(Java,1)
(Eclipse,1)
(Subline,1)
(You,1)
(English,1)
(Eclipse,1)
(Subline,1)
(Intellij,1)
(Hello,1)
Accumulate the values for identical keys in map, and save the result to the HDFS file system
val result = map.reduceByKey(_+_)
result.saveAsTextFile("/library/wordcount/output/spark_word_count_1")
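Since the result is tiny, it can also be inspected directly in the spark-shell instead of (or before) reading the HDFS output; a minimal sketch, not part of the original session:
// collect() pulls the whole result to the driver, fine for a handful of words but to be avoided on large data
result.collect().foreach(pair => println(pair._1 + " : " + pair._2))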
Inspect the output
hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -ls -R /library/wordcount/output/spark_word_count_1
-rw-r--r--   2 hadoop supergroup    0 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/_SUCCESS
-rw-r--r--   2 hadoop supergroup   35 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00000
-rw-r--r--   2 hadoop supergroup   22 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00001
-rw-r--r--   2 hadoop supergroup   50 2016-01-18 19:10 /library/wordcount/output/spark_word_count_1/part-00002
hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/_SUCCESS
hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00000
(Spark,3)
(Intellij,1)
(English,1)
hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00001
(Eclipse,2)
(Scala,3)
hadoop@s2:~$ /opt/modules/bigdata/hadoop/hadoop-2.6.0/bin/hadoop fs -cat /library/wordcount/output/spark_word_count_1/part-00002
(Hello,3)
(You,1)
(Java,1)
(Subline,2)
(Hadoop,2)
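There are three part files because the reduceByKey result here had three partitions. To control that number directly, reduceByKey also accepts an explicit partition count; a small sketch (the output path is hypothetical):
// reduceByKey(func, numPartitions) fixes the number of reduce partitions, and hence part files
map.reduceByKey(_+_, 1).saveAsTextFile("/library/wordcount/output/spark_word_count_single_part")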
Merge the output into a single file
sc.textFile("/library/wordcount/input/Data").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).repartition(1).saveAsTextFile("/library/wordcount/output/spark_word_count")
Sort the output
sc.textFile("/library/wordcount/input/Data").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).saveAsTextFile("/library/wordcount/output/spark_word_count")