11-Hadoop: Inverted Index Algorithm
Preparing the Data
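The original post does not list the sample input, so the files below are only a hypothetical illustration. The job just needs a few small text files of whitespace-separated words uploaded to the HDFS input directory used by main() below (hdfs://s0:9000/library/InvertedIndex/input by default), for example:

file1.txt:
hello hadoop

file2.txt:
hello hadoop hadoop spark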
Source Code
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm;
import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;
/**
 * 11-Hadoop: Inverted Index
 * Description: a hands-on, line-by-line MapReduce implementation of an inverted index on Hadoop.
 */
public class InvertedIndex {
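
    // Mapper: for each word in a line, emits the composite key "word:fileName" with the
    // value "1". The source file name is taken from the input split in setup().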
    public static class DataMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        private final Text number = new Text("1");
        private String fileName = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            fileName = fileSplit.getPath().getName();
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.trim().length() > 0) {
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    String keyForCombiner = tokenizer.nextToken() + ":" + fileName;
                    context.write(new Text(keyForCombiner), number);
                }
            }
        }
    }
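
    // Combiner: runs on the map side, sums the "1" values for each "word:fileName" key
    // locally, then re-emits the key as "word" and the value as "fileName:count".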
    public static class DataCombiner
            extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text item : values) {
                sum += Integer.parseInt(item.toString());
            }
            String[] keyArray = key.toString().split(":");
            context.write(new Text(keyArray[0]), new Text(keyArray[1] + ":" + sum));
        }
    }
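
    // Reducer: concatenates all "fileName:count" values for a word into one
    // semicolon-separated posting list, which becomes the inverted-index entry.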
    public static class DataReducer
            extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder result = new StringBuilder();
            for (Text item : values) {
                result.append(item).append(";");
            }
            context.write(key, new Text(result.substring(0, result.length() - 1)));
        }
    }
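
    // Helper for recursively deleting a directory on the local file system. It is not
    // called in this job; the HDFS output path is removed in main() via HadoopFile.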
    public static boolean deleteDir(File file) {
        if (file.exists()) {
            if (file.isDirectory()) {
                String[] children = file.list();
                for (String child : children) {
                    deleteDir(new File(file, child));
                }
            }
        }
        return file.delete();
    }
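
    // Driver: falls back to fixed HDFS paths when no arguments are given, clears the
    // output path, configures the mapper/combiner/reducer, and submits the job.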
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0");

        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "hdfs://s0:9000/library/InvertedIndex/input";
            args[1] = "hdfs://s0:9000/library/InvertedIndex/output";
        }

        // Remove the output path on HDFS (HadoopFile is a helper class from this project).
        HadoopFile.delFileToHDFS(args[1]);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: InvertedIndex <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(DataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(DataCombiner.class);
        job.setReducerClass(DataReducer.class);
        // The reducer emits Text keys, so the job's output key class must be Text;
        // the original LongWritable here would cause a type-mismatch error at runtime.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
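When the job completes, the output directory (the last argument) contains the inverted index: each line is a word, a tab, and its semicolon-separated posting list of fileName:count pairs. With the hypothetical two-file input sketched above, the result would look roughly like this (the order of postings within a line depends on how the values reach the reducer):

hadoop	file1.txt:1;file2.txt:2
hello	file1.txt:1;file2.txt:1
spark	file2.txt:1

To index other data, pass one or more HDFS input directories followed by an output directory as command-line arguments instead of relying on the hard-coded default paths.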