06-Hadoop处理员工信息Join
准备工作
准备数据
department.txt（第 1 列为部门编号，第 2 列为部门名称）：
30 sales chicago
20 research dallas
10 accounting newyork

worker.txt（第 1 列为员工编号，第 2 列为员工姓名，第 8 列为所属部门编号）：
7499 allen salesman 7698 1989-09-01 1600 300 30
7782 clark managers 7639 1989-05-09 2450 400 20
7781 clark1 managers1 7631 1989-05-09 2450 400 10
功能描述
- 手动逐行编写 MapReduce 程序：以部门编号为连接键，用 Hadoop 将员工信息与部门信息进行 Join
代码实现
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;
import com.sun.corba.se.spi.ior.Writeable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* 05-Hadoop处理员工信息Join
* 描述:手动逐行的MapReduce编程实战Hadoop处理员工信息Join
*/
public class JoinData {
public static class DataMapper
extends Mapper<LongWritable, Text, LongWritable, WorkerInfomation>{
private final static LongWritable empValue = new LongWritable(1);
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
String inputData = value.toString();
String[] data = inputData.split(" ");
if(data.length <= 3){
WorkerInfomation department = new WorkerInfomation();
department.setDepartmentNo(data[0]);
department.setDepartmentName(data[1]);
department.setFlag(0);
context.write(new LongWritable(Long.valueOf(department.getDepartmentNo())),department);
}else{
WorkerInfomation worker = new WorkerInfomation();
worker.setWorkerNo(data[0]);
worker.setWorkerName(data[1]);
worker.setDepartmentNo(data[7]);
worker.setFlag(1);
context.write(new LongWritable(Long.valueOf(worker.getDepartmentNo())),worker);
}
}
}
public static class DataReducer
extends Reducer<LongWritable, WorkerInfomation,LongWritable, Text> {
private LongWritable result = new LongWritable();
public void reduce(Text key, Iterable<WorkerInfomation> values,
Context context
) throws IOException, InterruptedException {
WorkerInfomation deparkment = null;
List<WorkerInfomation> workerList = new ArrayList<WorkerInfomation>();
LongWritable resultKey = new LongWritable(0);
Text resultValue = new Text();
for(WorkerInfomation item : values){
if(item.getFlag() == 0){
deparkment = new WorkerInfomation(item);
}else{
workerList.add(new WorkerInfomation(item));
}
}
for(WorkerInfomation worker : workerList){
worker.setDepartmentName(deparkment.getDepartmentName());
worker.setDepartmentNo(deparkment.getDepartmentNo());
context.write(resultKey,new Text(worker.toString()) );
}
}
}
public static boolean deleteDir(File file){
if(file.exists()){
if(file.isDirectory()){
String[] childrens = file.list();
for(String childrenFilePath : childrens){
deleteDir(new File(file,childrenFilePath));
}
}
}
return file.delete();
}
public static void main(String[] args) throws Exception {
if(args == null || args.length ==0){
args = new String[2];
args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/input";
args[1] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/output";
}
deleteDir(new File(args[1]));
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Join");
job.setJarByClass(JoinData.class);
job.setMapperClass(DataMapper.class);
job.setCombinerClass(DataReducer.class);
job.setReducerClass(DataReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(WorkerInfomation.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
class WorkerInfomation implements WritableComparable {
private String workerNo = "";
private String workerName = "" ;
private String departmentNo = "" ;
private String departmentName = "";
private int flag = 0 ; // 0 :部门 1: 员工
public int compareTo(Object o) {
return 0;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(this.workerNo);
out.writeUTF(this.workerName);
out.writeUTF(this.departmentNo);
out.writeUTF(this.departmentName);
out.writeInt(this.flag);
}
public void readFields(DataInput in) throws IOException {
this.workerNo = in.readUTF();
this.workerName = in.readUTF();
this.departmentNo = in.readUTF();
this.departmentName = in.readUTF();
this.flag = in.readInt();
}
@Override
public String toString() {
return this.workerNo + " " + this.workerName + " " + this.departmentNo + " " + this.departmentNo + " " ;
}
public WorkerInfomation(){}
public WorkerInfomation(String workerNo, String workerName, String departmentNo, String departmentName, int flag) {
this.workerNo = workerNo;
this.workerName = workerName;
this.departmentNo = departmentNo;
this.departmentName = departmentName;
this.flag = flag;
}
public WorkerInfomation(WorkerInfomation info ){
this.workerNo = info.workerNo;
this.workerName = info.workerName;
this.departmentNo = info.departmentNo;
this.departmentName = info.departmentName;
this.flag = info.flag;
}
public String getWorkerNo() {
return workerNo;
}
public void setWorkerNo(String workerNo) {
this.workerNo = workerNo;
}
public String getWorkerName() {
return workerName;
}
public void setWorkerName(String workerName) {
this.workerName = workerName;
}
public String getDepartmentNo() {
return departmentNo;
}
public void setDepartmentNo(String departmentNo) {
this.departmentNo = departmentNo;
}
public String getDepartmentName() {
return departmentName;
}
public void setDepartmentName(String departmentName) {
this.departmentName = departmentName;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
}