纵有疾风起
人生不言弃

MapReduce排序及实例

排序可分为四种排序:
普通排序
部分排序
全局排序
二次排序(比如有两列数据,第一列相同时,需要对第二列进行排序。)

普通排序

普通排序是指MapReduce本身自带的排序功能;
Text按字节的字典序比较,不适合直接用来对数值排序(例如"10"会排在"9"之前);IntWritable、LongWritable等实现了WritableComparable接口的类型则可以按数值大小排序;

部分排序

map和reduce处理过程中默认包含了对key的排序,如果不要求全排序,可以直接把结果输出,那么每个输出文件中包含的就是按照key排序的结果;

全局排序

Hadoop平台并没有提供全局数据排序,而在大规模数据处理中进行数据的全局排序是非常普遍的需求;使用hadoop进行大量的数据排序最直观的方法是把文件所有内容交给map之后,map不做任何处理,直接输出给一个reduce(只用一个reduce处理的话,不是很适合大规模的数据,效率不高。),利用hadoop自己的shuffle机制,对所有数据进行排序,而后由reduce直接输出;

如果要对大规模数据处理中进行数据的全局排序的话,
主要思路就是将数据按照区间进行分割,比如对整数排序,
[0,10000]的在partition 0中,(10000,20000]在partition 1中,
在数据分布均匀的情况下,每个分区内的数据量基本相同,这种就是比较理想的情况了,但是实际中数据往往分布不均匀,出现了数据倾斜的情况,这时按照之前的分区划分数据就不合适了,此时就需要一定的帮助——采样器;

package Sort;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Range-partitioned numeric sort.
 *
 * <p>Each input line is expected to start with an integer. The mapper emits
 * that integer as the key, {@link MyPartitioner} routes keys to one of three
 * hand-picked value ranges, and the framework's shuffle sorts the keys inside
 * every partition. With three reducers the concatenation of
 * {@code part-r-00000}, {@code part-r-00001} and {@code part-r-00002} is
 * globally ordered.
 */
public class Demo {

    private final static String INPUT_PATH = "hdfs://liguodong:8020/input";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/output";

