HBase的协处理器编码实战

1 协处理器简介

        如果要统计Hbase中的数据,比如统计某个字段的最大值、统计满足某种条件的记录数、统计各种记录的特点并按照记录特点分类等等,常规的做法是把HBase中整个表的数据Scan出来,或者加一个Filter,进行一些初步的过滤,然后在客户端进行统计处理。但是这么做会有很大的副作用,比如占用大量的网络带宽(大数据量尤为明显),RPC的压力也是不容小觑的。

        HBase作为列式数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。比如,在旧版本的(<0.92)HBase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效进行数据表的分布式计算,然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少网络开销,从而获得很好的性能提升。于是,HBase在0.92之后引入了协处理器(coprocessors),实现了一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器以及访问控制等。

        简单理解来说,协处理器是HBase让用户的部分逻辑在数据存放端即HBase服务端进行计算的机制,它允许用户在HBase服务端运行自己的代码。

2 协处理器的分类

        协处理器分为两种类型:系统协处理器可以全局导入Region Server上的所有数据表,表协处理器是用户可以指定一张表使用的协处理器。协处理器框架为了更好支持其行为的灵活性,提供了两个不同方面的插件。一个是观察者(Observer),类似于关系数据库的触发器。另一个是终端(Endpoint),动态的终端有点像存储过程。

        Observer的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。

        Endpoint是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标Region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

3 Protocol Buffer的使用

        由于下面的Endpoint编码示例使用了Google公司的混合语言数据标准Protocol Buffer,所以首先了解一下这个常用于RPC系统的工具。

3.1 ProtocolBuffer介绍

        Protocol Buffer是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,很适合做数据存储或RPC数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了C++、Java、Python三种语言的 API。

       为什么要使用Protocol Buffer呢?先看一个在实际开发中经常会遇到的系统场景:我们的客户端程序是使用Java开发的,可能运行自不同的平台,如Linux、Windows或者是Android,而我们的服务器程序通常是基于Linux平台并使用C++开发完成的。在这两种程序之间进行数据通讯时存在多种方式用于设计消息格式,如:

       1、直接传递C/C++语言中字节对齐的结构体数据,只要结构体的声明为定长格式,那么该方式对于C/C++程序而言就非常方便了,仅需将接收到的数据按照结构体类型强行转换即可。事实上对于变长结构体也不会非常麻烦。在发送数据时,也只需定义一个结构体变量并设置各个成员变量的值之后,再以char*的方式将该二进制数据发送到远端。反之,该方式对于Java开发者而言就会非常繁琐,首先需要将接收到的数据存于ByteBuffer之中,再根据约定的字节序逐个读取每个字段,并将读取后的值再赋值给另外一个值对象中的域变量,以便于程序中其他代码逻辑的编写。对于该类型程序而言,联调的基准是必须客户端和服务器双方均完成了消息报文构建程序的编写后才能展开,而该设计方式将会直接导致Java程序开发的进度过慢。即便是Debug阶段,也会经常遇到Java程序中出现各种域字段拼接的小错误。

       2、使用SOAP协议(WebService)作为消息报文的格式载体,由该方式生成的报文是基于文本格式的,同时还存在大量的XML描述信息,因此将会大大增加网络IO的负担。又由于XML解析的复杂性,这也会大幅降低报文解析的性能。总之,使用该设计方式将会使系统的整体运行性能明显下降。

       对于以上两种方式所产生的问题,Protocol Buffer均可以很好的解决,不仅如此,Protocol Buffer还有一个非常重要的优点就是可以保证同一消息报文新旧版本之间的兼容性。

3.2 安装Protocol Buffer

// 在https://developers.google.com/protocol-buffers/docs/downloads下载protobuf-2.6.1.tar.gz后解压至指定目录

$ tar -xvf protobuf-2.6.1.tar.gz -C app/

// 删除压缩包

$ rm protobuf-2.6.1.tar.gz

// 安装c++编译器相关包

$ sudo apt-get install g++

// 编译安装protobuf

$ cd app/protobuf-2.6.1/

$ ./configure

$ make

$ make check

$ sudo make install

// 添加到lib

$ vim ~/.bashrc

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib

$ source ~/.bashrc

// 验证是否安装成功

$ protoc --version

3.3 编写proto文件

       首先需要编写一个 proto 文件,定义程序中需要处理的结构化数据。proto 文件非常类似java或者C语言的数据定义。如下代码给出了示例中定义RPC接口的 endpoint.proto文件内容:

// 定义常用选项
option java_package = "com.hbase.demo.endpoint"; //指定生成Java代码的包名
option java_outer_classname = "Sum"; //指定生成Java代码的外部类名称
option java_generic_services = true; //基于服务定义产生抽象服务代码
option optimize_for = SPEED; //指定优化级别
// 定义请求包
message SumRequest {
 required string family = 1; //列族
 required string column = 2; //列名
}
// 定义回复包
message SumResponse {
 required int64 sum = 1 [default = 0]; //求和结果
}
// 定义RPC服务
service SumService {
 //获取求和结果
 rpc getSum(SumRequest)
  returns (SumResponse);
}

3.4 编译proto文件

 

// 将proto文件编译生成java代码

$ protoc endpoint.proto --java_out=./

// 生成的文件Sum.java如下图所示:
HBase的协处理器编码实战

 

 

4 Endpoint编码示例

 

       业务逻辑如求和、排序等功能放在服务端,在服务端完成计算后将结果发送给客户端,可以减少数据的传输量。下面的示例将在HBase的服务端生成一个RPC服务,即在服务端对指定表的指定列值进行求和计算,并将计算结果返回给客户端。客户端调用该RPC服务,获取响应结果后输出。

4.1 服务端代码

       首先,将通过Protocol Buffer生成的RPC接口文件Sum.java导入项目,然后在项目中新建类SumEndPoint编写服务端代码:

package com.hbase.demo.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.Hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
 
/**
 * @author developer
 * 说明:hbase协处理器endpooint的服务端代码
 * 功能:继承通过protocol buffer生成的rpc接口,在服务端获取指定列的数据后进行求和操作,最后将结果返回客户端
 */
public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService {
   
    private RegionCoprocessorEnvironment env;  // 定义环境
   
    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        // 定义变量
        SumResponse response = null;
        InternalScanner scanner = null;
        // 设置扫描对象
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // 扫描每个region,取值后求和
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            Long sum = 0L;
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
                }
                results.clear();
            } while (hasMore);
            // 设置返回结果
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 将rpc结果返回给客户端
        done.run(response);
    }
   
    // 协处理器初始化时调用的方法
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment)env;
        } else {
            throw new CoprocessorException("no load region");
        }
    }
   
    // 协处理器结束时调用的方法
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
       
    }

}

相关推荐