MapReduce on HBase使用与集成

为什么需要MapReduce on HBase?


可以通过 MapReduce 的方式操作 HBase 数据库。HBase 在 org.apache.hadoop.hbase.mapreduce 包中提供了相关 API,可以与 Hadoop MapReduce 无缝集成。
API链接: http://hbase.apache.org/devapidocs/index.html




package org.apache.hadoop.hbase.mapreduce;

/**
 * Extends the base <code>Mapper</code> class to add the required input key
 * and value classes.
 *
 * @param <KEYOUT> The type of the key.
 * @param <VALUEOUT> The type of the value.
 * @see org.apache.hadoop.mapreduce.Mapper
 */
public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {


package org.apache.hadoop.hbase.mapreduce;
/**
 * Extends the basic <code>Reducer</code> class to add the required key and
 * value input/output classes. While the input key and value as well as the
 * output key can be anything handed in from the previous map phase the output
 * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
 * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
 * using the {@link TableOutputFormat} class.
 * <p>
 * This class is extended by {@link IdentityTableReducer} but can also be
 * subclassed to implement similar features or any custom code needed. It has
 * the advantage to enforce the output value to a specific basic type.
 * @param <KEYIN>  The type of the input key.
 * @param <VALUEIN>  The type of the input value.
 * @param <KEYOUT>  The type of the output key.
 * @see org.apache.hadoop.mapreduce.Reducer
 */
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
  // The fourth Reducer type argument (the output value) is fixed to Mutation,
  // so subclasses choose only the input key/value and the output key types.


// NOTE(review): this is an excerpt; the closing braces of the methods and of
// the class, and the overloads the methods delegate to, are not shown here.
public class TableMapReduceUtil {
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    // Delegates to the overload that takes one extra boolean argument,
    // passing true. NOTE(review): the meaning of that flag is not visible in
    // this excerpt; confirm against the HBase API documentation.
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    // Delegates to the four-argument overload, passing null for the extra
    // argument (its meaning is not visible in this excerpt).
    initTableReducerJob(table, reducer, job, null);



package MapReduceHbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf; 
    private HConnection hConn = null;

    private HbaseMR(String rootDir,String zkServer,String port) throws IOException{
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        hConn = HConnectionManager.createConnection(conf); 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        HbaseMR conn = new HbaseMR(rootDir,zkServer,port);

        //Configuration conf = HBaseConfiguration.create();

        //Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conn.conf,"MapReduce on HBase");

        Scan scan = new Scan();


        TableMapReduceUtil.initTableReducerJob("students_age", MyReducer.class, job);


class MyMapper extends TableMapper<Text, Text>{

    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Text k = new Text(Bytes.toString(key.get()));
        Text v = new Text(value.getValue(Bytes.toBytes("basicInfo"), 
        //年龄  人名
        context.write(v, k); 

class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{

    protected void reduce(Text k2, Iterable<Text> v2s,
            Context context)
            throws IOException, InterruptedException {  
        Put put = new Put(Bytes.toBytes(k2.toString()));
        for (Text v2 : v2s) {//遍历获得所有的人名
            //列族 列  值
            put.add(Bytes.toBytes("f1"), Bytes.toBytes(v2.toString()), 
        context.write(null, put);



