目录一、Flink简介5为什么选择Flink?25传统数据处理架构26事务处理27分析处理27有状态的流式处理28lambda架构28Fli nk的主要特点28事件驱动(Event-driven)29有状态流处理29流处理37并行Dataflows40自定义时间流处理4 1分层API41通过状态快照实现的容错41其它特点42Flink的组件栈42二、Flink的基础编程模型43FlinkTable 和SQL44Demo项目45Flink中的API59DataStreamAPI简介61什么能被转化成流?61基于Data StreamAPI实现欺诈检测62一个完整的示例74Stream执行环境75基本的streamsource78基本的s treamsink78调试79动手实践79更多阅读79流式分析79EventTimeandWatermarks79Wind ows82实践练习88延伸阅读88基于TableAPI实现实时报表89WhatWillYouBeBuilding?8 9Prerequisites89Help,I’mStuck!90HowToFollowAlong90BreakingD ownTheCode91Testing92AttemptOne93UserDefinedFunctions93Addin gWindows94OnceMore,WithStreaming!95数据管道&ETL97无状态的转换97Keyed Streams99有状态的转换102ConnectedStreams105动手练习108延展阅读108事件驱动应用108处理函数 (ProcessFunctions)108旁路输出(SideOutputs)113结语114实践练习114延伸阅读114通过状 态快照实现容错处理114StateBackends114状态快照115实践练习117延伸阅读118三、Flink的部署118本地 模式安装118步骤1:下载118步骤2:启动集群119步骤3:提交作业(Job)119步骤4:停止集群119单节点模式11 9场景说明120环境搭建121环境讲解122FlinkWebUI界面122日志123FlinkCLI123FlinkRES TAPI123KafkaTopics123核心特性探索124获取所有运行中的Job124Job失败与恢复125Job升级 与扩容126查询Job指标130机群搭建135Flink集群规模138集群角色139四、Flink运行架构140Flink运行 时的组件140JobManager140TaskManger141ResourceManager142Dispatcher142 任务提交流程142任务提交流程(Yarn)143任务调度原理143TaskManager和Slots144常用算子146程序与数据 流(DataFlow)146执行图147并行度(Parallelism)148任务链(OperatorChains)151分区策 略152分布式缓存153广播变量154窗口154状态存储155五、Flink流处理API156Environment156getE xecutionEnvironment156createLocalEnvironment156createRemoteEnviro nment157Source157从集合读取数据157自定义source158Transform159map160flatmap1 60Filter160KeyBy161滚动聚合算子(RollingAggregation)162Reduce162Split和S elect163Connect和CoMap165Union168支持的数据类型168基础数据类型169Java和Scala元组(T uples)169Scala样例类(caseclasses)169Java简单对象(POJOs)169其它(Arrays,Li sts,Maps,Enums,等等)170六、时间语义与Wartermark170Flink中的时间语义170Event Time的引入172Watermark172基本概念172Watermark的引入175七、压测和监控178Flink面试题1 79第二部分:Flink面试进阶篇179第三部分:Flink面试源码篇185一、Flink简介ApacheFlink是一个 框架和分布式处理引擎,用于对无界和有界数据流进行状态计算,Flink是一个分布式计算框架。并且Flink提供了数据分布、容错机 制以及资源管理等核心功能。ApacheFlink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编 写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序 。此外,Flink的运行时本身也支持迭代算法的执行。Flink可以搭建廉价机群,快速处理任意规模的数据。Flink总体架构如图,从 左往右看Flink的实时处理是一个个Event(事件)驱动的(类比Kafka,Flume),不同于SparkStreaming中 微批次。2019年是大数据实时计算领域最不平凡的一年,2019年1月阿里巴巴Blink(内部的Flink分支版本) 开源,大数据领域一夜间从Spark独步天下走向了两强争霸的时代。Flink因为其天然的流式计算特性以及强大的处理性能成为炙手 可热的大数据处理框架。7月6日,ApacheFlink1.11正式发布。从3月初进行功能规划到7月初正式发版, 1.11用将近4个月的时间重点优化了Flink的易用性问题,提升用户的生产使用体验。SQL作为Flink中公认的核 心模块之一,对推动Flink流批一体功能的完善至关重要。在1.11中,FlinkSQL也进行了大量的增强与完善,开发大 功能10余项,不仅扩大了应用场景,还简化了流程,上手操作更简单。其中,值得注意的改动包括:默认Planner已经切到Bl inkplanner上。引入了对CDC(ChangeDataCapture,变动数据捕获)的支持,用户仅用几句简单的S QL即可对接Debezium和Canal的数据源。离线数仓实时化,用户可方便地使用SQL将流式数据从Kafka写 入Hive等。FlinkSQL演变随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大 数据从业者、初学Java的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。S QL是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink作为流批一体的计算引擎,致力于提供一套SQL 支持全部应用场景,FlinkSQL的实现也完全遵循ANSISQL标准。之前,用户可能需要编写上百行业务代码,使用SQ L后,可能只需要几行SQL就可以轻松搞定。FlinkSQL的发展大概经历了以下阶段:Flink1.1.0:第一次引入 SQL模块,并且提供TableAPI,当然,这时候的功能还非常有限。Flink1.3.0:在StreamingSQL上 支持了Retractions,显著提高了StreamingSQL的易用性,使得FlinkSQL支持了复杂的Unbo unded聚合连接。Flink1.5.0:SQLClient的引入,标志着FlinkSQL开始提供纯SQL文本。 Flink1.9.0:抽象了Table的Planner接口,引入了单独的BlinkTable模块。BlinkTa ble模块是阿里巴巴内部的SQL层版本,不仅在结构上有重大变更,在功能特性上也更加强大和完善。Flink1.10.0:作为 第一个Blink基本完成merge的版本,修复了大量遗留的问题,并给DDL带来了Watermark的语法,也给B atchSQL带来了完整的TPC-DS支持和高效的性能。经过了多个版本的迭代支持,SQL模块在Flink中变得越来越 重要,Flink的SQL用户也逐渐扩大。基于SQL模块的Python接口和机器学习接口也在快速发展。毫无疑问,SQ L模块作为最常用的API之一和生态的集成变得越来越重要。SQL1.11重要变更FlinkSQL在原有的基础上扩展了新 场景的支持:FlinkSQL引入了对CDC(ChangeDataCapture,变动数据捕获)的支持,它使Flink 可以方便地通过像Debezium这类工具来翻译和消费数据库的变动日志。FlinkSQL扩展了类Filesystemco nnector对实时化用户场景和格式的支持,从而可以支持将流式数据从Kafka写入Hive等场景。除此之外,Flink SQL也从多个方面提高SQL的易用性,系统性的解决了之前的Bug、完善了用户API。CDC支持CDC格式是数据库中一 种常用的模式,业务上典型的应用是通过工具(比如Debezium或Canal)将CDC数据通过特定的格式从数据库中导出到 Kafka中。在以前,业务上需要定义特殊的逻辑来解析CDC数据,并把它转换成一般的Insert-only数据,后续的处理 逻辑需要考虑到这种特殊性,这种work-around的方式无疑给业务上带来了不必要的复杂性。如果FlinkSQL引擎能原 生支持CDC数据的输入,将CDC对接到FlinkSQL的ChangelogStream概念上,将会大大降低用户 业务的复杂度。流计算的本质是就是不断更新、不断改变结果的计算。考虑一个简单的聚合SQL,流计算中,每次计算产生的聚合值其实都是一 个局部值,所以会产生ChangelogStream。在以前想要把聚合的数据输出到Kafka中,如上图所示,几乎是不可能的, 因为Kafka只能接收Insert-only的数据。Flink之前主要是因为Source&Sink接口的限制,导致不 能支持CDC数据的输入。FlinkSQL1.11经过了大量的接口重构,在新的Source&Sink接口上,支持了C DC数据的输入和输出,并且支持了Debezium与Canal格式(FLIP-105)。这一改动使动态TableSou rce不再只支持append-only的操作,而且可以导入外部的修改日志(插入事件)将它们翻译为对应的修改操作(插入、修改和 删除)并将这些操作与操作的类型发送到后续的流中。如上图所示,理论上,CDC同步到Kafka的数据就是Append的一个流 ,只是在格式中含有Changelog的标识:?一种方式是把Changlog标识看做一个普通字段,这也是目前普遍的使用方式。 在Flink1.11后,可以将它声明成Changelog的格式,Flink内部机制支持InterpretChang elog,可以原生识别出这个特殊的流,将其转换为Flink的ChanglogStream,并按照SQL的语义处理;同理 ,FlinkSQL也具有输出ChangeStream的能力(Flink1.11暂无内置实现),这就意味着,你可以将 任意类型的SQL写入到Kafka中,只要有Changelog支持的Format。?为了消费CDC数据,用户需要在 使用SQLDDL创建表时指指定“format=debezium-json”或者“format=canal-json”:CRE ATE?TABLE?my_table?(...)WITH(''connector''=''...'',--e.g.''kafka'' ''format''=''debezium-json'');?Flink1.11的接口都已Ready,但是在实现上:只支持Kaf ka的Debezium-json和Canal-json读。欢迎大家扩展实现自己的Format和Connector。 Source&Sink重构Source&Sink重构的一个重要目的是支持上节所说的Changelog,但是除了Ch angelog以外,它也解决了诸多之前的遗留问题。新Source&Sink使用标准姿势(详见官方文档):CREATE? TABLE?kafka_table?(...)WITH(''connector''=''kafka-0.10'',''topic'' =''test-topic'',''scan.startup.mode''=''earliest-offset'',''propertie s.bootstrap.servers''=''localhost:9092'',''format''=''json'',''json.f ail-on-missing-field''=''false'');Flink1.11为了向前兼容性,依然保留了老Source &Sink,使用“connector.type”的Key,即可Fallback到老Source&Sink上。 ■Factory发现机制Flink1.11前,用户可能经常遇到一个异常,叫做NoMatchingFactory异常:? 指的是,定义了一个DDL,在用的时候,DDL属性找不到对应的TableFactory实现,可能的原因是:Classpath 下没有实现类,FlinkSQL是通过?JavaSPI的机制来发现Factory;参数写错了。但是报的异常让人非常疑惑, 根据异常的提示消息,很难找到到底哪里的代码错了,更难明确知道哪个Key写错了。public?interface?Factory ?{StringfactoryIdentifier();……}所以在Flink1.11中,社区重构了TableF actory接口,提出了一个新的Factory接口,它有一个方法,叫做FactoryIdentifier。以后所有的Fa ctory的lookup都通过identifier。这样的话就非常清晰明了,找不到是因为Classpath下没Fa ctory的类,找得到那就可以定位到Factory的实现中,进行确定性的校验。■类型与数据结构之前的Source&Sin k接口支持用户自定义数据结构,即框架知道如何把自定义的数据结构转换为FlinkSQL认识的内部数据结构,如:public? interface?TableSource?{TypeInformationgetReturnType(); ...}用户可以自定义泛型T,通过getReturnType来告诉框架怎么转换。不过问题来了,当getReturnType 和DDL中声明的类型不一致时怎么办?特别是两套类型系统的情况下,如:Runtime的TypeInformation,SQ L层的DataType。由于精度等问题,可能导致经常出现类型不匹配的异常。Flink1.11系统性地解决了这个问题。现在 Connector开发者不能自定义数据结构,只能使用FlinkSQL内部的数据结构:RowData。所以保证了默认Typ e与DDL的对应,不用再返回类型让框架去确认了。RowData数据结构在SQL内部设计出来为了:抽象类接口,在不同场景 有适合的高性能实现。包含RowKind,契合流计算中的CDC数据格式。遵循SQL规范,比如包含精度信息。对应SQL类 型的可枚举的数据结构。■Upsert与PrimaryKey流计算的一个典型场景是把聚合的数据写入到UpsertSink 中,比如JDBC、HBase,当遇到复杂的SQL时,时常会出现:UpsertStreamTableSink需要上游的Q uery有完整的PrimaryKey信息,不然就直接抛异常。这个现象涉及到Flink的UpsertStreamTab leSink机制。顾名思义,它是一个更新的Sink,需要按Key来更新,所以必须要有Key信息。如何发现Primar yKey?一个方法是让优化器从Query中推断,如下图发现PrimaryKey的例子。这种情况下在简单Query当 中很好,也满足语义,也非常自然。但是如果是一个复杂的Query,比如聚合又Join再聚合,那就只有报错了。不能期待优化器有多 智能,很多情况它都不能推断出PK,而且,可能业务的SQL本身就不能推断出PK,所以导致了这样的异常。?怎么解决问题?Fli nk1.11彻底的抛弃了这个机制,不再从Query来推断PK了,而是完全依赖Createtable语法。比如C reate一个jdbc_table,需要在定义中显式地写好PrimaryKey(后面NOTENFORCED的意思是不 强校验,因为Connector也许没有具备PK的强校验的能力)。当指定了PK,就相当于就告诉框架这个JdbcSink 会按照对应的Key来进行更新。如此,就跟Query完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义 来。CREATETABLEjdbc_table(idBIGINT,...PRIMARYKEY(id)NOTEN FORCED)Hive流批一体首先看传统的Hive数仓。一个典型的Hive数仓如下图所示。一般来说,ETL使用调度工具 来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生Table1,再产生Table 2,再调度产生Table3,计算延时需要叠加起来。问题是慢,延迟大,并且Ad-hoc分析延迟也比较大,因为前面的数据入库,或 者前面的调度的ETL会有很大的延迟。Ad-hoc分析再快返回,看到的也是历史数据。所以现在流行构建实时数仓,从Kafka 读计算写入Kafka,最后再输出到BIDB,BIDB提供实时的数据服务,可以实时查询。Kafka的ETL为实时作业 ,它的延时甚至可能达到毫秒级。实时数仓依赖Queue,它的所有数据存储都是基于Queue或者实时数据库,这样实时性很好,延时 低。但是:第一,基于Queue,一般来说就是行存加Queue,存储效率其实不高。第二,基于预计算,最终会落到BIDB,已经 是聚合好的数据了,没有历史数据。而且Kafka存的一般来说都是15天以内的数据,没有历史数据,意味着无法进行Ad-hoc 分析。所有的分析全是预定义好的,必须要起对应的实时作业,且写到DB中,这样才可用。对比来说,Hive数仓的好处在于它可以进 行Ad-hoc分析,想要什么结果,就可以随时得到什么结果。能否结合离线数仓和实时数仓两者的优势,然后构建一个Lambda的 架构?核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完Hive数 仓、SQL,然后实时数仓也要写完相应SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁 移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受, 如果是大量的作业,维护成本其实是非常大的。如何既享受Ad-hoc的好处,又能实现实时化的优势?一种思路是将Hive的离线数 仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink1.11在Hive流批一体上做了一些探索和尝试,如下图所示 。它能实时地按Streaming的方式来导出数据,写到BIDB中,并且这套系统也可以用分析计算框架来进行Ad-hoc 的分析。这个图当中,最重要的就是FlinkStreaming的导入。■StreamingSink早期Flink版本在 DataStreaming层,已经有一个强大的StreamingFileSink将流数据写到文件系统。它是一个准实时的、E xactly-once的系统,能实现一条数据不多,一条数据不少的Sink。具体原理是基于两阶段提交:第一阶段:Snapshot PerTask,关闭需要Commit的文件,或者记录正在写的文件的Offset。第二阶段:NotifyCheckpointC omplete,Rename需要Commit的文件。注意,Rename是一个原子且幂等的操作,所以只要保证Rename 的At-least-once,即可保证数据的Exactly-once。这样一个Filesystem的Writer看似 比较完美了。但是在Hive数仓中,数据的可见性是依赖HiveMetastore的,那在这个流程中,谁来通知HiveM etastore呢?SQL层在StreamingFileSink,扩展了Partition的Committer。相当于 不仅要进行File的Commit,还要进行Partition的Commit。如图所示,FileWriter对应之前的 StreamingFileSink,它提供的是Exactly-once的FileWriter。而后面再接了一个节点Par titionCommitter。支持的CommitPolicy有:内置支持Addpartition到Hivemet astore;支持写SuccessFile到文件系统当中;并且也可以自定义Committer,比如可以analysisp artition、合并partition里面的小文件。Committer挂在Writer后,由CommitTrig ger决定什么时机来commit:默认的commit时机是,有文件就立即commit。因为所有commit都是可重 入的,所以这一点是可允许的。另外,也支持通过partition时间和Watermark来共同决定的。比如小时分区,如果现在 时间到11点,10点的分区就可以commit了。Watermark保证了作业当前的准确性。■StreamingSo urceHive数仓中存在大量的ETL任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:实时性不强,往往 调度最小也是小时级。流程复杂,组件多,容易出现问题。针对这些离线的ETL作业,Flink1.11为此开发了实时化的Hiv e流读,支持:Partition表,监控Partition的生成,增量读取新的Partition。非Partition 表,监控文件夹内新文件的生成,增量读取新的文件。甚至可以使用10分钟级别的分区策略,使用Flink的Hivestre amingsource和Hivestreamingsink,可以大大提高Hive数仓的实时性到准实时分钟级,在实时 化的同时,也支持针对Table全量的Ad-hoc查询,提高灵活性。SELECTFROMhive_table/+ OPTIONS(''streaming-source.enable''=’true’,''streaming-source.cons ume-start-offset''=''2020-05-20'')/;另外除了Scan的读取方式,Flink1.11也支持 了TemporalJoin的方式,也就是以前常说的StreamingDimJoin。SELECTo.amout,o .currency,r.rate,o.amountr.rateFROMOrdersASoJOINLatest RatesFORSYSTEM_TIMEASOFo.proctimeASrONr.currency=o.cu rrency目前支持的方式是CacheAll,并且是不感知分区的,比较适合小表的情况。■HiveDialectFlink SQL遵循的是ANSI-SQL的标准,而HiveSQL有它自己的HQL语法,它们之间的语法、语义都有些许不同。如何 让Hive用户迁移到Flink生态中,同时避免用户太大的学习成本?为此,FlinkSQL1.11提供了Hive Dialect,可以使得用户在Flink生态中使用HQL语言来计算。目前只支持DDL,后续版本会逐步攻坚Qeuries 。■?FilesystemConnectorHiveIntegration提供了一个重量级的集成,功能丰富,但是环境比较复杂 。如果只是想要一个轻量级的Filesystem读写呢?Flinktable在长久以来只支持一个CSV的Filesys temTable,并且还不支持Partition,行为上在某些方面也有些不符合大数据计算的直觉。Flink1.11重构了整 个Filesystemconnector的实现:结合Partition,现在,Filesystemconnector支 持SQL中Partition的所有语义,支持Partition的DDL,支持PartitionPruning,支 持静态/动态Partition的插入,支持Overwrite的插入。支持各种Formats:CSVJSONAparc hAVROApacheParquetApacheORC支持Batch的读写。支持Streamingsink,也支持 Partitioncommit,支持写Success文件。用几句简单的SQL,不用搭建Hive集成环境即可:启动一个流 作业写入Filesystem中,然后在Hive端即可查询到Filesystem上的数据,相比之前Datastream 的作业,简单SQL即可搞定离线数据的入库。通过FilesystemConnector来查询Hive数仓中的数据,功 能没有Hive集成那么全,但是定义简单。Table易用性■DDLHints和Like在Flink1.10以后, HiveMetaStore逐渐成为FlinkstreamingSQL中Table相关的Meta信息的存储。比如 ,可以通过HiveCatalog保存KafkaTables。这样可以在启动的时候直接使用Tables。通过DDL这 种方式,把SQL提交到Cluster,就可以写入Kafka,或者写入MySQL、DFS。使用HiveCatalog 后,是不是说只用写一次DDL,之后的流计算作业都是直接使用Kafka的Table呢?不完全是,因为还是有一些缺陷。比如 ,一个典型的KafkaTable有一些Execution相关的参数。因为kafka一般来说都是存15天以内的数据 ,需要指定每次消费的时间偏移,时间偏移是在不断变化的。每次提交作业,使用KafkaTable的参数是不一样的。而这些参数又存 储在Catalog里面,这种情况下只能创建另外一张表,所以字段和参数要重写一遍,非常繁琐。Flink1.11,社区就开发了 TableHints,它在1.11中目前只专注一个功能,即DynamicTableOptions。用起来很简单,在S QL中SelectFrom时,在Table后面写TableHints的方式来指定其动态Options,在不同 的使用场景,指定不同的动态参数。Flink1.11,引入了Like语法。LIKE是标准的SQL定义。相当于Clone 一张表出来复用它的schema。LIKE支持多种Constraints。可以选择继承,也可以选择完全覆盖。TableHi nts:SELECT?id,?name?FROM?kafka_table1?/+?OPTIONS(''scan.startup.m ode''=''earliest-offset'')?/;LIKE:CREATETABLEkafka_table2WITH( ''scan.startup.mode''=''earliest-offset'')LIKEkafka_table1;这两个手段在对接 HiveCatalog的基础上,是非常好的补充,能够尽可能的避免在每次作业的时候都写一大堆Schema。■内置Conn ectorsFlinkSQL1.11引入了新的三个内置Connectors,主要是为了大家更方便的进行调试、测试,以及进行 压测和线上的观察:DatagenSource:一个无中生有产生数据的Source,可以定义生成的策略,比如Sequence, 比如Random的生成。方便线下进行功能性的测试,也可以拿来性能测试。PrintSink:直接在Task节点Runti me的打印出数据,比如线上作业某个Sink少数据了,不知道是上游发来数据有问题,还是Sink逻辑有问题,这时可以额外接一 个PrintSink,排查上游数据到底有没有问题。BlackholeSink:默默把数据给吃掉,方便功能性的调试。?这三个 Connectors的目的是为了在调试、测试中排除Connectors的影响,一般来说,Connectors在流计算中是不 可控的存在,很多问题把Connectors糅杂在一起,变得比较复杂难以排查。SQL-API■TableEnvironment TableEnvironment作为SQL层的编程入口,无疑是非常重要的,之前的API主要是:TablesqlQuer y:从一段Select的Query中返回Table接口,把用户的SQL翻译成Flink的Table。void sqlUpdate:本质上是执行一段DDL/DML。但是行为上,当是DDL时,直接执行;当是DML时,默默Cache 到TableEnvironment,等到后续的execute调用,才会真正的执行。execute:真正的执行,提交作业到集 群。TableEnvironment默默的Cache执行计划,而且多个API感觉上会很混乱,所以,1.11社区重构了编 程接口,目的是想要提供一个干净、并且不易出bug的清晰接口。单SQL执行:TableResultexecuteSql(S tringsql)多SQL执行:StatementSetcreateStatementSet()TableResult:支 持collect、print、getJobClient现在executeSql就是一个大一统的接口,不管是什么SQL,是 Query还是DDL还是DML,直接丢给它都可以很方便地使用起来。并且,和Datastream也有了很清晰的界限:调用 过toDataStream:一定要使用StreamExecutionEnvironment.execute没调用过toDat aStream:一定要使用TableEnvironment.executeSql■SQL-ClientSQL-Client在 1.11对齐了很多Flink内部本来就支持的DDL,除此之外值得注意的是,社区还开发了Tableau的结果展示模式, 展示更自然一些,直接在命令行展示结果,而不是切换页面:总结和展望上述解读主要侧重在用户接口方面,社区已经有比较丰富的文档,大家可以 去官网查看这些功能的详细文档,以便更深入的了解和使用。FlinkSQL1.11在CDC方面开了个头,内部机制和API 上打下了夯实的基础,未来会内置更多的CDC支持,直接对接数据库Binlog,支持更多的FlinkSQL语法。后续版本也 会从底层提供更多的流批一体支持,给SQL层带来更多的流批一体的可能性。(1)Flink的架构简单理解无界流和有界流无界流:流数 据不会停止,没有边界,需要实时处理,绝对的实时处理,来一条,处理一条。有界流:定义了数据的范围,类比Spark-Streaming 中的微批次处理,Hive离线Mr处理。Flink擅长于处理无界数据流(例如Kafka里的日志数据),有界数据集。Fink可以部署在 Yarn,K8s,Mesos多种资源调度框架中。Fink可以处理任意数据量级。上万亿的Event处理。维护TB级别的处理状态。(类 比SparkRDD中Cache,持久化TB级别的处理状态)运行在上千个核心的机群中。Flink的状态持久化的优化当Flink计算 Task中内存不足时候,Flink通过特殊的数据结构,高效的持久化到本地磁盘。Flink会周期的异步持久化计算状态,防止Task进 程挂掉,Task主机意外宕机。并保证持久化数据的一致性。Flink提供了CheckPoint,可以异步的将计算状态持久化到持久层 (如HDFS,本地文件系统)(2)Flink的应用关于流处理的一些基本概念流处理:Flink可以处理有界或无界数据,提供了强力的无 界数据处理的特性。Flink更加偏向绝对的实时处理,来一条处理一条,而不是微批次。状态:计算状态是Flink的一等公民,Flink 提供了许多特性来处理状态。分层APIFlink提供了不同级别的API,满足各种应用场景的应用需求(类比RDD和SparkSQL) ProcessFunction对应RDD,DataStream对应SparkStreaming,SQL/TableAPI对应 SparkSQL总结:Flink实时处理很强,提供的ProcessFunctionAPI可以非常细粒度灵活的控制Event处理 。Flink的提供了SQL支持,对Event进行数据聚合后,使用SQL进行数据分析。Flink是来一个Event处理一次,对比按照 时间间隔微批次处理,所以保证了数据实时处理。Flink对计算状态持久化提供了非常多的特性。Flink提供了诸多高抽象层的API以便 用户编写分布式任务:DataSetAPI,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flin k提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。DataStreamAPI,对数据流进行流处理 操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。TableAPI,对 结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。此外, Flink还针对特定的应用领域提供了领域库,例如:FlinkML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。根据官网的介 绍,Flink的特性包含:支持高吞吐、低延迟、高性能的流处理支持带有事件时间的窗口(Window)操作支持有状态计算的Ex actly-once语义支持高度灵活的窗口(Window)操作,支持基于time、count、session以及dat a-driven的窗口操作支持具有Backpressure功能的持续流模型支持基于轻量级分布式快照(Snapshot)实现的 容错一个运行时同时支持BatchonStreaming处理和Streaming处理Flink在JVM内部实现了自 己的内存管理支持迭代计算支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存为什么选择Flin k?Flink的优点:低延迟高吞吐结果的准确性和良好的容错性Flink相比传统的SparkStreaming有什么区别?这个 问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来:Flink是标准的实时处理引 擎,基于事件驱动。而SparkStreaming是微批(Micro-Batch)的模型。下面我们就分几个方面介绍两个框架的主 要区别:1.架构模型SparkStreaming在运行时的主要角色包括:Master、Worker、Driver、Execu tor,Flink在运行时主要包含:Jobmanager、Taskmanager和Slot。2.任务调度SparkStrea ming连续不断的生成微小的数据批次,构建有向无环图DAG,SparkStreaming会依次创建DStreamGraph 、JobGenerator、JobScheduler。Flink根据用户提交的代码生成StreamGraph,经过优化生成J obGraph,然后提交给JobManager进行处理,JobManager会根据JobGraph生成Execution Graph,ExecutionGraph是Flink调度最核心的数据结构,JobManager根据ExecutionGr aph对Job进行调度。3.时间机制SparkStreaming支持的时间机制有限,只支持处理时间。Flink支持了流 处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持?watermark?机制来处理滞后数据。4.容错机制对于 SparkStreaming任务,我们可以设置checkpoint,然后假如发生故障并重启,我们可以从上次checkpoi nt之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink则使用两阶段提交协议来解决这 个问题。传统数据处理架构事务处理分析处理将数据从业务数据库复制到数仓,再进行分析和查询。有状态的流式处理lambda架构用两套系统 ,同时保证低延迟和结果准确Flink的主要特点事件驱动(Event-driven)有状态流处理WhatisState?Whi lemanyoperationsinadataflowsimplylookatoneindividual?ev entatatime?(forexampleaneventparser),someoperationsreme mberinformationacrossmultipleevents(forexamplewindowopera tors).Theseoperationsarecalled?stateful.Someexamplesofstat efuloperations:Whenanapplicationsearchesforcertaineventpa tterns,thestatewillstorethesequenceofeventsencountereds ofar.Whenaggregatingeventsperminute/hour/day,thestatehold sthependingaggregates.Whentrainingamachinelearningmodelo verastreamofdatapoints,thestateholdsthecurrentversion ofthemodelparameters.Whenhistoricdataneedstobemanaged,t hestateallowsefficientaccesstoeventsthatoccurredinthep ast.Flinkneedstobeawareofthestateinordertomakeitfaul ttolerantusing?https://ci.apache.org/projects/flink/flink-docs- release-1.13/zh/docs/dev/datastream/fault-tolerance/checkpointing /checkpoints?and?https://ci.apache.org/projects/flink/flink-docs- release-1.13/zh/docs/ops/state/savepoints/savepoints.Knowledgeab outthestatealsoallowsforrescalingFlinkapplications,meani ngthatFlinktakescareofredistributingstateacrossparallel instances.https://ci.apache.org/projects/flink/flink-docs-release -1.13/zh/docs/dev/datastream/fault-tolerance/queryable_state/Quer yablestate?allowsyoutoaccessstatefromoutsideofFlinkduri ngruntime.Whenworkingwithstate,itmightalsobeusefultore adabout?https://ci.apache.org/projects/flink/flink-docs-release- 1.13/zh/docs/ops/state/state_backends/Flink’sstatebackends.Fli nkprovidesdifferentstatebackendsthatspecifyhowandwheres tateisstored.KeyedStateKeyedstateismaintainedinwhatcanb ethoughtofasanembeddedkey/valuestore.Thestateispartiti onedanddistributedstrictlytogetherwiththestreamsthatare readbythestatefuloperators.Hence,accesstothekey/valuest ateisonlypossibleon?keyedstreams,i.e.afterakeyed/partiti oneddataexchange,andisrestrictedtothevaluesassociatedwi ththecurrentevent’skey.Aligningthekeysofstreamsandstat emakessurethatallstateupdatesarelocaloperations,guarant eeingconsistencywithouttransactionoverhead.Thisalignmental soallowsFlinktoredistributethestateandadjustthestreamp artitioningtransparently.KeyedStateisfurtherorganizedintos o-called?KeyGroups.KeyGroupsaretheatomicunitbywhichFlin kcanredistributeKeyedState;thereareexactlyasmanyKeyGro upsasthedefinedmaximumparallelism.Duringexecutioneachpar allelinstanceofakeyedoperatorworkswiththekeysforoneor moreKeyGroups.StatePersistenceFlinkimplementsfaulttoleranc eusingacombinationof?streamreplay?and?checkpointing.Acheck pointmarksaspecificpointineachoftheinputstreamsalongw iththecorrespondingstateforeachoftheoperators.Astreamin gdataflowcanberesumedfromacheckpointwhilemaintainingcon sistency?(exactly-onceprocessingsemantics)?byrestoringthesta teoftheoperatorsandreplayingtherecordsfromthepointoft hecheckpoint.Thecheckpointintervalisameansoftradingofft heoverheadoffaulttoleranceduringexecutionwiththerecovery time(thenumberofrecordsthatneedtobereplayed).Thefault tolerancemechanismcontinuouslydrawssnapshotsofthedistribut edstreamingdataflow.Forstreamingapplicationswithsmallsta te,thesesnapshotsareverylight-weightandcanbedrawnfreque ntlywithoutmuchimpactonperformance.Thestateofthestreami ngapplicationsisstoredataconfigurableplace,usuallyinad istributedfilesystem.Incaseofaprogramfailure(duetomachi ne-,network-,orsoftwarefailure),Flinkstopsthedistributed streamingdataflow.Thesystemthenrestartstheoperatorsandre setsthemtothelatestsuccessfulcheckpoint.Theinputstreams areresettothepointofthestatesnapshot.Anyrecordsthatar eprocessedaspartoftherestartedparalleldataflowareguaran teedtonothaveaffectedthepreviouslycheckpointedstate.Byde fault,checkpointingisdisabled.See?https://ci.apache.org/proje cts/flink/flink-docs-release-1.13/zh/docs/dev/datastream/fault-to lerance/checkpointing/Checkpointing?fordetailsonhowtoenable andconfigurecheckpointing.Forthismechanismtorealizeitsful lguarantees,thedatastreamsource(suchasmessagequeueorbr oker)needstobeabletorewindthestreamtoadefinedrecentp oint.?http://kafka.apache.org/ApacheKafka?hasthisabilityandF link’sconnectortoKafkaexploitsthis.See?https://ci.apache.or g/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datas tream/guarantees/FaultToleranceGuaranteesofDataSourcesandS inks?formoreinformationabouttheguaranteesprovidedbyFlink’ sconnectors.BecauseFlink’scheckpointsarerealizedthroughdis tributedsnapshots,weusethewords?snapshot?and?checkpoint?inte rchangeably.Oftenwealsousetheterm?snapshot?tomeaneither?c heckpoint?or?savepoint.CheckpointingThecentralpartofFlink’sf aulttolerancemechanismisdrawingconsistentsnapshotsofthed istributeddatastreamandoperatorstate.Thesesnapshotsactas consistentcheckpointstowhichthesystemcanfallbackincase ofafailure.Flink’smechanismfordrawingthesesnapshotsisd escribedin“http://arxiv.org/abs/1506.08603LightweightAsynchron ousSnapshotsforDistributedDataflows”.Itisinspiredbythes tandard?http://research.microsoft.com/en-us/um/people/lamport/pub s/chandy.pdfChandy-Lamportalgorithm?fordistributedsnapshotsan disspecificallytailoredtoFlink’sexecutionmodel.Keepinmin dthateverythingtodowithcheckpointingcanbedoneasynchrono usly.Thecheckpointbarriersdon’ttravelinlockstepandopera tionscanasynchronouslysnapshottheirstate.SinceFlink1.11,c heckpointscanbetakenwithorwithoutalignment.Inthissectio n,wedescribealignedcheckpointsfirst.BarriersAcoreelementi nFlink’sdistributedsnapshottingarethe?streambarriers.These barriersareinjectedintothedatastreamandflowwiththerec ordsaspartofthedatastream.Barriersneverovertakerecords, theyflowstrictlyinline.Abarrierseparatestherecordsint hedatastreamintothesetofrecordsthatgoesintothecurrent snapshot,andtherecordsthatgointothenextsnapshot.Eachb arriercarriestheIDofthesnapshotwhoserecordsitpushedin frontofit.Barriersdonotinterrupttheflowofthestreamand arehenceverylightweight.Multiplebarriersfromdifferentsna pshotscanbeinthestreamatthesametime,whichmeansthatva rioussnapshotsmayhappenconcurrently.Streambarriersareinjec tedintotheparalleldataflowatthestreamsources.Thepoint wherethebarriersforsnapshot?n?areinjected(let’scallit?Sn) isthepositioninthesourcestreamuptowhichthesnapshotco versthedata.Forexample,inApacheKafka,thispositionwould bethelastrecord’soffsetinthepartition.Thisposition?Sn?is reportedtothe?checkpointcoordinator?(Flink’sJobManager).The barriersthenflowdownstream.Whenanintermediateoperatorhas receivedabarrierforsnapshot?n?fromallofitsinputstreams, itemitsabarrierforsnapshot?n?intoallofitsoutgoingstream s.Onceasinkoperator(theendofastreamingDAG)hasreceived thebarrier?n?fromallofitsinputstreams,itacknowledgestha tsnapshot?n?tothecheckpointcoordinator.Afterallsinkshave acknowledgedasnapshot,itisconsideredcompleted.Oncesnapshot ?n?hasbeencompleted,thejobwillneveragainaskthesourcefo rrecordsfrombefore?Sn,sinceatthatpointtheserecords(and theirdescendantrecords)willhavepassedthroughtheentiredat aflowtopology.Operatorsthatreceivemorethanoneinputstream needto?align?theinputstreamsonthesnapshotbarriers.Thefi gureaboveillustratesthis:Assoonastheoperatorreceivessnap shotbarrier?n?fromanincomingstream,itcannotprocessanyfur therrecordsfromthatstreamuntilithasreceivedthebarrier?n ?fromtheotherinputsaswell.Otherwise,itwouldmixrecordst hatbelongtosnapshot?n?andwithrecordsthatbelongtosnapshot ?n+1.Oncethelaststreamhasreceivedbarrier?n,theoperatorem itsallpendingoutgoingrecords,andthenemitssnapshot?n?barri ersitself.Itsnapshotsthestateandresumesprocessingrecords fromallinputstreams,processingrecordsfromtheinputbuffers beforeprocessingtherecordsfromthestreams.Finally,theoper atorwritesthestateasynchronouslytothestatebackend.Noteth atthealignmentisneededforalloperatorswithmultipleinputs andforoperatorsafterashufflewhentheyconsumeoutputstrea msofmultipleupstreamsubtasks.SnapshottingOperatorStateWhen operatorscontainanyformof?state,thisstatemustbepartoft hesnapshotsaswell.Operatorssnapshottheirstateatthepoint intimewhentheyhavereceivedallsnapshotbarriersfromtheir inputstreams,andbeforeemittingthebarrierstotheiroutputs treams.Atthatpoint,allupdatestothestatefromrecordsbefo rethebarriershavebeenmade,andnoupdatesthatdependonrec ordsfromafterthebarriershavebeenapplied.Becausethestate ofasnapshotmaybelarge,itisstoredinaconfigurable?https ://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/o ps/state/state_backends/statebackend.Bydefault,thisistheJo bManager’smemory,butforproductionuseadistributedreliable storageshouldbeconfigured(suchasHDFS).Afterthestatehas beenstored,theoperatoracknowledgesthecheckpoint,emitsthe snapshotbarrierintotheoutputstreams,andproceeds.Theresult ingsnapshotnowcontains:Foreachparallelstreamdatasource,t heoffset/positioninthestreamwhenthesnapshotwasstartedFor eachoperator,apointertothestatethatwasstoredaspartof thesnapshotRecoveryRecoveryunderthismechanismisstraightfor ward:Uponafailure,Flinkselectsthelatestcompletedcheckpoi nt?k.Thesystemthenre-deploystheentiredistributeddataflow, andgiveseachoperatorthestatethatwassnapshottedasparto fcheckpoint?k.Thesourcesaresettostartreadingthestreamf romposition?Sk.ForexampleinApacheKafka,thatmeanstelling theconsumertostartfetchingfromoffset?Sk.Ifstatewassnapsh ottedincrementally,theoperatorsstartwiththestateofthela testfullsnapshotandthenapplyaseriesofincrementalsnapsho tupdatestothatstate.See?https://ci.apache.org/projects/flink/ flink-docs-release-1.13/zh/docs/dev/execution/task_failure_recove ry/RestartStrategies?formoreinformation.UnalignedCheckpointin gCheckpointingcanalsobeperformedunaligned.Thebasicideais thatcheckpointscanovertakeallin-flightdataaslongasthe in-flightdatabecomespartoftheoperatorstate.Notethatthis approachisactuallyclosertothe?http://research.microsoft.com/ en-us/um/people/lamport/pubs/chandy.pdfChandy-Lamportalgorithm?, butFlinkstillinsertsthebarrierinthesourcestoavoidover loadingthecheckpointcoordinator.Thefiguredepictshowanoper atorhandlesunalignedcheckpointbarriers:Theoperatorreactson thefirstbarrierthatisstoredinitsinputbuffers.Itimmedia telyforwardsthebarriertothedownstreamoperatorbyaddingit totheendoftheoutputbuffers.Theoperatormarksallovertake nrecordstobestoredasynchronouslyandcreatesasnapshotofi tsownstate.Consequently,theoperatoronlybrieflystopsthepr ocessingofinputtomarkthebuffers,forwardsthebarrier,and createsthesnapshotoftheotherstate.Unalignedcheckpointinge nsuresthatbarriersarearrivingatthesinkasfastaspossible .It’sespeciallysuitedforapplicationswithatleastoneslow movingdatapath,wherealignmenttimescanreachhours.However, sinceit’saddingadditionalI/Opressure,itdoesn’thelpwhen theI/Otothestatebackendsisthebottleneck.Seethemorein- depthdiscussionin?https://ci.apache.org/projects/flink/flink-do cs-release-1.13/zh/docs/ops/state/checkpoints/ops?forotherlimit ations.Notethatsavepointswillalwaysbealigned.UnalignedReco veryOperatorsfirstrecoverthein-flightdatabeforestartingpr ocessinganydatafromupstreamoperatorsinunalignedcheckpoint ing.Asidefromthat,itperformsthesamestepsasduring?https: //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/co ncepts/stateful-stream-processing/recoveryofalignedcheckpoints .StateBackendsTheexactdatastructuresinwhichthekey/values indexesarestoreddependsonthechosen?https://ci.apache.org/pr ojects/flink/flink-docs-release-1.13/zh/docs/ops/state/state_back ends/statebackend.Onestatebackendstoresdatainanin-memory hashmap,anotherstatebackenduses?http://rocksdb.org/RocksDB? asthekey/valuestore.Inadditiontodefiningthedatastructur ethatholdsthestate,thestatebackendsalsoimplementthelog ictotakeapoint-in-timesnapshotofthekey/valuestateandst orethatsnapshotaspartofacheckpoint.Statebackendscanbe configuredwithoutchangingyourapplicationlogic.SavepointsAll programsthatusecheckpointingcanresumeexecutionfroma?savep oint.SavepointsallowbothupdatingyourprogramsandyourFlink clusterwithoutlosinganystate.https://ci.apache.org/projects/ flink/flink-docs-release-1.13/zh/docs/ops/state/savepoints/Savepo ints?are?manuallytriggeredcheckpoints,whichtakeasnapshotof theprogramandwriteitouttoastatebackend.Theyrelyonth eregularcheckpointingmechanismforthis.Savepointsaresimilar tocheckpointsexceptthattheyare?triggeredbytheuser?and?do n’tautomaticallyexpire?whennewercheckpointsarecompleted.Exa ctlyOncevs.AtLeastOnceThealignmentstepmayaddlatencyto thestreamingprogram.Usually,thisextralatencyisontheorde rofafewmilliseconds,butwehaveseencaseswherethelatency ofsomeoutliersincreasednoticeably.Forapplicationsthatreq uireconsistentlysuperlowlatencies(fewmilliseconds)forall records,Flinkhasaswitchtoskipthestreamalignmentduringa checkpoint.Checkpointsnapshotsarestilldrawnassoonasano peratorhasseenthecheckpointbarrierfromeachinput.Whenthe alignmentisskipped,anoperatorkeepsprocessingallinputs,ev enaftersomecheckpointbarriersforcheckpoint?n?arrived.That way,theoperatoralsoprocesseselementsthatbelongtocheckpoi nt?n+1?beforethestatesnapshotforcheckpoint?n?wastaken.Ona restore,theserecordswilloccurasduplicates,becausetheyar ebothincludedinthestatesnapshotofcheckpoint?n,andwillb ereplayedaspartofthedataaftercheckpoint?n.Alignmenthappe nsonlyforoperatorswithmultiplepredecessors(joins)aswell asoperatorswithmultiplesenders(afterastreamrepartitioning /shuffle).Becauseofthat,dataflowswithonlyembarrassinglypa rallelstreamingoperations(map(),?flatMap(),?filter(),…)actua llygive?exactlyonce?guaranteesevenin?atleastonce?mode.State andFaultToleranceinBatchProgramsFlinkexecutes?https://ci.a pache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/data set/overview/batchprograms?asaspecialcaseofstreamingprogra ms,wherethestreamsarebounded(finitenumberofelements).A? DataSet?istreatedinternallyasastreamofdata.Theconceptsa bovethusapplytobatchprogramsinthesamewayaswellasthey applytostreamingprograms,withminorexceptions:https://ci.ap ache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/execu tion/task_failure_recovery/Faulttoleranceforbatchprograms?doe snotusecheckpointing.Recoveryhappensbyfullyreplayingthe streams.Thatispossible,becauseinputsarebounded.Thispushe sthecostmoretowardstherecovery,butmakestheregularproce ssingcheaper,becauseitavoidscheckpoints.Statefuloperations intheDataSetAPIusesimplifiedin-memory/out-of-coredatastru ctures,ratherthankey/valueindexes.TheDataSetAPIintroduces specialsynchronized(superstep-based)iterations,whichareonly possibleonboundedstreams.Fordetails,checkoutthe?https:// ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/ dataset/iterations/iterationdocs.流处理在Flink的世界观中,一切都是由流组成的,离线数据时有 界的流;实时数据时一个没有界限的流:这就是所谓的有界流和无界流。在自然环境中,数据的产生原本就是流式的。无论是来自Web服务器 的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕?有界流(bo unded)或?无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。批处理是 有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计 或汇总计算后再输出结果。流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数 据进行处理。在Flink中,应用程序由用户自定义算子转换而来的流式dataflows?所组成。这些流式dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。通常,程序代码中的transformatio n和dataflow中的算子(operator)之间是一一对应的。但有时也会出现一个transformation包含多个 算子的情况,如上图所示。Flink应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如ApacheKafka或K inesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink应用程序生成的结果流也可以发送到各种数据汇中 。有状态流处理Flink中的算子可以是有状态的。这意味着如何处理一个事件可能取决于该事件之前所有事件数据的累积结果。Flink 中的状态不仅可以用于简单的场景(例如统计仪表板上每分钟显示的数据),也可以用于复杂的场景(例如训练作弊检测模型)。Flink应用 程序可以在分布式群集上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下是会在不同的机器上运行。有状态算 子的并行实例组在存储其对应状态时通常是按照键(key)进行分片存储的。每个并行实例算子负责处理一组特定键的事件数据,并且这组键对应 的状态会保存在本地。如下图的Flink作业,其前三个算子的并行度为2,最后一个sink算子的并行度为1,其中第三个算子 是有状态的,并且你可以看到第二个算子和第三个算子之间是全互联的(fully-connected),它们之间通过网络进行数据分发。通 常情况下,实现这种类型的Flink程序是为了通过某些键对数据流进行分区,以便将需要一起处理的事件进行汇合,然后做统一计算处理。 Flink应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下Flink应用程序都是将状态存储在 JVM堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。并行DataflowsFlink程序本质上是 分布式并行程序。在程序执行期间,一个流有一个或多个流分区(StreamPartition),每个算子有一个或多个算子子任务(Op eratorSubtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。算子子任务数就是其对应算子 的并行度。在同一程序中,不同算子也可能具有不同的并行度。Flink算子之间可以通过一对一(直传)模式或重新分发模式传输数据:一对 一模式(例如上图中的?Source?和?map()?算子之间)可以保留元素的分区和顺序信息。这意味着?map()?算子的subt ask[1]输入的数据以及其顺序与?Source?算子的subtask[1]输出的数据和顺序完全相同,即同一分区的数据只会进 入到下游算子的同一分区。重新分发模式(例如上图中的?map()?和?keyBy/window?之间,以及?keyBy/window ?和?Sink?之间)则会更改数据所在的流分区。当你在程序中选择使用不同的?transformation,每个算子子任务也会根据不 同的transformation将数据发送到不同的目标子任务。例如以下这几种transformation和其对应分发数据的 模式:keyBy()(通过散列键重新分区)、broadcast()(广播)或?rebalance()(随机重新分发)。在重新分发数 据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,keyBy/window?的subtask[2] 接收到的?map()?的subtask[1]中的元素都是有序的)。因此,上图所示的?keyBy/window?和?Sink?算 子之间数据的重新分发时,不同键(key)的聚合结果到达Sink的顺序是不确定的。自定义时间流处理对于大多数流数据处理应用程序而 言,能够使用处理实时数据的代码重新处理历史数据并产生确定并一致的结果非常有价值。在处理流式数据时,我们通常更需要关注事件本身发生的 顺序而不是事件被传输以及处理的顺序,因为这能够帮助我们推理出一组事件(事件集合)是何时发生以及结束的。例如电子商务交易或金融交易中 涉及到的事件集合。为了满足上述这类的实时流处理场景,我们通常会使用记录在数据流中的事件时间的时间戳,而不是处理数据的机器时钟的时间 戳。分层API越顶层越抽象,表达含义越简明,使用越方便越底层越具体,表达能力越丰富,使用越灵活通过状态快照实现的容错通过状态快照和 流重放两种方式的组合,Flink能够提供可容错的,精确一次计算的语义。这些状态快照在执行时会获取并存储分布式pipeline 中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个jobgraph中算子获取到该数据(记录的偏移量对应的数据)时 的状态记录并存储下来。当发生故障时,Flink作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量开始重新进行消费 处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。其它特点?支持事件时间(event-time)和 处理时间(processing-time)语义?精确一次(exactly-once)的状态一致性保证?低延迟,每秒处理数百万个事件 ,毫秒级延迟?与众多常用存储系统的连接?高可用,动态扩展,实现724小时全天候运行Flink的组件栈根据Flink官网描述,Fl ink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。自下而上,每一层分别代表:Deploy层: 该层主要涉及了Flink的部署模式,在上图中我们可以看出,Flink支持包括local、Standalone、Cluster、C loud等多种部署模式。Runtime层:Runtime层提供了支持Flink计算的核心实现,比如:支持分布式Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。API层:API层主要实现了 面向流(Stream)处理和批(Batch)处理API,其中面向流处理对应DataStreamAPI,面向批处理对应DataSe tAPI,后续版本,Flink有计划将DataStream和DataSetAPI进行统一。Libraries层:该层称为Fli nk应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流 处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习 库)、Gelly(图处理)。Flink的运行必须依赖Hadoop组件吗?Flink可以完全独立于Hadoop,在不依赖Hado op组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop组件 ,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查 点。二、Flink的基础编程模型上图是来自Flink官网的运行流程图。通过上图我们可以得知,Flink程序的基本构建是数据输入来 自一个Source,Source代表数据的输入端,经过Transformation进行转换,然后在一个或者多个Sink接收 器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入, 并生成一个或多个输出流的操作。执行时,Flink程序映射到streamingdataflows,由流(streams)和转换操 作(transformationoperators)组成。FlinkTable和SQLTableEnvironment是Tab leAPI和SQL集成的核心概念。这个类主要用来:在内部catalog中注册表注册外部catalog执行SQL查询注册用户定义( 标量,表或聚合)函数将DataStream或DataSet转换为表持有对ExecutionEnvironment或StreamEx ecutionEnvironment的引用FlinkSQL的实现原理是什么?是如何实现SQL解析的呢?首先大家要知道Fli nk的SQL解析是基于ApacheCalcite这个开源框架。基于此,一次完整的SQL解析过程如下:用户使用对外提供Strea mSQL的语法开发业务应用用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最 终形成calcite的逻辑计划采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的F link物理计划对物理计划采用janinocodegen生成代码,生成用低阶APIDataStream描述的流应用,提交到F link平台执行Demo项目MavenPOM,注意使用Scala2.11,如果你想用Flink1.9,请选择Scala2.1 2。org.apache.flink>flink-scala_2.111.7.0on>org.apa che.flinkflink-streaming-scala_2.11actId>1.7.0ild>d>net.alchim31.mavenscala-maven-plugintifactId>3.4.6compiletestC ompilein>org.apache.maven.pluginsmaven -assembly-plugin3.0.0tion>jar-with-dependenciesriptorRef>tion>make-assemblypackage>singleugins>第一个WorkCount.scalaimportorg.apache.flink.api.scal a.ExecutionEnvironment//隐式转换importorg.apache.flink.api.scala._ob jectWordCount{defmain(args:Array[String]):Unit={//获取Flin k运行环境valenvironment=ExecutionEnvironment.getExecutionEnvironm ent//注意readTextFile路径填你自己的文件路径。valtestDataSet=environment.r eadTextFile("1.txt")//可以理解testDataSet为一个RDD//groupby(0)意思是以元组第一 个为key进行分组,sum(1)是对元组第二个位置数据进行累加。valresult=testDataSet.flatMa p(_.split("")).map((_,1)).groupBy(0).sum(1)result.print()}}1. txthellojavahellojavahelloflinkhelloflink运行结果FlinkSock实时统计im portorg.apache.flink.streaming.api.scala.StreamExecutionEnvironm entimportorg.apache.flink.api.scala._objectSockDStream{defma in(args:Array[String]):Unit={valenvironment=StreamExecuti onEnvironment.getExecutionEnvironmentvalunit=environment.sock etTextStream("192.168.208.102",7777)valresult=unit.flatMap(_ .split("")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)resul t.print()//执行environment.execute()}}linux192.168.208.102虚拟机内 #如果没有netcat请先安装yuminstall-ync#启动sock隧道nc-lk7777运行SockDStream .scalaFlink默认是状态保持的(输入两个hellojava,输出hello2,java2),默认保存在内存里。 1、kafka版本:kafka_2.11-1.0.12、配置pom.xml="UTF-8"?>s:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocat ion="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xs d/maven-4.0.0.xsd">4.0.0k afka-demokafka-demo 1.0-SNAPSHOTorg.a pache.kafkakafka_2.11n>1.0.1y>编写producerdemo案例packagekafka;import kafka.javaapi.producer.Producer;importkafka.producer.KeyedMessa ge;importkafka.producer.ProducerConfig;importjava.util.Properti es;publicclassKafkaProducer{privatefinalProducerring>producer;publicfinalstaticStringTOPIC="TEST-TOPIC"; privateKafkaProducer(){Propertiesprops=newProperties();//此 处配置的是kafka的端口props.put("metadata.broker.list","192.168.18.140:9 092");//配置value的序列化类props.put("serializer.class","kafka.serial izer.StringEncoder");//配置key的序列化类props.put("key.serializer.clas s","kafka.serializer.StringEncoder");//request.required.acks// 0,whichmeansthattheproducerneverwaitsforanacknowledgeme ntfromthebroker(thesamebehavioras0.7).Thisoptionprovid esthelowestlatencybuttheweakestdurabilityguarantees(some datawillbelostwhenaserverfails).//1,whichmeansthatth eproducergetsanacknowledgementaftertheleaderreplicahasr eceivedthedata.Thisoptionprovidesbetterdurabilityasthec lientwaitsuntiltheserveracknowledgestherequestassuccessf ul(onlymessagesthatwerewrittentothenow-deadleaderbutno tyetreplicatedwillbelost).//-1,whichmeansthattheproduc ergetsanacknowledgementafterallin-syncreplicashavereceiv edthedata.Thisoptionprovidesthebestdurability,weguarant eethatnomessageswillbelostaslongasatleastoneinsync replicaremains.props.put("request.required.acks","-1");produce r=newProducer(newProducerConfig(props));}vo idproduce(){intmessageNo=1000;finalintCOUNT=10000;whi le(messageNoStringdata="hellokafkamessage"+key;producer.send(newKey edMessage(TOPIC,key,data));System.out.println( data);messageNo++;}}publicstaticvoidmain(String[]args) {newKafkaProducer().produce();}}运行结果:Connectedtothetarget VM,address:''127.0.0.1:56726'',transport:''socket''log4j:WARNNo appenderscouldbefoundforlogger(kafka.utils.VerifiableProper ties).log4j:WARNPleaseinitializethelog4jsystemproperly.log4 j:WARNSeehttp://logging.apache.org/log4j/1.2/faq.html#noconfig formoreinfo.hellokafkamessage1000hellokafkamessage1001hel lokafkamessage1002hellokafkamessage1003编写consumerdemo案例pa ckagekafka;importkafka.consumer.ConsumerConfig;importkafka.con sumer.ConsumerIterator;importkafka.consumer.KafkaStream;importk afka.javaapi.consumer.ConsumerConnector;importkafka.serializer.S tringDecoder;importkafka.utils.VerifiableProperties;importjava. util.HashMap;importjava.util.List;importjava.util.Map;importja va.util.Properties;publicclassKafkaConsumer{privatefinalCon sumerConnectorconsumer;privateKafkaConsumer(){Propertiespro ps=newProperties();//zookeeper配置props.put("zookeeper.connec t","192.168.18.140:2181");//group代表一个消费组props.put("group.id", "jd-group");//zk连接超时props.put("zookeeper.session.timeout.ms", "4000");props.put("zookeeper.sync.time.ms","200");props.put("a uto.commit.interval.ms","1000");props.put("auto.offset.reset", "smallest");//序列化类props.put("serializer.class","kafka.serializ er.StringEncoder");ConsumerConfigconfig=newConsumerConfig(pr ops);consumer=kafka.consumer.Consumer.createJavaConsumerConnec tor(config);}voidconsume(){MaptopicCountMa p=newHashMap();topicCountMap.put(KafkaProduc er.TOPIC,newInteger(1));StringDecoderkeyDecoder=newStringD ecoder(newVerifiableProperties());StringDecodervalueDecoder= newStringDecoder(newVerifiableProperties());MapafkaStream>>consumerMap=consumer.createMessage Streams(topicCountMap,keyDecoder,valueDecoder);KafkaStreaming,String>stream=consumerMap.get(KafkaProducer.TOPIC).get(0) ;ConsumerIteratorit=stream.iterator();while (it.hasNext()){System.out.println(it.next().message());}}pub licstaticvoidmain(String[]args){newKafkaConsumer().consume ();}}运行结果:"C:\ProgramFiles\Java\jdk1.8.0_101\bin\java.exe"-jav aagent:D:\Installed\ideaIU-2018.1.5.win-scala\lib\idea_rt.jar=567 56:D:\Installed\ideaIU-2018.1.5.win-scala\bin-Dfile.encoding=UTF -8-classpath"C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\charset s.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\deploy.jar;C:\Pr ogramFiles\Java\jdk1.8.0_101\jre\lib\ext\access-bridge-64.jar;C: \ProgramFiles\Java\jdk1.8.0_101\jre\lib\ext\cldrdata.jar;C:\Prog ramFiles\Java\jdk1.8.0_101\jre\lib\ext\dnsns.jar;C:\ProgramFile s\Java\jdk1.8.0_101\jre\lib\ext\jaccess.jar;C:\ProgramFiles\Java \jdk1.8.0_101\jre\lib\ext\jfxrt.jar;C:\ProgramFiles\Java\jdk1.8. 0_101\jre\lib\ext\localedata.jar;C:\ProgramFiles\Java\jdk1.8.0_1 01\jre\lib\ext\nashorn.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre \lib\ext\sunec.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\ext \sunjce_provider.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\e xt\sunmscapi.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\ext\s unpkcs11.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\ext\zipfs .jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\javaws.jar;C:\Pro gramFiles\Java\jdk1.8.0_101\jre\lib\jce.jar;C:\ProgramFiles\Jav a\jdk1.8.0_101\jre\lib\jfr.jar;C:\ProgramFiles\Java\jdk1.8.0_101 \jre\lib\jfxswt.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\js se.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\management-agen t.jar;C:\ProgramFiles\Java\jdk1.8.0_101\jre\lib\plugin.jar;C:\Pr ogramFiles\Java\jdk1.8.0_101\jre\lib\resources.jar;C:\ProgramFi les\Java\jdk1.8.0_101\jre\lib\rt.jar;E:\workspace\kafka\kafka-dem o\target\classes;D:\maven3.3\localRepository\org\apache\kafka\kaf ka_2.11\1.0.1\kafka_2.11-1.0.1.jar;D:\maven3.3\localRepository\or g\apache\kafka\kafka-clients\1.0.1\kafka-clients-1.0.1.jar;D:\mav en3.3\localRepository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;D:\ma ven3.3\localRepository\org\xerial\snappy\snappy-java\1.1.4\snappy -java-1.1.4.jar;D:\maven3.3\localRepository\org\slf4j\slf4j-api\1 .7.25\slf4j-api-1.7.25.jar;D:\maven3.3\localRepository\com\faster xml\jackson\core\jackson-databind\2.9.1\jackson-databind-2.9.1.ja r;D:\maven3.3\localRepository\com\fasterxml\jackson\core\jackson- annotations\2.9.0\jackson-annotations-2.9.0.jar;D:\maven3.3\local Repository\com\fasterxml\jackson\core\jackson-core\2.9.1\jackson- core-2.9.1.jar;D:\maven3.3\localRepository\net\sf\jopt-simple\jop t-simple\5.0.4\jopt-simple-5.0.4.jar;D:\maven3.3\localRepository\ com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;D:\m aven3.3\localRepository\org\scala-lang\scala-library\2.11.12\scal a-library-2.11.12.jar;D:\maven3.3\localRepository\org\slf4j\slf4j -log4j12\1.7.25\slf4j-log4j12-1.7.25.jar;D:\maven3.3\localReposit ory\log4j\log4j\1.2.17\log4j-1.2.17.jar;D:\maven3.3\localReposito ry\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\maven3.3\localRe pository\org\apache\zookeeper\zookeeper\3.4.10\zookeeper-3.4.10.j ar"kafka.KafkaConsumerlog4j:WARNNoappenderscouldbefoundfor logger(kafka.utils.VerifiableProperties).log4j:WARNPleaseinit ializethelog4jsystemproperly.log4j:WARNSeehttp://logging.ap ache.org/log4j/1.2/faq.html#noconfigformoreinfo.hellokafkame ssage1000hellokafkamessage1002hellokafkamessage1004hellok afkamessage1006Flink中的APIFlink为流式/批式处理应用程序的开发提供了不同级别的抽象。Flin kAPI最底层的抽象为有状态实时流处理。其抽象实现是?https://ci.apache.org/projects/flink /flink-docs-release-1.13/zh/docs/dev/datastream/operators/process _function/ProcessFunction,并且?ProcessFunction?被Flink框架集成到了?htt ps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs /dev/datastream/overview/DataStreamAPI?中来为我们使用。它允许用户在应用程序中自由地处理来 自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(eventtime)和 处理时间(processingtime)回调方法,从而允许程序可以实现复杂计算。FlinkAPI第二层抽象是?CoreAP Is。实际上,许多应用程序不需要使用到上述最底层抽象的API,而是可以使用?CoreAPIs?进行编程:其中包含?https: //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/de v/datastream/overview/DataStreamAPI(应用于有界/无界数据流场景)和?https://ci.a pache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/data set/overview/DataSetAPI(应用于有界数据集场景)两部分。CoreAPIs提供的流式API(Fluen tAPI)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚 合(aggregations)、窗口(windows)和状态(state)操作等。此层API中处理的数据类型在每种编程语言中都 有其对应的类。ProcessFunction?这类底层抽象和?DataStreamAPI?的相互集成使得用户可以选择使用更底层 的抽象API来实现自己的需求。DataSetAPI?还额外提供了一些原语,比如循环/迭代(loop/iteration)操作 。FlinkAPI第三层抽象是?TableAPI。TableAPI?是以表(Table)为中心的声明式编程(DSL)API ,例如在流式数据场景下,它可以表示一张正在动态改变的表。https://ci.apache.org/projects/flink/ flink-docs-release-1.13/zh/docs/dev/table/overview/TableAPI?遵循(扩 展)关系模型:即表拥有schema(类似于关系型数据库中的schema),并且TableAPI也提供了类似于关系模型中的 操作,比如select、project、join、group-by和aggregate等。TableAPI程序是以声明 的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管TableAPI使用起来很简洁并且可以由各种类型的用户自 定义函数扩展功能,但还是比CoreAPI的表达能力差。此外,TableAPI程序在执行之前还会使用优化器中的优化规则对用 户编写的表达式进行优化。表和?DataStream/DataSet?可以进行无缝切换,Flink允许用户在编写应用程序时将?Ta bleAPI?与?DataStream/DataSet?API混合使用。FlinkAPI最顶层抽象是?SQL。这层抽象在语 义和程序表达式上都类似于?TableAPI,但是其程序实现都是SQL查询表达式。https://ci.apache.org/ projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview /SQL?抽象与TableAPI抽象之间的关联是非常紧密的,并且SQL查询语句可以在?TableAPI?中定义的表上执 行。DataStreamAPI简介该练习的重点是充分全面地了解DataStreamAPI,以便于编写流式应用入门。什么能被 转化成流?Flink的Java和ScalaDataStreamAPI可以将任何可序列化的对象转化为流。Flink自 带的序列化器有基本类型,即String、Long、Integer、Boolean、Array复合类型:Tuples、POJOs 和Scalacaseclasses而且Flink会交给Kryo序列化其他类型。也可以将其他序列化器和Flink一 起使用。特别是有良好支持的Avro。Javatuples和POJOsFlink的原生序列化器可以高效地操作tuples 和POJOsTuples对于Java,Flink自带有?Tuple0?到?Tuple25?类型。Tuple2,Integer>person=Tuple2.of("Fred",35);//zerobasedindex!S tringname=person.f0;Integerage=person.f1;POJOs如果满足以下条件,Flin k将数据类型识别为POJO类型(并允许“按名称”字段引用):该类是公有且独立的(没有非静态内部类)该类有公有的无参构造函数类 (及父类)中所有的所有不被static、transient修饰的属性要么是公有的(且不被final修饰),要么是包含公有的 getter和setter方法,这些方法遵循Javabean命名规范。示例:publicclassPerson {publicStringname;publicIntegerage;publicPerson(){}; publicPerson(Stringname,Integerage){...};}Personpers on=newPerson("FredFlintstone",35);Flink的序列化器https://ci.apac he.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastr eam/fault-tolerance/schema_evolution/支持的POJO类型数据结构升级。Scalatupl es和caseclasses如果你了解Scala,那一定知道tuple和caseclass。基于DataStre amAPI实现欺诈检测ApacheFlink提供了DataStreamAPI来实现稳定可靠的、有状态的流处理应用程序 。Flink支持对状态和时间的细粒度控制,以此来实现复杂的事件驱动数据处理系统。这个入门指导手册讲述了如何通过Flink DataStreamAPI来实现一个有状态流处理程序。你要搭建一个什么系统在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可 以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。如果测试 消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。在这个教程中,你将会建立一个针对可疑 信用卡交易行为的反欺诈检测系统。通过使用一组简单的规则,你将了解到Flink如何为我们实现复杂业务逻辑并实时执行。准备条件这 个代码练习假定你对Java或Scala有一定的了解,当然,如果你之前使用的是其他开发语言,你也应该能够跟随本教程进行学习。 困难求助如果遇到困难,可以参考?https://flink.apache.org/zh/gettinghelp.html社区支持资 源。当然也可以在邮件列表提问,Flink的?https://flink.apache.org/zh/community.htm l用户邮件列表?一直被评为所有Apache项目中最活跃的一个,这也是快速获得帮助的好方法。怎样跟着教程练习首先,你需要在你的电脑上 准备以下环境:Java8or11Maven一个准备好的FlinkMavenArchetype能够快速创建一个包含了必 要依赖的Flink程序骨架,基于此,你可以把精力集中在编写业务逻辑上即可。这些已包含的依赖包括?flink-streamin g-java、flink-walkthrough-common?等,他们分别是Flink应用程序的核心依赖项和这个代码练习需要 的数据生成器,当然还包括其他本代码练习所依赖的类。{%panel?说明:?为简洁起见,本练习中的代码块中可能不包含完整的类路径。 完整的类路径可以在文档底部?https://ci.apache.org/projects/flink/flink-docs-rel ease-1.13/zh/docs/try-flink/datastream/链接?中找到。%}$mvnarchetype: generate\-DarchetypeGroupId=org.apache.flink\-DarchetypeArtif actId=flink-walkthrough-datastream-java\-DarchetypeVersion=1.13 .2\-DgroupId=frauddetection\-DartifactId=frauddetection\-Dv ersion=0.1\-Dpackage=spendreport\-DinteractiveMode=false你可以根据 自己的情况修改?groupId、?artifactId?和?package。通过这三个参数,Maven将会创建一个名为?fra uddetection?的文件夹,包含了所有依赖的整个工程项目将会位于该文件夹下。将工程目录导入到你的开发环境之后,你可以找到? FraudDetectionJob.java?(或?FraudDetectionJob.scala)代码文件,文件中的代码如下所 示。你可以在IDE中直接运行这个文件。同时,你可以试着在数据流中设置一些断点或者以DEBUG模式来运行程序,体验Fli nk是如何运行的。FraudDetectionJob.javapackagespendreport;importorg.ap ache.flink.streaming.api.datastream.DataStream;importorg.apache. flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.walkthrough.common.sink.AlertSink;importorg.ap ache.flink.walkthrough.common.entity.Alert;importorg.apache.flin k.walkthrough.common.entity.Transaction;importorg.apache.flink.w alkthrough.common.source.TransactionSource;publicclassFraudDete ctionJob{publicstaticvoidmain(String[]args)throwsExceptio n{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.g etExecutionEnvironment();DataStreamtransactions= env.addSource(newTransactionSource()).name("transactions");Da taStreamalerts=transactions.keyBy(Transaction::getAcco untId).process(newFraudDetector()).name("fraud-detector");ale rts.addSink(newAlertSink()).name("send-alerts");env.execute(" FraudDetection");}}FraudDetector.javapackagespendreport;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;im portorg.apache.flink.util.Collector;importorg.apache.flink.walk through.common.entity.Alert;importorg.apache.flink.walkthrough.c ommon.entity.Transaction;publicclassFraudDetectorextendsKeyed ProcessFunction{privatestaticfinal longserialVersionUID=1L;privatestaticfinaldoubleSMALL_AMO UNT=1.00;privatestaticfinaldoubleLARGE_AMOUNT=500.00;pr ivatestaticfinallongONE_MINUTE=601000;@Overridepublic voidprocessElement(Transactiontransaction,Contextcontext,Co llectorcollector)throwsException{Alertalert=newAl ert();alert.setId(transaction.getAccountId());collector.collect (alert);}}代码分析让我们一步步地来分析一下这两个代码文件。FraudDetectionJob?类定义了程序的数据流,而 ?FraudDetector?类定义了欺诈交易检测的业务逻辑。下面我们开始讲解整个Job是如何组装到?FraudDetecti onJob?类的?main?函数中的。执行环境第一行的?StreamExecutionEnvironment?用于设置你的执行环境 。任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行。StreamExecutionEnvironmentenv =StreamExecutionEnvironment.getExecutionEnvironment();创建数据源数据源从 外部系统例如ApacheKafka、RabbitMQ或者ApachePulsar接收数据,然后将数据送到Flink 程序中。这个代码练习使用的是一个能够无限循环生成信用卡模拟交易数据的数据源。每条交易数据包括了信用卡ID(account Id),交易发生的时间(timestamp)以及交易的金额(amount)。绑定到数据源上的?name?属性是为了调试方便, 如果发生一些异常,我们能够通过它快速定位问题发生在哪里。DataStreamtransactions =env.addSource(newTransactionSource()).name("transactions");对 事件分区&欺诈检测transactions?这个数据流包含了大量的用户交易数据,需要被划分到多个并发上进行欺诈检测处理。由于欺 诈行为的发生是基于某一个账户的,所以,必须要要保证同一个账户的所有交易行为数据要被同一个并发的task进行处理。为了保证同一个 task处理同一个key的所有数据,你可以使用?DataStream#keyBy?对流进行分区。?process()?函数 对流绑定了一个操作,这个操作将会对流上的每一个消息调用所定义好的函数。通常,一个操作会紧跟着?keyBy?被调用,在这个例子中, 这个操作是FraudDetector,该操作是在一个?keyedcontext?上执行的。DataStreama lerts=transactions.keyBy(Transaction::getAccountId).process(n ewFraudDetector()).name("fraud-detector");输出结果sink会将?DataStrea m?写出到外部系统,例如ApacheKafka、Cassandra或者AWSKinesis等。?AlertSink?使 用?INFO?的日志级别打印每一个?Alert?的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。alerts.add Sink(newAlertSink());运行作业Flink程序是懒加载的,并且只有在完全搭建好之后,才能够发布到集群上执行。 调用?StreamExecutionEnvironment#execute?时给任务传递一个任务名参数,就可以开始运行任务。en v.execute("FraudDetection");欺诈检测器欺诈检查类?FraudDetector?是?KeyedProc essFunction?接口的一个实现。他的方法?KeyedProcessFunction#processElement?将会在 每个交易事件上被调用。这个程序里边会对每笔交易发出警报,有人可能会说这做报过于保守了。本教程的后续步骤将指导你对这个欺诈检测器进 行更有意义的业务逻辑扩展。publicclassFraudDetectorextendsKeyedProcessFunct ion{privatestaticfinaldoubleSMALL _AMOUNT=1.00;privatestaticfinaldoubleLARGE_AMOUNT=500.00 ;privatestaticfinallongONE_MINUTE=601000;@Overridepub licvoidprocessElement(Transactiontransaction,Contextcontext ,Collectorcollector)throwsException{Alertalert=ne wAlert();alert.setId(transaction.getAccountId());collector.col lect(alert);}}实现一个真正的应用程序我们先实现第一版报警程序,对于一个账户,如果出现小于$1美元的交易后紧跟着 一个大于$500的交易,就输出一个报警信息。假设你的欺诈检测器所处理的交易数据如下:交易3和交易4应该被标记为欺诈行为 ,因为交易3是一个$0.09的小额交易,而紧随着的交易4是一个$510的大额交易。另外,交易7、8和交易 9就不属于欺诈交易了,因为在交易7这个$0.02的小额交易之后,并没有跟随一个大额交易,而是一个金额适中的交易,这使得交 易7到交易9不属于欺诈行为。欺诈检测器需要在多个交易事件之间记住一些信息。仅当一个大额的交易紧随一个小额交易的情况发生时 ,这个大额交易才被认为是欺诈交易。在多个事件之间存储信息就需要使用到?https://ci.apache.org/project s/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/状态,这也是我 们选择使用?https://ci.apache.org/projects/flink/flink-docs-release-1.1 3/zh/docs/dev/datastream/operators/process_function/KeyedProcessF unction?的原因。它能够同时提供对状态和时间的细粒度操作,这使得我们能够在接下来的代码练习中实现更复杂的算法。最直接的实现 方式是使用一个boolean型的标记状态来表示是否刚处理过一个小额交易。当处理到该账户的一个大额交易时,你只需要检查这个标记 状态来确认上一个交易是是否小额交易即可。然而,仅使用一个标记作为?FraudDetector?的类成员来记录账户的上一个交易状态是 不准确的。Flink会在同一个?FraudDetector?的并发实例中处理多个账户的交易数据,假设,当账户A和账户B 的数据被分发的同一个并发实例上处理时,账户A的小额交易行为可能会将标记状态设置为真,随后账户B的大额交易可能会被误判为欺诈 交易。当然,我们可以使用如?Map?这样的数据结构来保存每一个账户的状态,但是常规的类成员变量是无法做到容错处理的,当任务失败重 启后,之前的状态信息将会丢失。这样的话,如果程序曾出现过失败重启的情况,将会漏掉一些欺诈报警。为了应对这个问题,Flink提供 了一套支持容错状态的原语,这些原语几乎与常规成员变量一样易于使用。Flink中最基础的状态类型是?https://ci.apac he.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastr eam/fault-tolerance/state/ValueState,这是一种能够为被其封装的变量添加容错能力的类型。?Val ueState?是一种?keyedstate,也就是说它只能被用于?keyedcontext?提供的operator中,即 所有能够紧随?DataStream#keyBy?之后被调用的operator。一个operator中的?keyedstat e?的作用域默认是属于它所属的key的。这个例子中,key就是当前正在处理的交易行为所属的信用卡账户(key传入key By()函数调用),而?FraudDetector?维护了每个帐户的标记状态。?ValueState?需要使用?ValueSta teDescriptor?来创建,ValueStateDescriptor?包含了Flink如何管理变量的一些元数据信息。状态 在使用之前需要先被注册。状态需要使用?open()?函数来注册状态。publicclassFraudDetectorext endsKeyedProcessFunction{privatesta ticfinallongserialVersionUID=1L;privatetransientValueStat eflagState;@Overridepublicvoidopen(Configurationpa rameters){ValueStateDescriptorflagDescriptor=newVa lueStateDescriptor<>("flag",Types.BOOLEAN);flagState=getRunt imeContext().getState(flagDescriptor);}ValueState?是一个包装类,类似于Jav a标准库里边的?AtomicReference?和?AtomicLong。它提供了三个用于交互的方法。update?用于更新状 态,value?用于获取状态值,还有?clear?用于清空状态。如果一个key还没有状态,例如当程序刚启动或者调用过?Val ueState#clear?方法时,ValueState#value?将会返回?null。如果需要更新状态,需要调用?Value State#update?方法,直接更改?ValueState#value?的返回值可能不会被系统识别。容错处理将在Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互。下边的示例,说明了如何使用标记状态来追踪可能的欺诈交易行为。@Overr idepublicvoidprocessElement(Transactiontransaction,Contextc ontext,Collectorcollector)throwsException{//Getthe currentstateforthecurrentkeyBooleanlastTransactionWasSmal l=flagState.value();//Checkiftheflagissetif(lastTransa ctionWasSmall!=null){if(transaction.getAmount()>LARGE_AMOU NT){//OutputanalertdownstreamAlertalert=newAlert();al ert.setId(transaction.getAccountId());collector.collect(alert); }//CleanupourstateflagState.clear();}if(transaction.getA mount()e(true);}}对于每笔交易,欺诈检测器都会检查该帐户的标记状态。请记住,ValueState?的作用域始终限于当前的k ey,即信用卡帐户。如果标记状态不为空,则该帐户的上一笔交易是小额的,因此,如果当前这笔交易的金额很大,那么检测程序将输出报警信 息。在检查之后,不论是什么状态,都需要被清空。不管是当前交易触发了欺诈报警而造成模式的结束,还是当前交易没有触发报警而造成模式的 中断,都需要重新开始新的模式检测。最后,检查当前交易的金额是否属于小额交易。如果是,那么需要设置标记状态,以便可以在下一个事件中 对其进行检查。注意,ValueState?实际上有3种状态:unset(null),true,和?fal se,ValueState?是允许空值的。我们的程序只使用了unset(null)和?true?两种来判断标记状态被设置了 与否。欺诈检测器v2:状态+时间=??骗子们在小额交易后不会等很久就进行大额消费,这样可以降低小额测试交易被发现的几率。 比如,假设你为欺诈检测器设置了一分钟的超时,对于上边的例子,交易3和交易4只有间隔在一分钟之内才被认为是欺诈交易。F link中的?KeyedProcessFunction?允许您设置计时器,该计时器在将来的某个时间点执行回调函数。让我们看看如何 修改程序以符合我们的新要求:当标记状态被设置为?true?时,设置一个在当前时间一分钟后触发的定时器。当定时器被触发时,重置标记状 态。当标记状态被重置时,删除定时器。要删除一个定时器,你需要记录这个定时器的触发时间,这同样需要状态来实现,所以你需要在标记状态后 也创建一个记录定时器时间的状态。privatetransientValueStateflagState;p rivatetransientValueStatetimerState;@Overridepublicvoid open(Configurationparameters){ValueStateDescriptorf lagDescriptor=newValueStateDescriptor<>("flag",Types.BOOLEAN );flagState=getRuntimeContext().getState(flagDescriptor);Valu eStateDescriptortimerDescriptor=newValueStateDescriptor <>("timer-state",Types.LONG);timerState=getRuntimeContext(). getState(timerDescriptor);}KeyedProcessFunction#processElement?需要 使用提供了定时器服务的?Context?来调用。定时器服务可以用于查询当前时间、注册定时器和删除定时器。使用它,你可以在标记状 态被设置时,也设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到?timerState?状态中。if(transact ion.getAmount()te.update(true);//setthetimerandtimerstatelongtimer=co ntext.timerService().currentProcessingTime()+ONE_MINUTE;contex t.timerService().registerProcessingTimeTimer(timer);timerState.u pdate(timer);}处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。当定时器触发时,将会调用?Keyed ProcessFunction#onTimer?方法。通过重写这个方法来实现一个你自己的重置状态的回调逻辑。@Overridep ublicvoidonTimer(longtimestamp,OnTimerContextctx,Collector< Alert>out){//removeflagafter1minutetimerState.clear();f lagState.clear();}最后,如果要取消定时器,你需要删除已经注册的定时器,并同时清空保存定时器的状态。你可以把这些 逻辑封装到一个助手函数中,而不是直接调用?flagState.clear()。privatevoidcleanUp(Conte xtctx)throwsException{//deletetimerLongtimer=timerStat e.value();ctx.timerService().deleteProcessingTimeTimer(timer);/ /cleanupallstatetimerState.clear();flagState.clear();}这就是一个 功能完备的,有状态的分布式流处理程序了。完整的程序packagespendreport;importorg.apache.fl ink.api.common.state.ValueState;importorg.apache.flink.api.commo n.state.ValueStateDescriptor;importorg.apache.flink.api.common.t ypeinfo.Types;importorg.apache.flink.configuration.Configuration ;importorg.apache.flink.streaming.api.functions.KeyedProcessFunc tion;importorg.apache.flink.util.Collector;importorg.apache.fli nk.walkthrough.common.entity.Alert;importorg.apache.flink.walkth rough.common.entity.Transaction;publicclassFraudDetectorextend sKeyedProcessFunction{privatestatic finallongserialVersionUID=1L;privatestaticfinaldoubleSM ALL_AMOUNT=1.00;privatestaticfinaldoubleLARGE_AMOUNT=500 .00;privatestaticfinallongONE_MINUTE=601000;privatetr ansientValueStateflagState;privatetransientValueSta tetimerState;@Overridepublicvoidopen(Configurationpar ameters){ValueStateDescriptorflagDescriptor=newVal ueStateDescriptor<>("flag",Types.BOOLEAN);flagState=getRunt imeContext().getState(flagDescriptor);ValueStateDescriptor timerDescriptor=newValueStateDescriptor<>("timer-state",Typ es.LONG);timerState=getRuntimeContext().getState(timerDescript or);}@OverridepublicvoidprocessElement(Transactiontransact ion,Contextcontext,Collectorcollector)throwsExceptio n{//GetthecurrentstateforthecurrentkeyBooleanlastTran sactionWasSmall=flagState.value();//Checkiftheflagisset if(lastTransactionWasSmall!=null){if(transaction.getAmount( )>LARGE_AMOUNT){//OutputanalertdownstreamAlertalert=ne wAlert();alert.setId(transaction.getAccountId());collector.col lect(alert);}//CleanupourstatecleanUp(context);}if(tran saction.getAmount()gState.update(true);longtimer=context.timerService().currentP rocessingTime()+ONE_MINUTE;context.timerService().registerProc essingTimeTimer(timer);timerState.update(timer);}}@Overridep ublicvoidonTimer(longtimestamp,OnTimerContextctx,Collector< Alert>out){//removeflagafter1minutetimerState.clear();f lagState.clear();}privatevoidcleanUp(Contextctx)throwsExce ption{//deletetimerLongtimer=timerState.value();ctx.time rService().deleteProcessingTimeTimer(timer);//cleanupallstat etimerState.clear();flagState.clear();}}期望的结果使用已准备好的?Transacti onSource?数据源运行这个代码,将会检测到账户3的欺诈行为,并输出报警信息。你将能够在你的taskmanager 的日志中看到下边输出:2019-08-1914:22:06,220INFOorg.apache.flink.walkthr ough.common.sink.AlertSink-Alert{id=3}2019-08-1914: 22:11,383INFOorg.apache.flink.walkthrough.common.sink.AlertSin k-Alert{id=3}2019-08-1914:22:16,551INFOorg.apach e.flink.walkthrough.common.sink.AlertSink-Alert{id=3 }2019-08-1914:22:21,723INFOorg.apache.flink.walkthrough.commo n.sink.AlertSink-Alert{id=3}2019-08-1914:22:26,896 INFOorg.apache.flink.walkthrough.common.sink.AlertSink -Alert{id=3}一个完整的示例该示例将关于人的记录流作为输入,并且过滤后只包含成年人。importorg.ap ache.flink.streaming.api.environment.StreamExecutionEnvironment;i mportorg.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.api.common.functions.FilterFunction;publicclas sExample{publicstaticvoidmain(String[]args)throwsExcepti on{finalStreamExecutionEnvironmentenv=StreamExecutionEnviro nment.getExecutionEnvironment();DataStreamflintstones= env.fromElements(newPerson("Fred",35),newPerson("Wilma",35 ),newPerson("Pebbles",2));DataStreamadults=flintst ones.filter(newFilterFunction(){@Overridepublicboole anfilter(Personperson)throwsException{returnperson.age>= 18;}});adults.print();env.execute();}publicstaticclassPe rson{publicStringname;publicIntegerage;publicPerson(){} ;publicPerson(Stringname,Integerage){this.name=name;thi s.age=age;};publicStringtoString(){returnthis.name.toStr ing()+":age"+this.age.toString();};}}Stream执行环境每个Flink 应用都需要有执行环境,在该示例中为?env。流式应用需要用到?StreamExecutionEnvironment。DataStr eamAPI将你的应用构建为一个jobgraph,并附加到?StreamExecutionEnvironment?。当调用 ?env.execute()?时此graph就被打包并发送到JobManager上,后者对作业并行处理并将其子任务分发给 TaskManager来执行。每个作业的并行子任务将在?taskslot?中执行。注意,如果没有调用execute(),应 用就不会运行。此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。基本的streamsou rce上述示例用?env.fromElements(...)?方法构造?DataStream?。这样将简单的流放在 一起是为了方便用于原型或测试。StreamExecutionEnvironment?上还有一个?fromCollection(Co llection)?方法。因此,你可以这样做:Listpeople=newArrayList>();people.add(newPerson("Fred",35));people.add(newPerson("Wil ma",35));people.add(newPerson("Pebbles",2));DataStream flintstones=env.fromCollection(people);另一个获取数据到流中的便捷方法是用socke tDataStreamlines=env.socketTextStream("localhost",999 9)或读取文件DataStreamlines=env.readTextFile("file:///path" );在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如Apach eKafka,Kinesis和各种文件系统。RESTAPI和数据库也经常用于增强流处理的能力(streamenrichm ent)。基本的streamsink上述示例用?adults.print()?打印其结果到taskmanager的日志中 (如果运行在IDE中时,将追加到你的IDE控制台)。它会对流中的每个元素都调用?toString()?方法。输出看起来类似 于1>Fred:age352>Wilma:age351>和2>指出输出来自哪个sub-task(即threa d)Inproduction,commonlyusedsinksincludetheStreamingFileSin k,variousdatabases,andseveralpub-subsystems.调试在生产中,应用程序将在远程 集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager和TaskManager日志对于调试此类故障 非常有用,但是更简单的是Flink支持在IDE内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink的工作原理和内部细节,查看Flink源码也是非常好的方法。动手实践至此,你已经可以开始编写并运行一个简单的Dat aStream应用了。克隆?https://github.com/apache/flink-training/tree/rel ease-1.13/flink-training-repo?并在阅读完README中的指示后,开始尝试第一个练习吧:?http s://github.com/apache/flink-training/blob/release-1.13/ride-clean singFilteringaStream(RideCleansing)?。更多阅读https://flink.apache .org/news/2020/04/15/flink-serialization-tuning-vol-1.htmlFlinkS erializationTuningVol.1:ChoosingyourSerializer—ifyoucan https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/d ocs/dev/datastream/overview/AnatomyofaFlinkProgramhttps://ci. apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/dat astream/overview/DataSourceshttps://ci.apache.org/projects/flink /flink-docs-release-1.13/zh/docs/dev/datastream/overview/DataSin kshttps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh /docs/connectors/datastream/overview/DataStreamConnectors流式分析Eve ntTimeandWatermarks概要Flink明确支持以下三种时间语义:事件时间(eventtime):?事件产生 的时间,记录的是设备生产(或者存储)事件的时间摄取时间(ingestiontime):?Flink读取事件时记录的时间处理时间 (processingtime):?Flinkpipeline中具体算子处理事件的时间为了获得可重现的结果,例如在计算过去的 特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的 话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数 据或者测试新代码变得异常困难。使用EventTime如果想要使用事件时间,需要额外给Flink提供一个时间戳提取器和Wa termark生成器,Flink将使用它们来跟踪事件时间的进度。这将在选节https://ci.apache.org/proj ects/flink/flink-docs-release-1.13/zh/docs/learn-flink/streaming_ analytics/使用Watermarks?中介绍,但是首先我们需要解释一下watermarks是什么。Watermark s让我们通过一个简单的示例来演示为什么需要watermarks及其工作方式。在此示例中,我们将看到带有混乱时间戳的事件流,如下 所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间4,随后发生的事件发生在更早的时间2,依此类推 :···23192224211417131215911724→假设我们要对数据流排序,我们想要达到 的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的 。让我们重新审视这些数据:(1)我们的排序器看到的第一个事件的时间戳是4,但是我们不能立即将其作为已排序的流释放。因为我们并不 能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为2的元素到来时,排序器才 可以有事件输出。需要一些缓冲,需要一些时间,但这都是值得的(2)接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。 首先,我们看到了时间戳为4的事件,然后看到了时间戳为2的事件。可是,时间戳小于2的事件接下来会不会到来呢?可能会,也可 能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳1。最终,我们必须勇于承担责任,并发出指令,把带有时间戳2的事件作 为已排序的事件流的开始(3)然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink何时停止等待较早事件的到来。 这正是watermarks的作用?—它们定义何时停止等待较早的事件。Flink中事件时间的处理取决于?watermark 生成器,后者将带有时间戳的特殊元素插入流中形成?watermarks。事件时间?t?的watermark代表?t?之前(很可能 )都已经到达。当watermark以2或更大的时间戳到达时,事件流的排序器应停止等待,并输出2作为已经排序好的流。(4 )我们可能会思考,如何决定watermarks的不同生成策略每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可 能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink将此策略称为?最大无序边界(bounded -out-of-orderness)?watermark。当然,我们可以想像出更好的生成watermark的方法,但是对于大多 数应用而言,固定延迟策略已经足够了。延迟VS正确性watermarks给了开发者流处理的一种选择,它们使开发人员在开发应用程 序时可以控制延迟和完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不 被允许等待所有的时间都产生了,才输出排序好的数据,这与流相违背。我们可以把watermarks的边界时间配置的相对较短,从而冒 着在输入了解不完全的情况下产生结果的风险-即可能会很快产生错误结果。或者,你可以等待更长的时间,并利用对输入流的更全面的了解来产生 结果。当然也可以实施混合解决方案,先快速产生初步结果,然后在处理其他(最新)数据时向这些结果提供更新。对于有一些对延迟的容忍程度很 低,但是又对结果有很严格的要求的场景下,或许是一个福音。延迟延迟是相对于watermarks定义的。Watermark(t)? 表示事件流的时间已经到达了?t;watermark之后的时间戳≤?t?的任何事件都被称之为延迟事件。使用Watermark s如果想要使用基于带有事件时间戳的事件流,Flink需要知道与每个事件相关的时间戳,而且流必须包含watermark。动手练习 中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的 ,该类从事件中提取时间戳,并根据需要生成watermarks。最简单的方法是使用?WatermarkStrategy:DataS treamstream=...WatermarkStrategystrategy=Wate rmarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds (20)).withTimestampAssigner((event,timestamp)->event.timestam p);DataStreamwithTimestampsAndWatermarks=stream.assignT imestampsAndWatermarks(strategy);WindowsFlink在窗口的场景处理上非常有表现力。在本节 中,我们将学习:如何使用窗口来计算无界流上的聚合,Flink支持哪种类型的窗口,以及如何使用窗口聚合来实现DataStream 程序概要我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:每分钟的浏览量每位用户每周的 会话数每个传感器每分钟的最高温度用Flink计算窗口分析取决于两个主要的抽象操作:WindowAssigners,将事件分配 给窗口(根据需要创建新的窗口对象),以及?WindowFunctions,处理窗口内的数据。Flink的窗口API还具有? Triggers?和?Evictors?的概念,Triggers?确定何时调用窗口函数,而?Evictors?则可以删除在窗口中收 集的元素。举一个简单的例子,我们一般这样使用键控事件流(基于key分组的输入事件流):stream..keyBy(selector>).window().reduce|aggregate|process(< windowfunction>)您不是必须使用键控事件流(keyedstream),但是值得注意的是,如果不使用键控事件流,我 们的程序就不能?并行?处理。stream..windowAll().reduce|aggre gate|process()窗口分配器Flink有一些内置的窗口分配器,如下所示:通过一些示例 来展示关于这些窗口如何使用,或者如何区分它们:滚动时间窗口每分钟页面浏览量TumblingEventTimeWindows.of( Time.minutes(1))滑动时间窗口每10秒钟计算前1分钟的页面浏览量SlidingEventTimeWindows.of (Time.minutes(1),Time.seconds(10))会话窗口每个会话的网页浏览量,其中会话之间的间隔至少为30分 钟EventTimeSessionWindows.withGap(Time.minutes(30))以下都是一些可以使用的间隔时间 ?Time.milliseconds(n),?Time.seconds(n),?Time.minutes(n),?Time.hou rs(n),和?Time.days(n)。基于时间的窗口分配器(包括会话时间)既可以处理?事件时间,也可以处理?处理时间。这两种 基于时间的处理没有哪一个更好,我们必须折衷。使用?处理时间,我们必须接受以下限制:无法正确处理历史数据,无法正确处理超过最大无序边 界的数据,结果将是不确定的,但是有自己的优势,较低的延迟。使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时, 这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。我们可能在有些场景下,想使用全局wi ndowassigner将每个事件(相同的key)都分配给某一个指定的全局窗口。很多情况下,一个比较好的建议是使用?Pro cessFunction,具体介绍在https://ci.apache.org/projects/flink/flink-docs -release-1.13/zh/docs/learn-flink/event_driven/这里。窗口应用函数我们有三种最基本的 操作窗口内的事件的选项:像批量处理,ProcessWindowFunction?会缓存?Iterable?和窗口内容,供接下来全量 计算;或者像流处理,每一次有事件被分配到窗口时,都会调用?ReduceFunction?或者?AggregateFunction? 来增量计算;或者结合两者,通过?ReduceFunction?或者?AggregateFunction?预聚合的增量计算结果在触发 窗口时,提供给?ProcessWindowFunction?做全量计算。接下来展示一段1和3的示例,每一个实现都是计算传 感器的最大值。在每一个一分钟大小的事件时间窗口内,生成一个包含?(key,end-of-window-timestamp,ma x_value)?的一组结果。ProcessWindowFunction示例DataStream input=...input.keyBy(x->x.key).window(TumblingEventTimeWind ows.of(Time.minutes(1))).process(newMyWastefulMax());publicsta ticclassMyWastefulMaxextendsProcessWindowFunctioning,//输入类型Tuple3,//输出类型String,//键类 型TimeWindow>{//窗口类型@Overridepublicvoidprocess(Stringkey ,Contextcontext,Iterableevents,Collectore3>out){intmax=0;for(SensorReading event:events){max=Math.max(event.value,max);}out.collec t(Tuple3.of(key,context.window().getEnd(),max));}}在当前实现中有一些值得关 注的地方:Flink会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。Flink会传递给?Proces sWindowFunction?一个?Context?对象,这个对象内包含了一些窗口信息。Context?接口展示大致如下:pu blicabstractclassContextimplementsjava.io.Serializable{pub licabstractWwindow();publicabstractlongcurrentProcessingTi me();publicabstractlongcurrentWatermark();publicabstractKe yedStateStorewindowState();publicabstractKeyedStateStoreglob alState();}windowState?和?globalState?可以用来存储当前的窗口的key、窗口或者当前key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。增量聚合示例DataStre aminput=...input.keyBy(x->x.key).window(Tum blingEventTimeWindows.of(Time.minutes(1))).reduce(newMyReducing Max(),newMyWindowFunction());privatestaticclassMyReducingMax implementsReduceFunction{publicSensorReading reduce(SensorReadingr1,SensorReadingr2){returnr1.value()> r2.value()?r1:r2;}}privatestaticclassMyWindowFunctionext endsProcessWindowFunctionensorReading>,String,TimeWindow>{@Overridepublicvoidproces s(Stringkey,Contextcontext,IterablemaxReadin g,Collector>out){SensorRe adingmax=maxReading.iterator().next();out.collect(Tuple3.of(k ey,context.window().getEnd(),max));}}请注意?Iterableg>?将只包含一个读数–?MyReducingMax?计算出的预先汇总的最大值。晚到的事件默认场景下,超过最大无序边界的事件会被 删除,但是Flink给了我们两个选择去控制这些事件。您可以使用一种称为https://ci.apache.org/projec ts/flink/flink-docs-release-1.13/zh/docs/learn-flink/event_driven /旁路输出?的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:OutputTaglateTag=ne wOutputTag("late"){};SingleOutputStreamOperatorre sult=stream..keyBy(...).window(...).sideOutputLateData(lateT ag).process(...);DataStreamlateStream=result.getSideOu tput(lateTag);我们还可以指定?允许的延迟(allowedlateness)?的间隔,在这个间隔时间内,延迟的事件将 会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为?latefiring?) 。默认情况下,允许的延迟为0。换句话说,watermark之后的元素将被丢弃(或发送到侧输出流)。举例说明:stream.. keyBy(...).window(...).allowedLateness(Time.seconds(10)).proce ss(...);当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。深入了解窗口 操作Flink的窗口API某些方面有一些奇怪的行为,可能和我们预期的行为不一致。根据?https://flink.apac he.org/community.htmlFlink用户邮件列表?和其他地方一些频繁被问起的问题,以下是一些有关Window s的底层事实,这些信息可能会让您感到惊讶。滑动窗口是通过复制来实现的滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相 关的窗口中。例如,如果您每隔15分钟就有24小时的滑动窗口,则每个事件将被复制到424=96个窗口中。时间窗 口会和时间对齐仅仅因为我们使用的是一个小时的处理时间窗口并在12:05开始运行您的应用程序,并不意味着第一个窗口将在1:05 关闭。第一个窗口将长55分钟,并在1:00关闭。请注意,滑动窗口和滚动窗口分配器所采用的offset参数可用于改变窗 口的对齐方式。有关详细的信息,请参见?https://ci.apache.org/projects/flink/flink-doc s-release-1.13/zh/docs/dev/datastream/operators/windows/滚动窗口?和?ht tps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/doc s/dev/datastream/operators/windows/滑动窗口?。window后面可以接window比如说:s tream.keyBy(t->t.key).window().reduce(cefunction>).windowAll().reduce(ducefunction>)可能我们会猜测以Flink的能力,想要做到这样看起来是可行的(前提是你使用的是ReduceFu nction或AggregateFunction),但不是。之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时 间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致 的。空的时间窗口不会输出结果事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。Late EventsCanCauseLateMerges会话窗口的实现是基于窗口的一个抽象能力,窗口可以?聚合。会话窗口中的每个数 据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔 ,从而产生一个虽然有延迟但是更加准确地结果。实践练习本节附带的动手练习是?https://github.com/apache/fl ink-training/blob/release-1.13/hourly-tipsHourlyTipsExercise?.延 伸阅读https://ci.apache.org/projects/flink/flink-docs-release-1.13/z h/docs/concepts/time/TimelyStreamProcessinghttps://ci.apache.or g/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/o perators/windows/Windows基于TableAPI实现实时报表ApacheFlinkoffersa TableAPIasaunified,relationalAPIforbatchandstreamproce ssing,i.e.,queriesareexecutedwiththesamesemanticsonunbo unded,real-timestreamsorbounded,batchdatasetsandproduce thesameresults.TheTableAPIinFlinkiscommonlyusedtoease thedefinitionofdataanalytics,datapipelining,andETLappli cations.WhatWillYouBeBuilding?Inthistutorial,youwilllea rnhowtobuildareal-timedashboardtotrackfinancialtransact ionsbyaccount.ThepipelinewillreaddatafromKafkaandwrite theresultstoMySQLvisualizedviaGrafana.PrerequisitesThiswa lkthroughassumesthatyouhavesomefamiliaritywithJavaorSca la,butyoushouldbeabletofollowalongevenifyoucomefrom adifferentprogramminglanguage.Italsoassumesthatyouarefa miliarwithbasicrelationalconceptssuchas?SELECT?and?GROUPBY ?clauses.Help,I’mStuck!Ifyougetstuck,checkoutthe?https:/ /flink.apache.org/community.htmlcommunitysupportresources.Inp articular,ApacheFlink’s?https://flink.apache.org/community.html usermailinglist?consistentlyranksasoneofthemostactiveof anyApacheprojectandagreatwaytogethelpquickly.Ifrunnin gdockeronwindowsandyourdatageneratorcontainerisfailing tostart,thenpleaseensurethatyou’reusingtherightshell.F orexample?docker-entrypoint.sh?for?table-walkthrough_data-genera tor_1?containerrequiresbash.Ifunavailable,itwillthrowane rror?standard_init_linux.go:211:execuserprocesscaused“nosuc hfileordirectory”.Aworkaroundistoswitchtheshellto?sh?o nthefirstlineof?docker-entrypoint.sh.HowToFollowAlongIfyo uwanttofollowalong,youwillrequireacomputerwith:Java8o r11MavenDockerTherequiredconfigurationfilesareavailablein the?https://github.com/apache/flink-playgroundsflink-playgrounds? repository.Oncedownloaded,opentheproject?flink-playground/ta ble-walkthrough?inyourIDEandnavigatetothefile?SpendReport. EnvironmentSettingssettings=EnvironmentSettings.newInstance(). build();TableEnvironmenttEnv=TableEnvironment.create(settings) ;tEnv.executeSql("CREATETABLEtransactions(\n"+"account_i dBIGINT,\n"+"amountBIGINT,\n"+"transaction_ti meTIMESTAMP(3),\n"+"WATERMARKFORtransaction_timeAStran saction_time-INTERVAL''5''SECOND\n"+")WITH(\n"+"''conn ector''=''kafka'',\n"+"''topic''=''transactions'',\n"+" ''properties.bootstrap.servers''=''kafka:9092'',\n"+"''form at''=''csv''\n"+")");tEnv.executeSql("CREATETABLEspend_repo rt(\n"+"account_idBIGINT,\n"+"log_tsTIMESTAMP( 3),\n"+"amountBIGINT\n,"+"PRIMARYKEY(account_i d,log_ts)NOTENFORCED"+")WITH(\n"+"''connector''=''jdb c'',\n"+"''url''=''jdbc:mysql://mysql:3306/sql-demo'',\n "+"''table-name''=''spend_report'',\n"+"''driver''=''c om.mysql.jdbc.Driver'',\n"+"''username''=''sql-demo'',\n"+" ''password''=''demo-sql''\n"+")");Tabletransactions=tEnv. from("transactions");report(transactions).executeInsert("spend_re port");BreakingDownTheCodeTheExecutionEnvironmentThefirstt wolinessetupyour?TableEnvironment.Thetableenvironmentish owyoucansetpropertiesforyourJob,specifywhetheryouarew ritingabatchorastreamingapplication,andcreateyoursource s.Thiswalkthroughcreatesastandardtableenvironmentthatuse sthestreamingexecution.EnvironmentSettingssettings=Environm entSettings.newInstance().build();TableEnvironmenttEnv=TableEn vironment.create(settings);RegisteringTablesNext,tablesarereg isteredinthecurrent?https://ci.apache.org/projects/flink/flink -docs-release-1.13/zh/docs/dev/table/catalogs/catalog?thatyouca nusetoconnecttoexternalsystemsforreadingandwritingboth batchandstreamingdata.Atablesourceprovidesaccesstodata storedinexternalsystems,suchasadatabase,akey-valuestor e,amessagequeue,orafilesystem.Atablesinkemitsatable toanexternalstoragesystem.Dependingonthetypeofsourcean dsink,theysupportdifferentformatssuchasCSV,JSON,Avro,o rParquet.tEnv.executeSql("CREATETABLEtransactions(\n"+" account_idBIGINT,\n"+"amountBIGINT,\n"+"trans action_timeTIMESTAMP(3),\n"+"WATERMARKFORtransaction_tim eAStransaction_time-INTERVAL''5''SECOND\n"+")WITH(\n"+" ''connector''=''kafka'',\n"+"''topic''=''transactions'' ,\n"+"''properties.bootstrap.servers''=''kafka:9092'',\n"+" ''format''=''csv''\n"+")");Twotablesareregistered;atr ansactioninputtable,andaspendreportoutputtable.Thetrans actions(transactions)tableletsusreadcreditcardtransaction s,whichcontainaccountID’s(account_id),timestamps(transacti on_time),andUS$amounts(amount).Thetableisalogicalviewo veraKafkatopiccalled?transactions?containingCSVdata.tEnv.ex ecuteSql("CREATETABLEspend_report(\n"+"account_idBIGINT ,\n"+"log_tsTIMESTAMP(3),\n"+"amountBIGINT\ n,"+"PRIMARYKEY(account_id,log_ts)NOTENFORCED"+")WI TH(\n"+"''connector''=''jdbc'',\n"+"''url''=''j dbc:mysql://mysql:3306/sql-demo'',\n"+"''table-name''=''spend _report'',\n"+"''driver''=''com.mysql.jdbc.Driver'',\n"+ "''username''=''sql-demo'',\n"+"''password''=''demo-sq l''\n"+")");Thesecondtable,?spend_report,storesthefinalres ultsoftheaggregation.ItsunderlyingstorageisatableinaM ySqldatabase.TheQueryWiththeenvironmentconfiguredandtables registered,youarereadytobuildyourfirstapplication.From the?TableEnvironment?youcanread?from?aninputtabletoreadits rowsandthenwritethoseresultsintoanoutputtableusing?exe cuteInsert.The?report?functioniswhereyouwillimplementyour businesslogic.Itiscurrentlyunimplemented.Tabletransactions =tEnv.from("transactions");report(transactions).executeInsert("s pend_report");TestingTheprojectcontainsasecondarytestingcla ss?SpendReportTest?thatvalidatesthelogicofthereport.Itcre atesatableenvironmentinbatchmode.EnvironmentSettingssettin gs=EnvironmentSettings.newInstance().inBatchMode().build();Tabl eEnvironmenttEnv=TableEnvironment.create(settings);OneofFli nk’suniquepropertiesisthatitprovidesconsistentsemanticsa crossbatchandstreaming.Thismeansyoucandevelopandtestap plicationsinbatchmodeonstaticdatasets,anddeploytoproduc tionasstreamingapplications.AttemptOneNowwiththeskeletono faJobset-up,youarereadytoaddsomebusinesslogic.Thegoa listobuildareportthatshowsthetotalspendforeachaccoun tacrosseachhouroftheday.Thismeansthetimestampcolumnne edsbeberoundeddownfrommillisecondtohourgranularity.Flink supportsdevelopingrelationalapplicationsinpure?https://ci.a pache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/tabl e/sql/overview/SQL?orusingthe?https://ci.apache.org/projects/fl ink/flink-docs-release-1.13/zh/docs/dev/table/tableapi/TableAPI. TheTableAPIisafluentDSLinspiredbySQL,thatcanbewritt eninPython,Java,orScalaandsupportsstrongIDEintegration. JustlikeaSQLquery,Tableprogramscanselecttherequiredfi eldsandgroupbyyourkeys.Thesefeatures,allongwith?https:// ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/ table/functions/systemfunctions/built-infunctions?like?floor?and ?sum,youcanwritethisreport.publicstaticTablereport(Table transactions){returntransactions.select($("account_id"),$("t ransaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),$("a mount")).groupBy($("account_id"),$("log_ts")).select($("accou nt_id"),$("log_ts"),$("amount").sum().as("amount"));}UserDefin edFunctionsFlinkcontainsalimitednumberofbuilt-infunctions ,andsometimesyouneedtoextenditwitha?https://ci.apache.or g/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functi ons/udfs/user-definedfunction.If?floor?wasn’tpredefined,youc ouldimplementityourself.importjava.time.LocalDateTime;import java.time.temporal.ChronoUnit;importorg.apache.flink.table.annot ation.DataTypeHint;importorg.apache.flink.table.functions.Scalar Function;publicclassMyFloorextendsScalarFunction{public@Da taTypeHint("TIMESTAMP(3)")LocalDateTimeeval(@DataTypeHint("TIM ESTAMP(3)")LocalDateTimetimestamp){returntimestamp.truncated To(ChronoUnit.HOURS);}}Andthenquicklyintegrateitinyourapp lication.publicstaticTablereport(Tabletransactions){return transactions.select($("account_id"),call(MyFloor.class,$("tran saction_time")).as("log_ts"),$("amount")).groupBy($("account_id "),$("log_ts")).select($("account_id"),$("log_ts"),$("amount ").sum().as("amount"));}Thisqueryconsumesallrecordsfromthe? transactions?table,calculatesthereport,andoutputstheresult sinanefficient,scalablemanner.Runningthetestwiththisim plementationwillpass.AddingWindowsGroupingdatabasedontime isatypicaloperationindataprocessing,especiallywhenworkin gwithinfinitestreams.Agroupingbasedontimeiscalleda?htt ps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs /dev/datastream/operators/windows/window?andFlinkoffersflexibl ewindowingsemantics.Themostbasictypeofwindowiscalleda? Tumble?window,whichhasafixedsizeandwhosebucketsdonotov erlap.publicstaticTablereport(Tabletransactions){returntra nsactions.window(Tumble.over(lit(1).hour()).on($("transaction_ti me")).as("log_ts")).groupBy($("account_id"),$("log_ts")).sele ct($("account_id"),$("log_ts").start().as("log_ts"),$("amount" ).sum().as("amount"));}Thisdefinesyourapplicationasusingone hourtumblingwindowsbasedonthetimestampcolumn.Soarowwi thtimestamp?2019-06-0101:23:47?isputinthe?2019-06-0101:00:0 0?window.Aggregationsbasedontimeareuniquebecausetime,aso pposedtootherattributes,generallymovesforwardinacontinuo usstreamingapplication.Unlike?floor?andyourUDF,windowfunct ionsare?https://en.wikipedia.org/wiki/Intrinsic_functionintrinsi cs,whichallowstheruntimetoapplyadditionaloptimizations.I nabatchcontext,windowsofferaconvenientAPIforgroupingre cordsbyatimestampattribute.Runningthetestwiththisimpleme ntationwillalsopass.OnceMore,WithStreaming!Andthat’sit,a fullyfunctional,stateful,distributedstreamingapplication!T hequerycontinuouslyconsumesthestreamoftransactionsfromKa fka,computesthehourlyspendings,andemitsresultsassoonas theyareready.Sincetheinputisunbounded,thequerykeepsrun ninguntilitismanuallystopped.AndbecausetheJobusestime window-basedaggregations,Flinkcanperformspecificoptimizatio nssuchasstatecleanupwhentheframeworkknowsthatnomorer ecordswillarriveforaparticularwindow.Thetableplaygroundi sfullydockerizedandrunnablelocallyasstreamingapplication. TheenvironmentcontainsaKafkatopic,acontinuousdatagenera tor,MySql,andGrafana.Fromwithinthe?table-walkthrough?folder startthedocker-composescript.$docker-composebuild$docker-co mposeup-dYoucanseeinformationontherunningjobviathe?htt p://localhost:8082/Flinkconsole.Exploretheresultsfrominside MySQL.$docker-composeexecmysqlmysql-Dsql-demo-usql-demo-pd emo-sqlmysql>usesql-demo;Databasechangedmysql>selectcount() fromspend_report;+----------+|count()|+----------+|110|+-- --------+Finally,goto?http://localhost:3000/d/FOe0PbmGk/walkthr ough?viewPanel=2&orgId=1&refresh=5sGrafana?toseethefullyvisua lizedresult!数据管道&ETLApacheFlink的一种常见应用场景是ETL(抽取、转换、加载)管道任务。 从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用Flink的Data StreamAPI实现这类应用。这里注意,Flink的?https://ci.apache.org/projects/fli nk/flink-docs-release-1.13/zh/docs/dev/table/overview/Table和SQL API?完全可以满足很多ETL使用场景。但无论你最终是否直接使用DataStreamAPI,对这里介绍的基本知识有扎实的 理解都是有价值的。无状态的转换本节涵盖了?map()?和?flatmap(),这两种算子可以用来实现无状态转换的基本操作。本节中的 示例建立在你已经熟悉?https://github.com/apache/flink-training/tree/release- 1.13/flink-training-repo?中的出租车行程数据的基础上。map()在第一个练习中,你将过滤出租车行程数据中的 事件。在同一代码仓库中,有一个?GeoUtils?类,提供了一个静态方法?GeoUtils.mapToGridCell(float lon,floatlat),它可以将位置坐标(经度,维度)映射到100x100米的对应不同区域的网格单元。现在让我们为每 个出租车行程时间的数据对象增加?startCell?和?endCell?字段。你可以创建一个继承?TaxiRide?的?Enric hedRide?类,添加这些字段:publicstaticclassEnrichedRideextendsTaxiRid e{publicintstartCell;publicintendCell;publicEnrichedRide (){}publicEnrichedRide(TaxiRideride){this.rideId=ride.rid eId;this.isStart=ride.isStart;...this.startCell=GeoUtils.m apToGridCell(ride.startLon,ride.startLat);this.endCell=GeoUti ls.mapToGridCell(ride.endLon,ride.endLat);}publicStringtoStr ing(){returnsuper.toString()+","+Integer.toString(this.sta rtCell)+","+Integer.toString(this.endCell);}}然后你可以创建一个应用来转换这 个流DataStreamrides=env.addSource(newTaxiRideSource(. ..));DataStreamenrichedNYCRides=rides.filter(ne wRideCleansingSolution.NYCFilter()).map(newEnrichment());enric hedNYCRides.print();使用这个?MapFunction:publicstaticclassEnrichme ntimplementsMapFunction{@Overridepub licEnrichedRidemap(TaxiRidetaxiRide)throwsException{return newEnrichedRide(taxiRide);}}flatmap()MapFunction?只适用于一对一的转换:对每 个进入算子的流元素,map()?将仅输出一个转换后的元素。对于除此以外的场景,你将要使用?flatmap()。DataStream rides=env.addSource(newTaxiRideSource(...));DataStr eamenrichedNYCRides=rides.flatMap(newNYCEnrich ment());enrichedNYCRides.print();其中用到的?FlatMapFunction?:publicst aticclassNYCEnrichmentimplementsFlatMapFunctionichedRide>{@OverridepublicvoidflatMap(TaxiRidetaxiRide,Col lectorout)throwsException{FilterFunctionide>valid=newRideCleansing.NYCFilter();if(valid.filter(taxi Ride)){out.collect(newEnrichedRide(taxiRide));}}}使用接口中提供的?Co llector?,flatmap()?可以输出你想要的任意数量的元素,也可以一个都不发。KeyedStreamskeyBy()将 一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单 元出发的最远的出租车行程。按SQL查询的方式来考虑,这意味着要对?startCell?进行GROUPBY再排序,在Fl ink中这部分可以用?keyBy(KeySelector)?实现。rides.flatMap(newNYCEnrichmen t()).keyBy(enrichedRide->enrichedRide.startCell)每个?keyBy?会通过s huffle来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。通过计算得到键KeySelect or不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了?hashCode()?和?equ als()。这些限制条件不包括产生随机数或者返回Arrays或Enums的KeySelector,但你可以用元组和PO JO来组成键,只要他们的元素遵循上述条件。键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。例 如,比起创建一个新的带有?startCell?字段的?EnrichedRide?类,用这个字段作为key:keyBy(enric hedRide->enrichedRide.startCell)我们更倾向于这样做:keyBy(ride->GeoUtil s.mapToGridCell(ride.startLon,ride.startLat))KeyedStream的聚合以下代 码为每个行程结束事件创建了一个新的包含?startCell?和时长(分钟)的元组流:importorg.joda.time.In terval;DataStream>minutesByStartCell= enrichedNYCRides.flatMap(newFlatMapFunction2>(){@OverridepublicvoidflatMap(EnrichedRi deride,Collector>out)throwsExceptio n{if(!ride.isStart){IntervalrideInterval=newInterval(rid e.startTime,ride.endTime);Minutesduration=rideInterval.toDur ation().toStandardMinutes();out.collect(newTuple2<>(ride.startC ell,duration));}}});现在就可以产生一个流,对每个?startCell?仅包含那些最长行程的数据。有很多 种方法表示使用哪个字段作为键。前面使用?EnrichedRide?POJO的例子,用字段名来指定键。而这个使用?Tuple2?对 象的例子中,用字段在元组中的序号(从0开始)来指定键。minutesByStartCell.keyBy(value->val ue.f0)//.keyBy(value->value.startCell).maxBy(1)//duration .print();现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应50797网格单元的数据:...4>( 64549,5M)4>(46298,18M)1>(51549,14M)1>(53043,13M)1>(56031,22M) 1>(50797,6M)...1>(50797,8M)...1>(50797,11M)...1>(50797,12M)(隐 式的)状态这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink必须跟踪每个不同的键的最大时长。只要应用中有状 态,你就应该考虑状态的大小。如果键值的数量是无限的,那Flink的状态需要的空间也同样是无限的。在流处理场景中,考虑有限窗口的 聚合往往比整个流聚合更有意义。reduce()?和其他聚合算子上面用到的?maxBy()?只是Flink中?KeyedStre am?上众多聚合函数中的一个。还有一个更通用的?reduce()?函数可以用来实现你的自定义聚合。有状态的转换Flink为什么要 参与状态管理?在Flink不参与管理状态的情况下,你的应用也可以使用状态,但Flink为其管理状态提供了一些引人注目的特性 :本地性:Flink状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取持久性:Flink状态是容错的,例如,它可 以自动按一定的时间间隔产生checkpoint,并且在任务失败后进行恢复纵向可扩展性:Flink状态可以存储在集成的Roc ksDB实例中,这种方式下可以通过增加本地磁盘来扩展空间横向可扩展性:Flink状态可以随着集群的扩缩容重新分布可查询性: Flink状态可以通过使用?https://ci.apache.org/projects/flink/flink-docs-re lease-1.13/zh/docs/dev/datastream/fault-tolerance/queryable_state /状态查询API?从外部进行查询。在本节中你将学习如何使用Flink的API来管理keyedstate。RichF unctions至此,你已经看到了Flink的几种函数接口,包括?FilterFunction,?MapFunction,和? FlatMapFunction。这些都是单一抽象方法模式。对其中的每一个接口,Flink同样提供了一个所谓“rich”的变体 ,如?RichFlatMapFunction,其中增加了以下方法,包括:open(Configurationc)close()g etRuntimeContext()open()?仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。g etRuntimeContext()?为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问Flink状态的途径。 一个使用KeyedState的例子在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能 的应用,使用一个名为?Deduplicator?的?RichFlatMapFunction?:privatestaticcla ssEvent{publicfinalStringkey;publicfinallongtimestamp; ...}publicstaticvoidmain(String[]args)throwsException{Str eamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecu tionEnvironment();env.addSource(newEventSource()).keyBy(e-> e.key).flatMap(newDeduplicator()).print();env.execute();}为了实 现这个功能,Deduplicator?需要记录每个键是否已经有了相应的记录。它将通过使用Flink的?keyedstate? 接口来做这件事。当你使用像这样的keyedstream的时候,Flink会为每个状态中管理的条目维护一个键值存储。Flin k支持几种不同方式的keyedstate,这个例子使用的是最简单的一个,叫做?ValueState。意思是对于?每个键?,F link将存储一个单一的对象——在这个例子中,存储的是一个?Boolean?类型的对象。我们的?Deduplicator?类 有两个方法:open()?和?flatMap()。open()?方法通过定义?ValueStateDescriptoran>?建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了 信息(在这个例子中的?Types.BOOLEAN)。publicstaticclassDeduplicatorextend sRichFlatMapFunction{ValueStatekeyHasB eenSeen;@Overridepublicvoidopen(Configurationconf){ValueSt ateDescriptordesc=newValueStateDescriptor<>("keyHasB eenSeen",Types.BOOLEAN);keyHasBeenSeen=getRuntimeContext().ge tState(desc);}@OverridepublicvoidflatMap(Eventevent,Collec torout)throwsException{if(keyHasBeenSeen.value()== null){out.collect(event);keyHasBeenSeen.update(true);}}}当fl atMap方法调用?keyHasBeenSeen.value()?时,Flink会在?当前键的上下文?中检索状态值,只有当状态 为?null?时,才会输出当前事件。这种情况下,它同时也将更新?keyHasBeenSeen?为?true。这种访问和更新按键分区 的状态的机制也许看上去很神奇,因为在?Deduplicator?的实现中,键不是明确可见的。当Flink运行时调用?RichF latMapFunction?的?open?方法时,是没有事件的,所以这个时候上下文中不含有任何键。但当它调用?flatMap? 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个Flink状态后端的入口。部署在分布式集群时,将会有很多? Deduplicator?的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的?ValueState,比 如ValueStatekeyHasBeenSeen;要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分 布式的共享键值存储。清理状态上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink会对每个使用过的键都存储一个?B oolean?类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调 用?clear()?来实现,如下:keyHasBeenSeen.clear()对一个给定的键值,你也许想在它一段时间不使用后来做这 件事。当学习?ProcessFunction?的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。也可以选择使用?htt ps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs /dev/datastream/fault-tolerance/state/状态的过期时间(TTL),为状态描述符配置你想要旧状态 自动被清除的时间。Non-keyedState在没有键的上下文中我们也可以使用Flink管理的状态。这也被称作?https: //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/de v/datastream/fault-tolerance/state/算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来 说使用non-keyedstate是不太常见的,所以这里就不多介绍了。这个特性最常用于source和sink的实现。 ConnectedStreams相比于下面这种预先定义的转换:有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其 他参数。Flink支持这种需求的模式称为?connectedstreams?,一个单独的算子有两个输入流。connected stream也可以被用来实现流的关联。示例在这个例子中,一个控制流是用来指定哪些词需要从?streamOfWords?里过滤掉的 。一个称为?ControlFunction?的?RichCoFlatMapFunction?作用于连接的流来实现这个功能。pub licstaticvoidmain(String[]args)throwsException{StreamExec utionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnv ironment();DataStreamcontrol=env.fromElements("DROP" ,"IGNORE").keyBy(x->x);DataStreamstreamOfWords=en v.fromElements("Apache","DROP","Flink","IGNORE").keyBy(x-> x);control.connect(streamOfWords).flatMap(newControlFunction ()).print();env.execute();}这里注意两个流只有键一致的时候才能连接。?keyBy?的作用是将流数据分 区,当keyedstream被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使 按键关联两个流成为可能。在这个例子中,两个流都是?DataStream?类型的,并且都将字符串作为键。正如你将在下 面看到的,RichCoFlatMapFunction?在状态中存了一个布尔类型的变量,这个变量被两个流共享。publicstat icclassControlFunctionextendsRichCoFlatMapFunctionring,String>{privateValueStateblocked;@Overridep ublicvoidopen(Configurationconfig){blocked=getRuntimeConte xt().getState(newValueStateDescriptor<>("blocked",Boolean.clas s));}@OverridepublicvoidflatMap1(Stringcontrol_value,Coll ectorout)throwsException{blocked.update(Boolean.TRUE );}@OverridepublicvoidflatMap2(Stringdata_value,Collector out)throwsException{if(blocked.value()==null){o ut.collect(data_value);}}}RichCoFlatMapFunction?是一种可以被用于一对连接流的? FlatMapFunction,并且它可以调用richfunction的接口。这意味着它可以是有状态的。布尔变量?block ed?被用于记录在数据流?control?中出现过的键(在这个例子中是单词),并且这些单词从?streamOfWords?过滤掉。 这是?keyed?state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。在Flink运行时中,flatM ap1?和?flatMap2?在连接流有新元素到来时被调用——在我们的例子中,control?流中的元素会进入?flatMap 1,streamOfWords?中的元素会进入?flatMap2。这是由两个流连接的顺序决定的,本例中为?control.conn ect(streamOfWords)。认识到你没法控制?flatMap1?和?flatMap2?的调用顺序是很重要的。这两个输入流 是相互竞争的关系,Flink运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现?InputSe lectable?接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)动手练习本节的动手练习是?https://github .com/apache/flink-training/blob/release-1.13/rides-and-fares行程和票价 练习?。延展阅读https://ci.apache.org/projects/flink/flink-docs-release-1 .13/zh/docs/dev/datastream/operators/overview/数据流转换https://ci.apa che.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/s tateful-stream-processing/有状态流的处理事件驱动应用处理函数(ProcessFunctions)简介P rocessFunction?将事件处理与Timer,State结合在一起,使其成为流处理应用的强大构建模块。这是使用Fl ink创建事件驱动应用程序的基础。它和?RichFlatMapFunction?十分相似,但是增加了Timer。示例如果你已 经体验了?https://ci.apache.org/projects/flink/flink-docs-release-1.13 /zh/docs/learn-flink/streaming_analytics/流式分析训练?的https://ci.apach e.org/projects/flink/flink-docs-release-1.13/zh/docs/learn-flink/ streaming_analytics/动手实践,你应该记得,它是采用?TumblingEventTimeWindow?来计算每 个小时内每个司机的小费总和,像下面的示例这样://计算每个司机每小时的小费总和DataStreamLong,Float>>hourlyTips=fares.keyBy((TaxiFarefare)->fare.d riverId).window(TumblingEventTimeWindows.of(Time.hours(1))).pro cess(newAddTips());使用?KeyedProcessFunction?去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码://计算每个司机每小时的小费总和DataStreamFloat>>hourlyTips=fares.keyBy((TaxiFarefare)->fare.driverI d).process(newPseudoWindow(Time.hours(1)));在这个代码片段中,一个名为?Pseudo Window?的?KeyedProcessFunction?被应用于KeyedStream,其结果是一个?DataStream >?(与使用Flink内置时间窗口的实现生成的流相同)。PseudoWin dow?的总体轮廓示意如下://在时长跨度为一小时的窗口中计算每个司机的小费总和。//司机ID作为key。publicst aticclassPseudoWindowextendsKeyedProcessFunctionare,Tuple3>{privatefinallongdurationMsec ;publicPseudoWindow(Timeduration){this.durationMsec=durati on.toMilliseconds();}@Override//在初始化期间调用一次。publicvoidopen( Configurationconf){...}@Override//每个票价事件(TaxiFare-Event) 输入(到达)时调用,以处理输入的票价事件。publicvoidprocessElement(TaxiFarefare, Contextctx,Collector>out)throwsExc eption{...}@Override//当当前水印(watermark)表明窗口现在需要完成的时候调用。pu blicvoidonTimer(longtimestamp,OnTimerContextcontext,Colle ctor>out)throwsException{...}}注 意事项:有几种类型的ProcessFunctions–不仅包括?KeyedProcessFunction,还包括?CoPro cessFunctions、BroadcastProcessFunctions?等.KeyedProcessFunction?是一 种?RichFunction。作为?RichFunction,它可以访问使用ManagedKeyedState所需的?op en?和?getRuntimeContext?方法。有两个回调方法须要实现:?processElement?和?onTimer。每 个输入事件都会调用?processElement?方法;当计时器触发时调用?onTimer。它们可以是基于事件时间(event time)的timer,也可以是基于处理时间(processingtime)的timer。除此之外,processElem ent?和?onTimer?都提供了一个上下文对象,该对象可用于与?TimerService?交互。这两个回调还传递了一个可用于 发出结果的?Collector。open()?方法//每个窗口都持有托管的Keyedstate的入口,并且根据窗口的结束时 间执行keyed策略。//每个司机都有一个单独的MapState对象。privatetransientMapState< Long,Float>sumOfTips;@Overridepublicvoidopen(Configurationco nf){MapStateDescriptorsumDesc=newMapStateDesc riptor<>("sumOfTips",Long.class,Float.class);sumOfTips=getRu ntimeContext().getMapState(sumDesc);}由于票价事件(fare-event)可能会乱序到达,有时 需要在计算输出前一个小时结果前,处理下一个小时的事件。这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。实际 上,如果Watermark延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。此实现通过使用?MapState? 来支持处理这一点,该?MapState?将每个窗口的结束时间戳映射到该窗口的小费总和。processElement()?方法pub licvoidprocessElement(TaxiFarefare,Contextctx,Collectorple3>out)throwsException{longeventTime= fare.getEventTime();TimerServicetimerService=ctx.timerServic e();if(eventTime<=timerService.currentWatermark()){//事件延迟; 其对应的窗口已经触发。}else{//将eventTime向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。l ongendOfWindow=(eventTime-(eventTime%durationMsec)+durat ionMsec-1);//在窗口完成时将启用回调timerService.registerEventTimeTimer( endOfWindow);//将此票价的小费添加到该窗口的总计中。Floatsum=sumOfTips.get(end OfWindow);if(sum==null){sum=0.0F;}sum+=fare.tip;sumO fTips.put(endOfWindow,sum);}}需要考虑的事项:延迟的事件怎么处理?watermark后面的事件( 即延迟的)正在被删除。如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Sideoutputs),这将在https://c i.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/learn -flink/event_driven/下一节中解释。本例使用一个?MapState,其中keys是时间戳(timestamp ),并为同一时间戳设置一个Timer。这是一种常见的模式;它使得在Timer触发时查找相关信息变得简单高效。onTimer ()?方法publicvoidonTimer(longtimestamp,OnTimerContextcontext ,Collector>out)throwsException{l ongdriverId=context.getCurrentKey();//查找刚结束的一小时结果。Floatsum OfTips=this.sumOfTips.get(timestamp);Tuple3 result=Tuple3.of(driverId,timestamp,sumOfTips);out.collect( result);this.sumOfTips.remove(timestamp);}注意:传递给?onTimer?的?OnTim erContextcontext?可用于确定当前key。我们的pseudo-windows在当前Watermark到达 每小时结束时触发,此时调用?onTimer。这个?onTimer?方法从?sumOfTips?中删除相关的条目,这样做的效果是不 可能容纳延迟的事件。这相当于在使用Flink的时间窗口时将allowedLateness设置为零。性能考虑Flink提 供了为RocksDB优化的?MapState?和?ListState?类型。相对于?ValueState,更建议使用?Map State?和?ListState,因为使用RocksDBStateBackend的情况下,?MapState?和?ListS tate?比?ValueState?性能更好。RocksDBStateBackend可以附加到?ListState,而无需进行 (反)序列化,对于?MapState,每个key/value都是一个单独的RocksDB对象,因此可以有效地访问和更新? MapState。旁路输出(SideOutputs)简介有几个很好的理由希望从Flink算子获得多个输出流,如下报告条目:异 常情况(exceptions)格式错误的事件(malformedevents)延迟的事件(lateevents)operato r告警(operationalalerts),如与外部服务的连接超时旁路输出(Sideoutputs)是一种方便的方法。除了 错误报告之外,旁路输出也是实现流的n路分割的好方法。示例现在你可以对上一节中忽略的延迟事件执行某些操作。Sideoutput channel与?OutputTag?相关联。这些标记拥有自己的名称,并与对应DataStream类型一致。priv atestaticfinalOutputTaglateFares=newOutputTagxiFare>("lateFares"){};上面显示的是一个静态?OutputTag?,当在?Pseudo Window?的?processElement?方法中发出延迟事件时,可以引用它:if(eventTime<=timerSe rvice.currentWatermark()){//事件延迟,其对应的窗口已经触发。ctx.output(lateFa res,fare);}else{...}以及当在作业的?main?中从该旁路输出访问流时://计算每个司机每小时的小 费总和SingleOutputStreamOperatorhourlyTips=fares.keyBy((TaxiFare fare)->fare.driverId).process(newPseudoWindow(Time.hours(1)) );hourlyTips.getSideOutput(lateFares).print();或者,可以使用两个同名的Output Tag来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。结语在本例中,你已经了解了如何使用?ProcessFuncti on?重新实现一个简单的时间窗口。当然,如果Flink内置的窗口API能够满足你的开发需求,那么一定要优先使用它。但如 果你发现自己在考虑用Flink的窗口做些错综复杂的事情,不要害怕自己动手。此外,ProcessFunctions?对于计算分析 之外的许多其他用例也很有用。下面的实践练习提供了一个完全不同的例子。ProcessFunctions?的另一个常见用例是清理过时 State。如果你回想一下?https://github.com/apache/flink-training/blob/rele ase-1.13/rides-and-faresRidesandFaresExercise?,其中使用?RichCoFla tMapFunction?来计算简单Join,那么示例方案假设TaxiRides和TaxiFares两个事件是严格匹配为 一个有效数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的?rideId?严格对应。如果数据对中的某个TaxiRi des事件(TaxiFares事件)丢失,则同一?rideId?对应的另一个出现的TaxiFares事件(TaxiRid es事件)对应的State则永远不会被清理掉。所以这里可以使用?KeyedCoProcessFunction?的实现代替它 (RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时的State。实践练习本节的实践练习是? https://github.com/apache/flink-training/blob/release-1.13/long-r ide-alertsLongRideAlertsExercise?.延伸阅读https://ci.apache.org/pr ojects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/opera tors/process_function/处理函数(ProcessFunction)https://ci.apache.org/ projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/sid e_output/旁路输出(SideOutputs)通过状态快照实现容错处理StateBackends由Flink管理的 keyedstate是一种分片的键/值存储,每个keyedstate的工作副本都保存在负责该键的taskmanager 本地中。另外,Operatorstate也保存在机器节点本地。Flink定期获取所有状态的快照,并将这些快照复制到持久化的 位置,例如分布式文件系统。如果发生故障,Flink可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。Flink管理的 状态存储在?statebackend?中。Flink有两种statebackend的实现–一种基于RocksDB 内嵌key/value存储将其工作状态保存在磁盘上的,另一种基于堆的statebackend,将其工作状态保存在Java 的堆内存中。这种基于堆的statebackend有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系 统;MemoryStateBackend,它使用JobManager的堆保存状态快照。名称WorkingState状态备份快 照RocksDBStateBackend本地磁盘(tmpdir)分布式文件系统全量/增量支持大于内存大小的状态经验法则:比基 于堆的后端慢10倍FsStateBackendJVMHeap分布式文件系统全量快速,需要大的堆内存受限制于GCMemorySt ateBackendJVMHeapJobManagerJVMHeap全量适用于小状态(本地)的测试和实验当使用基于堆的st atebackend保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在?RocksDBStateBackend?中的对象 ,访问和更新涉及序列化和反序列化,所以会有更大的开销。但RocksDB的状态量仅受本地磁盘大小的限制。还要注意,只有?Rock sDBStateBackend?能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。所有这些stateba ckends都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。状态快照定义快照?–是Flink 作业状态全局一致镜像的通用术语。快照包括指向每个数据源的指针(例如,到文件或Kafka分区的偏移量)以及每个作业的有状态运算符 的状态副本,该状态副本是处理了sources偏移位置之前所有的事件后而生成的状态。Checkpoint?–一种由Flink 自动执行的快照,其目的是能够从故障中恢复。Checkpoints可以是增量的,并为快速恢复进行了优化。外部化的Checkpo int?–通常checkpoints不会被用户操纵。Flink只保留作业运行时的最近的?n?个checkpoints(n ?可配置),并在作业取消时删除它们。但你可以将它们配置为保留,在这种情况下,你可以手动从中恢复。Savepoint?–用户出于某 种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或API调用)触发的快照。Savepoints始终是完整的,并且已针 对操作灵活性进行了优化。状态快照如何工作?Flink使用?https://en.wikipedia.org/wiki/Chand y-Lamport_algorithmChandy-Lamportalgorithm?算法的一种变体,称为异步barrier 快照(asynchronousbarriersnapshotting)。当checkpointcoordinator(jo bmanager的一部分)指示taskmanager开始checkpoint时,它会让所有sources记录它们 的偏移量,并将编号的?checkpointbarriers?插入到它们的流中。这些barriers流经jobgraph, 标注每个checkpoint前后的流部分。Checkpoint?n?将包含每个operator的state,这些sta te是对应的operator消费了严格在checkpointbarrier?n?之前的所有事件,并且不包含在此(chec kpointbarrier?n)后的任何事件后而生成的状态。当jobgraph中的每个operator接收到barr iers时,它就会记录下其状态。拥有两个输入流的Operators(例如?CoProcessFunction)会执行?barr ier对齐(barrieralignment)?以便当前快照能够包含消费两个输入流barrier之前(但不超过)的所有e vents而产生的状态。Flink的statebackends利用写时复制(copy-on-write)机制允许当异步生 成旧版本的状态快照时,能够不受影响地继续流处理。只有当快照被持久保存后,这些旧版本的状态才会被当做垃圾回收。确保精确一次(exac tlyonce)当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink根据你为应用程序和集群的配置,可以产生以 下结果:Flink不会从快照中进行恢复(atmostonce)没有任何丢失,但是你可能会得到重复冗余的结果(atleast once)没有丢失或冗余重复(exactlyonce)Flink通过回退和重新发送source数据流从故障中恢复,当理想 情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着?每一个事件都会影响Flink管理的状态精确一次 。Barrier只有在需要提供精确一次的语义保证时需要进行对齐(Barrieralignment)。如果不需要这种语义,可以通 过配置?CheckpointingMode.AT_LEAST_ONCE?关闭Barrier对齐来提高性能。端到端精确一次为了实 现端到端的精确一次,以便sources中的每个事件都仅精确一次对sinks生效,必须满足以下条件:你的sources必 须是可重放的,并且你的sinks必须是事务性的(或幂等的)实践练习https://ci.apache.org/projects /flink/flink-docs-release-1.13/zh/docs/try-flink/flink-operations -playground/FlinkOperationsPlayground?包括有关?https://ci.apache.or g/projects/flink/flink-docs-release-1.13/zh/docs/try-flink/flink- operations-playground/ObservingFailure&Recovery?的部分。延伸阅读https: //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/co ncepts/stateful-stream-processing/StatefulStreamProcessinghttps ://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/o ps/state/state_backends/StateBackendshttps://ci.apache.org/proje cts/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/g uarantees/DataSources和Sinks的容错保证https://ci.apache.org/project s/flink/flink-docs-release-1.13/zh/docs/dev/datastream/fault-tole rance/checkpointing/开启和配置Checkpointinghttps://ci.apache.org/proj ects/flink/flink-docs-release-1.13/zh/docs/ops/state/checkpoints/ Checkpointshttps://ci.apache.org/projects/flink/flink-docs-releas e-1.13/zh/docs/ops/state/savepoints/Savepointshttps://ci.apache.o rg/projects/flink/flink-docs-release-1.13/zh/docs/ops/state/large _state_tuning/大状态与Checkpoint调优https://ci.apache.org/projects/fl ink/flink-docs-release-1.13/zh/docs/ops/monitoring/checkpoint_mon itoring/监控Checkpointhttps://ci.apache.org/projects/flink/flink-d ocs-release-1.13/zh/docs/dev/execution/task_failure_recovery/Task 故障恢复三、Flink的部署ApacheFlink可以以多种方式在不同的环境中部署,抛开这种多样性而言,Flink集群的基本构 建方式和操作原则仍然是相同的。在这篇文章里,你将会学习如何管理和运行Flink任务,了解如何部署和监控应用程序、Flink如何从失 败作业中进行恢复,同时你还会学习如何执行一些日常操作任务,如升级和扩容。本地模式安装请按照以下几个步骤下载最新的稳定版本开始使用。 步骤1:下载为了运行Flink,只需提前安装好?Java8或者Java11。你可以通过以下命令来检查Java是否已经 安装正确。java-versionhttps://flink.apache.org/zh/downloads.html下载?re lease1.13.2并解压。$tar-xzfflink-1.13.2-bin-scala_2.11.tgz$cdf link-1.13.2-bin-scala_2.11步骤2:启动集群Flink附带了一个bash脚本,可以用于启动本地集群 。$./bin/start-cluster.shStartingcluster.Startingstandalonesess iondaemononhost.Startingtaskexecutordaemononhost.步骤3:提交作业 (Job)Flink的Releases附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。$./bin/f linkrunexamples/streaming/WordCount.jar$taillog/flink--taske xecutor-.out(to,1)(be,1)(or,1)(not,1)(to,2)(be,2)另外,你可以通过 Flink的?http://localhost:8081/WebUI?来监视集群的状态和正在运行的作业。步骤4:停止集群完成 后,你可以快速停止集群和所有正在运行的组件。$./bin/stop-cluster.sh单节点模式为了运行Flink,只需提前安 装好?Java8或者Java11。你可以通过以下命令来检查Java是否已经安装正确。java-versionhttp s://flink.apache.org/zh/downloads.html下载?release1.13.2。上传至Linux服 务器并解压。$tar-xzfflink-1.13.2-bin-scala_2.11.tgz-C{指定目录}$cdfl ink-1.13.2-bin-scala_2.11./bin/start-cluster.shhttps://link.zhihu .com/?target=http%3A//localhost%3A8081/http://{节点Ip}:8081为FlinkW ebUI场景说明这篇文章中的所有操作都是基于如下两个集群进行的:?https://ci.apache.org/projects/f link/flink-docs-release-1.13/zh/docs/concepts/glossary/FlinkSess ionCluster?以及一个Kafka集群,我们会在下文带领大家一起搭建这两个集群。一个Flink集群总是包含一个? https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/d ocs/concepts/glossary/JobManager?以及一个或多个?https://ci.apache.org/pr ojects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/Fl inkTaskManager。JobManager负责处理?https://ci.apache.org/projects/fl ink/flink-docs-release-1.13/zh/docs/concepts/glossary/Job?提交、Job 监控以及资源管理。FlinkTaskManager运行worker进程,负责实际任务?https://ci.apach e.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glo ssary/Tasks?的执行,而这些任务共同组成了一个FlinkJob。在这篇文章中,我们会先运行一个TaskMana ger,接下来会扩容到多个TaskManager。另外,这里我们会专门使用一个?client?容器来提交FlinkJob, 后续还会使用该容器执行一些操作任务。需要注意的是,Flink集群的运行并不需要依赖?client?容器,我们这里引入只是为了 使用方便。这里的Kafka集群由一个Zookeeper服务端和一个KafkaBroker组成。一开始,我们会往Jo bManager提交一个名为?Flink事件计数?的Job,此外,我们还创建了两个KafkaTopic:input?和? output。该Job负责从?input?topic消费点击事件?ClickEvent,每个点击事件都包含一个?timest amp?和一个?page?属性。这些事件将按照?page?属性进行分组,然后按照每15s窗口?https://ci.apac he.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastr eam/operators/windows/windows?进行统计,最终结果输出到?output?topic中。总共有6 种不同的page属性,针对特定page,我们会按照每15s产生1000个点击事件的速率生成数据。因此,针对特定p age,该Flinkjob应该能在每个窗口中输出1000个该page的点击数据。环境搭建环境搭建只需要几步就可以完成 ,我们将会带你过一遍必要的操作命令,并说明如何验证我们正在操作的一切都是运行正常的。你需要在自己的主机上提前安装好?https: //docs.docker.com/docker?(1.12+)和?https://docs.docker.com/compos e/docker-compose?(2.1+)。我们所使用的配置文件位于?https://github.com/apache/fl ink-playgroundsflink-playgrounds?仓库中,检出该仓库并启动docker环境:gitclon e--branchrelease-{{site.version_title}}https://github.com/ap ache/flink-playgrounds.gitcdflink-playgrounds/operations-playgro unddocker-composebuilddocker-composeup-d接下来可以执行如下命令来查看Docker 容器:docker-composepsNameComman dStatePorts-------------------- ----------------------------------------------------------------- ----------------------------------------operations-playground_cli ckevent-generator_1/docker-entrypoint.shjava...Up6 123/tcp,8081/tcpoperations-playground_client_1/ docker-entrypoint.shflin...Exit0operations-playground_jobm anager_1/docker-entrypoint.shjobm...Up61 23/tcp,0.0.0.0:8081->8081/tcpoperations-playground_kafka_1 start-kafka.shUp0.0.0.0:909 4->9094/tcpoperations-playground_taskmanager_1/docker -entrypoint.shtask...Up6123/tcp,8081/tcpoperations-p layground_zookeeper_1/bin/sh-c/usr/sbin/sshd... Up2181/tcp,22/tcp,2888/tcp,3888/tcp从上面的信息可以看出client 容器已成功提交了FlinkJob(Exit0),同时包含数据生成器在内的所有集群组件都处于运行中状态(Up)。你可以 执行如下命令停止docker环境:docker-composedown-v环境讲解在这个搭建好的环境中你可以尝试和验证很多 事情,在下面的两个部分中我们将向你展示如何与Flink集群进行交互以及演示并讲解Flink的一些核心特性。FlinkWe bUI界面观察Flink集群首先想到的就是FlinkWebUI界面:打开浏览器并访问?http://localhost:8 081/http://localhost:8081,如果一切正常,你将会在界面上看到一个TaskManager和一个处于“R UNNING”状态的名为?ClickEventCount?的Job。FlinkWebUI界面包含许多关于Flink 集群以及运行在其上的Jobs的有用信息,比如:JobGraph、Metrics、CheckpointingStatistic s、TaskManagerStatus等等。日志JobManagerJobManager日志可以通过?docker-comp ose?命令进行查看。docker-composelogs-fjobmanagerJobManager刚启动完成之时,你会 看到很多关于checkpointcompletion(检查点完成)的日志。TaskManagerTaskManager日志 也可以通过同样的方式进行查看。docker-composelogs-ftaskmanagerTaskManager刚启动完 成之时,你同样会看到很多关于checkpointcompletion(检查点完成)的日志。FlinkCLIhttps:// ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/depl oyment/cli/FlinkCLI?相关命令可以在client容器内进行使用。比如,想查看FlinkCLI的?h elp?命令,可以通过如下方式进行查看:docker-composerun--no-depsclientflink--h elpFlinkRESTAPIhttps://ci.apache.org/projects/flink/flink-docs- release-1.13/zh/docs/ops/rest_api/FlinkRESTAPI?可以通过本机的?localhos t:8081?进行访问,也可以在client容器中通过?jobmanager:8081?进行访问。比如,通过如下命令可以获取 所有正在运行中的Job:curllocalhost:8081/jobsKafkaTopics可以运行如下命令查看Kafka Topics中的记录://inputtopic(1000records/s)docker-composeexecka fkakafka-console-consumer.sh\--bootstrap-serverlocalhost:9092 --topicinput//outputtopic(24records/min)docker-composeexec kafkakafka-console-consumer.sh\--bootstrap-serverlocalhost:90 92--topicoutput核心特性探索到目前为止,你已经学习了如何与Flink以及Docker容器进行交互,现在让 我们看一些常用的操作命令。本节中的各部分命令不需要按任何特定的顺序执行,这些命令大部分都可以通过?https://ci.apac he.org/projects/flink/flink-docs-release-1.13/zh/docs/try-flink/f link-operations-playground/CLI?或?https://ci.apache.org/projects/f link/flink-docs-release-1.13/zh/docs/try-flink/flink-operations-p layground/RESTAPI?执行。获取所有运行中的JobCLI命令docker-composerun--no-dep sclientflinklist预期输出Waitingforresponse...------------------ Running/RestartingJobs-------------------16.07.201916:37:55: :ClickEventCount(RUNNING)--------------------------- -----------------------------------Noscheduledjobs.RESTAPI请求cu rllocalhost:8081/jobs预期响应(结果已格式化){"jobs":[{"id":"" ,"status":"RUNNING"}]}一旦Job提交,Flink会默认为其生成一个JobID,后续对该Jo b的所有操作(无论是通过CLI还是RESTAPI)都需要带上JobID。Job失败与恢复在Job(部分)失败的 情况下,Flink对事件处理依然能够提供精确一次的保障,在本节中你将会观察到并能够在某种程度上验证这种行为。Step1:观 察输出如https://ci.apache.org/projects/flink/flink-docs-release-1.13/ zh/docs/try-flink/flink-operations-playground/前文所述,事件以特定速率生成,刚好使得 每个统计窗口都包含确切的1000条记录。因此,你可以实时查看outputtopic的输出,确定失败恢复后所有的窗口依然 输出正确的统计数字,以此来验证Flink在TaskManager失败时能够成功恢复,而且不丢失数据、不产生数据重复。为此 ,通过控制台命令消费?output?topic,保持消费直到Job从失败中恢复(Step3)。docker-compose execkafkakafka-console-consumer.sh\--bootstrap-serverlocalh ost:9092--topicoutputStep2:模拟失败为了模拟部分失败故障,你可以kill掉一个TaskMa nager,这种失败行为在生产环境中就相当于TaskManager进程挂掉、TaskManager机器宕机或者从框架或用户代 码中抛出的一个临时异常(例如,由于外部资源暂时不可用)而导致的失败。docker-composekilltaskmanager 几秒钟后,JobManager就会感知到TaskManager已失联,接下来它会取消Job运行并且立即重新提交该Jo b以进行恢复。当Job重启后,所有的任务都会处于?SCHEDULED?状态,如以下截图中紫色方格所示:注意:虽然Job 的所有任务都处于SCHEDULED状态,但整个Job的状态却显示为RUNNING。此时,由于TaskManager提 供的TaskSlots资源不够用,Job的所有任务都不能成功转为?RUNNING?状态,直到有新的TaskManager 可用。在此之前,该Job将经历一个取消和重新提交不断循环的过程。与此同时,数据生成器(datagenerator)一直 不断地往?input?topic中生成?ClickEvent?事件,在生产环境中也经常出现这种Job挂掉但源头还在不断产生数 据的情况。Step3:失败恢复一旦TaskManager重启成功,它将会重新连接到JobManager。docker-c omposeup-dtaskmanager当TaskManager注册成功后,JobManager就会将处于?SCHE DULED?状态的所有任务调度到该TaskManager的可用TaskSlots中运行,此时所有的任务将会从失败前最近一次 成功的?https://ci.apache.org/projects/flink/flink-docs-release-1.13/ zh/docs/learn-flink/fault_tolerance/checkpoint?进行恢复,一旦恢复成功,它们的状态 将转变为?RUNNING。接下来该Job将快速处理Kafkainput事件的全部积压(在Job中断期间累积的数据), 并以更快的速度(>24条记录/分钟)产生输出,直到它追上kafka的lag延迟为止。此时观察?output?topi c输出,你会看到在每一个时间窗口中都有按?page?进行分组的记录,而且计数刚好是1000。由于我们使用的是?https: //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/co nnectors/datastream/kafka/FlinkKafkaProducer?“至少一次"模式,因此你可能会看到一些记 录重复输出多次。注意:在大部分生产环境中都需要一个资源管理器(Kubernetes、Yarn,、Mesos)对失败的Job 进行自动重启。Job升级与扩容升级Flink作业一般都需要两步:第一,使用?https://ci.apache.org/pr ojects/flink/flink-docs-release-1.13/zh/docs/ops/state/savepoints /Savepoint?优雅地停止FlinkJob。Savepoint是整个应用程序状态的一次快照(类似于checkpoi nt),该快照是在一个明确定义的、全局一致的时间点生成的。第二,从Savepoint恢复启动待升级的FlinkJob。 在此,“升级”包含如下几种含义:配置升级(比如Job并行度修改)Job拓扑升级(比如添加或者删除算子)Job的用户自定义函 数升级在开始升级之前,你可能需要实时查看?Output?topic输出,以便观察在升级过程中没有数据丢失或损坏。docker- composeexeckafkakafka-console-consumer.sh\--bootstrap-server localhost:9092--topicoutputStep1:停止Job要优雅停止Job,需要使用JobID 通过CLI或RESTAPI调用“stop”命令。JobID可以通过https://ci.apache.org/p rojects/flink/flink-docs-release-1.13/zh/docs/try-flink/flink-ope rations-playground/获取所有运行中的Job?接口或FlinkWebUI界面获取,拿到JobID后就可 以继续停止作业了:CLI命令docker-composerun--no-depsclientflinkstop-id>预期输出Suspendingjob""withasavepoint.Suspendedjob ""withasavepoint.Savepoint已保存在?state.savepoints.dir?指 定的路径中,该配置在?flink-conf.yaml?中定义,flink-conf.yaml?挂载在本机的?/tmp/flink- savepoints-directory/?目录下。在下一步操作中我们会用到这个Savepoint路径,如果我们是通过RE STAPI操作的,那么Savepoint路径会随着响应结果一起返回,我们可以直接查看文件系统来确认Savepoint 保存情况。命令ls-lia/tmp/flink-savepoints-directory预期输出total017drwx r-xr-x3rootroot6017jul17:05.2drwxrwxrwt135rootroot342 017jul17:09..1002drwxr-xr-x2rootroot14017jul17:05sav epoint--RESTAPI请求#停止Jobcurl-XPOSTlocalh ost:8081/jobs//stop-d''{"drain":false}''预期响应(结果已格式化){" request-id":""}请求#检查停止结果并获取savepoint路径curllocal host:8081/jobs//savepoints/预期响应(结果已格式化){"st atus":{"id":"COMPLETED"},"operation":{"location":"int-path>"}Step2a:重启Job(不作任何变更)现在你可以从这个Savepoint重新启动待升级的J ob,为了简单起见,不对该Job作任何变更就直接重启。CLI命令docker-composerun--no-depscl ientflinkrun-s\-d/opt/ClickCountJob.jar\ --bootstrap.serverskafka:9092--checkpointing--event-time预期输出St artingexecutionofprogramJobhasbeensubmittedwithJobID-id>RESTAPI请求#从客户端容器上传JARdocker-composerun--no-depsclientc url-XPOST-H"Expect:"\-F"jarfile=@/opt/ClickCountJob.jar"h ttp://jobmanager:8081/jars/upload预期响应(结果已格式化){"filename":"/tmp /flink-web-/flink-web-upload/","status":"success" }请求#提交Jobcurl-XPOSThttp://localhost:8081/jars//run\ -d''{"programArgs":"--bootstrap.serverskafka:9092--checkpoint ing--event-time","savepointPath":""}''预期响应(结果已 格式化){"jobid":""}一旦该Job再次处于?RUNNING?状态,你将从?output?Topi c中看到数据在快速输出,因为刚启动的Job正在处理停止期间积压的大量数据。另外,你还会看到在升级期间没有产生任何数据丢失 :所有窗口都在输出1000。Step2b:重启Job(修改并行度)在从Savepoint重启Job之前,你还可以 通过修改并行度来达到扩容Job的目的。CLI命令docker-composerun--no-depsclientfli nkrun-p3-s\-d/opt/ClickCountJob.jar\--b ootstrap.serverskafka:9092--checkpointing--event-time预期输出Start ingexecutionofprogramJobhasbeensubmittedwithJobID>RESTAPI请求#UploadingtheJARfromtheClientcontainerdocker-co mposerun--no-depsclientcurl-XPOST-H"Expect:"\-F"jarfil e=@/opt/ClickCountJob.jar"http://jobmanager:8081/jars/upload预期响应 (结果已格式化){"filename":"/tmp/flink-web-/flink-web-upload/ar-id>","status":"success"}请求#提交Jobcurl-XPOSThttp://localh ost:8081/jars//run\-d''{"parallelism":3,"programArgs" :"--bootstrap.serverskafka:9092--checkpointing--event-time", "savepointPath":""}''预期响应(结果已格式化){"jobid":"b-id>"}现在Job已重新提交,但由于我们提高了并行度所以导致TaskSlots不够用(1个TaskSlot可用 ,总共需要3个),最终Job会重启失败。通过如下命令:docker-composescaletaskmanager=2 你可以向Flink集群添加第二个TaskManager(为Flink集群提供2个TaskSlots资源),它会 自动向JobManager注册,TaskManager注册完成后,Job会再次处于“RUNNING”状态。一旦Job 再次运行起来,从?output?Topic的输出中你会看到在扩容期间数据依然没有丢失:所有窗口的计数都正好是1000。查询 Job指标可以通过JobManager提供的RESTAPI来获取系统和用户https://ci.apache.org /projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/指标具体请 求方式取决于我们想查询哪类指标,Job相关的指标分类可通过?jobs//metrics?获得,而要想查询某类指标 的具体值则可以在请求地址后跟上?get?参数。请求curl"localhost:8081/jobs//metri cs?get=lastCheckpointSize"预期响应(结果已格式化且去除了占位符)[{"id":"lastChec kpointSize","value":"9378"}]RESTAPI不仅可以用于查询指标,还可以用于获取正在运行中的 Job详细信息。请求#可以从结果中获取感兴趣的vertex-idcurllocalhost:8081/jobs/id>预期响应(结果已格式化){"jid":"","name":"ClickEventCount", "isStoppable":false,"state":"RUNNING","start-time":15644670 66026,"end-time":-1,"duration":374793,"now":1564467440819, "timestamps":{"CREATED":1564467066026,"FINISHED":0,"SUSPEND ED":0,"FAILING":0,"CANCELLING":0,"CANCELED":0,"RECONCILIN G":0,"RUNNING":1564467066126,"FAILED":0,"RESTARTING":0}, "vertices":[{"id":"","name":"ClickEventSource", "parallelism":2,"status":"RUNNING","start-time":15644670664 23,"end-time":-1,"duration":374396,"tasks":{"CREATED":0, "FINISHED":0,"DEPLOYING":0,"RUNNING":2,"CANCELING":0,"FAI LED":0,"CANCELED":0,"RECONCILING":0,"SCHEDULED":0},"metr ics":{"read-bytes":0,"read-bytes-complete":true,"write-byte s":5033461,"write-bytes-complete":true,"read-records":0,"re ad-records-complete":true,"write-records":166351,"write-recor ds-complete":true}},{"id":"","name":"ClickEven tCounter","parallelism":2,"status":"RUNNING","start-time": 1564467066469,"end-time":-1,"duration":374350,"tasks":{"CR EATED":0,"FINISHED":0,"DEPLOYING":0,"RUNNING":2,"CANCELIN G":0,"FAILED":0,"CANCELED":0,"RECONCILING":0,"SCHEDULED": 0},"metrics":{"read-bytes":5085332,"read-bytes-complete": true,"write-bytes":316,"write-bytes-complete":true,"read-rec ords":166305,"read-records-complete":true,"write-records":6, "write-records-complete":true}},{"id":"","name ":"ClickEventStatisticsSink","parallelism":2,"status":"RUNN ING","start-time":1564467066476,"end-time":-1,"duration":37 4343,"tasks":{"CREATED":0,"FINISHED":0,"DEPLOYING":0,"RU NNING":2,"CANCELING":0,"FAILED":0,"CANCELED":0,"RECONCILI NG":0,"SCHEDULED":0},"metrics":{"read-bytes":20668,"read -bytes-complete":true,"write-bytes":0,"write-bytes-complete": true,"read-records":6,"read-records-complete":true,"write-r ecords":0,"write-records-complete":true}}],"status-counts" :{"CREATED":0,"FINISHED":0,"DEPLOYING":0,"RUNNING":4,"C ANCELING":0,"FAILED":0,"CANCELED":0,"RECONCILING":0,"SCHE DULED":0},"plan":{"jid":"","name":"ClickEventCo unt","nodes":[{"id":"","parallelism":2,"operat or":"","operator_strategy":"","description":"ClickEventStati sticsSink","inputs":[{"num":0,"id":"","ship_s trategy":"FORWARD","exchange":"pipelined_bounded"}],"optimi zer_properties":{}},{"id":"","parallelism":2," operator":"","operator_strategy":"","description":"ClickEven tCounter","inputs":[{"num":0,"id":"","ship_s trategy":"HASH","exchange":"pipelined_bounded"}],"optimizer _properties":{}},{"id":"","parallelism":2,"ope rator":"","operator_strategy":"","description":"ClickEventS ource","optimizer_properties":{}}]}}请查阅?https://ci.apache.or g/projects/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/RES TAPI参考,该参考上有完整的指标查询接口信息,包括如何查询不同种类的指标(例如TaskManager指标)。你可能已经注 意到了,ClickEventCount?这个Job在启动时总是会带上?--checkpointing?和?--event- time?两个参数,如果我们去除这两个参数,那么Job的行为也会随之改变。--checkpointing?参数开启了?htt ps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs /learn-flink/fault_tolerance/checkpoint?配置,checkpoint是Flink容错机 制的重要保证。如果你没有开启checkpoint,那么在?https://ci.apache.org/projects/fli nk/flink-docs-release-1.13/zh/docs/try-flink/flink-operations-pla yground/Job失败与恢复这一节中,你将会看到数据丢失现象发生。--event-time?参数开启了Job的?http s://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ concepts/time/事件时间?机制,该机制会使用?ClickEvent?自带的时间戳进行统计。如果不指定该参数,Flin k将结合当前机器时间使用事件处理时间进行统计。如此一来,每个窗口计数将不再是准确的1000了。ClickEventCou nt?这个Job还有另外一个选项,该选项默认是关闭的,你可以在?client?容器的?docker-compose.yaml? 文件中添加该选项从而观察该Job在反压下的表现,该选项描述如下:--backpressure?将一个额外算子添加到Job中 ,该算子会在偶数分钟内产生严重的反压(比如:10:12期间,而10:13期间不会)。这种现象可以通过多种https://ci .apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/me trics/网络指标观察到,比如:outputQueueLength?和?outPoolUsage?指标,通过WebUI上的h ttps://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/do cs/ops/monitoring/back_pressure/反压监控也可以观察到。机群搭建Flink机群相对来说配置比较简单配 置Jdk1.8,同样的步骤,上传解压。注意机群模式需要配置主机访问从机的SSH免密。编辑配置文件viconf/flink-c onf.yaml注意yaml文件冒号后必须加一个空格再填写参数。jobmanager.rpc.address:(此项设置为主机 IP地址设置)编写conf/slaves文件,填加从机IP地址。从机配置地址主机Ip地址配置分发文件到从机,分发脚本如下。#!/b in/bashpcount=$#if((pcount==0));thenechonoargs;exit;fip1=$1fna me=`basename$p1`echofname=$fnamepdir=`cd-P$(dirname$p1);pwd `echopdir=$pdiruser=`whoami`//注意下一行你必须修改,换成主机名,或者你的IPfor((host=1 02;host<105;host++));doecho--------hadoop$host--------rsync- rvl$pdir/$fname$user@hadoop$host:$pdirdone启动机群bin/start-cluster .sh查看WebUI{主机地址}:8081Flink集群规模Flink集群一般规模多大?大家注意,这个问题看起来是问你实际应用 中的Flink集群规模,其实还隐藏着另一个问题:Flink可以支持多少节点的集群规模?在回答这个问题时候,可以将自己生产环节中的集 群规模、节点、内存情况说明,同时说明部署模式(一般是FlinkonYarn),除此之外,用户也可以同时在小集群(少于5个节点) 和拥有TB级别状态的上千个节点上运行Flink任务。集群角色Flink程序在运行时主要有TaskManager,Job Manager,Client三种角色。其中JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接 收FlinkJob,协调检查点,Failover故障恢复等,同时管理Flink集群中从节点TaskManager。TaskMa nager是实际负责执行计算的Worker,在其上执行FlinkJob的一组Task,每个TaskManager负责管理其所在节 点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。Client是Flink程序提交的客户端, 当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Fl ink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager 的连接,将FlinkJob提交给JobManager。四、Flink运行架构Flink运行时的组件JobManager控制一个 应用程序执行的主线程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行JobManager会先接收到要执行 的应用程序,这个程序包括作业图(JobGraph),逻辑数据图(logicaldataflowgraph)?和打包了所有的类、 库和其它资源的JAR包JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(Exec utionGraph)包含了所有可以并发执行的任务JobManager会向资源管理器(ResourceManager)请求 执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽。一旦他获取到了足够的资源,就会将执行图分发到真正运行性他们 的TaskManager上,而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoin ts)的协调。公司怎么提交的实时任务,有多少JobManager?1.我们使用yarnsession模式提交任务。 每次提交都会创建一个新的Flink集群,为每一个job提供一个yarn-session,任务之间互相独立,互不影响,方 便管理。任务执行完成之后创建的集群也会消失。2.集群默认只有一个JobManager。但为了防止单点故障,我们配置了高可用。 我们公司一般配置一个主JobManager,两个备用JobManager,然后结合ZooKeeper的使用,来达到高 可用。Flink的checkpoint的存储介质可以是内存,文件系统,或者RocksDB。端到端的exactly-once 对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。 而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。在下级存储(外部系统)不支持事务的情况下,Flink想要保证e xactly-once,可以用预写日志的方式,把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写 入sink系统TaskManger?Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个Tas kManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。?启动之后,Tas kManager会向ResourceManager注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插 槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。?在执行过程中,一个TaskMa nager可以跟其它运行同一应用程序的TaskManager交换数据。ResourceManager?主要负责管理任务管理器(Ta skManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。?Flink为不同的环境和资源 管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。?当JobManager申请插槽资源 时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManag er没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。D ispatcher?可以跨作业运行,它为应用提交提供了REST接口。?当一个应用被提交执行时,分发器就会启动并将应用移交给一个Jo bManager。?Dispatcher也会启动一个WebUI,用来方便地展示和监控作业执行的信息。?Dispatcher在架构 中可能并不是必需的,这取决于应用提交运行的方式。任务提交流程任务提交流程(Yarn)任务调度原理TaskManager和Slots ?Flink中每一个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务?为了控制一个Ta skManager能接收多少个task,TaskManager通过taskslot来进行控制(一个TaskMana ger至少有一个slot)?默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务。这样的结果是,一个 slot可以保存作业的整个管道。?TaskSlot是静态的概念,是指TaskManager具有的并发执行能力在Flin k架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager是一个JVM进程,并会以 独立的线程来执行一个task或多个subtask。为了控制一个TaskManager能接受多少个task,Flink提出了 TaskSlot的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样 就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。常用算子Flink 最常用的常用算子包括:Map:DataStream→DataStream,输入一个参数产生一个参数,map的功能是对输入的参数 进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Win dow:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)程序与数据流(DataFlow)?所有的Fli nk程序都是由三部分组成的:?Source?、Transformation?和?Sink。?Source负责读取数据源,Tran sformation利用各种算子进行处理加工,Sink?负责输出?在运行时,Flink上运行的程序会被映射成“逻辑数据流”(da taflows),它包含了这三部分?每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflo w类似于任意的有向无环图(DAG)?在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子( operator)是一一对应的关系执行图?Flink中的执行图可以分成四层:StreamGraph->JobGraph-> ExecutionGraph->物理执行图?StreamGraph:是根据用户通过StreamAPI编写的代码生成的最 初的图。用来表示程序的拓扑结构。?JobGraph:StreamGraph经过优化后生成了JobGraph,提交给JobMan ager的数据结构。主要的优化为,将多个符合条件的节点chain在一起作为一个节点?ExecutionGraph:JobMa nager根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本, 是调度层最核心的数据结构。?物理执行图:JobManager根据ExecutionGraph对Job进行调度后,在各个T askManager上部署Task后形成的“图”,并不是一个具体的数据结构。并行度(Parallelism)?一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个?stream?的并行度,可以认为就是 其所有算子中最大的并行度。?一个程序中,不同的算子可能具有不同的并行度?算子之间传输数据的形式可以是?one-to-one(fo rwarding)?的模式也可以是redistributing?的模式,具体是哪一种形式,取决于算子的种类?One-to-one: stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map算子的子任务看到的元素的个数以及顺序跟so urce算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。? Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不 同的目标任务。例如,keyBy基于hashCode重分区、而broadcast和rebalance会随机重新分区,这 些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。F link中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可 以从四个不同层面设置并行度:操作算子层面(OperatorLevel)执行环境层面(ExecutionEnvironment Level)客户端层面(ClientLevel)系统层面(SystemLevel)需要注意的优先级:算子层面>环境层面>客户端 层面>系统层面。十二、Flink的Slot和parallelism有什么区别?官网上十分经典的图:slot是指taskmanage r的并发执行能力,假设我们将taskmanager.numberOfTaskSlots配置为3那么每一个taskmanag er中分配3个TaskSlot,3个taskmanager一共有9个TaskSlot。parallelism是指task manager实际使用的并发能力。假设我们把parallelism.default设置为1,那么9个TaskSlot只能用 1个,有8个空闲。十三、Flink有没有重启策略?说说有哪几种?Flink实现了多种重启策略。固定延迟重启策略(FixedDe layRestartStrategy)故障率重启策略(FailureRateRestartStrategy)没有重启策略 (NoRestartStrategy)Fallback重启策略(FallbackRestartStrategy)任务链(O peratorChains)?Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要 求,必须将两个或多个算子设为相同的并行度,并通过本地转发(localforward)的方式进行连接?相同并行度的?one-to- one?操作,Flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的subtask?并行度相同、并且是o ne-to-one操作,两个条件缺一不可分区策略什么要搞懂什么是分区策略。分区策略是用来决定数据如何发送至下游。目前Flink 支持了8中分区策略的实现。上图是整个Flink实现的分区策略继承图:GlobalPartitioner?数据会被分发到下游算子的 第一个实例中进行处理。ShufflePartitioner?数据会被随机分发到下游算子的每一个实例中进行处理。RebalanceP artitioner?数据会被循环发送到下游的每一个实例中进行处理。RescalePartitioner?这种分区器会根据上下游算 子的并行度,循环的方式输出到下游算子的每个实例。这里有点难以理解,假设上游并行度为2,编号为A和B。下游并行度为4,编号为1,2, 3,4。那么A则把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游并行度为4,编号为A,B,C,D。下游并行度为2,编号 为1,2。那么A和B则把数据发送给1,C和D则把数据发送给2。BroadcastPartitioner?广播分区会将上游数据输出到 下游算子的每个实例中。适合于大数据集和小数据集做Jion的场景。ForwardPartitioner?ForwardPartiti oner用于将记录输出到下游本地的算子实例。它要求上下游算子并行度一样。简单的说,ForwardPartitioner用来做数据 的控制台打印。KeyGroupStreamPartitioner?Hash分区器。会将数据按Key的Hash值输出到下游算 子实例中。CustomPartitionerWrapper?用户自定义分区器。需要用户自己实现Partitioner接口,来定义自 己的分区逻辑。例如:staticclassCustomPartitionerimplementsPartitionerng>{@Overridepublicintpartition(Stringkey,intnumPartitions) {switch(key){case"1":return1;case"2":return2;case"3" :return3;default:return4;}}}分布式缓存Flink实现的分布式缓存和Hadoop有异曲 同工之妙。目的是在本地读取文件,并把他放在taskmanager节点中,防止task重复拉取。valenv=Execut ionEnvironment.getExecutionEnvironment//registerafilefromHD FSenv.registerCachedFile("hdfs:///path/to/your/file","hdfsFile") //registeralocalexecutablefile(script,executable,...)env .registerCachedFile("file:///path/to/exec/file","localExecFile", true)//defineyourprogramandexecute...valinput:DataSet[St ring]=...valresult:DataSet[Integer]=input.map(newMyMapper( ))...env.execute()广播变量我们知道Flink是并行的,计算过程可能不在一个Slot中进行,那么有一种情况即 :当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以 把一个dataset数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。窗口Flink支 持两种划分窗口的方式,按照time和count。如果根据时间划分窗口,那么它就是一个time-window如果根据数据划分窗口, 那么它就是一个count-window。flink支持窗口的两个重要属性(size和interval)如果size=interva l,那么就会形成tumbling-window(无重叠数据)如果size>interval,那么就会形成sliding-wind ow(有重叠数据)如果size将会漏掉2秒钟的数据。通过组合可以得出四种基本窗口:time-tumbling-window无重叠数据的时间窗口,设置方式举例: timeWindow(Time.seconds(5))time-sliding-window有重叠数据的时间窗口,设置方式举例: timeWindow(Time.seconds(5),Time.seconds(3))count-tumbling-window 无重叠数据的数量窗口,设置方式举例:countWindow(5)count-sliding-window有重叠数据的数量窗口,设 置方式举例:countWindow(5,3)状态存储Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择 的状态存储策略不同,会影响状态持久化如何和checkpoint交互。Flink提供了三种状态存储方式:MemoryStateB ackend、FsStateBackend、RocksDBStateBackend。五、Flink流处理APIEnvironmen tgetExecutionEnvironment创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的此方法返回本地执行 环境。如果从命令行客户端调用程序以提交到集群,那么此方法返回集群的执行环境。会根据查询运行的方式决定返回什么样的运行环境,是最常用 的一种创建环境的方式。valenv:ExecutionEnvironment=ExecutionEnvironment.g etExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExe cutionEnvironment如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1.createLoc alEnvironment返回本地环境valenv=StreamExecutionEnvironment.createLoc alEnvironment(1)createRemoteEnvironment返回集群执行环境,将Jar提交到远程服务器,需要在调 用时制定JobManager的IP和端口号,并指定要在集群中运行的Jar包valenv=ExecutionEnvironme nt.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH// wordcount.jar")Source从集合读取数据packagecom.atguigu.apitest1importor g.apache.flink.streaming.api.scala._caseclassSensor(id:String ,timestamp:Long,tempeature:Double)objectSourceTest1{defma in(args:Array[String]):Unit={valenv=StreamExecutionEnvironm ent.getExecutionEnvironmentenv.setParallelism(1)//从集合中读取数据val stream:DataStream[Sensor]=env.fromCollection(List(Sensor("se nsor_1",1547718199,35.8),Sensor("sensor_6",1547718201,15.4), Sensor("sensor_7",1547718202,6.7),Sensor("sensor_10",1547718 205,38.1),Sensor("sensor_1",1547718207,37.2),Sensor("sensor_ 1",1547718212,33.5),Sensor("sensor_1",1547718215,38.1)))/ /打印输出stream.print("stream")env.execute("sourcetestjob")}}自定义 sourcepackagecom.atguigu.apitest1importorg.apache.flink.streami ng.api.functions.source.SourceFunctionimportorg.apache.flink.str eaming.api.scala._importscala.util.RandomcaseclassSensor(id:S tring,timestamp:Long,tempeature:Double)objectSourceTest1{ defmain(args:Array[String]):Unit={valenv=StreamExecutionEn vironment.getExecutionEnvironmentenv.setParallelism(1)//自定义sour cevalstream5=env.addSource(newMySensor())stream5.print("stre am5")env.execute("sourcetestjob")}}//实现一个自定义的SourceFunction,自 动生成测试数据classMySensor()extendsSourceFunction[Sensor]{//定义一个fla g,表示数据源是否正常进行varrunning:Boolean=trueoverridedefcancel(): Unit=running=false//随机生成sensorreading数据overridedefrun(sour ceContext:SourceFunction.SourceContext[Sensor]):Unit={//定义一个 随机数发生器valrand=newRandom()//随机生成10个传感器的温度值,并且不停在之前温度基础上更新(随机上 下波动)//首先生成10个传感器的初始温度varcurTemps=1.to(10).map(i=>("sensor _"+i,60+rand.nextGaussian()20)//高斯随机数)//无限循环,生成随机数据while (running){//在当前温度基础上,随机生成微小波动curTemps=curTemps.map(data=>( data._1,data._2+rand.nextGaussian()))//获取当前系统时间valcurTs=Sys tem.currentTimeMillis()//包装成样例类,用ctx发出数据curTemps.foreach(data =>sourceContext.collect(Sensor(data._1,curTs,data._2)))//定义间隔时 间Thread.sleep(1000L)}}}Transform转换算子mapvalstreamMap=stream. map{x=>x2}flatmapvalstreamFlatMap=stream.flatMap{x=> x.split("")}FiltervalstreamFilter=stream.filter{x=>x==1 }KeyByDataStream?→?KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素, 在内部以hash的形式实现的。packagecom.atguigu.apitest1importcom.atguigu.api test.MyIDSelectorimportorg.apache.flink.api.java.functions.KeySe lectorimportorg.apache.flink.streaming.api.scala._objectTransfo rm1{defmain(args:Array[String]):Unit={valenv=StreamExe cutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)v aldataStreamFromFile:DataStream[String]=env.readTextFile("D:\ \Flink\\20-Flink【www.hoh0.com】\\FlinkTutorial\\src\\main\\notes\\ bigdata")valdataStream:DataStream[Sensor]=dataStreamFromFil e.map(data=>{valdataArray=data.split(",")Sensor(dataArray (0),dataArray(1).toLong,dataArray(2).toDouble)}).keyBy("id")// .keyBy(newMyIDSelect())//.min("temperature").reduce ((curRes,newData)=>Sensor(curRes.id,curRes.timestamp.max(newDat a.timestamp),curRes.temperature.min(newData.temperature)))//聚合出每个 Sensor的最大时间戳和最小温度值dataStream.print()env.execute("transformtes tjob")}}classMyIDSelect()extendsKeySelector[Sensor,String]{ overridedefgetKey(value:Sensor):String=???}滚动聚合算子(RollingA ggregation)这些算子可以针对KeyedStream的每一个支流做聚合sum()min()max()minBy()maxB y()ReduceKeyedStream?→?DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生 一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。valstream2=env.readTex tFile("YOUR_PATH\\sensor.txt").map(data=>{valdataArray=da ta.split(",")SensorReading(dataArray(0).trim,dataArray(1).trim. toLong,dataArray(2).trim.toDouble)}).keyBy("id").reduce((x, y)=>SensorReading(x.id,x.timestamp+1,y.temperature))Split和 Selectpackagecom.atguigu.apitest1importcom.atguigu.apitest.MyID Selectorimportorg.apache.flink.api.java.functions.KeySelectorimp ortorg.apache.flink.streaming.api.scala._objectTransform1{def main(args:Array[String]):Unit={valenv=StreamExecutionEnv ironment.getExecutionEnvironmentenv.setParallelism(1)//分流vals plitStream:SplitStream[Sensor]=dataStream.split(data=>{if(da ta.temperature>30)Seq("high")elseSeq("low")})valhighTempStr eam:DataStream[Sensor]=splitStream.select("high")vallowTimeSt ream:DataStream[Sensor]=splitStream.select("low")valallTimeStre am:DataStream[Sensor]=splitStream.select("high","low")highTemp Stream.print("high")lowTimeStream.print("low")allTimeStream.pri nt("all")env.execute("transformtestjob")}}splitDataStream?→?S plitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。按逻辑将DataStream分 开。selectSplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。 valsplitStream=stream2.split(sensorData=>{if(sensorData. temperature>30)Seq("high")elseSeq("low")})valhigh=split Stream.select("high")vallow=splitStream.select("low")valall= splitStream.select("high","low")Connect和CoMappackagecom.atguig u.apitest1importcom.atguigu.apitest.MyIDSelectorimportorg.apach e.flink.api.java.functions.KeySelectorimportorg.apache.flink.str eaming.api.scala._objectTransform1{defmain(args:Array[String ]):Unit={valenv=StreamExecutionEnvironment.getExecutionEnv ironmentenv.setParallelism(1)//分流valsplitStream:SplitStream[ Sensor]=dataStream.split(data=>{if(data.temperature>30)Seq(" high")elseSeq("low")})valhighTempStream:DataStream[Sensor]= splitStream.select("high")vallowTimeStream:DataStream[Sensor]= splitStream.select("low")valallTimeStream:DataStream[Sensor]= splitStream.select("high","low")//合流valwarningStream:DataStr eam[(String,Double,String)]=highTempStream.map(data=>(data.id ,data.temperature,"hightempwarning"))valconnectedStreams:C onnectedStreams[(String,Double,String),Sensor]=warningStream.co nnect(lowTimeStream)valresultStream:DataStream[Object]=connect edStreams.map(warningData=>(warningData._1,warningData._2,warn ingData._3),lowTimeData=>(lowTimeData.id,"normal"))resultStr eam.print("result")env.execute("transformtestjob")}}classMyI DSelect()extendsKeySelector[Sensor,String]{overridedefgetKey (value:Sensor):String=???DataStream,DataStream?→?ConnectedStr eams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不 发生任何变化,两个流相互独立。CoMap,CoFlatMapConnectedStreams→DataStream:作用于Co nnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行 map和flatMap处理。valwarning=high.map(sensorData=>(sensorData.i d,sensorData.temperature))valconnected=warning.connect(low)v alcoMap=connected.map(warningData=>(warningData._1,warning Data._2,"warning"),lowData=>(lowData.id,"healthy"))Union[外链图 片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vEd2BDPc-1625279341418)(D:\Fli nk\union_27.png)]DataStream?→?DataStream:对两个或者两个以上的DataStream进行un ion操作,产生一个包含所有DataStream元素的新DataStream。valunionStream:DataStrea m[StartUpLog]=appStoreStream.union(otherStream)unionStream.prin t("union:::")支持的数据类型基础数据类型Flink支持所有的Java和Scala基础数据类型,Int,Double, Long,String,…val?numbers??DataStream[Long]?=?env.fromEleme nts(1L,2L,3L,4L)numbers.map(n?=>?n+1)Java和Scala元组(Tuples)v alpersons:DataStream[(String,Integer)]=env.fromEle ments(("Adam",17),("Sarah",23))persons.filter(p=>p._2 >18)Scala样例类(caseclasses)caseclassPerson(name: String,age:Int)valpersons:DataStream[Person] =env.fromElements(Person("Adam",17),Person("Sarah", 23))Java简单对象(POJOs)publicclassPerson{publicStringname;publi cintage;publicPerson(){}publicPerson(Stringname,int age){this.name=name;this.age=age;}}DataStreamper sons=env.fromElements(newPerson("Alex",42),newPerson("Wend y",23));其它(Arrays,Lists,Maps,Enums,等等)Flink对Java和Scala中的一些特殊 目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。六、时间语义与WartermarkFli nk中的时间语义在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:EventTime:是事件创建的时间。它通常 由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。Inge stionTime:是数据进入Flink的时间。ProcessingTime:是每一个执行基于时间操作的算子的本地系统时间 ,与机器相关,默认的时间属性就是ProcessingTime。例如,一条日志进入Flink的时间为2017-11-12 10:00:00.123,到达Window的系统时间为2017-11-1210:00:01.234,日志的内容如下:201 7-11-0218:37:15.624INFOFailovertorm2对于业务来说,要统计1min内的故障日志个 数,哪个时间是最有意义的?——eventTime,因为我们要根据日志的生成时间进行统计。Flink中的时间和其他流式计算系统的 时间一样分为三类:事件时间,摄入时间,处理时间三种。如果以EventTime为基准来定义时间窗口将形成EventTimeWin dow,要求消息本身就应该携带EventTime。如果以IngesingtTime为基准来定义时间窗口将形成Ingestin gTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口将形 成ProcessingTimeWindow,以operator的systemTime为准。EventTime的引入在F link的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用Proc essingTime或者IngestionTime。如果要使用EventTime,那么需要引入EventTime的时间属 性,引入方式如下所示:StreamExecutionEnvironmentenv=StreamExecutionEnvir onment.getExecutionEnvironment();env.setParallelism(1);//TODO 基于事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.Event Time);Watermark基本概念我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和 时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱 序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的EventTime顺序排列的。那么此时出现一个问 题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去 ,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是?Watermark。Wat ermark是一种衡量EventTime进展的机制。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Water mark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。Watermark可以理解成一个延 迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后 认定eventTime小于maxEventTime-t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTi me–t,那么这个窗口被触发执行。有序流的Watermarker如下图所示:(Watermark设置为0)乱序流的W atermarker如下图所示:(Watermark设置为2)当Flink接收到数据时,会按照一定的规则去生成Wate rmark,这条Watermark就等于当前所有到达数据中的maxEventTime-延迟时长,也就是说,Watermar k是基于数据携带的时间戳生成的,一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于e venttime是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。上图中,我们设置的 允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Wate rmark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Wat ermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。Watermark就 是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中 的时间推进了多久都不会触发关窗。Watermark是ApacheFlink为了处理EventTime窗口计算提出的一种 机制,本质上是一种时间戳。一般来讲Watermark经常和Window一起被用来处理乱序事件。Watermark的引入wat ermark的引入很简单,对于乱序数据,最常见的引用方式如下://乱序数据设置watermarks//TODOassignT imestampsAndWatermarks是毫秒,所以乘以1000变成秒//延迟两秒处理.assignTimestamps AndWatermarks(newBoundedOutOfOrdernessTimestampExtractorReading>(Time.seconds(2)){@OverridepubliclongextractTime stamp(SensorReadingelement){//TODO将毫秒变成秒returnelement.getTi mestamp()1000L;}});EventTime的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事 件时间是什么(数据源里的数据没有时间戳的话,就只能使用ProcessingTime了。)我们看到上面的例子中创建了一个看起来 有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink暴露了TimestampAssigner接口供我们实现,使我们可 以自定义如何从事件数据中抽取时间戳。StreamExecutionEnvironmentenv=StreamExecutio nEnvironment.getExecutionEnvironment();//设置事件时间语义env.setStreamTi meCharacteristic(TimeCharacteristic.EventTime);DataStreameading>dataStream=env.addSource(newSensorSource()).assignTimestampsAndWatermarks(newMyAssigner());MyAssigner有两种类型AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks以上两个接口都继承自TimestampAssigner。Assignerwithperiodicwatermarks周期性的生成watermark:系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。//每隔5秒产生一个watermarkenv.getConfig.setAutoWatermarkInterval(5000);产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。例子,自定义一个周期性的时间戳抽取://自定义周期性时间戳分配器publicstaticclassMyPeriodicAssignerimplementsAssignerWithPeriodicWatermarks{privateLongbound=601000L;//延迟一分钟privateLongmaxTs=Long.MIN_VALUE;//当前最大时间戳@Nullable@OverridepublicWatermarkgetCurrentWatermark(){returnnewWatermark(maxTs-bound);}@OverridepubliclongextractTimestamp(SensorReadingelement,longpreviousElementTimestamp){maxTs=Math.max(maxTs,element.getTimestamp());returnelement.getTimestamp();}}一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用AscendingTimestampExtractor,这个类会直接使用数据的时间戳生成watermark。DataStreamdataStream=dataStream.assignTimestampsAndWatermarks(newAscendingTimestampExtractor(){@OverridepubliclongextractAscendingTimestamp(SensorReadingelement){returnelement.getTimestamp()1000;}});而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间,就可以使用如下代码:DataStreamdataStream=dataStream.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)){@OverridepubliclongextractTimestamp(SensorReadingelement){returnelement.getTimestamp()1000L;}});Assignerwithpunctuatedwatermarks间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1的传感器的数据流插入watermark:publicstaticclassMyPunctuatedAssignerimplementsAssignerWithPunctuatedWatermarks{privateLongbound=601000L;//延迟一分钟@Nullable@OverridepublicWatermarkcheckAndGetNextWatermark(SensorReadinglastElement,longextractedTimestamp){if(lastElement.getId().equals("sensor_1"))returnnewWatermark(extractedTimestamp-bound);elsereturnnull;}@OverridepubliclongextractTimestamp(SensorReadingelement,longpreviousElementTimestamp){returnelement.getTimestamp();}}七、压测和监控要做压力测试和监控我们一般碰到的压力来自以下几个方面:一、产生数据流的速度如果过快,而下游的算子消费不过来的话,会产生背压。背压的监控可以使用FlinkWebUI(localhost:8081)来可视化监控,一旦报警就能知道。一般情况下背压问题的产生可能是由于sink这个操作符没有优化好,做一下优化就可以了。比如如果是写入ElasticSearch,那么可以改成批量写入,可以调大ElasticSearch队列的大小等等策略。二,设置watermark的最大延迟时间这个参数,如果设置的过大,可能会造成内存的压力。可以设置最大延迟时间小一些,然后把迟到元素发送到侧输出流中去。晚一点更新结果。或者使用类似于RocksDB这样的状态后端,RocksDB会开辟堆外存储空间,但IO速度会变慢,需要权衡。三,还有就是滑动窗口的长度如果过长,而滑动距离很短的话,Flink的性能会下降的很厉害。我们主要通过时间分片的方法,将每个元素只存入一个“重叠窗口”,这样就可以减少窗口处理中状态的写入。四,状态后端使用RocksDB,还没有碰到被撑爆的问题。Flink面试题第二部分:Flink面试进阶篇一、Flink是如何支持批流一体的?本道面试题考察的其实就是一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink使用一个引擎支持了DataSetAPI和DataStreamAPI。二、Flink是如何做到高效的数据交换的?在一个FlinkJob中,数据需要在不同的task中进行交换,整个数据交换是有TaskManager负责的,TaskManager的网络组件首先从缓冲buffer中收集records,然后再发送。Records并不是一个一个被发送的,二是积累一个批次再发送,batch技术可以更加高效的利用网络资源。三、Flink是如何做容错的?Flink实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint负责定时制作分布式快照、对程序中的状态进行备份;State用来存储计算过程中的中间状态。四、Flink分布式快照的原理是什么?Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。核心思想是在inputsource端插入barrier,控制barrier的同步来实现snapshot的备份和exactly-once语义。五、Flink是如何保证Exactly-once语义的?Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面预提交(preCommit)将内存中缓存的数据写入文件并关闭正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟丢弃(abort)丢弃临时文件若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。六、Flink的kafka连接器有什么特别的地方?Flink源码中有一个独立的connector模块,所有的其他connector都依赖于此模块,Flink在1.9版本发布的全新kafka连接器,摒弃了之前连接不同版本的kafka集群需要依赖不同版本的connector这种做法,只需要依赖一个connector即可。七、说说Flink的内存管理是如何做的?Flink并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:NetworkBuffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改MemoryManagepool:大量的MemorySegment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heaporoff-heap,这个放到下节谈),内存的分配支持预分配和lazyload,默认懒加载的方式。UserCode,这部分是除了MemoryManager之外的内存用于Usercode和TaskManager本身的数据结构。八、说说Flink的序列化如何做的?Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。ApacheFlink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。TypeInformation是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。TypeInformation支持以下几种类型:BasicTypeInfo:任意Java基本类型或String类型BasicArrayTypeInfo:任意Java基本类型数组或String数组WritableTypeInfo:任意HadoopWritable接口的实现类TupleTypeInfo:任意的FlinkTuple类型(支持Tuple1toTuple25)。Flinktuples是固定长度固定类型的JavaTuple实现CaseClassTypeInfo:任意的ScalaCaseClass(包括Scalatuples)PojoTypeInfo:任意的POJO(JavaorScala),例如,Java对象的所有成员变量,要么是public修饰符定义,要么有getter/setter方法GenericTypeInfo:任意无法匹配之前几种类型的类针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。九、Flink中的Window出现了数据倾斜,你有什么解决办法?window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:在数据进入窗口前做预聚合重新设计窗口聚合的key十、Flink中在使用聚合函数GroupBy、Distinct、KeyBy等函数时出现数据热点该如何解决?数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:在业务上规避这类问题例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。Key的设计上把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。参数设置Flink1.9.0SQL(BlinkPlanner)性能优化中一项重要的改进就是升级了微批模型,即MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。十一、Flink任务延迟高,想解决这个问题,你会如何入手?在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。十二、Flink是如何处理反压的?Flink内部是基于producer-consumer模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink使用了高效有界的分布式阻塞队列,就像Java通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。十三、Flink的反压和Strom有哪些不同?Storm是通过监控Bolt中的接收队列负载情况,如果超过高水位值就会将反压信息写到Zookeeper,Zookeeper上的watch会通知该拓扑的所有Worker都进入反压状态,最后Spout停止发送tuple。Flink中的反压使用了高效有界的分布式阻塞队列,下游消费变慢会导致发送端阻塞。二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。十四、OperatorChains(算子链)这个概念你了解吗?为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。十五、Flink什么情况下才会把Operatorchain在一起形成算子链?两个operatorchain在一起的的条件:上下游的并行度一致下游节点的入度为1(也就是说下游节点没有来自其他节点的输入)上下游节点都在同一个slotgroup中(下面会解释slotgroup)下游节点的chain策略为ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)上游节点的chain策略为ALWAYS或HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)两个节点间数据分区方式是forward(参考理解数据流的分区)用户没有禁用chain十六、说说Flink1.9的新特性?支持hive读写,支持UDFFlinkSQLTopN和GroupBy等优化Checkpoint跟savepoint针对实际业务场景做了优化Flinkstate查询十七、消费kafka数据的时候,如何处理脏数据?可以在处理前加一个fliter算子,将不符合规则的数据过滤出去。第三部分:Flink面试源码篇一、FlinkJob的提交流程?用户提交的FlinkJob会被转化成一个DAG任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是基于Akka工具包的,是通过消息驱动。整个FlinkJob的提交还包含着ActorSystem的创建,JobManager的启动,TaskManager的启动和注册。二、Flink所谓"三层图"结构是哪几个"图"?一个Flink任务的DAG生成计算图大致经历以下三个过程:StreamGraph最接近代码所表达的逻辑层面的计算拓扑结构,按照用户代码的执行顺序向StreamExecutionEnvironment添加StreamTransformation构成流式图。JobGraph从StreamGraph生成,将可以串联合并的节点进行合并,设置节点之间的边,安排资源共享slot槽位和放置相关联的节点,上传任务所需的文件,设置检查点配置等。相当于经过部分初始化和优化处理的任务图。ExecutionGraph由JobGraph转换而来,包含了任务具体执行所需的内容,是最贴近底层实现的执行图。三、JobManger在集群中扮演了什么角色?JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。JobManager相当于整个集群的Master节点,且整个集群有且只有一个活跃的JobManager,负责整个集群的任务管理和资源管理。JobManager和TaskManager之间通过ActorSystem进行通信,获取任务执行的情况并通过ActorSystem将应用的任务执行情况发送给客户端。同时在任务执行的过程中,FlinkJobManager会触发Checkpoint操作,每个TaskManager节点收到Checkpoint触发指令后,完成Checkpoint操作,所有的Checkpoint协调过程都是在FinkJobManager中完成。当任务完成后,Flink会将任务执行的信息反馈给客户端,并且释放掉TaskManager中的资源以供下一次提交任务使用。四、JobManger在集群启动过程中起到什么作用?JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且做如下操作:RegisterTaskManager:它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。SubmitJob:由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。CancelJob:请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。UpdateTaskExecutionState:由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。RequestNextInputSplit:TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。JobStatusChanged:它意味着作业的状态(RUNNING,CANCELING,FINISHED,等)发生变化。这个消息由ExecutionGraph发送。五、TaskManager在集群中扮演了什么角色?TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。可以看出,Flink的任务运行其实是采用多线程的方式,这和MapReduce多JVM进行的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务和Task之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池进行对资源进行有效管理。六、TaskManager在集群启动过程中起到什么作用?TaskManager的启动流程较为简单:启动类:org.apache.flink.runtime.taskmanager.TaskManager核心启动方法:selectNetworkInterfaceAndRunTaskManager启动后直接向JobManager注册自己,注册完成后,进行部分模块的初始化。七、Flink计算资源的调度是如何实现的?TaskManager中最细粒度的资源是Taskslot,代表了一个固定大小的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。通过调整taskslot的数量,用户可以定义task之间是如何相互隔离的。每个TaskManager有一个slot,也就意味着每个task运行在独立的JVM中。每个TaskManager有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输,也能共享一些数据结构,一定程度上减少了每个task的消耗。每个slot可以接受单个task,也可以接受多个连续task组成的pipeline,如下图所示,FlatMap函数占用一个taskslot,而keyAgg函数和sink函数共用一个taskslot:八、简述Flink的数据抽象及数据交换过程?Flink为了避免JVM的固有缺陷例如java对象存储密度低,FGC影响吞吐和响应等,实现了自主管理内存。MemorySegment就是Flink的内存抽象。默认情况下,一个MemorySegment可以被看做是一个32kb大的内存块的抽象。这块内存既可以是JVM里的一个byte[],也可以是堆外内存(DirectByteBuffer)。在MemorySegment这个抽象之上,Flink在数据从operator内的数据对象在向TaskManager上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是Buffer。对接从Java对象转为Buffer的中间对象是另一个抽象StreamRecord。九、Flink中的分布式快照机制是如何实现的?Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。这些快照充当一致性checkpoint,系统可以在发生故障时回滚。Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。例如,在ApacheKafka中,此位置将是分区中最后一条记录的偏移量。将该位置Sn报告给checkpoint协调器(Flink的JobManager)。然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriersn,它就向checkpoint协调器确认快照n完成。在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。十、简单说说FlinkSQL的是如何实现的?Flink将SQL校验、SQL解析以及SQL优化交给了ApacheCalcite。Calcite在其他很多开源项目里也都应用到了,譬如ApacheHive,ApacheDrill,ApacheKylin,Cascading。Calcite在新的架构中处于核心的地位,如下图所示。构建抽象语法树的事情交给了Calcite去做。SQLquery会经过Calcite解析器转变成SQL节点树,通过验证后构建成Calcite的抽象语法树(也就是图中的LogicalPlan)。另一边,TableAPI上的调用会构建成TableAPI的抽象语法树,并通过Calcite提供的RelBuilder转变成Calcite的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。在提交任务后会分发到各个TaskManager中运行,在运行时会使用Janino编译器编译代码后运行。Flink技术参考手册1 |
|