04 - Display Row Numbers After Sorting

  • Sort the data rows and display each row's number (rank)
  • Source code
  • The Reducer's input key/value types are determined solely by the Mapper's output key/value types
  • The other type parameters are unconstrained; just pay attention to type conversions between the data types (see the sketch below)
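
A minimal sketch of how these generic type parameters line up (the class names here are only illustrative and are not part of the job below):

      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;

      // Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> and Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
      // the Reducer's KEYIN/VALUEIN must equal the Mapper's KEYOUT/VALUEOUT;
      // the remaining type positions can be chosen freely.
      public class TypeAlignmentSketch {

          // The Mapper emits LongWritable keys and LongWritable values ...
          public static class MyMapper
                  extends Mapper<Object, Text, LongWritable, LongWritable> {
          }

          // ... so the Reducer must consume LongWritable / LongWritable as input,
          // while its own output types (here Text / LongWritable) may differ.
          public static class MyReducer
                  extends Reducer<LongWritable, LongWritable, Text, LongWritable> {
          }
      }

In SortData5 below, the Mapper output and the Reducer output both use LongWritable for key and value, which is why a single pair of setOutputKeyClass/setOutputValueClass calls is enough.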

      package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;
      import java.io.IOException;
    
      import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
    
      public class SortData5 {
    
          /**
           * The Mapper parses each input line as a long and emits it as the map output key,
           * with a constant value of 1; the actual sorting is done by the MapReduce shuffle.
           */
          public static class ForSortDataMapper 
                  extends Mapper<Object, Text, LongWritable, LongWritable> {
    
              private LongWritable data = new LongWritable(1);   // numeric value parsed from the input line
              private LongWritable eValue = new LongWritable(1); // constant map output value
    
              public void map(Object key, Text value, Context context) 
                      throws IOException, InterruptedException {
                  data.set(Long.valueOf(value.toString())); 
                  context.write(data, eValue);
              }
          }
    
          /**
           * The Reducer receives the keys already sorted in ascending order by the shuffle and, for each
           * occurrence, emits a running row number as the output key and the sorted value as the output value.
           */
          public static class ForSortReducer 
                  extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    
              private LongWritable position = new LongWritable(1); // running row number, starting at 1
    
              public void reduce(LongWritable key, Iterable<LongWritable> values, 
                      Context context)
                      throws IOException, InterruptedException {
                  for(LongWritable item : values){
                      context.write(position, key);
                      position.set(position.get() + 1);
                  }
              }
          }
    
          public static void main(String[] args) throws Exception {
    
            // Fall back to default HDFS input/output paths when no arguments are supplied
            if (args == null || args.length == 0) {
                args = new String[2];
                args[0] = "hdfs://s0:9000/library/SortedData/input/sortData.txt";
                args[1] = "hdfs://s0:9000/library/SortedData/SortedData_jar_44";
            }
            HadoopFile.delFileToHDFS(args[1]); // delete the output directory if it already exists (project helper)
            Configuration conf = new Configuration();

            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: Sort <in> [<in>...] <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "Sort Data");
            job.setJarByClass(SortData5.class);
            job.setMapperClass(ForSortDataMapper.class);
            // No combiner: ForSortReducer replaces keys with row numbers, so running it map-side would break the result
            //job.setCombinerClass(ForSortReducer.class);
            job.setReducerClass(ForSortReducer.class);
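            // With the default single reduce task the output is globally sorted and the row numbers
            // are continuous; with more reducers each partition would be numbered independently.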
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(LongWritable.class);

            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

    }
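
As a quick illustration with hypothetical data: if sortData.txt contains the lines 9, 3, 7 and 3 (one number per line), the reduce output (written by the default TextOutputFormat, key and value separated by a tab, typically in part-r-00000) would be:

      1    3
      2    3
      3    7
      4    9

Duplicate values are kept, and each one gets its own row number.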