09-Hadoop自定义reducer-values升序或降序排序

功能说明

  • 借助二次排序（secondary sort），对 Hadoop reducer 收到的 values 按自定义规则进行升序或降序排序：map 输出组合 key（key + value），排序比较器先按 key、再按 value 排序，分组比较器只按 key 分组

数据文件

  • key-value-text.txt

      b=4
      b=5
      b=2
      c=a
      c=f
      c=c
      a=c
      a=a
      a=b
    
  • 排序后的输出结果（每组 values 按升序排列）

      (a,c)          a,b,c
      (b,5)          2,4,5
      (c,f)          a,c,f
    
  • 源码：KeyValueReduceValuesTextSorted.java 文件

      package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;
      import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.hadoop.io.WritableComparator;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
    
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      import java.util.Objects;
    
      /**
       * 将 values 中的元素排序
       */
      public class KeyValueReduceValuesTextSorted {
    
        public static class DataMapper
           extends Mapper<LongWritable, Text, CombinationKey, Text>{
    
          CombinationKey keyOut = new CombinationKey();
          Text outputValue = new Text();
    
        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
    
          String[] valueArray = value.toString().split("=");
          keyOut.setKey(valueArray[0]);
          keyOut.setValue(valueArray[1]);
          outputValue.set(valueArray[1]);
          context.write(keyOut,outputValue);
        }
      }
    
      public static class DataReducer
           extends Reducer<CombinationKey,Text,Text,Text> {
    
        Text keyOut = new Text();
        Text outputValue = new Text();
    
        public void reduce(CombinationKey key, Iterable<Text> values,
                           Context context
                           ) throws IOException, InterruptedException {
          StringBuffer s = new StringBuffer() ;
          for (Text val : values) {
            s.append(val.toString()).append(",");
          }
          keyOut.set("("+  key.getKey() + "," + key.getValue() +")");
          outputValue.set("      " + s.toString());
    
          context.write(keyOut,outputValue);
        }
      }
    
      public static void main(String[] args) throws Exception {
    
          if(args == null || args.length ==0){
              args = new String[2];
              args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-text.txt";
              args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
          }
    
        HadoopFile.delFileToHDFS(args[1]);
    
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
          System.err.println("Usage: wordcount <in> [<in>...] <out>");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(KeyValueReduceValuesTextSorted.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(Text.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(DataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    
      job.setSortComparatorClass(DefinedComparator.class);
      job.setGroupingComparatorClass(DefinedGroupSort.class);



      for (int i = 0; i < otherArgs.length - 1; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
      }
      FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


    }

    /**
     * 对象 key
     */
    class CombinationKey implements WritableComparable<CombinationKey> {
      private String key;
      private String value;

      /**
       * 一次排序 map key
       * @param o
       * @return
         */
      public int compareTo(CombinationKey o) {
        return this.key.compareTo(o.getKey());
      }

      public void write(DataOutput out) throws IOException {
        out.writeUTF(this.key);
        out.writeUTF(this.value);

      }

      public void readFields(DataInput in) throws IOException {
        this.key = in.readUTF();
        this.value = in.readUTF();
      }

      public String getKey() {
        return key;
      }

      public void setKey(String key) {
        this.key = key;
      }

      public String getValue() {
        return value;
      }

      public void setValue(String value) {
        this.value = value;
      }
    }

    /**
     * Sort comparator for the secondary sort: orders records by natural key, then by value.
     *
     * <p>Values are compared lexicographically. The public no-arg constructor is needed because
     * the class is handed to {@code job.setSortComparatorClass}, which instantiates it. That
     * constructor sorts values ascending; a subclass can call {@code super(true)} to sort them
     * descending.
     */
    class DefinedComparator extends WritableComparator {
      /** Whether values within one natural key are ordered descending. */
      private final boolean descending;

      /** Creates a comparator that orders values within a key ascending. */
      public DefinedComparator() {
        this(false);
      }

      /**
       * @param descending {@code true} to order values within a key descending
       */
      protected DefinedComparator(boolean descending) {
        // true: let WritableComparator create key instances, so the object-level compare below
        // receives CombinationKey objects.
        super(CombinationKey.class, true);
        this.descending = descending;
      }

      @Override
      public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey c1 = (CombinationKey) a;
        CombinationKey c2 = (CombinationKey) b;

        int byKey = c1.getKey().compareTo(c2.getKey());
        if (byKey != 0) {
          return byKey;
        }
        // Compare the strings themselves rather than their hashCode(): hash order is not
        // lexicographic for multi-character values, and subtracting two hashes can overflow.
        return descending
            ? c2.getValue().compareTo(c1.getValue())
            : c1.getValue().compareTo(c2.getValue());
      }
    }

    /**
     * Grouping comparator: records whose natural keys are equal are delivered to the same
     * reduce() call, whatever their secondary value.
     */
    class DefinedGroupSort extends WritableComparator {
      public DefinedGroupSort() {
        super(CombinationKey.class, true);
      }

      @Override
      public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey left = (CombinationKey) a;
        CombinationKey right = (CombinationKey) b;
        String leftKey = left.getKey();
        // Only the natural key matters for grouping; the value is ignored.
        return leftKey.compareTo(right.getKey());
      }
    }