10 - Hadoop Programming in Practice: A MapReduce Join on Employee Information
Feature Description
- A hands-on, line-by-line MapReduce program that joins employee information in Hadoop
- That is, each worker record is enriched with the information of its department
- In a Hadoop reducer the values of a group are not sorted automatically, but ordering can still be achieved by sorting the key instead: re-package the current (k, v) pair into a composite key (see the sketch below)
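The trick in miniature (a minimal sketch; CombinationKey is the composite key defined in the full source below):

// composite key = (departmentNo, secondary sort value)
CombinationKey k = new CombinationKey();
k.setKey("10");   // the grouping comparator compares only this part
k.setValue("0");  // departments use "0", workers use their workerNo,
                  // so the department record sorts first inside its group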
Input Data
- deptment.txt
30 sales chicago
20 research dallas
10 accounting newyork
- worker.txt
7711 clark1 managers1 7631 1989-05-09 2450 400 10
7721 clark3 managers1 7631 1989-05-09 2450 400 10
7455 allen1 salesman 7698 1989-09-01 1600 300 30
7782 clark1 managers 7639 1989-05-09 2450 400 20
7715 clark2 managers1 7631 1989-05-09 2450 400 10
7499 allen2 salesman 7698 1989-09-01 1600 300 30
7792 clark2 managers 7639 1989-05-09 2450 400 20
Output Results
flag: 1 workerNo:7711 workerName:clark1 departmentNo:10 departmentName:accounting
flag: 1 workerNo:7715 workerName:clark2 departmentNo:10 departmentName:accounting
flag: 1 workerNo:7721 workerName:clark3 departmentNo:10 departmentName:accounting
flag: 1 workerNo:7782 workerName:clark1 departmentNo:20 departmentName:research
flag: 1 workerNo:7792 workerName:clark2 departmentNo:20 departmentName:research
flag: 1 workerNo:7455 workerName:allen1 departmentNo:30 departmentName:sales
flag: 1 workerNo:7499 workerName:allen2 departmentNo:30 departmentName:sales
Source Code
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Hadoop employee information Join
* Description: a hands-on, line-by-line MapReduce program that joins
* worker records with their department information
*/
public class JoinDataOptimize {
public static class DataMapper
extends Mapper<LongWritable, Text, CombinationKey, WorkerInfomation>{
CombinationKey combinationKey = new CombinationKey();
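// Each input line is either a department row (3 fields: deptNo name city)
// or a worker row (8 fields, with the departmentNo in the last column).
// Both are emitted under a composite key (departmentNo, secondary value):
// departments get the fixed value "0", workers their workerNo, so the
// department record sorts ahead of the workers within each group.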
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
String inputData = value.toString();
String[] data = inputData.split(" ");
if(data.length <= 3){
WorkerInfomation department = new WorkerInfomation();
department.setDepartmentNo(data[0]);
department.setDepartmentName(data[1]);
department.setFlag(0);
combinationKey.setKey(department.getDepartmentNo());
combinationKey.setValue("0");
context.write(combinationKey,department);
}else{
WorkerInfomation worker = new WorkerInfomation();
worker.setWorkerNo(data[0]);
worker.setWorkerName(data[1]);
worker.setDepartmentNo(data[7]);
worker.setFlag(1);
combinationKey.setKey(worker.getDepartmentNo());
combinationKey.setValue(worker.getWorkerNo());
context.write(combinationKey,worker);
}
}
}
public static class DataReducer
extends Reducer<CombinationKey, WorkerInfomation,Text, Text> {
public void reduce(CombinationKey key, Iterable<WorkerInfomation> values,
Context context
) throws IOException, InterruptedException {
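// Because of the secondary sort, the department record (flag 0) is the
// first value of every group; cache it, then join it onto the worker
// records (flag 1) that follow.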
WorkerInfomation department = null;
Text resultValue = new Text();
for(WorkerInfomation item : values) {
if (item.getFlag() == 0) {
// cache the department record that leads the group
department = new WorkerInfomation(item);
} else if (department != null) {
item.setDepartmentName(department.getDepartmentName());
item.setDepartmentNo(department.getDepartmentNo());
context.write(resultValue, new Text(item.toString()));
}
}
}
}
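// Recursively delete the local output directory so the job can be re-run;
// Hadoop refuses to start if the output path already exists.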
public static boolean deleteDir(File file){
if(file.exists()){
if(file.isDirectory()){
String[] childrens = file.list();
for(String childrenFilePath : childrens){
deleteDir(new File(file,childrenFilePath));
}
}
}
return file.delete();
}
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir","/opt/modules/bigdata/hadoop/hadoop-2.6.0");
if(args == null || args.length ==0){
args = new String[2];
args[0] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/input";
args[1] = "/opt/workspace/bigdata/all_frame_intellij/hadoop-maven-idea/src/tutorial/resources/input/data/algorithm/workers-deptment/output";
}
deleteDir(new File(args[1]));
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: JoinDataOptimize <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Join");
job.setJarByClass(JoinDataOptimize.class);
job.setMapperClass(DataMapper.class);
job.setMapOutputKeyClass(CombinationKey.class);
job.setMapOutputValueClass(WorkerInfomation.class);
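// note: a combiner would be wrong here, because the reducer's output types
// (Text, Text) differ from the map output types it would have to produce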
//job.setCombinerClass(DataReducer.class);
job.setReducerClass(DataReducer.class);
job.setOutputKeyClass(Text.class); // the reducer emits Text keys, not LongWritable
job.setOutputValueClass(Text.class);
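// DefinedComparator sorts the composite keys (departmentNo first, then the
// secondary value); DefinedGroupSort then groups by departmentNo only, so a
// single reduce() call sees a department together with all of its workers.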
job.setSortComparatorClass(DefinedComparator.class);
job.setGroupingComparatorClass(DefinedGroupSort.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* Composite key: (departmentNo, secondary sort value)
*/
static class CombinationKey implements WritableComparable<CombinationKey> {
private String key;
private String value;
/**
* Primary sort of the map keys. The job installs DefinedComparator as the
* sort comparator, so this is not used for shuffling here, but it should
* still be consistent: compare the group key first, then the value.
*/
public int compareTo(CombinationKey o) {
int result = this.key.compareTo(o.key);
return result != 0 ? result : this.value.compareTo(o.value);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(this.key);
out.writeUTF(this.value);
}
public void readFields(DataInput in) throws IOException {
this.key = in.readUTF();
this.value = in.readUTF();
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
/**
* Grouping comparator: groups composite keys by departmentNo only, so a
* department and all of its workers reach the same reduce() call
*/
static class DefinedGroupSort extends WritableComparator {
public DefinedGroupSort() {
super(CombinationKey.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
CombinationKey ck1 = (CombinationKey)a;
CombinationKey ck2 = (CombinationKey)b;
// only equality matters for grouping, so compare the group key alone
return ck1.getKey().compareTo(ck2.getKey());
}
}
/**
* Secondary sort: orders composite keys by departmentNo first, then by the
* secondary value, so the department record ("0") leads each group
*/
static class DefinedComparator extends WritableComparator {
public DefinedComparator() {
super(CombinationKey.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
CombinationKey c1 = (CombinationKey) a;
CombinationKey c2 = (CombinationKey) b;
if(!c1.getKey().equals(c2.getKey())){
// different groups: order by departmentNo
return c1.getKey().compareTo(c2.getKey());
}else{
if(c1.getValue() == null || c2.getValue() == null){
// treat a missing value as smaller, keeping the comparator symmetric
return c1.getValue() == null ? (c2.getValue() == null ? 0 : -1) : 1;
}
// same group: order by the secondary value ("0" sorts before any workerNo)
return c1.getValue().compareTo(c2.getValue());
}
}
}
}
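The listing references a WorkerInfomation Writable that is not included in this file. The sketch below reconstructs it from how it is used above: the fields, copy constructor, getters/setters, and a toString matching the sample output are all implied by the source, but the exact original may differ.

package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.algorithm.join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WorkerInfomation implements Writable {

    // default to "" so writeUTF never sees null for fields a record does not set
    private String workerNo = "";
    private String workerName = "";
    private String departmentNo = "";
    private String departmentName = "";
    private int flag; // 0 = department record, 1 = worker record

    public WorkerInfomation() {}

    // copy constructor used by the reducer to cache the department record
    public WorkerInfomation(WorkerInfomation other) {
        this.workerNo = other.workerNo;
        this.workerName = other.workerName;
        this.departmentNo = other.departmentNo;
        this.departmentName = other.departmentName;
        this.flag = other.flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(workerNo);
        out.writeUTF(workerName);
        out.writeUTF(departmentNo);
        out.writeUTF(departmentName);
        out.writeInt(flag);
    }

    public void readFields(DataInput in) throws IOException {
        workerNo = in.readUTF();
        workerName = in.readUTF();
        departmentNo = in.readUTF();
        departmentName = in.readUTF();
        flag = in.readInt();
    }

    // matches the sample output lines shown above
    @Override
    public String toString() {
        return "flag: " + flag + " workerNo:" + workerNo
                + " workerName:" + workerName
                + " departmentNo:" + departmentNo
                + " departmentName:" + departmentName;
    }

    public String getWorkerNo() { return workerNo; }
    public void setWorkerNo(String workerNo) { this.workerNo = workerNo; }
    public String getWorkerName() { return workerName; }
    public void setWorkerName(String workerName) { this.workerName = workerName; }
    public String getDepartmentNo() { return departmentNo; }
    public void setDepartmentNo(String departmentNo) { this.departmentNo = departmentNo; }
    public String getDepartmentName() { return departmentName; }
    public void setDepartmentName(String departmentName) { this.departmentName = departmentName; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
}

With a class along these lines on the classpath, the job compiles and reproduces the sample output shown above.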