配色: 字号:
part2-spark键值对操作
2018-06-18 | 阅:  转:  |  分享 
  
spark键值对操作1创建PairRDD(键值型的RDD)构建键值对RDD的方法在不同的语言中会有所不同。Java用户可以通过ne
wTuple2(elem1,elem2)来创建一个新的二元组,并且可以通过._1()和._2()方法访问其中的元素。
除此以外还可利用mapToPair()方法来构建键值对。2聚合操作聚合操作的鼻祖combineByKey(createCom
biner(),mergeValue(),mergeCombiners())所有分区同时运行combineByKey是最为常用
的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的.首先必须明确从该方法的字面上就能够知道该方法是用来将相同的键所对应的
value进行合并的。当然怎样合并取决于该函数的内部逻辑(自定义的)。对于该函数来说上述的三个参数一个不能少。这三个参数的含义分别
如下:CreateCombiner组合器函数:就是将RDD中每个分区中的元素进行变型(组合),如转为Tuple等形式。Merge
Value合并值函数:顾名思义,一般用来将同一个分区中的相同键对应的值进行合并(预聚合)mergeCombiners组合合并值
函数:将rdd中不同分区相同键的值进行组合与值合并。注意:组合指的是改变分区中元素的形式,而值合并指的是将values值进行相加。
总之:combineByKey就是先将RDD中每个分区的元素进行处理,然后再将各自分区中的元素做值合并操作,最后在做分区之间的组合
与值合并操作。combineByKey处理值的流程分析:上图中coffee,panda表示的是键,而1,2,3,9表示的值对应的
value。上图中表示一个RDD的两个分区。CombinerByKey先针对每个分区进行数据处理,当遇到一个新的key时(由于在
一个分区中key可能发生重复,如上图中coffee就出现了两次,那么所谓新key指的就是第一次遇见coffee,而再遇见以coff
ee为键的键值对,其coffee就不算是新的key了)就会执行CreateCombiner的操作,即进行初始化(就是将该键值对中
的value,转为(value,1)),这个在求value平均值的案例中体现的是(value,1)。当在遇到该key所对应的键值
对时,则对该该键值对进行MergeValue操作,即将上一个键值对于这个第二次遇见的键值对中的值进行合并求和。每个分区进行同时处
理。最后运用mergeCombiners进行各个分区之间的值的合并。求value平均值案例注:acc:(Int,Int)这个参
数是上次经过(v)=>(v,1)处理后的键值对,而(acc:(Int,Int),v)中的v表示第二个传入的键值对中的value
。新的键值经历(v)=>(v,1)而旧的键值不经过该步骤。同样传入combinerByKey中的值是value,而key不会传入
其中。上述程序中acc1acc2分别表示分区1,分区2(acc:(Int,Int),v)中(Int,Int)表示的是经过初
始化后的键值第一个Int表示的是上一个的v第二个Int表示的是上一个的计数器V表示当前的value(第二
个传入的v)结果:3分组操作典型代表就是groupByKey()groupBycogroup():将多个RDD中对的键值对进行
分组4连接操作内联,左连接,有链接,笛卡尔积pairRDD的行动操作6数据分区(1)分区的目的:在分布式程序中,通信的代价是很大
的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序可以通过控制RDD分区方式来减少通信开销。但是也
不是所有的rdd都有必要进行分区,因为分区操作会涉及到跨节点数据混洗的操作。只有对那些数据量大的,重复多次使用的RDD才会使用分区
操作。这里的分区操作是针对键值形式的RDD来说的。分区目的案例(join)首先我们之前讨论过join这个算子究竟是宽依赖还是
窄依赖。事实上,在执行join操作之前若对两个rdd都进行了分区操作,那么此时的join就相当于窄依赖。而若没有进行分区则就是宽依
赖。举个例子:有一张很大的用户信息表userData(userid,userinfo)和一张小表events(useri
d,LinkInfo),该表用于存放过去5分钟的数据。我们需要讲这两张表进行jion操作,即实现(userid,(userinf
o,Linkinfo))。此时RDD1就是userData,RDD2就是events情景一:不对这两个RDD进行分区(图中
阴影部分代表的是分区)这里所说的分区是针对键值对的分区。上图中说明了不进行分区,的join过程。显然对于userData和ev
ent都会进行网络传输,且会发生数据混洗,由于userData表的数据量大,且会被多次使用,因此会导致整个程序执行效率低下。情景
二:只对userData进行哈西分区,不对event进行哈西分区这样的操作是有原因的,因为userData的数据量大因此可以进行分
区,而event数据量小因此没有必要进行分区。从上图的分区结果上看不仅免去了userData的数据混洗,而且让event中的元素直
接分到userData所在的节点上,这样还避免了userData过程的网络通信。需要注意,如果我们将上述的过程封装成一个方法,而
这个方法又经常被调用,那么每次调用就会被重新分区一次,事实上多次分区和不进行分区使其进行数据混洗的效果是一样的,因此要想让分区达到
真正的效果,我们还需要对分区后的数据进行持久化。即将分区的结果保存下来。如果不调用persist()的话,后续的RDD操作
会对partitioned的整个谱系重新求值,这会导致对pairs一遍又一遍地进行哈希分区操作分区的方式:范围分区器
newrangePartitioner:将指定的范围内的key,当做同一个key进行处理哈希分区器newHashParti
tioner对于join等它是有默认分区的,join的默认分区为hash分区器,且默认分区的个数为1,相当于没进行分区。可以通过p
artitionBy(newHashPartitioner(2))的方法进行指定。也可以通过rdd1.join(rdd2,
2)直接指定分区。对于sortByKey(false,2)他是有默认分区器的,默认分区器为范围分区器。如:ReduceByke
y的默认分区器:是哈希分区器注:,诸如map()这样的操作会导致新的RDD失去父RDD的分区信息,因为这样的操作理论上
可能会修改每条记录的键。Parititons与partitioner的区别rdd.partitions.size返回分区个数r
dd.partitioner.get返回分区方式(2)从分区中收益的操作cogroup()、groupWith()、join
()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、co
mbineByKey()以及lookup()其中reduceByKey是针对一个RDD来说的,若是没有分区,则就没有网络io的
消耗。数据混洗。而对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会导致其中至少一个RDD(使用已
知分区器的那个RDD)不发生数据混洗。如果两个RDD使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个RDD是通
过mapValues()从另一个RDD中创建出来的,这两个RDD就会拥有相同的键和分区方式),或者其中一个RDD还没
有被计算出来,那么跨节点的数据混洗就不会发生了(3)影响分区方式的操作(一)默认了分区器的算子下面的算子其底层都设置了默认的分区方
法:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、g
roupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、map
Values()(如果父RDD有分区方式的话)、flatMapValues()(如果父RDD有分区方式的话),以及fil
ter()(如果父RDD有分区方式的话)。其他所有的操作生成的结果都不会存在特定的分区方式。注:map没有默认的分区方法,这是
因为map操作很有可能改变键,因此没有默认的分区方法。上述的分区都只能用于键值型的RDD。(二)rdd的分区类型注:对于最终rdd
是采用何种方式进行分区的,这个并不完全取决于算子默认的分区器类型,而是取决于该rdd的父rdd的分区类型。Rdd的分区类型是指在没
有进行任何关于分区操作(刚刚创建)的时候,这个RDD跟那种算子进行运算,这个RDD就属于那种类型。如varrdd1=rdd.r
educeByKey()则rdd1就是hash分区器类型。然后varrdd2=rdd1.sortByKey()Rdd
2并不是范围分区器类型,而是与其父rdd1的类型相同是hash分区器类型。(4)PageRank算法由于PageRank算法
中涉及到迭代过程,循环调用了join算子。因此我们对该算法进行分区优化,使得优化后的pageRank算法相比于原始的pageRan
k算法避免了网络传输数据混洗的过程。因此PageRank算法是分区的收益者。PageRank算法:PageRank对网页排名的
算法,曾是Google发家致富的法宝。PageRank算法计算每一个网页的PageRank值,然后根据这个值的大小对网页的重要性进
行排序。具体介绍可以参见http://blog.csdn.net/ZCF1002797280/article/details/50
254069自定义分区函数我们说过在spark中有两种分区函数,一个是hash分区器,一个是范围分区器。事实上在有些业务逻辑中这两
种分区方法是不能够满足需求的。利如对于网页,一般具有相同域名的网页之间是可以相互跳转的。如http:/www.baidu.com/
page1和http:/www.baidu.com/page2。因此我们一般将这两个分到同一个区域中。但是如果我们令他们作为k
ey并运用hash分区器进行分区,根据hash分区器的处理方式:code=key.hashcode%numPartition
会因为http:/www.baidu.com/page1与http:/www.baidu.com/page2不同而不将他们放到同
一个节点上(分区上)。这样在进行页面跳转是往往伴随数据混洗。这个时候我们就需要自己定义一个分区方法来实现将上述的两个相同域名网页
放到同一个分区中。思路:我们仍然使用hashcode方法,模仿hashPartitioner的代码来自定义一个分区器,主要是对ke
y这一块进行设计,其余于hashPartitioner是一样的。HashPatitioner分区器代码其中nonNegativeMod方法2自定义分区方法案例自定义分区方法类要求:实现Partitioner类重写numPartitiongetPartition()equals()测试结果:RDD及其分区解析Spark中的两种分区:针对文件的block数进行的分区在suffle过程中针对键值类型数据进行的分区(哈西分区,范围分区)Spark中两种分区的区别:针对文件的block数进行的分区方式不存在“同一组的键出现在同一节点上”的说法,因为此分区方式并不是根据键值来分区的。而是根据block来分区的。而后则就有“同一组的键出现在同一节点上”的说法
献花(0)
+1
(本文系实习生101首藏)