分享

在Spark(1.6.0)中处理hbase( 1.0.0)中的数据

 开心豆豆2010 2018-07-12

最近开始学习Spark,因为原先分析收集的数据都是存储在hadoop和hbase中,经过这两天终于解决了问题。

这里不说集群搭建,直接说如何设置和代码上如何处理能从hbase中读取数据。


1. 在spark的环境启动环境中需要加入hbase lib目录下的所有jar包:

    首先,在spark_home/lib目录下创建hbase目录,然后拷贝hbase_home/lib下所有jar包;

    然后,修改spark_classpath,在spark_home/conf/spark-env.sh中添加上边拷贝过来的jar包,例如export SPARK_CLASSPATH=/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/hbase/*

    不添加的话就会出现unread block data的错误。


2. 运行spark任务的时候,如果出现这个异常:TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory,需要检查几个方面的配置:

    a. spark内存是否足够,可以增加spark worker内存,也可以降低任务使用的内存

         在spark-env.sh中修改配置SPARK_WORKER_MEMORY可以修改worker的内存

         或者在任务中设置spark.executor.memory修改任务使用的内存

    b. 执行任务的时候,spark会为系统预留一部分内存,默认是几百M吧,具体值没有记下来,不过从worker日志是可以看到的,可以在任务中设置spark.testing.reservedMemory参数进行修改


3. 从hbase中读取数据时异常:org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permiss  

    这个可以明显看出是没有访问权限问题,从hbase日志中也可以看到类似security.UserGroupInformation: No groups available for user XXXXX的日志

    因为我是在windows eclipse上直接提交的任务,我就直接在windows上创建了一个hadoop账户(和我hbase集群使用的账户一样),然后就解决问题。

    最后,如果任务中设计到map/reduce方面的代码,是需要把代码打成jar包提交到spark上的,否则会抛出class not found的异常。


最后,附上我这次加的几行测试代码:

  1. // 初始化JavaSparkContext
  2. SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("spark://OPENFIRE-DEV:7080")
  3. .set("spark.executor.memory", "64m").set("spark.testing.reservedMemory", "102400");
  4. JavaSparkContext sc = new JavaSparkContext(sparkConf);
  5. // 这里我是从sprig容器中获取的HbaseUtil类,自己写的工具类,方便获取Configuration而已
  6. HbaseUtil hbaseUtil = (HbaseUtil) factory.getBean("hbaseUtil");
  7. Configuration hbaseConf = hbaseUtil.getConfiguration();
  8. // 从Table: member_biz_log中读取Family: app_log中的数据
  9. Scan scan = new Scan();
  10. scan.addFamily(Bytes.toBytes("app_log"));
  11. hbaseConf.set(TableInputFormat.INPUT_TABLE, "member_biz_log");
  12. ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
  13. String ScanToString = Base64.encodeBytes(proto.toByteArray());
  14. hbaseConf.set(TableInputFormat.SCAN, ScanToString);
  15. JavaPairRDD<ImmutableBytesWritable, Result> myRDD = sc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
  16. ImmutableBytesWritable.class, Result.class);
  17. // 随便乱写的测试代码
  18. JavaPairRDD<String, Integer> result = myRDD
  19. .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
  20. private static final long serialVersionUID = -6214814960929421459L;
  21. @Override
  22. public Tuple2<String, Integer> call(
  23. Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
  24. byte[] o = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("ERROR"),
  25. Bytes.toBytes("logInfo"));
  26. if (o != null) {
  27. System.out.println(Bytes.toString(o));
  28. return new Tuple2<String, Integer>(Bytes.toString(o), 1);
  29. }
  30. return null;
  31. }
  32. });
  33. System.out.println(result.count());


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多