    /**
     * Emits the first whitespace-separated token of each line as a
     * {@link LongWritable} key with a {@link NullWritable} value.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{

        /** Reused output key; {@code context.write} serializes it immediately. */
        private final LongWritable outKey = new LongWritable();

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the (number, null) pair
         * @throws NumberFormatException if the first token is not a valid long
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Trim first: a leading blank would otherwise make split() return
            // an empty first token, and a blank line would yield "" as well.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] values = line.split("\\s+");
            outKey.set(Long.parseLong(values[0]));
            context.write(outKey, NullWritable.get());

        }

    }

    /**
     * Writes the keys back out in the order the shuffle delivers them.
     */
    public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{

        /**
         * @param key     a number that occurred in the input
         * @param values  one {@link NullWritable} per occurrence of {@code key}
         * @param context sink for the sorted output
         */
        @Override
        protected void reduce(LongWritable key, Iterable<NullWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // Write the key once per occurrence so that duplicate numbers in
            // the input are preserved in the sorted output.
            for (NullWritable ignored : values) {
                context.write(key, NullWritable.get());
            }
        }

    }

    /**
     * Assigns keys to partitions by fixed value ranges:
     * {@code key <= 100} to range 0, {@code 100 < key < 1000} to range 1,
     * everything else to range 2.
     */
    public static class MyPartitioner extends Partitioner<LongWritable, NullWritable>{

        /**
         * @param key           the number being routed
         * @param value         unused
         * @param numPartitions number of reduce tasks configured for the job
         * @return a partition index in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(LongWritable key, NullWritable value,
                int numPartitions) {
            long number = key.get();
            int range;
            if (number <= 100) {
                range = 0;
            } else if (number < 1000) {
                range = 1;
            } else {
                range = 2;
            }
            // Every branch is reduced modulo numPartitions so the result is
            // always a legal index. With fewer than three reducers the ranges
            // wrap around, so output files are no longer globally ordered,
            // but the job does not fail.
            return range % numPartitions;
        }
    }



    /**
     * Configures and runs the job, deleting any previous output directory
     * first. Exits with status 0 when the job succeeds and 1 otherwise.
     *
     * @param args unused; input and output paths are the class constants
     */
    public static void main(String[] args) throws
    ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        // The output directory is removed up front so a rerun can write to it.
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }

        Job job = Job.getInstance(conf, "shuffle sort");

        job.setJarByClass(Demo.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setPartitionerClass(MyPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // One reducer per value range defined in MyPartitioner.
        job.setNumReduceTasks(3);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
[root@liguodong file]# vi sortsum
[root@liguodong file]# hdfs dfs -put sortsum /input
[root@liguodong file]# hdfs dfs -cat /input
43
6546
65
787
879
98
......


运行jar
[root@liguodong file]# yarn jar numsort.jar

查看执行结果
[root@liguodong file]# hdfs dfs -ls /output/
Found 4 items
-rw-r--r-- 1 root supergroup 0 2015-06-16 10:55 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 28 2015-06-16 10:55 /output/part-r-00000
-rw-r--r-- 1 root supergroup 20 2015-06-16 10:55 /output/part-r-00001
-rw-r--r-- 1 root supergroup 15 2015-06-16 10:55 /output/part-r-00002
[root@liguodong file]# hdfs dfs -cat /output/part-r-00000
2
7
23
34
43
54
65
76
87
98
[root@liguodong file]# hdfs dfs -cat /output/part-r-00001
543
567
675
787
879
[root@liguodong file]# hdfs dfs -cat /output/part-r-00002
5423
6546
6554

以上程序的缺陷:人为的进行分区,这样可能数据极不对称,易产生数据倾斜。
因此,Hadoop提供了Sampler接口可以返回一组样本,该接口为Hadoop的采样器;Hadoop提供了一个TotalOrderPartitioner类,可以用来实现全局排序;

Hadoop2.2.0源码

package org.apache.hadoop.mapreduce.lib.partition;
/**
 * Utility for collecting samples and writing a partition file for
 * {@link TotalOrderPartitioner}.
 *
 * <p>Abridged excerpt: the bodies of the nested sampler classes are omitted
 * here and only their declarations are shown.
 */
public class InputSampler<K,V> extends Configured implements Tool {
  /**
   * Interface to sample using an
   * {@link org.apache.hadoop.mapreduce.InputFormat}.
   */
  public interface Sampler<K,V> {
    /**
     * For a given job, collect and return a subset of the keys from the
     * input data.
     *
     * @param inf the input format used to read the job's input
     * @param job the job whose input is sampled
     * @return the sampled keys
     */
    K[] getSample(InputFormat<K,V> inf, Job job) 
    throws IOException, InterruptedException;
  }

  /**
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
  public static class SplitSampler<K,V> implements Sampler<K,V> {}

  /**
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {}

  /**
   * Sample from s splits at regular intervals.
   * Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {}

}
package org.apache.hadoop.mapreduce.lib.partition;
/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 *
 * <p>Abridged excerpt: the class body is omitted here.
 */
public class TotalOrderPartitioner<K extends WritableComparable<?>,V> extends Partitioner<K,V> implements Configurable {

}

TotalOrderPartitioner这个分区器实现可以直接读取采样器(InputSampler)生成的分区文件,并据此划分各分区的key区间。

二次排序

举例:

key1 1
key2 2
key3 3
key2 1
key1 3

中间结果:

<key1,1> 1
<key1,3>  3
<key2,1>  1
<key2,2>  2
<key3,3>  3

排序结果:

key1 1
key1 3
key2 1
key2 2
key3 3

1、MapReduce默认会对key进行排序;
2、主要思路:
重写Partitioner,完成key分区,形成第一次排序;
可参考如下:
http://blog.csdn.net/scgaliguodong123_/article/details/46489357
实现WritableComparator,完成自己的排序逻辑,完成key的第2次排序;
可参考如下:
http://blog.csdn.net/scgaliguodong123_/article/details/46010947

Hadoop官方提供的例子:

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per a line.
 * The output is sorted by the first and second number and grouped on the
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i>
 */
public class SecondarySort {

  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Set the left and right values.
     *
     * @param left  value stored as the first component
     * @param right value stored as the second component
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers.
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      // Inverse of the offset applied in write().
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    /**
     * Write the two integers, each offset by MIN_VALUE so that an unsigned
     * byte-wise comparison of the serialized form matches numeric order.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      /** Compares two serialized pairs byte by byte without deserializing. */
      @Override
      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    /** Orders pairs by the first component, then by the second. */
    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    /**
     * @param key           the pair being routed; only its first part is used
     * @param value         unused
     * @param numPartitions number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, IntWritable value,
                            int numPartitions) {
      // Take the remainder before the absolute value: the product may
      // overflow to Integer.MIN_VALUE, whose Math.abs is still negative.
      // The remainder's magnitude is below numPartitions, so Math.abs is
      // safe here, and the result equals Math.abs(x) % numPartitions for
      // every other x.
      return Math.abs((key.getFirst() * 127) % numPartitions);
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator
                implements RawComparator<IntPair> {
    /** Compares only the leading serialized int (4 bytes) of each pair. */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {

    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    /**
     * Lines without any token are skipped; a line with a single token uses 0
     * as the right value.
     */
    @Override
    public void map(LongWritable inKey, Text inValue,
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer class that writes a separator line for each group and then one
   * (first, value) record for every value in the group.
   */
  public static class Reduce
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR =
      new Text("------------------------------------------------");
    private final Text first = new Text();

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args generic Hadoop options followed by the input directory and
   *             the output directory
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    // Job.getInstance replaces the deprecated Job(Configuration, String)
    // constructor.
    Job job = Job.getInstance(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}

运行结果:

[root@liguodong mapreduce]# hdfs dfs -put sort hdfs://liguodong:8020/input           
[root@liguodong mapreduce]# hdfs dfs -cat hdfs://liguodong:8020/input
1 1
2 2
3 3
2 1
1 3

[root@liguodong mapreduce]# yarn jar hadoop-mapreduce-examples-2.6.0.jar secondarysort /input /output
......
[root@liguodong mapreduce]# hdfs dfs -cat /output/p*
------------------------------------------------
1       1
1       3
------------------------------------------------
2       1
2       2
------------------------------------------------
3       3

原文链接:https://blog.csdn.net/scgaliguodong123_/article/details/46514673

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

未经允许不得转载:起风网 » MapReduce排序及实例
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录