分享

Flink流计算编程--Session Window实战

 jasonbetter 2019-06-20

原文地址:https://blog.csdn.net/lmalds/article/details/52692911

2016年09月28日 18:56:11 lmalds李麦迪 

1、session window简介

Flink从1.1开始支持Session window,它是属于基于时间的窗口。

这里以EventTime为例,基于时间的窗口,可以分为3种:TumblingEventTimeWindows,SlidingEventTimeWindows和EventTimeSessionWindows。

对于Tumbling与Sliding窗口,其窗口的时间大小是固定的,例如10秒钟一个窗口,那么窗口中开始时间和结束时间一定是一个10秒的间隔,例如从10:00:00到10:00:10。Sliding的窗口大小也是固定的,例如每隔10秒钟统计过去20秒的数据,那么它的窗口也是从10:00:00到10:00:20,大小是20秒。

而Session window的窗口大小,则是由数据本身决定。例如,基于同一个key,有如下几条数据,其自身时间戳如下:

key,10:00:00key,10:00:03key,10:00:05key,10:00:12key,10:00:15key,10:00:24key,10:00:30key,10:00:42.....
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

那么,假设Session Window的时间gap如果是6秒,那么,上面的数据会被分成以下几个窗口(窗口开始时间以及窗口结束时间,窗口内记录数):

窗口1:(key,10:00:00,10:00:11,3)
窗口2:(key,10:00:12,10:00:21,2)
窗口3:(key,10:00:24,10:00:30,1)
窗口4:(key,10:00:30,10:00:42,1)
......
  • 1

  • 2

  • 3

  • 4

  • 5

可以看到,session window只需要设置一个时间间隔(gap)即可定义一个session window机制。

2、session window窗口分析

下面我们来分析下上边的数据。 
首先,我们设置的时间gap是6秒,那么,当相邻的记录相差>=6秒时,则触发窗口。 
对于第一条记录与第二条记录,其时间间隔是3秒,那么这两条记录属于同一个窗口内,此时并不触发窗口;第二条与第三条记录,间隔2秒,也不触发窗口;第三条与第四条记录,间隔>=6秒(7秒),此时,窗口被触发了。 
继续,第四条记录与第五条记录间隔3秒,不触发;第五条与第六条间隔9秒,触发; 
继续,第六条与第七条间隔6秒,触发; 
继续,第七条与第八条间隔12秒,触发。

到此,上边这些数据被划分到不同的窗口中,每个窗口的大小也不一样。

那么,每个窗口的时间范围有没有什么共性?我们可以按照下面的公式来计算每个窗口的时间范围:

窗口大小=[第一条数据的时间,第一个与相邻数据相差大于等于gap的时间+gap)
  • 1

看似有点难以理解,其实现实的意义就是:窗口内包含的数据是“活跃的”。

例如,用户点击行为,如果认为30秒间隔用户没有操作,则认为是不活跃的。那么通过session window,定义一个30秒的gap,此时,每个窗口内的数据,都是用户在活跃期间的数据,超过30秒了没有任何操作,则认为用户不活跃,有可能下线。

3、session window在Flink中的实现

上面的介绍有点繁琐,不够言简意赅,那么我们直接看代码。 
数据介绍:在代码之前,介绍下数据,指数数据,正常情况每隔3秒产生一条,如果达到6秒甚至更多实践才产生数据,则认为有gap,此时说明指数的交易不够频繁,不够活跃。

代码如下:

import java.text.SimpleDateFormatimport org.apache.flink.streaming.api.scala._
import DataTypes.StockIndeximport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.RichWindowFunctionimport org.apache.flink.streaming.api.watermark.Watermarkimport org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindowsimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport toptrade.DataStreamOperator.CommonOperatorimport toptrade.kafkaInOut.KafkaConsumeToptradeobject SessionWindowTest {

  // *************************************************************************
  // main函数
  // *************************************************************************

  def main(args : Array[String]) : Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new KafkaConsumeToptrade()

    val indexString = source.indexDataStream(env).name("Index").setParallelism(4)
    val indexDataStream = new CommonOperator().mapIndexToDataStreamPOJO(indexString).filter(f=>f.lastIndex != 0L && f.totalVolume != 0L).setParallelism(8).name("index filter")

    val watermarkIndex = indexDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockIndex] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 10000L

      override def getCurrentWatermark: Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      }

      override def extractTimestamp(t: StockIndex, l: Long): Long = {
        val timestamp = t.time
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        timestamp
      }
    })      .name("index watermark")      .setParallelism(8)


    val sessionWindow = watermarkIndex      .keyBy(_.code)      .window(EventTimeSessionWindows.withGap(Time.seconds(6)))      .apply(new IndexSessionWindow)      .setParallelism(8)


    sessionWindow.print().setParallelism(1)

    env.execute()

  }

