分享

flink实战——双流join之Join和coGroup的区别和应用

 jasonbetter 2019-09-04

本文链接:https://blog.csdn.net/aA518189/article/details/84032660

简介

Join和coGroup都是flinkSQL中用于连接多个流的算子,但是有一定的区别,推荐能使用coGroup不要使用Join,因为coGroup更强大。下面让我们简单看一下两个算子的用法

Window Join

DataStream,DataStream→DataStream

在给定密钥和公共窗口上连接两个数据流。

dataStream.join(otherStream)

    .where(<key selector>).equalTo(<key selector>)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply { ... }

Window CoGroup

DataStream,DataStream→DataStream

在给定密钥和公共窗口上对两个数据流进行Cogroup。

dataStream.coGroup(otherStream)

    .where(0).equalTo(1)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply {}

 CoGrouped源码

joinedStream源码

对比结论

结论一:.使用时,where里面要传入数据流T1要与数据流T2匹配的key,equalTo中传入T2要和T1匹配的key。

结论二:join和coGroup的最大区别就是apply方法提供的参数类型不一样,

join的apply

coGroup的apply参数

两种算子apply方法中的参数类型不一样,join中提供的apply方法,参数是T1与T2泛型类型。而coGroup中提供的apply方法,参数是Iterator[T1]与Iterator[2]迭代器,基于这2种方式,如果我们的逻辑不仅仅是对一条record做处理,而是要与上一record或更复杂的判断与比较,甚至是对结果排序,那么join中的apply方法显得比较困难,所以推荐使用coGroup。 

结论三:apply方法的好处

我们想要在Flink中实现实时的流计算,就可以通过joinedStream或coGroupedStream来实现。但是在join之后实施更复杂的运算,例如判断、迭代等,仅仅通过SQL实现,恐怕会很麻烦,只能通过PL/SQL块来实现,而Flink提供了apply方法,用户可以自己编写复杂的函数来实现。

案例 双流jion

scala版

package flinkSQL

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.util.Collector

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.table.api.scala._

/**

  * Created by  ${WangZhiHua} on 2018/11/13

  */

object JoinDemo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //获取接口传送的数据

    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")

    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

   //使用样例类StockTransaction封装获取的数据

    val dataStreamMap1 = dataStream1.map(f => {

      val tokens1 = f.split(",")

      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)

    })

      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)

    //使用样例类StockSnapshot封装获取的数据

    val dataStreamMap2 = dataStream2.map(f => {

      val tokens2 = f.split(",")

      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)

    })

      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**

      * 进行双流join

      * 限定范围是:3秒钟的Event time时间窗口

      */

    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)

      .where(_.tx_code)

      .equalTo(_.md_code)

      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

      val innerJoinStream = joinStream.apply(new InnerJoinFunction)

     innerJoinStream.name("innerJoin").print()

    print("===================== end =========================")

    env.execute("join demo")

  }

}

//定义样例类封装接收的数据

case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)

case class StockSnapshot(md_time:String, md_code:String,md_value:Double)

//定义一个内连接函数,继承CoCroup

class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {

    /**

      * 将Java中的Iterable对象转换为Scala的Iterable

      * scala的集合操作效率高,简洁

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Inner Join要比较的是同一个key下,同一个时间窗口内的数据

      */

    if(scalaT1.nonEmpty && scalaT2.nonEmpty){

      for(transaction <- scalaT1){

        for(snapshot <- scalaT2){

          out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")

        }

      }

    }

  }

}

class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {

    /**

      * 将Java中的Iterable对象转换为Scala的Iterable

      * scala的集合操作效率高,简洁

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Left Join要比较的是同一个key下,同一个时间窗口内的数据

      */

    if(scalaT1.nonEmpty && scalaT2.isEmpty){

      for(transaction <- scalaT1){

        out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")

      }

    }

  }

}

class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {

    /**

      * 将Java中的Iterable对象转换为Scala的Iterable

      * scala的集合操作效率高,简洁

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Right Join要比较的是同一个key下,同一个时间窗口内的数据

      */

    if(scalaT1.isEmpty && scalaT2.nonEmpty){

      for(snapshot <- scalaT2){

        out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")

      }

    }

  }

}

