07-Hadoop-Proving that the values in reduce are not automatically sorted

Preparation

  • File key-value-int.txt

      b=5
      b=3
      b=6
      c=10
      c=8
      c=12
      a=6
      a=8
      a=4
    
  • File key-value-text.txt

      b=aa
      b=cc
      b=f
      c=a
      c=f
      c=2
      a=c
      a=a
      a=b
    

Conclusion

  • The values in a Hadoop reducer are not sorted by the framework: the shuffle sorts and groups the keys, but the values for each key arrive in the order in which Hadoop read them from the input source (here, with a single small input file, that is simply the file's line order). If ordered values are needed, they have to be sorted explicitly, e.g. inside the reducer (see the sketch below) or with a secondary sort (sketched at the end of this note).
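
  • The framework's sorting applies to the keys during the shuffle, not to the values, so a job that needs the values of a key in a defined order has to produce that order itself. Below is a minimal sketch of a reducer that buffers and sorts its values; the package and class names are illustrative and not part of the original project, and the approach is only suitable while the value list of a key fits in memory.

      package example.sketch; // hypothetical package, not from the original project

      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;

      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      /**
       * Sketch: sort the values explicitly inside the reducer, since the
       * framework delivers them unsorted.
       */
      public class SortedValuesReducer
          extends Reducer<Text, LongWritable, Text, Text> {

        private final Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
          // Copy the primitive values out of the iterator: Hadoop reuses the
          // LongWritable instance, so the objects themselves must not be kept.
          List<Long> buffer = new ArrayList<>();
          for (LongWritable val : values) {
            buffer.add(val.get());
          }
          Collections.sort(buffer); // the ordering the framework does not provide
          StringBuilder s = new StringBuilder();
          for (Long v : buffer) {
            s.append(v).append(",");
          }
          outputValue.set(s.toString());
          context.write(key, outputValue);
        }
      }

    Wiring this into a driver like the ones below would only require pointing job.setReducerClass(...) at this class; the output types stay Text/Text.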

Code proof

  • KeyValueReduceValuesIntWritable.java

      package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;
      import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
    
      import java.io.IOException;
    
      /**
       * Proof: the values passed to the reducer are not sorted.
       */
      public class KeyValueReduceValuesIntWritable {
    
        public static class DataMapper
           extends Mapper<LongWritable, Text, Text, LongWritable>{
    
          Text outputKey = new Text();
          LongWritable outputValue = new LongWritable();
    
          public void map(LongWritable key, Text value, Context context
                          ) throws IOException, InterruptedException {

            String[] valueArray = value.toString().split("=");
            outputValue.set(Long.parseLong(valueArray[1]));
            outputKey.set(valueArray[0]);
            context.write(outputKey,outputValue);
          }
        }

        public static class DataReducer
             extends Reducer<Text,LongWritable,Text,Text> {

          Text outputValue = new Text();
          public void reduce(Text key, Iterable<LongWritable> values,
                             Context context
                             ) throws IOException, InterruptedException {
            StringBuffer s = new StringBuffer() ;
            for (LongWritable val : values) {
              s.append(val.toString()).append(",");
            }
            outputValue.set("      " + s.toString());
            context.write(key, outputValue);
          }
        }

        public static void main(String[] args) throws Exception {

          if(args == null || args.length ==0){
            args = new String[2];
            args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-int.txt";
            args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
          }

          HadoopFile.delFileToHDFS(args[1]);

          Configuration conf = new Configuration();
          String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
          if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
          }
          Job job = Job.getInstance(conf, "word count");
          job.setJarByClass(KeyValueReduceValuesIntWritable.class);
          job.setMapperClass(DataMapper.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(LongWritable.class);
          //job.setCombinerClass(IntSumReducer.class);
          job.setReducerClass(DataReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
          }
          FileOutputFormat.setOutputPath(job,
            new Path(otherArgs[otherArgs.length - 1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

      }
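
    HadoopFile.delFileToHDFS is a project-local helper that is not shown in this note; judging from its use here, it deletes the output path before the job starts so that FileOutputFormat does not fail on an existing directory. A minimal sketch of an equivalent using the standard FileSystem API (the helper name below is illustrative, not the original implementation):

      package example.sketch; // hypothetical package, not from the original project

      import java.io.IOException;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      /** Sketch of what a helper like HadoopFile.delFileToHDFS presumably does. */
      public class OutputPathCleaner {

        public static void deleteIfExists(String uri) throws IOException {
          Path path = new Path(uri);
          // Resolve the file system from the URI, e.g. hdfs://s0:9000/... or a local path.
          FileSystem fs = path.getFileSystem(new Configuration());
          if (fs.exists(path)) {
            fs.delete(path, true); // recursive: the output path is a directory
          }
        }
      }
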
  • KeyValueReduceValuesText.java

      package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;
      import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
    
      import java.io.IOException;
    
      /**
       * Proof: the values passed to the reducer are not sorted.
       */
      public class KeyValueReduceValuesText {
    
        public static class DataMapper
           extends Mapper<LongWritable, Text, Text, Text>{
    
          Text outputKey = new Text();
          Text outputValue = new Text();
    
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
    
          String[] valueArray = value.toString().split("=");
          outputKey.set(valueArray[0]);
          outputValue.set(valueArray[1]);
          context.write(outputKey,outputValue);
        }
      }
    
      public static class DataReducer
           extends Reducer<Text,Text,Text,Text> {
    
        Text outputValue = new Text();
    
        public void reduce(Text key, Iterable<Text> values,
                           Context context
                           ) throws IOException, InterruptedException {
          StringBuffer s = new StringBuffer() ;
          for (Text val : values) {
            s.append(val.toString()).append(",");
          }
          outputValue.set("      " + s.toString());
          context.write(key,outputValue);
        }
      }
    
      public static void main(String[] args) throws Exception {
    
          if(args == null || args.length ==0){
              args = new String[2];
              args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-text.txt";
              args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
          }
    
        HadoopFile.delFileToHDFS(args[1]);
    
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
          System.err.println("Usage: wordcount <in> [<in>...] <out>");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(KeyValueReduceValuesText.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(DataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
          FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
          new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    
      }
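
  • For completeness: when value ordering matters for large groups, buffering in the reducer does not scale, and the usual Hadoop answer is a secondary sort, where the value is folded into a composite map output key so that the shuffle itself produces the order. A compact sketch of the required pieces follows; all class names are illustrative, and the matching mapper/driver changes are only indicated in comments.

      package example.sketch; // hypothetical package, not from the original project

      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;

      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.hadoop.io.WritableComparator;
      import org.apache.hadoop.mapreduce.Partitioner;

      /** Sketch of the pieces needed for a secondary sort on (key, value). */
      public class SecondarySortSketch {

        /** Composite map output key: natural key plus the value used for ordering. */
        public static class KeyValuePair implements WritableComparable<KeyValuePair> {

          private final Text naturalKey = new Text();
          private final LongWritable value = new LongWritable();

          public void set(String k, long v) {
            naturalKey.set(k);
            value.set(v);
          }

          public Text getNaturalKey() {
            return naturalKey;
          }

          @Override
          public void write(DataOutput out) throws IOException {
            naturalKey.write(out);
            value.write(out);
          }

          @Override
          public void readFields(DataInput in) throws IOException {
            naturalKey.readFields(in);
            value.readFields(in);
          }

          /** Sort by natural key first, then by value: this is what orders the values. */
          @Override
          public int compareTo(KeyValuePair other) {
            int cmp = naturalKey.compareTo(other.naturalKey);
            return cmp != 0 ? cmp : value.compareTo(other.value);
          }
        }

        /** Send every record with the same natural key to the same reducer. */
        public static class NaturalKeyPartitioner
            extends Partitioner<KeyValuePair, LongWritable> {
          @Override
          public int getPartition(KeyValuePair key, LongWritable value, int numPartitions) {
            return (key.getNaturalKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
          }
        }

        /** Group reducer input by the natural key only, ignoring the value part. */
        public static class NaturalKeyGroupingComparator extends WritableComparator {
          protected NaturalKeyGroupingComparator() {
            super(KeyValuePair.class, true);
          }
          @Override
          public int compare(WritableComparable a, WritableComparable b) {
            return ((KeyValuePair) a).getNaturalKey()
                .compareTo(((KeyValuePair) b).getNaturalKey());
          }
        }

        // Driver wiring (the mapper would emit KeyValuePair as its output key):
        //   job.setMapOutputKeyClass(KeyValuePair.class);
        //   job.setPartitionerClass(NaturalKeyPartitioner.class);
        //   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
      }

    With this arrangement the shuffle delivers the composite keys, and therefore the values, to each reducer group already in order, without buffering anything in reducer memory.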