原文链接:http://www./2015/11/introduction-spark-python.html 作者:Srini Kadamati,来自Dataquest.io的数据科学家由加州大学伯克利分校的AMP实验室(UC Berkeley AMP Lab)进行了一系列独创性工作后,Spark的发展目标是利用分布式内存数据结构提升数据处理速度,在大部分工作负载中超越Hadoop。 译者:牛亚真 Python版的Spark入门 通过本文的数据处理实战操作教程的学习,可以掌握如何使用Python编写Spark应用程序。 在本篇博文中,我们将通过一个真实的数据集来介绍Spark框架、基本的tranformation操作和action操作。如果想写和运行自己的Spark代码,可以在Dataquest网站上查看本博文的互动版。 Spark中的核心数据结构是RDD,也称弹性分布式数据集。见名知义, RDD是Spark的数据集表示,该数据集分布在多台机器的RAM或内存当中。一个RDD对象实际上就是一个元素集合,用于存储元组、字典和列表等。与Pandas中的DataFrames类似,你可以把一个数据集加载为一个RDD,然后运行与RDD对象对应的任一方法。 Spark由Scala语言编写,Scala语言运行在JVM上,程序代码最终被编译成字节码。开源社区已经开发出一个非常优秀的工具——PySpark,通过该工具可以用Python语言与RDD进行交互。通过Py4j库,Python可以操作JVM对象,在后面的RDD示例当中,正是Py4j库这一工具支撑着PySpark的运行。 首先,将包含所有Daily Show用户的数据集加载到一个RDD中。数据集格式为 FiveThirtyEight's 数据集的TSV版本,与CSV文档格式类似,TVS文档使用制表符“\t”代替逗号“,”来进行数据分隔。 raw_data = sc.textFile('daily_show.tsv') raw_data.take(5) ['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup \tRaw_Guest_List', '1999\tactor\t1/11/99\tActing\tMichael J. Fox', '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard', '1999\ttelevision actress\t1/13/99\tActing \tTracey Ullman', '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson'] SparkContext对象负责与Spark集群进行交互,并协调集群自身的运行处理过程。SparkContext与集群管理器进行交互,管理运行具体计算的Executor。下面这个取自官网的图可以更好地说明整体架构: SparkContext对象通常使用变量sc来进行引用,具体代码如下:raw_data = sc.textFile('daily_show.tsv')将TVS数据集读到一个RDD对象raw_data当中,该RDD对象raw_data类似于一个字符串对象列表,数据集中每一行都为一个String对象,然后我们使用take()方法来打印RDD的前5个元素:raw_data.take(5)可以查看PySpark文档获取RDD对象的其它方法,这里take(n)方法将返回RDD的前n个元素。 你也许会有一个疑问:如果RDD类似于Python中的List,那为什么不只使用括号方式来获取RDD中的元素?原因在于RDD对象由多个分区组成,这些分区数据分布在不同的机器上,因此不能通过标准List实现进行操作,RDD对象需要特定的设计才能处理分布式数据。RDD其中一个优势是它具备在本地机器上运行Spark的能力,Spark能够将机器内存分片,模拟在多台机器上进行分布式计算,等程序真正运行在分布环境下时,不需要对代码进行任何的修改或调整。 Spark RDD另外一个优势便是其延迟代码运行机制,只有到真正需要相应数据的时候才会启动计算。在前面的代码中,Spark不会立即将TSV文件加载到RDD当中,而是等到raw_data.take(5)代码实际运行时才会进行相应的操作。 当代码raw_data = sc.textFile('dail_show.tsv')被调用时,一个文件指针将会被创建,但只有当代码raw_data.take(5)真正需要该文件内容来运行计算逻辑时,dail_show.tsv文件才真正被读到raw_data当中。在本课程的后面及后续课程当中,将会看到更多这种延迟计算的例子。 Spark从Hadoop的Map-Reduce模式中借鉴了很多思想,但在许多方面有着本质的不同。如果你有Hadoop和传统的Map-Reduce编程经验,可以阅读Cloudera公司发布的博文(post by Cloudera)来了解它们之间的不同。如果你此前从来没有接触过Hadoop或Map-Reduce,大可不必担心,我们会对本课程中涉及到的概念进行详细描述。使用Spark时需要理解的关键思想是数据流水线(data pipelining),Spark中的每一操作或计算本质上是由一系列能够链接在一起的步骤组成,这些步骤按顺序执行从而构成一个流水线(pipeline)。流水线中的每一步返回的要么是 Python值(如整型),要么是Python数据结构或者是RDD对象。首先我们演示的是map()函数。 Map(f)函数将函数f作用于RDD中的每个元素,同其它Python对象类似,由于RDD是可迭代的对象,Spark可以在每步迭代时运行函数f并在完成后返回一个新的RDD。 现在让我们来对map函数使用示例进行详细介绍,以便能够对其使用有更好的认知。如果仔细观察,你会发现 raw_data的数据很难处理,因为RDD raw_data中的各个元素类型为String,为此我们先将各个元素转换为List对象,从而增加数据的可控性。按以往的思路,我们将通过下列步骤进行: 1、使用for循环进行集合的迭代 2、使用定界符对各String进行分割 3、将结果保存为List 现在让我们来看如何在Spark中通过map函数来实现: 下面给出的代码块,将进行如下操作: 1、调用`map()`函数,函数中的参数指定了作用于数据集每一行的计算逻辑 2、编写lambda函数,该函数使用制表符“\t”将各行字符串进行分割,然后赋值给RDD `daily_show` 3、调用`daily_show` 的`take()`函数返回结果RDD的前五个元素 Map(f)函数是一个常用的tranformation操作步骤,运行时需要一个命名函数或lambda函数f: daily_show=raw_data.map(lambdaline:line.split('\t')) daily_show.take(5) [['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'], ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'], ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'], ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'], ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']] PySpark最引入注目的特性是使用Python进行代码的编写能够将逻辑与实际的数据转换分离,对于前述的代码块,在Python代码中写入lambda函数:raw_data.map(lambda: line(line.split('\t')))但如果利用Scala语言编写的话,Spark程序运行时代码将直接作用于RDD,这便是PySpark的威力所在。在不具备任何Scala语言基础的情况下,我们便能获得只有Spark Scala架构才具备的数据处理性能。另外,运行下列代码daily_show.take(5)返回的处理结果是Python友好的数据格式,这也是PySpark另外一个强大的地方。 在Spark中,有两种类型的方法: 1、Transformations方法,如 map(), reduceByKey() 2、Actions方法,如take(), reduce(), saveAsTextFile(), collect()Transformations方法具备延迟计算的特点并且其返回的结果始终是一个RDD对象的引用,Transformation方法在action操作需要其计算结果时才会真正执行。任何返回结果为RDD的函数都是transformation方法,而任何返回结果为值的函数都是action方法。这些概念在学完本课程并使用PySpark进行代码实践后将变得更加清晰。 你可能会有疑问,为什么分割各String对象的操作不直接作用原RDD而是创建一个新的RDD对象 daily_show? 在Python中,我们可以直接对集合中的各个元素直接逐个进行修改,而不是返回并赋值给一个新的对象。 但RDD对象是不可变的且对象一旦被创建其值便不能再被改变。在Python当中,List对象和Dictionary对象是可变的,这意味着对象中的值是可以被改变的,而Tuple对象是不可变的,修改Tuple对象的唯一方法是使用必要的更新操作创建一个新的Tuple对象。Spark利用RDD的不可变性来满足处理速度上的要求,其作用机制超出了本课程的范围。 假设我们想获得一个柱形图或一计数器,用于统计各年度对应演出的观众数,如果daily_show为由List构成的List,我们可以通过下列Python代码来得到我们想要的结果: tally = dict() for line in daily_show: year = line[0] if year in tally.keys(): tally[year] = tally[year] + 1 else: tally[year] = 1 tally中的键为对应的年份值,对应的值为数据集中包含相应值的行数。如果想通过Spark得到前述的结果,可以先使用map方法,然后紧跟着使用reduceByKey方法得到: tally=daily_show.map(lambdax:(x[0],1)) .reduceByKey(lambdax,y:x+y) print(tally) PythonRDD[156] at RDD at PythonRDD.scala:43 你可能注意到打印tally对象并不会返回我们期望的柱形图对应的值,这是因为这两个方法都是transformation方法,由于延迟计算的特性,PySpark会推迟执行map和reduceByKey方法。在使用take()方法预览tally中的前几个元素之前,我们对前面写的代码进行分析: daily_show.map(lambda x: (x[0], 1)) .reduceByKey(lambda x, y: x+y) 在map操作时,我们使用lambda函数创建了一个包含下列元素的元组: key: x[0], List中的第一个元素 value: 1, Int类型值我们高层次的策略是创建一个元组,元组中的key值为年份,value值为1。 通过map操作后,Spark在内存中将维护类似下列元组的列表: ('YEAR', 1) ('1991', 1) ('1991', 1) ('1991', 1) ('1991', 1) ... 通过reduce操作后,内容将变为: ('YEAR', 1) ('1991', 4) ... reduceByKey(f)函数将所有key相同的元组进行组合,具体组合方式通过函数f指定。 为能够看到前面两步操作得到的结果,我们将使用take命令启动程序的运行,因为tally是一个RDD对象,不能使用Python的len方法查看集合中的元素个数,这里使用RDD的count函数进行获取: tally.take(tally.count()) [('YEAR', 1), ('2013', 166), ('2001', 157), ('2004', 164), ('2000', 169), ('2015', 100), ('2010', 165), ('2006', 161), ('2014', 163), ('2003', 166), ('2002', 159), ('2011', 163), ('2012', 164), ('2008', 164), ('2007', 141), ('2005', 162), ('1999', 166), ('2009', 163)] 同Pandas不同,Spark对数据列对应的头部信息一无所知,在显示时不会过滤掉。因此需要一个方法将下列元素: ('YEAR', 1)从集合中去掉。你可能会尝试找到某个方法将元素从RDD中去掉,但需要注意的是RDD对象是不可变的而且对象一旦被创建便不能改变,唯一的方法是创建一个不包含该元组的新RDD。Spark通过使用filter(f)方法从一个现有的RDD对象创建一个新的RDD对象,只有满足特定标准的元素才会出现在新的RDD当中。指定的函数f返回值为True或False,返回的结果RDD包含了所有函数f返回值为True的值。关于filter函数更详细的信息可以查阅Spark文档。 deffilter_year(line): ifline[0]=='YEAR': returnFalse else: returnTrue filtered_daily_show=daily_show.filter(lambdaline: filter_year(line)) 为证明Spark的强大,我们将给大家演示如何将一系列的数据转换(transformation)操作链接在一起形成流水线并观察Spark管理所有事务的深层机理。Spark在实现时便考虑了这些功能并在后续进行连续的任务运行时进行了深度优化。以前通过Hadoop连续地运行许多任务时将耗费大量的时间,这是因为Hadoop运行时的所有中间结果都要写入磁盘并且Hadoop对整个执行流水线没有任何感知能力(如果对此感兴趣,可以参考:http:///RHWrT2)。 Spark由于完全基于内存(磁盘只作为备份和在特定任务运行时使用)、内核架构设计优良,相比Hadoop任务处理时间,Spark处理时间明显缩短。在下面的代码块中,我们将先过滤掉所有未列出专业的演员,再将专业对应关键字小写,然后再生成专业对应的柱状图,最后打印出柱状图中的前五个元素。 filtered_daily_show.filter(lambdaline:line[1]!='') \ .map(lambdaline:(line[1].lower(),1)) \ .reduceByKey(lambdax,y:x+y) \ .take(5) [('radio personality', 3), ('television writer', 1), ('american political figure', 2), ('former united states secretary of state', 6), ('mathematician', 1)] 我们希望通过本次课程,能够唤起大家对Spark的兴趣,并使那些熟悉Python的人掌握如何使用PySpark编写分布式程序。当需要处理大数据集时,PySpark的闪光点将更加突出,因为它模糊了在本地机器和网络大规模分布式集群(也称为云)上进行数据科学研究的界线。如果你喜欢本博客,可以到Dataquest 上阅读课程的第二部分,你将了解到更多关于Spark transformation 和 action操作的相关知识。 |
|