03 - Line Deduplication
- Removes duplicate rows from a data set: the Mapper emits each input line itself as the map output key with an empty value, the shuffle phase groups identical lines under a single key, and the Reducer writes each distinct line exactly once (see the example below).
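As a made-up illustration (these lines are not from the original data), a run over an input like the following keeps one copy of each distinct line, with keys sorted by the shuffle:

```
input:
hadoop
hello world
hadoop
hello world

output:
hadoop
hello world
```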
Source code
```java
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * Removes duplicate lines from the input files.
 */
public class DefferentData {

    /**
     * The Mapper emits each input line itself as the output key,
     * with an empty Text as the value.
     */
    public static class DefferentMapper extends Mapper<Object, Text, Text, Text> {

        private final Text emptyValue = new Text(); // reused empty value

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The line content becomes the key; identical lines collapse in the shuffle.
            context.write(value, emptyValue);
        }
    }
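    /*
     * Between map and reduce, the framework sorts and groups records by key,
     * so every duplicate of a given line is delivered to a single reduce() call.
     */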
    /**
     * The Reducer writes each incoming key once and ignores the grouped
     * values, which removes the duplicates.
     */
    public static class DefferentReducer extends Reducer<Text, Text, Text, Text> {

        private final Text emptyValue = new Text();

        // Note: the values parameter must be Iterable<Text> to match the class
        // generics; with the original Iterable<IntWritable> signature this method
        // would never override reduce(), and duplicates would survive.
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // One write per distinct line, no matter how many duplicates arrived.
            context.write(key, emptyValue);
        }
    }
    public static void main(String[] args) throws Exception {
        // Default HDFS paths used when the job is launched without arguments.
        if (args == null || args.length == 0) {
            args = new String[]{
                    "hdfs://s0:9000/library/wordcount/input/Data",
                    "hdfs://s0:9000/library/wordcount/output/wordcount_jar_33"
            };
        }
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: DefferentData <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "line deduplication");
        job.setJarByClass(DefferentData.class);
        job.setMapperClass(DefferentMapper.class);
        // The reducer doubles as a combiner to cut shuffle traffic: its input and
        // output types match, and deduplicating twice gives the same result.
        job.setCombinerClass(DefferentReducer.class);
        job.setReducerClass(DefferentReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // All arguments except the last are input paths; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
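A typical way to launch the job, assuming the class has been packaged into a jar (the jar name here is a placeholder, not from the original; the paths are the defaults hard-coded above):

```
hadoop jar dedup.jar com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount.DefferentData \
    hdfs://s0:9000/library/wordcount/input/Data \
    hdfs://s0:9000/library/wordcount/output/wordcount_jar_33
```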