分享

Spark你一定学得会(二)No.8

 尹广学 2018-01-30

第一次分享的妥妥就是入门的干货,小伙伴们最好可以自己敲一敲代码,不然只看我的分享一点用都木有。但还是有很多小伙伴表示看不懂,没关系,慢慢来自己操作一遍有什么问题后台问我就可以了。

啥也不说了,就是干货,首先祭上今天的关键代码。今天主要跟大家讲讲Spark里面RDD的持久化机制。首先持久化机制有什么用呢?一个作用是保存到硬盘给其他小伙伴查看,另外一个作用是重用,我们都知道RDD是不可变的,所以当RDD有重用的时候,如果没有持久化,RDD都会乖乖地重新算。。

objectRDDPersist {

defmain(args: Array[String]): Unit = {

valconf: SparkConf =newSparkConf().setMaster('local[*]').setAppName('helloWorld')

valsc: SparkContext =newSparkContext(conf);

sc.setCheckpointDir('~/checkpoint');

varsomeList =List[String]('hello','world')

//我们先假设这里有一百万的数据

valdefaultPartition =5;

valwordsRDD: RDD[String] = sc.parallelize(someList,defaultPartition);

valsomeThingSenceless = wordsRDD.map(x => x ',').map(x => ((Math.floor(Math.random()*100)) ,x)).filter(x => x._1 <50)

someThingSenceless.count();

someThingSenceless.groupByKey().count();

wordsRDD.repartition(defaultPartition*2);

wordsRDD.coalesce(defaultPartition ,false)

wordsRDD.cache()

wordsRDD.persist(StorageLevel.MEMORY_ONLY);

/*

StorageLevel.DISK_ONLY

StorageLevel.DISK_ONLY_2

StorageLevel.MEMORY_ONLY

StorageLevel.MEMORY_ONLY_2

StorageLevel.MEMORY_ONLY_SER

StorageLevel.MEMORY_ONLY_SER_2

StorageLevel.MEMORY_AND_DISK

StorageLevel.MEMORY_AND_DISK_2

StorageLevel.MEMORY_AND_DISK_SER

StorageLevel.MEMORY_AND_DISK_SER_2

*/

wordsRDD.saveAsTextFile('~/')

wordsRDD.unpersist();

wordsRDD.checkpoint();

}

从前面的分享我们可以知道,Spark快的根本原因就是Spark的中间过程都存储在内存里面,而这一切的基础就是RDD的持久化,RDD默认是根据partition按分区持久化在内存里边的。啥意思呢,一份数据就好像一个钥匙串,而这钥匙串会分成好多根钥匙保存到集群机器上。

varsomeList =List[String]('hello','world')

//我们先假设这里有一百万的数据

valdefaultPartition =5;

valwordsRDD: RDD[String] = sc.parallelize(someList,defaultPartition);

就是初始化了一个有一百万数据的RDD,分布在5个分区里面。

五个分区好像有点少,想分配到更多的分区咋办?

wordsRDD.repartition(defaultPartition*2);

wordsRDD.coalesce(defaultPartition ,false)

coalesce是一个重新分区操作,第一个参数是目标的分区数,第二个参数是是否进行suffle,也就是混淆重新洗牌。如果不重新洗牌,那么就会按照分区数一对一分配到更多的分区上,比如一个分区拆分成两个分区这样,而不是把两个分区进行混淆重新洗牌然后平均分配到三个分区里边。怎么理解这个概念呢,看下面这个图,文字可以忽略,左边就是没有suffle的,右边是有的。


repartition是coalesce的简单实现,就是把第二个参数默认为true,就是repartition了。

valsomeThingSenceless = wordsRDD.map(x => x ',').map(x => ((Math.floor(Math.random()*100)) ,x)).filter(x => x._1 <50)

someThingSenceless.count();

someThingSenceless.groupByKey().count();

这里的操作都没什么意义,就是做一些复杂的操作而已。如果我们不对wordsRDD 进行持久化,那么一切的操作都会从源头开始,一步一步往下算,不会复用原始的数据。这明显不是我们想要的,如果我们不想这样,那要怎么办呢?

wordsRDD.cache()

wordsRDD.persist(StorageLevel.MEMORY_ONLY);

/*

StorageLevel.DISK_ONLY

StorageLevel.DISK_ONLY_2

StorageLevel.MEMORY_ONLY

StorageLevel.MEMORY_ONLY_2

StorageLevel.MEMORY_ONLY_SER

StorageLevel.MEMORY_ONLY_SER_2

StorageLevel.MEMORY_AND_DISK

StorageLevel.MEMORY_AND_DISK_2

StorageLevel.MEMORY_AND_DISK_SER

StorageLevel.MEMORY_AND_DISK_SER_2

*/

虽然说Spark的东西都尽量放到内存里边,但是对于非常非常巨大的数据集,Spark能拿它怎么办呢?总不能全丢进去然后丢弃一大批吧?所以就有了以下的策略选择了。

cache()方法很好理解,就是全都放在内存里面,也就是MEMORY_ONLY。persist方法的默认参数也是仅内存。MEMORY_AND_DISK的意思就是,内存能放得下就放内存,放不下就放硬盘。如何理解这几个参数的搭配呢?从三个要素来搭配就行了,存储位置、存储份数,是否序列化。三个的所有搭配都在上面的。

通常建议是使用StorageLevel.MEMORY_AND_DISK_SER。为什么使用这个呢?比较明显的原因,就是可以通过序列化压缩的方式,保证内存利用最大化。

wordsRDD.saveAsTextFile('~/')

细心的观众可能会发现,这又是什么玩意。这就是把目前的数据集直接保存到Driver硬盘上的一个API而已。

我们都知道RDD是有parent的,也就是RDD是一个有向无环图,那么如果调用链太长太长了,那么最后的一个操作引起机器失联,会导致整个计算从头开始,这样的开销非常非常大,特别是在深度非常深的迭代操作中,那怎么解决这个问题呢?

sc.setCheckpointDir('~/checkpoint');

wordsRDD.checkpoint();

答案就是使用checkpoint,这个是啥意思呢,就是把RDD保存到磁盘或者HDFS上,直接把当前RDD当成顶级RDD,也就没有所谓RDD的依赖链了,可以缓解这个多次迭代导致的问题。但是世界上哪有这么好的事,checkpoint的代价就是RDD保存在硬盘上,效率总是要降低的。

数据没用了咋办?删掉呗。无论是什么方式,都可以使用unpersist这个API进行删除所有的持久化。

wordsRDD.unpersist();

好了今天的分享就到这里,RDD的管理只是Spark的内存管理的一部分,优先理解这部分内容,对开发Spark应用优化有非常重要的作用,就酱。

如果分享的话,我会很开心的。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多