分享

Kafka权威指南...

 法律安全 2022-10-24 发布于浙江

生产者与消费者

主要从java客户端进行整理,现有Go、python、C++等客户端

1.生产者

一个程序在很多的情况下需要往kafka下写入消息,例如记录用户的活动,直播间送礼等。

1.1 创建Kafka生产者

要往Kafka中写入消息,首先要创建一个生产者对象,并设置一些属性。一般有三个必须属性。

  1. 指定的broker地址,地址的格式为host:port。可以不包含所有的broker地址,生产者会从给定的broker中找到其他的broker地址,为了防止宕机,一般至少要提供两个broker信息。
  2. key指定序列化器
  3. value指定序列化器

1.2发送消息到Kafka

主要分为三种方式:

  1. 发送并忘记
  2. 同步发送
  3. 异步发送

1.3生产者的配置

  1. acks:表示指定必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。若asks=0,表示生产者在写入消息成功之前不会等待任何来自于服务器的响应,若出现问题,则消息丢失。若asks=all,表示当所有节点均收到消息时,生产者才会收到一个来自服务器的成功响应。
  2. buffer.memory(内存缓冲区大小):用于设置生产者内存缓存区大小,生产者用它缓冲要发送到服务器的消息。
  3. compression.type(消息是否被压缩,已何种方式压缩):默认情况下消息不会被压缩,可设置为gzip等。指定压缩方式
  4. retries(重复次数):决定了生产者可以重发消息的次数,若达到这个次数,生产者就会放弃重试,并返回错误。这里需要注意的是:若需要严格保证发送顺序的情况,retries次数会影响顺序,这时需配合max.in.flight.request.per.connection=1使用,但是这样会影响吞吐量
  5. batch.size(一个批次可用的内存大小)
  6. linger.ms(发送消息时等待时间):会增加延时,但是会提升吞吐量
  7. client.id(标识消息来源)
  8. max.in.flight.requests.per.connection:指定生产者在收到服务器响应之前可以发送多少个消息。设置为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试
  9. time.ms(broker等待同步副本返回消息确认的时间)、request.timeout.ms(生产者在发送数据时等待服务器返回响应的时间)、metadata.fetch.timeout.ms(生产者在获取元数据时等待服务器返回响应的时间)
  10. max.block.ms (最大阻塞时间)
  11. max.request.size(生产者发送请求的大小)
  12. receive.buffer.bytes和send.buffer.bytes(分别指定了TCP接收以及发送数包的缓冲区大小)

1.4分区

由于kafka消息是一个个的键值对。
若键值为空,并且使用了默认的分区器,那么记录将会被随机的发送到主题的各个可用分区上,分区器用轮询算法将消息均衡的分步到各个分区上。
若键值不为空,且使用了默认的分区器,那么kafka会对键进行散列,然后根据散列的值将消息映射到对应的分区上。这里需注意的是:同一个键总是被映射到同一个分区上,所以在进行映射的,我们会使用主题所有的分区,而不仅仅是可用的分区,若写入的分区时不可用的,就会发生错误
若一个键的消息过多,且使用默认的分区器,会导致对应的分区很大,这时可以使用自定义的分区策略

2.消费者

2.1消费者与消费者群组

Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。
下面几个图展示了消费者与分区的对应关系:
Alt
一个消费者收到4个分区的消息
Alt
两个消费者收到4个分区的消息
Alt
4个消费者收到4个分区的消息
但是往群组里添加更多的消费者,超过主题的分区数量时,就会有一部分消费者就会闲置,不会收到消息,例如下图:
Alt
5个消费者收到4个分区的消息
往群组中添加消费者是横向伸缩消费能力的主要方式。
若新增一个消费组,与之前的消费组互不影响,新增加的消费者群组跟之前的消费者群组相似。
Alt
两个消费者群组对应一个主题
简述:为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组。然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的消费只处理一部分消息。

2.2消费者群组和分区再均衡

