配色: 字号:
Spark6_Spark Stream、Spark集成kafka、Spark图计算
2022-09-15 | 阅:  转:  |  分享 
  
SparkStream、Spark集成kafka、Spark图计算2018/12/12版本修改人修改记录修改时间V1.0王守奎编写2018
/5/23目录简介4SparkStreaming与Storm的区别4SparkStream基本概念4Sparkstreamin
g功能API5初始化StreamingContext5DStreams的输入和接收6streamingContext读取文件流6T
ransformationsonDStreams6UpdateStateByKey7Transform8reduceByKey
AndWindow8DStreams的输出9foreachRDD(func)9Checkpointing11部署应用程序12Spa
rkStreaming应用程序需要升级的时候有两种方式:12应用程序的监控12性能优化13数据接收中的并行性水平13数据处理中的
并行性水平14数据序列化14batchinterval调优14内存调整15sparkstreaming案例01、修改日志级别0
2、编写scala脚本03、打包,生成jar,并上传14、启动程序15、测试1SparkStream窗口操作21、编写程序22、
提交程序33、开启nc测试3Spark集成kafka31、编写pom32、创建scala文件53、mvn编译程序,生成jar包54
、启动zk和kafka65、执行6注意事项7Spark图计算(知道即可)12图的定义13图定义案例13图操作14打印图关系14上下
行程度计算14顶点的连接边数15图反转16打印定点和边16计算所有节点的下行集合16GraphBuilders(图构建器)16简
介SparkStreaming是流式处理框架,是SparkAPI的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来
源可以是:?Kafka,Flume,Twitter,ZeroMQ,Kinesis,orTCPsockets,并且可以
使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window。最终,处理后的数据可以存放在文件系统,数
据库等,方便实时展现。在内部,它的工作原理如下。SparkStreaming接收实时输入数据流并将数据分成批,然后由Spark引
擎处理,以批量生成最终结果流。SparkStreaming提供了一个高层次的抽象,称为离散流或DStream,它代表连续的数据流
。DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作
来创建。在内部,一个DStream被表示为一系列RDD。SparkStreaming与Storm的区别1、Storm是纯实时的流
式处理框架,SparkStreaming是准实时的处理框架(微批处理)。因为微批处理,SparkStreaming的吞吐量比Sto
rm要高。2、Storm的事务机制要比SparkStreaming的要完善。3、Storm支持动态资源调度。(spark1.2开
始和之后也支持)4、SparkStreaming擅长复杂的业务处理,Storm不擅长复杂的业务处理,擅长简单的汇总型计算。Spar
kStream基本概念1.Discretizedstream离散流DStream是高级抽象,逻辑上的概念,对DStream的
处理都会转换成对DStream所包含的RDD的处理DStream.flatMap(xxx)-->{RDD.flatMap()->
newRDD}-->DStream2}2.inputDStream和接收者InputDStream和receiver相
关联,从source接收数据,并存放在spark内存中3.spark内置的流来源有两种类型基本来源:在StreamingConte
xtAPI中直接可以使用的源(例如filesystem,socket,connect,akka)[FileStream]//指
定目录,监视的目录是否有新文件,不支持目录的嵌套//该目录下的文件,必须是通过自动移动或者是重命名的方式进入该目录//意味着进
入该目录的文件不能再修改valds=streamingContext.fileStream[KeyClass,ValueCl
ass,InputFormatClass](dataDirectory)高级来源:需要额外的外支撑(flume,kafka)需要有
依赖类库支持可以创建多个InputDstream,也就会有多个receiver。典型的耗时任务,要分配足够多的内存和cpuSpa
rkstreaming功能API初始化StreamingContext要初始化SparkStreaming程序,必须创建一个S
treamingContext对象,这是所有SparkStreaming功能的主要入口点。Seconds(1)为处理多长时间窗
口importorg.apache.spark._importorg.apache.spark.streaming._val
conf=newSparkConf().setAppName("RddToDf_2").setMaster("local")
valsc=newSparkContext(conf)valssc=newStreamingContext(sc,
Seconds(10))//每隔10秒统计一次字符总数初始完StreamingContext之后需要进行一下步骤:通过
创建输入DStreams来定义输入源。通过将转换和输出操作应用于DStream来定义流式计算。开始接收数据并使用它进行处理stre
amingContext.start()。等待处理停止(手动或由于任何错误)使用streamingContext.awaitTer
mination()。处理可以手动停止使用streamingContext.stop()。注意事项:一旦SparkContext已
经开始,就不能建立或添加新的流式计算。SparkContext文一旦停止,就不能重启。一个JVM中只能有一个StreamingCo
ntext同时处于活动状态。StreamingContext上的stop()也停止了SparkContext。要仅停止Stream
ingContext,请将可选参数的stop()调用设置stopSparkContext为false。只要先前的Streaming
Context在下一个StreamingContext被创建之前停止(不停止SparkContext),就可以重新使用SparkC
ontext来创建多个StreamingContext。DStreams的输入和接收SparkStreaming提供了两类内置的
流媒体源。基本数据源:filesystems,socket,Akkaactors高级数据源:Kafka,Flume,Kin
esis,Twitter,注意点Sparkworker/executor是一个长期运行的任务,因此它占用了分配给Spark
Streaming应用程序的核心之一。因此,重要的是要记住,SparkStreaming应用程序需要分配足够的内核(或线程,如
果在本地运行)来处理接收到的数据,以及运行接收器。在本地运行SparkStreaming程序时,不要使用“local”或“loc
al[1]”作为主URL。这两者中的任何一个都意味着只有一个线程将用于本地运行任务。如果您使用基于接收器的输入DStream(例
如套接字,Kafka,Flume等),则单线程将用于运行接收器,而不会处理接收到的数据。其中n>要运行的接收器的数量在群集上运行
,分配给SparkStreaming应用程序的内核数量必须多于接收器的数量。否则系统会收到数据,但无法处理。streamingC
ontext读取文件流streamingContext.fileStream[KeyClass,ValueClass,Inpu
tFormatClass](dataDirectory)SparkStreaming将监视目录dataDirectory并处理在
该目录中创建的任何文件(以不支持的嵌套目录编写的文件)。注意这些文件必须具有相同的数据格式。文件必须dataDirectory通过
原子移动或重命名到数据目录中创建。一旦移动,文件不得更改。所以如果文件被连续追加,新的数据将不会被读取Transformation
sonDStreams类似于RDD,转换允许来自输入DStream的数据被修改。DStreams支持SparkSparkR
DD上的许多转换。一些常见的如下map(func)flatMap(func)filter(func)repartition(num
Partitions)union(otherStream)count()reduce(func)countByValue()red
uceByKey(func,[numTasks])join(otherStream,[numTasks])cogroup(ot
herStream,[numTasks])transform(func)updateStateByKey(func)Update
StateByKey以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加在有新的数据信息进入或更新
时,可以让用户保持想要的任何状。使用这个功能需要完成两步:定义状态:可以是任意数据类型定义状态更新函数:用一个函数指定如何使用先前
的状态,从输入流中的新值更新状态。对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会
变得越来越大。importorg.apache.spark.storage.StorageLevelimportorg.apa
che.spark.streaming.{Seconds,StreamingContext}importorg.apache.
spark.{SparkContext,SparkConf}importorg.apache.spark.examples.s
treaming.StreamingExamplesobjectUpdateStateByKeyTest{defmain(
args:Array[String]){StreamingExamples.setStreamingLogLevels()
//创建一个本地的StreamingContext,含2个工作线程//valsparkConf=newSpark
Conf().setAppName("NetworkWordCount").setMaster("local[4]")valc
onf=newSparkConf().setAppName("RddToDf_2").setMaster("local[4]
")valsc=newSparkContext(conf)valssc=newStreamingContext
(sc,Seconds(10))//每隔10秒统计一次字符总数ssc.checkpoint("netresult")val
newUpdateFunc=(currValues:Seq[Int],prevValueState:Option[In
t])=>{//通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计
算当前批次的总和valcurrentCount=currValues.sum//已累加的值valpreviousC
ount=prevValueState.getOrElse(0)//返回累加后的结果,是一个Option[Int]类型S
ome(currentCount+previousCount)}//创建珍一个DStream,连接master:9998
vallines=ssc.socketTextStream("127.0.0.1",9998,StorageLevel.M
EMORY_AND_DISK_SER)valwords=lines.flatMap(_.split("")).map(x
=>(x,1))valwordCounts=words.updateStateByKey(newUpdateFunc
)wordCounts.print()ssc.start()//开始计算ssc.awaitTermination()//通
过手动终止计算,否则一直运行下去}}Transform该transform操作(以及它的变化transformWith)允许在D
Stream上应用任意的RDD到RDD函数。它可以用于应用任何未在DStreamAPI中公开的RDD操作。例如,将数据流中的每个
批次与其他数据集连接起来的功能不会直接暴露在DStreamAPI中。但是,您可以轻松使用transform来做到这一点。这使得非
常强大的可能性。valarr=List(("A",1),("B",2),("A",2),("B",3))va
lrddtest=sc.parallelize(arr,3)//创建珍一个DStream,连接master:9998v
allines=ssc.socketTextStream("127.0.0.1",9998,StorageLevel.ME
MORY_AND_DISK_SER)valwords=lines.flatMap(_.split("")).map(x
=>(x,1))valwordCounts=words.transform(rdd=>{rdd.join(rdd
test)})wordCounts.print()ssc.start()//开始计算ssc.awaitTerminati
on()//通过手动终止计算,否则一直运行下去reduceByKeyAndWindowSparkStreaming还提供了窗口化
的计算,允许您在滑动的数据窗口上应用变换。下图说明了这个滑动窗口windowlength:窗口的持续时间slidinginte
rval:窗口滑动间隔步长,都为窗口的倍数//创建珍一个DStream,连接master:9998vallines=ss
c.socketTextStream("127.0.0.1",9998,StorageLevel.MEMORY_AND_DISK
_SER)valwords=lines.flatMap(_.split("")).map(x=>(x,1))va
lwordCounts=words.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b
),Seconds(30),Seconds(10))wordCounts.print()ssc.start()//开始计
算ssc.awaitTermination()//通过手动终止计算,否则一直运行下去其他:window,countByWindo
w,reduceByKeyAndWindow,countByValueAndWindow道理一样DStreams的输出输出操作允许
DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DS
tream转换。目前,定义了下面几种输出操作:print()saveAsObjectFiles(prefix,[suffix])
saveAsTextFiles(prefix,[suffix])saveAsHadoopFiles(prefix,[suffi
x])foreachRDD(func)foreachRDD(func)dstream.foreachRDD是一个功能强大的原型,允
许数据发送到外部系统。但是,理解如何正确有效地使用这个原语是很重要的。一些常见的错误可以避免如下。dstream.foreachR
DD{rdd=>valconnection=createNewConnection()//此时的在driver
端执行rdd.foreach{record=>connection.send(record)//此时在worker端
执行}}从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初
始化错误(连接对象应该在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。dstream.foreach
RDD{rdd=>rdd.foreach{record=>valconnection=createNewCo
nnection()connection.send(record)connection.close()}}通常情况下,创建一
个连接对象有时间和资源的开销。因此,为每个记录创建和销毁连接对象可能会产生不必要的高开销,并且会显着降低系统的整体吞吐量。更好的解
决方案是使用rdd.foreachPartition-创建单个连接对象,并使用该连接发送RDD分区中的所有记录。dstream
.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>
valconnection=createNewConnection()partitionOfRecords.foreac
h(record=>connection.send(record))connection.close()}}最后,通过跨多
个RDD/批次重用连接对象,可以进一步优化这个功能。我们可以维护一个静态的连接对象池,这个连接对象可以被重用,因为多个批处理的R
DD被推送到外部系统,从而进一步降低了开销。dstream.foreachRDD{rdd=>rdd.foreachPart
ition{partitionOfRecords=>//ConnectionPoolisastatic,lazi
lyinitializedpoolofconnectionsvalconnection=ConnectionPoo
l.getConnection()partitionOfRecords.foreach(record=>connection
.send(record))ConnectionPool.returnConnection(connection)//re
turntothepoolforfuturereuse}}将DStream转化为DataFrame和SQL操作//
ConvertRDDsofthewordsDStreamtoDataFrameandrunSQLquery
words.foreachRDD((rdd:RDD[String],time:Time)=>{//Getthes
ingletoninstanceofSQLContextvalsqlContext=SQLContextSingle
ton.getInstance(rdd.sparkContext)importsqlContext.implicits._/
/ConvertRDD[String]toRDD[caseclass]toDataFramevalwordsDa
taFrame=rdd.map(w=>Record(w)).toDF()//Registerastablewor
dsDataFrame.registerTempTable("words")//Dowordcountontable
usingSQLandprintitvalwordCountsDataFrame=sqlContext.sql("
selectword,count()astotalfromwordsgroupbyword")println
(s"=========$time=========")wordCountsDataFrame.show()})Check
pointing流媒体应用程序必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具有恢复能力。为了
做到这一点,SparkStreaming需要检查点足够的信息到容错存储系统,以便从故障中恢复。有两种类型的检查点数据。//Fu
nctiontocreateandsetupanewStreamingContextdeffunctionToCr
eateContext():StreamingContext={valssc=newStreamingContex
t(...)//newcontextvallines=ssc.socketTextStream(...)//
createDStreams...ssc.checkpoint(checkpointDirectory)//set
checkpointdirectoryssc}//GetStreamingContextfromcheckpoint
dataorcreateanewonevalcontext=StreamingContext.getOrCreat
e(checkpointDirectory,functionToCreateContext_)//Doadditional
setuponcontextthatneedstobedone,//irrespectiveofwhethe
ritisbeingstartedorrestartedcontext....//Startthecontex
tcontext.start()context.awaitTermination()状态的操作是基于多个批次的数据的。它包括基于w
indow的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了
清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要
很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔
时间是比较合适的ssc.checkpoint(hdfsPath)//设置检查点的保存位置dstream.checkpoint(
checkpointInterval)//设置检查点间隔对于必须设置检查点的Dstream,比如通过updateStateBy
Key和reduceByKeyAndWindow创建的Dstream,默认设置是至少10秒。部署应用程序必要条件:为执行程序配置足
够的内存配置检查点配置应用程序驱动程序的自动重新启动为保证容错性可以设置spark.streaming.receiver.writ
eAheadLog.enable来启用true,但是带来的是系统开销,可以通过增大并行的receive来增加吞吐量设置最大接收速率
-如果群集资源不足以使流式应用程序处理数据的速度与接收速度一样快有三种方式:spark.streaming.receiver.
maxRatespark.streaming.kafka.maxRatePerPartitionDirectKafkaspar
k.streaming.backpressure.enabledtrue//自动配置SparkStreaming应用程序需要
升级的时候有两种方式:将要升级的应用程序和就旧的应用程序并行,直到新的应用程序接收到和老的应用程序一样多时,就停止老的应用程序如果
数据原是flume,kafka,则可以调用StreamingContext.stop(…),关闭,在开启新的应用程序,flume,
kafka会帮助我们缓存没有处理的数据应用程序的监控在SparkWebUI将显示一个附加的Streaming选项卡,其中显示有
关正在运行的接收方(接收方是否处于活动状态,接收记录数,接收方错误等)和已完成批次(批处理时间,排队延迟等)的统计信息)。这可以用
来监视流应用程序的进度。其中有两个重要的指标:处理时间-处理每批数据的时间。计划延迟-批次在队列中等待处理先前批次的时间。
另外,SparkStreaming程序的进度也可以使用StreamingListener接口进行监控,该接口允许您获取接收器状
态和处理时间。性能优化性能优化的目标有两个:通过有效利用集群资源,减少每批数据的处理时间。设置正确的批处理大小,以便可以按照接收到
的数据批次处理数据(也就是说,数据处理跟上数据读取)。数据接收中的并行性水平1、通过网络接收数据(如Kafka,Flume,套接字
等)需要将数据反序列化并存储在Spark中。如果数据接收成为系统中的瓶颈,则考虑并行化数据接收。请注意,每个输入DStream都会
创建一个接收器(运行在工作器上)接收单个数据流。因此,接收多个数据流可以通过创建多个输入DS流并配置它们以接收来自源的数据流的不同
分区来实现。例如,接收两个数据主题的单个Kafka输入DStream可以分成两个Kafka输入流,每个只接收一个主题。这将运行两个
接收器,允许并行接收数据,从而提高整体吞吐量。这些多个DStream可以结合在一起创建一个DStream。然后,可以在统一流上应用
在单个输入DStream上应用的转换。这如下完成。valnumStreams=5valkafkaStreams=(1
tonumStreams).map{i=>KafkaUtils.createStream(...)}valunifi
edStream=streamingContext.union(kafkaStreams)unifiedStream.prin
t()2、数据接收并行度调优,除了创建更多输入DStream和Receiver以外,还可以考虑调节blockinterval。通
过参数,spark.streaming.blockInterval,可以设置blockinterval,默认是200ms。对于大
多数Receiver来说,在将接收到的数据保存到Spark的BlockManager之前,都会将数据切分为一个一个的block。而
每个batch中的block数量,则决定了该batch对应的RDD的partition的数量,以及针对该RDD执行transfor
mation操作时,创建的task的数量。每个batch对应的task数量是大约估计的,即batchinterval/blo
ckinterval。例如说,batchinterval为2s,blockinterval为200ms,会创建10个task
。如果你认为每个batch的task数量太少,即低于每台机器的cpucore数量,那么就说明batch的task数量是不够的,因
为所有的cpu资源无法完全被利用起来。要为batch增加block的数量,那么就减小blockinterval。然而,推荐的bl
ockinterval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。3、除了以上两种
情况还可以显式地对输入数据流进行重分区。使用inputStream.repartition()即可。这样就可以将接收到的batch
,分布到指定数量的机器上,然后再进行进一步的操作。数据处理中的并行性水平如果在计算的任何阶段使用的并行任务数量不够高,则群集资源可
能未被充分利用。例如,对于分布式减少操作,如reduceByKey和reduceByKeyAndWindow,可以传入numPa
rtitions:Int增大task的数量实现,也可以通过spark.default.parallelism设置全局的默认值数据
序列化数据序列化的开销可以通过调整序列化格式来减少。,在流式计算的场景下,有两种类型的数据需要序列化。1、输入数据:默认情况下,接
收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是StorageLevel.MEMORY_AND_DISK_S
ER_2。这意味着,数据被序列化为字节从而减小GC开销,并且会复制以进行executor失败的容错。因此,数据首先会存储在内存中,
然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从
网络接收到的数据,然后再使用Spark的序列化格式序列化数据。2、流式计算操作生成的持久化RDD:流式计算操作生成的持久化RDD,
可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中被使用,并被处理多次。然而,不
像SparkCore的默认持久化级别,StorageLevel.MEMORY_ONLY,流式计算操作生成的RDD的默认持久化级别
是StorageLevel.MEMORY_ONLY_SER,默认就会减小GC开销。在上述的场景中,使用Kryo序列化类库可以减小
CPU和内存的性能开销。batchinterval调优对于在群集上运行的SparkStreaming应用程序要稳定,系统应该能
够像接收数据一样快速地处理数据。换句话说,数据批处理应该像生成一样快。通过监视流式webUI中的批处理时间应该小于批处理间隔的处
理时间,可以找到对应用程序是否为真。根据流式计算的性质,使用的批处理间隔可能会对应用程序在固定的一组群集资源上可以维持的数据速率
产生重大影响。例如,让我们考虑一下前面的WordCountNetwork示例。对于特定的数据速率,系统可以跟上每2秒(即,2秒的间
隔时间)报告字数,但不是每500毫秒。所以需要设置批处理间隔,以保证生产中的预期数据速率。为你的应用计算正确的batch大小的比较
好的方法,是在一个很保守的batchinterval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据
速率,可以检查每个batch的处理时间的延迟,如果处理时间与batchinterval基本吻合,那么应用就是稳定的。否则,如果b
atch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速
度,或者增加batchinterval。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢
复即可。内存调整park应用的内存使用SparkStreaming应用需要的集群内存资源,是由使用的transformation
操作类型决定的。举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。
如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简
单的map-filter-store操作,那么需要使用的内存就很少。通常来说,通过Receiver接收到的数据,会使用Storag
eLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁
盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估
。垃圾回收内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。有
很多参数可以帮助降低内存使用和GC开销:1.DStream的持久性级别:如前面在“数据序列化”部分中所提到的,输入数据和RDD默
认保持为序列化字节。与反序列化的持久性相比,这可以减少内存使用和GC开销。启用Kryo序列化进一步减少了序列化的大小和内存使用情况
。压缩(见Spark配置spark.rdd.compress)可以进一步减少内存使用量,代价是CPU时间。2.清除旧数据:默认情况
下,自动清除所有由DStream转换生成的输入数据和持久RDD。SparkStreaming根据所使用的转换决定何时清除数据。例
如,如果您正在使用10分钟的窗口操作,则SparkStreaming将保留最近10分钟的数据,并主动丢弃较旧的数据。通过设置,数
据可以保留更长的时间(例如交互式查询旧数据)streamingContext.remember。3、CMS垃圾回收器:使用并行的m
ark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的
处理时间(降低处理过程中的gc开销)。在spark-submit中使用–driver-java-options设置;使用spark
.executor.extraJavaOptions参数设置。-XX:+UseConcMarkSweepGC。sparkstre
aming案例1、修改日志级别/soft/sark/conf/log4j.propertieslog4j.rootCategory
=WARN,console2、编写scala脚本[stream.scala]packagecom.bm.sparkimport
org.apache.spark.SparkConfimportorg.apache.hadoop.io.LongWritabl
eimportorg.apache.spark.streaming.StreamingContextimportorg.apa
che.hadoop.fs.Pathimportorg.apache.hadoop.mapreduce.lib.input.Te
xtInputFormatimportorg.apache.hadoop.io.Textimportorg.apache.sp
ark.streaming.Secondsobjectstream{defmain(args:Array[String]
){valconf=newSparkConf().setMaster("local[4]").setAppName("
NetworkWordCount")valssc=newStreamingContext(conf,Seconds(5
))vallines=ssc.fileStream[LongWritable,Text,TextInputFormat
]("file:///home/hadoop/spark/xxx",(_:Path)=>{true},true)valword
s=lines.flatMap(x=>x._2.toString().split(""))valpairs=wo
rds.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+
_)wordCounts.print()ssc.start()ssc.awaitTermination()}}3、打包,
生成jar,并上传4、启动程序>spark-submit--masterlocal[4]--classcom.bm.spa
rk.streambigData-Spark-0.0.1-SNAPSHOT.jar5、测试在/home/hadoop/spark
/xxx文件夹下加入文件,观察流读取文件内容,如下图所示:SparkStream窗口操作Spark流的处理不是连续进行的,是分成
时间片进行的,如下图所示:1、编写程序packagecom.bm.sparkimportorg.apache.spark.st
reaming.StreamingContextimportorg.apache.spark.SparkConfimporto
rg.apache.spark.streaming.SecondsobjectMySparkStreamWindow{def
main(args:Array[String]){valconf=newSparkConf().setMaster
("local[4]").setAppName("NetworkWordCount")valssc=newStreami
ngContext(conf,Seconds(1))vallines=ssc.socketTextStream("loc
alhost",9999)valwords=lines.flatMap(x=>x.split(""))impor
torg.apache.spark.streaming.StreamingContext._valpairs=words
.map(word=>(word,1))valwindowedWordCounts=pairs.reduceByKe
yAndWindow((a:Int,b:Int)=>{print("a="+a+":b="+b);a+b},Seco
nds(3),Seconds(2))valwordCounts=pairs.reduceByKey(_+_)wor
dCounts.print()ssc.start()ssc.awaitTermination()}}2、提交程序打成bigD
ata-Spark-0.0.1-SNAPSHOT.jar(含有编译需要的文件),然后运行下面程序进行提交>spark-submit
--masterlocal[4]--classcom.bm.spark.MySparkStreamWindowbigDa
ta-Spark-0.0.1-SNAPSHOT.jar3、开启nc测试输出如下图所示:Spark集成kafka实现kafkaco
nsumer接口,接收的数据存放在executor中,然后spark再处理数据如果要做到无数据丢失,可以启用WAL,将数据保存在h
dfs,失败时再重新恢复集成过程如下:1、编写pom/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://mav
en.apache.org/xsd/maven-4.0.0.xsd">4.0.0sion>com.bmbigData-SparkctId>0.0.1-SNAPSHOTld.sourceEncoding>UTF-8es>junit>junit3.8.1provided>org.apache.sparkifactId>spark-core_2.112.1.1e>providedorg.apache.sp
ark
spark-hive_2.112.1
.1
provided
pId>org.apache.sparkspark-streaming-kafka_2
.11
1.6.3

org.apache.sparkspark-streaming_2.
11
2.1.1
<
groupId>org.apache.sparkspark-graphx_2.11artifactId>2.1.1
pId>jdk.toolsjdk.tools>1.8system{JAVA_HOME}/lib/to
ols.jar
2、创建sca
la文件[MySparkKafkaDemo.scala]packagecom.bm.sparkimportorg.apache
.spark.streaming.StreamingContextimportorg.apache.spark.SparkCon
fimportorg.apache.spark.streaming.kafka.KafkaUtilsimportorg.apa
che.spark.streaming.Secondsimportorg.apache.spark.streaming.Minu
tesobjectMySparkKafkaDemo{defmain(args:Array[String]){val
Array(zkQuorum,group,topics,numThreads)=Array("datanode1:218
1","g1","words_topic","2")valsparkConf=newSparkConf().setApp
Name("KafkaWordCount")valssc=newStreamingContext(sparkConf,
Seconds(2))ssc.checkpoint("checkpoint")valtopicMap=topics.sp
lit(",").map((_,numThreads.toInt)).toMapvallines=KafkaUtils.
createStream(ssc,zkQuorum,group,topicMap).map(_._2)valwords
=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,
1L)).reduceByKeyAndWindow(_+_,_-_,Minutes(10),Seconds(2),
2)wordCounts.print()ssc.start()ssc.awaitTermination()}}3、mvn
编译程序,生成jar包bigdata-spark-2.1.1-0.0.1-SNAPSHOT.jar//该包包含pom中提供的文
件4、启动zk和kafka>xcall.shzkServer.shstart//启动zk>sshdatanode1
kafka-server-start.sh/soft/kafka/config/server.properties&>ssh
datanode2kafka-server-start.sh/soft/kafka/config/server.propert
ies&>sshdatanode3kafka-server-start.sh/soft/kafka/config/serv
er.properties&>kafka-topics.sh--create--zookeeperdatanode1:21
81,datanode2:2181,datanode3:2181--replication-factor1--partiti
ons2--topicwords_topic>kafka-console-producer.sh--broker-list
datanode1:9092,datanode2:9092,datanode3:9092--topicwords_topic
>kafka-console-consumer.sh--zookeeperdatanode1:2181--from-begi
nning--topicwords_topic//仅供测试5、执行>start-all.sh>spark-submit-
-masterlocal[4]--classcom.bm.spark.MySparkKafkaDemobigData-Sp
ark-0.0.1-SNAPSHOT.jar注意事项1、高版本的spark没有org.apache.spark.Logging文件
,从低版本jar包中找到该文件后,在工程中建立org.apache.spark包和Logging文件后,把内容加入该文件,如下:/
LicensedtotheApacheSoftwareFoundation(ASF)underoneo
rmorecontributorlicenseagreements.SeetheNOTICEfiledi
stributedwiththisworkforadditionalinformationregarding
copyrightownership.TheASFlicensesthisfiletoYouundert
heApacheLicense,Version2.0(the"License");youmaynotus
ethisfileexceptincompliancewiththeLicense.Youmayob
tainacopyoftheLicenseathttp://www.apache.org/lice
nses/LICENSE-2.0Unlessrequiredbyapplicablelaworagree
dtoinwriting,softwaredistributedundertheLicenseisdis
tributedonan"ASIS"BASIS,WITHOUTWARRANTIESORCONDITIONS
OFANYKIND,eitherexpressorimplied.SeetheLicensefort
hespecificlanguagegoverningpermissionsandlimitationsund
ertheLicense./packageorg.apache.sparkimportorg.apache
.log4j.{LogManager,PropertyConfigurator}importorg.slf4j.{Logg
er,LoggerFactory}importorg.slf4j.impl.StaticLoggerBinderim
portorg.apache.spark.annotation.DeveloperApiimportorg.apache.
spark.util.Utils/::DeveloperApi::Utilitytraitfor
classesthatwanttologdata.CreatesaSLF4Jloggerforthecl
assandallowsloggingmessagesatdifferentlevelsusingmeth
odsthatonlyevaluateparameterslazilyiftheloglevelise
nabled.NOTE:DONOTUSEthisclassoutsideofSpark.Itis
intendedasaninternalutility.Thiswilllikelybech
angedorremovedinfuturereleases./@DeveloperApitraitLogg
ing{//MakethelogfieldtransientsothatobjectswithLogg
ingcan//beserializedandusedonanothermachine@transie
ntprivatevarlog_:Logger=null//Methodtogetthelogger
nameforthisobjectprotecteddeflogName={//Ignoretra
iling''sintheclassnamesforScalaobjectsthis.getClass.get
Name.stripSuffix("")}//Methodtogetorcreatethelogger
forthisobjectprotecteddeflog:Logger={if(log_==nu
ll){initializeIfNecessary()log_=LoggerFactory.getLogger(
logName)}log_}//LogmethodsthattakeonlyaString
protecteddeflogInfo(msg:=>String){if(log.isInfoEnabled
)log.info(msg)}protecteddeflogDebug(msg:=>String){
if(log.isDebugEnabled)log.debug(msg)}protecteddeflogT
race(msg:=>String){if(log.isTraceEnabled)log.trace(msg)
}protecteddeflogWarning(msg:=>String){if(log.isWarn
Enabled)log.warn(msg)}protecteddeflogError(msg:=>Stri
ng){if(log.isErrorEnabled)log.error(msg)}//Logmeth
odsthattakeThrowables(Exceptions/Errors)tooprotecteddef
logInfo(msg:=>String,throwable:Throwable){if(log.isInfoE
nabled)log.info(msg,throwable)}protecteddeflogDebug(ms
g:=>String,throwable:Throwable){if(log.isDebugEnabled)l
og.debug(msg,throwable)}protecteddeflogTrace(msg:=>St
ring,throwable:Throwable){if(log.isTraceEnabled)log.trace
(msg,throwable)}protecteddeflogWarning(msg:=>String,
throwable:Throwable){if(log.isWarnEnabled)log.warn(msg,th
rowable)}protecteddeflogError(msg:=>String,throwable:
Throwable){if(log.isErrorEnabled)log.error(msg,throwable)
}protecteddefisTraceEnabled():Boolean={log.isTrace
Enabled}privatedefinitializeIfNecessary(){if(!Loggi
ng.initialized){Logging.initLock.synchronized{if(!Loggin
g.initialized){initializeLogging()}}}}private
definitializeLogging(){//Don''tusealoggerinhere,asth
isisitselfoccurringduringinitializationofalogger//If
Log4j1.2isbeingused,butisnotinitialized,loadadefaultp
ropertiesfilevalbinderClass=StaticLoggerBinder.getSingleto
n.getLoggerFactoryClassStr//Thisdistinguishesthelog4j1.2
binding,currently//org.slf4j.impl.Log4jLoggerFactory,fromt
helog4j2.0binding,currently//org.apache.logging.slf4j.Log
4jLoggerFactoryvalusingLog4j12="org.slf4j.impl.Log4jLoggerF
actory".equals(binderClass)lazyvalisInInterpreter:Boolean
={try{valinterpClass=classForName("org.apache.spark.re
pl.Main")interpClass.getMethod("interp").invoke(null)!=null
}catch{case_:ClassNotFoundException=>false}}def
classForName(className:String):Class[_]={Class.forName(cl
assName,true,getContextOrSparkClassLoader)//scalastyle:onc
lassforname}defgetContextOrSparkClassLoader:ClassLoader=
Option(Thread.currentThread().getContextClassLoader).getOrElse(
getSparkClassLoader)defgetSparkClassLoader:ClassLoader=get
Class.getClassLoaderif(usingLog4j12){vallog4j12Initiali
zed=LogManager.getRootLogger.getAllAppenders.hasMoreElements
if(!log4j12Initialized){//scalastyle:offprintlnif(isInI
nterpreter){valreplDefaultLogProps="org/apache/spark/log4j
-defaults-repl.properties"Option(Utils.getSparkClassLoader.getR
esource(replDefaultLogProps))match{caseSome(url)=>Proper
tyConfigurator.configure(url)System.err.println(s"UsingSpark''
srepllog4jprofile:replDefaultLogProps")System.err.println(
"Toadjustlogginglevelusesc.setLogLevel(\"INFO\")")caseNo
ne=>System.err.println(s"SparkwasunabletoloadreplDefaultL
ogProps")}}else{valdefaultLogProps="org/apache/spar
k/log4j-defaults.properties"Option(Utils.getSparkClassLoader.ge
tResource(defaultLogProps))match{caseSome(url)=>Property
Configurator.configure(url)System.err.println(s"UsingSpark''s
defaultlog4jprofile:defaultLogProps")caseNone=>System.e
rr.println(s"SparkwasunabletoloaddefaultLogProps")}}
//scalastyle:onprintln}}Logging.initialized=true/
/Forceacallintoslf4jtoinitializeit.Avoidsthishappening
frommultiplethreads//andtriggeringthis:http://mailman.q
os.ch/pipermail/slf4j-dev/2010-April/002956.htmllog}}p
rivateobjectLogging{@volatileprivatevarinitialized=fal
sevalinitLock=newObject()try{//Weusereflectionhe
retohandlethecasewhereusersremovethe//slf4j-to-julbr
idgeordertoroutetheirlogstoJUL.valbridgeClass=Utils.
classForName("org.slf4j.bridge.SLF4JBridgeHandler")bridgeClass
.getMethod("removeHandlersForRootLogger").invoke(null)valinst
alled=bridgeClass.getMethod("isInstalled").invoke(null).asInsta
nceOf[Boolean]if(!installed){bridgeClass.getMethod("insta
ll").invoke(null)}}catch{casee:ClassNotFoundExceptio
n=>//can''tloganythingyetsojustfailsilently}}2、由于
连接超时报错,需要多等一会儿Spark图计算(知道即可)图计算也扩展了rdd图由顶点和边构成graphx包含了一系列图形算法的集合
和图分析任务图的定义Vertex:顶点,每个顶点有一个id,是一个64位长整数edge:和两个顶点关联,一个起点,一个终点图定义案
例scala>importorg.apache.spark._scala>importorg.apache.spark.str
eaming._scala>importorg.apache.spark.streaming.StreamingContext._scala>importorg.apache.hadoop.io.LongWritablescala>importorg.apache.hadoop.io.Textscala>importorg.apache.hadoop.fs.Pathscala>importorg.apache.hadoop.mapreduce.lib.input.TextInputFormatscala>importorg.apache.spark.graphx.Graphscala>importorg.apache.spark.rdd.RDDscala>importorg.apache.spark.graphx.Edge//user和顶点关联scala>valusers:RDD[(Long,(String,String))]=sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))))//rdd和边关联scala>valrelationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")))scala>valdefaultUser=("JohnDoe","Missing")scala>valgraph=Graph(users,relationships,defaultUser)图操作打印图关系一个Triplets由一个边和两个顶点组成,如下图:scala>valfacts:RDD[String]=graph.triplets.map(triplet=>triplet.srcAttr._1+"isthe"+triplet.attr+"of"+triplet.dstAttr._1)scala>facts.collect.foreach(println(_))上下行程度计算scala>valxx=graph.inDegrees//计算图中所有顶点的下行程度scala>xx.countscala>xx.map(x=>println(x)).collectscala>valxx=graph.outDegrees//计算图中所有顶点的上行程度scala>xx.countscala>xx.map(x=>println(x)).collect顶点的连接边数scala>valg=graphscala>g.numVerticesscala>g.numEdgesscala>g.degrees.countscala>g.degrees.map(println).collect//打印每个顶点的连接边数(上行+下行)图反转scala>valrr=g.reverse//图反转,上行和下行对调scala>rr.triplets.collect打印定点和边scala>valg2=g.mapVertices((id,x)=>{println(id+":"+x);x})scala>g2.edges.collect计算所有节点的下行集合scala>importorg.apache.spark.graphx.EdgeDirectionscala>valx=graph.collectNeighborIds(EdgeDirection.In)//计算所有节点的下行集合scala>x.collectGraphBuilders(图构建器)1、编辑数据[g.txt]2141122、加载数据scala>importorg.apache.spark.graphx.GraphLoaderscala>valgg=GraphLoader.edgeListFile(sc,"file:///home/hadoop/spark/g.txt")scala>gg.edges.collect魁魁语录:少壮不经勤学苦老来方悔读书迟魁魁语录:少壮不经勤学苦老来方悔读书迟江湖一哥版权所有江湖一哥版权所有
献花(0)
+1
(本文系王守奎的图...原创)