11-Hadoop - Inverted Index Algorithm

Prepare the Data

  • file1.txt
      Spark is so powerful
    
  • file2.txt
      Spark is the most exciting thing happening in big data
    
  • file3.txt
      Hello Spark Hello again Spark
    

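For reference, with the three files above as input, the job below should produce output roughly like the following (one line per word; TextOutputFormat separates the word and its postings with a tab, and the order of the "fileName:count" postings within a line is not guaranteed):

    Hello      file3.txt:2
    Spark      file1.txt:1;file2.txt:1;file3.txt:2
    again      file3.txt:1
    big        file2.txt:1
    data       file2.txt:1
    exciting   file2.txt:1
    happening  file2.txt:1
    in         file2.txt:1
    is         file1.txt:1;file2.txt:1
    most       file2.txt:1
    powerful   file1.txt:1
    so         file1.txt:1
    the        file2.txt:1
    thing      file2.txt:1
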
Source Code

The mapper emits a ("word:fileName", 1) pair for every token, the combiner sums these counts and re-keys each record by the word alone, and the reducer joins the per-file "fileName:count" postings into a single line per word.

    package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm;
    import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import java.io.File;
    import java.io.IOException;
    import java.util.StringTokenizer;

    /**
     * 11-Hadoop Inverted Index
     * Description: a hand-coded MapReduce program that builds an inverted index,
     * mapping each word to the files it occurs in together with per-file counts.
     */
    public class InvertedIndex {

        public static class DataMapper
                extends Mapper<LongWritable, Text, Text, Text> {

            // Every token is emitted with a count of "1"; the combiner sums these up.
            private final Text number = new Text("1");
            private String fileName = "";

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // Remember which input file this split belongs to.
                FileSplit fileSplit = (FileSplit) context.getInputSplit();
                fileName = fileSplit.getPath().getName();
            }

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                if (line.trim().length() > 0) {
                    StringTokenizer s = new StringTokenizer(line);
                    while (s.hasMoreTokens()) {
                        // Key the record as "word:fileName" so the combiner can
                        // count occurrences of each word per file.
                        String keyForCombiner = s.nextToken() + ":" + fileName;
                        context.write(new Text(keyForCombiner), number);
                    }
                }
            }
        }

        public static class DataCombiner
                extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Sum the "1" counts for one "word:fileName" key, then re-key the
                // record by the word alone so the reducer groups all files together.
                // Note: Hadoop treats combiners as optional, so relying on the
                // combiner to re-key records is fragile (a combiner-free variant
                // is sketched after this listing).
                int sum = 0;
                for (Text item : values) {
                    sum += Integer.parseInt(item.toString());
                }

                String[] keyArray = key.toString().split(":");
                context.write(new Text(keyArray[0]), new Text(keyArray[1] + ":" + sum));
            }
        }

        public static class DataReducer
                extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Join all "fileName:count" postings for this word with ";",
                // dropping the trailing separator.
                StringBuilder result = new StringBuilder();
                for (Text item : values) {
                    result.append(item).append(";");
                }
                context.write(key, new Text(result.substring(0, result.length() - 1)));
            }
        }
        // Recursively delete a local directory. (Not used for the HDFS output
        // path, which main() removes via HadoopFile.delFileToHDFS.)
        public static boolean deleteDir(File file) {
            if (file.exists() && file.isDirectory()) {
                for (String child : file.list()) {
                    deleteDir(new File(file, child));
                }
            }
            return file.delete();
        }
        public static void main(String[] args) throws Exception {
            System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0");

            // Fall back to default HDFS input/output paths when no arguments are given.
            if (args == null || args.length == 0) {
                args = new String[2];
                args[0] = "hdfs://s0:9000/library/InvertedIndex/input";
                args[1] = "hdfs://s0:9000/library/InvertedIndex/output";
            }
            // Delete the output directory so the job can be re-run.
            HadoopFile.delFileToHDFS(args[1]);

            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: InvertedIndex <in> [<in>...] <out>");
                System.exit(2);
            }

            Job job = Job.getInstance(conf, "InvertedIndex");
            job.setJarByClass(InvertedIndex.class);
            job.setMapperClass(DataMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setCombinerClass(DataCombiner.class);
            job.setReducerClass(DataReducer.class);
            // The reducer emits Text keys and values, so the final output classes must be Text.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // All arguments except the last are input paths; the last one is the output path.
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
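
One caveat about the job above: Hadoop treats a combiner as an optional optimization that may run zero, one, or several times, while DataCombiner both sums counts and changes the key from "word:fileName" to "word". If the combiner were skipped, the reducer would receive un-merged "word:fileName" keys. Below is a minimal sketch of a combiner-free variant, assuming a hypothetical class InvertedIndexSafe with mapper/reducer names SafeMapper and SafeReducer (none of these appear in the original source): the mapper keys records by the word and carries the file name in the value, and the reducer counts per-file occurrences itself.

    package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm;

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    /** Hypothetical combiner-free variant; correctness does not depend on a combiner running. */
    public class InvertedIndexSafe {

        public static class SafeMapper extends Mapper<LongWritable, Text, Text, Text> {

            private String fileName = "";

            @Override
            protected void setup(Context context) {
                // Remember which input file this split belongs to.
                fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokens = new StringTokenizer(value.toString());
                while (tokens.hasMoreTokens()) {
                    // Key by the word itself; the value carries the file name.
                    context.write(new Text(tokens.nextToken()), new Text(fileName));
                }
            }
        }

        public static class SafeReducer extends Reducer<Text, Text, Text, Text> {

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Count how many times the word occurs in each file.
                Map<String, Integer> perFile = new LinkedHashMap<String, Integer>();
                for (Text file : values) {
                    Integer count = perFile.get(file.toString());
                    perFile.put(file.toString(), count == null ? 1 : count + 1);
                }
                // Join the postings as "fileName:count;fileName:count;...".
                StringBuilder postings = new StringBuilder();
                for (Map.Entry<String, Integer> e : perFile.entrySet()) {
                    if (postings.length() > 0) {
                        postings.append(";");
                    }
                    postings.append(e.getKey()).append(":").append(e.getValue());
                }
                context.write(key, new Text(postings.toString()));
            }
        }
    }

The driver setup for this variant would mirror main() above, just without the setCombinerClass call.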