04-排序后显示行数
- 数据行去重操作
- 源码
- MapReduce 中 reducer 的输入参数类型只与 mapper 的输出参数类型有关,其它参数无限制;注意数据之间的类型转换
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.mapreduce.wordcount;

import java.io.IOException;

import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that sorts numeric input lines and prefixes each output line
 * with its 1-based position in the sorted order (line number, value).
 *
 * <p>NOTE(review): positions are maintained per reducer task, so globally
 * correct line numbers require the job to run with a single reducer — confirm
 * the intended reducer count for this job.
 */
public class SortData5 {

    /**
     * Emits each input line, parsed as a {@code long}, as the map output key so
     * the MapReduce shuffle/sort orders the values; the map output value is a
     * placeholder and is never read by the reducer.
     */
    public static class ForSortDataMapper
            extends Mapper<Object, Text, LongWritable, LongWritable> {

        private final LongWritable data = new LongWritable(1);
        private final LongWritable eValue = new LongWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Skip blank/whitespace-only lines instead of failing the task
            // with a NumberFormatException.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            data.set(Long.parseLong(line));
            context.write(data, eValue);
        }
    }

    /**
     * Receives keys in sorted order from the shuffle and writes
     * (position, value) pairs, incrementing the position for every record
     * emitted.
     */
    public static class ForSortReducer
            extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        // Running 1-based output position; persists across reduce() calls
        // within this reducer task.
        private final LongWritable position = new LongWritable(1);

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // One output line per occurrence: duplicate input values each get
            // their own position. NOTE(review): the file header mentions
            // de-duplication — if that is the intent, write once per key
            // instead of once per value.
            for (LongWritable ignored : values) {
                context.write(position, key);
                position.set(position.get() + 1);
            }
        }
    }

    /**
     * Configures and submits the sort job.
     *
     * @param args input path(s) followed by the output path; falls back to
     *             built-in HDFS defaults when empty
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Default HDFS paths when run without arguments (e.g. from an IDE).
        if (args == null || args.length == 0) {
            args = new String[] {
                "hdfs://s0:9000/library/SortedData/input/sortData.txt",
                "hdfs://s0:9000/library/SortedData/SortedData_jar_44"
            };
        }
        // Remove a stale output directory up front; the job fails if the
        // output path already exists.
        HadoopFile.delFileToHDFS(args[1]);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sort <in> [<in>...] <out>");
            System.exit(2);
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String)
        // constructor.
        Job job = Job.getInstance(conf, "Sort Data");
        job.setJarByClass(SortData5.class);
        job.setMapperClass(ForSortDataMapper.class);
        // Deliberately no combiner: ForSortReducer assigns running positions,
        // so it is not commutative/associative and must run exactly once.
        job.setReducerClass(ForSortReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // All arguments except the last are input paths; the last is the
        // output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}