countByKey
2 | x = sc.parallelize([( 'B' , 1 ),( 'B' , 2 ),( 'A' , 3 ),( 'A' , 4 ),( 'A' , 5 )]) |
7 | [( 'B' , 1 ), ( 'B' , 2 ), ( 'A' , 3 ), ( 'A' , 4 ), ( 'A' , 5 )] |
8 | defaultdict(< type 'int' >, { 'A' : 3 , 'B' : 2 }) |
join
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' , 3 ),( 'A' , 2 ),( 'A' , 1 )]) |
03 | y = sc.parallelize([( 'A' , 8 ),( 'B' , 7 ),( 'A' , 6 ),( 'D' , 5 )]) |
09 | [( 'C' , 4 ), ( 'B' , 3 ), ( 'A' , 2 ), ( 'A' , 1 )] |
10 | [( 'A' , 8 ), ( 'B' , 7 ), ( 'A' , 6 ), ( 'D' , 5 )] |
11 | [( 'A' , ( 2 , 8 )), ( 'A' , ( 2 , 6 )), ( 'A' , ( 1 , 8 )), ( 'A' , ( 1 , 6 )), ( 'B' , ( 3 , 7 ))] |
leftOuterJoin
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' , 3 ),( 'A' , 2 ),( 'A' , 1 )]) |
03 | y = sc.parallelize([( 'A' , 8 ),( 'B' , 7 ),( 'A' , 6 ),( 'D' , 5 )]) |
09 | [( 'C' , 4 ), ( 'B' , 3 ), ( 'A' , 2 ), ( 'A' , 1 )] |
10 | [( 'A' , 8 ), ( 'B' , 7 ), ( 'A' , 6 ), ( 'D' , 5 )] |
11 | [( 'A' , ( 2 , 8 )), ( 'A' , ( 2 , 6 )), ( 'A' , ( 1 , 8 )), ( 'A' , ( 1 , 6 )), ( 'C' , ( 4 , None )), ( 'B' , ( 3 , 7 ))] |
rightOuterJoin
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' , 3 ),( 'A' , 2 ),( 'A' , 1 )]) |
03 | y = sc.parallelize([( 'A' , 8 ),( 'B' , 7 ),( 'A' , 6 ),( 'D' , 5 )]) |
04 | z = x.rightOuterJoin(y) |
09 | [( 'C' , 4 ), ( 'B' , 3 ), ( 'A' , 2 ), ( 'A' , 1 )] |
10 | [( 'A' , 8 ), ( 'B' , 7 ), ( 'A' , 6 ), ( 'D' , 5 )] |
11 | [( 'A' , ( 2 , 8 )), ( 'A' , ( 2 , 6 )), ( 'A' , ( 1 , 8 )), ( 'A' , ( 1 , 6 )), ( 'B' , ( 3 , 7 )), ( 'D' , ( None , 5 ))] |
partitionBy
2 | x = sc.parallelize([( 0 , 1 ),( 1 , 2 ),( 2 , 3 )], 2 ) |
3 | y = x.partitionBy(numPartitions = 3 , partitionFunc = lambda x: x) # only the key is passed to partitionFunc |
4 | print (x.glom().collect()) |
5 | print (y.glom().collect()) |
7 | [[( 0 , 1 )], [( 1 , 2 ), ( 2 , 3 )]] |
8 | [[( 0 , 1 )], [( 1 , 2 )], [( 2 , 3 )]] |
combineByKey
02 | x = sc.parallelize([( 'B' , 1 ),( 'B' , 2 ),( 'A' , 3 ),( 'A' , 4 ),( 'A' , 5 )]) |
03 | createCombiner = ( lambda el: [(el,el ** 2 )]) |
04 | mergeVal = ( lambda aggregated, el: aggregated + [(el,el ** 2 )]) # append to aggregated |
05 | mergeComb = ( lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2 |
06 | y = x.combineByKey(createCombiner,mergeVal,mergeComb) |
10 | [( 'B' , 1 ), ( 'B' , 2 ), ( 'A' , 3 ), ( 'A' , 4 ), ( 'A' , 5 )] |
11 | [( 'A' , [( 3 , 9 ), ( 4 , 16 ), ( 5 , 25 )]), ( 'B' , [( 1 , 1 ), ( 2 , 4 )])] |
aggregateByKey
02 | x = sc.parallelize([( 'B' , 1 ),( 'B' , 2 ),( 'A' , 3 ),( 'A' , 4 ),( 'A' , 5 )]) |
03 | zeroValue = [] # empty list is 'zero value' for append operation |
04 | mergeVal = ( lambda aggregated, el: aggregated + [(el,el ** 2 )]) |
05 | mergeComb = ( lambda agg1,agg2: agg1 + agg2 ) |
06 | y = x.aggregateByKey(zeroValue,mergeVal,mergeComb) |
10 | [( 'B' , 1 ), ( 'B' , 2 ), ( 'A' , 3 ), ( 'A' , 4 ), ( 'A' , 5 )] |
11 | [( 'A' , [( 3 , 9 ), ( 4 , 16 ), ( 5 , 25 )]), ( 'B' , [( 1 , 1 ), ( 2 , 4 )])] |
foldByKey
2 | x = sc.parallelize([( 'B' , 1 ),( 'B' , 2 ),( 'A' , 3 ),( 'A' , 4 ),( 'A' , 5 )]) |
3 | zeroValue = 1 # one is 'zero value' for multiplication |
4 | y = x.foldByKey(zeroValue, lambda agg,x: agg * x ) # computes the product of all values within each key |
8 | [( 'B' , 1 ), ( 'B' , 2 ), ( 'A' , 3 ), ( 'A' , 4 ), ( 'A' , 5 )] |
groupByKey
2 | x = sc.parallelize([( 'B' , 5 ),( 'B' , 4 ),( 'A' , 3 ),( 'A' , 2 ),( 'A' , 1 )]) |
5 | print ([(j[ 0 ],[i for i in j[ 1 ]]) for j in y.collect()]) |
7 | [( 'B' , 5 ), ( 'B' , 4 ), ( 'A' , 3 ), ( 'A' , 2 ), ( 'A' , 1 )] |
8 | [( 'A' , [ 3 , 2 , 1 ]), ( 'B' , [ 5 , 4 ])] |
flatMapValues
2 | x = sc.parallelize([( 'A' ,( 1 , 2 , 3 )),( 'B' ,( 4 , 5 ))]) |
3 | y = x.flatMapValues( lambda x: [i ** 2 for i in x]) # function is applied to entire value, then result is flattened |
7 | [( 'A' , ( 1 , 2 , 3 )), ( 'B' , ( 4 , 5 ))] |
8 | [( 'A' , 1 ), ( 'A' , 4 ), ( 'A' , 9 ), ( 'B' , 16 ), ( 'B' , 25 )] |
mapValues
2 | x = sc.parallelize([( 'A' ,( 1 , 2 , 3 )),( 'B' ,( 4 , 5 ))]) |
3 | y = x.mapValues( lambda x: [i ** 2 for i in x]) # function is applied to entire value |
7 | [( 'A' , ( 1 , 2 , 3 )), ( 'B' , ( 4 , 5 ))] |
8 | [( 'A' , [ 1 , 4 , 9 ]), ( 'B' , [ 16 , 25 ])] |
groupWith
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' ,( 3 , 3 )),( 'A' , 2 ),( 'A' ,( 1 , 1 ))]) |
03 | y = sc.parallelize([( 'B' ,( 7 , 7 )),( 'A' , 6 ),( 'D' ,( 5 , 5 ))]) |
04 | z = sc.parallelize([( 'D' , 9 ),( 'B' ,( 8 , 8 ))]) |
10 | for key,val in list (a.collect()): |
11 | print (key, [ list (i) for i in val]) |
13 | [( 'C' , 4 ), ( 'B' , ( 3 , 3 )), ( 'A' , 2 ), ( 'A' , ( 1 , 1 ))] |
14 | [( 'B' , ( 7 , 7 )), ( 'A' , 6 ), ( 'D' , ( 5 , 5 ))] |
15 | [( 'D' , 9 ), ( 'B' , ( 8 , 8 ))] |
19 | B [[( 3 , 3 )], [( 7 , 7 )], [( 8 , 8 )]] |
20 | A [[ 2 , ( 1 , 1 )], [ 6 ], []] |
cogroup
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' ,( 3 , 3 )),( 'A' , 2 ),( 'A' ,( 1 , 1 ))]) |
03 | y = sc.parallelize([( 'A' , 8 ),( 'B' , 7 ),( 'A' , 6 ),( 'D' ,( 5 , 5 ))]) |
07 | for key,val in list (z.collect()): |
08 | print (key, [ list (i) for i in val]) |
10 | [( 'C' , 4 ), ( 'B' , ( 3 , 3 )), ( 'A' , 2 ), ( 'A' , ( 1 , 1 ))] |
11 | [( 'A' , 8 ), ( 'B' , 7 ), ( 'A' , 6 ), ( 'D' , ( 5 , 5 ))] |
12 | A [[ 2 , ( 1 , 1 )], [ 8 , 6 ]] |
sampleByKey
2 | x = sc.parallelize([( 'A' , 1 ),( 'B' , 2 ),( 'C' , 3 ),( 'B' , 4 ),( 'A' , 5 )]) |
3 | y = x.sampleByKey(withReplacement = False , fractions = { 'A' : 0.5 , 'B' : 1 , 'C' : 0.2 }) |
7 | [( 'A' , 1 ), ( 'B' , 2 ), ( 'C' , 3 ), ( 'B' , 4 ), ( 'A' , 5 )] |
8 | [( 'B' , 2 ), ( 'C' , 3 ), ( 'B' , 4 )] |
subtractByKey
02 | x = sc.parallelize([( 'C' , 1 ),( 'B' , 2 ),( 'A' , 3 ),( 'A' , 4 )]) |
03 | y = sc.parallelize([( 'A' , 5 ),( 'D' , 6 ),( 'A' , 7 ),( 'D' , 8 )]) |
09 | [( 'C' , 1 ), ( 'B' , 2 ), ( 'A' , 3 ), ( 'A' , 4 )] |
10 | [( 'A' , 5 ), ( 'D' , 6 ), ( 'A' , 7 ), ( 'D' , 8 )] |
subtract
02 | x = sc.parallelize([( 'C' , 4 ),( 'B' , 3 ),( 'A' , 2 ),( 'A' , 1 )]) |
03 | y = sc.parallelize([( 'C' , 8 ),( 'A' , 2 ),( 'D' , 1 )]) |
09 | [( 'C' , 4 ), ( 'B' , 3 ), ( 'A' , 2 ), ( 'A' , 1 )] |
10 | [( 'C' , 8 ), ( 'A' , 2 ), ( 'D' , 1 )] |
11 | [( 'A' , 1 ), ( 'C' , 4 ), ( 'B' , 3 )] |
keyBy
2 | x = sc.parallelize([ 1 , 2 , 3 ]) |
3 | y = x.keyBy( lambda x: x ** 2 ) |
8 | [( 1 , 1 ), ( 4 , 2 ), ( 9 , 3 )] |
repartition
2 | x = sc.parallelize([ 1 , 2 , 3 , 4 , 5 ], 2 ) |
3 | y = x.repartition(numPartitions = 3 ) |
4 | print (x.glom().collect()) |
5 | print (y.glom().collect()) |
coalesce
2 | x = sc.parallelize([ 1 , 2 , 3 , 4 , 5 ], 2 ) |
3 | y = x.coalesce(numPartitions = 1 ) |
4 | print (x.glom().collect()) |
5 | print (y.glom().collect()) |
zip
02 | x = sc.parallelize([ 'B' , 'A' , 'A' ]) |
03 | # zip expects x and y to have same #partitions and #elements/partition |
04 | y = x. map ( lambda x: ord (x)) |
12 | [( 'B' , 66 ), ( 'A' , 65 ), ( 'A' , 65 )] |
zipWithIndex
2 | x = sc.parallelize([ 'B' , 'A' , 'A' ], 2 ) |
4 | print (x.glom().collect()) |
8 | [( 'B' , 0 ), ( 'A' , 1 ), ( 'A' , 2 )] |
zipWithUniqueId
2 | x = sc.parallelize([ 'B' , 'A' , 'A' ], 2 ) |
4 | print (x.glom().collect()) |
8 | [( 'B' , 0 ), ( 'A' , 1 ), ( 'A' , 3 )] |
PDF版下载
点击进入下载
|