分享

Flink

 sztonydwnajpqa 2020-10-19

Flink

窗口

本章节主要介绍滑动窗口,并提供滑动窗口优化方式。窗口的详细内容请参见官网:https://ci./projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html

窗口介绍

窗口中数据的保存形式主要有中间结果和原始数据两种,对窗口中的数据使用公共算子,如sum等操作时(window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5))).sum)仅会保留中间结果;当用户使用自定义窗口时(window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5))).apply(new UDF))时保存所有的原始数据。

用户使用自定义SlidingEventTimeWindow和SlidingProcessingTimeWindow时,数据以多备份的形式保存。假设窗口的定义如下:

window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5))).apply(new UDFWindowFunction)

当一个数据到来时,会被分配到20/5=4个不同的窗口中,即:数据在内存中保存了4份。当窗口大小/滑动周期非常大时,冗余现象非常严重,难以接受。

图4-1 窗口原始结构示例

假设一个数据在102秒时到来,它将会被分配到[85, 105)、[90, 110)、[95, 115)以及[100, 120)四个不同的窗口中。

窗口优化

针对上述SlidingEventTimeWindow和SlidingProcessingTimeWindow在保存原始数据时存在的数据冗余问题,对保存原始数据的窗口进行重构,优化存储,使其存储空间大大降低,具体思路如下:

  1. 以滑动周期为单位,将窗口划分为若干相互不重合的pane。

    每个窗口由一到多个pane组成,多个pane对窗口构成了覆盖关系。所谓一个pane即一个滑动周期,如:在窗口window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds.of(5)))中pane的大小为5秒,假设这个窗口为[100, 120),则包含的pane为[100, 105), [105, 110), [110, 115), [115, 120)。

    图4-2 窗口重构示例
  2. 当某个数据到来时,并不分配到具体的窗口中,而是根据自己的时间戳计算出该数据所属的pane,并将其保存到对应的pane中。

    一个数据仅保存在一个pane中,内存中只有一份。

    图4-3 窗口保存数据示例
  3. 当需要触发某个窗口时,计算该窗口包含的所有pane,并取出合并成一个完整的窗口计算。
    图4-4 窗口触发计算示例
  4. 当某个pane不再需要时,将其从内存中删除。
    图4-5 窗口删除示例

通过优化,可以大幅度降低数据在内存以及快照中的数量。

Job Pipeline

  • 通常情况下,会将与某一方面业务相关的逻辑代码放在一个比较大的jar包中,这种jar包称为Fat Jar。Fat Jar具有以下缺点:
    • 随着业务逻辑越来越复杂,Jar包的大小也不断增加。
    • 协调难度增大,所有的业务开发人员都在同一套业务逻辑上开发,虽然可以将整个业务逻辑划分为几个模块,单各模块之间是一种紧耦合的关系,当需求更改时,需要重新规划整个流图。
  • 拆分成多个作业目前还存在问题。
    • 通常情况下,作业之间可以通过Kafka实现数据传输,如作业A可以将数据发送到Kafka的Topic A下,然后作业B和作业C可以从Topic A下读取数据。该方案简单易行,但是延迟很难做到100ms以内。
    • 采用TCP直接相连的方式,算子在分布式环境下,可能会调度到任意节点,上下游之间无法感知其存在。

Job Pipeline流图结构

Pipeline是由Flink的多个Job通过TCP连接起来,上游Job可以直接向下游Job发送数据。这种发送数据的流图称为Job Pipeline,如图4-6所示。

图4-6 Job Pipeline流图

Job Pipeline原理介绍

图4-7 Job Pipeline原理图
  • NettySink和NettySource

    Pipeline中上下游Job是直接通过Netty进行通信,上游Job的Sink算子作为Server,下游Job的Source算子作为Client。上游Job的Sink算子命名为NettySink,下游Job的Source算子命名为NettySource。

  • NettyServer和NettyClient

    NettySink作为Netty的服务器端,内部NettyServer实现服务器功能;NettySource作为Netty的客户端,内部NettyClient实现客户端功能。

  • 发布者

    通过NettySink向下游Job发送数据的Job称为发布者。

  • 订阅者

    通过NettySource接收上游Job发送的数据的Job称为订阅者。

  • 注册服务器

    保存NettyServer的IP、端口以及NettySink的并发度信息的第三方存储器。

  • 总体架构是一个三层结构,由外到里依次是:
    • NettySink->NettyServer->NettyServerHandler。
    • NettySource->NettyClient->NettyClientHandler。

