周末的任务是更新Learning Spark系列第三篇和机器学习算法一篇,以为自己写不完了,但为了改正拖延症,还是得完成给自己定的任务啊 = =。这三章主要讲Spark的运行过程(本地 集群),性能调优以及Spark SQL相关的知识,如果对Spark不熟的同学可以先看看之前总结的两篇文章: 【原】Learning Spark (Python版) 学习笔记(一)----RDD 基本概念与命令 【原】Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性
########################################我是正文分割线######################################
第七章主要讲了Spark的运行架构以及在集群上的配置,这部分文字比较多,可能会比较枯燥,主要是讲整个过程是怎么运行的。首先我们来了解一下Spark在分布式环境中的架构,如图1 所示
图1 Spark分布式结构图
如上图所示,在Spark集群中有一个节点负责中央协调,调度各个分布式工作节点。这个中央协调点叫“驱动器节点(Driver)”,与之对应的工作节点叫“执行器节点(executor)”。驱动器节点和所有的执行器节点被称为一个Spark应用(Application)。Spark应用通过一个“集群管理器(Cluster Manager)”的外部服务在集群中的机器上启动,其中它自带的集群管理器叫“独立集群管理器”。 驱动器节点: 作用
职责
执行器节点: 作用:
职责:
集群管理器: 在图一中我们看到,Spark依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也会依赖集群管理器来启动驱动器节点。Spark有自带的独立集群管理器,也可以运行在其他外部集群管理器上,如YARN和Mesos等。下面讲一下两种比较常见的外部集群管理器: 独立集群管理器:
Hadoop YARN:
Apache Mesos:
选择合适的集群管理器:
提交应用: 使用spark-submit脚本提交应用,可以根据不同的情况设置成在本地运行和在集群运行等:
总结一下Spark在集群上的运行过程:
#########################################我是看累了休息会儿的分割线##############################
前面已经讲完了Spark的运行过程,包括本地和集群上的。现在我们来讲讲Spark的调优与调试。 我们知道,Spark执行一个应用时,由作业、任务和步骤组成。先回顾一下:
在第一篇中我们也讲过,当我们创建转化(Transformation)RDD时,是执行'Lazy'(惰性)计算的,只有当出现Action操作时才会触发真正的计算。而Action操作是如何调用Transformation计算的呢?实际上,Spark调度器会创建出用于计算Action操作的RDD物理执行计划,当它从最终被调用Action操作的RDD时,向上回溯所有必需计算的RDD。调度器会访问RDD的父节点、父节点的父节点,以此类推,递归向上生成计算所有必要的祖先RDD的物理计划。 然而,当调度器图与执行步骤的对应关系并不一定是一对一的。当RDD不需要混洗数据就可以从父节点计算出来,RDD不需要混洗数据就可以从父节点计算出来,或把多个RDD合并到一个步骤中时,调度器就会自动进行进行'流水线执行'(pipeline)。例如下图中,尽管有很多级父RDD,但从缩进来看,只有两个步骤,说明物理执行只需要两个步骤。因为这个执行序列中有几个连续的筛选和映射操作,所以才会出现流水线执行。
当步骤图确定下来后,任务就会被创建出来并发给内部的调度器,这些步骤会以特定的顺序执行。一个物理步骤会启动很多任务,每个任务都是在不同的数据分区上做同样的事情,任务内部的流程是一样的,如下所示:
总结一下,Spark执行的流程:
Spark调优 到这里我们已经基本了解Spark的内部工作原理了,那么在哪些地方可以进行调优呢?有以下四个方面: 并行度
序列化格式
内存管理
当然,除了调整内存比例,也可以改变内存的存储顺序。我们知道,Spark默认的cache()操作是以Memory_ONLY的存储等级持久化数据的,也就是说内存优先。如果RDD分区时的空间不够,旧的分区会直接删除。(妹的删数据也不带打声招呼的 = =!)当用到这些分区时,又会重新进行计算。所以,如果我们用Memory_AND_DISK的存储等级调用persist()方法效果会更好。因为当内存满的时候,放不下的旧分区会被写入磁盘,再用的时候就从磁盘里读取回来,这样比重新计算各分区的消耗要小得多,性能也更稳定(不会动不动报Memory Error了,哈哈)。特别是当RDD从数据库中读取数据的话,最好选择内存 磁盘的存储等级吧。
硬件供给
##################################我是文章快结束的分割线######################################
最后我们来讲讲Spark SQL,上一篇中我们已经总结了如何使用Spark读取和保存文件,涉及到了这部分内容,所以这一篇中只会简要的说明一下: 导入Spark SQL与简单的查询示例 1 #初始化Spark SQL
2 #导入Spark SQL
3 from pyspark.sql import HiveContext,Row
4 #当不能引入Hive依赖时
5 from pyspark.sql import SQLContext,Row
6 #创建SQL上下文环境
7 hiveCtx = HiveContext(sc)
8 #基本查询示例
9 input = hiveCtx.jsonFile(inputFile)
10 #注册输入的SchemaRDD(SchemaRDD在Spark 1.3版本后已经改为DataFrame)
11 input.registerTempTable('tweets')
12 #依据retweetCount(转发计数)选出推文
13 topTweets = hiveCtx.sql('SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10')
缓存
读取和存储数据 Apache Hive 1 #使用Python从Hive中读取
2 from pyspark.sql import HiveContext
3
4 hiveCtx = HiveContext(sc)
5 rows = hiveCtx.sql('SELECT key,value FROM mytable')
6 keys = rows.map(lambda: row,row[0])
Parquet 1 #Python中的Parquet数据读取
2 #从一个有name和favoriteAnimal字段的Parquet文件中读取数据
3 rows = hiveCtx.parquetFile(parquetFile)
4 names = rows.map(lambda row: row.name)
5 print 'Everyone'
6 print names.collect()
7
8 #Python中的Parquet数据查询
9 #这里把Parquet文件注册为Spark SQL的临时表来查询数据
10 #寻找熊猫爱好者
11 tbl = rows.registerTempTable('people')
12 pandaFriends = hiveCtx.sql('SELECT name FROM people WHERE favouriteAnimal = \'panda\'')
13 print 'Panda friends'
14 print pandaFriends.map(lambda row:row.name).collect()
15
16 #使用saveAsParquetFile()保存文件
17 pandaFriends.saveAsParqueFile('hdfs://')
JSON 1 #在python中读取JSON数据 2 input= hiveCtx.jsonFile(inputFile)
使用BeeLine 创建、列举、查询Hive表
用户自定义函数(UDF) 1 #Python版本的字符串长度UDF
2 hiveCtx.registerFuction('strLenPython',lambda x :len(x),IntegerType())
3 LengthSchemaRDD = hiveCtx.sql('SELECT strLenPython('text') FROM tweets LIMIT 10')
Spark SQL性能 Spark SQL在缓存数据时,使用的是内存式的列式存储,即Parquet格式,不仅节约了缓存时间,而且尽可能的减少了后续查询中针对某几个字段时的数据读取。 性能调优选项
到这里,第七章-第九章的内容就全部总结完了,看完之后会对Spark的运行过程,性能调优以及存储格式等有一个更清晰的概念。下一篇是最后一篇,5.15更新,主要讲Spark Streaming和Spark MLlib机器学习的内容。顺便也可以和PySpark做机器学习做一下对比:D
|
|
来自: 昵称32165413 > 《文件夹1》