当spark程序中,存在过多的小任务的时候,可以通过 RDD.coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本,避免Shuffle导致,比RDD.repartition效率提高不少。
rdd.coalesce方法的作用是创建CoalescedRDD,源码如下:
def coalesce(numPartitions: Int, shuffle: Boolean = f...
spark-streaming实时接收数据并处理。一个非常广泛的需求是spark-streaming实时接收的数据需要跟保存在HDFS上的大量数据进行Join。要实现这个需求保证实时性需要解决以下几个问题:
1.spark-streaming的数据接收间隔往往很小,比如只有几秒钟。HDFS上的数据如果很大的话,不能每个接收batch都从HDFS读取数据,避免频繁大量磁盘I/O
2.HDFS大量...
本篇文章主要将Aggregate操作的时候的数据存储和实现过程...
Spark SQL物理计划要到Spark-core执行,需要将Spark SQL物理计划转化成RDD,并且建立RDD之间的依赖关系。这个过程可以通过如下图大概表示:
上图中绿色部分指Spark物理计划到RDD过程中数据结构的变迁过程。黄色部分表示变迁过程中,实现加工作用的数据结构。...
这几天遇到了需要从hdfs加载json字符串,然后转化成json对象的场景。刚开始的实现方式见如下代码:
val loginLogRDD = sc.objectFile[String](loginFile, loadLoginFilePartitionNum)
.filter(jsonString => {
//val loginItem = line.toString...
本文以KafkaDirectDStream方式为例说明Spark-Streaming checkpoint的原理
JobGenrerator.generateJobs负责Streaming Job的产生,产生并且提交执行Job之后,会发送DoCheckpoint事件,源码如下:
private def generateJobs(time: Time) {
// Set the...
在spark的首页ui上经常显示任务和Stage被skipped,如以下截图所式:
本文将阐述什么情况下Stage或者Task会被置为skipped,以及stage和task被值skipped之后是否spark application执行会出问题?
当一个Spark Job的ResultStage的最后一个Task成功执行之后,DAGScheduler.handleTaskCo...
Spark经常需要从hdfs读取文件生成RDD,然后进行计算分析。这种从hdfs读取文件生成的RDD就是HadoopRDD。那么HadoopRDD的分区是怎么计算出来的?如果从hdfs读取的文件非常大,如何高效的从hdfs加载文件生成HadoopRDD呢?本篇文章探讨这两个问题。
SparkContext.objectFile方法经常用于从hdfs加载文件,从加载hdfs文件到生成Had...
RDD在调用引起Shuffle的方法的时候,如果没有显示指定ShuffledRDD的分区,那么会调用Partitioner.defaultPartitioner方法来确定ShuffledRDD的分区,比如RDD.combineByKey:
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombi...
Spark-Streaming Job的生成和执行可以通过如下图表示:
Spark-Streaming Job的生产和和执行由以下3个部分相互作用生成:
Driver程序:用户通过编写Driver程序描述了DStream的依赖关系,Driver程序根据DStream描述的依赖关系描述了RDD的依赖关系,也就是描述了Streaming Job的逻辑执行图
Spark-Streaming...
DAGScheduler.submitStage建立Spark应用的物理执行图,DAGScheduler.submitStage通过调用DAGSchdeuler.getMissingParentStages找到一个Stage的祖宗Stage并把祖宗Stage加入到物理执行图中。在这里如果发现依赖的RDD的全部分区已经存储到了BlockManager,也就是已经成功Cache,那么这个RDD以及它的...
如下时序图表示了RDD.persist方法执行之后,Spark是如何cache分区数据的。时序图可放大显示
本篇文章中,RDD.persist(StorageLevel)参数StorageLevel为:MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
也就是cache数据的时候,如...
如下时序图表示了RDD.persist方法执行之后,Spark是如何cache分区数据的。
本篇文章中,RDD.persist(StorageLevel)参数StorageLevel为:MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
也就是cache数据的时候,如果有足够的内存则将数据cache...
本篇文章以RDD.aggregateByKey引起的SortShuffleWriter为例说明Shuffle map端的原理和实现,为了便于说明问题这里的所有执行流程都是默认执行流程
为了便于说明问题,本文中Stage1是shuffle map操作所在的Stage,Stage2是shuffle reduce操作所在的Stage,本文中spark.shuffle.blockTransf...
本篇文章以RDD.aggregateByKey引起的SortShuffleWriter为例说明Shuffle map端的原理和实现...
Spark根据RDD间的依赖关系是否是Shuffle依赖进行Stage的划分,先执行的Stage标记为Stage1,后执行的Stage标记为Stage2。Shuffle是Stage分2步操作
Map操作和Recude操作可以通过下面这个图表示出来:
1. Map操作。Map操作在Stage1结束的时候执行;Map操作的作用是将Stage1阶段的一个pa...
Spark中RDD依赖的类关系如下图:...
KafkaRDD分区个数的确定和每个分区数据接收的计算在KafkUtils.createDirectStream创建了DirectDStream,代码如下:def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag]...
SparkContext是Spark Application程序的表示。在Driver程序中首先创建SparkContext对象,在创建这个对象的时候,Spark Application运行需要的重要参数会在这里初始化。下面的图表述了SparkContext创建初始化的重要参数。
DAGSchedule的作用:Spark Stage的切分等功能,它主要描述了Spark Appli...
上一讲主要降到了spark executor资源在Master的分配原理。今天来讲Spark Executor的创建和启动过程。创建的过程可以功过如下时序图表示:
在Standalone模式下,Backend.start()方法最终调用了SparkDeploySchedulerBackend.start(),这个方法的作用是:
1. 调用父类的CoarseGrainedSchedu...
|
|