一、Kafka中的消息是否会丢失和重复消费要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费 1、消息发送kafka消息发送有同步(sync)、异步(async)两种,以及三种消息确认方式。 1). sync vs asyncKafka消息发送有两种方式:同步(sync)和异步(async),可通过默认是同步方式producer.type属性进行配置。 在官方文档Producer Configs中有如下:
翻译过来就是:
对于异步模式,还有4个配套的参数,如下:
2). acksproducers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数,如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。若设置为-1,producer会在所有ISR副本完成同步时,得到broker的确认,这个设置可以得到最高的可靠性保证。 Kafka的消息确认方式通过配置request.required.acks属性配置(仅仅for sync):
简单说:
3)分析下面分情况来分析消息丢失的场景: 1)在request.required.acks配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。 2)使用异步模式的时候,当缓冲区满了,如果阻塞等待的时间配置为0(还没有收到确认的情况下,缓冲池一满,就清除缓冲池里的消息),数据就会被立即丢弃掉。 在数据生产时避免数据丢失的方法: 只要能避免上述两种情况,那么就可以保证消息不会被丢失。 1)确认机制设置为-1,也就是让消息写入leader和所有的ISR副本。 2)还有,在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。 2、消息消费Kafka消息消费有两个consumer接口,Low-level API和High-level API:
丢失消息的场景: 如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了; 解决办法: 如果使用了storm,要开启storm的ackfail机制;如果没有使用storm,低级API中需要手动控制offset值。 3.数据重复消费 (1)去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过; (2)不管:大数据场景中,报表系统或者日志信息丢失几条都无所谓,不会影响最终的统计分析结 二、Kafka的Leader选举机制Kafka将每个Topic进行分区Patition,以提高消息的并行处理,同时为保证高可用性,每个分区都有一定数量的副本 Replica,这样当部分服务器不可用时副本所在服务器就可以接替上来,保证系统可用性。在Leader上负责读写,Follower负责数据的同步。当一个Leader发生故障如何从Follower中选择新Leader呢? Kafka在Zookeeper上针对每个Topic都维护了一个ISR(in-sync replica---已同步的副本)的集合,集合的增减Kafka都会更新该记录。如果某分区的Leader不可用,Kafka就从ISR集合中选择一个副本作为新的Leader。这样就可以容忍的失败数比较高,假如某Topic有N+1个副本,则可以容忍N个服务器不可用。 如果ISR中副本都不可用,有两种处理方法:
附:Unclean leader 选举: 如果节点全挂了?请注意,Kafka 对于数据不会丢失的保证,是基于至少一个节点在保持同步状态,一旦分区上的所有备份节点都挂了,就无法保证了。 但是,实际在运行的系统需要去考虑假设一旦所有的备份都挂了,怎么去保证数据不会丢失,这里有两种实现的方法
这是可用性和一致性之间的简单妥协,如果我只等待 ISR 的备份节点,那么只要 ISR 备份节点都挂了,我们的服务将一直会不可用,如果它们的数据损坏了或者丢失了,那就会是长久的宕机。另一方面,如果不是 ISR 中的节点恢复服务并且我们允许它成为 leader , 那么它的数据就是可信的来源,即使它不能保证记录了每一个已经提交的消息。 kafka 默认选择第二种策略,当所有的 ISR 副本都挂掉时,会选择一个可能不同步的备份作为 leader ,可以配置属性 unclean.leader.election.enable 禁用此策略,那么就会使用第 一种策略即停机时间优于不同步。 这种困境不只有 Kafka 遇到,它存在于任何 quorum-based 规则中。例如,在大多数投票算法当中,如果大多数服务器永久性的挂了,那么您要么选择丢失100%的数据,要么违背数据的一致性选择一个存活的服务器作为数据可信的来源。 可用性和持久性保证向 Kafka 写数据时,producers 设置 ack 是否提交完成, 0:不等待broker返回确认消息,1: leader保存成功返回或, -1(all): 所有备份都保存成功返回.请注意. 设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置acks = all时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。 尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个topic 配置,可用于优先配置消息数据持久性:
|
|