06-Hadoop处理员工信息Join

准备工作

  • 准备数据

      department.txt
      30 sales chicago
      20 research dallas
      10 accounting newyork
      worker.txt
      7499 allen salesman 7698 1989-09-01 1600 300 30
      7782 clark managers 7639 1989-05-09 2450 400 20
      7781 clark1 managers1 7631 1989-05-09 2450 400 10
    

功能描述

  • 手动逐行编写 MapReduce 程序实战:使用 Hadoop 按部门编号对员工信息(worker.txt)与部门信息(department.txt)进行 Join

代码实现

package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;
import com.sun.corba.se.spi.ior.Writeable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * 05-Hadoop处理员工信息Join
 * 描述:手动逐行的MapReduce编程实战Hadoop处理员工信息Join
 */
public class JoinData {

  public static class DataMapper
     extends Mapper<LongWritable, Text, LongWritable, WorkerInfomation>{

  private final static LongWritable empValue = new LongWritable(1);

  public void map(LongWritable key, Text value, Context context
                  ) throws IOException, InterruptedException {
      String inputData = value.toString();
      String[] data = inputData.split(" ");
      if(data.length <= 3){
          WorkerInfomation department = new WorkerInfomation();
          department.setDepartmentNo(data[0]);
          department.setDepartmentName(data[1]);
          department.setFlag(0);

          context.write(new LongWritable(Long.valueOf(department.getDepartmentNo())),department);

      }else{
          WorkerInfomation worker = new WorkerInfomation();
          worker.setWorkerNo(data[0]);
          worker.setWorkerName(data[1]);
          worker.setDepartmentNo(data[7]);
          worker.setFlag(1);
          context.write(new LongWritable(Long.valueOf(worker.getDepartmentNo())),worker);
      }
  }




  }


public static class DataReducer
     extends Reducer<LongWritable, WorkerInfomation,LongWritable, Text> {

  private LongWritable result = new LongWritable();

  public void reduce(Text key, Iterable<WorkerInfomation> values,
                     Context context
                     ) throws IOException, InterruptedException {

      WorkerInfomation deparkment = null;
      List<WorkerInfomation> workerList = new ArrayList<WorkerInfomation>();
      LongWritable resultKey = new LongWritable(0);
      Text resultValue = new Text();

      for(WorkerInfomation item : values){
        if(item.getFlag() == 0){
             deparkment = new WorkerInfomation(item);
        }else{
            workerList.add(new WorkerInfomation(item));
        }
      }

      for(WorkerInfomation worker : workerList){
          worker.setDepartmentName(deparkment.getDepartmentName());
          worker.setDepartmentNo(deparkment.getDepartmentNo());

          context.write(resultKey,new Text(worker.toString()) );

      }

  }
}
public static boolean deleteDir(File file){

    if(file.exists()){
        if(file.isDirectory()){
            String[] childrens = file.list();
            for(String childrenFilePath : childrens){
                deleteDir(new File(file,childrenFilePath));
            }

        }
    }
    return file.delete();
}
public static void main(String[] args) throws Exception {

    if(args == null || args.length ==0){
        args = new String[2];
        args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/input";
        args[1] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/output";
    }
    deleteDir(new File(args[1]));

  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "Join");
  job.setJarByClass(JoinData.class);
  job.setMapperClass(DataMapper.class);
  job.setCombinerClass(DataReducer.class);
  job.setReducerClass(DataReducer.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(WorkerInfomation.class);

  for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  }
  FileOutputFormat.setOutputPath(job,
    new Path(otherArgs[otherArgs.length - 1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

class WorkerInfomation  implements WritableComparable {

  private String workerNo = "";
  private String workerName = "" ;
  private String departmentNo = "" ;
  private String departmentName = "";
  private int flag = 0 ; // 0 :部门  1: 员工


  public int compareTo(Object o) {
    return 0;
  }

  public void write(DataOutput out) throws IOException {
        out.writeUTF(this.workerNo);
        out.writeUTF(this.workerName);
        out.writeUTF(this.departmentNo);
        out.writeUTF(this.departmentName);
        out.writeInt(this.flag);

  }

  public void readFields(DataInput in) throws IOException {
      this.workerNo = in.readUTF();
      this.workerName = in.readUTF();
      this.departmentNo = in.readUTF();
      this.departmentName = in.readUTF();
      this.flag = in.readInt();

  }

    @Override
    public String toString() {
        return  this.workerNo + "  " + this.workerName + "  " + this.departmentNo + "  " + this.departmentNo + "  " ;
    }
    public WorkerInfomation(){}

    public WorkerInfomation(String workerNo, String workerName, String departmentNo, String departmentName, int flag) {
        this.workerNo = workerNo;
        this.workerName = workerName;
        this.departmentNo = departmentNo;
        this.departmentName = departmentName;
        this.flag = flag;
    }

    public  WorkerInfomation(WorkerInfomation info ){
        this.workerNo = info.workerNo;
        this.workerName = info.workerName;
        this.departmentNo = info.departmentNo;
        this.departmentName = info.departmentName;
        this.flag = info.flag;

    }

    public String getWorkerNo() {
        return workerNo;
    }

    public void setWorkerNo(String workerNo) {
        this.workerNo = workerNo;
    }

    public String getWorkerName() {
        return workerName;
    }

    public void setWorkerName(String workerName) {
        this.workerName = workerName;
    }

    public String getDepartmentNo() {
        return departmentNo;
    }

    public void setDepartmentNo(String departmentNo) {
        this.departmentNo = departmentNo;
    }

    public String getDepartmentName() {
        return departmentName;
    }

    public void setDepartmentName(String departmentName) {
        this.departmentName = departmentName;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }
}