功能介绍

  • NettySink

    NettySink由以下几个重要模块组成:

    • RichParallelSinkFunction

      NettySink继承了RichParallelSinkFunction,使其具有Sink算子的属性。主要通过RichParallelSinkFunction的接口来实现以下功能:

      • 启动NettySink算子。
      • 运行NettySink算子,从本job的上游算子接收数据。
      • cancelNettySink算子等。

      也可以通过其属性获取以下信息:

      • NettySink算子各个并发度的subtaskIndex信息。
      • NettySink算子的并发度是多少。
    • RegisterServerHandler

      该组件主要是与注册服务器交互的部件,在平台中定义了一系列接口,包括以下几种接口:

      • “start();” :启动RegisterServerHandler,与第三方RegisterServer建立联系。
      • “createTopicNode();” :创建Topic节点。
      • “register();”: 将IP、端口及并发度信息注册到Topic节点下。
      • “deleteTopicNode();”: 删除Topic节点。
      • “unregister();”: 删除注册信息。
      • “query(); ”:查询注册信息。
      • “isExist();”: 查找某个信息是否存在。
      • “shutdown(); ”:关闭RegisterServerHandler,与第三方RegisterServer断开连接。
      • RegisterServerHandler接口实现了ZooKeeper作为RegisterServer的Handler,用户可以根据自己的需求,实现自己的Handler,ZooKeeper中信息的保存形式如下图所示:
        Namespace    |---Topic-1            |---parallel-1            |---parallel-2            |....            |---parallel-n    |---Topic-2            |---parallel-1            |---parallel-2            |....            |---parallel-m     |... 
      • Namespace的信息通过“flink-conf.yaml”的以下配置项获取:
        nettyconnector.registerserver.topic.storage: /flink/nettyconnector
      • ZookeeperRegisterServerHandler与ZooKeeper之间的SASL认证通过Flink的框架实现,使用Flink的相关配置项请参考业务操作指南 > Flink > 配置管理Flink
      • 用户必须自己保证每个Job有一个唯一的TOPIC,否则会引起作业间订阅关系的混乱。
      • 在ZookeeperRegisterServerHandler调用shutdown()时,首先删除本并发度的注册信息,然后尝试删除TOPIC节点,如果TOPIC节点为非空,则放弃删除TOPIC节点,说明其他并发度还未退出。
    • NettyServer

      该模块是NettySink算子的核心之一,主要作用是创建一个NettyServer并接收NettyClient的连接申请。将同一Job中上游算子发送过来的数据,经由NettyServerHandler发送出去。 另外,NettyServer的端口及子网需要在“flink-conf.yaml”配置文件中配置:

      • 端口范围
        nettyconnector.sinkserver.port.range: 28444-28943
      • 子网
        nettyconnector.sinkserver.subnet: 10.162.222.123/24

        nettyconnector.sinkserver.subnet默认配置为Flink客户端所在节点子网,若客户端与TaskManager不在同一个子网则有可能导致错误,需手动配置为TaskManager所在网络子网(业务IP)。

    • NettyServerHandler

      该Handler是NettySink与订阅者交互的通道,当NettySink接收到消息时,该Handler负责将消息发送出去。为保证数据传输的安全性,该通道通过SSL加密,SSL加密的相关配置请参考“业务操作指南 > Flink > 安全管理 > 安全加固 > 认证和加密”中的“加密传输”内容。另外设置一个Netty Connector的功能开关,只有当Flink的SSL总开关被打开以及配置“nettyconnector.ssl.enabled”“true”的时候才开启SSL加密,否则不开启。

  • NettySource

    NettySource由以下几个重要模块组成:

    • RichParallelSourceFunction

      NettySource继承了RichParallelSinkFunction,使其具有Source算子的属性,主要通过RichParallelSourceFunction接口来实现以下功能:

      • 启动NettySink算子。
      • 运行NettySink算子,接收来自订阅者的数据并注入到所在Job中。
      • 取消Source算子运行等。

      也可以通过其属性获取以下信息:

      • NettySource算子各个并发度的subtaskIndex信息。
      • NettySource算子的并发度是多少。

      当NettySource算子进入run阶段后,平台内部会不断监控其NettyClient状态是否健康,一旦发现其出现异常,即会重启NettyClient,重新与NettyServer建立连接并接收数据,以防接收的数据混乱。

    • RegisterServerHandler

      该组件与NettySink的RegisterServerHandler功能相同,在NettySource算子中仅获取所订阅Job的各个并发算子的IP、端口及并发算子信息。

    • NettyClient

      NettyClient与NettyServer建立连接,并通过NettyClientHandler接收数据。每个NettySource算子必须具有唯一的name(由用户来保障)。NettyServer通过唯一的name确定每个Client来自不同的NettySource。当NettyClient与NettyServer建立连接时,首先向NettyServer注册NettyClient,将NettyClient的NettySource name传递给NettyServer。

    • NettyClientHandler

      该模块是与发布者交互的通道,也是与Job的其他算子交互的通道。当该通道中接收到消息时,该Handler负责将消息注入到Job内部。另外,为保证数据安全传输,该通道通过SSL加密,与NettySink进行通信。SSL加密的相关配置请参考 “业务操作指南 > Flink > 安全配置 > 安全加固 > 认证和加密”中的“加密传输”内容。另外设置一个NettyConnector的功能开关,只有当Flink的SSL总开关被打开以及“nettyconnector.ssl.enabled”“true”的时候才开启SSL加密,否则不开启。

