Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。读者可以使用Flink Scala Shell或者IntelliJ IDEA来进行练习。
很多情况下,我们需要对多个数据流进行整合处理,Flink为我们提供了多流转换算子,本文主要介绍多流转换。

union

在DataStream上使用union算子,可以将多个数据类型相同的数据流合并成一个同类型的数据流。

(图:union示意图)

假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:

val shenzhenStockStream: DataStream[StockPrice] = ...
val hongkongStockStream: DataStream[StockPrice] = ...
val shanghaiStockStream: DataStream[StockPrice] = ...
val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect
对一个数据流进行控制处理 对于

val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)
val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")

val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)

// The three type parameters of CoMapFunction are, in order: the element type of
// the first stream, the element type of the second stream, and the output type of map
class MyCoMapFunction extends CoMapFunction[Int, String, String] {

  override def map1(input1: Int): String = input1.toString

  override def map2(input2: String): String = input2
}

val mapResult = connectedStream.map(new MyCoMapFunction)

我们知道,如果不对

// Connect the two streams first, then keyBy
val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream
  .connect(mediaStatusStream)
  .keyBy(0, 0)

// keyBy each stream first, then connect
val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)
  .connect(mediaStatusStream.keyBy(0))

无论先 下面的代码展示了如何将股票价格和媒体正负面评价结合起来,当媒体评价为正且股票价格大于阈值时,输出一个正面信号。完整代码在我的github上:https://github.com/luweizheng/flink-tutorials

package com.flink.tutorials.demos.stock

import java.util.Calendar

import com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.{Random, Try}

/** Demo job: connects a stock price stream with a media sentiment stream and
  * prints an [[StockMediaConnectedDemo.Alert]] whenever a stock's price exceeds
  * its threshold while the latest media status seen for that stock is "POSITIVE".
  */
object StockMediaConnectedDemo {

  /** Builds and runs the job in a local environment with the web UI enabled.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Emit a watermark every 5 seconds
    env.getConfig.setAutoWatermarkInterval(5000L)

    // Stock price stream
    val stockPriceRawStream: DataStream[StockPrice] = env
      // Per the original note, StockPriceSource generates this stream randomly
      .addSource(new StockPriceSource)
      // Assign timestamps and watermarks
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

    val mediaStatusStream: DataStream[Media] = env
      .addSource(new MediaSource)

    // Variant 1: connect the two streams first, then keyBy (field 0 of each side)
    val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream
      .connect(mediaStatusStream)
      .keyBy(0, 0)

    // Variant 2: keyBy each stream first, then connect
    val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)
      .connect(mediaStatusStream.keyBy(0))

    // print() only attaches a sink; its result is not needed, so it is not bound to a val
    keyByConnect1.flatMap(new AlertFlatMap).print()
    keyByConnect2.flatMap(new AlertFlatMap).print()

    // Run the job
    env.execute("connect stock price with media status")
  }

  /** Media sentiment about a stock.
    *
    * @param symbol    stock symbol
    * @param timestamp timestamp in milliseconds
    * @param status    sentiment; MediaSource emits "POSITIVE", "NORMAL" or "NEGATIVE"
    */
  case class Media(symbol: String, timestamp: Long, status: String)

  /** Source that keeps emitting random [[Media]] records until it is cancelled. */
  class MediaSource extends RichSourceFunction[Media] {

    // cancel() flips this flag while run() is looping; Flink's SourceFunction
    // documentation recommends a volatile flag so the change is visible to the loop.
    @volatile var isRunning: Boolean = true

    val rand = new Random()

    /** Emits one record per iteration, sleeping a random 0-99 ms in between.
      *
      * @param srcCtx context used to emit records
      */
    override def run(srcCtx: SourceContext[Media]): Unit = {
      while (isRunning) {
        // Pick a random stock id in [0, 5)
        val stockId = rand.nextInt(5)

        // Draw ONE sample and classify it, so both branches test the same value.
        // NOTE(review): 0.9 / 0.05 look like thresholds meant for a uniform
        // [0, 1) sample rather than a Gaussian one - confirm the intended distribution.
        val sample = rand.nextGaussian()
        val status =
          if (sample > 0.9) "POSITIVE"
          else if (sample < 0.05) "NEGATIVE"
          else "NORMAL"

        val curTime = Calendar.getInstance.getTimeInMillis

        srcCtx.collect(Media(stockId.toString, curTime, status))

        Thread.sleep(rand.nextInt(100))
      }
    }

    /** Asks the emitting loop in run() to stop. */
    override def cancel(): Unit = {
      isRunning = false
    }
  }

  /** Alert emitted by [[AlertFlatMap]].
    *
    * @param symbol    stock symbol
    * @param timestamp timestamp copied from the triggering stock price record
    * @param alert     alert label; AlertFlatMap only emits "POSITIVE"
    */
  case class Alert(symbol: String, timestamp: Long, alert: String)

  /** Joins stock prices with the latest media status of the same key.
    *
    * The media status is kept in keyed state, so each stock symbol has its own
    * value. A plain instance field would be shared by every key processed by the
    * same parallel instance, letting one stock's media status trigger alerts
    * for another stock.
    */
  class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {

    // Price threshold per stock, indexed by the numeric value of the symbol
    val priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)

    // Latest media status for the current key; resolved on first use because the
    // runtime context is only available once the function runs inside a task.
    @transient private lazy val mediaLevelState: ValueState[String] =
      getRuntimeContext.getState(
        new ValueStateDescriptor[String]("mediaLevel", classOf[String]))

    /** Emits a "POSITIVE" alert when the key's media status is "POSITIVE" and the
      * price is above the threshold configured for the stock.
      *
      * @param stock     incoming stock price record
      * @param collector output collector
      */
    override def flatMap1(stock: StockPrice, collector: Collector[Alert]): Unit = {
      // No media record seen yet for this key => state is null => treat as "NORMAL"
      val mediaLevel = Option(mediaLevelState.value()).getOrElse("NORMAL")

      // Symbols that are not numeric, or have no configured threshold, yield None
      // and therefore never alert (instead of failing the job with an exception).
      val threshold = Try(stock.symbol.toInt).toOption.flatMap(priceMaxThreshold.lift)

      if (mediaLevel == "POSITIVE" && threshold.exists(limit => stock.price > limit)) {
        collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))
      }
    }

    /** Remembers the latest media status for the current key.
      *
      * @param media     incoming media record
      * @param collector output collector (unused; this side emits nothing)
      */
    override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {
      mediaLevelState.update(media.status)
    }
  }
}
|