## Introduction

join and coGroup are both operators in Flink's DataStream API for joining two streams, but they differ in an important way: where coGroup can be used, prefer it over join, because coGroup is more powerful. Let's take a quick look at how the two operators are used.

### Window Join

DataStream, DataStream → DataStream

Joins two data streams on a given key and a common window.

```scala
dataStream.join(otherStream)
  .where(<key selector>).equalTo(<key selector>)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply { ... }
```

### Window CoGroup

DataStream, DataStream → DataStream

Cogroups two data streams on a given key and a common window.

```scala
dataStream.coGroup(otherStream)
  .where(0).equalTo(1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply {}
```

### CoGroupedStreams and JoinedStreams source code

*(The original post embeds screenshots of the `CoGroupedStreams` and `JoinedStreams` source code here.)*

### Conclusions from the comparison

**Conclusion 1:** when calling either operator, `where` takes the key of stream T1 that is to be matched against stream T2, and `equalTo` takes the key of T2 that is to be matched against T1.

**Conclusion 2:** the biggest difference between join and coGroup is the parameter types of their `apply` methods (the original post compares screenshots of the two signatures). The `apply` of join receives single elements of the generic types T1 and T2, one matched pair at a time, whereas the `apply` of coGroup receives the iterators `Iterator[T1]` and `Iterator[T2]` over all elements of the key and window. Because of this, if the logic is not just per-record processing but needs to compare a record with earlier ones, perform more complex matching, or even sort the result, it is hard to express in join's `apply`; coGroup is recommended instead.

**Conclusion 3:** the benefit of the `apply` method. Real-time stream joins in Flink can be implemented with a joinedStream or coGroupedStream. But performing more complex operations after the join, such as conditional checks or iteration, is painful in SQL alone and would need something like PL/SQL blocks; Flink instead provides the `apply` method, in which users can write arbitrarily complex functions themselves.
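To make Conclusion 2 concrete, here is a minimal sketch of the two `apply` variants in the Scala DataStream API. The `Order` and `Payment` case classes, their field names, and the `sketch` method are hypothetical, made up purely for illustration; only the operator chaining mirrors the snippets above:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ApplySignatureSketch {
  // Hypothetical event types, purely for illustration.
  case class Order(id: String, amount: Double)
  case class Payment(id: String, paid: Double)

  def sketch(orders: DataStream[Order], payments: DataStream[Payment]): Unit = {
    // join: the function is invoked once per matched (Order, Payment) pair,
    // so it can only look at one element from each side at a time.
    val joined: DataStream[String] = orders.join(payments)
      .where(_.id).equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply((o, p) => s"${o.id},${o.amount},${p.paid}")

    // coGroup: the function is invoked once per key and window with ALL
    // elements of both sides, so it can aggregate, sort, deduplicate, or
    // notice that one side is empty (outer-join style logic).
    val coGrouped: DataStream[String] = orders.coGroup(payments)
      .where(_.id).equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply { (os: Iterator[Order], ps: Iterator[Payment]) =>
        s"orders=${os.length}, paidTotal=${ps.map(_.paid).sum}"
      }
  }
}
```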
## Example: joining two streams

### Scala version

```scala
package flinkSQL

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

/**
 * Created by ${WangZhiHua} on 2018/11/13
 */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // read the incoming data
    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    // wrap each record of stream 1 in the StockTransaction case class
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)

    // wrap each record of stream 2 in the StockSnapshot case class
    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
     * co-group the two streams,
     * scoped to a 3-second event-time tumbling window
     */
    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinStream = joinStream.apply(new InnerJoinFunction)
    innerJoinStream.name("innerJoin").print()
    print("===================== end =========================")
    env.execute("join demo")
  }
}

// case classes wrapping the received data
case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
case class StockSnapshot(md_time: String, md_code: String, md_value: Double)

// an inner-join function, implemented as a CoGroupFunction
class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction],
                       T2: java.lang.Iterable[StockSnapshot],
                       out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    // Convert the Java Iterables into Scala collections;
    // Scala's collection operations are concise and efficient.
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    // An inner join compares data under the same key within the same time window.
    if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
      for (transaction <- scalaT1) {
        for (snapshot <- scalaT2) {
          out.collect((transaction.tx_code, transaction.tx_time,
            snapshot.md_time, transaction.tx_value, snapshot.md_value, "Inner Join Test"))
        }
      }
    }
  }
}

class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction],
                       T2: java.lang.Iterable[StockSnapshot],
                       out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    // Convert the Java Iterables into Scala collections.
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    // A left join compares data under the same key within the same time window.
    if (scalaT1.nonEmpty && scalaT2.isEmpty) {
      for (transaction <- scalaT1) {
        out.collect((transaction.tx_code, transaction.tx_time, "", transaction.tx_value, 0.0, "Left Join Test"))
      }
    }
  }
}

class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction],
                       T2: java.lang.Iterable[StockSnapshot],
                       out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    // Convert the Java Iterables into Scala collections.
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    // A right join compares data under the same key within the same time window.
    if (scalaT1.isEmpty && scalaT2.nonEmpty) {
      for (snapshot <- scalaT2) {
        out.collect((snapshot.md_code, "", snapshot.md_time, 0.0, snapshot.md_value, "Right Join Test"))
      }
    }
  }
}
```
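The three functions above each emit only one slice of the result: matched pairs, left-only rows, or right-only rows. As a sketch of how they could be merged (this class is not part of the original post), a single `CoGroupFunction` can emit all three slices in one pass, giving a windowed full outer join over the same case classes:

```scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector

// Sketch only: one CoGroupFunction producing matched, left-only and
// right-only rows, reusing the StockTransaction / StockSnapshot case
// classes defined in the example above.
class FullOuterJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction],
                       T2: java.lang.Iterable[StockSnapshot],
                       out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    import scala.collection.JavaConverters._
    val lefts  = T1.asScala.toList
    val rights = T2.asScala.toList
    if (lefts.nonEmpty && rights.nonEmpty) {
      // both sides are present: emit every matched pair (the inner-join slice)
      for (t <- lefts; s <- rights) {
        out.collect((t.tx_code, t.tx_time, s.md_time, t.tx_value, s.md_value, "Matched"))
      }
    } else if (rights.isEmpty) {
      // only the left side appeared for this key and window
      for (t <- lefts) {
        out.collect((t.tx_code, t.tx_time, "", t.tx_value, 0.0, "Left Only"))
      }
    } else {
      // only the right side appeared for this key and window
      for (s <- rights) {
        out.collect((s.md_code, "", s.md_time, 0.0, s.md_value, "Right Only"))
      }
    }
  }
}
```

It plugs in exactly like `InnerJoinFunction`: `joinStream.apply(new FullOuterJoinFunction)`.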
### Java version

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DoubleJoin {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source1 = env.readTextFile("/Users/apple/Downloads/1.txt");
        DataStreamSource<String> source2 = env.readTextFile("/Users/apple/Downloads/2.txt");
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * stream 1
         */
        SingleOutputStreamOperator<Row> stream1 = source1.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] split = value.split(",");
                String timeStamp = split[0];
                String name = split[1];
                String city = split[2];
                Row row = new Row(3);
                row.setField(0, timeStamp);
                row.setField(1, name);
                row.setField(2, city);
                return row;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {
            long currentMaxTimestamp = 0L;
            long maxOutOfOrderness = 10000L; // allow at most 10s of out-of-order events
            Watermark watermark = null;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp) {
                long timeStamp = 0;
                try {
                    // parse the event-time string into epoch milliseconds
                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
                return timeStamp;
            }
        });
        stream1.print();

        /**
         * stream 2
         */
        SingleOutputStreamOperator<Row> stream2 = source2.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] split = value.split(",");
                String timeStamp = split[0];
                String name = split[1];
                String age = split[2];
                String school = split[3];
                Row row = new Row(4);
                row.setField(0, timeStamp);
                row.setField(1, name);
                row.setField(2, age);
                row.setField(3, school);
                return row;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {
            long currentMaxTimestamp = 0L;
            long maxOutOfOrderness = 10000L; // allow at most 10s of out-of-order events
            Watermark watermark = null;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp) {
                long timeStamp = 0;
                try {
                    // parse the event-time string into epoch milliseconds
                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
                return timeStamp;
            }
        });
        stream2.print();

        /**
         * join the two streams
         */
        stream1.coGroup(stream2)
                .where(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        System.out.println("stream1" + value.toString());
                        return value.getField(1).toString();
                    }
                })
                .equalTo(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        System.out.println("stream2" + value.toString());
                        return value.getField(1).toString();
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Row, Row, Row>() {
                    @Override
                    public void coGroup(Iterable<Row> first, Iterable<Row> second, Collector<Row> out) throws Exception {
                        first.forEach(t -> second.forEach(x -> {
                                    // pick the fields we need from both sides
                                    Row row = new Row(3);
                                    Object field1 = t.getField(0);
                                    Object field2 = x.getField(1);
                                    Object field3 = x.getField(2);
                                    // wrap the result in a Row
                                    row.setField(0, field1);
                                    row.setField(1, field2);
                                    row.setField(2, field3);
                                    out.collect(row);
                                }
                        ));
                        System.out.println("join" + first.toString());
                    }
                }).printToErr();

        try {
            env.execute("ddcddd");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

Note: when testing, it is best to use a `CountTrigger` to fire the window. With the default `EventTimeTrigger` you also need to supply suitable timestamps; otherwise the test may look as if no data is produced at all, when in fact the window has simply never fired.

Usage: fire the window once for every ten elements received:

```java
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(CountTrigger.of(10))
```
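The same trigger swap works on the Scala example as well; a minimal sketch, assuming the `dataStreamMap1`, `dataStreamMap2`, and `InnerJoinFunction` definitions from the Scala version above:

```scala
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fire the 3-second window after every 10 buffered elements instead of
// waiting for the event-time watermark to pass the end of the window.
val testJoinStream = dataStreamMap1.coGroup(dataStreamMap2)
  .where(_.tx_code)
  .equalTo(_.md_code)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .trigger(CountTrigger.of[TimeWindow](10))
  .apply(new InnerJoinFunction)
```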
————————————————
Copyright notice: this is an original article by the CSDN blogger 「阿华田512」, distributed under the CC 4.0 BY-SA license; please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/aA518189/article/details/84032660