Learning Spark Python API Functions: pyspark API (1) – 过往记忆

 dazheng 2015-11-05
 
# print Spark version
print("pyspark version:" + str(sc.version))

pyspark version:1.2.2

map

# map
# sc = spark context, parallelize creates an RDD from the passed object
x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x,x**2))

# collect copies RDD elements to a list on the driver
print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]

flatMap

# flatMap
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]
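
The difference from map is that flatMap expands each returned iterable into individual output elements, so one input element can produce zero or more outputs. A minimal illustrative sketch (the words RDD and the split example are additions, not from the original post):

# map keeps the nested structure; flatMap flattens it one level
words = sc.parallelize(["hello world", "foo bar"])
print(words.map(lambda line: line.split(" ")).collect())      # [['hello', 'world'], ['foo', 'bar']]
print(words.flatMap(lambda line: line.split(" ")).collect())  # ['hello', 'world', 'foo', 'bar']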

mapPartitions

# mapPartitions
x = sc.parallelize([1,2,3], 2)
def f(iterator): yield sum(iterator)
y = x.mapPartitions(f)
# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[1], [5]]
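
Note that the function passed to mapPartitions is called once per partition and receives an iterator over that partition's elements, which is useful for per-partition setup or aggregation. A small sketch under that assumption (the example RDD is an added illustration):

# the function runs once per partition, so the output has one element per partition here
x = sc.parallelize([1, 2, 3, 4, 5], 2)
def count_in_partition(iterator):
    yield sum(1 for _ in iterator)
print(x.mapPartitions(count_in_partition).collect())  # e.g. [2, 3], depending on how elements are split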

mapPartitionsWithIndex

# mapPartitionsWithIndex
x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator): yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)

# glom() flattens elements on the same partition
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]
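
The partition index makes it possible to treat one partition differently from the others, for example to skip a particular partition entirely. A hypothetical sketch, not from the original post:

# drop everything in partition 0 and keep the other partitions unchanged
x = sc.parallelize([1, 2, 3, 4], 2)
def drop_first_partition(index, iterator):
    return iter([]) if index == 0 else iterator
print(x.mapPartitionsWithIndex(drop_first_partition).collect())  # e.g. [3, 4]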

getNumPartitions

# getNumPartitions
x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

[[1], [2, 3]]
2

filter

# filter
x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1)  # filters out even elements
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]

distinct

# distinct
x = sc.parallelize(['A','A','B'])
y = x.distinct()
print(x.collect())
print(y.collect())

['A', 'A', 'B']
['A', 'B']
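
Since distinct deduplicates through a shuffle, the order of the result is not guaranteed. A small added sketch illustrating this:

# duplicates are removed, but the output order may differ from the input order
x = sc.parallelize([1, 1, 2, 3, 3, 3])
print(x.distinct().collect())  # [1, 2, 3] in some order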

sample

# sample
x = sc.parallelize(range(7))
# call 'sample' 5 times
ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 5, 6]
sample:1 y = [2, 6]
sample:2 y = [0, 4, 5, 6]
sample:3 y = [0, 2, 6]
sample:4 y = [0, 3, 4]
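
Because sample is pseudo-random, each of the five calls above returns a different subset. The method also accepts an optional seed argument, which makes the result repeatable; a short sketch assuming that parameter:

# a fixed seed gives the same sample on every run
x = sc.parallelize(range(7))
y = x.sample(withReplacement=False, fraction=0.5, seed=42)
print(y.collect())  # the same subset each time for a given seed (exact contents depend on the RNG)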

takeSample

# takeSample
x = sc.parallelize(range(7))
# call 'takeSample' 5 times
ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y))  # no collect on y

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 6]
sample:1 y = [6, 4, 2]
sample:2 y = [2, 0, 4]
sample:3 y = [5, 4, 1]
sample:4 y = [3, 1, 4]
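
Unlike sample, takeSample is an action: it returns a plain Python list of num elements rather than an RDD, so no collect() is needed. It also accepts an optional seed; a brief sketch assuming that parameter:

# takeSample returns a Python list, not an RDD; a fixed seed makes it repeatable
x = sc.parallelize(range(7))
print(x.takeSample(withReplacement=False, num=3, seed=42))  # e.g. [5, 2, 6]; fixed for a given seed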

union

# union
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['D', 'C', 'A']
['A', 'A', 'B', 'D', 'C', 'A']
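
Note that union simply concatenates the two RDDs and keeps duplicates; chaining distinct() gives a set-style union. An added sketch, not from the original post:

# union keeps duplicates; distinct() afterwards removes them
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
print(x.union(y).distinct().collect())  # ['A', 'B', 'C', 'D'] in some order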

intersection

# intersection
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['A', 'C', 'D']
['A']
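
intersection also deduplicates: an element appearing several times in both inputs shows up only once in the result, and since the operation involves a shuffle, ordering is not guaranteed. A hypothetical sketch illustrating this:

# duplicates on either side appear at most once in the result
x = sc.parallelize(['A','A','B','C'])
y = sc.parallelize(['A','A','C','D'])
print(x.intersection(y).collect())  # ['A', 'C'] in some order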

sortByKey

# sortByKey
x = sc.parallelize([('B',1),('A',2),('C',3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())

[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]
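
sortByKey sorts by key in ascending order by default; passing ascending=False reverses the order. A short added sketch:

# ascending=False sorts the keys in descending order
x = sc.parallelize([('B',1),('A',2),('C',3)])
print(x.sortByKey(ascending=False).collect())  # [('C', 3), ('B', 1), ('A', 2)]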