Job与Job之间的联系可能是多对多的关系,对于每个NettySink和NettySource算子的并发度而言,是一对多的关系,如图4-8所示。
图4-8 关系图

配置表

在某些场景下,用户存在固定的配置表,存储了基础信息;当平台接收流数据并处理时,需要与配置表进行匹配操作。由于配置表可能较大,考虑使用Redis存储,Redis是一个高性能的key-value数据库,流数据查询时延较低。

具体流程如下:

图4-9 流程图

Redis存储数据

Redis并不是简单的key-value存储,实际上它是一个数据结构服务器,支持不同类型的值。支持数据类型存储如下:

  • 二进制安全的字符串。
  • Lists: 按插入顺序排序的字符串元素的集合。基本上就是链表(linked lists)
  • Sets: 不重复且无序的字符串元素的集合。
  • Sorted sets:每个字符串元素都关联到一个叫score浮动数值(floating number value)。里面的元素是通过score进行排序,它是可以检索的一系列元素。
  • Hashes:由field和关联的value组成的map,field和value都是字符串。
  • Bit arrays: 通过特殊的命令,用户可以将String值当作一系列bits处理。例如用户可以设置和清除单独的bits,统计出所有设为1的bits的数量,或找到第一个被设为1或0的bit等等。
  • HyperLogLogs: 这是被用于估计一个set中元素数量的概率性的数据结构。

为满足最大5亿条数据配置表的存储并及时响应查询,使用Redis集群存储配置表,并使用流的异步IO作消息查询,提高数据处理的吞吐量。

  • Redis集群:在集群环境上的各个节点上部署Redis,并将数据分散存储在各个节点上,提升了存储容量,目前FusionInsight中已有Redis组件。
  • 异步IO:处理流数据,最大化数据处理的吞吐量,提高处理效率。

涉及Redis主要有两部分,Redis安装部署以及配置表数据导入:

  1. Redis安装。

    FusionInsight已经有Redis组件,在集群安装时可以勾选安装,具体安装步骤请参考FusionInsight安装配置说明。

  2. 配置表导入Redis。

    用户可以按照配置表的特征选取主键或者关键某几列作为key值,当需要存储的配置表的属性较多时,建议以Hashes的数据形式存储。

    FusionInsight的Redis组件提供了Jedis客户端对数据进行插入查询,可以参考Redis组件样例代码。

Redis数据类型详细信息请参见官网:https:///topics/data-types-intro

Flink异步IO

当与外部系统进行交互时,如外部的数据库,访问等待时间过长导致数据处理效率低。异步IO实现了不需要等待请求返回就可以同时发送其他请求,以此提高数据吞吐量。

异步IO的API实现需要注意三点:

  • AsyncFunction函数实现了数据处理的异步处理,需要重写asyncInvoke方法。
  • 回调函数获取算子的结果,并且通过AsyncCollector收集起来。
    图4-10 Async.I/O的比较
  • 超时时间和最大容量设置。

    超时时间定义了一个异步请求失败的最大时间。最大容量设置是指同时可以存在多少个异步请求,过多导致资源消耗加大;过小导致并行数小,吞吐量不能提高;建议针对数据源特点进行合理适配。

Stream SQL Join

Flink的Table API&SQL是一种用于Scala 和Java的语言集成式查询API, 它支持非常直观的从关系运算符 (如选择、筛选和连接)进行组合查询。Table API&SQL详细内容请参加官网:https://ci./projects/flink/flink-docs-release-1.7/dev/table/index.html

Stream SQL Join介绍

SQL Join用于根据两个或多个表中的列之间的关系,从这些表中查询数据。Flink Steam SQL Join允许对两个流式table进行join,并从中查询结果。支持类似于以下内容的查询:

SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime FROM Orders AS o JOIN Shipments AS s ON o.orderId = s.orderId AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;

目前,Stream SQL Join需在指定的窗口范围内进行。对窗口范围内的数据进行连接,需要至少一个相等连接谓词和一个绑定双方时间的条件。这个条件可以由两个适当的范围谓词 (<, <=, >=, >),一个BETWEEN谓词或者一个单一的相等谓词来定义。 这个相等谓词主要是比较两个输入表的同类型时间属性(比如处理时间或者事件时间)。

以下是一个关于在收到订单后四小时内发货, 将所有订单及其相应的货件进行Join的示例:

SELECT *FROM Orders o, Shipments sWHERE o.id = s.orderId ANDo.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
  1. Stream SQL Join仅支持Inner Join。
  2. ON子句应包括相等连接条件。
  3. 时间属性只支持处理时间和事件时间。
  4. 窗口条件只支持有界的时间范围, 如 o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR,不支持像o. proctime > s.proctime这样无界的范围,并应包括两个流的 proctime 属性,不支持o.proctime BETWEEN proctime () AND proctime () + 1

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多