分享

spark中的算子

 头号码甲 2022-11-08 发布于北京

Transformation类型算子:不会定义后立即执行的算子

Actions类型算子:立即执行

 

1.map算子

  把原来的数据用map的自定义形式来切换成新的RDD。

scala> rdd_f1.collect()
res32: Array[String] = Array(i am a sutdnet, i am a boy)

scala> var rdd_f2 = rdd_f1.map(x=>x.split)
split   splitAt

//把原来的两个字符串拆分为两个素组 每个单词是数组里面的元素 拆分符号为“ ”空格 scala> var rdd_f2 = rdd_f1.map(x=>x.split(" ")) rdd_f2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[48] at map at <console>:25 scala> rdd_f2.collect() res33: Array[Array[String]] = Array(Array(i, am, a, sutdnet), Array(i, am, a, boy))

 2.flatMap算子

  相对于map,flatMap会在切割后把数组拆开 但是只会拆一层(最外层)

scala> var rdd_f3 = rdd_f1.flatMap(x=>x.split(" "))

scala> rdd_f3.collect()
res34: Array[String] = Array(i, am, a, sutdnet, i, am, a, boy)

 3.mapPartitions算子

  相对于map功能,输入的元素是整个分区,也就是说传入函数的操作对象是每个分区的iterator集合。该操作不会导致Partitons数量的变化。

//创建一个1-10的数据集合
scala> var rdd_mp = sc.parallelize(1 to 10)

//取出大于3的数据 scala> val mapPartitonsRDD = rdd_mp.mapPartitions(iter => iter.filter(_>3))
//打印 scala> mapPartitonsRDD.collect() res35: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10)

 4.sortBy算子

  sortBy(f:(T) =>  K, ascending, numPartitions)

  f:(T) => K:左边是要被排序对象中的每一个元素 右边的返回值是元素中要进行排序的值。

  ascending:true升序排列,false降序

  numPartitions:排序后RDD分区数,默认排序后分区数和排序前相等。

scala> val rdd_sortBy = sc.parallelize(List(("zhangsna",20),("lisi",10),("wangwu",24)))

scala> rdd_sortBy.collect()
res43: Array[(String, Int)] = Array((zhangsna,20), (lisi,10), (wangwu,24))

scala> rdd_sortBy.sortBy(x=>x._2,true).collect
collect   collectAsMap   collectAsync

scala> rdd_sortBy.sortBy(x=>x._2,true).collect()
res44: Array[(String, Int)] = Array((lisi,10), (zhangsna,20), (wangwu,24))

 5.filter算子

  返回值为true的元素 组成新的RDD 相对于一个比较。

scala> rdd1.collect()
res45: Array[Int] = Array(1, 2, 3)

scala> val result=rdd1.filter(x=>x>1)

scala> result.collect()
res46: Array[Int] = Array(2, 3)

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多