当一个消费者被关闭或发生崩溃时,他就离开群组,原本由他读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。但是再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。此外,当分区被重新分配给另一个消费者的时,消费者当前的读取状态会丢失,他有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的broker发送心跳来维持他们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息或者提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就过期,群组协调器认为它已经死亡,就会触发一次再均衡。
分区分配的过程
当消费者假如群组的时候,他会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为“群主”。群主从协调器哪里获得群主的成员列表(列表中包含了所有最近发送过心跳的消费者,他们被认为是活跃的),并负责给每一个消费者分配分区,他使用一个实现了PartitionAssignor接口的类来决定那些分区应该被分配给那个消费者。

2.3创建一个消费者

每个编程语言都有自己客户端实现的消费者,在读取消费者之前,需要创建一个KafkaConsumer对象。在这里不多赘述。

2.4订阅主题

创建好消费者之后,开始订阅主题。java客户端通过调用subscribe()方法时传入一个正则表达式,正则表达式可以匹配多个主题。例如:consumer.subscribe(test.*)即为订阅所有与test相关的主题。

2.5轮询

消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
轮询的原因是,kafka消费是一个长期运行的过程,需要持续的消费kafka请求数据,并且kafka需要长期对服务器进行轮询,否则就会被认为此消费者已经死亡。若发生再均衡也是在轮询的期间进行的。一般用poll()方法进行轮询,若消费者的缓冲区里没有可用数据时会发生阻塞。

2.6消费者的配置

  1. fetch.min.bytes:指定了最小的字节数
  2. fetch.amx.wait.ms:用于指定broker的等待时间,默认为500ms
  3. max.partiton.fetch.bytes:属性指定了服务器从每个分区里返回给消费者的最大字数。
  4. session.timeout.ms:指定了消费者在被认定为死亡之前可以与服务器断开连接的时间,默认为3S。若在3S内被认为已经死亡,协调器就会发生再均衡。发送心跳的时间间隔一般为session.timeout.ms的三分之一。
  5. auto.offset.reset:此属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该如何处理。它的默认值为latest(意思是在偏移量无效的情况下,消费者将从最新的记录开始读取数据)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
  6. enacle.auto.commit:此属性指定了消费者是否自动提交偏移量,默认值为true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。
  7. partition.assignment.strategy:有两种分配策略,Range以及RoundRobin。该策略会把主题的若干个连续的分区分配给消费者。RoundRobin该策略把主题的所有分区诸葛分配给消费者。例如:消费者C1以及消费者C2同时订阅了主题T1和主题T2,且每个主题有3个分区。若使用Range策略,消费者C1可能会分配到T1以及T2的分区0以及分区1,C2可能会分配到T1以及T2的分区2。RoundRobin的策略会把主题的所有分区逐个进行分配给消费者,消费者C1可能会分配到T1的分区0和分区2以及T2的分区1;消费者C2可能会分配到T2的分区0和分区2以及T1的分区1。
  8. client.id:用来标识从客户端发送来的消息
  9. max.poll.records:用于控制单次调用call()方法能够返回的记录数量。
  10. receive.buffer.bytes和send.buffer.bytes,socket在读写数据时用到的TCP缓冲区设置大小。

2.7提交和偏移量

消费者可以使用Kafka来追踪消息在分区里的位置(偏移量)
我们把更新分区当前位置的操作叫做提交。
消费者往一个叫做consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。若消费者一直处于运行状态,则偏移量没有用处,若消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡。完成再均衡之后,每个消费者可能会分配到新的分区,而不是之前出里的那个。这是为了继续工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。若提交的偏移量小宇客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
Alt
提交的偏移量小于客户端处理的最后一个消息的偏移量
Alt
提交的偏移量大于客户端处理的最后一个消息的偏移量
提交偏移量一般有五种方式:
自动提交
提交当前偏移量
异步提交
同步和异步组合提交
提交特定的偏移量

2.8再均衡监听器

再均衡监听器会在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多