作者:@古明地盆 楔子这次我们来聊一聊 Spark,它是一款基于内存的并行计算框架,现在大数据公司很多都是采用 Spark 框架。在之前介绍 MapReduce 的时候,我们说相比 Spark,MapReduce是比较鸡肋的,那么 Spark 到底有哪些优点呢?就让我们一起来学习吧。
关于 Spark,它是基于 Scala 语言编写的,但我本人是 Python 方向的,因此我会基于 Python 来进行编程。Python 操作 Spark 使用的模块是 PySpark,直接 pip 安装即可。 然后我们来安装 Spark 框架,不过 Spark 只是用来做纯计算的,它不具备数据存储功能,因此我们还需要依赖 HDFS。但是 Spark 不仅仅可以从 HDFS 上读取数据,它支持很多种数据源,比如:本地文件、S3,甚至是MySQL,只不过我们一般都是 HDFS。
好了,下面我们去官网下载 Spark,Spark 是 Apache 的顶级项目,所以它的官网是 spark.。我们直接去 http://spark./downloads.html 下载即可,我这里下载的是 spark-2.4.3-bin-hadoop2.7.tgz,上传到我的阿里云服务器,解压到 /opt 目录下。 然后将 bin 目录配置进环境变量,我们的安装就算完成了,没错就是这么简单。我们接下来要使用 Python 去操作 Spark,所以我们还需要安装相应的模块。
以上操作执行完毕后,我们便可以开始 Spark 的学习了。 Spark核心之RDD首先我们来介绍一下 RDD,它是 Spark 的核心。 什么是 RDD?RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心。尽管现在都使用 DataFrame、Dataset 进行编程,但是它们的底层依旧是依赖于RDD的。我们来解释一下 RDD 的这几个单词含义。
RDD 是 Spark 的一个最基本的抽象 (如果你看一下源码的话,你会发现RDD在底层是一个抽象类,抽象类显然不能直接使用,必须要继承它然后实现它内部的一些方法后才可以使用),它代表了不可变的、元素的分区(partition)集合,这些分区可以被并行操作。假设我们有一个包含 300 万个元素的数组,那么我们就可以将这个数组分成 3 份,每一份对应一个分区,每个分区都可以在不同的机器上进行运算,这样就能提高运算效率。 RDD 支持很多操作,比如:map、filter 等等,我们后面会慢慢介绍。当然,RDD在 Spark 的源码是一个类,但是我们后面有时候会把 RDD 和 RDD实例对象 都叫做 RDD,没有刻意区分,心里面清楚就可以啦。 RDD特性RDD有如下五大特性:
图解RDDSpark 在运行的时候,每一个计算任务就是一个 Task,另外:对于RDD而言,不是一个RDD计算对应一个 Task,而是RDD内部的每一个分区计算都会对应一个 task。假设这个RDD具有5个分区,那么对这个RDD进行一个map操作,就会生成5个 Task。另外,分区的数据是可以进行 persist(持久化)的,比如:内存、磁盘、内存 磁盘、多副本、序列化。 SparkContext和SparkConf在介绍RDD之前,我们需要了解一下什么是 SparkContext 和 SparkConf,因为我们肯定要先连接到 Spark 集群,才可以创建 RDD 进行编程。 SparkContext 是 pyspark 的编程入口,作业的提交,任务的分发,应用的注册都会在SparkContext 中进行。一个 SparkContext 实例对象代表了和 Spark 的一个连接,只有建立的连接才可以把作业提交到 Spark 集群当中去。实例化了 SparkContext 之后才能创建RDD、以及我们后面会介绍的 Broadcast 广播变量。 SparkConf是用来设置配置的,然后传递给SparkContext。 对于创建一个SparkContext对象,首先我们可以通过pyspark模块来创建: from pyspark import SparkContextfrom pyspark import SparkConf# setAppName是设置展示在webUI上的名字,setMaster表示运行模式# 但是我们目前是硬编码,官方推荐在提交任务的时候传递。当然我们后面说,现在有个印象即可conf = SparkConf().setAppName('satori').setMaster('local')# 此时我们就可以实例化出来一个SparkContext对象了,传递SparkConf对象sc = SparkContext(conf=conf)# 我们就可以使用sc来创建RDD# 总之记住:SparkContext是用来实例化一个对象和spark集群建立连接的# SparkConf是用来设置一些配置的,传递给SparkContext 其次我们通过shell进行操作,我们直接终端输入 pyspark: 当我们启动之后,输入 sc,我们看到 pyspark shell 直接为我们创建了一个默认的 SparkContext 实例对象,master叫做local[*](*表示使用计算机所有的核),appName 叫做 PySparkShell。 我们在介绍 RDD 相关操作的时候,会先使用 shell 的方式进行演示,当然使用 py 脚本编程的时候也是一样的。另外,pyspark 使用的是原生的 CPython 解释器,所以像 numpy、pandas 之类的包,原生 Python 交互式中可以导入的,在 pyspark shell 里面也是可以导入的。 另外,pyspark 默认启动的时候使用的是 Python2,显然我们需要将其改为 Python3。修改 # 加入如下内容, 如果没有设置环境变量, 则需要输入 python3 的完整路径export PYSPARK_DRIVER_PYTHON=python3export PYSPARK_PYTHON=python3 当然 spark-env.sh 这个文件默认是没有的,但是有一个 spark-env.sh.template,直接 cp 一份即可。 然后我们再执行 pyspark 的时候,使用的就是 python3 了。 另外我们在启动 pyspark shell 可不可以手动指定master和name呢?答案显然是可以的,我们来试一下。 我们看到我们在创建的时候手动设置的 master 和 name 生效了,我们再通过 webUI 来看一下,pyspark 的 webUI 默认是4040。 里面包含了很多很多的属性,可以仔细看一下。 创建RDD我们说 RDD 是 Spark 的核心,那么如何创建一个RDD呢?答案显然是通过SparkContext 实例对象,因为上面已经说了。你可以通过编写 py 文件的方式(我们后面会说)手动创建一个 SparkContext 实例对象,也可以通过启动 pyspark shell,直接使用默认为你创建好的,对,就是那个sc。由于 SparkContext 实例对象操作方式都是一样的,所以我们目前就先使用 pyspark shell 来进行编程。后面我们会说如何通过编写脚本的方式进行 Spark 编程,以及作业如何提交到 spark 上运行。 通过sc(为了方便,sc就代指了SparkContext实例对象)创建RDD有两种方式。
下面我们就来代码操作如何创建 RDD,注意:现在我们是在 pyspark shell 中进行操作的。所以sc是创建好的,不要看到了sc觉得纳闷,为什么变量没定义就可以使用;还有由于是交互式环境,我们也不需要 print,如果返回值不为 None,会自动打印。 从已经存在的集合创建可以将一个已存在的集合(这里的集合指的是Python中的元组、列表、集合等容器)。 >>> data = range(10)>>> rdd1 = sc.parallelize(data) # 调用sc.parallelize方法,可以将已经存在的集合转为RDD>>> datarange(0, 10)>>> rdd1 # 输出得到的是一个RDD对象PythonRDD[1] at RDD at PythonRDD.scala:53>>> rdd1.collect() # 如果想输出的话,调用collect方法,这些后面会说。[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> # 进行map操作得到rdd2>>> rdd2 = rdd1.map(lambda x: x 1)>>> rdd2.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]>>> # 进行reduce操作>>> rdd2.reduce(lambda x, y: x y)55>>> # 这些RDD相关的操作函数我们后面会说,但是从python的内置函数map、reduce显然也能明白是干什么的 我们看一下web界面。 上面显示了三个 Job(Job是什么后面会说),为什么是三个,我们也后面再说。此外我们看下表格中的最后一列,我们看到蓝色长条上面写着 2/2,这表示每个 RDD 默认是两个分区,当然我们通过 parallelize 创建 RDD 的时候也可以指定分区。 >>> rdd3 = sc.parallelize(data, 5) # 指定5个分区>>> rdd3.collect() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]>>> 虽然结果没有变化,但是我们来看一下web界面。 我们看到又多了一个 Job,该 Job 上的 Task 是 5,因为我们指定了 5 个分区,而每个分区都会对应一个 Task。因为分区可大可小,如果每一个节点的 CPU 只执行一个分区可能有点浪费,如果跑的快的、或者分区的数据集比较少的,很快就跑完了,那么容易造成资源浪费,因此 Spark 官方建议每个 CPU 对应 2 到 4 个分区,这样可以把资源充分利用起来。至于具体设置多少个,这个就取决于实际项目、以及规定的处理时间、节点对应的机器性能等等,所以如果你根据业务找到了比较好的分区个数,那么就传递给 parallelize 的第二个参数即可。 从存储系统里面的文件创建我们还可以读取存储系统里面的文件来创建 RDD,我们演示一下从本地读取文件、和从 HDFS 上读取文件。 在本地创建一个 vtuber.txt,内容如下,并上传到 HDFS 上面。 >>> # 读取文件使用textFile,接收一个文件路径,当然同时也可以指定分区>>> # 我们可以从本地读取,读取的格式为'file://文件路径'>>> rdd1 = sc.textFile('file:///root/vtuber.txt')>>> rdd1.collect() # 我们看到默认是以\n分隔的['hello matsuri', 'hello mea', 'hello nana', 'hello mana']>>> >>> # 从hdfs上读取,格式为'hdfs://ip:port文件路径',port就是hdfs集群上的端口,就是你在core-site.xml里面设置的>>> rdd2 = sc.textFile('hdfs://localhost:9000/vtuber.txt', 4)>>> rdd2.collect()
['hello matsuri', 'hello mea', 'hello nana', 'hello mana']
>>>>>> rdd3 = rdd1.map(lambda x: x.split())>>> rdd3.collect()
[['hello', 'matsuri'], ['hello', 'mea'], ['hello', 'nana'], ['hello', 'mana']]>>> 我们看到通过 textFile 读取外部文件的方式创建 RDD 也是没有问题的,但是需要注意的是:如果你是 Spark 集群,并且还是通过本地文件的方式,那么你要保证该文件在所有节点上都存在。 我目前都是单节点的,当然对于学习来讲单节点和多节点都是差不多的,不可能因为用的多节点,语法就变了,只是多节点在操作的时候要考虑到通信、资源等问题。比如:我们这里读取的是本地的 /root/vtuber.txt,这就表示访问本地的 /root/vtuber.txt 文件,如果你搭建的是集群,那么你要保证每个节点都存在 /root/vtuber.txt,否则节点根本获取不到这个数据。所以在学习语法的时候我个人不建议搭建 Spark 集群(也就是所谓的standalone模式),公司生产上面也很少使用这种模式,当然不是没有,只是很少,绝大部分都是跑在yarn上面的。关于 Spark 的运行模式,资源管理以及调度、我们后面也会慢慢聊。 因此对于 Spark 集群而言,解决办法就是把文件拷贝到每一个节点上面,或者使用网络共享的文件系统。 另外 textFile 不光可以读取文件,还可以读取目录:/dir、模糊匹配:/dir/*.txt、以及读取gz压缩包都是支持的。 除了 textFile,还可以使用 wholeTextFiles 读取目录。
>>> sc.wholeTextFiles('hdfs://localhost:9000/').collect() [('hdfs://localhost:9000/satori.txt', 'hello matsuri\nhello mea\nhello nana\nhello mana\n')]>>> # 我这里/目录下面只有一个文件,把文件内容全部读取出来了 我们现在知道如何读取文件转化为 RDD,那么我们如何将 RDD 保存为文件呢?可以使用 saveAsTextFile。 >>> data = [1, 2, 3, 4, 5]>>> rdd = sc.parallelize(data)>>> rdd1 = rdd.map(lambda x: f'夏色祭{x}号')>>> # 默认是本地,当然也可以指定 file://>>> rdd1.saveAsTextFile('/root/a.txt')>>> # 保存到hdfs上面>>> rdd1.saveAsTextFile('hdfs://localhost:9000/a.txt') 但是我们发现保存的 a.txt 并不是一个文件,并不是说把整个rdd都保存一个文件,这个是由你的分区决定的。保存的是一个目录,文件在目录里面,我们看到有两部分,因为是两个分区。 >>> data = [1, 2, 3, 4, 5]>>> rdd = sc.parallelize(data, 5) # 这里我们创建rdd的时候,指定5个分区>>> rdd1 = rdd.map(lambda x: f'夏色祭{x}号')>>> # 保存为b.txt,显然这个b.txt是个目录>>> rdd1.saveAsTextFile('/root/b.txt')>>> rdd1.saveAsTextFile('hdfs://localhost:9000/b.txt') 结果跟我们预想的是一样的,有多少个分区就会有多少个 part,因为 Spark 是把每个分区单独写入一个文件里面。至于 HDFS 我们就不用演示了,一样的,算了还是看看吧。 Spark应用程序开发以及运行我们目前是通过 pyspark shell 进行操作的,显然这仅仅是用来做测试使用的,我们真正开发项目肯定是使用 ide 进行操作的 (vim、notepad 也当成是ide,Σ(⊙▽⊙'a)。下面我们就来看看如何使用 Python 开发一个Spark 应用程序,并且运行它。这里我在 Windows 上使用 PyCharm 开发,注意:但是 Python 解释器配置的我阿里云上 Python3,PyCharm是支持这个功能的,远程连接服务器上的 Python 环境,所以我们在Windows上操作的 Python是 Linux 上的 Python。 import osimport platform
print(os.name) # posixprint(platform.system()) # Linuxprint(os.listdir('/'))'''
['home', 'run', 'tmp', 'opt', 'usr', 'lost found', 'srv', 'lib', '.autorelabel', ...]
''' 还有一种简便的方法,你在服务器上启动一个 jupyter notebook,然后再 Windows 上通过浏览器打开、输入 token 远程连接也是可以的。当然如果需要编写的 py 文件比较多就不推荐了,如果只是学习的话还是可以的。 from pyspark import SparkContextfrom pyspark import SparkConf# 创建SparkConf实例:设置的是 Spark 相关的参数信息# 我们这里只设置 appName,master默认就好,当然名字设置不设置也无所谓啊conf = SparkConf().setAppName('matsuri')# 我们可以传入conf,创建SparkContext对象。但master、appName 是可以在 SparkContext 里面单独设置的# 当然官方不推荐这种硬编码的模式,而是通过提交任务的时候指定,所以这里的 conf 我们就不传递了sc = SparkContext() data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data)# 不在shell里面了,我们需要print才能看到结果print(rdd.collect()) # [1, 2, 3, 4, 5]# 好的习惯,编程结束之后stop掉,表示关闭与spark的连接# 否则当你再次创建相同的SparkContext实例的时候就会报错# 会提示你:Cannot run multiple SparkContexts at once; existing SparkContext(app=satori, master=local[*]sc.stop() 然后我们提交到spark上面运行,当然你可以通过 python3 xxx.py 的方式执行,但最好的做法是提交到 Spark 上运行。
上面的代码我们起名为 1.py,然后提交作业:spark-submit --master local[*] --name 夏色祭 1.py。 我们提交之后,执行是成功了的,但是输出的东西灰常多,程序的结果就隐藏在中间。 那么问题来了,如果我有很多文件怎么办?要是标准库里面的包我们可以导入,但如果是我们自己写的依赖怎么提交呢?首先多个文件 (目录)里面一定存在一个启动文件,用来启动整个程序。假设这个启动文件叫start.py(当然启动文件一定在项目的最外层,如果在项目的包里面,那么它就不可能成为启动文件),那么把除了 start.py 的其它文件(目录)打包成一个 zip 包或者 egg,假设叫做 dependency.egg,那么执行的时候就可以这么执行:
如果我们写的程序需要从命令行中传递参数,那么直接跟在start.py(启动文件)后面就行。 关于输出结果,我们只截取了一部分,详细信息可以自己慢慢查看。以及 spark-submit 支持的其它参数,也可以通过 spark-submit --help 来查看,不过很多都用不到,因为 spark-submit 不仅可以提交python程序,还可以提交java等其它程序,里面的很多参数是为其它语言编写的程序准备的,python用不到。 RDD相关操作我们已经知道如何创建一个 RDD、以及使用 python 开发 Spark 程序并提交运行,那么下面我们来看看 RDD 都能进行哪些操作。我们读取数据转成 RDD 之后肯定是要进行操作的,我们之前看到了map、reduce、collect等操作,但是除了这些,RDD还支持很多其他的操作,我们来看一下。 RDD的操作分为两种:transformation和action。
直接看可能不好理解,我们来举个例子。我们对一个RDD进行 map 操作得到了新的RDD,但是这个RDD它并不是具体的值。我们对RDD进行 collect 操作的时候,才会把值返回回来。实际上,所有的transformation都是惰性的,意思是我们进行map操作的时候,RDD只是记录了这个操作,但是它并没有具体的计算,当我们进行collect求值的时候才会真正的开始进行计算。 >>> data = [1, 2, 3, 4, 5]>>> rdd = sc.parallelize(data)>>> rdd1 = rdd.map(lambda x: str(x) '~~~')>>> rdd2 = rdd1.map(lambda x: '~~~' x)>>> >>> rdd2.collect()
['~~~1~~~', '~~~2~~~', '~~~3~~~', '~~~4~~~', '~~~5~~~']>>> 我们对 rdd 进行操作得到 rdd1,rdd1 得到 rdd2,像这种对一个 RDD 操作得到新的 RDD 的过程我们称之为transformation,它是惰性的 下面我们就来举例说明RDD的相关操作: mapmap:接收一个函数,会对RDD里面每一个分区的每一个元素都执行相同的操作。话说,能用pyspark的编程的,我估计这些说了都是废话。因此如果有些函数和python的内置函数比较类似的,我就不说那么详细了。 >>> rdd1 = sc.parallelize([1, 2, 3, 4, 5])>>> # 给里面每一个元素都执行加1的操作>>> rdd1.map(lambda x: x 1).collect() [2, 3, 4, 5, 6] filterfilter:类似Python中的filter,选择出符合条件的 >>> numbers = [1, 2, 3, 4, 5, 6, 7, 8]>>> rdd = sc.parallelize(numbers)>>> rdd.filter(lambda x: x > 3).collect()
[4, 5, 6, 7, 8]>>> >>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8] flatMapflatMap:和map不同的是,map是输出一个值返回一个值,而flatMap是输入一个值,返回一个序列、然后将这个序列打开,我们举例说明。 >>> word = ['satori']>>> # 函数接收什么,返回什么,所以还是原来的结果>>> sc.parallelize(word).map(lambda x: x).collect() ['satori']>>> # 接收一个值,返回一个序列,然后会自动将这个序列打开>>> sc.parallelize(word).flatMap(lambda x: x).collect() ['s', 'a', 't', 'o', 'r', 'i']>>> >>> # split之后是一个列表,对于map,那么返回的就是列表>>> words = ['hello mashiro', 'hello satori']>>> sc.parallelize(words).map(lambda x: x.split(' ')).collect() [['hello', 'mashiro'], ['hello', 'satori']]>>> # 但对于flatMap来说,会将这个列表打开>>> sc.parallelize(words).flatMap(lambda x: x.split(' ')).collect() ['hello', 'mashiro', 'hello', 'satori']>>> 所以从名字上看,flatMap相比map多了一个flat,也是很形象的,flat表示平的,操作上就是直接将列表打开,不再嵌套。另外我们看到我们将很多操作都写在了一行,这是没有问题的,如果操作比较多,我们鼓励写在一行,这叫做链式编程。当然如果为了直观,你也可以分为多行来写,反正transformation也是懒加载。 groupByKeygroupByKey:这个语言表达有点困难,我们直接看一个例子。 >>> val = [('a', 'hello'), ('a', 'how are you'), ('b', 'who am i'), ('a', 4)]>>> rdd = sc.parallelize(val)>>> >>> rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe37b8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3630>)]>>> rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
[('b', ['who am i']), ('a', ['hello', 'how are you', 4])]>>> 我们看到使用groupByKey的rdd,是一个由 >>> words = ['hello mashiro', 'hello world', 'hello koishi']>>> rdd = sc.parallelize(words)>>> # 先进行分隔>>> rdd1 = rdd.flatMap(lambda x: x.split(' '))>>> rdd1.collect() ['hello', 'mashiro', 'hello', 'world', 'hello', 'koishi']>>> # 给每个词都标上一个1,因为它们每个词都出现了1次>>> rdd2 = rdd1.map(lambda x: (x, 1))>>> rdd2.collect() [('hello', 1), ('mashiro', 1), ('hello', 1), ('world', 1), ('hello', 1), ('koishi', 1)]>>> >>> # 使用groupByKey将值相同的汇聚到一起>>> rdd3 = rdd2.groupByKey()>>> rdd3.collect() [('mashiro', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3828>), ('world', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3128>), ('koishi', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3c50>), ('hello', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3470>)]>>> # 变成list对象>>> rdd4 = rdd3.map(lambda x: (x[0], list(x[1])))>>> rdd4.collect() [('mashiro', [1]), ('world', [1]), ('koishi', [1]), ('hello', [1, 1, 1])]>>> # 进行求和,即可得到每个词出现的次数。当然求和的话可以直接使用sum,没必要先变成list对象>>> rdd5 = rdd4.map(lambda x: (x[0], sum(x[1])))>>> rdd5.collect() [('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]>>> >>> 还记得之前说的链式编程吗?其实这个词频统计很简单,工作上是没必要写这么多行的。 >>> words = ['hello mashiro', 'hello world', 'hello koishi']>>> rdd = sc.parallelize(words)>>> rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)] 所以groupByKey非常适合词频统计,这里面不接收参数,调用这个方法RDD需要是一个列表或者元组、里面嵌套多个列表或者元组 reduceByKey调用reduceByKey方法的rdd对应的数据集和groupByKey是一样的,我们一旦看到ByKey,就应该想到序列里面的元素要是一个有两个元素的序列,然后第一个元素相同的分发到一起。但是它和groupByKey不同的是,groupByKey不接收参数,然后直接把第一个元素相同聚合在一起,而reduceByKey会比groupByKey多一步,因为它需要接受一个函数,会自动将分发到一起的值 >>> words = ['hello mashiro', 'hello world', 'hello koishi']>>> rdd = sc.parallelize(words)>>> rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect() [('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]>>> >>> rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x y).collect() [('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)] 和groupByKey对比的话,还是很清晰的。 sortByKeysortByKey:从名字能看出来,这个是排序用的,按照索引为0的元素进行排序。 >>> words = [('c', 2), ('a', 1), ('b', 3)]>>> rdd = sc.parallelize(words)>>> >>> rdd.sortByKey().collect()
[('a', 1), ('b', 3), ('c', 2)]>>> >>> rdd.sortByKey(False).collect()
[('c', 2), ('b', 3), ('a', 1)]>>> # 把元祖里面的两个元素想象成字典的key: value,ByKey自然是根据Key来进行操作>>> # 可显然我们是想根据value来进行排序,根据出现次数多的进行排序。所以我们可以先交换顺序,排完了再交换回来>>> rdd.map(lambda x: (x[1], x[0])).sortByKey().map(lambda x: (x[1], x[0])).collect()
[('a', 1), ('c', 2), ('b', 3)]>>> rdd.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect()
[('b', 3), ('c', 2), ('a', 1)]>>> # 默认从小到大排,False则表示逆序、从大到小排 unionunion:合并两个RDD >>> rdd1 = sc.parallelize([1, 2, 3])>>> rdd2 = sc.parallelize([11, 22, 33])>>> # 很简单,就是将两个RDD合并>>> rdd1.union(rdd2).collect() [1, 2, 3, 11, 22, 33]>>> # 甚至和自身做union也是可以的>>> rdd1.union(rdd1).collect() [1, 2, 3, 1, 2, 3] distinctdistinct:去重,我们看到这有点像sql啊。其实spark还支持spark sql、也就是写sql语句的方式进行编程。我们后面、或者下一篇博客会说。 >>> rdd = sc.parallelize([11, 11, 2, 22, 3, 33, 3]).distinct()>>> # 不过去重之后貌似没什么顺序了>>> rdd.collect()
[2, 22, 11, 3, 33] joinjoin:熟悉sql的估计肯定不陌生,join有以下几种:inner join、left join、right join、outer join。这个操作join的RDD和xxxByKey对应的RDD应该具有相同的数据格式,对,就是[(x1, y1), (x2, y2)...]这种格式。
>>> rdd1 = sc.parallelize([('name', '古明地觉'), ('age', 16), ('gender', 'female')])>>> rdd2 = sc.parallelize([('name', '古明地恋'), ('age', 15), ('place', '东方地灵殿')])>>> >>> # join默认是内连接,还是想象成key: value,把两个RDD的key相同的汇聚在一起>>> # 如果不存在相同的key,那么舍弃>>> rdd1.join(rdd2).collect() [('name', ('古明地觉', '古明地恋')), ('age', (16, 15))]>>> >>> # 以左RDD为基准,如果右RDD没有与之匹配的则为None,比如rdd1的'gender'在rdd2中不存在,所以置为None>>> rdd1.leftOuterJoin(rdd2).collect() [('name', ('古明地觉', '古明地恋')), ('gender', ('female', None)), ('age', (16, 15))]>>> >>> # 同理以右RDD为基准,当然啦,顺序还是从左到右的,里面的元素显示rdd1的元素,再是rdd2的元素>>> rdd1.rightOuterJoin(rdd2).collect() [('name', ('古明地觉', '古明地恋')), ('age', (16, 15)), ('place', (None, '东方地灵殿'))]>>> >>> # 全连接,不用我说了>>> rdd1.fullOuterJoin(rdd2).collect() [('name', ('古明地觉', '古明地恋')), ('gender', ('female', None)), ('age', (16, 15)), ('place', (None, '东方地灵殿'))] zipzip:类似于python中的zip,但是要求两个RDD的元素个数以及分区数必须一样。 >>> rdd1 = sc.parallelize(['a', 'b', 'c'])>>> rdd2 = sc.parallelize([1, 2, 3])>>> >>> rdd1.zip(rdd2).collect()
[('a', 1), ('b', 2), ('c', 3)]>>> zipWithIndexzipWithIndex:对单个RDD操作的,会给每个元素加上一层索引,从0开始自增。 >>> rdd1 = sc.parallelize(['a', 'b', 'c'])>>> rdd1.zipWithIndex().collect() [('a', 0), ('b', 1), ('c', 2)] 以上就是一些常用的transformation操作,我们说RDD转换得到新的RDD这个过程叫做transformation,它是惰性的,只是记录了操作,但是并没有立刻进行计算。当遇到action操作时 mapPartitionsmapPartitions:这个是对每一个分区进行map >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)>>> # 函数参数x不再是rdd的每一个元素,而是rdd的每一个分区>>> # 这个不能写return,要写yield,或者返回一个可迭代的对象,会自动获取里面的所有元素>>> def f(x): yield sum(x)... >>> # 三个分区,显然一个分区两个元素,那么会把每个分区的所有元素进行相加>>> rdd.mapPartitions(f).collect()
[3, 7, 11]
>>> # sum(x)不是一个可迭代的,我们需要放在一个列表里面,或者定义函数使用yield也行>>> # 会自动遍历返回的可迭代对象,把元素依次放到列表里面>>> rdd.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7, 11] mapPartitionsWithIndexmapPartitionsWithIndex:还是对每一个分区进行map,但是会多出一个索引 >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)>>> rdd.mapPartitionsWithIndex(lambda index, x: (index, sum(x))).collect() [0, 3, 1, 7, 2, 11] 列表中的0 1 2表示分区索引。 intersectionintersection:union是将两个RDD合并,其实是取两者的并集,intersection则是取交集,subtract则是取差集。 >>> rdd1 = sc.parallelize([1, 2, 3])>>> rdd2 = sc.parallelize([1, 22, 3])>>> rdd1.intersection(rdd2).collect()
[1, 3]>>> rdd1.subtract(rdd2).collect()
[2] sortBysortBy:我们之前说过sortByKey会默认按照key来排序,sortBy需要我们自己指定,可以按照key也可以按照value >>> rdd = sc.parallelize([('a', 1), ('c', 2), ('b', 3)])>>> rdd.sortBy(lambda x: x[0]).collect() [('a', 1), ('b', 3), ('c', 2)]>>> rdd.sortBy(lambda x: x[1]).collect() [('a', 1), ('c', 2), ('b', 3)]>>> >>> rdd.sortBy(lambda x: x[0], False).collect() [('c', 2), ('b', 3), ('a', 1)]>>> rdd.sortBy(lambda x: x[1], False).collect() [('b', 3), ('c', 2), ('a', 1)]>>> coalescecoalesce:改变RDD的分区数。分区数会影响作业的并行度,因此会视作业的具体情况而定。这个方法第一个参数接收要改变的分区个数,第二个参数是shuffle,默认为False,表示重新分区的时候不进行shuffle操作,此时效率较高;如果指定为True,表示重分区的时候进行shuffle操作,此时效果等价于下面要介绍的repartition,效率较低。关于什么是shuffle操作,我们后面会说。 >>> rdd = sc.parallelize(range(10), 5)>>> # 使用该函数可以查看分区数>>> rdd.getNumPartitions()5>>> # 改变分区数,变成3>>> rdd1 = rdd.coalesce(3)>>> rdd1.getNumPartitions()3>>> # 分区数只能变少,不能变多>>> rdd2 = rdd1.coalesce(4)>>> rdd2.getNumPartitions()3>>> repartitionrepartition:该方法也是对RDD进行重新分区,其内部使用shuffle算法,并且分区可以变多、也可以变少,如果是减少分区数,那么推荐使用coalesce。 >>> rdd = sc.parallelize([1, 2, 3, 4])>>> rdd1 = rdd.repartition(4)>>> rdd1.getNumPartitions()4>>> rdd1.repartition(2).getNumPartitions()2>>> flatMapValuesflatMapValues:和groupByKey相反,我们看个栗子就清楚了。 >>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 2), ('a', 3), ('b', 2)])>>> rdd1 = rdd.groupByKey().map(lambda x: (x[0], list(x[1])))>>> rdd1.collect()
[('b', [1, 2]), ('a', [1, 2, 3])]>>> # 所以它个groupByKey是相反的,这里面一般写lambda x: x>>> rdd1.flatMapValues(lambda x: x).collect()
[('b', 1), ('b', 2), ('a', 1), ('a', 2), ('a', 3)] groupBygroupBy:之前的groupByKey默认是按照相同的key进行聚合,这里则可以单独指定,并且里面序列里面的元素可以不再是元组,普通的整型也是可以的。 >>> rdd = sc.parallelize([12, 'a', 'ab', '1', 23, 'xx'])>>> # 将里面的元素变成str之后,长度大于1的分为一组,小于等于1的分为一组>>> rdd.groupBy(lambda x: len(str(x))>1).collect() [(False, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f5c0>), (True, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f048>)] >>>>>> rdd.groupBy(lambda x: len(str(x))>1).map(lambda x: (x[0], list(x[1]))).collect() [(False, ['a', '1']), (True, [12, 'ab', 23, 'xx'])] keyBykeyBy:看例子就能理解,其实很多方法我们完全可以用已经存在的来替代。 >>> rdd = sc.parallelize([1, 2, 3])>>> rdd.keyBy(lambda x: f'hello_{x}').collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)]>>> >>> rdd.map(lambda x: (f'hello_{x}', x)).collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)] 可以看到keyBy就是将函数返回的元素和原来的元素组合成一个二元tuple,这个我们完全可以使用map来替代,或许keyBy简单了那么一点点,但是说实话我个人还是习惯用map。其实一些api如果没有什么不可替代性、或者无法在很大程度上简化工作量的话,我觉得记太多反而是个负担。 keys和valueskeys:获取所有的key。values:获取所有的value。我们这里的key和value都指的是二元tuple里面的两个元素。其实RDD对应的数据类型无非两种,一种是对应的列表里面都是整型或者字符串的RDD,另一种是里面都是二元tuple >>> rdd = sc.parallelize([('a', 1), ('b', 'a'), ('c', 'c')])>>> rdd.keys().collect() ['a', 'b', 'c']>>> rdd.values().collect() [1, 'a', 'c'] glomglom:将每一个分区变成一个单独的列表 >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6]]>>> pipepipe:将RDD里面的每一个元素都执行相同的linux命令 >>> rdd = sc.parallelize(['hello', 'hello1', 'hello2'], 3)>>> rdd.pipe('cat').collect() ['hello', 'hello1', 'hello2']>>> # 1 1 6表示1行、1个单词、6个字符>>> rdd.pipe('wc').collect() [' 1 1 6', ' 1 1 7', ' 1 1 7']>>> randomSplitrandomSplit:将RDD里面的元素随机分隔 >>> rdd = sc.parallelize(range(10))>>> rdd1 = rdd.randomSplit([1, 4])>>> rdd1
[PythonRDD[203] at RDD at PythonRDD.scala:53, PythonRDD[204] at RDD at PythonRDD.scala:53]>>> [_.collect() for _ in rdd1]
[[5, 7, 9], [0, 1, 2, 3, 4, 6, 8]]>>> samplesample:随机取样 >>> rdd = sc.parallelize(range(10))>>> # 参数一:是否有放回。参数二:抽样比例。参数三:随机种子>>> rdd.sample(True, 0.2, 123).collect() [0, 9] foldByKeyfoldByKey:针对于key: value形式的RDD,进行聚合 >>> rdd = sc.parallelize([('a', (1, 2, 3, 4)), ('b', (11, 22, 33, 44))])>>> rdd1 = rdd.flatMapValues(lambda x: x)>>> rdd1.collect()
[('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 11), ('b', 22), ('b', 33), ('b', 44)]>>> # 参数一:起始值,参数二:操作函数>>> rdd1.foldByKey(0, lambda x, y: x y).collect()
[('b', 110), ('a', 10)]>>> # 起始值指定20,那么会把20也当成一个元素、也就是初始元素,扔到函数里面去>>> rdd1.foldByKey(20, lambda x, y: x y).collect()
[('b', 130), ('a', 30)]>>> # 我们看到0确实在里面>>> rdd1.foldByKey(0, lambda x, y: f'{x}->{y}').collect()
[('b', '0->11->22->33->44'), ('a', '0->1->2->3->4')]>>> 以上就是一些transformation算子,有一些算子比较简单我就没介绍,比如mapValues之类的,我们完全可以使用map来替代,也很简单,没必要记这么多。如果有一些没有介绍到的,可以自己通过pycharm查看RDD这个类源码,看看它都支持哪些方法。源码是很详细的,都有大量的注释。 那么下面我们来看一下action方法,action方法估计我们最一开始就见过了,没错就是collect,把RDD里面的内容以列表的形式返回,那么除了collect还有哪些action算子呢?我们来看一下。 reducereduce:这个应该也早就见过了,将里面的内容相加。 >>> rdd = sc.parallelize([1, 2, 3, 4])>>> rdd.reduce(lambda x, y: x y)10 countcount:计算元素的个数。 >>> rdd = sc.parallelize([1, 2, 3, [4, 5]])>>> rdd.count()4 take、firsttake、first:获取指定个数的元素、获取第一个元素。 >>> rdd = sc.parallelize([1, 2, [3, 4, 5], 6, 7, 8])>>> # 如果指定的个数超过了元素的总个数也不会报错,而是返回所有元素,即便RDD为空也可以。>>> rdd.take(3) [1, 2, [3, 4, 5]]>>> # 注意:对于first来说,空的rdd调用的话会报错>>> rdd.first()1 max、min、mean、summax、min、mean、sum:获取元素最大值、最小值、平均值、总和。 >>> rdd = sc.parallelize([11, 22, 33, 22])>>> rdd.max()33>>> rdd.min()11>>> rdd.mean()22.0>>> rdd.sum()88 当然还有其它的数学函数,比如:stdev,求标准差、variance,求方差等等。遇到相应的需求,可以去查找。并且对于上面的数学操作,还分别对应另一个函数,比如:count -> countApprox,sum -> sumApprox等等,这些函数的特点是可以传入一个timeout,单位为毫秒,要是在指定的时间内没有计算完毕的话,那么就直接返回当前的计算结果。可以自己尝试一下。 foreachforeach:类似于map,对序列里面的每一个元素都执行相同的操作。 >>> rdd = sc.parallelize([11, 22, 33, 22])>>> # 但是foreach不会有任何的反应,不会跟map一样返回新的RDD>>> rdd.foreach(lambda x: x 1)>>> # 我们可以执行打印操作>>> rdd.foreach(lambda x: print(x, x 123))11 13422 14533 15622 145>>> foreachPartitionforeachPartition:会对每一个分区都执行相同的操作。 >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)>>> rdd.foreachPartition(lambda x: print(x))
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>>>> rdd.foreachPartition(lambda x: print(list(x)))
[1, 2]
[3, 4]
[5, 6]>>> aggregateaggregate:这个稍微有点复杂,里面接收三个参数。
>>> rdd = sc.parallelize([1, 2, 3, 1, 2, 3], 3)>>> # 指定了三个分区,那么结果每个分区对应的值应该是这样: [1, 2] [3, 1] [2, 3]>>> # 每个分区按照第二个参数指定的操作进行计算,别忘记初始值,这个是作用在每个分区上面的>>> # 结果就是:2 * 1 * 2, 2 * 3 * 1, 2 * 2 * 3 --> 4, 6, 12>>> # 然后每个分区返回的结果执行第三个参数指定的操作,加在一起,所以是24>>> rdd.aggregate(2, lambda x, y:x*y, lambda x, y: x y)24 aggregateByKeyaggregateByKey:这个是一个transformation方法,不是action,之所以放进来是为了和aggregate进行对比便于理解。这个是把相同的key分成一组,说不好说,直接看例子吧 >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('c', 1), ('c', 2), ('c', 3)], 3)>>> # 相同的分为一组,但是注意分区,倒数第三个('c', 1)是和('b', 3)在一个分区里面的>>> # [('a', [1, 2])] [('b', [3]), ('c', [1])] [('c', [2, 3])]>>> # 初始元素和里面元素依次相乘--> [('a', 4)] [('b', 6), ('c', 2)] [('c', 12)]>>> # 然后对分区里面相同key再次进行参数三指定的操作--> [('a', 4)] [('b', 6)] [('c', 14)]>>> # 上面的每一个列表看成是一个分区即可,为了清晰展示,我把每一个分区单独写成了一个列表>>> rdd.aggregateByKey(2, lambda x,y:x*y, lambda x,y:x y).collect()
[('b', 6), ('a', 4), ('c', 14)] 另外,对于很多的transformation操作,我们都是可以通过参数:numPartitions指定生成的新的RDD的分区的,不过一般情况下我们不指定这个参数,会和初始的RDD的分区数保持一致。当然如果初始的RDD的分区数设置的不合理,那么是可以在transformation操作的时候进行更改的。 foldfold:类似于aggregateByKey,但它是action方法,而且调用的不是key、value形式的RDD、并且只需要指定一个函数,会对每个分区、以及每个分区返回的结果都执行相同的操作 >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)>>> # [1, 2] [3, 4] [5, 6] -> 2 * 1 * 2, 2 * 3 * 4, 2 * 5 * 6>>> # 4 * 24 * 60 * 2 = 11520,并且每一个分区计算之后的结果还要乘上指定的初始值,这一点需要注意>>> rdd.fold(2, lambda x,y: x*y)11520>>> collectAsMapcollectAsMap:对于内部是二元tuple的RDD,我们可以转化为字典。 >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('c', 1), ('c', 2), ('c', 3)], 3)>>> # key相同的,value就会被替换掉>>> rdd.collectAsMap()
{'a': 2, 'b': 3, 'c': 3}>>> idid:返回RDD的id值,每个RDD的id值是唯一的 >>> rdd1 = sc.parallelize([])>>> rdd2 = sc.parallelize([])>>> rdd3 = sc.parallelize([])>>> >>> rdd1.id(), rdd2.id(), rdd3.id() (326, 327, 328)>>> histogramhistogram:返回一个直方图数据,看栗子 >>> rdd = sc.parallelize(range(10))>>> # 返回0-5以及5-8中间的元素个数,当然会连同区间一起返回。注意区间是左闭右开的>>> rdd.histogram([0, 5, 8])
([0, 5, 8], [5, 4])>>> # 如果不指定列表,而是指定整型的话>>> # 会自动为我们将[min, max]等分4个区间,那么第一个列表就有5个元素>>> rdd = sc.parallelize([0, 11, 33, 22, 44, 55, 66, 33, 100])>>> rdd.histogram(4)
([0, 25, 50, 75, 100], [3, 3, 2, 1])>>> isEmptyisEmpty:检测一个RDD是否为空 >>> rdd1 = sc.parallelize([])>>> rdd2 = sc.parallelize([1])>>> >>> rdd1.isEmpty(), rdd2.isEmpty() (True, False) lookuplookup:查找指定key对应的value,那么显然操作的RDD要是key: value形式的 >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 'a')])>>> rdd.lookup('a')
[1, 2]>>> rdd.lookup('b')
['a']>>> 总结以上就是RDD的一些操作,当然我们这里没有全部介绍完,但是也介绍挺多了,如果工作中不够用的话,那么只能看源码了。当然这么多一次性肯定是无法全部背下来的,需要用的时候再去查即可,当然还是要多动手敲,孰能生巧。 spark运行模式下面我们来看一下 Spark 的运行模式,根据上一篇博客我们知道 Spark 的运行模式分为以下几种:local、standalone、hadoop yarn。我们说本地开发最好用local模式,直接搭建一个 Spark 环境就可以跑了,因为测试的话本地是最方便的;standalone,用的比较少;hadoop yarn,这个是用的最多的,用spark的公司至少有 70% 是用 yarn 这个模式的。yarn是一个资源管理器,我们后面会说。下面我们就来讲解这几种运行模式。 local运行模式这个模式应该是最熟悉的模式了,因为我们之前介绍RDD的时候用的就是这个模式,所以我们看到在编写代码的时候,进行测试使用 local 模式是足够的。 我们之前用的 pyspark shell,这个是为了方便本地测试的,以及我们还知道了如何向 Spark 提交一个作业,使用 spark-submit,我们当时是这么提交的:
所以 local 模式是比较简单的,一般在测试的时候使用。先取出少量数据,然后先把功能跑通再说。其实不管是什么模式,我们代码是不需要变的,只是换了一种模式运行,这也是 spark 非常方便的地方。 standalone运行模式standalone是spark装好之后自带的模式,怎么搭建standalone了。首先你要保证你有多台机器,对于standalone模式,肯定有一台机器是master,剩下的属于worker,下面我们就来演示如何搭建。 首先在SPARK_HOME目录的conf目录下有一个spark_env.sh,我们之前还配置了 python 环境,将这个文件打开。 # 这里面的代码全部被注释掉了,我们需要什么直接拷贝在底下即可# 这个是JAVA_HOME,我之前没有说spark环境怎么搭,因为这些网上都有,所以这里再提一遍# 这个配置jdkexport JAVA_HOME=/opt/jdk1.8.0_221/# 这里的是我们的关键,我们导出了SPARK_MASTER_HOST和SPARK_MASTER_PORT# 那么spark001就是我们集群的master,或者你输入ip也可以export SPARK_MASTER_HOST=spark001export SPARK_MASTER_PORT=7077# 这个是我们在使用spark-submit的时候保证执行的python解释器为python3, 最好设置完整路径export PYSPARK_PYTHON=/usr/bin/python3export PYSPARK_DRIVER_PYTHON=/usr/bin/python3 然后再打开slaves,我们spark001是我们当前的master,假设我们还有三台机器,分别是spark002、spark003、spark004,那么就直接把主机名或者ip地址写上去即可。 spark001spark002
spark003
spark004 显然对master机器就已经配置好了,然后把这个spark目录完整的拷贝到其它三台机器的相同目录下即可。是的,只要把master配置好,那么其它的机器只需要得到一份拷贝即可。那么spark集群在启动之后,读取spark-env.sh就知道master是我们的spark001,而根据slaves知道spark002、spark003、spark004是worker,那么内部就会进行通信之类的。 但是我这里只有一台机器,因此我们就用一台机器模拟standalone,配置方式是一样的,如果是一台机器的话,就这么配。 #### spark-env.sh ####export JAVA_HOME=/opt/jdk1.8.0_221/export SPARK_MASTER_HOST=localhostexport SPARK_MASTER_PORT=7077export PYSPARK_PYTHON=/usr/bin/python3export PYSPARK_DRIVER_PYTHON=/usr/bin/python3#### slaves ####localhost 然后我们就可以启动spark集群了,在sbin目录。我们先执行start-master.sh,然后执行start-slaves.sh,不过更简单的,我们直接执行start-all.sh也是可以的。 输入jps,如果出现了master和worker,说明启动成功了。除此之外,我们还可以通过webUI查看,spark集群的端口默认是8080,如果被占用会尝试 1,变成8081,所以端口不是8080也不要觉得奇怪。 我这里已经启动了,但是由于是一台机器,所以worker只有一个,它既是worker也是master。 关于端口的问题,我们在之前看到了4040,它是查看 pyspark 任务的端口;而这里的8080是Spark集群的webUI端口,图片上面还写了大大的7077,这个就是我们在spark-env.sh中设置的端口,这个端口是指定master和worker进行rpc通信的时候使用的端口
此时我们就连接到了 Spark 集群中,我们再来看看集群的 webUI。 我们看到运行的应用多了一个,因为我们以standalone的模式启动了pyspark shell。 好啦,我们知道以standalone模式启动pyspark shell,那么如何使用submit提交作业呢?答案很简单,把使用local模式提交作业的命令copy下来,把local[*]改成我们的spark集群:spark://127.0.0.1:7077 就完事了,非常简单吧,我们测试一下。 任务显然是执行成功了的,但是日志信息太多了,程序的输出在下面,可以自己测试一下。 yarn运行模式最后我们来看一下 yarn 模式,我们之前说了 yarn 是使用 spark 的公司采用的最多的一个模式。使用 yarn 模式的时候,spark充当一个客户端,它需要做的事情就是提交作业到yarn上去执行。那么yarn它和standalone模式之前有什么区别呢?
那么如何把作业提交到yarn上运行呢?很简单,直接--master local换成--master yarn即可。
[root@matsuri ~]# spark-submit --master yarn --name 夏色祭 1.pyException in thread 'main' org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657)
at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:290)
at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:251)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:120)
at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$1.<init>(SparkSubmit.scala:911)
at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:911)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[root@matsuri ~]# 但是我们看到报错了,根据报错信息我们知道,如果想提交到yarn上去执行,那么必须配置 export HADOOP_CONF_DIR=/opt/hadoop-2.6.0-cdh5.15.1/etc/hadoop 然后再来启动,但是注意哈,yarn必须要已经启动才可以。输入jps,要能看到NodeManager和ResourceManager。 然后我们来测试一下: 输出的内容非常非常多,但是我们已经看到了输出的结果,说明提交到yarn上是执行成功了的。注意这个过程会比较慢,因为需要到yarn上申请资源等一系列操作。 关于提交作业,还有两种模式,--deploy-mode client,--deploy-mode cluster。它们有什么区别呢?
这里面出现了一些概念,我们马上就说,还有spark的架构等等。目前不需要理解这么多概念性的东西,学习起来会非常的累,所以我们之前介绍RDD的时候直接使用的local模式,而且还是使用的是交互式这种模式。因为RDD的语法跟你用的什么模式没有关系,我们既然学习语法就学习语法,涉及到的概念的东西越少越好。再比如这里的运行模式,我们就只需要知道有这三种运行模式、以及怎么指定即可。像yarn、driver、Manager、Application等等等等概念性的东西我们会采用图文的形式单独说。目前只需要知道,运行模式不同对代码没有影响,我们的代码只需要写一份,需要什么模式,直接--master指定即可。 spark核心概念我们之前介绍了spark的核心RDD,它是spark操作的基本单元。但是对于spark集群来说你还需要它的其它概念,比如我们操作RDD的时候,资源由谁来管理、任务由谁来执行等等。
东西有点多,我们可以梳理一下。假设我们有一个应用程序:application,那么driver负责帮我们启动并创建sc,然后发送task到executor上,executor是在work node上执行的,执行的时候需要资源,这些是cluster manager帮我们申请,另外启动的时候还可以指定deploy mode。如果当遇到了action操作,那么对多个task的并行计算就组合成了job,每个job又会被切分成多个stage。这样是不是都串起来了呢?
spark运行架构以及注意事项一个 spark 应用程序运行在一组独立的进程之上,意思就是多个应用程序之间是隔离的。每个应用程序都具备一个driver和一组独有的executor,多个应用程序是通过driver进程里面的SparkContext对象进行协调的。 如果要运行一个集群,那么你的SparkContext对象要能够连接到cluster manager 上面的架构图是官网上面的,这个架构还有一些很有用的地方。
MapReduce和spark区分我们说spark比MapReduce的效率要高很多,那么它们之间的差异主要体现在什么地方呢? MapReduce
spark
Spark Cachespark是有缓存的,我们在计算完结果之后是可以缓存起来的,这样做能够加快速度。 >>> rdd = sc.textFile('file:///root/1.txt')>>> rdd.count()100000 >>> # 此时rdd就被缓存起来了,输出的什么东西先不用管>>> rdd.cache()
file:///root/1.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0>>> # 再次进行计算,当然这里看不出来效果。其实如果你的数据量很大的话,你第二次执行的时候会发现速度变快了>>> rdd.count()100000 >>> import os>>> # 这里先看一下这个文件的大小>>> os.stat('/root/1.txt').st_size / 10241150022 我们看到 storage 里面的 RDD Name,这个就是我们的文件名;Storage Level表示存储级别,默认是基于内存的;Cached Partitions表示缓存的分区数,因为默认的RDD有两个分区。关键看倒数第二个内存大小,我们看相比原来的文件大小,小了很多,这是spark内部基于缓存所做的策略;最后的是磁盘大小,没有缓存到磁盘上,所以是0。 因为RDD具有不变性,所以当我们缓存起来之后 RDD的持久化Spark 一个最重要的能力就是它可以通过一些操作来持久化 你可以通过调用 我们看到持久化一个RDD有两种操作:persist和cache,那么有什么区别呢?
此外,每一个持久化的RDD都能以不同的缓存级别进行存储,比如:持久化数据到磁盘、或者在内存中持久化、甚至还可以是用序列化java对象的方式 那么缓存级别都有哪些呢?
class StorageLevel(object): def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.useDisk = useDisk # 是否使用磁盘 self.useMemory = useMemory # 是否使用内存 self.useOffHeap = useOffHeap # 是否使用堆外 self.deserialized = deserialized # 是否反序列化 self.replication = replication # 副本系数,默认是1 def __repr__(self): return 'StorageLevel(%s, %s, %s, %s, %s)' % ( self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication) def __str__(self): result = '' result = 'Disk ' if self.useDisk else '' result = 'Memory ' if self.useMemory else '' result = 'OffHeap ' if self.useOffHeap else '' result = 'Deserialized ' if self.deserialized else 'Serialized ' result = '%sx Replicated' % self.replication return result# 所以我们需要哪一种就可以直接通过StorageLevel这个类来调用,并且我们看到创建的方式也很简单# 如果支持什么,就给对应的参数传递True即可,不支持的传递Flse# DISK_ONLY就只给第一个参数useDisk传递True,其它都是False# MEMORY_AND_DISK就是第一、和第二个参数为True,其它为False# 带2的,就给副本系数传个2,比较简单StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) 如果不想缓存了,可以使用unpersist,不需要参数。但是注意:persist是惰性的,只有在遇到一次action操作的时候,才会缓存RDD的分区数据,但是unpersist是立刻执行的。 到底使用哪种缓存我们看到StorageLevel支持很多缓存策略,那么我们到底该选择哪一种呢?官方给了如下建议。
spark血缘关系还记得RDD的五大特性吗?其中有一条说:RDD依赖于一系列其它的RDD。每个RDD进行transformation操作的时候都会生成一个新的RDD,如果当前操作的RDD的某个分区数据丢了,那么会从上一个RDD重新计算丢失的分区数据。我们说这些RDD就像父亲、儿子一样,一代一代的传下去,它们之间是具有血缘关系的。 当对 RDD4 执行 collect 操作时,会从 RDD1 开始计算。如果 RDD3 的第二个分区丢失了,那么会根据 RDD2 的第二个分区重新计算;如果 RDD2 的第二个分区也丢失了,那么会从 RDD1 的第二个分区重新计算。 所以每个RDD之间是有血缘关系的,如果数据丢失,那么会根据父RDD重新计算丢失的数据,而不是重新计算。 Spark Dependency我们说每个RDD是依赖于其它RDD的,但是RDD之间的依赖关系也分为两种,一种是窄依赖
窄依赖像我们说的map、filter、甚至是union,它们都是窄依赖。窄依赖的一个特点就是可以进行流水线式的操作,一个接一个。 蓝色表示 RDD,橙色表示分区 partition,我们上面的几种转换都是窄依赖,因为 子RDD的一个partition 至多引用一个 父RDD。所以我们看到窄依赖是可以像流水线一样,一直往下走。如果在MapReduce中计算1 2 3要怎么做呢?要先计算1 2,然后把结果落地到磁盘,然后再从磁盘读取再和3进行运算。但是对于spark来说,窄依赖是可以一直在内存中持续操作的。 宽依赖我们说宽依赖的话,那么父RDD的一个partition会被子RDD使用多次,也就是父RDD的一个分区会被子RDD的多个分区所使用。 所以宽依赖就像这样,一个父RDD的partition会被子RDD的多个partition所引用。 那么它和窄依赖有什么区别呢?显然如果是窄依赖,那么子RDD在数据丢失的时候直接根据父RDD对应的分区进行计算即可,即使这个子RDD对应多个父RDD,也是很简单的。但是宽依赖就不一样了,如果是宽依赖的话,那么子RDD在分区数据丢失之后,再根据父RDD重新计算是一件比较麻烦的事情,因为涉及到了shuffle操作,这里再一次提到了shuffle,但我们现在还是先不说。首先shuffle的英文是洗牌,你可以理解为打乱,比如我们说的ByKey,是根据key来操作的,如果分区数据丢了,那么是不是需要从父RDD那里找到所有对应的key呢?相比窄依赖,这显然是一件非常麻烦的事情。 shuffle一些行为会触发shuffle操作,shuffle是spark用于重新分配数据的一种机制,以便对不同partition里面的数据进行分组。 那么什么地方会发生shuffle操作呢?我们可以想一下reduceByKey,reduceByKey会生成一个新的RDD,所有相同的key对应的value都会组合在一起,形成一个列表,基本上所有的ByKey操作都会涉及到shuffle。 注意:shuffle是一个比较昂贵的操作,因为它涉及磁盘IO、数据序列化、网络IO。 我们最后再用一张图,来展示一下窄依赖、宽依赖、以及shuffle操作。 每当遇到一个shuffle操作时,就会被拆分成两个stage。还记得stage吗?我们说一个stage的边界往往是从某个地方取数据开始,到shuffle结束。 spark调优前戏我们开发一个应用程序是比较简单的,但是这个程序执行的时候所表现出来的性能也是需要我们关注的,下面我们就来看看spark调优。 不过在看spark调优之前,我们需要能够监控我们应用程序,而监控的方式我们是通过webUI。每一个SparkContext对象都会启动一个webUI,默认端口是4040,如果被占用会尝试 1,变成4041,比如我们通过pyspark shell启动的时候也是可以的,因为默认创建了一个SparkContext对象。webUI上面展示了很多有用的信息,其中包括:
但是这里面存在一个问题,那就是当我们在执行作业的时候是可以通过4040端口查看的,但是程序结束那么这个端口就打不开了,因为程序结束SparkContext也没了。但是,我们想知道这个程序执行的怎么样,占用了多少内存、cpu、以及花了多长时间等等;而且如果程序挂了怎么办,或者程序执行的时候突然速度急剧下降,但是当我们想找出原因的时候,却看不到信息,这显然是不行的。 为了能够在程序结束之后也能看到webUI上面的信息,我们需要在程序启动之前设置一个参数: 然后我们打开文件、设置参数。 就不打开注释了,直接拷贝一份即可,指定为true以及设置日志路径。但是这个路径不会自动创建,我们指定的是 # 怎么配置呢?凡是以start-history开头的都要配置在SPARK_HISTORY_OPTS里面,以'-Dx=y'的形式指定# 把下面这一行拷贝进去即可export SPARK_HISTORY_OPTS='-Dspark.history.fs.logDirectory=hdfs://localhost:9000/log' 然后别忘记在 HDFS 上创建相应的目录:hdfs dfs -mkdir /log 此时我们就算基本配置完成了,然后启动sbin/start-history-server.sh。 打开该页面,查看一下: 上面显示日志记录在:hdfs文件系统上的 /log 下,当然我们目前这里没有任务。你可以自己跑一个试一下。 如果你的作业是凌晨跑的,那么第二天上班的时候也能看。不然的话,如果作业凌晨三点挂了怎么办,第二都不知道为什么挂了。我们注意到在左下角还有 这对调优是非常有帮助的,不然你的作业有多少个stage、stage里面有多少个并行计算的task、每个task计算的时候处理了多少数据、花了多长时间等等你都不知道,而配置了history,那么这些信息都能够清晰的展示在页面上,这对调优是很有帮助的。如果停止的话,可以使用 spark调优的几个方面Spark调优,我们可以从以下几个方面入手。 序列化序列化有什么作用呢?我们说shuffle是经过网络传输、磁盘IO的,而且在做缓存的时候、对于你内存的节省使用、以及进程之间的通信都是要涉及到序列化的。 序列化该怎么选择呢?首先序列化在任何的分布式程序的性能方面都扮演了一个重要的角色,如果序列化之后的速度比较慢、或者序列化之后的大小比较大,那么就会降低性能。因此,你需要对spark应用程序进行调优。而spark目的是在序列化之后的速度和大小之间取得一个平衡,它提供了两种序列化方式:
内存管理对内存方面的调优有三个要考量的因素: 所以spark对内存的使用主要涉及到两个部分:执行和存储。执行所用的内存主要涉及shuffle、join、sort、aggregate等操作,而存储所用的内存则涉及缓存和传递数据。在spark中,执行和存储共享一片内存区域:如果执行没有使用内存,那么存储可以获得所有的内存,反之亦然。但是,如果在必要的情况下,执行可以抢夺存储所用的内存;但是为了避免全部抢光,所以可以设置一个阈值,如果存储到达这个阈值,执行就不能再抢存储的内存了。 广播变量使用广播变量能够极大地降低内存的使用,如果你的task大小比较大,那么你可以考虑使用广播变量。 >>> # 创建广播变量>>> data = sc.broadcast([1, 2, 3])>>> # 调用value方法能够获取值>>> data.value [1, 2, 3]>>> 数据本地性数据本地性对spark job的性能有着很大的影响,假设我们数据存在hdfs上,我们的作业提交到yarn上去执行。如果我们的作业和数据在同一台节点上,那么计算会非常快;但要是不在同一台节点,那么肯定要把一方移动另一方所在的节点上,这样的话相比数据和计算在同一台节点相比,性能肯定会降低。于是我们要移动计算,而不是移动数据,而且即便要移动也要选好位置。 因此数据本地性就是指数据离计算到底有多近,基于数据的当前位置,数据本地性有以下几个级别,从最近到最远:
Spark SQL概述接下来聊一聊 Spark SQL,从名字上来看显然是让我们像写 SQL 一样去编写 Spark 应用程序。但Saprk 并不仅仅是 SQL,SQL只是 Spark 提供的功能之一。 想想 Hive,它们存在的意义都是类似的,因为SQL已经存在很多很多年了。如果使用 MapReduce 编程的话,需要会 Java;使用 Spark 编程的话,虽然简单,但也需要你会 Scala、Java、Python等编程语言中的一种。而 SQL 真的算是 '老少咸宜',并且它已经成为了事实上的一个标准,如果一款框架能让你像写 SQL 一样编写程序的话,那么它一定是非常受欢迎的,就类似于Hive一样。 SQL on Hadoop常用框架目前我们已经知道为什么要有SQL了,而在大数据领域可以基于SQL的框架还有其它的,这些框架我们也称之为 SQL on Hadoop,因为数据存储在 Hadoop 的 HDFS 之上、并且支持SQL。 Apache Hive Hive 我们知道,可以将SQL语句翻译成MapReduce,不过既然有了Spark SQL,那么Hive用的是不是就不太多了呢?不是的,Hive对于离线的数仓分析来说,用的是非常非常多的,因为有一定的时间和沉淀了,非常稳定。 而我们说 Hive 是将 SQL 翻译成 MapReduce(也就是运行在MapReduce引擎上),但除了MapReduce之外还可以翻译成 Tez、Spark,所以SQL底层到底是运行在哪种引擎之上,是可以通过参数来设置的。但是说实话,引擎采用的大部分还是MapReduce,因为最稳定。 那么 Hive 都有哪些功能呢?
Clouhera Impala 这个是Cloudera公司开发的,很多公司都是采购它们的CDH。对于 impala 而言,它也是使用SQL,只不过它不是把SQL运行在MapReduce之上,而是使用了自己的守护进程。而一般情况下,这些进程显然要和DataNode安装在同一节点上,因为要读数据。 特点如下:
Spark SQL Spark中的一个子模块,让SQL跑在Spark引擎上,这也是我们即将介绍的。 Presto 一个基于SQL的交互式查询引擎,可以和Hive共享元数据信息,但它主要是提供了一些的连接器,通过这些链接器,可以查询Hive、Cassandra等框架里面的数据。 Phoenix HBase的数据主要基于API来查询,而这个过程还是比较费劲的,而Phoenix就是支持使用SQL来查询HBase的数据。 Drill 支持HDFS、Hive、Spark SQL等多种后端存储,然后直接进行多种后端数据的处理。 Spark SQL 误区关于Spark SQL,有很多人会进入误区。 Spark SQL 就是一个SQL处理框架,这是一个典型的误区 Spark SQL的官方定义是,一个用于处理结构化数据的Spark子模块,特点如下:
Spark SQL的应用不局限于SQL,还支持Hive、JSON、Parquet文件的直接读取以及操作,SQL 仅仅是Spark SQL中的一个功能而已。 如果你看官网的话,你会发现 SQL、DataFrame、DataSet是放在一起的,DataFrame和DataSet是更高层面的API,在编程的时候也是使用DataFrame和DataSet。但是在实际执行的时候,无论是SQL、还是DataFrame、DataSet,最终底层还是要被翻译成 RDD 的,所以RDD是Spark中核心。
Python连接Spark SQL正如我们想要操作RDD一样,我们需要有一个SparkContext对象,那么这里也是类似的,需要一个SparkSession对象。而我们说启动pyspark shell的时候,默认给我们创建一个SparkContext对象:sc;同理也创建了一个SparkSession对象:spark。 我们来看一下 pyspark shell。 直接输入spark,会有一个默认的SparkSession对象。除了pyspark shell之外,还有 spark-sql shell,也在bin目录下,我们终端直接输入即可。 会输出大量的日志信息,我们直接在里面输入SQL语句即可,但由于我们是要使用代码操作的,所以我们后续还是会使用 pyspark shell,不使用 spark-sql shell。我们在里面创建一张表吧,然后查询一下。 真的每一步操作都会输出大量的日志信息,而且内容就隐藏在日志信息中,真的是不好观察。我们看到,该表在 default 库下,表名叫 t,false表示该表不是临时表。 Python读取多种数据源得到DataFrameSpark SQL支持通过各种数据源得到一个DataFrame,DataFrame可以使用各种相关的transformation操作,并且还能注册成一个临时的视图。一旦将其注册成临时的视图,那么便可以通过SQL查询来操作相应的数据。 所以大数据处理的过程都是三步走:加载数据、业务逻辑处理、保存处理之后的数据。 所以首先我们要读取数据,而我们这些都通过 SparkSession 对象来操作。而读取的数据可以来自于各种地方,比如:本地、HDFS、亚马逊的S3、阿里的OSS、腾讯的COS、RDBMS等;而数据的载体可以是各种格式,比如:文本、JSON、Parquet、JDBC等等。 通过Spark将各种不同的数据源的数据加载成DataFrame、DataSet,后续便可以直接进行操作了。 读取本地文本文件SparkSession对象可以将一个已存在的RDD、hive表、其它的数据源转成DataFrame,比如我们将一个文本文件变成DataFrame。 17,female,古明地觉400,female,四方茉莉18,female,椎名真白 我们命名为 1.txt。 >>> # spark是默认的SparkSession对象,调用其内部的.read.text方法即可读取文本数据, 得到DataFrame>>> df = spark.read.text('file:///root/1.txt')>>> # 调用show方法展示数据>>> df.show() ------------------- | value| ------------------- | 17,female,古明地觉| |400,female,四方茉莉| | 18,female,椎名真白| ------------------- >>> 不过我们看到这似乎连在一起了,很好理解,因为是纯文本数据。不过显然我们可以对其进行分隔,变成多列。不过由于涉及到DataFrame的操作,我们暂且不表,只看如何读取数据。 读取本地json文件我们来读取一下 json 文件看看,注意每个json需要写在一行。 {'name': '夏色祭', '_name': '夏哥'}
{'name': '神乐mea', '_name': '屑女仆'}
{'name': '凑阿库娅', '_name': '阿夸'} 读取json文件也很简单,和读取文本文件类似。 >>> df = spark.read.json('file:///root/1.json')>>> df.show() ------ -------- | _name| name| ------ -------- | 夏哥| 夏色祭| |屑女仆| 神乐mea| | 阿夸|凑阿库娅| ------ -------- >>> # 由于是json文件, 每一个相同的key对应一个字段
工作中用哪种都无所谓。
从数据库中读取数据然而不幸的是,pyspark读取数据库是需要通过java来实现的,所以还需要下载相关的jar包,因此有兴趣自己了解一下即可。 DataFrame的相关操作数据读取进来是一方面,我们重点是要对数据进行各种各样的操作,而DataFrame都支持我们进行哪些操作呢?下面就一起来看一下吧。 未完待续。 |
|
来自: 新用户61024634 > 《待分类》