分享

HBase权威指南(中文版)——第三章(第二部分)

 闲来看看 2013-08-14

第三章 客户端API: 基础篇(第三部分)

批量操作

前面介绍了如何逐条和批量地添加、读取、删除数据。在这一节,我们将介绍如何一次执行多种不同类型的操作处理多行记录。

事实上,一些使用List结构的批量操作,如delete(List<Delete>
deletes)
或者get(List(Get> gets)等,底层都是依靠batch实现的。封装这些接口可以更好的提高友好性。

在下面介绍的批量操作中,您将会看到一个新的类型叫Row,前面将到的PutGetDelete类都是从Row类的子类。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[] batch(List<Row>
actions)

throws
IOException, InterruptedException

由于Row的存在,以及它和GetPutDelete的继承关系,决定了可以在一个列表中混合多种不同类型的操作。示例3-16给出了这种使用的例子。

值得注意地是,您不应该将一个rowPutDelete操作混合在一起。因为List中多个操作在服务器端执行的顺序是无法保证的,这样会得到一个无法预料到的结果。

示例3-16 批量操作

private final
static byte[] ROW1 = Bytes.toBytes(“row1″);

private final
static byte[] ROW2 = Bytes.toBytes(“row2″);

private final
static byte[] COLFAM1 = Bytes.toBytes(“colfam1″);

private final
static byte[] COLFAM2 = Bytes.toBytes(“colfam2″);

private final
static byte[] QUAL1 = Bytes.toBytes(“qual1″);

private final
static byte[] QUAL2 = Bytes.toBytes(“qual2″);

List<Row>
batch = new ArrayList<Row>();

Put put = new
Put(ROW2);

put.add(COLFAM2,
QUAL1, Bytes.toBytes(“val5″));

batch.add(put);

Get get1 = new
Get(ROW1);

get1.addColumn(COLFAM1,
QUAL1);

batch.add(get1);

Delete delete =
new Delete(ROW1);

delete.deleteColumns(COLFAM1,
QUAL2);

batch.add(delete);

Get get2 = new
Get(ROW2);

get2.addFamily(Bytes.toBytes(“BOGUS”));

batch.add(get2);

Object[] results =
new Object[batch.size()];

try {

table.batch(batch,
results);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

}

for (int i = 0; i
< results.length; i++) {

System.out.println(“Result["
+ i + "]: ” + results[i]);

}

首先定义了一组指向rowcolumn familycolumn
qualifier
的常量,方便重用。然后创建一个Row实例的列表。向其中分别加入一个PutGetDelete实例,再添加一个指向不存在列的Put操作。创建一个Result数组,大小和batch的大小相同。然后执行batch方法,返回的结果放在result中,最后打印出Result数组的值。

整个程序的输出如下:

Before batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual2/2/Put/vlen=4, Value: val2

KV: row1/colfam1:qual3/3/Put/vlen=4,
Value: val3

Result[0]:
keyvalues=NONE

Result[1]:
keyvalues={row1/colfam1:qual1/1/Put/vlen=4}

Result[2]:
keyvalues=NONE

Result[3]:
org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

Column family
BOGUS does not exist in …

After batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual3/3/Put/vlen=4, Value: val3

KV:
row2/colfam2:qual1/1308836506340/Put/vlen=4, Value: val5

Error:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:

Failed 1 action:
NoSuchColumnFamilyException: 1 time,

servers with
issues: 10.0.0.43:60020,

在前面的例子中,事先插入了一些测试数据并打印出来,让您方便地看出示例代码做的事情。最后发现,想删除的记录成功被删除,想插入的新记录也成功被插入了。

Get操作的结果,需要从Result数组中找到。Result数组的大小和请求操作的总数相等。第一个ResultPut操作的结果,为一个空的KeyValue结构;第二个Result的值是Get操作的结果,可以打印出它的值。第三个值为Delete操作的值,为一个空的KeyValue结构。表3-7给出了ResultRow类型之前的对应关系。

3-7 batch()调用可能返回的结果

Result

Description

null

The operation has failed to
communicate with the remote server.

Empty Result

Returned for successful Put
and Delete operations.

Result

Returned for successful Get
operations, but may also be empty when there was no matching row or column.

Throwable

In case the servers return an
exception for the operation it is returned to the client as-is. You can use
it to check what went wrong and maybe handle the problem automatically in
your code.

观察示例3-16执行的结果,您可以发现空的Result对象打印出了keyvalues=NONE Get操作打印出了对应取到的值。对于错误列上的Put操作得到了一个异常。

值得注意地是,当您使用batch()方法时,里面的Put实例并不会使用客户端缓存。batch()调用是同步的,同时直接向服务器发送请求。没有时延和其它处理过程,这和批量put操作是不同的。因此,您需要选择合适于您的。

batch调用有两种形式,一种将Result数据放在参数中,另一种放在返回值中。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[]
batch(List<Row> actions)

throws
IOException, InterruptedException

它们之间比较大的区别在于,当抛出异常时,第一方法的result中被填充了部分结果。而第二个方法在异常时,将会返回null

两种方法都支持getputdelete操作。如果执行其中的任何一个请求时出错,一个客户端异常将会被抛出,报告出错情况。客户端缓存不会被使用到。void batch(actions,
results)
会得到所有成功操作的结果和其中失败的服务端异常。Object[] batch( actions )只返回客户端异常,没有结果被填充到返回值中。

所有的操作都将在check之前执行:当您发现一个action出现了错误,但其它的操作也将被执行。在最差的场景下,所有的action都返回失败。

另一方面,batch操作在意瞬时失败,比如NotServingRegionException(比如一个Region Server被下线了),它会进行重试几次。配置项hbase.client.retries.number会设定重试的次数(默认的重试次数为10)。

Row Locks(行锁)

       更新操作,比如put()delete()checkAndPut()等等对于一个row来说是互斥执行的,从而保证一个低层面的原子性。Region Server提供了一个row lock行级锁来保证只有拥有锁的客户端才能够对该行进行修改。在实际中,客户端并不提供这种显示的锁,而是依赖于一种将每个操作独立开的机制。

       您应该最大限度地避免使用row lock,很容易出现RDBMS中类似的死锁现象。当Lock在等待超时的过程中,两个被挂起的客户端都持有一个句柄,这类句柄属于稀缺资源。如果这个锁被加在了一个访问很频繁的行上,那么很多客户端都会被阻塞。

当您使用put使用将一个Put实例发送到服务器时,如果您使用了如下的构造函数:

Put(byte[] row)

在这个构造函数里,并没有出现RowLock参数,服务器会根据您的行为,自动为您创建一个锁。这个锁对于客户端是透明的,客户端无法获取到这个生命期很短的服务器端的锁。相比于服务器自动创建的隐式锁。客户端也可以使用显式锁,并且在多个操作中使用到它。可以通过以下的方法:

RowLock
lockRow(byte[] row) throws IOException

void
unlockRow(RowLock rl) throws IOException

第一个方法lockRowrowkey为参数,返回一个RockLock实例,这个实例可以传递到PutDelete的构造函数中去。一旦您成功的获取到一个锁后,在使用完之后,您必须调用unlockRow方法释放它。

您申请到的LockRow会锁定整行,它通过rowkey来确定行,一旦拥有,别的客户端将不能对该行进行更新操作。

当这个行锁无论是被客户端或者服务器占有时,另一个想申请一个该行的新行锁的行为都会被挂起,直到这行原有的行锁被释放或者过期。后者是对出错进程持有行锁情况的一个兼容。

一个行锁的默认超时时间是1分钟,可以在hbase-site.xml进行配置:

<property>

<name>hbase.regionserver.lease.period</name>

<value>120000</value>

</property>

上述代码将超时时间改为120秒。当然,这个值不能设的太大,否则,每个客户端申请一个被别的进程占用的锁的最大等待时间都会变为这个值。示例3-17给出一个由用户生成的行锁阻塞其它读取客户端的例子。

示例3-17 使用显示行锁

static class
UnlockedPut implements Runnable {

@Override

public void run()
{

try {

HTable
table = new HTable(conf, “testtable”);

Put
put = new Put(ROW1);

put.add(COLFAM1,
QUAL1, VAL3);

long
time = System.currentTimeMillis();

System.out.println(“Thread
trying to put same row now…”);

table.put(put);

System.out.println(“Wait
time: ” +

(System.currentTimeMillis()
- time) + “ms”);

} catch
(IOException e) {

System.err.println(“Thread
error: ” + e);

}

}

}

System.out.println(“Taking
out lock…”);

RowLock lock =
table.lockRow(ROW1);

System.out.println(“Lock
ID: ” + lock.getLockId());

Thread thread =
new Thread(new UnlockedPut());

thread.start();

try {

System.out.println(“Sleeping
5secs in main()…”);

Thread.sleep(5000);

} catch
(InterruptedException e) {

//
ignore

}

try {

Put put1 = new
Put(ROW1, lock);

put1.add(COLFAM1,
QUAL1, VAL1);

table.put(put1);

Put put2 = new
Put(ROW1, lock);

put2.add(COLFAM1, QUAL1,
VAL2);

table.put(put2);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

} finally {

System.out.println(“Releasing
lock…”);

table.unlockRow(lock);

}

首先定义了一个的线程,它会不断使用隐式锁不断地访问同一行上的记录,在取得锁之前,put操作始终会被挂起,在线程内部会打印出等待的时间。

主线程,先显示的创建一个行锁,然后启动一个前面定义的线程实例。接着,主线程sleep 5秒之后,将这个锁传给Put对象,进行put操作之后,这个锁会被释放,从而前面那个无锁的线程会继续运行。

运行这段示例代码,将会得到如下的输出:

Taking out lock…

Lock ID:
4751274798057238718

Sleeping 5secs in
main()…

Thread trying to
put same row now…

Releasing lock…

Wait time: 5007ms

After thread
ended…

KV:
row1/colfam1:qual1/1300775520118/Put/vlen=4, Value: val2

KV:
row1/colfam1:qual1/1300775520113/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual1/1300775515116/Put/vlen=4, Value: val3

可以看出无锁的线程的确被阻塞了5秒钟。直到主线程做完两次put操作后释放了行锁,才继续运行。我们可以看到一个有趣的现象,由无锁对象最后插入的值,却拥有最小的时间戳。这是因为,实际上无锁线程的put操作是最早执行的。一旦它被发送到Region Server服务器,Region Server便会给它打上一个时间戳,虽然此时它还无法获取到行锁而被阻塞,但此时的时间戳已经生成了。主线程一共花了7ms向服务器提交了两次Put命令,并释放了行锁。

当您使用一个先前申请到的行锁,但是由于超时无效时,您将会收到服务器端生成的一个错误,以UnknownRowLockException异常的形式返回。它说明,服务器端已经将这个锁废弃掉了。此时,您可以将它drop掉,然后重新申请一个新的锁。

Scan(扫描)

前面我们讨论了基本的CRUD类型的操作,现在轮到scan了,类似于数据库系统中的游标(cursor)。它可以充分使用到HBase提供的顺序性的、排序的存储结构。

基本介绍

scan操作与get操作非常相似。介绍它之前,必须先介绍另一个类Scan。由于scan更像是一个迭代器,因此,并没有scan()的方法,取而代之的是getScanner方法,它会返回您想要遍历访问的scanner对象。可以利用如下的方法取到它:

ResultScanner
getScanner(Scan scan) throws IOException

ResultScanner
getScanner(byte[] family) throws IOException

ResultScanner
getScanner(byte[] family, byte[] qualifier)

throws
IOException

后面两个方法是为了加强友好性,会隐式地创建一个scan对象,然后再调用getScanner(Scan
scan)
方法取到ResultScanner

Scan类有如下的构造函数:

Scan()

Scan(byte[]
startRow, Filter filter)

Scan(byte[]
startRow)

Scan(byte[]
startRow, byte[] stopRow)

Get方法不同的是,您不再指定一个具体的rowkey,您现在可以选择性的指定一个startRow参数,这个参数定义HTable中,要读取的rowkey的起始位置。可选的参数stopRow定义读取rowkey的结束位置。

startRow是被包含的,而endRow是不被包含的,因此,可以用以表述式[startRow,
stopRow)
来描述这种关系。

Scan提供的一个特殊的功能就是您不必精确地指定rowkey。相反,scan会相匹配与指定的startKey相等或者大于它的所有的rowkey。如是没有指定startKey,那么将会从表的起始位置开始遍历。

如果指定了stopKey,那么只会遍历rowkey小于stopKey的所有记录。如果没有指定stopKey,那么scan会遍历到表的末尾。

还有另一个可选的参数Filter,这个参数指向一个Filter实例。Scan对象常常使用空的构造函数来创建,另外的参数都可以通过gettersetter方法进行指定。

Scan实例被创建后,如果想加入更多的限制条件,您可以使用很多的方法来限制读出的数据。当然,您也可以创建一个空的Scan,将整个表的所有的column familycolumn
qualifier
读出来。

Scan
addFamily(byte [] family)

Scan
addColumn(byte[] family, byte[] qualifier)

Get类相似,您也可以使用addFamily来设置column
families
或者使用addColumn来设置column,从而限定读出数据的条件。

如果您只需要数据的一部分,那么通过限制Scan的范围,只是体现了HBase的强大之处,由于数据是以column family为物理单元分隔文件的,不在Scan范围内的column family对应的文件根本不会被打开,这只是面向列存储最大的优势所在。

Scan
setTimeRange(long minStamp, long maxStamp) throws IOException

Scan
setTimeStamp(long timestamp)

Scan
setMaxVersions()

Scan setMaxVersions(int
maxVersions)

您还可以限制Scantimestamp,设置timestamp的范围,设置扫描的版本数。使用setStartRow()setStopRow()setFilter(),可以达到构造函数中的参数值同样的效果。方法hasFilter()可以判断一个Scan中是否添加了filter。表3-8列出了其它一些方法。

3-8 Scan的方法一览

Result

Description

getStartRow()/getStopRow()

Can be used to retrieve the
currently assigned values.

getTimeRange()

Retrieves the associated timestamp or time range of the Get
instance. Note that there is no getTimeStamp() since the API converts a value
assigned with setTimeStamp() into a TimeRange instance internally, setting
the minimum and maximum values to the given timestamp.

getMaxVersions()

Returns the currently configured number of versions that should
be retrieved from the table for every column.

getFilter()

Special filter instances can be used to select certain columns
or cells, based on a wide variety of conditions. You can get the currently
assigned filter using this method. It may return null if none was previously
set.

setCacheBlocks()

/getCacheBlocks()

Each HBase region server has a block cache that efficiently
retains recently accessed data for subsequent reads of contiguous
information. In some events it is better to not engage the cache to avoid too
much churn when doing full table scans. These methods give you control over
this feature.

numFamilies()

Convenience method to retrieve the size of the family map,
containing the families added using the addFamily() or addColumn() calls.

hasFamilies()

Another helper to check if a familyor columnhas been added to the current
instance of the Scan class.

getFamilies()

/setFamilyMap()

/getFamilyMap()

These methods give you access to the column families and
specific columns, as added by the addFamily() and/or addColumn() calls. The
family map is a map where the key is the family name and the value is a list
of added column qualifiers for this particular family. The getFamilies()
returns an array of all stored families, i.e., containing only the family
names (as

byte[] arrays).

当您创建了Scan实例之后,您就需要调用HTablegetScanner方法,取得ResultScanner实例。在下一节中,我们将详细介绍ResultScanner类。

ResultScanner

Scans不会将所有匹配到的行通过一个RPC调用发送到客户端,而是分多次。这是因为,一行数据可能很大,放在一次传输会消耗掉很多资源、花费很长的时间。

ResultScannerscan转化成一种类似于get的操作,将结果通过迭代器访问出来。它具有自己的一些方法:

Result next()
throws IOException

Result[] next(int
nbRows) throws IOException

void close()

有两种形式的next函数可以调用,close()操作用来释放资源。

Next()调用返回一个单独的Result实例,存放一个可用的row对象。同样的,您也可以使用next(int
nbRows)
一次取回来很多行,该调用返回一个Result的数组对象,数组中的每一行都代表一个唯一的row。当然,取到的值可能小于nbRows,但这很少在未取完数据时发生。可以查看,前面对Result实例的介绍,学习如何使用这个类。示例3-18给出了如何使用scan访问一个表。

示例3-18 使用scan访问数据

Scan scan1 = new
Scan();

ResultScanner
scanner1 = table.getScanner(scan1);

for (Result res :
scanner1) {

System.out.println(res);

}

scanner1.close();

Scan scan2 = new
Scan();

scan2.addFamily(Bytes.toBytes(“colfam1″));

ResultScanner
scanner2 = table.getScanner(scan2);

for (Result res :
scanner2) {

System.out.println(res);

}

scanner2.close();

Scan scan3 = new
Scan();

scan3.addColumn(Bytes.toBytes(“colfam1″),
Bytes.toBytes(“col-5″)).

addColumn(Bytes.toBytes(“colfam2″),
Bytes.toBytes(“col-33″)).

setStartRow(Bytes.toBytes(“row-10″)).

setStopRow(Bytes.toBytes(“row-20″));

ResultScanner
scanner3 = table.getScanner(scan3);

for (Result res :
scanner3) {

System.out.println(res);

}

scanner3.close();

首先创建一个空的Scan对象,使用这个对象对表进行遍历,然后关闭这个scanner释放相关资源。接着再创建一个只查询colfam1下记录的scanner,并打印相关的记录。最后创建一个只扫描列colfam1:col-5colfam2:col-33,且rowkey范围从row-10row-20的所有记录。

为了测试上述示例,首先创建一个表,含有colfam1colfam2两个column family。然后很这个表中插入100行记录。我们不列出全表扫描的输出结果,而仅列出scan2scan3的输出结果:

Scanning table
#3…

keyvalues={row-10/colfam1:col-5/1300803775078/Put/vlen=8,

row-10/colfam2:col-33/1300803775099/Put/vlen=9}

keyvalues={row-100/colfam1:col-5/1300803780079/Put/vlen=9,

row-100/colfam2:col-33/1300803780095/Put/vlen=10}

keyvalues={row-11/colfam1:col-5/1300803775152/Put/vlen=8,

row-11/colfam2:col-33/1300803775170/Put/vlen=9}

keyvalues={row-12/colfam1:col-5/1300803775212/Put/vlen=8,

row-12/colfam2:col-33/1300803775246/Put/vlen=9}

keyvalues={row-13/colfam1:col-5/1300803775345/Put/vlen=8,

row-13/colfam2:col-33/1300803775376/Put/vlen=9}

keyvalues={row-14/colfam1:col-5/1300803775479/Put/vlen=8,

row-14/colfam2:col-33/1300803775498/Put/vlen=9}

keyvalues={row-15/colfam1:col-5/1300803775554/Put/vlen=8,

row-15/colfam2:col-33/1300803775582/Put/vlen=9}

keyvalues={row-16/colfam1:col-5/1300803775665/Put/vlen=8,

row-16/colfam2:col-33/1300803775687/Put/vlen=9}

keyvalues={row-17/colfam1:col-5/1300803775734/Put/vlen=8,

row-17/colfam2:col-33/1300803775748/Put/vlen=9}

keyvalues={row-18/colfam1:col-5/1300803775791/Put/vlen=8,

row-18/colfam2:col-33/1300803775805/Put/vlen=9}

keyvalues={row-19/colfam1:col-5/1300803775843/Put/vlen=8,

row-19/colfam2:col-33/1300803775859/Put/vlen=9}

keyvalues={row-2/colfam1:col-5/1300803774463/Put/vlen=7,

row-2/colfam2:col-33/1300803774485/Put/vlen=8}

再一次强调的时,输出的结果与插入的顺序无关,有趣的是,rowkey的排列按照了字母序进行输出。

Caching vs. Batching

以目前为止,每个next()操作都是一次RPC调用,即使当你使用next( int
nbRows)
时。因为这个next( int nbRows )只是一个简单的next()调用的客户端循环。显然,这样的性能是很难令人满意的。因此,在一次RPC调用中返回多行便显得意义重大。这个机制叫制scanner缓存(scanner
caching)
,它默认是关闭的。

您可以使用在两个层面打开它:在表一级设定,将会对这个表的所有的scanner实例生效。在scan一级设定,将只会对这个scan生效。当然您也可以对通过对HTable实例进行设定,达到对所有的表生效。

void
setScannerCaching(int scannerCaching)

int
getScannerCaching()

当然您也可以在HBase安装时,修改默认值。可以通过修改配置文件hbase-site.xml实现:

<property>

<name>hbase.client.scanner.caching</name>

<value>10</value>

</property>

上述配置文件将cache的默认大小改为了10。当然,您还可以在代码中继续设定新的值。

在您通过getScanner方法,得到一个Scanner对象后,通过setScannerCaching()设置缓存大小,getScannerCaching()得到目前的缓存大小。API将会把设置的大小传递给scanner对象,除非您使用了Scan类的方法:

void
setCaching(int caching)

int getCaching()

scan直接设置cache大小拥有最高的优先级。通过对缓存大小的设定,可以使一次RPC调用返回多行记录。两种next()都会使用到这个缓存。

您可能需要找到RPC操作的数据和内存占用情况的一个折中,scanner的缓存大小越大,读取的性能越好(当然值过大,也不好),但缓存的条目多了之后,一次传输消耗的时间越长,占用的堆空间大小也越大,还会引发OOM异常(OutOfMemoryException)

当从服务器到客户端传输数据的时间或者客户端处理数据的时间大于了scanner设置的超时时间,那么客户端将会收到一个ScannerTimeoutException

示例3-19 scanner超时的例子

Scan scan = new
Scan();

ResultScanner
scanner = table.getScanner(scan);

int scannerTimeout
= (int) conf.getLong(

HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-1);

try {

Thread.sleep(scannerTimeout
+ 5000);

} catch
(InterruptedException e) {

//
ignore

}

while (true){

try {

Result
result = scanner.next();

if
(result == null) break;

System.out.println(result);

} catch (Exception
e) {

e.printStackTrace();

break;

}

}

scanner.close();

首先获取当前scanner的超时时间,然后sleep一会儿,等待超时。接着尝试打印出取到的结果集。将会得到如下的输出:

Adding rows to
table…

Current (local)
lease period: 60000

Sleeping now for
65000ms…

Attempting to
iterate over scanner…

Exception in
thread “main” java.lang.RuntimeException:

org.apache.hadoop.hbase.client.ScannerTimeoutException:
65094ms passed

since
the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext

at
ScanTimeoutExample.main

Caused by:
org.apache.hadoop.hbase.client.ScannerTimeoutException: 65094ms

passed
since the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner.next

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext


1 more

Caused by:
org.apache.hadoop.hbase.UnknownScannerException:

org.apache.hadoop.hbase.UnknownScannerException:
Name: -315058406354472427

at
org.apache.hadoop.hbase.regionserver.HRegionServer.next

scanner超时之后,客户端尝试打印从服务器上取出的值时,将会将到一个异常。

您可能想在客户端代码中加入以下代码来增大超时时间:

Configuration conf
= HBaseConfiguration.create()

conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
120000)

但由于scan的超时时间是配在Region Server上的,因此,上述配置并不会生效。如果您真的想修改这个值,您只有去Region Server上去修改hbase-site.xml,并重启集群。

从输出打印的堆栈可以看出,ScannerTimeoutException嵌套在了UnknownScannerException之中,这意味着next()调用使用了一个已经过期的scanner ID但这个ID已经被删除了。换句话说,客户端存储的scanner IDRegion Server已经不认识了,从而抛出一个UnknownScannerException

现在,您已经学会了如何使用客户端的scanner缓存来提高批量交互的性能。但有一点要注意的是,对于非常大的行,可能无法放入客户端的内存中。使用HBase客户端API中的batching,可以处理这种情况:

void setBatch(int
batch)

int getBatch()

caching(处理层次为rows)相对应,batching处理的层次是columns。它控制一次next()调用传输多少个columns通过ResultScannersetBatch()方法可以进行设置,setBatch(5)将使每个Result实例,返回5columns

当一行含有非常多的列时,您可以使用setBatch方法,一次next()返回一行中的部分列。Result中返回的列也可能达不到batching的值,比较一行有17列,batching的值为5,那么前三次next()将得到5,5,5,最后一次调用只能返回2个列。

通过设置cachingbatch的大小,scanner可以在选择rowkey范围查询时控制RPC的多少。示例3-20用两个参数来对Result实例大小与请求次数进行调优。

示例3-20 使用cachingbatch两个参数

private static
void scan(int caching, int batch) throws IOException {

Logger log =
Logger.getLogger(“org.apache.hadoop”);

final int[]
counters = {0, 0};

Appender appender
= new AppenderSkeleton() {

@Override

protected void
append(LoggingEvent event) {

String msg =
event.getMessage().toString();

if (msg != null
&& msg.contains(“Call: next”)) {

counters[0]++;

}

}

@Override

public void
close() {}

@Override

public boolean
requiresLayout() {

return
false;

}

};

log.removeAllAppenders();

log.setAdditivity(false);

log.addAppender(appender);

log.setLevel(Level.DEBUG);

Scan scan = new
Scan();

scan.setCaching(caching);

scan.setBatch(batch);

ResultScanner
scanner = table.getScanner(scan);

for (Result result
: scanner) {

counters[1]++;

}

scanner.close();

System.out.println(“Caching:
” + caching + “, Batch: ” + batch +

“,
Results: ” + counters[1] + “, RPCs: ” + counters[0]);

}

public static void
main(String[] args) throws IOException {

scan(1, 1);

scan(200, 1);

scan(2000, 100);

scan(2, 100);

scan(2, 10);

scan(5, 100);

scan(5, 20);

scan(10, 10);

}

示例代码首先设置cachingbatch的参数,然后打印Result的大小和RPC的次数。对不同的cachingbatch大小进行了组合测试。

Caching: 1, Batch:
1, Results: 200, RPCs: 201

Caching: 200,
Batch: 1, Results: 200, RPCs: 2

Caching: 2000,
Batch: 100, Results: 10, RPCs: 1

Caching: 2, Batch:
100, Results: 10, RPCs: 6

Caching: 2, Batch:
10, Results: 20, RPCs: 11

Caching: 5, Batch:
100, Results: 10, RPCs: 3

Caching: 5, Batch:
20, Results: 10, RPCs: 3

Caching: 10,
Batch: 10, Results: 20, RPCs: 3

通过调整两个参数的值,可以观察它们对结果的影响。表3-9给出了一些组合的结果。为了运行示例3-20,首先创建了一个拥有两个column family的表,添加了10行,每行中,每个column family下添加10column。这就意味着一共存在着200columns或者叫做cell,每个cell只有一个版本。

3-9 示例参数的影响

Caching

Batch

Results

RPCs

Description

1

1

200

201

Each column is returned as a
separate Result instance. One more RPC is needed to realize the scan is
complete.

200

1

200

2

Each column is a separate
Result, but they are all transferred in one RPC (plus the extra check).

2

10

20

11

The batch is half the row
width, so 200 divided by 10 is 20 Results needed. 10 RPCs (plus the check) to
transfer them.

5

100

10

3

The batch is too large for
each row, so all 20 columns are batched. This requires 10 Result instances.
Caching brings the number of RPCs down to two (plus the check).

5

20

10

3

This is the same as above, but
this time the batch matches the columns available. The outcome is the same.

10

10

20

3

This divides the table into
smaller Result instances, but larger caching also means only two RPCs are
needed.

为了计算RPC的次数,您需要首先将行数与最行的column数相乘,然后用这个值除以batchcolumn数中的较小值。最后用这个值除以caching大小。用数学公式表示为:

RPCs = (Rows *
Cols per Row) / Min(Cols per Row, Batch Size) /Scanner Caching

还需要额外的RPC操作来打开和关闭scanner。因此,还scanner还需要两次额外的RPC操作。

3-2描述了cachingbatching是如何起作用的。如图所示,该表具有9行值,每行都有不定数目的column。设置scannercaching的大小为6batch大小为3。您可以看到,需要3RPC操作来转输数据(虚线包围的部分)。

3-2 通过cachingbatching控制scan操作RPC的次数

由于batch大小小于一行中column的数目,因此,服务器将3columns打成一个Result,一次RPC操作可以传输6个这样的Result。如果batch大小不设置,而caching大小被设置时,每行记录将包含一行中所有column,这样一个Result实例中就是一个完整的行。只有当您设置了batch参数,才有可能把一行拆成多个Result实例。

一开始您可能不需要考虑cachingbatching的大小设置,当您进行应用程序的调优时,您必须对这个原理非常清楚才能找到一个最好的平衡点。

辅助功能

在进一步了解客户端的其它功能之前,我们有必要先了解HBase和它的客户端提供的一些有用的辅助功能。

HTable方法集

客户端API代表了一个HTable实例,它提供了访问一个HBase表的一些方法。除了前面提到的一些对于访问HBase表的主要方法,还有另外一些值得留意的方法:

void close()

该方法前面有所提及,但考虑到它的重要性,这样有必要专门再次提及。在结束了对表的访问之后,一定要调用close()接口。当close()被调用时,客户端会调用flushCache()方法,将客户端缓存区中缓存的数据提交到服务器。

byte[]
getTableName()

这个方法可以方例地取出表名。

HTableDescriptor
getTableDescriptor()

HBase中每个表都会使用一个HTableDescriptor的实例。您可以通过getTableDescriptor()获取对表信息的访问。

static Boolean
isTableEnabled(table)

HTable4个静态方法,它们都需要一个配置对象,如没有提供configurationHTable会在程序的classpath下使用一个默认的configuration。该函数检查ZooKeepertable表是否为enable状态。

byte[][]
getStartKeys()

byte[][]
getEndKeys()

pair<byte[][],
byte[][]> getStartEndKeys()

这几个函数可以访问表中当前的rowkey范围,随着数据的不断增加,调用后也会得到不同的结果。这3个方法返回byte数组。您可以使用Bytes.toStringBinary()来打印出key值。

void
clearRegionCache()

HRegionLocation
getRegionLocation( row )

Map<HRegionInfo,
HServerAddress> getRegionInfo()

这几个方法使您可以取出Region的信息,您可以使用第一个方法来清楚客户端上的缓存,也可以使用第三个方法来取出所有Region信息。这些方法帮助一些高级使用者来利用Region信息,比较路由负载、计算靠近数据等。

void
prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap)

static void
setRegionCachePrefetch(table, enable)

static Boolean
getRegionCachePrefetch(table)

这也是一组高级用法的API。这组API可以提前将tableRegion信息缓存到客户端。使用上述API,您可以提供一个Region的列表来对Region信息进行预热。

Bytes

该类用来将Java类型,比如String或者long转化为rawbyte数组等HBase支持的类型。因此,再介绍这个类和它的函数是有意义的。

大多数方法都有这三种形式,比如:

static long
toLong(byte[] bytes)

static long
toLong(byte[] bytes, int offset)

static long
toLong(byte[] bytes, int offset, int length)

它们的输出都是byte数组,偏移量、长度,后两个可以缺省。它们的使用方式取决于您拥有的byte数组。如果您是使用Bytes.toBytes()方法得到的,那么您可以安全的使用第一个API,整个bytes数据存放着待转化的值。

HBase内部,将数据存放在一个大的字节数组中,使用如下的方法:

static int
putLong(byte[] bytes, int offset, long val)

这个方法允许您将一个Long对象写入到一个给定的字节数组中的指定位置。如果您想从一个大数组中取出数据,可以使用toLong方法。

Bytes类支持的Java类型包括:StringBooleanshortintlongdoublefloat。除了这些,表3-10中还列出了一些有用的方法。

3-10 Bytes提供的一些方法

Result

Description

toStringBinary()

While working very similar to
toString(), this variant has an extra safeguard to convert nonprintable data
into their human-readable hexadecimal numbers. Whenever you are not sure what
a byte array contains you should use this method to print its content, for
example, to the console, or into a logfile.

compareTo()/equals()

These methods allow you to
compare two byte[], that is, byte arrays. The former gives you a comparison
result and the latter a boolean value, indicating whether the given arrays
are equal to each other.

add()/head()/tail()

You can use these to add two
byte arrays to each other, resulting in a new, concatenated array, or to get
the first, or last, few bytes of the given byte array.

binarySearch()

This performs a binary search
in the given array of values. It operates on byte arrays for the values and
the key you are searching for.

incrementBytes()

This increments a long value
in its byte array representation, as if you had used toBytes(long) to create
it. You can decrement using a negative amount parameter.

Bytes提供的一些方法与Java提供的ByteBuffer有一些复叠。区别是前者在处理的过程中不会生成新的实例,因此,它采用了一些优化。对于HBase中,这种类型与字节之间的转化操作被频繁使用,因此,通过这种优化,可以避免非常耗时的GC操作。


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约