前面介绍了如何逐条和批量地添加、读取、删除数据。在这一节,我们将介绍如何一次执行多种不同类型的操作处理多行记录。 事实上,一些使用List结构的批量操作,如delete(List<Delete> 在下面介绍的批量操作中,您将会看到一个新的类型叫Row,前面将到的Put、Get、Delete类都是从Row类的子类。 void throws Object[] batch(List<Row> throws 由于Row的存在,以及它和Get、Put、Delete的继承关系,决定了可以在一个列表中混合多种不同类型的操作。示例3-16给出了这种使用的例子。 值得注意地是,您不应该将一个row的Put和Delete操作混合在一起。因为List中多个操作在服务器端执行的顺序是无法保证的,这样会得到一个无法预料到的结果。 示例3-16 批量操作 private final private final private final private final private final private final List<Row> Put put = new put.add(COLFAM2, batch.add(put); Get get1 = new get1.addColumn(COLFAM1, batch.add(get1); Delete delete = delete.deleteColumns(COLFAM1, batch.add(delete); Get get2 = new get2.addFamily(Bytes.toBytes(“BOGUS”)); batch.add(get2); Object[] results = try { table.batch(batch, } catch (Exception System.err.println(“Error: } for (int i = 0; i System.out.println(“Result[" } 首先定义了一组指向row、column family、column 整个程序的输出如下: Before batch KV: KV: KV: row1/colfam1:qual3/3/Put/vlen=4, Result[0]: Result[1]: Result[2]: Result[3]: org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family After batch KV: KV: KV: Error: Failed 1 action: servers with 在前面的例子中,事先插入了一些测试数据并打印出来,让您方便地看出示例代码做的事情。最后发现,想删除的记录成功被删除,想插入的新记录也成功被插入了。 Get操作的结果,需要从Result数组中找到。Result数组的大小和请求操作的总数相等。第一个Result是Put操作的结果,为一个空的KeyValue结构;第二个Result的值是Get操作的结果,可以打印出它的值。第三个值为Delete操作的值,为一个空的KeyValue结构。表3-7给出了Result与Row类型之前的对应关系。 表3-7 batch()调用可能返回的结果
观察示例3-16执行的结果,您可以发现空的Result对象打印出了keyvalues=NONE。 Get操作打印出了对应取到的值。对于错误列上的Put操作得到了一个异常。 值得注意地是,当您使用batch()方法时,里面的Put实例并不会使用客户端缓存。batch()调用是同步的,同时直接向服务器发送请求。没有时延和其它处理过程,这和批量put操作是不同的。因此,您需要选择合适于您的。 batch调用有两种形式,一种将Result数据放在参数中,另一种放在返回值中。 void throws Object[] throws 它们之间比较大的区别在于,当抛出异常时,第一方法的result中被填充了部分结果。而第二个方法在异常时,将会返回null。 两种方法都支持get、put、delete操作。如果执行其中的任何一个请求时出错,一个客户端异常将会被抛出,报告出错情况。客户端缓存不会被使用到。void batch(actions, 所有的操作都将在check之前执行:当您发现一个action出现了错误,但其它的操作也将被执行。在最差的场景下,所有的action都返回失败。 另一方面,batch操作在意瞬时失败,比如NotServingRegionException(比如一个Region Server被下线了),它会进行重试几次。配置项hbase.client.retries.number会设定重试的次数(默认的重试次数为10)。 Row Locks(行锁) 更新操作,比如put()、delete()、checkAndPut()等等对于一个row来说是互斥执行的,从而保证一个低层面的原子性。Region Server提供了一个row lock行级锁来保证只有拥有锁的客户端才能够对该行进行修改。在实际中,客户端并不提供这种显示的锁,而是依赖于一种将每个操作独立开的机制。 您应该最大限度地避免使用row lock,很容易出现RDBMS中类似的死锁现象。当Lock在等待超时的过程中,两个被挂起的客户端都持有一个句柄,这类句柄属于稀缺资源。如果这个锁被加在了一个访问很频繁的行上,那么很多客户端都会被阻塞。 当您使用put使用将一个Put实例发送到服务器时,如果您使用了如下的构造函数: Put(byte[] row) 在这个构造函数里,并没有出现RowLock参数,服务器会根据您的行为,自动为您创建一个锁。这个锁对于客户端是透明的,客户端无法获取到这个生命期很短的服务器端的锁。相比于服务器自动创建的隐式锁。客户端也可以使用显式锁,并且在多个操作中使用到它。可以通过以下的方法: RowLock void 第一个方法lockRow以rowkey为参数,返回一个RockLock实例,这个实例可以传递到Put和Delete的构造函数中去。一旦您成功的获取到一个锁后,在使用完之后,您必须调用unlockRow方法释放它。 您申请到的LockRow会锁定整行,它通过rowkey来确定行,一旦拥有,别的客户端将不能对该行进行更新操作。 当这个行锁无论是被客户端或者服务器占有时,另一个想申请一个该行的新行锁的行为都会被挂起,直到这行原有的行锁被释放或者过期。后者是对出错进程持有行锁情况的一个兼容。 一个行锁的默认超时时间是1分钟,可以在hbase-site.xml进行配置: <property> <name>hbase.regionserver.lease.period</name> <value>120000</value> </property> 上述代码将超时时间改为120秒。当然,这个值不能设的太大,否则,每个客户端申请一个被别的进程占用的锁的最大等待时间都会变为这个值。示例3-17给出一个由用户生成的行锁阻塞其它读取客户端的例子。 示例3-17 使用显示行锁 static class @Override public void run() try { HTable Put put.add(COLFAM1, long System.out.println(“Thread table.put(put); System.out.println(“Wait (System.currentTimeMillis() } catch System.err.println(“Thread } } } System.out.println(“Taking RowLock lock = System.out.println(“Lock Thread thread = thread.start(); try { System.out.println(“Sleeping Thread.sleep(5000); } catch // } try { Put put1 = new put1.add(COLFAM1, table.put(put1); Put put2 = new put2.add(COLFAM1, QUAL1, table.put(put2); } catch (Exception System.err.println(“Error: } finally { System.out.println(“Releasing table.unlockRow(lock); } 首先定义了一个的线程,它会不断使用隐式锁不断地访问同一行上的记录,在取得锁之前,put操作始终会被挂起,在线程内部会打印出等待的时间。 主线程,先显示的创建一个行锁,然后启动一个前面定义的线程实例。接着,主线程sleep 5秒之后,将这个锁传给Put对象,进行put操作之后,这个锁会被释放,从而前面那个无锁的线程会继续运行。 运行这段示例代码,将会得到如下的输出: Taking out lock… Lock ID: Sleeping 5secs in Thread trying to Releasing lock… Wait time: 5007ms After thread KV: KV: KV: 可以看出无锁的线程的确被阻塞了5秒钟。直到主线程做完两次put操作后释放了行锁,才继续运行。我们可以看到一个有趣的现象,由无锁对象最后插入的值,却拥有最小的时间戳。这是因为,实际上无锁线程的put操作是最早执行的。一旦它被发送到Region Server服务器,Region Server便会给它打上一个时间戳,虽然此时它还无法获取到行锁而被阻塞,但此时的时间戳已经生成了。主线程一共花了7ms向服务器提交了两次Put命令,并释放了行锁。 当您使用一个先前申请到的行锁,但是由于超时无效时,您将会收到服务器端生成的一个错误,以UnknownRowLockException异常的形式返回。它说明,服务器端已经将这个锁废弃掉了。此时,您可以将它drop掉,然后重新申请一个新的锁。 前面我们讨论了基本的CRUD类型的操作,现在轮到scan了,类似于数据库系统中的游标(cursor)。它可以充分使用到HBase提供的顺序性的、排序的存储结构。 基本介绍 scan操作与get操作非常相似。介绍它之前,必须先介绍另一个类Scan。由于scan更像是一个迭代器,因此,并没有scan()的方法,取而代之的是getScanner方法,它会返回您想要遍历访问的scanner对象。可以利用如下的方法取到它: ResultScanner ResultScanner ResultScanner throws 后面两个方法是为了加强友好性,会隐式地创建一个scan对象,然后再调用getScanner(Scan Scan类有如下的构造函数: Scan() Scan(byte[] Scan(byte[] Scan(byte[] 与Get方法不同的是,您不再指定一个具体的rowkey,您现在可以选择性的指定一个startRow参数,这个参数定义HTable中,要读取的rowkey的起始位置。可选的参数stopRow定义读取rowkey的结束位置。 startRow是被包含的,而endRow是不被包含的,因此,可以用以表述式[startRow, Scan提供的一个特殊的功能就是您不必精确地指定rowkey。相反,scan会相匹配与指定的startKey相等或者大于它的所有的rowkey。如是没有指定startKey,那么将会从表的起始位置开始遍历。 如果指定了stopKey,那么只会遍历rowkey小于stopKey的所有记录。如果没有指定stopKey,那么scan会遍历到表的末尾。 还有另一个可选的参数Filter,这个参数指向一个Filter实例。Scan对象常常使用空的构造函数来创建,另外的参数都可以通过getter和setter方法进行指定。 Scan实例被创建后,如果想加入更多的限制条件,您可以使用很多的方法来限制读出的数据。当然,您也可以创建一个空的Scan,将整个表的所有的column family和column Scan Scan 和Get类相似,您也可以使用addFamily来设置column 如果您只需要数据的一部分,那么通过限制Scan的范围,只是体现了HBase的强大之处,由于数据是以column family为物理单元分隔文件的,不在Scan范围内的column family对应的文件根本不会被打开,这只是面向列存储最大的优势所在。 Scan Scan Scan Scan setMaxVersions(int 您还可以限制Scan的timestamp,设置timestamp的范围,设置扫描的版本数。使用setStartRow(),setStopRow()和setFilter(),可以达到构造函数中的参数值同样的效果。方法hasFilter()可以判断一个Scan中是否添加了filter。表3-8列出了其它一些方法。 表3-8 Scan的方法一览
当您创建了Scan实例之后,您就需要调用HTable的getScanner方法,取得ResultScanner实例。在下一节中,我们将详细介绍ResultScanner类。 Scans不会将所有匹配到的行通过一个RPC调用发送到客户端,而是分多次。这是因为,一行数据可能很大,放在一次传输会消耗掉很多资源、花费很长的时间。 ResultScanner将scan转化成一种类似于get的操作,将结果通过迭代器访问出来。它具有自己的一些方法: Result next() Result[] next(int void close() 有两种形式的next函数可以调用,close()操作用来释放资源。 Next()调用返回一个单独的Result实例,存放一个可用的row对象。同样的,您也可以使用next(int 示例3-18 使用scan访问数据 Scan scan1 = new ResultScanner for (Result res : System.out.println(res); } scanner1.close(); Scan scan2 = new scan2.addFamily(Bytes.toBytes(“colfam1″)); ResultScanner for (Result res : System.out.println(res); } scanner2.close(); Scan scan3 = new scan3.addColumn(Bytes.toBytes(“colfam1″), addColumn(Bytes.toBytes(“colfam2″), setStartRow(Bytes.toBytes(“row-10″)). setStopRow(Bytes.toBytes(“row-20″)); ResultScanner for (Result res : System.out.println(res); } scanner3.close(); 首先创建一个空的Scan对象,使用这个对象对表进行遍历,然后关闭这个scanner释放相关资源。接着再创建一个只查询colfam1下记录的scanner,并打印相关的记录。最后创建一个只扫描列colfam1:col-5和colfam2:col-33,且rowkey范围从row-10到row-20的所有记录。 为了测试上述示例,首先创建一个表,含有colfam1和colfam2两个column family。然后很这个表中插入100行记录。我们不列出全表扫描的输出结果,而仅列出scan2和scan3的输出结果: Scanning table keyvalues={row-10/colfam1:col-5/1300803775078/Put/vlen=8, row-10/colfam2:col-33/1300803775099/Put/vlen=9} keyvalues={row-100/colfam1:col-5/1300803780079/Put/vlen=9, row-100/colfam2:col-33/1300803780095/Put/vlen=10} keyvalues={row-11/colfam1:col-5/1300803775152/Put/vlen=8, row-11/colfam2:col-33/1300803775170/Put/vlen=9} keyvalues={row-12/colfam1:col-5/1300803775212/Put/vlen=8, row-12/colfam2:col-33/1300803775246/Put/vlen=9} keyvalues={row-13/colfam1:col-5/1300803775345/Put/vlen=8, row-13/colfam2:col-33/1300803775376/Put/vlen=9} keyvalues={row-14/colfam1:col-5/1300803775479/Put/vlen=8, row-14/colfam2:col-33/1300803775498/Put/vlen=9} keyvalues={row-15/colfam1:col-5/1300803775554/Put/vlen=8, row-15/colfam2:col-33/1300803775582/Put/vlen=9} keyvalues={row-16/colfam1:col-5/1300803775665/Put/vlen=8, row-16/colfam2:col-33/1300803775687/Put/vlen=9} keyvalues={row-17/colfam1:col-5/1300803775734/Put/vlen=8, row-17/colfam2:col-33/1300803775748/Put/vlen=9} keyvalues={row-18/colfam1:col-5/1300803775791/Put/vlen=8, row-18/colfam2:col-33/1300803775805/Put/vlen=9} keyvalues={row-19/colfam1:col-5/1300803775843/Put/vlen=8, row-19/colfam2:col-33/1300803775859/Put/vlen=9} keyvalues={row-2/colfam1:col-5/1300803774463/Put/vlen=7, row-2/colfam2:col-33/1300803774485/Put/vlen=8} 再一次强调的时,输出的结果与插入的顺序无关,有趣的是,rowkey的排列按照了字母序进行输出。 以目前为止,每个next()操作都是一次RPC调用,即使当你使用next( int 您可以使用在两个层面打开它:在表一级设定,将会对这个表的所有的scanner实例生效。在scan一级设定,将只会对这个scan生效。当然您也可以对通过对HTable实例进行设定,达到对所有的表生效。 void int 当然您也可以在HBase安装时,修改默认值。可以通过修改配置文件hbase-site.xml实现: <property> <name>hbase.client.scanner.caching</name> <value>10</value> </property> 上述配置文件将cache的默认大小改为了10。当然,您还可以在代码中继续设定新的值。 在您通过getScanner方法,得到一个Scanner对象后,通过setScannerCaching()设置缓存大小,getScannerCaching()得到目前的缓存大小。API将会把设置的大小传递给scanner对象,除非您使用了Scan类的方法: void int getCaching() scan直接设置cache大小拥有最高的优先级。通过对缓存大小的设定,可以使一次RPC调用返回多行记录。两种next()都会使用到这个缓存。 您可能需要找到RPC操作的数据和内存占用情况的一个折中,scanner的缓存大小越大,读取的性能越好(当然值过大,也不好),但缓存的条目多了之后,一次传输消耗的时间越长,占用的堆空间大小也越大,还会引发OOM异常(OutOfMemoryException)。 当从服务器到客户端传输数据的时间或者客户端处理数据的时间大于了scanner设置的超时时间,那么客户端将会收到一个ScannerTimeoutException。 示例3-19 scanner超时的例子 Scan scan = new ResultScanner int scannerTimeout HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, try { Thread.sleep(scannerTimeout } catch // } while (true){ try { Result if System.out.println(result); } catch (Exception e.printStackTrace(); break; } } scanner.close(); 首先获取当前scanner的超时时间,然后sleep一会儿,等待超时。接着尝试打印出取到的结果集。将会得到如下的输出: Adding rows to Current (local) Sleeping now for Attempting to Exception in org.apache.hadoop.hbase.client.ScannerTimeoutException: since at at Caused by: passed at at … Caused by: org.apache.hadoop.hbase.UnknownScannerException: at … 在scanner超时之后,客户端尝试打印从服务器上取出的值时,将会将到一个异常。 您可能想在客户端代码中加入以下代码来增大超时时间: Configuration conf conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 但由于scan的超时时间是配在Region Server上的,因此,上述配置并不会生效。如果您真的想修改这个值,您只有去Region Server上去修改hbase-site.xml,并重启集群。 从输出打印的堆栈可以看出,ScannerTimeoutException嵌套在了UnknownScannerException之中,这意味着next()调用使用了一个已经过期的scanner ID,但这个ID已经被删除了。换句话说,客户端存储的scanner ID,Region Server已经不认识了,从而抛出一个UnknownScannerException。 现在,您已经学会了如何使用客户端的scanner缓存来提高批量交互的性能。但有一点要注意的是,对于非常大的行,可能无法放入客户端的内存中。使用HBase客户端API中的batching,可以处理这种情况: void setBatch(int int getBatch() 与caching(处理层次为rows)相对应,batching处理的层次是columns。它控制一次next()调用传输多少个columns。通过ResultScanner的setBatch()方法可以进行设置,setBatch(5)将使每个Result实例,返回5个columns。 当一行含有非常多的列时,您可以使用setBatch方法,一次next()返回一行中的部分列。Result中返回的列也可能达不到batching的值,比较一行有17列,batching的值为5,那么前三次next()将得到5,5,5,最后一次调用只能返回2个列。 通过设置caching和batch的大小,scanner可以在选择rowkey范围查询时控制RPC的多少。示例3-20用两个参数来对Result实例大小与请求次数进行调优。 示例3-20 使用caching和batch两个参数 private static Logger log = final int[] Appender appender @Override protected void String msg = if (msg != null counters[0]++; } } @Override public void @Override public boolean return } }; log.removeAllAppenders(); log.setAdditivity(false); log.addAppender(appender); log.setLevel(Level.DEBUG); Scan scan = new scan.setCaching(caching); scan.setBatch(batch); ResultScanner for (Result result counters[1]++; } scanner.close(); System.out.println(“Caching: “, } public static void scan(1, 1); scan(200, 1); scan(2000, 100); scan(2, 100); scan(2, 10); scan(5, 100); scan(5, 20); scan(10, 10); } 示例代码首先设置caching和batch的参数,然后打印Result的大小和RPC的次数。对不同的caching和batch大小进行了组合测试。 Caching: 1, Batch: Caching: 200, Caching: 2000, Caching: 2, Batch: Caching: 2, Batch: Caching: 5, Batch: Caching: 5, Batch: Caching: 10, 通过调整两个参数的值,可以观察它们对结果的影响。表3-9给出了一些组合的结果。为了运行示例3-20,首先创建了一个拥有两个column family的表,添加了10行,每行中,每个column family下添加10个column。这就意味着一共存在着200个columns或者叫做cell,每个cell只有一个版本。 表3-9 示例参数的影响
为了计算RPC的次数,您需要首先将行数与最行的column数相乘,然后用这个值除以batch和column数中的较小值。最后用这个值除以caching大小。用数学公式表示为: RPCs = (Rows * 还需要额外的RPC操作来打开和关闭scanner。因此,还scanner还需要两次额外的RPC操作。 图3-2描述了caching和batching是如何起作用的。如图所示,该表具有9行值,每行都有不定数目的column。设置scanner中caching的大小为6,batch大小为3。您可以看到,需要3次RPC操作来转输数据(虚线包围的部分)。 图3-2 通过caching和batching控制scan操作RPC的次数 由于batch大小小于一行中column的数目,因此,服务器将3个columns打成一个Result,一次RPC操作可以传输6个这样的Result。如果batch大小不设置,而caching大小被设置时,每行记录将包含一行中所有column,这样一个Result实例中就是一个完整的行。只有当您设置了batch参数,才有可能把一行拆成多个Result实例。 一开始您可能不需要考虑caching和batching的大小设置,当您进行应用程序的调优时,您必须对这个原理非常清楚才能找到一个最好的平衡点。 辅助功能 在进一步了解客户端的其它功能之前,我们有必要先了解HBase和它的客户端提供的一些有用的辅助功能。 HTable方法集 客户端API代表了一个HTable实例,它提供了访问一个HBase表的一些方法。除了前面提到的一些对于访问HBase表的主要方法,还有另外一些值得留意的方法: void close() 该方法前面有所提及,但考虑到它的重要性,这样有必要专门再次提及。在结束了对表的访问之后,一定要调用close()接口。当close()被调用时,客户端会调用flushCache()方法,将客户端缓存区中缓存的数据提交到服务器。 byte[] 这个方法可以方例地取出表名。 HTableDescriptor HBase中每个表都会使用一个HTableDescriptor的实例。您可以通过getTableDescriptor()获取对表信息的访问。 static Boolean HTable有4个静态方法,它们都需要一个配置对象,如没有提供configuration,HTable会在程序的classpath下使用一个默认的configuration。该函数检查ZooKeeper上table表是否为enable状态。 byte[][] byte[][] pair<byte[][], 这几个函数可以访问表中当前的rowkey范围,随着数据的不断增加,调用后也会得到不同的结果。这3个方法返回byte数组。您可以使用Bytes.toStringBinary()来打印出key值。 void HRegionLocation Map<HRegionInfo, 这几个方法使您可以取出Region的信息,您可以使用第一个方法来清楚客户端上的缓存,也可以使用第三个方法来取出所有Region信息。这些方法帮助一些高级使用者来利用Region信息,比较路由负载、计算靠近数据等。 void static void static Boolean 这也是一组高级用法的API。这组API可以提前将table的Region信息缓存到客户端。使用上述API,您可以提供一个Region的列表来对Region信息进行预热。 Bytes类 该类用来将Java类型,比如String或者long转化为raw、byte数组等HBase支持的类型。因此,再介绍这个类和它的函数是有意义的。 大多数方法都有这三种形式,比如: static long static long static long 它们的输出都是byte数组,偏移量、长度,后两个可以缺省。它们的使用方式取决于您拥有的byte数组。如果您是使用Bytes.toBytes()方法得到的,那么您可以安全的使用第一个API,整个bytes数据存放着待转化的值。 在HBase内部,将数据存放在一个大的字节数组中,使用如下的方法: static int 这个方法允许您将一个Long对象写入到一个给定的字节数组中的指定位置。如果您想从一个大数组中取出数据,可以使用toLong方法。 Bytes类支持的Java类型包括:String、Boolean、short、int、long、double、float。除了这些,表3-10中还列出了一些有用的方法。 表3-10 Bytes提供的一些方法
Bytes提供的一些方法与Java提供的ByteBuffer有一些复叠。区别是前者在处理的过程中不会生成新的实例,因此,它采用了一些优化。对于HBase中,这种类型与字节之间的转化操作被频繁使用,因此,通过这种优化,可以避免非常耗时的GC操作。 |
|