01 - Finding the Minimum Temperature in Weather Data
Prerequisites
- A working Hadoop cluster installation
- IntelliJ IDEA as the development tool
Functionality
- Use Hadoop MapReduce to compute the lowest temperature per year from a weather data file
- Data file: data.txt
- The job can be submitted from the local machine to the cluster (a configuration sketch follows this list)
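The listing below does not show how the client locates the cluster; presumably the cluster's core-site.xml, hdfs-site.xml and yarn-site.xml are on the project classpath. Alternatively, the relevant properties can be set programmatically. The following is only a sketch under that assumption: the class name, the ResourceManager host and the jar path are illustrative, not part of the original code.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RemoteSubmissionSketch {

    // Builds a Job whose client-side configuration points at a remote cluster.
    public static Job newRemoteJob() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://s0:9000");                 // NameNode, matching the HDFS paths used in main() below
        conf.set("mapreduce.framework.name", "yarn");               // run on YARN instead of the local job runner
        conf.set("yarn.resourcemanager.hostname", "s0");            // ResourceManager host (assumed)
        conf.set("mapreduce.job.jar", "/path/to/temperature.jar");  // jar to ship to the cluster (assumed path)
        return Job.getInstance(conf, "remote submission sketch");
    }
}
```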
Implementation
- A MapReduce job needs its own implementation classes extending org.apache.hadoop.mapreduce.Mapper and org.apache.hadoop.mapreduce.Reducer (see the skeleton below)
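For orientation before the full listing, this minimal skeleton (not part of the original article) shows how such a pair of classes is declared; the four generic parameters are the input key, input value, output key and output value types used in the code below.

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MapReduceSkeleton {

    // Receives <byte offset, line of text>, emits <year, temperature>.
    public static class SkeletonMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // parse the line, then:
            // context.write(new Text(year), new IntWritable(temperature));
        }
    }

    // Receives <year, all temperatures for that year>, emits <year, minimum temperature>.
    public static class SkeletonReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // aggregate the values, then:
            // context.write(key, new IntWritable(minimum));
        }
    }
}
```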
Full code
```java
package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.maxtemperature.combiner;

import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * Developer: 刘文  Email: [email protected]
 * 2016-02-11 06:50
 * Description: computes statistics over the weather log (here, the minimum temperature per year).
 */
public class TemperatureComputation {
    public static class TeperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 999;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            // Fixed-width record: the year occupies characters 15-19.
            String year = line.substring(15, 19);

            // The temperature field starts at character 45 and may carry a leading '+' sign.
            int temperature;
            if ('+' == line.charAt(45)) {
                temperature = Integer.parseInt(line.substring(46, 50));
            } else {
                temperature = Integer.parseInt(line.substring(45, 50));
            }

            // Quality flag at character 50; only readings flagged 0, 1, 4, 5 or 9 are kept.
            String validDataFlag = line.substring(50, 51);
            if (temperature != MISSING && validDataFlag.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }
    public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // setup() could be overridden here for one-time work, e.g. opening a database connection.

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep the lowest temperature seen for this year.
            int coldestTemperature = Integer.MAX_VALUE;
            for (IntWritable item : values) {
                coldestTemperature = Math.min(coldestTemperature, item.get());
            }
            context.write(key, new IntWritable(coldestTemperature));
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0");

        // Fall back to default HDFS paths when no arguments are given.
        if (args == null || args.length == 0) {
            args = new String[2];
            args[0] = "hdfs://s0:9000/library/MaxTemperature/input/data.txt";
            args[1] = "hdfs://s0:9000/library/MaxTemperature/output/result0";
        }

        // The job.
        Job job = Job.getInstance();
        Configuration conf = job.getConfiguration();

        // Delete the output directory if it already exists.
        HadoopFile.delFileToHDFS(args[1]);

        // Maximum split size.
        //conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 1024 * 50);
        // Minimum split size.
        conf.setLong(FileInputFormat.SPLIT_MINSIZE, 1024 * 30);

        job.setJarByClass(TemperatureComputation.class);
        job.setJobName("Minimum temperature per year");

        // Multiple input paths can be added (files or directories; directories are not scanned recursively).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Only one output path can be set, and it must not already exist.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(TeperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);

        // Output key type.
        job.setOutputKeyClass(Text.class);
        // Output value type.
        job.setOutputValueClass(IntWritable.class);

        // Wait for the job to complete.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
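The listing calls HadoopFile.delFileToHDFS to clear the output directory before the job starts, since FileOutputFormat refuses to write to an existing path, but that helper class is not reproduced here. Below is a minimal sketch of what such a helper might look like, using the standard org.apache.hadoop.fs.FileSystem API; the class and method names simply mirror the call above, and the body is an assumption.

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopFile {

    // Recursively deletes the given path if it exists, so it can be reused as a job output directory.
    public static void delFileToHDFS(String pathString) throws IOException {
        Configuration conf = new Configuration();
        // Resolve the file system from the path's scheme, e.g. hdfs://s0:9000/... in main() above.
        FileSystem fs = FileSystem.get(URI.create(pathString), conf);
        Path path = new Path(pathString);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = delete recursively
        }
    }
}
```

To run on the cluster, package the project into a jar and launch it with `hadoop jar <your-jar> com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.maxtemperature.combiner.TemperatureComputation <input> <output>`, or run main() directly from IntelliJ IDEA, in which case the hard-coded HDFS paths above are used.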