07-Hadoop-Proving that reduce values are not sorted automatically
Preparation
File key-value-int.txt
b=5
b=3
b=6
c=10
c=8
c=12
a=6
a=8
a=4
File key-value-text.txt
b=aa
b=cc
b=f
c=a
c=f
c=2
a=c
a=a
a=b
Conclusion
- The values in a Hadoop reducer's values iterable are not sorted automatically; they arrive in the order in which Hadoop read them from the data source. (The framework sorts the keys during the shuffle, but not the values grouped under each key.) The expected output sketched below illustrates this.
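As a concrete illustration (expected output derived from the input above and the code below, not captured cluster output): assuming key-value-int.txt is read line by line in the order shown and processed by the IntWritable job below with a single reducer, part-r-00000 should look roughly like this, with keys sorted by the framework but each key's values left in read order rather than numeric order (tab-separated key and concatenated values, trailing comma produced by the append loop):

a	 6,8,4,
b	 5,3,6,
c	 10,8,12,

For key b, the sequence 5,3,6 is neither ascending nor descending, which is exactly the point being demonstrated.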
Code proof
KeyValueReduceValuesIntWritable.java
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;

import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Proof: the values passed to the reducer are not sorted.
 */
public class KeyValueReduceValuesIntWritable {

    public static class DataMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        Text outputKey = new Text();
        LongWritable outputValue = new LongWritable();

        // Each input line has the form "key=value"; emit (key, value as long).
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] valueArray = value.toString().split("=");
            outputValue.set(Long.parseLong(valueArray[1]));
            outputKey.set(valueArray[0]);
            context.write(outputKey, outputValue);
        }
    }

    public static class DataReducer extends Reducer<Text, LongWritable, Text, Text> {

        Text outputValue = new Text();

        // Concatenate the values in exactly the order the iterator yields them,
        // so the output shows whether the framework sorted them or not.
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer s = new StringBuffer();
            for (LongWritable val : values) {
                s.append(val.toString()).append(",");
            }
            outputValue.set(" " + s.toString());
            context.write(key, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-int.txt";
            args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
        }

        // Remove the output directory if it already exists.
        HadoopFile.delFileToHDFS(args[1]);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: KeyValueReduceValuesIntWritable <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "key value reduce values int");
        job.setJarByClass(KeyValueReduceValuesIntWritable.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(DataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
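Both jobs call HadoopFile.delFileToHDFS(args[1]), a helper from the author's project whose source is not included here. As a rough sketch of what such a helper might look like (an assumption, not the original implementation), it could simply delete the output path recursively through the standard Hadoop FileSystem API:

package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical sketch: assumed to delete the given HDFS path recursively so
 * the job can recreate its output directory. The real implementation in the
 * author's project may differ.
 */
public class HadoopFile {

    public static void delFileToHDFS(String pathString) throws Exception {
        Configuration conf = new Configuration();
        // Resolve the FileSystem from the URI, e.g. hdfs://s0:9000/...
        FileSystem fs = FileSystem.get(URI.create(pathString), conf);
        Path path = new Path(pathString);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = recursive delete
        }
    }
}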
KeyValueReduceValuesText.java
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;

import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Proof: the values passed to the reducer are not sorted.
 */
public class KeyValueReduceValuesText {

    public static class DataMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text outputKey = new Text();
        Text outputValue = new Text();

        // Each input line has the form "key=value"; emit (key, value) as Text.
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] valueArray = value.toString().split("=");
            outputKey.set(valueArray[0]);
            outputValue.set(valueArray[1]);
            context.write(outputKey, outputValue);
        }
    }

    public static class DataReducer extends Reducer<Text, Text, Text, Text> {

        Text outputValue = new Text();

        // Concatenate the values in the order the iterator yields them.
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer s = new StringBuffer();
            for (Text val : values) {
                s.append(val.toString()).append(",");
            }
            outputValue.set(" " + s.toString());
            context.write(key, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-text.txt";
            args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
        }

        // Remove the output directory if it already exists.
        HadoopFile.delFileToHDFS(args[1]);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: KeyValueReduceValuesText <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "key value reduce values text");
        job.setJarByClass(KeyValueReduceValuesText.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(DataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
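Similarly, for key-value-text.txt the expected output of the Text job, assuming the same read order, is roughly:

a	 c,a,b,
b	 aa,cc,f,
c	 a,f,2,

For keys a and c the values (c,a,b and a,f,2) are clearly not in lexicographic order, again confirming that the reducer receives values in read order; if sorted values are required, they must be sorted explicitly (for example via a secondary sort).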