第4课:SparkStreaming事务处理作者:杭州-Frank本期导读:ExactlyOnce输出不重复http://lib.c sdn.net/base/10Spark?Streaming?事务处理架构什么是事务以银行转帐为例,A用户转笔账给B用户,如果B用 户没收到账,或者收到多笔账,都是破坏事务的一致性。事务处理就是,能够处理且只会处理一次,即A只转一次,B只收一次。从事务视角解密S parkStreaminghttp://lib.csdn.net/base/16架构SparkStreaming应用程序启动时会 分配资源,除非整个集群硬件资源崩溃,一般情况下都不会有问题。SparkStreaming程序分成而部分:DriverExecuto rReceiver接收到数据后不断发送元数据给Driver,Driver接收到metadata元数据信息后进行CheckPoint 处理。其中CheckPoint包括:Configuration(含有http://lib.csdn.net/base/10Spar k?Conf、SparkStreaming等配置信息)、BlockMetaData、DStreamGraph、未处理完和等待中 的Job。Receiver可以在多个Executor节点的上执行Job,Job的执行完全基于SparkCore的调度模式进行。架构 演进变化图根据上面的解密画出最基础的架构,如下图1所示:图1图1只是SparkStreaming基本的情况,Executor只有 函数处理逻辑和数据,外部InputStream流入到Receiver中通过BlockManager写入磁盘、内存、WAL进行容错。 WAL先写入磁盘然后写入Executor中,WAL它是写进HDFS的,失败可能性不大。如果1G数据要处理,Executor一条一条 接收,Receiver接收数据是积累到一定记录后才会写入WAL,如果Receiver线程失败时,数据有可能会丢失。Driver处理 元数据前会进行CheckPoint,SparkStreaming获取数据、产生作业,但没有解决执行的问题,执行一定要经过Spar kContext。Driver级别的恢复,通过CheckPoint从文件系统上恢复,重新构建StreamingContext, 也就是重新构建SparkContext,再重新生成SparkJob,再提交Spark集群运行。Receiver的重新恢复时会通过 磁盘的WAL从磁盘恢复过来。演进后的架构如下图2所示:图2SparkStreaming和Kafka结合不会出现WAL数据丢失的问 题,SparkStreaming必须考虑外部流水线的方式处理。SparkStreaming1.3的时候为了避免WAL的性能损 失和实现ExactlyOnce而提供了KafkaDirectAPI,把Kafka作为文件存储系统!SparkStream ing+Kafka构建完美的流处理世界!下图3是再次演化后的架构图:图3ExactlyOnce原理ExactlyOnce的事务 处理:1.数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint, 且通过WAL来保证数据安全;2.SparkStreaming1.3的时候为了避免WAL的性能损失和实现ExactlyOnce 而提供了KafkaDirectAPI,把Kafka作为文件存储系统!!!此时兼具有流的优势和文件系统的优势,至此,Spark Streaming+Kafka就构建了完美的流处理世界!!!为什么说它完美:数据不需要拷贝复本;不需要进行WAL,不必要的性能损耗 。kafka比HDFS高效很多,内存中产生Memorycopy的方式。有了kafka就不需要所谓的Receiver,所有的Exe cutors通过KafkaAPI直接消费数据,直接管理Offset,所以也不会重复消费数据;事务实现啦!!!数据丢失及其具体的解 决方式:在Receiver收到数据且通过Driver的调度Executor开始计算数据的时候,如果Driver突然崩溃,则此时Ex ecutor会被Kill掉,那么Executor中的数据就会丢失。此时就必须通过列如WAL的方式,让所有的数据都通过列如HDFS的 方式进行安全性容错处理,此时如果Executor数据丢失就可与通过WAL恢复。(通过WAL的方式会极大损伤SparkStream ing中Receiver接受数据的性能。)数据重复读取的情况:在Receiver收到数据且保存到HDFS等持久化引擎,但是没有来得 及进行updateOffsets(数据来源都是基于kafka),此时Receiver崩溃后重新启动,就会通过管理kafka的zoo keeper中元数据再次重复读取数据,但是此时SparkStreaming认为是成功的,但是Kafka认为是失败的(因为没有更新o ffset到zookeeper中),这就导致数据重复消费。性能损伤:通过WAL方式会极大的损伤SparkStreaming中Re ceivers接受数据的性能;如果通过Kafka的作为数据来源的话,Kafka中有数据,然后Receiver接受的时候又会有数据副 本,这个时候其实是存储资源的浪费;(处理数据的时候能访问到元数据信息,把元数据信息存储到内存数据库,再次计算时去查询元数据信息,如 果处理过,就不处理。)关于SparkStreaming数据输出多次重写及其解决方案:为什么会有这个问题,因为SparkStre aming在计算的时候基于SparkCore,SparkCore天生会做以下事情导致SparkStreaming的结果(部分 )重复输出。Task重试;慢任务推测;Stage重复;Job重试。具体解决方案:设置spark.task.maxFailures次 数为1;设置spark.speculation为关闭状态(因为慢任务推测其实非常消耗性能,所以关闭后可以显著提高SparkStr eaming处理性能)SparkStreamingonKafka的话,任务失败就是Job失败,Job失败后可以设置auto. offset.reset为“largest”的方式;最后再次强调可以通过transform和foreachRDD基于业务逻辑代码 进行逻辑控制来实现数据不重复消费和输出不重复!这两个方式类似于SparkStreaming的后门,可以做任意想象的控制操作!附 录:Kafka摘录:http://www.open-open.com/lib/view/open1421150566328.htm lhttp://www.open-open.com/lib/view/open1421150566328.htmlKafka是什么 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统( 也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apach e基金会并成为顶级开源项目。前言一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下 面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。Kafka文件存储机制Kafka 部分名词解释如下:Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka 集群。Topic:一类消息,例如pageview日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多 个topic的分发。Partition:topic物理上的分组,一个topic可以分为多个partition,每个partiti on是一个有序的队列。Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。分析过程分为 以下4个步骤:topic中partition存储分布partiton中文件存储方式partiton中segment文件存储结构pa rtition中如何通过offset查找message通过上述4过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。t opic中partition存储分布假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文 件存储根目录,在Kafkabroker中server.properties文件配置(参数log.dirs=xxx/message -folder),例如创建2个topic名称分别为report_push、launch_info,partitions数量都为 partitions=4存储路径和目录规则为:xxx/message-folder|--report_push-0|--rep ort_push-1|--report_push-2|--report_push-3|--launch_info-0|-- launch_info-1|--launch_info-2|--launch_info-3在Kafka文件存储中,同一个top ic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个p artiton序号从0开始,序号最大值为partitions数量减1。如果是多broker分布情况,如下图1所示:图1partit on中文件存储方式下图2形象说明了partition中文件存储方式:图2每个partion(目录)相当于一个巨型文件被平均分配到多 个大小相等segment(段)数据文件中。但每个段segmentfile消息数量不一定相等,这种特性方便oldsegment file快速被删除。每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。这样做的好处就是 能快速删除无用文件,有效提高磁盘利用率。partiton中segment文件存储结构读者从2.2节了解到Kafka文件系统part ition存储方式,本节深入分析partion中segmentfile组成和物理结构。segmentfile组成:由2大部分组 成,分别为indexfile和datafile,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为se gment索引文件、数据文件.segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment 文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有 数字用0填充。下面文件列表是笔者在Kafkabroker上做的一个实验,创建一个topicXXX包含1partition,设置 每个segment大小为500MB,并启动producer向Kafkabroker写入大量数据,如下图2所示segment文件列 表形象说明了上述2个规则:图3以上图3中一对segmentfile文件为例,说明segment中index<—->datafi le对应关系物理结构如下:图4在上图4中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中messa ge的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第 368772个message)、以及该消息的物理偏移地址为497。segmentdatafile由许多message组成,下图 5详细说明message物理结构如下:图5参数说明:关键字解释说明8byteoffset在parition(分区)内的每条消息 都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offse t表示partiion的第多少message4bytemessagesizemessage大小4byteCRC32用cr c32校验message1byte“magic"表示本次发布Kafka服务程序协议版本号1byte“attributes" 表示为独立版本、或标识压缩类型、或编码类型。4bytekeylength表示key的长度,当key为-1时,Kbytek ey字段不填Kbytekey可选valuebytespayload表示实际消息数据。在partition中如何通过offs et查找message例如读取offset=368776的message,需要通过下面2个步骤查找。第一步查找segmentfi le上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个 文件00000000000000368769.index的消息量起始偏移量为368770=368769+1.同样,第三个 文件00000000000000737337.index的起始偏移量为737338=737337+1,其他后续文件依次类推,以 起始偏移量命名并排序这些文件,只要根据offset二分查找文件列表,就可以快速定位到具体文件。当offset=36877 6时定位到00000000000000368769.index|log第二步通过segmentfile查找message通过第一 步定位到segmentfile,当offset=368776时,依次定位到00000000000000368769.index的 元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过0000000000000036876 9.log顺序查找直到offset=368776为止。从上述图3可知这样做的优点,segmentindexfile采取稀疏索 引存储方式,它减少索引文件大小,通过mmap(文件或其它对象映射进内存)可以直接内存操作,稀疏索引为数据文件的每个对应messag e设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。3Kafka文件存储机制–实际运行效果实验环 境:Kafka集群:由2台虚拟机组成cpu:4核物理内存:8GB网卡:千兆网卡jvmheap:4GB详细Kafka服务端配置及 其优化请参考:?http://blog.csdn.net/lizhitao/article/details/25667831kaf kaserver.properties配置详解图6从上图可以看出基本没有大量读磁盘的操作,只有(定期批量)写磁盘操作。之所以操 作磁盘这么高效,这跟Kafka文件存储设计中读写message是息息相关的。Kafka中读写message有如下特点:写messa ge消息从java堆转入PageCache(即物理内存)。由异步线程刷盘,消息从pagacache刷入磁盘。读message消 息直接从PageCache(数据在虚拟内存)转入socket发送出去。当从PageCache没有找到相应数据时,此时会产生磁盘 IO,从磁盘Load消息到PageCache,然后直接从socket发出去。总结Kafka高效文件存储设计特点:Kafka把to pic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信 息可以快速定位message和确定response的最大大小。通过index元数据全部映射到memory,可以避免segmentfile的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。参考1.?LinuxPageCache机制2.?http://kafka.apache.org/documentation.htmlKafka官方文档感谢王家林老师的知识分享王家林:DT大数据梦工厂创始人、Spark亚太研究院院长和首席专家、大数据培训专家、大数据架构师。新浪微博:http://weibo.com/ilovepains微信公众号:DT_Spark博客:http://blog.sina.com.cn/ilovepains手机:18610086859QQ:1740415547邮箱:18610086859@vip.126.comYY课堂:每天20:00现场授课频道68917580?Spark源码定制班第4课10/11 |
|