03-Row Deduplication

  • Deduplicate data rows: the Mapper emits each input line as the map output key with an empty value, the shuffle groups identical lines under one key, and the Reducer writes each key exactly once, so every distinct line appears once in the output (see the sample input/output after the source code).
  • Source code

      package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
    
      import java.io.IOException;
    
      /**
       * Row deduplication: outputs each distinct input line exactly once.
       */
      public class DefferentData {
    
        /**
         * The Mapper emits each input line itself as the map output key,
         * paired with an empty Text value.
         */
        public static class DefferentMapper
            extends Mapper<Object, Text, Text, Text> {

          private final Text emptyValue = new Text(); // empty placeholder value

          public void map(Object key, Text value, Context context
                          ) throws IOException, InterruptedException {
            context.write(value, emptyValue);
          }
        }

        /**
         * The Reducer writes each incoming key once as the output key; the shuffle
         * has already grouped identical lines under a single key, so duplicates collapse here.
         */
        public static class DefferentReducer
            extends Reducer<Text, Text, Text, Text> {

          private final Text emptyValue = new Text();

          public void reduce(Text key, Iterable<Text> values,
                             Context context
                             ) throws IOException, InterruptedException {
            context.write(key, emptyValue);
          }
        }

        public static void main(String[] args) throws Exception {

          // Default HDFS input/output paths when no arguments are given
          if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "hdfs://s0:9000/library/wordcount/input/Data";
            args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_33";
          }

          Configuration conf = new Configuration();
          String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
          if (otherArgs.length < 2) {
            System.err.println("Usage: DefferentData <in> [<in>...] <out>");
            System.exit(2);
          }
          Job job = Job.getInstance(conf, "row deduplication");
          job.setJarByClass(DefferentData.class);
          job.setMapperClass(DefferentMapper.class);
          job.setCombinerClass(DefferentReducer.class); // drop duplicates on the map side to cut shuffle traffic
          job.setReducerClass(DefferentReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
          }
          FileOutputFormat.setOutputPath(job,
              new Path(otherArgs[otherArgs.length - 1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

      }
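
  • Example (hypothetical data)

      Because identical lines become identical map output keys, the shuffle delivers each distinct line to a single reduce call, and the reducer writes it exactly once; output keys also come out sorted. The sample data below is illustrative only and is not taken from the HDFS paths used above.

      Input file:

          hello hadoop
          hello spark
          hello hadoop
          hello spark
          hello flink

      Deduplicated output (part-r-00000):

          hello flink
          hello hadoop
          hello spark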