纵有疾风起
人生不言弃

MapReduce on HBase使用与集成

为什么需要MapReduce on HBase?

hbase本身并没有提供很好地二级索引方式。如果直接使用hbase提供的scan直接扫描方式,在数据量很大的情况下就会非常慢。

可以使用Mapreduce的方法操作hbase数据库。Hadoop MapReduce提供相关API,可以与hbase数据库无缝连接。
API链接: http://hbase.apache.org/devapidocs/index.html

HBase与Hadoop的API对比

MapReduce on HBase使用与集成插图

相关类

TableMapper

package org.apache.hadoop.hbase.mapreduce;

/** * Extends the base <code>Mapper</code> class to add the required input key * and value classes. * * @param <KEYOUT> The type of the key. * @param <VALUEOUT> The type of the value. * @see org.apache.hadoop.mapreduce.Mapper */
public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

TableReducer

package org.apache.hadoop.hbase.mapreduce;
/**
 * Extends the basic <code>Reducer</code> class to add the required key and
 * value input/output classes. While the input key and value as well as the
 * output key can be anything handed in from the previous map phase the output
 * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
 * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
 * using the {@link TableOutputFormat} class.
 * <p>
 * This class is extended by {@link IdentityTableReducer} but can also be
 * subclassed to implement similar features or any custom code needed. It has
 * the advantage to enforce the output value to a specific basic type.
 * @param <KEYIN>  The type of the input key.
 * @param <VALUEIN>  The type of the input value.
 * @param <KEYOUT>  The type of the output key.
 * @see org.apache.hadoop.mapreduce.Reducer
 */
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}

TableMapReduceUtil

public class TableMapReduceUtil {
  /** * Use this before submitting a TableMap job. It will appropriately set up * the job. * * @param table The table name to read from. * @param scan The scan instance with the columns, time range etc. * @param mapper The mapper class to use. * @param outputKeyClass The class of the output key. * @param outputValueClass The class of the output value. * @param job The current job to adjust. Make sure the passed job is * carrying all necessary HBase configuration. * @throws IOException When setting up the details fails. */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * * @param table The output table. * @param reducer The reducer class to use. * @param job The current job to adjust. * @throws IOException When determining the region count fails. */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

}

Demo

package MapReduceHbase;

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;



public class HbaseMR {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf; 
    private HConnection hConn = null;

    private HbaseMR(String rootDir,String zkServer,String port) throws IOException{
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        hConn = HConnectionManager.createConnection(conf); 
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        HbaseMR conn = new HbaseMR(rootDir,zkServer,port);

        //Configuration conf = HBaseConfiguration.create();

        //Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conn.conf,"MapReduce on HBase");
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        scan.setCaching(1000);//事先读取多少条记录


        TableMapReduceUtil.initTableMapperJob("students",scan,
                MyMapper.class,Text.class,Text.class,job);

        TableMapReduceUtil.initTableReducerJob("students_age", MyReducer.class, job);

        job.waitForCompletion(true);
    }   
}


class MyMapper extends TableMapper<Text, Text>{

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Text k = new Text(Bytes.toString(key.get()));
        Text v = new Text(value.getValue(Bytes.toBytes("basicInfo"), 
                Bytes.toBytes("age")));
        //年龄  人名
        context.write(v, k); 
    }
}

class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{

    @Override
    protected void reduce(Text k2, Iterable<Text> v2s,
            Context context)
            throws IOException, InterruptedException {  
        Put put = new Put(Bytes.toBytes(k2.toString()));
        for (Text v2 : v2s) {//遍历获得所有的人名
            //列族 列  值
            put.add(Bytes.toBytes("f1"), Bytes.toBytes(v2.toString()), 
                    Bytes.toBytes(v2.toString()));
        }       
        context.write(null, put);
    }   
}

运行之前先创建一个students_age表,列族为f1。
运行结果:
MapReduce on HBase使用与集成插图1

原文链接:https://blog.csdn.net/scgaliguodong123_/article/details/46745375

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

未经允许不得转载:起风网 » MapReduce on HBase使用与集成
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录