分享

使用JAVA API和MapReduce读取HBase里的数据(可用作HBase数据清洗)

 青叶i生活 2018-01-25

一.使用JAVA API的方式

// Shared table handle; opened lazily by getHTable().
private static Table table = null;
// Shared HBase client configuration, populated in the static initializer.
static Configuration conf = null;
// Shared cluster connection. The original snippet assigned and used `conn` without
// declaring it; the fully-qualified type avoids depending on an extra import.
private static org.apache.hadoop.hbase.client.Connection conn = null;

static {
    conf = HBaseConfiguration.create();
    // hbase.zookeeper.quorum: the list of hosts of the ZooKeeper ensemble
    conf.set("hbase.zookeeper.quorum", HConfiguration.hbase_zookeeper_quorum);
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    try {
        conn = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
        // Fail fast: continuing with a null connection would only defer the failure
        // to a NullPointerException in the first method that touches `conn`.
        throw new ExceptionInInitializerError(e);
    }
}


// Opens the shared Table handle for "tablename" on first use; later calls are no-ops.
public static void getHTable() {
    if (table != null) {
        return;
    }

    try {
        TableName name = TableName.valueOf("tablename");
        table = conn.getTable(name);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Scans the HBase table "tablename" over a row-key range and prints every cell
 * (row, family, qualifier, value, timestamp) to standard output.
 *
 * @param start_rowkey row key the scan starts from (passed to Scan#setStartRow)
 * @param stop_rowkey  row key the scan stops at (passed to Scan#setStopRow)
 * @throws IOException kept in the signature for caller compatibility; I/O failures
 *                     raised while scanning are caught, reported and not rethrown
 */
public static void getResultScann(String start_rowkey, String stop_rowkey) throws IOException {
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(start_rowkey));
    scan.setStopRow(Bytes.toBytes(stop_rowkey));

    // Optional: restrict the scan to the first key-value of each row.
    //FirstKeyOnlyFilter filter = new FirstKeyOnlyFilter();
    //scan.setFilter(filter);

    // try-with-resources closes the scanner and then the table, and copes with
    // getTable/getScanner failing; the previous finally block dereferenced
    // null references in that case and masked the real error.
    try (Table scanTable = conn.getTable(TableName.valueOf("tablename"));
         ResultScanner rs = scanTable.getScanner(scan)) {

        int index = 0;

        for (Result result : rs) { // one Result per row
            // rawCells() may be null for an empty Result, so guard before use.
            Cell[] cells = result.rawCells();
            int columnCount = (cells == null) ? 0 : cells.length;
            System.out.println("第" + index++ + "行----列数::" + columnCount);

            if (cells == null) {
                continue;
            }

            for (Cell kv : cells) { // every column of the current row
                // Read through array/offset/length instead of the deprecated
                // copy-returning accessors (getRow/getFamily/getQualifier/getValue).
                System.out.println("row:"
                        + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
                System.out.println("family:"
                        + Bytes.toString(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()));
                System.out.println("qualifier:"
                        + Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
                System.out.println("value:"
                        + Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
                System.out.println("timestamp:" + kv.getTimestamp());
                System.out.println("-------------------------------------------");
            }
        }
    } catch (IOException e) {
        System.out.println("error");
        e.printStackTrace();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

二.使用MapReduce方式


public class OnlyMapScan {

    /**
     * Mapper
     */
    public static class MyMapper extends TableMapper<Text, NullWritable> {

        public void map(ImmutableBytesWritable rows, Result result, Context context) throws IOException, InterruptedException {
            //把取到的值直接打印
            for (Cell kv : result.listCells()) { // 遍历每一行的各列
                //假如我们当时插入HBase的时候没有把int、float等类型的数据转换成String,这里就会乱码了
                String row = new String(kv.getRowArray(),kv.getRowOffset(),kv.getRowLength(),"UTF-8");
                String family = new String(kv.getFamilyArray(),kv.getFamilyOffset(),kv.getFamilyLength(),"UTF-8");
                String qualifier = new String(kv.getQualifierArray(),kv.getQualifierOffset(),kv.getQualifierLength(),"UTF-8");
                String value = new String(kv.getValueArray(),kv.getValueOffset(),kv.getValueLength(),"UTF-8");

                System.out.println("row:" + row);
                System.out.println("family:" + family);
                System.out.println("qualifier:" + qualifier);
                System.out.println("value:" + value);
                System.out.println("timestamp:" + kv.getTimestamp());
                System.out.println("-------------------------------------------");
            }
        }
    }

    /**
     * Main
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {


        Configuration configuration = HBaseConfiguration.create();
        //设置zookeeper
        configuration.set("hbase.zookeeper.quorum", HConfiguration.hbase_zookeeper_quorum);
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set(TableInputFormat.INPUT_TABLE, HConfiguration.tableName);
        //将该值改大,防止hbase超时退出
        configuration.set("dfs.socket.timeout", "18000");

        Scan scan = new Scan();
        scan.setCaching(1024);
        scan.setCacheBlocks(false);
        scan.setStartRow(Bytes.toBytes("73037041-AA"));
        scan.setStopRow(Bytes.toBytes("73037045-AA"));

        Job job = new Job(configuration, "ScanHbaseJob");


        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("student"), scan, MyMapper.class, Text.class, NullWritable.class, job);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0)

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多