10 - Hadoop MapReduce in Practice: Joining Worker and Department Records

Overview

  • A hand-written, line-by-line MapReduce job: a practical Hadoop join on employee records
  • Each worker record is joined with the information of its department
  • Hadoop does not sort the values handed to a reducer, but the same effect can be achieved by sorting on the key: repackage the key as a composite (k, v) pair, as the sketch below illustrates
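
    For example, for department 10 the mapper emits the composite keys below. The sort
    comparator orders records that share a department number by the second field, and
    "0" sorts before any worker number, so the department record reaches the reducer
    first and can be cached before its workers stream in:

      (10, "0")     -> department record, cached by the reducer
      (10, "7711")  -> worker 7711, decorated from the cache
      (10, "7715")  -> worker 7715
      (10, "7721")  -> worker 7721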

    Input data

  • deptment.txt
      30 sales chicago
      20 research dallas
      10 accounting newyork
    
  • worker.txt
      7711 clark1 managers1 7631 1989-05-09 2450 400 10
      7721 clark3 managers1 7631 1989-05-09 2450 400 10
      7455 allen1 salesman 7698 1989-09-01 1600 300 30
      7782 clark1 managers 7639 1989-05-09 2450 400 20
      7715 clark2 managers1 7631 1989-05-09 2450 400 10
      7499 allen2 salesman 7698 1989-09-01 1600 300 30
      7792 clark2 managers 7639 1989-05-09 2450 400 20
    
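
  Each deptment.txt line holds three fields (department number, name, city); each
  worker.txt line holds eight, of which the job only reads field 0 (worker number),
  field 1 (worker name), and field 7 (department number).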

    Output

     flag: 1 workerNo:7711  workerName:clark1  departmentNo:10  departmentName:accounting  
     flag: 1 workerNo:7715  workerName:clark2  departmentNo:10  departmentName:accounting  
     flag: 1 workerNo:7721  workerName:clark3  departmentNo:10  departmentName:accounting  

     flag: 1 workerNo:7782  workerName:clark1  departmentNo:20  departmentName:research  
     flag: 1 workerNo:7792  workerName:clark2  departmentNo:20  departmentName:research  

     flag: 1 workerNo:7455  workerName:allen1  departmentNo:30  departmentName:sales  
     flag: 1 workerNo:7499  workerName:allen2  departmentNo:30  departmentName:sales   
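
     (The blank line before each group is written by the reducer itself: it emits one
     empty record at the start of every reduce call; see DataReducer below.)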

Source code

    package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.File;
    import java.io.IOException;

    /**
     * Hadoop MapReduce join of worker and department records.
     * Description: a hand-written, line-by-line MapReduce job that attaches
     * department information to each worker record.
     */
    public class JoinDataOptimize {

      public static class DataMapper
          extends Mapper<LongWritable, Text, CombinationKey, WorkerInfomation> {

          // Reused across map() calls; context.write() serializes it immediately, so this is safe.
          CombinationKey combinationKey = new CombinationKey();

          public void map(LongWritable key, Text value, Context context
                          ) throws IOException, InterruptedException {
              String inputData = value.toString();
              String[] data = inputData.split(" ");
              if (data.length <= 3) {
                  // Department line: departmentNo departmentName city
                  WorkerInfomation department = new WorkerInfomation();
                  department.setDepartmentNo(data[0]);
                  department.setDepartmentName(data[1]);
                  department.setFlag(0);

                  // Sort value "0" makes the department record the first of its group.
                  combinationKey.setKey(department.getDepartmentNo());
                  combinationKey.setValue("0");
                  context.write(combinationKey, department);
              } else {
                  // Worker line: workerNo workerName ... departmentNo (field 7)
                  WorkerInfomation worker = new WorkerInfomation();
                  worker.setWorkerNo(data[0]);
                  worker.setWorkerName(data[1]);
                  worker.setDepartmentNo(data[7]);
                  worker.setFlag(1);

                  combinationKey.setKey(worker.getDepartmentNo());
                  combinationKey.setValue(worker.getWorkerNo());
                  context.write(combinationKey, worker);
              }
          }
      }


      public static class DataReducer
          extends Reducer<CombinationKey, WorkerInfomation, Text, Text> {

          public void reduce(CombinationKey key, Iterable<WorkerInfomation> values,
                             Context context
                             ) throws IOException, InterruptedException {

              WorkerInfomation department = null;
              Text resultValue = new Text();

              // Emit one empty record so each department group is preceded by a blank line.
              context.write(resultValue, new Text());
              for (WorkerInfomation item : values) {
                  if (item.getFlag() == 0) {
                      // The sort comparator guarantees the department record arrives first.
                      // Copy it: Hadoop reuses the value instance on every iteration.
                      department = new WorkerInfomation(item);
                  } else if (department != null) {
                      item.setDepartmentName(department.getDepartmentName());
                      item.setDepartmentNo(department.getDepartmentNo());
                      context.write(resultValue, new Text(item.toString()));
                  }
              }
          }
      }
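    /** Recursively deletes the local output directory so the job can be rerun. */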
    public static boolean deleteDir(File file){

        if(file.exists()){
            if(file.isDirectory()){
                String[] childrens = file.list();
                for(String childrenFilePath : childrens){
                    deleteDir(new File(file,childrenFilePath));
                }

            }
        }
        return file.delete();
    }
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0");
        // Default local paths so the job can be launched from the IDE without arguments.
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/input";
            args[1] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/output";
        }
        // Delete any previous output directory; Hadoop refuses to overwrite an existing one.
        deleteDir(new File(args[1]));

      Configuration conf = new Configuration();
      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length < 2) {
        System.err.println("Usage: wordcount <in> [<in>...] <out>");
        System.exit(2);
      }
      Job job = Job.getInstance(conf, "Join");
      job.setJarByClass(JoinDataOptimize.class);
      job.setMapperClass(DataMapper.class);
      job.setMapOutputKeyClass(CombinationKey.class);
      job.setMapOutputValueClass(WorkerInfomation.class);

      // No combiner: the join logic is not associative, so the reducer cannot double as one.
      job.setReducerClass(DataReducer.class);
      job.setOutputKeyClass(Text.class);   // the reducer emits Text keys, not LongWritable
      job.setOutputValueClass(Text.class);


      // Secondary sort: order records by (departmentNo, value) so the department
      // record is the first one the reducer sees in each group.
      job.setSortComparatorClass(DefinedComparator.class);
      // Group all records with the same department number into a single reduce call.
      job.setGroupingComparatorClass(DefinedGroupSort.class);

      for (int i = 0; i < otherArgs.length - 1; ++i) {
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
      }
      FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }




        /**
         * Composite map key: key = department number, value = secondary sort field.
         */
        static class CombinationKey implements WritableComparable<CombinationKey> {
            private String key;
            private String value;

            /**
             * Primary ordering of the map key. Returning 0 is safe only because
             * DefinedComparator is registered as the job's sort comparator and
             * takes precedence over this method.
             */
            public int compareTo(CombinationKey o) {
                return 0;
            }

            public void write(DataOutput out) throws IOException {
                out.writeUTF(this.key);
                out.writeUTF(this.value);

            }

            public void readFields(DataInput in) throws IOException {
                this.key = in.readUTF();
                this.value = in.readUTF();
            }

            public String getKey() {
                return key;
            }

            public void setKey(String key) {
                this.key = key;
            }

            public String getValue() {
                return value;
            }

            public void setValue(String value) {
                this.value = value;
            }
        }


        /**
         * Grouping comparator: records sharing a department number form one reduce group.
         */
        static class DefinedGroupSort extends WritableComparator {
            public DefinedGroupSort() {
                super(CombinationKey.class,true);
            }

            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                CombinationKey ck1 = (CombinationKey)a;
                CombinationKey ck2 = (CombinationKey)b;
                // Only the zero / non-zero result matters for grouping, so the
                // reversed comparison still groups keys with equal department numbers.
                return ck2.getKey().compareTo(ck1.getKey());
            }
        }



        /**
         * Sort comparator (secondary sort): order by department number first, then by
         * the secondary value, so the department record ("0") leads each group.
         */
        static class DefinedComparator extends WritableComparator {
            public DefinedComparator() {
                super(CombinationKey.class,true);
            }

            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                CombinationKey c1 = (CombinationKey) a;
                CombinationKey c2 = (CombinationKey) b;


                if (!c1.getKey().equals(c2.getKey())) {
                    // Different departments: order the groups by department number.
                    return c1.getKey().compareTo(c2.getKey());
                }
                // Same department: order by the secondary value. Plain string comparison
                // puts "0" (the department record) ahead of every worker number, which
                // a hashCode-based comparison does not guarantee.
                return c1.getValue().compareTo(c2.getValue());

            }
        }


    }
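
The listing references a WorkerInfomation value class that is not included above. Below is a minimal sketch that satisfies every call the job makes: the field layout follows the setters used in the mapper, and the toString format is inferred from the sample output, so treat both as assumptions rather than the original class.

    package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    /**
     * Value object carrying either a department record (flag = 0)
     * or a worker record (flag = 1). Reconstructed sketch; not the original class.
     */
    public class WorkerInfomation implements Writable {
        // Fields default to "" because writeUTF(null) would throw, and each record
        // type only fills in a subset of the fields.
        private String workerNo = "";
        private String workerName = "";
        private String departmentNo = "";
        private String departmentName = "";
        private int flag; // 0 = department record, 1 = worker record

        public WorkerInfomation() {}

        /** Copy constructor; the reducer uses it to cache the department record. */
        public WorkerInfomation(WorkerInfomation other) {
            this.workerNo = other.workerNo;
            this.workerName = other.workerName;
            this.departmentNo = other.departmentNo;
            this.departmentName = other.departmentName;
            this.flag = other.flag;
        }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(workerNo);
            out.writeUTF(workerName);
            out.writeUTF(departmentNo);
            out.writeUTF(departmentName);
            out.writeInt(flag);
        }

        public void readFields(DataInput in) throws IOException {
            workerNo = in.readUTF();
            workerName = in.readUTF();
            departmentNo = in.readUTF();
            departmentName = in.readUTF();
            flag = in.readInt();
        }

        public String getWorkerNo() { return workerNo; }
        public void setWorkerNo(String workerNo) { this.workerNo = workerNo; }
        public String getWorkerName() { return workerName; }
        public void setWorkerName(String workerName) { this.workerName = workerName; }
        public String getDepartmentNo() { return departmentNo; }
        public void setDepartmentNo(String departmentNo) { this.departmentNo = departmentNo; }
        public String getDepartmentName() { return departmentName; }
        public void setDepartmentName(String departmentName) { this.departmentName = departmentName; }
        public int getFlag() { return flag; }
        public void setFlag(int flag) { this.flag = flag; }

        @Override
        public String toString() {
            // Matches the format of the sample output above.
            return "flag: " + flag + " workerNo:" + workerNo
                    + "  workerName:" + workerName
                    + "  departmentNo:" + departmentNo
                    + "  departmentName:" + departmentName;
        }
    }

With this class on the classpath, the job can be packaged and submitted as usual, e.g. hadoop jar join-example.jar com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join.JoinDataOptimize <in> <out> (the jar name here is hypothetical).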