
MapReduce Study Notes (2)

I. Writing MapReduce with the old API

package OldAPI;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * In Hadoop 1.x the classes generally live in the mapreduce package;
 * in Hadoop 0.x they generally live in the mapred package.
 * @author liguodong
 */
public class OldApp {
static final String INPUT_PATH = "hdfs://liguodong:9000/hello";

    static final String OUT_PATH = "hdfs://liguodong:9000/out";

    /**
     * 1. Use JobConf instead of Job.
     * 2. The classes come from the mapred package rather than mapreduce.
     * 3. Submit the job with JobClient.runJob(...) instead of job.waitForCompletion(true).
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }

        //final Job job = new Job(conf, WordCountApp.class.getSimpleName());
        final JobConf job = new JobConf(conf, OldApp.class);

        //1.1 specify the input directory
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        //specify the class used to parse the input data
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k,v> types; if <k3,v3> has the same types as <k2,v2>, this can be omitted
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3 partitioning
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify the output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //specify the output format class
        //job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        //job.waitForCompletion(true);
        JobClient.runJob(job);
    }

    /**
     * New API: extends Mapper
     * Old API: extends MapReduceBase implements Mapper
     * @author liguodong
     */
    static class MyMapper extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, LongWritable>{

        @Override
        public void map(LongWritable k1, Text v1,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            final String[] splited = v1.toString().split(" ");
            for (String word : splited) {
                collector.collect(new Text(word), new LongWritable(1));
            }

        }
    }

    static class MyReducer extends MapReduceBase 
    implements Reducer<Text, LongWritable, Text, LongWritable>{

        @Override
        public void reduce(Text k2, Iterator<LongWritable> v2s,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            long times = 0L;
            while(v2s.hasNext()){
                final long temp = v2s.next().get();
                times += temp;
            }
            collector.collect(k2, new LongWritable(times));
        }

    }
}

II. Reading command-line arguments in MapReduce

package cmd;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountApp extends Configured implements Tool{

    //hdfs://liguodong:9000/hello
    static String INPUT_PATH = "";
    //hdfs://liguodong:9000/out
    static String OUT_PATH = "";
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new WordCountApp(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        INPUT_PATH = args[0];
        OUT_PATH = args[1];

        Configuration conf = getConf(); //use the Configuration populated by ToolRunner so generic options (e.g. -D) take effect

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }

        final Job job = new Job(conf, WordCountApp.class.getSimpleName());

        //required when the job is run from a packaged jar
        job.setJarByClass(WordCountApp.class);

        //1.1 specify the input directory
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        //specify the class used to parse the input data
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k,v> types; if <k3,v3> has the same types as <k2,v2>, this can be omitted
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3 partitioning
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify the output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //specify the output format class
        //job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
        return 0;
    }

    /**
     * KEYIN    i.e. k1: the byte offset of each line within the file
     * VALUEIN  i.e. v1: the text content of each line
     * KEYOUT   i.e. k2: each word in a line
     * VALUEOUT i.e. v2: the number of occurrences of each word, fixed value 1
     * Type mapping Java -> Hadoop: Long -> LongWritable, String -> Text
     * @author liguodong
     */
    static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split(" ");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
    /**
     * KEYIN    i.e. k2: each word in a line
     * VALUEIN  i.e. v2: the number of occurrences of each word in a line, fixed value 1
     * KEYOUT   i.e. k3: each distinct word in the whole file
     * VALUEOUT i.e. v3: the total number of occurrences of each distinct word in the whole file
     * @author liguodong
     */

    static class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }
}

Exporting the jar
Right-click the file -> Export -> Jar file -> choose the export location, e.g. D:xxx\xxx.jar -> choose the entry point (Main class) -> Finish.

Upload the jar to the machine running Hadoop.
Run:
hadoop jar wordcount.jar hdfs://liguodong:9000/hello hdfs://liguodong:9000/out

III. Counters
Hadoop counters give developers a global view of how a job is running and of its various metrics, so that problems can be diagnosed and handled promptly.
There are built-in counters for the MapReduce framework, the file system, and job scheduling. The input file used below contains:

hello you
hello me

Counter information for the word-count job:

Counters: 19
   File Output Format Counters 
     Bytes Written=19   //bytes the reduce side wrote to HDFS
   FileSystemCounters
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19
   File Input Format Counters 
     Bytes Read=19  //bytes the map side read from HDFS
   Map-Reduce Framework
     Map output materialized bytes=49
     Map input records=2    //input records read by map
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105
     Combine input records=0
     Reduce input records=4 //records the reduce side received from the map side
     Reduce input groups=3  //number of keys received by the reduce function, i.e. the number of merged k2 groups
     Combine output records=0
     Reduce output records=3    //records output by reduce
     Map output records=4   //records output by map

Custom counters
Declaring a counter
1. Declare it through an enum:
context.getCounter(Enum enum)
2. Declare it dynamically:
context.getCounter(String groupName, String counterName)

Counter operations
counter.setValue(long value);   //set the initial value
counter.increment(long incr);   //increase the count
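
The full example below uses the dynamic (group name + counter name) style. For completeness, a minimal sketch of the enum style (the enum name here is hypothetical, not part of the original code):

enum SensitiveWords { HELLO }   //hypothetical enum; gives the counter a fixed, type-safe name

//inside map(), equivalent to the dynamic form used in the example below:
context.getCounter(SensitiveWords.HELLO).increment(1L);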

package counter;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCountApp {

    static final String INPUT_PATH = "hdfs://liguodong:9000/hello";
    static final String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); 
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }

        final Job job = new Job(conf, WordCountApp.class.getSimpleName());
        //1.1 specify the input directory
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //specify the class used to parse the input data
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k,v> types; if <k3,v3> has the same types as <k2,v2>, this can be omitted
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3 partitioning
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify the output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //specify the output format class
        //job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {      
            /** Custom counter */
            final Counter hellocounter = context.getCounter("Sensitive Words", "hello");

            final String line = value.toString();
            if(line.contains("hello")){
                //record that the sensitive word appeared in this line
                hellocounter.increment(1L);
            }
            final String[] splited = value.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    static class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }   
}
15/03/26 22:36:22 INFO mapred.JobClient: Counters: 20
15/03/26 22:36:22 INFO mapred.JobClient:   Sensitive Words
15/03/26 22:36:22 INFO mapred.JobClient:     hello=2
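
The same counter can also be read back in the driver once the job has finished, instead of being scraped from the console output above. A minimal sketch (building on the example above, where Job and Counter are already imported):

        //sketch: at the end of main(), after the job has finished
        if (job.waitForCompletion(true)) {
            Counter hello = job.getCounters().findCounter("Sensitive Words", "hello");
            System.out.println("hello appeared in " + hello.getValue() + " line(s)");
        }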

IV. Programming with Combiners
Each map may produce a large amount of output. A combiner performs a first round of merging on the map side, reducing the amount of data transferred to the reducers.

At its core, a combiner merges values for the same key locally; it acts like a local reduce.

Without a combiner, all aggregation is left to the reduce phase, which is relatively inefficient. With a combiner, maps that finish first aggregate their output locally, which speeds the job up.

Note: the combiner's output becomes the reducer's input, and a combiner must never change the final result. It should therefore only be used where the reduce input key/value types are identical to the output key/value types and applying it does not affect the final result, such as summation or taking a maximum.

package combine;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp { 
    static final String INPUT_PATH = "hdfs://liguodong:9000/hello"; 
    static final String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }

        final Job job = new Job(conf, WordCountApp.class.getSimpleName());
        //1.1 specify the input directory
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        //specify the class used to parse the input data
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k,v> types; if <k3,v3> has the same types as <k2,v2>, this can be omitted
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3 partitioning
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining
        //job.setCombinerClass(MyReducer.class);

        job.setCombinerClass(MyCombiner.class);

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify the output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //specify the output format class
        //job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }
    static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
                System.out.println("Mapper输出<"+word+","+1+">");
            }
        }
    }

    static class MyReducer extends Reducer<Text, LongWritable,Text, LongWritable>{
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //the number of times this prints equals the number of reduce() calls, i.e. the number of k2 groups
            System.out.println("Reducer input group <"+k2.toString()+",...>");
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
                //the number of times this prints equals the number of input <k2,v2> pairs
                System.out.println("MyReducer input key/value pair <" + k2.toString() + "," + v2.get() + ">");
            }
            context.write(k2, new LongWritable(sum));
        }
    }

    /**
     * Copied from MyReducer.
     * @author liguodong
     */
    static class MyCombiner extends Reducer<Text, LongWritable,Text, LongWritable>{
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //the number of times this prints equals the number of reduce() calls in the combiner, i.e. the number of k2 groups
            System.out.println("Combiner input group <"+k2.toString()+",...>");
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
                //the number of times this prints equals the number of input <k2,v2> pairs
                System.out.println("Combiner input key/value pair <" + k2.toString() + "," + v2.get() + ">");
            }
            context.write(k2, new LongWritable(sum));
            //the number of times this prints equals the number of output <k2,v2> pairs
            System.out.println("Combiner output key/value pair <" + k2.toString() + "," + sum + ">");
        }
    }
}

Q: Why use a combiner?
A: The combiner runs on the map side and pre-aggregates the data, so less data is sent to the
reduce side, transfer time drops, and the overall job time drops.

Q: Why is the combiner an optional step rather than a standard part of every MapReduce job?
A: Because not every algorithm tolerates a combiner, averaging for example: averaging (1, 2) on one map and (3) on another gives 1.5 and 3, and averaging those gives 2.25, while the true average of 1, 2, 3 is 2.

Q: The combiner already performs a reduce operation, so why does the Reducer phase still need one?
A: The combiner runs on the map side and only sees the data handled by a single map task; it cannot work across map tasks. Only the reduce phase receives data from multiple map tasks.

V. Programming with Partitioners

Partitioner is the base class of all partitioners; to customize partitioning, extend it (or a subclass such as HashPartitioner, as the example below does).
The default calculation is: which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, which yields the destination reducer for the current key.
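
A minimal sketch (not part of the original post) of what that default calculation looks like as a Partitioner implementation; the built-in HashPartitioner in org.apache.hadoop.mapreduce.lib.partition does essentially this:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        //mask off the sign bit so the result is non-negative, then take the modulus
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}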

What partitioning is used for:
1. Produce multiple output files according to business needs.
2. Run multiple reduce tasks, which can improve the overall efficiency of the job.

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * The partitioning example must be packaged as a jar to run.
 * What partitioning is used for:
 * 1. Produce multiple output files according to business needs.
 * 2. Run multiple reduce tasks, which can improve the overall efficiency of the job.
 * @author liguodong
 */

public class KpiApp {
    static final  String INPUT_PATH = "hdfs://liguodong:9000/wlan";
    static final  String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Job job = new Job(new Configuration(),KpiApp.class.getSimpleName());


        //required when the job is run from a packaged jar
        job.setJarByClass(KpiApp.class);

        //1.1 specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //specify the class used to parse the input file
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k2,v2> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);


        //1.3 specify the partitioner class
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);
        //use a custom partitioner
        job.setPartitionerClass(KpiPartitioner.class);
        job.setNumReduceTasks(2);//two output files means two reduce tasks

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        //specify the output <k3,v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        //2.3 specify where the output goes
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }


    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            final String msisdn = splited[1];
            final Text k2 = new Text(msisdn);
            final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
            context.write(k2, v2);

        }       
    }

    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

        /**
         * @param k2  a distinct phone number in the file
         * @param v2s the traffic records of that phone number across different time periods
         */
        @Override
        protected void reduce(Text k2, Iterable<KpiWritable> v2s,
                Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }
            final KpiWritable v3 = new KpiWritable(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
            context.write(k2, v3);
        }

    }

    static class KpiPartitioner extends HashPartitioner<Text, KpiWritable>{

        @Override
        public int getPartition(Text key, KpiWritable value, int numReduceTasks) {
            //phone numbers (11 digits) go to partition 0; everything else goes to partition 1
            return key.toString().length() == 11 ? 0 : 1;
        }   
    }   
}

class KpiWritable implements Writable{

    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;   
    public KpiWritable(){

    }
    public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,
            String  downPayLoad) {
        super();
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum); 
        this.upPayLoad = Long.parseLong(upPayLoad); 
        this.downPayLoad = Long.parseLong(downPayLoad);
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }
    @Override
    public String toString(){
        return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;
    }
}

VI. Comparison of common compression formats in Hadoop
1. gzip

Pros: relatively high compression ratio and fairly fast compression/decompression; supported by Hadoop out of the box, so gzip files are handled just like plain text in applications; has a Hadoop native library; most Linux systems ship with the gzip command, which is convenient.
Cons: does not support splitting.

Use cases: gzip is a reasonable choice whenever each compressed file stays within about 130 MB (one block). For example, compress one day's or one hour's logs into a single gzip file and gain parallelism by running the MapReduce job over many such files. Hive jobs, streaming jobs, and Java MapReduce programs handle them exactly like plain text, so existing programs need no changes after compression.

2. lzo

Pros: fairly fast compression/decompression with a reasonable compression ratio; supports splitting and is one of the most popular compression formats in Hadoop; has a Hadoop native library; the lzop command can be installed on Linux, which is convenient.
Cons: compression ratio is somewhat lower than gzip; not supported by Hadoop out of the box, so it must be installed; lzo files need extra handling in applications (an index must be built to support splitting, and an lzo-aware InputFormat must be specified).

Use cases: consider lzo for very large text files that are still above roughly 200 MB after compression; the larger the single file, the more pronounced lzo's advantage.

3. snappy

Pros: very fast compression with a reasonable compression ratio; has a Hadoop native library.
Cons: does not support splitting; lower compression ratio than gzip; not supported by Hadoop out of the box, so it must be installed; no corresponding command on Linux.

Use cases: compressing the intermediate map-to-reduce data when a job's map output is large, or compressing the output of one MapReduce job that serves as the input of another.

4. bzip2

Pros: supports splitting; very high compression ratio, higher even than gzip; supported by Hadoop out of the box (though without native support); Linux systems ship with the bzip2 command, which is convenient.
Cons: slow compression/decompression; no native library support.

Use cases: suitable when speed matters less than a high compression ratio, for example as the output format of a MapReduce job; when large output needs to be compressed and archived to save disk space and will rarely be used afterwards; or when a single very large text file should be compressed to save storage while still supporting splitting and remaining compatible with existing applications (no program changes required).

Compressing the MapReduce output
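The original post shows the relevant settings as a screenshot. A minimal sketch of the idea (not from the original; the property names assume the Hadoop 1.x era used throughout these notes, and snappy additionally requires the native library to be installed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionConfigSketch {
    public static void configure(Job job) {
        Configuration conf = job.getConfiguration();

        //1. compress the intermediate map output (e.g. with snappy, as suggested above)
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
                SnappyCodec.class, CompressionCodec.class);

        //2. compress the final job output (e.g. with gzip)
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
}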

VII. Sorting
Sort by the first column in ascending order; when the first column is the same, sort by the second column in ascending order.

Input:
3 3
3 2
3 1
2 2
2 1
1 1

Expected output:
1 1
2 1
2 2
3 1
3 2
3 3

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    static final String INPUT_PATH = "hdfs://liguodong:9000/input";
    static final String OUT_PATH = "hdfs://liguodong:9000/out1";
    public static void main(String[] args) throws Exception{
        final Configuration configuration = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if(fileSystem.exists(new Path(OUT_PATH))){
            fileSystem.delete(new Path(OUT_PATH), true);
        }

        final Job job = new Job(configuration, SortApp.class.getSimpleName());

        //1.1 specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //specify the class used to parse the input file
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k2,v2> types
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        //1.3 specify the partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping

        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        //specify the output <k3,v3> types
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify where the output goes
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }


    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) 
                throws java.io.IOException ,InterruptedException {
            final String[] splited = value.toString().split("\t");
            //
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));

            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        };
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
        protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
            //option 1: write k2's two fields once per group
            //context.write(new LongWritable(k2.first), new LongWritable(k2.second));
            //option 2: write one line per value
            for (LongWritable v2 : v2s) {
                context.write(new LongWritable(k2.first), v2);
            }
        };
    }

    /**
     * Q: Why implement this class?
     * A: The original v2 cannot take part in sorting, so the original k2 and v2 are
     *    wrapped into one class that serves as the new k2.
     */
    static class  NewK2 implements WritableComparable<NewK2>{
        Long first;
        Long second;

        public NewK2(){}

        public NewK2(long first, long second){
            this.first = first;
            this.second = second;
        }


        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * Called when k2 values are sorted.
         * When the first column differs, sort ascending by it; when it is the same,
         * sort ascending by the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            final long minus = this.first - o.first;
            if(minus !=0){
                return (int)minus;
            }
            return (int)(this.second - o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode()+this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if(!(obj instanceof NewK2)){
                return false;
            }
            NewK2 oK2 = (NewK2)obj;
            //the fields are boxed Longs, so compare by value rather than by reference with ==
            return (this.first.longValue()==oK2.first.longValue())&&(this.second.longValue()==oK2.second.longValue());
        }
    }

}

VIII. Grouping

When the first column is the same, find the minimum of the second column.

Input:
3 3
3 2
3 1
2 2
2 1
1 1

Expected output:
1 1
2 1
3 1

package group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class Group {
    static final String INPUT_PATH = "hdfs://liguodong:9000/input";
    static final String OUT_PATH = "hdfs://liguodong:9000/out1";
    public static void main(String[] args) throws Exception{
        final Configuration configuration = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if(fileSystem.exists(new Path(OUT_PATH))){
            fileSystem.delete(new Path(OUT_PATH), true);
        }

        final Job job = new Job(configuration, Group.class.getSimpleName());

        //1.1 specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        //specify the class used to parse the input file
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k2,v2> types
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        //1.3 specify the partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        job.setGroupingComparatorClass(MyGroupComparator.class);

        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        //specify the output <k3,v3> types
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 specify where the output goes
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }


    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) 
                throws java.io.IOException ,InterruptedException {
            final String[] splited = value.toString().split("\t");
            //
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));

            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        };
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
        protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
            long min = Long.MAX_VALUE;
            for (LongWritable v2 : v2s) {
                if(v2.get()<min)
                {
                    min = v2.get();
                }
            }
            context.write(new LongWritable(k2.first), new LongWritable(min));
        };
    }

    /**
     * Q: Why implement this class?
     * A: The original v2 cannot take part in sorting, so the original k2 and v2 are
     *    wrapped into one class that serves as the new k2.
     */
    static class  NewK2 implements WritableComparable<NewK2>{
        Long first;
        Long second;

        public NewK2(){}

        public NewK2(long first, long second){
            this.first = first;
            this.second = second;
        }


        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * Called when k2 values are sorted.
         * When the first column differs, sort ascending by it; when it is the same,
         * sort ascending by the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            final long minus = this.first - o.first;
            if(minus !=0){
                return (int)minus;
            }
            return (int)(this.second - o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode()+this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if(!(obj instanceof NewK2)){
                return false;
            }
            NewK2 oK2 = (NewK2)obj;
            //the fields are boxed Longs, so compare by value rather than by reference with ==
            return (this.first.longValue()==oK2.first.longValue())&&(this.second.longValue()==oK2.second.longValue());
        }
    }

    /**
     * Q: Why define this class?
     * A: The business rule groups by the first column only, but NewK2's comparison rule
     *    also looks at the second column, so a custom grouping comparator is needed.
     * @author liguodong
     */
    static class MyGroupComparator implements RawComparator<NewK2>{

        @Override
        public int compare(NewK2 o1, NewK2 o2) {            
            return (int)(o1.first-o2.first);
        }
        /**
         * @param b1 the first byte array taking part in the comparison
         * @param s1 the start position within the first byte array
         * @param l1 the length of the data in the first byte array
         *
         * @param b2 the second byte array taking part in the comparison
         * @param s2 the start position within the second byte array
         * @param l2 the length of the data in the second byte array
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            //compare only the first 8 bytes, i.e. the serialized "first" field, so grouping is by column one
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }

    }
}

IX. Common MapReduce algorithms
Word count
Deduplication
Sorting
Top K
Selection
Projection
Grouping
Multi-table join
Single-table self-join

Top K source code

package algorithm;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopK {

    static final String INPUT_PATH = "hdfs://liguodong:9000/input";
    static final String OUT_PATH = "hdfs://liguodong:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUT_PATH)))
        {
            fileSystem.delete(new Path(OUT_PATH),true);
        }


        final Job job = new Job(conf, TopK.class.getSimpleName());
        //1.1 specify the input directory
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        //specify the class used to parse the input data
        //job.setInputFormatClass(TextInputFormat.class);

        //1.2 specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        //specify the map output <k,v> types; if <k3,v3> has the same types as <k2,v2>, this can be omitted
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        //1.3 partitioning
        //job.setPartitionerClass(HashPartitioner.class);
        //job.setNumReduceTasks(1);

        //1.4 TODO sorting and grouping
        //1.5 TODO (optional) combining

        //2.2 specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //2.3 specify the output path
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        //specify the output format class
        //job.setOutputFormatClass(TextOutputFormat.class);

        //submit the job to the JobTracker
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable,Text,LongWritable,NullWritable>{

        long max = Long.MIN_VALUE;
        @Override
        protected void map(LongWritable k1, Text v1,
                Mapper<LongWritable, Text, LongWritable,NullWritable>.Context context)
                throws IOException, InterruptedException {
            final long temp = Long.parseLong(v1.toString());
            if(temp>max)
            {
                max = temp;
            }
        }

        /*
         * Runs once after all calls to map() have finished.
         * "Called once at the end of the task."
         *
         * NullWritable.get() returns the single shared instance of NullWritable:
         * public static NullWritable get() { return THIS; }
         */
        @Override
        protected void cleanup(
                Mapper<LongWritable, Text, LongWritable,NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable,NullWritable>{
        long max = Long.MIN_VALUE;
        @Override
        protected void reduce(
                LongWritable k2,
                Iterable<NullWritable> v2,
                Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {

            final long temp = k2.get();
            if(temp > max){
                max = temp;
            }
        }

        protected void cleanup(
                org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) 
                throws IOException ,InterruptedException 
        {
            context.write(new LongWritable(max), NullWritable.get());
        };
    }

}
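
The code above keeps only the single largest value (top 1). A hedged sketch (not from the original post) of how the same pattern generalizes to the top K values: each mapper keeps its local K largest values in a java.util.TreeMap and emits them in cleanup(); a single reducer then merges the candidates the same way. K, the class name, and the TreeMap-based selection are assumptions for illustration; note that duplicate values collapse because they share a key.

    //requires: import java.util.TreeMap;
    static class TopKMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        private static final int K = 3;                        //hypothetical K
        private final TreeMap<Long, Long> topK = new TreeMap<Long, Long>();

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            long value = Long.parseLong(v1.toString().trim());
            topK.put(value, value);
            if (topK.size() > K) {
                topK.remove(topK.firstKey());                  //drop the smallest candidate
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            for (Long value : topK.keySet()) {
                context.write(new LongWritable(value), NullWritable.get());
            }
        }
    }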

Original post: https://blog.csdn.net/scgaliguodong123_/article/details/44747697