// *************************************************************************
  // SessionWindow Function
  // *************************************************************************
  class IndexSessionWindow extends RichWindowFunction[StockIndex,(String,String,String,String,String,Int),String,TimeWindow]{

    var state : ValueState[IndexSumTest] = null
    var size = 0

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    override def open(config : Configuration) : Unit = {

      state = getRuntimeContext.getState(new ValueStateDescriptor[IndexSumTest]("snapshot State", classOf[IndexSumTest], null))
    }

    override def apply(key: String, window: TimeWindow, input: Iterable[StockIndex], out: Collector[(String, String, String, String,String,Int)]): Unit = {
      //init
      if(state.value() == null){
        state.update(IndexSumTest(0))
      }else{
        size = state.value().size
      }

      val list = input.toList.sortBy(_.time)

      val window_start_time = format.format(window.getStart)
      val window_end_time = format.format(window.getEnd)
      val window_size = input.size

      size = size + window_size
      state.update(IndexSumTest(size))      out.collect((key,window_start_time,window_end_time,format.format(list.head.time),format.format(list.last.time),size))
    }
  }

  // *************************************************************************
  // Case Class
  // *************************************************************************
  case class IndexSumTest(size : Int)

}
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

  • 29

  • 30

  • 31

  • 32

  • 33

  • 34

  • 35

  • 36

  • 37

  • 38

  • 39

  • 40

  • 41

  • 42

  • 43

  • 44

  • 45

  • 46

  • 47

  • 48

  • 49

  • 50

  • 51

  • 52

  • 53

  • 54

  • 55

  • 56

  • 57

  • 58

  • 59

  • 60

  • 61

  • 62

  • 63

  • 64

  • 65

  • 66

  • 67

  • 68

  • 69

  • 70

  • 71

  • 72

  • 73

  • 74

  • 75

  • 76

  • 77

  • 78

  • 79

  • 80

  • 81

  • 82

  • 83

  • 84

  • 85

  • 86

  • 87

  • 88

  • 89

  • 90

  • 91

  • 92

  • 93

  • 94

  • 95

  • 96

  • 97

  • 98

  • 99

  • 100

  • 101

  • 102

  • 103

  • 104

  • 105

  • 106

  • 107

  • 108

  • 109

  • 110

  • 111

  • 112

session window function的实现,输出的内容代表:(key,窗口开始时间,窗口结束时间,窗口内最早的一条数据的时间,窗口内最后一条数据时间,同一个key的累计个数)。

4、session window的输出结果

上面的结果,输出如下(抽取了其中一小部分):

(990857,2016-09-23 14:31:03.000,2016-09-23 14:31:09.000,2016-09-23 14:31:03.000,2016-09-23 14:31:03.000,5)(990857,2016-09-23 14:31:49.000,2016-09-23 14:31:55.000,2016-09-23 14:31:49.000,2016-09-23 14:31:49.000,6)(990857,2016-09-23 14:32:09.000,2016-09-23 14:32:20.000,2016-09-23 14:32:09.000,2016-09-23 14:32:14.000,8)(990857,2016-09-23 14:32:29.000,2016-09-23 14:32:35.000,2016-09-23 14:32:29.000,2016-09-23 14:32:29.000,9)(990857,2016-09-23 14:32:39.000,2016-09-23 14:32:45.000,2016-09-23 14:32:39.000,2016-09-23 14:32:39.000,10)(990857,2016-09-23 14:32:49.000,2016-09-23 14:32:55.000,2016-09-23 14:32:49.000,2016-09-23 14:32:49.000,11)(990857,2016-09-23 14:33:04.000,2016-09-23 14:33:10.000,2016-09-23 14:33:04.000,2016-09-23 14:33:04.000,12)(990857,2016-09-23 14:33:14.000,2016-09-23 14:33:20.000,2016-09-23 14:33:14.000,2016-09-23 14:33:14.000,13)(990857,2016-09-23 14:33:29.000,2016-09-23 14:33:35.000,2016-09-23 14:33:29.000,2016-09-23 14:33:29.000,14)(990857,2016-09-23 14:33:39.000,2016-09-23 14:33:45.000,2016-09-23 14:33:39.000,2016-09-23 14:33:39.000,15)(990857,2016-09-23 14:33:49.000,2016-09-23 14:33:55.000,2016-09-23 14:33:49.000,2016-09-23 14:33:49.000,16)(990857,2016-09-23 14:34:04.000,2016-09-23 14:34:10.000,2016-09-23 14:34:04.000,2016-09-23 14:34:04.000,17)(990857,2016-09-23 14:34:14.000,2016-09-23 14:34:20.000,2016-09-23 14:34:14.000,2016-09-23 14:34:14.000,18)
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

我们以第三个窗口为例来说明: 
第三个窗口中有2条记录(8-6),最早的一条记录时间是:2016-09-23 14:32:09.000,最后的一条记录时间是:2016-09-23 14:32:14.000。相差5秒,因此这两条数据没有达到6秒的间隔,所以这两条数据一定属于同一个窗口。下一条数据可以观察下一个窗口的开始时间:2016-09-23 14:32:29.000,比第三个窗口的最后一条的时间多了15秒,因此才产生了第三个窗口。第三个窗口的结束时间是:2016-09-23 14:32:20.000,正好是窗口内最后一个数据时间+gap的时间。

由此也验证了我们上边提高的公式。

不过作为窗口结束时间,在实际中的用处不大,只是gap内部记录的一个时间戳而已,仅做触发条件使用。

5、参考

https://ci./projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#session-windows 
http:///blog/2016/06/06/flink-internals-session-window/ 
https://www./ideas/the-world-beyond-batch-streaming-101

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多