09 - Hadoop custom reducer: sorting values in ascending or descending order
Feature description
- Apply a custom ascending or descending sort to the values passed to a Hadoop reducer, using a composite key together with custom sort and grouping comparators (see the sample data and local sanity-check sketch below).
Data file
key-value-text.txt (one key=value pair per line, since the mapper splits each line on "=")

b=4
b=5
b=2
c=a
c=f
c=c
a=c
a=a
a=b

Sorted output

(a,c) a,b,c
(b,5) 2,4,5
(c,f) a,c,f
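As a point of reference only, the same result can be reproduced locally with plain Java collections. This is a hedged sketch that is not part of the MapReduce job (the class name LocalSortCheck is made up for illustration); it groups the sample lines by key, sorts each group's values ascending, and prints them the way the reducer does. The value shown inside the parentheses is the last (largest) value of the group, because Hadoop reuses and updates the key object while the reducer iterates over the values.

import java.util.*;

public class LocalSortCheck {
    public static void main(String[] args) {
        String[] lines = {"b=4", "b=5", "b=2", "c=a", "c=f", "c=c", "a=c", "a=a", "a=b"};

        // Group values by their natural key, keeping keys in ascending order.
        TreeMap<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = line.split("=");
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }

        // Sort each group's values ascending and print "(key,lastValue) v1,v2,...".
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            List<String> values = e.getValue();
            Collections.sort(values);
            String last = values.get(values.size() - 1);
            System.out.println("(" + e.getKey() + "," + last + ") " + String.join(",", values));
        }
    }
}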
Source code: KeyValueReduceValuesTextSorted.java
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;

import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Sort the elements of each reducer's values iterable.
 */
public class KeyValueReduceValuesTextSorted {

    public static class DataMapper extends Mapper<LongWritable, Text, CombinationKey, Text> {

        CombinationKey keyOut = new CombinationKey();
        Text outputValue = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line looks like "a=b"; pack both parts into the composite key.
            String[] valueArray = value.toString().split("=");
            keyOut.setKey(valueArray[0]);
            keyOut.setValue(valueArray[1]);
            outputValue.set(valueArray[1]);
            context.write(keyOut, outputValue);
        }
    }

    public static class DataReducer extends Reducer<CombinationKey, Text, Text, Text> {

        Text keyOut = new Text();
        Text outputValue = new Text();

        public void reduce(CombinationKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer s = new StringBuffer();
            for (Text val : values) {
                s.append(val.toString()).append(",");
            }
            // Drop the trailing comma so the output reads "a,b,c" rather than "a,b,c,".
            if (s.length() > 0) {
                s.setLength(s.length() - 1);
            }
            keyOut.set("(" + key.getKey() + "," + key.getValue() + ")");
            outputValue.set(" " + s.toString());
            context.write(keyOut, outputValue);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/key-value-text.txt";
            args[1] = "hdfs://s0:9000/library/wordcount/output/wordcount_jar_27";
        }

        // Remove the output path if it already exists (project-specific helper).
        HadoopFile.delFileToHDFS(args[1]);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: KeyValueReduceValuesTextSorted <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "key value reduce values sorted");
        job.setJarByClass(KeyValueReduceValuesTextSorted.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(Text.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(DataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(DefinedComparator.class);
        job.setGroupingComparatorClass(DefinedGroupSort.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
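HadoopFile.delFileToHDFS is a project-specific helper that is not shown in this article; presumably it just removes the output directory so the job can be re-run. For readers without that class, a minimal sketch of an equivalent cleanup using only the standard FileSystem API could look like the following (the class and method names here are hypothetical, not part of the original project):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class DeleteOutputPath {
    public static void delete(String pathStr) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(pathStr);
        // Resolve the FileSystem from the path's URI (works for hdfs:// and local paths).
        FileSystem fs = FileSystem.get(URI.create(pathStr), conf);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = delete recursively
        }
    }
}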
/**
 * Composite key: the natural key plus the value, so the shuffle can sort on both.
 */
class CombinationKey implements WritableComparable<CombinationKey> {

    private String key;
    private String value;

    /**
     * Primary sort on the natural (map) key.
     * @param o the key to compare against
     * @return comparison result on the natural key only
     */
    public int compareTo(CombinationKey o) {
        return this.key.compareTo(o.getKey());
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.key);
        out.writeUTF(this.value);
    }

    public void readFields(DataInput in) throws IOException {
        this.key = in.readUTF();
        this.value = in.readUTF();
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}
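Because CombinationKey travels between map and reduce through write/readFields, a quick way to check the Writable contract is a local round trip through a byte stream. This is a hedged sketch for verification only (the class name CombinationKeyRoundTrip is made up and not part of the original code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class CombinationKeyRoundTrip {
    public static void main(String[] args) throws Exception {
        CombinationKey original = new CombinationKey();
        original.setKey("a");
        original.setValue("b");

        // Serialize with write(), exactly as the shuffle would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields() and confirm both fields survive.
        CombinationKey copy = new CombinationKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getKey() + "=" + copy.getValue()); // expected: a=b
    }
}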
/**
 * Secondary sort: order by the natural key first, then by the value within each key.
 */
class DefinedComparator extends WritableComparator {

    public DefinedComparator() {
        super(CombinationKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey c1 = (CombinationKey) a;
        CombinationKey c2 = (CombinationKey) b;
        if (!c1.getKey().equals(c2.getKey())) {
            return c1.getKey().compareTo(c2.getKey());
        } else {
            // Ascending by value. Comparing the strings directly is safer than comparing
            // hashCode values, which only happens to give the right order for
            // single-character strings like those in the sample data.
            return c1.getValue().compareTo(c2.getValue());
        }
    }
}
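To sort the values in descending order instead, only the value comparison inside compare needs to flip. A minimal variant follows (the class name DefinedDescendingComparator is not in the original code; if placed in the same source file, the existing imports cover it). Register it with job.setSortComparatorClass(DefinedDescendingComparator.class) in place of DefinedComparator.

/**
 * Same secondary sort, but with the values of each key in descending order.
 */
class DefinedDescendingComparator extends WritableComparator {

    public DefinedDescendingComparator() {
        super(CombinationKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey c1 = (CombinationKey) a;
        CombinationKey c2 = (CombinationKey) b;
        if (!c1.getKey().equals(c2.getKey())) {
            // Keys themselves still sort ascending so the reducer sees them in order.
            return c1.getKey().compareTo(c2.getKey());
        }
        // Reverse the value comparison: the groups become 5,4,2 / f,c,a instead of 2,4,5 / a,c,f.
        return c2.getValue().compareTo(c1.getValue());
    }
}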
/**
 * Grouping comparator: group records into one reduce() call by the natural key only,
 * ignoring the value part of the composite key.
 */
class DefinedGroupSort extends WritableComparator {

    public DefinedGroupSort() {
        super(CombinationKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CombinationKey ck1 = (CombinationKey) a;
        CombinationKey ck2 = (CombinationKey) b;
        return ck1.getKey().compareTo(ck2.getKey());
    }
}
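One caveat the original code does not cover: when the job runs with more than one reduce task, records that share the same natural key must also land in the same partition. The default HashPartitioner hashes the whole CombinationKey, which does not override hashCode(), so same-key records can be scattered across reducers. A hedged sketch of a partitioner on the natural key only is shown below (the class name DefinedPartitioner is not in the original code); it would be registered with job.setPartitionerClass(DefinedPartitioner.class).

// Requires: import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partition on the natural key only, so every record for the same key
 * reaches the same reducer regardless of its value part.
 */
class DefinedPartitioner extends Partitioner<CombinationKey, Text> {

    @Override
    public int getPartition(CombinationKey key, Text value, int numPartitions) {
        // Mask the sign bit so the result is always a valid partition index.
        return (key.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}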