01-气温数据求最小值

准备工作

  • hadoop集群环境安装
  • intellij idea 开发工具

功能描述

  • 用 Hadoop 对气温数据文件按年份进行统计分析,求出每年的最低气温
  • 数据文件:data.txt
  • 可以本地提交到集群中

实现代码

  • 实现 MapReduce 作业时,需要编写自己的 Mapper 和 Reducer 实现类(分别继承 org.apache.hadoop.mapreduce.Mapper 和 org.apache.hadoop.mapreduce.Reducer)
  • 具体代码

    package com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.maxtemperature.combiner;

    import com.opensourceteams.modeles.common.bigdata.hadoop.hadoop2.hdfs.file.HadoopFile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    /**
     * MapReduce job that emits, for every year found in the input, the lowest
     * valid temperature recorded in that year.
     *
     * <p>Author: Liu Wen (刘文), Email: [email protected]
     * <br>Created: 16/2/11 06:50 AM
     */
    public class TemperatureComputation {

        /**
         * Parses one fixed-width record per input line and emits {@code (year, temperature)}
         * for every record that carries a usable reading.
         *
         * <p>Field offsets used (0-based, end exclusive): year {@code [15,19)},
         * signed temperature {@code [45,50)}, quality flag {@code [50,51)}.
         */
        public static class TeperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

            /**
             * Parsed temperature value that marks a record as having no reading.
             * NOTE(review): the temperature field holds four digits, so the sentinel may
             * actually be 9999 — confirm against data.txt before changing.
             */
            private static final int MISSING = 999;

            /** A record must reach the quality flag at index 50 to be parseable. */
            private static final int MIN_RECORD_LENGTH = 51;

            /** Quality flags that mark a reading as usable. */
            private static final String VALID_QUALITY_FLAGS = "01459";

            /**
             * Extracts year and temperature from a single record and writes them to the
             * context when the reading is present and its quality flag is accepted.
             *
             * @param key     offset of the line in the input split (unused)
             * @param value   one line of the input file
             * @param context task context used to emit {@code (year, temperature)}
             * @throws IOException           if writing the output pair fails
             * @throws InterruptedException  if the task is interrupted while writing
             * @throws NumberFormatException if the temperature field is not numeric
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Convert once; all offsets below are character offsets into this string.
                String line = value.toString();

                // Blank or truncated lines cannot be sliced; skip them but keep them visible.
                if (line.length() < MIN_RECORD_LENGTH) {
                    context.getCounter("Temperature", "ShortRecords").increment(1);
                    return;
                }

                String year = line.substring(15, 19);

                // Drop an explicit leading '+' before parsing; a '-' is kept as the sign.
                int digitsStart = line.charAt(45) == '+' ? 46 : 45;
                int temperature = Integer.parseInt(line.substring(digitsStart, 50));

                char qualityFlag = line.charAt(50);
                if (temperature != MISSING && VALID_QUALITY_FLAGS.indexOf(qualityFlag) >= 0) {
                    context.write(new Text(year), new IntWritable(temperature));
                }
            }
        }

        /**
         * Collapses all temperatures of one year into the smallest value.
         *
         * <p>Override {@code setup} for one-time initialisation, e.g. opening a
         * database connection.
         */
        public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            /**
             * Writes {@code (year, minimum temperature)} for the given key.
             *
             * @param key     the year
             * @param values  all temperatures emitted for that year
             * @param context task context used to emit the result
             * @throws IOException          if writing the output pair fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int coldestTemperature = Integer.MAX_VALUE;
                for (IntWritable item : values) {
                    coldestTemperature = Math.min(coldestTemperature, item.get());
                }
                context.write(key, new IntWritable(coldestTemperature));
            }
        }

        /**
         * Configures and submits the job, then exits with 0 on success and 1 on failure.
         *
         * @param args {@code args[0]} is the input path and {@code args[1]} the output path;
         *             when no arguments are given, built-in HDFS default paths are used
         * @throws IOException            if job setup or submission fails
         * @throws ClassNotFoundException if a job class cannot be loaded
         * @throws InterruptedException   if waiting for the job is interrupted
         */
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {

            System.setProperty("hadoop.home.dir", "/opt/modules/bigdata/hadoop/hadoop-2.6.0");

            if (args == null || args.length == 0) {
                args = new String[] {
                    "hdfs://s0:9000/library/MaxTemperature/input/data.txt",
                    "hdfs://s0:9000/library/MaxTemperature/output/result0"
                };
            } else if (args.length < 2) {
                // A single argument would otherwise fail below with an index error.
                System.err.println("Usage: TemperatureComputation <input path> <output path>");
                System.exit(2);
            }

            // The job
            Job job = Job.getInstance();
            Configuration conf = job.getConfiguration();

            // Presumably removes a pre-existing output path so the job can be re-run;
            // verify against HadoopFile.delFileToHDFS.
            HadoopFile.delFileToHDFS(args[1]);

            // Maximum split size (disabled)
            //conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 1024 * 50);

            // Minimum split size
            conf.setLong(FileInputFormat.SPLIT_MINSIZE, 1024 * 30);

            job.setJarByClass(TemperatureComputation.class);
            // The job computes the yearly minimum, so name it accordingly.
            job.setJobName("求最低温度");

            // Several input paths may be added; a path may be a file or a directory
            // (directories are not traversed recursively).
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Exactly one output path, and it must not exist yet.
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(TeperatureMapper.class);
            job.setReducerClass(TemperatureReducer.class);

            // Output key type
            job.setOutputKeyClass(Text.class);
            // Output value type
            job.setOutputValueClass(IntWritable.class);

            // Wait for the job to finish
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }