1.0 Preface
The current Spark Streaming Programming Guide is available at: http://spark.apache.org/docs/latest/streaming-programming-guide.html

1.1 Creating a StreamingContext
1.1.1 Creating from a SparkContext
Source code:

def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}

The first parameter is the SparkContext object; the second is the batch interval.
Example:

val ssc = new StreamingContext(sc, Seconds(5))
1.1.2 Creating from a SparkConf
Source code:

def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

The first parameter is the SparkConf object; the second is the batch interval.
Example:

val conf = new SparkConf().setAppName("StreamTest")
val ssc = new StreamingContext(conf, Seconds(5))
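For orientation, the pieces above fit together into a minimal application skeleton like the following sketch (the master URL, app name, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {
  def main(args: Array[String]): Unit = {
    // Illustrative settings: local[2] leaves one thread free for a receiver
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamTest")
    val ssc = new StreamingContext(conf, Seconds(5))

    // ... define DStreams and transformations here ...

    ssc.start()             // start the streaming computation
    ssc.awaitTermination()  // wait for it to finish
  }
}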
1.1.3 Creating from individual SparkConf parameters
Source code:

def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) = {
  this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
    null, batchDuration)
}

The leading parameters are the details needed to build the SparkConf: master is the Spark master URL, appName the application name, sparkHome the SPARK_HOME environment variable, plus jars and environment; batchDuration is the batch interval.
Example:

val ssc = new StreamingContext("spark://host:port", "StreamTest", Seconds(5),
  System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
1.1.4 Creating from a checkpoint file
Source code:

def this(path: String, hadoopConf: Configuration) =
  this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)

The first parameter is the checkpoint file path; the second is the Hadoop configuration.
Source code:

def this(path: String) = this(path, new Configuration)

The only parameter is the checkpoint file path.
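In practice, recovery from a checkpoint is usually wired up through StreamingContext.getOrCreate, which rebuilds the context from the checkpoint directory if one exists and otherwise calls a factory function. A minimal sketch, assuming an illustrative checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:9000/checkpoint"  // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamTest")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define DStreams here ...
  ssc
}

// Rebuild the context from the checkpoint if one exists, otherwise create a new one
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)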
1.2 Creating DStream input sources
1.2.1 fileStream
Source code:

/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * Files must be written to the monitored directory by "moving" them from another
 * location within the same file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 * @tparam K Key type for reading HDFS file
 * @tparam V Value type for reading HDFS file
 * @tparam F Input format for reading HDFS file
 */
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * Files must be written to the monitored directory by "moving" them from another
 * location within the same file system.
 * @param directory HDFS directory to monitor for new file
 * @param filter Function to filter paths to process
 * @param newFilesOnly Should process only new files and ignore existing files in the directory
 * @tparam K Key type for reading HDFS file
 * @tparam V Value type for reading HDFS file
 * @tparam F Input format for reading HDFS file
 */
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

Parameters: K is the key type read from HDFS, V is the value type, and F is the input format; directory is the HDFS path to monitor, filter is a function that decides which files are processed, and newFilesOnly controls whether only newly added files are processed.

By supplying a filter function, fileStream can restrict processing to files in the monitored directory that satisfy a condition. The default filter is:

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

which skips hidden files. Setting newFilesOnly to true or false controls whether files that already exist in the monitored directory are processed: by default existing files are ignored and only newly added files are handled; if set to false, older files from the previous remember window can also be processed.
Source code:

private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,                  // initial threshold based on newFilesOnly setting
  currentTime - durationToRemember.milliseconds   // trailing end of the remember window
)

modTimeIgnoreThreshold is the modification-time cutoff for the window filter: depending on newFilesOnly it is either the current time or the start of the previous remember window.
Example:

// Define a custom filter function
def myFilter(path: Path): Boolean = path.getName().contains("data")
// Create the fileStream
val data1 = ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1, pa => myFilter(pa), false).map(_._2.toString)
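Putting the pieces together, a self-contained sketch of the filter example above might look like this (the monitored directory Spath1 is an illustrative placeholder):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileStreamTest")
val ssc = new StreamingContext(conf, Seconds(5))

val Spath1 = "hdfs://namenode:9000/data/in"   // illustrative monitored directory

// Only process files whose names contain "data"
def myFilter(path: Path): Boolean = path.getName().contains("data")

// newFilesOnly = false: also pick up files already present in the directory
val data1 = ssc.fileStream[LongWritable, Text, TextInputFormat](
  Spath1, (p: Path) => myFilter(p), newFilesOnly = false).map(_._2.toString)

data1.print()
ssc.start()
ssc.awaitTermination()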
1.2.2 textFileStream
Source code:

/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 */
def textFileStream(directory: String): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

The parameter directory is the directory to monitor. textFileStream is simply a specialization of fileStream.
Example:

val StreamFile1 = ssc.textFileStream(Spath1)
1.2.3 socketTextStream
Source code:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname Hostname to connect to for receiving data
 * @param port Port to connect to for receiving data
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

Parameters: hostname is the host address, port is the port number, and storageLevel is the storage level of the received data, which defaults to MEMORY_AND_DISK_SER_2 (serialized, with two replicas across memory and disk).
Example:

val lines = ssc.socketTextStream(serverIP, serverPort)
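As a usage sketch, the classic network word count can be built directly on socketTextStream (host and port are illustrative; data can be fed in with, for example, nc -lk 9999):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))

// Connect to a text source; "localhost" and 9999 are illustrative
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()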
1.2.4 rawSocketStream
Source code:

/**
 * Create a input stream from network source hostname:port, where data is received
 * as serialized blocks (serialized using the Spark's serializer) that can be directly
 * pushed into the block manager without deserializing them. This is the most efficient
 * way to receive data.
 * @param hostname Hostname to connect to for receiving data
 * @param port Port to connect to for receiving data
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @tparam T Type of the objects in the received blocks
 */
def rawSocketStream[T: ClassTag](
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[T] = {
  new RawInputDStream[T](this, hostname, port, storageLevel)
}

rawSocketStream is used in the same way as socketTextStream; see socketTextStream above.
1.2.5 networkStream
Source code:

/**
 * Create an input stream with any arbitrary user implemented receiver.
 * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
 * @param receiver Custom implementation of Receiver
 */
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
  receiver: Receiver[T]): ReceiverInputDStream[T] = {
  receiverStream(receiver)
}

networkStream is deprecated in favor of receiverStream. For examples, see: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
1.2.6 receiverStream
Source code:

/**
 * Create an input stream with any arbitrary user implemented receiver.
 * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
 * @param receiver Custom implementation of Receiver
 */
def receiverStream[T: ClassTag](
  receiver: Receiver[T]): ReceiverInputDStream[T] = {
  new PluggableInputDStream[T](this, receiver)
}

Example:

val StreamFile1 = ssc.receiverStream(new CustomReceiver(host, port))

See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
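For context, a minimal CustomReceiver along the lines of the official custom-receivers guide could look like the following sketch; the class name matches the example above, and the socket-reading logic is condensed and illustrative:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start a background thread that connects and pushes received lines into Spark
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // The receive() loop stops itself once isStopped returns true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)              // hand each received line to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}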
1.2.7 actorStream
Source code:

/**
 * Create an input stream with any arbitrary user implemented actor receiver.
 * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
 * @param props Props object defining creation of the actor
 * @param name Name of the actor
 * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 *
 * @note An important point to note:
 * Since Actor may exist outside the spark framework, It is thus user's responsibility
 * to ensure the type safety, i.e parametrized type of data received and actorStream
 * should be same.
 */
def actorStream[T: ClassTag](
    props: Props,
    name: String,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
    supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
  ): ReceiverInputDStream[T] = {
  receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}

Example:

val StreamFile1 = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")

See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
1.2.8 queueStream
Source code:

/**
 * Create an input stream from a queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 * @param queue Queue of RDDs
 * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @tparam T Type of objects in the RDD
 */
def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true
  ): InputDStream[T] = {
  queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
}
/**
 * Create an input stream from a queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 * @param queue Queue of RDDs
 * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
 *                   Set as null if no RDD should be returned when empty
 * @tparam T Type of objects in the RDD
 */
def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean,
    defaultRDD: RDD[T]
  ): InputDStream[T] = {
  new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}
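A short usage sketch for queueStream, which is mainly useful for testing; the queue contents and timing below are illustrative, and ssc is the StreamingContext from section 1.1:

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Queue that will feed RDDs into the stream
val rddQueue = new Queue[RDD[Int]]()

// oneAtATime = true: consume a single RDD from the queue in every batch interval
val inputStream = ssc.queueStream(rddQueue, oneAtATime = true)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

ssc.start()
// Push a few RDDs into the queue while the streaming job is running
for (_ <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  }
  Thread.sleep(1000)
}
ssc.stop()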
1.2.9 union DStream
Source code:

/**
 * Create a unified DStream from multiple DStreams of the same type and same slide duration.
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
  new UnionDStream[T](streams.toArray)
}

union merges DStreams of the same type into a single new DStream; the input DStreams must have the same data type and the same batch interval.
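A brief sketch of merging several input DStreams of the same type and batch interval; the number of streams and the ports are illustrative:

// Create several socket streams of the same type, then merge them into one DStream
val streams = (0 until 3).map(i => ssc.socketTextStream("localhost", 9990 + i))
val unified = ssc.union(streams)
unified.print()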
1.2.10 transform DStream
Source code:

/**
 * Create a new DStream in which each RDD is generated by applying a function on RDDs of
 * the DStreams.
 */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}

transform applies an arbitrary RDD-to-RDD function across the per-batch RDDs of the given DStreams and produces a new DStream from the result.
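A small sketch of the multi-DStream transform shown above, which unions the per-batch RDDs of two streams and filters them; the sources are illustrative:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val lines1 = ssc.socketTextStream("localhost", 9998)  // illustrative sources
val lines2 = ssc.socketTextStream("localhost", 9999)

// For each batch, union the RDDs from both streams and keep only non-empty lines
val combined = ssc.transform(Seq(lines1, lines2), (rdds: Seq[RDD[_]], t: Time) => {
  rdds.map(_.asInstanceOf[RDD[String]]).reduce(_ union _).filter(_.nonEmpty)
})
combined.print()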
1.3 Checkpointing
Stateful operations work on data from multiple batches; they include window-based operations and updateStateByKey. Because a stateful operation depends on data from previous batches, it keeps accumulating metadata over time. To clear this data, Spark Streaming supports periodic checkpointing, which saves intermediate results to HDFS. Since each checkpoint incurs the overhead of writing to HDFS, the checkpoint interval should be chosen carefully: for small batch intervals, such as one second, checkpointing every batch would sharply reduce throughput, while an interval that is too long makes the tasks grow too large. In general, a checkpoint interval of 5 to 10 seconds is a reasonable setting.
Example:

ssc.checkpoint("hdfs://192.168.1.100:9000/check")
val StreamFile1 = ssc.textFileStream(Spath1)
StreamFile1.checkpoint(Seconds(30))
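As an illustration of a stateful operation that relies on checkpointing, a running word count with updateStateByKey might look like the following sketch (the checkpoint path, source, and intervals are illustrative):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._

ssc.checkpoint("hdfs://192.168.1.100:9000/check")   // required for stateful operations

// Add the counts of the current batch to the running total for each word
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(newValues.sum + runningCount.getOrElse(0))

val lines = ssc.socketTextStream("localhost", 9999)   // illustrative source
val runningCounts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int](updateCount _)

// Checkpoint the state DStream every 30 seconds instead of on every batch
runningCounts.checkpoint(Seconds(30))
runningCounts.print()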
Please credit the source when reposting: http://blog.csdn.net/sunbow0/article/details/42966467