分享

大数据技术,Spark之RDD,这些就够了,RDD超详细讲解(一)

 喵感数据 2020-05-23

一、RDD为什么出现?

在实际开发应用中,存在许多迭代式计算,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。

以前常用的MapReduce框架是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销

如果有一种方法,能将结果保存在内存当中,就可以大量减少IO消耗。RDD一种弹性分布数据集,就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理。

不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销。

二、RDD是什么?

一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段(HDFS上的块),并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。

RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD。

RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型的算子,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。

两类操作的主要区别是,转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。

三、RDD的执行过程

RDD开发执行

1、RDD读入外部数据源(或者内存中的数据集)进行创建;注意:RDD读取数据时,一般默认2个分区。

2、RDD经过一系列的“转换”操作,每一次都会产生不同的RDD,供给下一个“转换”使用;

3、最后一个RDD经“行动”操作进行处理,并输出到外部数据源,或者变成Scala/JAVA集合或变量。

值得注意的是,RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

RDD血缘依赖转换流程

RDD血缘关系

从数据输入,到逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F,也是一个RDD。之所以说是逻辑上,是因为这时候计算并没有发生,只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“行动”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。

这一处理过程:称为一个“血缘关系(Lineage)”,即DAG拓扑排序的结果。

Spark采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据。

因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。

同时,这种通过血缘关系就是把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。

下面我们以具体代码来讲解RDD执行过程:

object SparkTest {
  def main(args: Array[String]): Unit = {
  val conf = new SparkConf.setAppName("SparkTest").setMaster("local[*]")
  val sparkContext = new SparkContext(conf)
  val line :RDD = sparkContext.textFile("filepath")
  val filt = line.filter(_.contains("spark"))
  val data = filt.cache().count()
  println(count)
  }
}

从上可以看出,一个Spark应用程序,基本是基于RDD的一系列计算操作。

第1行代码用于创建SparkContext对象,执行上下文环境;

第2行代码从文件中读取数据创建一个RDD;

第3行代码对读取的数据,返回的RDD进行转换操作得到一个新的RDD,即filterRDD;

filt.cache()表示对lines进行持久化,把它保存在内存或磁盘中。这里采用cache接口把数据集保存在内存中,方便后续重复使用。

注意:当数据被反复访问时,比如查询一些热点数据、或者运行迭代算法时,把数据缓存到内存中这是非常有用的。而且通过cache()可以缓存非常大的数据集,支持跨越几十甚至上百个节点;filt.count()是一个行动操作,用于计算一个RDD集合中包含的元素个数。

这个程序的执行过程如下:

  1. 创建这个Spark程序的执行上下文,即创建SparkContext对象;

  2. 从外部数据源中读取数据创建fileRDD对象;

  3. 构建起fileRDD和filterRDD之间的依赖关系,形成DAG图,这时候并没有发生真正的计算,只是记录转换的轨迹,也就是血缘依赖关系;

  4. 执行action代码时,count()是一个行动类型的RDD,触发真正的计算。开始执行从fileRDD到filterRDD的转换操作,并把结果持久化到内存中,最后计算出filterRDD中包含的元素个数。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多