java版

public class DoubleJoin {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source1 = env.readTextFile("/Users/apple/Downloads/1.txt");

        DataStreamSource<String> source2 = env.readTextFile("/Users/apple/Downloads/2.txt");

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**

         * 数据流1

         */

        SingleOutputStreamOperator<Row> stream1 = source1.map(new MapFunction<String, Row>() {

            @Override

            public Row map(String value) throws Exception {

                String[] split = value.split(",");

                String timeStamp = split[0];

                String name = split[1];

                String city = split[2];

                Row row = new Row(3);

                row.setField(0,timeStamp);

                row.setField(1,name);

                row.setField(2,city);

                return row;

            }

        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {

             long  currentMaxTimestamp = 0L;

             long  maxOutOfOrderness = 10000L;

             Watermark watermark=null;

            //最大允许的乱序时间是10s

             @Nullable

             @Override

             public Watermark getCurrentWatermark() {

                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);

                 return watermark;

             }

             @Override

             public long extractTimestamp(Row element, long previousElementTimestamp) {

                 long timeStamp = 0;

                 try {

                     timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getDate();

                 } catch (ParseException e) {

                     e.printStackTrace();

                 }

                 currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);

                     return timeStamp ;

             }

         }

        );

        stream1.print();

        /**

         * 数据流2

         */

        SingleOutputStreamOperator<Row> stream2 = source2.map(new MapFunction<String, Row>() {

            @Override

            public Row map(String value) throws Exception {

                String[] split = value.split(",");

                String timeStamp = split[0];

                String name = split[1];

                String age = split[2];

                String school= split[3];

                Row row = new Row(4);

                row.setField(0,timeStamp);

                row.setField(1,name);

                row.setField(2,age);

                row.setField(3,school);

                return row;

            }

        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {

            long  currentMaxTimestamp = 0L;

            long  maxOutOfOrderness = 10000L;

            Watermark watermark=null;

            //最大允许的乱序时间是10s

            @Nullable

            @Override

            public Watermark getCurrentWatermark() {

                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);

                return watermark;

            }

            @Override

            public long extractTimestamp(Row element, long previousElementTimestamp) {

                long timeStamp = 0;

                try {

                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getDate();

                } catch (ParseException e) {

                    e.printStackTrace();

                }

                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);

                return timeStamp ;

            }

        });

         stream2.print();

        /**

         * 双流join

         */

        stream1.coGroup(stream2)

                .where(new KeySelector<Row, String>() {

                    @Override

                    public String getKey(Row value) throws Exception {

                        System.out.println("stream1"+value.toString());

                        return value.getField(1).toString();

                    }

                })

                .equalTo(new KeySelector<Row, String>() {

                    @Override

                    public String getKey(Row value) throws Exception {

                        System.out.println("stream2"+value.toString());

                        return value.getField(1).toString();

                    }

                }).window(TumblingEventTimeWindows.of(Time.seconds(5)))

                .apply(new CoGroupFunction<Row, Row, Row>() {

                    @Override

                    public void coGroup(Iterable<Row> first, Iterable<Row> second, Collector<Row> out) throws Exception {

                        first.forEach(t ->

                                second.forEach(x ->

                                        {

                                            //双流join  选取需要的字段

                                            Row row = new Row(3);

                                            Object field1 = t.getField(0);

                                            Object field2 = x.getField(1);

                                            Object field3 = x.getField(2);

                                            //使用row封装数据

                                            row.setField(0, field1);

                                            row.setField(1, field2);

                                            row.setField(2, field3);

                                            out.collect(row);

                                        }

                                ));

                        System.out.println("join"+first.toString());

                    }

                }).printToErr();

        try {

            env.execute("ddcddd");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

注意:我们在测试的时候尽量使用CountTrigge去触发窗口执行,如果使用默认的EventTimeTrigger,我们还需要设置具体的时间戳,不然可能测试时出现获取不到数据的假象,其实是窗口一直没触发。

使用案例:十条数据就触发一次窗口的执行

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.trigger(CountTrigger.of(10))

————————————————

版权声明:本文为CSDN博主「阿华田512」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/aA518189/article/details/84032660

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多