
Spark read/write HBase performance comparison


Mr_哲 · 2018-08-29 19:29:01

Part 1: Writing to HBase from Spark

    The HBase client wraps data in Put objects and supports inserting them either one at a time or in batches. Spark itself offers two built-in ways to write to HBase: saveAsHadoopDataset and saveAsNewAPIHadoopDataset. To compare their performance, the same data set is written with each of these approaches.
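Before the Spark-specific approaches, it helps to see what single versus batch inserts look like with the plain HBase 1.x client API. The sketch below is illustrative only: the table name "demo_table", the column qualifier "col", and the row keys are placeholders rather than the keyword data used later, and it relies on the same hbase-client dependency listed next.

import java.util
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object PutSketch {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("demo_table")) // placeholder table name

    // Row-by-row: one client call per Put
    val p = new Put(Bytes.toBytes("row1"))
    p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes("v1"))
    table.put(p)

    // Batch: collect Puts and send them in a single call
    val batch = new util.ArrayList[Put]()
    for (i <- 2 to 10) {
      val q = new Put(Bytes.toBytes(s"row$i"))
      q.addColumn(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes(s"v$i"))
      batch.add(q)
    }
    table.put(batch) // Table.put(List[Put]) sends the whole batch at once

    table.close()
    conn.close()
  }
}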

The dependencies are as follows:

<!-- https:///artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

<!-- https:///artifact/org.apache.hbase/hbase-client -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.6</version>
</dependency>

<!-- https:///artifact/org.apache.hbase/hbase-common -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.4.6</version>
</dependency>

<!-- https:///artifact/org.apache.hbase/hbase-server -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.6</version>
</dependency>

<!-- https:///artifact/org.apache.hbase/hbase-protocol -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-protocol</artifactId>
    <version>1.4.6</version>
</dependency>

<!-- https:///artifact/commons-cli/commons-cli -->
<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.4</version>
</dependency>

1. Row-by-row insert with Put

1.1 Create the table in the HBase shell

create 'keyword1',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}
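A note on why the table is pre-split into 10 regions with HexStringSplit: the write code below derives row keys from the first 8 hex characters of an MD5 digest, so keys are spread uniformly over the hex key space and land evenly across the pre-split regions. A tiny illustrative check (the sample strings are made up, not from the data set):

import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}

object RowKeySketch {
  def main(args: Array[String]): Unit = {
    // The first 8 hex chars of an MD5 digest are uniformly distributed over
    // 00000000..ffffffff, which is exactly how HexStringSplit draws region boundaries.
    Seq("foo123", "bar456", "baz789").foreach { s =>
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(s)).substring(0, 8)
      println(s"$s -> $rowKey")
    }
  }
}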

1.2 Code

val start_time1 = new Date().getTime
keyword.foreachPartition(records => {
  HBaseUtils1x.init()
  records.foreach(f => {
    val keyword = f.getString(0)
    val app_id = f.getString(1)
    val catalog_name = f.getString(2)
    val keyword_catalog_pv = f.getString(3)
    val keyword_catalog_pv_rate = f.getString(4)
    val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
    val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
    HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
  })
  HBaseUtils1x.closeConnection()
})
val end_time1 = new Date().getTime
println("Elapsed time for row-by-row HBase insert: " + (end_time1 - start_time1))

2. Batch insert with Put

2.1 Create the table

create 'keyword2',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}

2.2 Code

val start_time2 = new Date().getTime
keyword.foreachPartition(records => {
  HBaseUtils1x.init()
  val puts = ArrayBuffer[Put]()
  records.foreach(f => {
    val keyword = f.getString(0)
    val app_id = f.getString(1)
    val catalog_name = f.getString(2)
    val keyword_catalog_pv = f.getString(3)
    val keyword_catalog_pv_rate = f.getString(4)
    val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
    val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
    try {
      puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    } catch {
      case e: Throwable => println(f)
    }
  })
  import collection.JavaConverters._
  HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
  HBaseUtils1x.closeConnection()
})
val end_time2 = new Date().getTime
println("Elapsed time for batch HBase insert: " + (end_time2 - start_time2))

3. Writing with saveAsHadoopDataset

    Outputs the RDD to any Hadoop-supported storage system using the old Hadoop API, passing a Hadoop JobConf object for that storage system. The JobConf sets an OutputFormat and any required output target, just as when configuring a Hadoop MapReduce job.
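In this post the JobConf is produced by HBaseUtils1x.getJobConf (full source in Part 3); it boils down to roughly the following, with the ZooKeeper quorum value ("lee") taken from the rest of the post:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

object OldApiConfSketch {
  // Minimal old-API JobConf for writing (ImmutableBytesWritable, Put) pairs to an HBase table.
  def hbaseJobConf(tableName: String): JobConf = {
    val jobConf = new JobConf(HBaseConfiguration.create())
    jobConf.set("hbase.zookeeper.quorum", "lee")               // ZK quorum used in this post
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)     // table to write to
    jobConf.setOutputFormat(classOf[TableOutputFormat])        // old mapred TableOutputFormat
    jobConf
  }
}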

3.1 Create the table

create 'keyword3',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}

3.2 Code

val start_time3 = new Date().getTime
keyword.rdd.map(f => {
  val keyword = f.getString(0)
  val app_id = f.getString(1)
  val catalog_name = f.getString(2)
  val keyword_catalog_pv = f.getString(3)
  val keyword_catalog_pv_rate = f.getString(4)
  val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
  val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
  (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
}).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
val end_time3 = new Date().getTime
println("Elapsed time for saveAsHadoopDataset write: " + (end_time3 - start_time3))

4. Writing with saveAsNewAPIHadoopDataset

    Outputs the RDD to any Hadoop-supported storage system using the new Hadoop API, passing a Hadoop Configuration object for that storage system. The Configuration sets an OutputFormat and any required output target, just as when configuring a Hadoop MapReduce job.
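Here the configuration comes from HBaseUtils1x.getNewJobConf (Part 3). One common way to build an equivalent configuration is through Job.getInstance, as the appendix example also does; a minimal sketch, assuming the same "lee" quorum:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Mutation
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

object NewApiConfSketch {
  // Minimal new-API configuration for writing (ImmutableBytesWritable, Put) pairs to HBase.
  def hbaseWriteConf(tableName: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")              // ZK quorum used in this post
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)    // table to write to
    val job = Job.getInstance(conf)                        // lets the Job register the output format
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Mutation])
    job.getConfiguration
  }
}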

4.1 Create the table

create 'keyword4',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}

4.2 Code

val start_time4 = new Date().getTime
keyword.rdd.map(f => {
  val keyword = f.getString(0)
  val app_id = f.getString(1)
  val catalog_name = f.getString(2)
  val keyword_catalog_pv = f.getString(3)
  val keyword_catalog_pv_rate = f.getString(4)
  val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
  val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
  (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
}).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
val end_time4 = new Date().getTime
println("Elapsed time for saveAsNewAPIHadoopDataset write: " + (end_time4 - start_time4))

5. Performance comparison


Method                     | Time (ms)
---------------------------+----------
Put row-by-row insert      | 40898
Put batch insert           | 2208
saveAsHadoopDataset        | 1695
saveAsNewAPIHadoopDataset  | 1690

As the numbers show, saveAsHadoopDataset and saveAsNewAPIHadoopDataset clearly outperform both the row-by-row and the batch Put inserts.

Part 2: Reading from HBase with Spark

The newAPIHadoopRDD method converts an HBase table into an RDD; it is used as follows:

val start_time1 = new Date().getTime
val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
  classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println(hbaseRdd.count())
hbaseRdd.foreach {
  case (_, result) =>
    // row key
    val rowKey = Bytes.toString(result.getRow)
    val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
    // the values were written as strings above, so decode with Bytes.toString before parsing
    val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
    println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
}
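For completeness, the read configuration returned by HBaseUtils1x.getNewConf (Part 3) is essentially the snippet below; the Scan can be narrowed (specific family or columns, caching) before it is serialized into the configuration. A sketch under the same "lee" quorum assumption:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}

object ReadConfSketch {
  // Minimal TableInputFormat configuration for newAPIHadoopRDD.
  def hbaseReadConf(tableName: String): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")          // ZK quorum used in this post
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)  // table to read
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("info"))              // optionally restrict what is read
    scan.setCaching(500)                               // rows fetched per RPC
    // TableInputFormat expects the Scan serialized as a base64 string.
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    conf
  }
}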

Part 3: Complete code

SparkRWHBase.scala:

package com.sparkStudy.utils

import java.util.Date

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
  * @Author: JZ.lee
  * @Description: TODO
  * @Date: 18-8-28 4:28 PM
  * @Modified By:
  */
object SparkRWHBase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkRWHBase")
      .master("local[2]")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val keyword = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", false)
      .option("delimiter", ",")
      .load("file:/opt/data/keyword_catalog_day.csv")

    val tableName1 = "keyword1"
    val tableName2 = "keyword2"
    val tableName3 = "keyword3"
    val tableName4 = "keyword4"
    val cf = "info"
    val columns = Array("keyword", "app_id", "catalog_name", "keyword_catalog_pv", "keyword_catalog_pv_rate")

    // 1. Row-by-row insert with Put
    val start_time1 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
      })
      HBaseUtils1x.closeConnection()
    })
    val end_time1 = new Date().getTime
    println("Elapsed time for row-by-row HBase insert: " + (end_time1 - start_time1))

    // 2. Batch insert with Put
    val start_time2 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      val puts = ArrayBuffer[Put]()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        try {
          puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        } catch {
          case e: Throwable => println(f)
        }
      })
      import collection.JavaConverters._
      HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
      HBaseUtils1x.closeConnection()
    })
    val end_time2 = new Date().getTime
    println("Elapsed time for batch HBase insert: " + (end_time2 - start_time2))

    // 3. saveAsHadoopDataset (old Hadoop API)
    val start_time3 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    val end_time3 = new Date().getTime
    println("Elapsed time for saveAsHadoopDataset write: " + (end_time3 - start_time3))

    // 4. saveAsNewAPIHadoopDataset (new Hadoop API)
    val start_time4 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
    val end_time4 = new Date().getTime
    println("Elapsed time for saveAsNewAPIHadoopDataset write: " + (end_time4 - start_time4))

    // Read the table back as an RDD
    val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
      classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println(hbaseRdd.count())
    hbaseRdd.foreach {
      case (_, result) =>
        // row key
        val rowKey = Bytes.toString(result.getRow)
        val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
        // the values were written as strings above, so decode with Bytes.toString before parsing
        val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
        println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
    }
  }
}


HBaseUtils1x.scala:

package com.sparkStudy.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.slf4j.LoggerFactory

/**
  * @Author: JZ.Lee
  * @Description: HBase 1.x CRUD helpers
  * @Date: Created at 11:02 AM 18-8-14
  * @Modified By:
  */
object HBaseUtils1x {
  private val LOGGER = LoggerFactory.getLogger(this.getClass)
  private var connection: Connection = null
  private var conf: Configuration = null

  def init() = {
    conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")
    connection = ConnectionFactory.createConnection(conf)
  }

  // JobConf for the old-API saveAsHadoopDataset
  def getJobConf(tableName: String) = {
    val conf = HBaseConfiguration.create()
    val jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum", "lee")
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
    jobConf
  }

  // Configuration for reading a table through TableInputFormat / newAPIHadoopRDD
  def getNewConf(tableName: String) = {
    conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
    val scan = new Scan()
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    conf
  }

  // JobConf for the new-API saveAsNewAPIHadoopDataset
  // (sc is accepted to match the call site in SparkRWHBase; it is not used here)
  def getNewJobConf(tableName: String, sc: SparkContext) = {
    val conf = HBaseConfiguration.create()
    // Constants.ZOOKEEPER_SERVER_NODE is a project-specific constant (the "lee" quorum used elsewhere in this post)
    conf.set("hbase.zookeeper.quorum", Constants.ZOOKEEPER_SERVER_NODE)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.defaults.for.version.skip", "true")
    conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.setClass("mapreduce.job.outputformat.class",
      classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]],
      classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
    new JobConf(conf)
  }

  def closeConnection(): Unit = {
    connection.close()
  }

  def getGetAction(rowKey: String): Get = {
    val getAction = new Get(Bytes.toBytes(rowKey))
    getAction.setCacheBlocks(false)
    getAction
  }

  def getPutAction(rowKey: String, familyName: String, column: Array[String], value: Array[String]): Put = {
    val put: Put = new Put(Bytes.toBytes(rowKey))
    for (i <- column.indices) {
      put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column(i)), Bytes.toBytes(value(i)))
    }
    put
  }

  def insertData(tableName: String, put: Put) = {
    val name = TableName.valueOf(tableName)
    val table = connection.getTable(name)
    table.put(put)
  }

  def addDataBatchEx(tableName: String, puts: java.util.List[Put]): Unit = {
    val name = TableName.valueOf(tableName)
    val table = connection.getTable(name)
    // Report rows that fail to flush instead of aborting the whole batch
    val listener = new ExceptionListener {
      override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = {
        for (i <- 0 until e.getNumExceptions) {
          LOGGER.info("Failed to write Put: " + e.getRow(i))
        }
      }
    }
    val params = new BufferedMutatorParams(name)
      .listener(listener)
      .writeBufferSize(4 * 1024 * 1024)
    try {
      val mutator = connection.getBufferedMutator(params)
      mutator.mutate(puts)
      mutator.close()
    } catch {
      case e: Throwable => e.printStackTrace()
    }
  }
}


-------------------------------------------------------

Appendix: Spark operators [20]: saveAsHadoopDataset and saveAsNewAPIHadoopDataset, explained with examples
(by 生命不息丶折腾不止, 2017-12-20; original article: https://blog.csdn.net/leen0304/article/details/78855576)

    Overview

    saveAsHadoopDataset:
    Writes the RDD to any Hadoop-supported storage system using the old Hadoop API, passing a Hadoop JobConf object for that storage system. The JobConf specifies an OutputFormat and any required output target (such as the table to write to), just as when configuring a Hadoop MapReduce job.

        saveAsHadoopDataset(conf: JobConf): Unit

    saveAsNewAPIHadoopDataset:
    Writes the RDD to any Hadoop-supported storage system using the new Hadoop API, passing a Hadoop Configuration object for that storage system. The Configuration specifies an OutputFormat and any required output target (such as the table to write to), just as when configuring a Hadoop MapReduce job.

        saveAsNewAPIHadoopDataset(conf: Configuration): Unit

    The two examples below write an RDD to HBase, first with saveAsNewAPIHadoopDataset and then with saveAsHadoopDataset.


    Example: writing an RDD to HBase with saveAsNewAPIHadoopDataset

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * User: leen
  * Date: 2017/12/20
  * Time: 17:34
  */
object HbaseTest2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tablename = "account"

    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))

    val rdd = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())

    sc.stop()
  }
}


    Example: writing an RDD to HBase with saveAsHadoopDataset

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
  * User: leen
  * Date: 2017/12/20
  * Time: 16:51
  */
object HbaseTest1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest1").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    // Set the ZooKeeper quorum. This could also be done by putting hbase-site.xml
    // on the classpath, but setting it in code is recommended here.
    conf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
    // ZooKeeper client port, 2181 by default
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val tablename = "account"

    // Initialize the JobConf; TableOutputFormat must be the one from the
    // org.apache.hadoop.hbase.mapred package (old API).
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))

    val rdd = indataRDD.map(_.split(',')).map { arr =>
      // One Put object is one row; the row key is given to the constructor.
      // Every value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
      // Put.addColumn takes three arguments: column family, column qualifier, value.
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // Map to RDD[(ImmutableBytesWritable, Put)] so saveAsHadoopDataset can be called.
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}

