|
Spark版本定制班第1课-Frank |
|
|
第一课:通过案例对SparkStreaming透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本 质解析作者:杭州-Frank本期导读:Spark源码定制选择从SparkStreaming入手;SparkStreamin g另类在线实验;瞬间理解SparkStreaming本质。从SparkStreaming入手开始Spark源码版本定制之路 从SparkStreaming入手Spark源码版本定制之路的理由从今天起,我们将踏上了新的Spark学习旅途。寻龙点穴,从Sp arkStreaming入手,SparkStreaming就是大数据的龙脉。我们为什么要选择从SparkStreaming入手 开始我们的Spark源码版本定制之路?有下面几个方面的理由:Spark大背景Spark最开始没有我们今天看到的SparkSt reaming、GraphX、MachineLearning、SparkSQL和SparkR等相关子框架内容,最开始就只有很 原始的SparkCore。我们要做Spark源码定制,做自己的发行版本,以SparkStreaming为切入点,SparkS treaming本身是SparkCore上的一个子框架,所以我们透过一个子框架的彻底研究,肯定可以精通Spark力量的源泉和所 有问题的解决之道;为什么不选SparkSQL?我们知道,Spark有很多子框架,现在除了基于SparkCore编程之外,用 得最多的就是SparkSQL。SparkSQL由于涉及了太多的SQL语法细节的解析或者说优化,其实这些解析或优化,对于我们集中精 力去研究Spark而言,它是一件重要的事情,但其实不是最重要的一件事情。由于它有太多的SQL语法解析,这个不是一个合适的子框架来让 我们研究。为什么不选SparkR?SparkR现在很不成熟,而且支持功能有限,这个也从我们的候选列表中删除掉。为什么不选 SparkGraphX(图计算)?如果大家关注了Spark的演进或发展的话,Spark最近发布的几个版本,Spark图计算基 本没有改进。如果按照这个趋势的话,Spark官方机构似乎在透露一个信号,图计算已经发展到尽头了。所以说,我们如果要研究的话,肯定不 会去做一个看上去发展到尽头的东西。另外,至于图计算而言,它有很多数学级别的算法,而我们是要把Spark做到极致,这样的话,数学这件 事情很重要,但对我们来说却不是最重要的。为什么不选SparkMLlib(机器学习)?Spark机器学习在封装了Vector( 向量)和Metrics基础之上,加上Spark的RDD,构建了它的众多的库。这个也由于涉及到了太多的数学的知识,所以我们选机器学习 其实也不是一个太好的选择。SparkStreaming魔力之所在2016上半年,据StackOverflow开展的一项调查结果 显示,50%以上的人认为,Spark中最吸引人的是SparkStreaming。总之,大家考虑用Spark,主要是因为Spark Streaming。SparkStreaming到底有什么魔力?它是流式计算这是一个流处理的时代,一切数据如果不是流式的 处理或者跟流式的处理不相关的话,都是无效的数据。这句话会不断地被社会的发展所证实。流式处理才是真正的我们对大数据的初步印象一方 面,数据流进来,立即给我们一个反馈,这不是批处理或者数据挖掘能做到的。另一方面,Spark非常强大的地方在于它的流式处理可以在线的 利用机器学习、图计算、SparkSQL或者SparkR的成果,这得益于Spark多元化、一体化的基础架构设计。也就是说,在Sp ark技术堆栈中,SparkStreaming可以调用任何的API接口,不需要做任何的设置。这是Spark无可匹敌之处,也是Sp arkStreaming必将一统天下的根源。这个时代的流处理单打独斗已经不行了,SparkStreaming必然会跟多个Spa rk子框架联合起来,称霸大数据领域。流式处理“魅力和复杂”的双重体如果你精通SparkStreaming,你就知道Spar kStreaming以及它背后的兄弟框架,展示了Spark和大数据的无穷魅力。不过,在Spark的所有程序中,肯定是基于Spa rkStreaming的应用程序最容易出问题。为什么?因为数据不断流进来,它要动态控制数据的流入,作业的切分还有数据的处理。这些 都会带来极大的复杂性。与其他Spark子框架的巨大区别如果你仔细观察,你会发现,SparkStreaming是基于Spark Core之上的一个应用程序。不像其他子框架,比如机器学习是把数学算法直接应用在Spark的RDD之上,SparkStreami ng更像一般的应用程序那样,感知流进来的数据并进行相应的处理。所以如果要做Spark的定制开发,SparkStreaming则 提供了最好的参考,掌握了SparkStreaming也就容易开发任意其他的程序。当然想掌握SparkStreaming,但不去精 通SparkCore的话,那是不可能的。SparkCore加SparkStreaming更是双剑合璧,威力无穷。由此可见我们 选择SparkStreaming来入手,等于是找到了关键点。如果要寻龙点穴,那么SparkStreaming就是龙穴之所在。找到 了穴位,我们就能一日千里。综上所述,我们筛选之下,SparkStreaming是我们唯一的选择!SparkStreaming 另类在线实验?我们在研究SparkStreaming的过程中会有困惑的事情:如何清晰的看到数据的流入、被处理的过程??在这里我们 使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。?我们从已写过的广告点击的 在线黑名单过滤的SparkStreaming应用程序入手来演示本次实验。源码:packagecom.dt.spark.stre amingimportorg.apache.spark.SparkConfimportorg.apache.spark.str eaming.StreamingContextimportorg.apache.spark.streaming.Secondso bjectOnlineBlackListFilter{defmain(args:Array[String]){/ 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来 设置程序要链接的Spark集群的Master的URL,如果设置为local,则代表Spark程序在本地运行///创建 SparkConf对象valconf=newSparkConf()//设置应用程序的名称,在程序运行的监控界面可以看 到名称conf.setAppName("OnlineBlackListFilter")//此时,程序在Spark集群con f.setMaster("spark://Master:7077")valssc=newStreamingContext (conf,Seconds(30))/黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中, 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同,但是在SparkStreaming进行处理的时候每次都能够访问完整的 信息。/valblackList=Array(("Spy",true),("Cheater",true))val blackListRDD=ssc.sparkContext.parallelize(blackList,8)//此时, 数据流来自于SocketvaladsClickStream=ssc.socketTextStream("Master", 9999)/此处模拟的广告点击的每条数据的格式为:time、name此处map操作的结果是name、(time, name)的格式/valadsClickStreamFormatted=adsClickStream.map{ad s=>(ads.split("")(1),ads)}adsClickStreamFormatted.transform (userClickRDD=>{//通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,/ /又获得了相应点击内容是否在黑名单中valjoinedBlackListRDD=userClickRDD.leftOut erJoin(blackListRDD)/进行filter过滤的时候,其输入元素是一个Tuple:(name,((ti me,name),boolean))其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时 候是否存在的值。如果存在的话,表明当前广告点击是黑名单,需要过滤掉,否则的话是有效点击内容;/valvalidCli cked=joinedBlackListRDD.filter(joinedItem=>{if(joinedItem._2 ._2.getOrElse(false)){false}else{true}})validClicked.map (validClick=>{validClick._2._1})}).print/计算后的有效数据一般都会写入K afka中,下游的计费系统会从kafka中pull到有效数据进行计费/ssc.start()ssc.awaitTermin ation()}}?在以上代码中,把程序的BatchInterval设置从30秒改成300秒:?val?ssc?=?new?S treamingContext(conf,?Seconds(300))打包发表并提交运行重新生成jar包,提交到集群上执行:#. /bin/spark-submit--classcom.dt.spark.sparkstreaming.OnlineBlack ListFilter--masterspark://Master:7077/mnt/job/OnlineBlackListF ilter.jar集群环境:?启动Spark集群,集群有5台机器:Master、Worker1、Worker2、Worker3、W orker4。启动Spark的HistoryServer,以便通过Web控制台查看Job运行情况。打开数据发送的端口:nc-l k9999在数据发送端口输入若干数据,比如:?1375864674543?Tom?1375864674553?Spy?13758 64674571?Andy?1375864688436?Cheater?1375864784240?Kelvin?13758648 53892?Steven?1375864979347?JohnWEB控制台查看History:打开浏览器,输入:http://ma ster:18080看HistoryServer的日志信息:点击最新的应用,看我们目前运行的应用程序中有些什么Job:?从上图可 以看见总共有5个Job,接下来看一看这些Job的具体内容。Job0此Job不体现我们的业务逻辑代码。这个Job是出于对后面计算 的负载均衡的考虑。?Job0包含有Stage0、Stage1。随便看一个Stage,比如Stage1。看看其中的Aggre gatedMetricsbyExecutor部分:?发现此Stage在所有Executor上都存在。Job1运行时间比较长 ,耗时1.5分钟。点击Stage2的链接,进去看看AggregatedMetricsByExecutor部分:Stage 2只在Worker4上的一个Executor执行,而且执行了1.5分钟。是否会觉得奇怪:从业务处理的角度看,我们发送的那么一点数 据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢?从DAGVisualization部分,就知道此Job实际 就是启动了一个接收数据的Receiver:?原来Receiver是通过一个Job来启动的。那肯定有一个Action来触发它。看看 Tasks部分:?只有一个Worker运行此Job。是用于接收数据。?LocalityLevel是PROCESS_LOCAL,原 来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。?看来,SparkStreaming应用程序启动 后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。Tips:一个Spark应用程序中可以启动很多Job, 而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。Job2看Details可以发现有我们程 序的主要业务逻辑,体现在Stage3、Stage?4、Stage5中。?我们看Stage3、Stage4的详情,可以知道这 2个Stage都是用4个Executor执行的。所有数据处理是在4台机器上进行的。?Stage5只在Worker4上。这是因为这 个Stage有Shuffle操作。Job3Job3的详情中有Stage6、Stage7、Stage8。其中Stage6、 Stage7被跳过。?看看Stage8的AggregatedMetricsbyExecutor部分。可以看到,数据处理是 在4台机器上进行的:?Job4Job4也体现了我们应用程序中的业务逻辑,有Stage9、Stage10、Stage11。 其中Stage9、Stage10被跳过。?看看Stage11的详情。可以看到,数据处理是在Worker2之外的其它3台机器上 进行的:?综合以上的现象可以知道,SparkStreaming的一个应用中,运行了这么多Job,远不是我们表面所看的那么简单。? 我们有必要通过这些现象,反过来回溯去寻根问源。不过这次暂不做深入分析。?我们的神奇之旅才刚刚开始。?瞬间理解SparkStrea ming本质SparkStreaming图示详解?以上的连续4个图,分别对应以下4个段落的描述:?SparkStreaming 接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Database s等各种地方。?SparkStreaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次 划分的结果流。?SparkStreaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上 表示RDD的序列。DStream中的每个RDD都包含来自一个时间间隔的数据。?SparkStreaming除了使用数据源产生的数 据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。任何对DStream的操作都会转变为对 底层RDD的操作。本图例子是对linesDstream做了flatMap操作,生成wordsDstream操作。?在我们前面的 实验中,每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。?DStream是一个没有边界的集合,没有大 小的限制。?DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。?锁定到时间片后,就是空间的操作,也就是对本时间片 的对应批次的数据的处理。SparkStreaming数据处理过程?下面用实例来讲解数据处理过程。?从SparkStreamin g程序转换为Spark执行的作业的过程中,使用了DStreamGraph。SparkStreaming程序中一般会有若干个对 DStream的操作,DStreamGraph就是由这些操作的依赖关系构成。1)程序到DStreamGraph的转换?从程序到 DStreamGraph的转换,如以下图例所示:?本例中,从每个foreach开始,都会进行回溯,从后往前回溯这些操作之间的依赖 关系,也就形成了DStreamGraph。2)DStream到RDD的转换执行从DStream到RDD的转换,也就形成了RDD Graph,如下图所示:?空间维度确定之后,随着时间不断推进,会不断实例化RDDGraph,然后触发Job去执行处理。?Spar kStreaming官网概述文档SparkStreamingisanextensionofthecoreSpar kAPIthatenablesscalable,high-throughput,fault-tolerantstre amprocessingoflivedatastreams.Datacanbeingestedfromman ysourceslikeKafka,Flume,Twitter,ZeroMQ,Kinesis,orTCPsoc kets,andcanbeprocessedusingcomplexalgorithmsexpressedwit hhigh-levelfunctionslike?map,?reduce,?join?and?window.Finally ,processeddatacanbepushedouttofilesystems,databases,and livedashboards.Infact,youcanapplySpark’s?http://spark.apa che.org/docs/latest/mllib-guide.htmlmachinelearning?and?http://s park.apache.org/docs/latest/graphx-programming-guide.htmlgraphpr ocessing?algorithmsondatastreams.Internally,itworksasfollo ws.SparkStreamingreceivesliveinputdatastreamsanddivides thedataintobatches,whicharethenprocessedbytheSparkengi netogeneratethefinalstreamofresultsinbatches.SparkStrea mingprovidesahigh-levelabstractioncalled?discretizedstream? or?DStream,whichrepresentsacontinuousstreamofdata.DStream scanbecreatedeitherfrominputdatastreamsfromsourcessuch asKafka,Flume,andKinesis,orbyapplyinghigh-leveloperatio nsonotherDStreams.Internally,aDStreamisrepresentedasas equenceof?http://spark.apache.org/docs/latest/api/scala/index.ht mlRDDs.ThisguideshowsyouhowtostartwritingSparkStreaming programswithDStreams.YoucanwriteSparkStreamingprogramsinScala,JavaorPython(introducedinSpark1.2),allofwhicharepresentedinthisguide.Youwillfindtabsthroughoutthisguidethatletyouchoosebetweencodesnippetsofdifferentlanguages.?现在再去读官方的SparkStreaming的文档,就好理解多了。?看来我们的SparkStreaming学习之旅,将从Spark?Streaming的现象开始,深入到Spark?Core和Spark?Streaming的本质。?感谢王家林老师的知识分享王家林:DT大数据梦工厂创始人、Spark亚太研究院院长和首席专家、大数据培训专家、大数据架构师。新浪微博:http://weibo.com/ilovepains微信公众号:DT_Spark博客:http://blog.sina.com.cn/ilovepains手机:18610086859QQ:1740415547邮箱:18610086859@vip.126.comYY课堂:每天20:00现场授课频道68917580?Spark源码定制班第1课1/13 |
|
|
|
|
|
|
|
|
|
|