分享

Kafka通讯协议指南

 陈永正的图书馆 2016-11-18

  中英文术语对照

为避免歧义,大部分的英文术语找不到合适中文对应时都保持英文原文,Kafka中一些基本术语也使用英文,其中一部分通过括号加入英文原文;另外,文中可能使用到的中英文术语包括但不限于:

英文 中文
Metadata 元数据
offset 偏移量
Comsumer 消费者
Comsumer Group 消费者组
Topic 主题
API 接口
Coordinator 协调器

1 简介

此文档涵盖了Kafka 0.8及之前版本的通讯协议实现。其目的是提供一个包含的可请求的协议及其二进制格式以及如何正确使用他们来实现一个客户端的通讯协议文档。本文假设您已经了解了Kafka基本的设计以及术语。

0.7和更早的版本所使用的协议与此类似,但我们(希望)通过一次性地斩断兼容性,以便清理原有设计上的沉疴,并且泛化一些概念。

如果遇到无法理解的情况,请参照英文原文

2 概述

卡夫卡协议是相当简单的,只有六个核心的客户端请求的API:

  1. 元数据(Metadata) – 描述可用的brokers,包括他们的主机和端口信息,并给出了每个broker上分别存有哪些分区;
  2. 发送(Send) – 发送消息到broker;
  3. 获取(Fetch) – 从broker获取消息,其中,一个获取数据,一个获取集群的元数据,还有一个获取topic的偏移量信息;
  4. 偏移量(Offsets) – 获取给定topic的分区的可用偏移量信息;
  5. 偏移量提交(Offset Commit) – 提交消费者组(Comsumer Group)的一组偏移量;
  6. 偏移量获取(Offset Fetch) – 获取一个消费者组的一组偏移量;

上述的API都将在下面详细说明。此外,从0.9版本开始,Kafka支持为消费者和Kafka连接进行分组管理。客户端API包括五个请求:

  1. 分组协调者(GroupCoordinator) – 用来定位一个分组当前的协调者。
  2. 加入分组(JoinGroup) – 成为某一个分组的一个成员,当分组不存在(没有一个成员时)创建分组。
  3. 同步分组(SyncGroup) – 同步分组中所有成员的状态(例如分发分区分配信息(Partition Assignments)到各个组员)。
  4. 心跳(Heartbeat) – 保持组内成员的活跃状态。
  5. 离开分组(LeaveGroup) – 直接离开一个组。

最后,有几个管理API,可用于监控/管理的卡夫卡集群(KIP-4完成时,这个列表将增长):

  1. 描述消费者组(DescribeGroups) – 用于检查一组群体的当前状态(如:查看消费者分区分配)。 
    1.列出组(ListGroups) – 列出某一个broker当前管理的所有组

3 开始

网络

Kafka使用基于TCP的二进制协议。该协议定义了所有API的请求及响应消息。所有消息都是有长度限制的,并且由后面描述的基本类型组成。

客户端启动的socket连接,并且写入请求的消息序列和读回相应的响应消息。连接和断开时均不需要握手消息。如果保持你保持长连接,那么TCP协议本身将会节省很多TCP握手时间,但如果真的重新建立连接,那么代价也相当小。

客户可能需要维持到多个broker的连接,因为数据是被分区的,而客户端需要和存储这些分区的broker服务器进行通讯。当然,一般而言,不需要为单个服务端和单个客户端间维护多个连接(即连接池技术)。

服务器的保证单一的TCP连接中,请求将被顺序处理,响应也将按该顺序返回。为保证broker的处理请求的顺序,单个连接同时也只会处理一个请求指令。请注意,客户端可以(也应该)使用非阻塞IO实现请求流水线,从而实现更高的吞吐量。也就是说,客户可以在等待上次请求应答的同时发送下个请求,因为待完成的请求将会在底层操作系统套接字缓冲区进行缓冲。除非特别说明,所有的请求是由客户端启动,并从服务器获取到相应的响应消息。

服务器能够配置请求大小的最大限制,超过这个限制将导致socket连接被断开。

分区和引导(Partitioning and bootstrapping)

Kafka是一个分区系统,所以不是所有的服务器都具有完整的数据集。主题(Topic)被分为P(预先定义的分区数量)个分区,每个分区被复制N(复制因子)份,Topic Partition根据顺序在“提交日志”中编号为0,1,…,P。

所有具有这种特性的系统都有一个如何制定某个特定数据应该被分配给哪个特定的分区的问题。Kafka中它由客户端直接控制分配策略,broker则没有特别的语义来决定消息发布到哪个分区。相反,生产者直接将消息发送到一个特定的分区,提取消息时,消费者也直接从某个特定的分区获取。如果两个生产者要使用相同的分区方案,那么他们必须用同样的方法来计算Key到分区映射关系。

这些发布或获取数据的请求必须发送到指定分区中作为leader的broker。此条件同时也会由broker保证,发送到不正确的broker的请求将会返回NotLeaderForPartition错误代码(后文所描述的)。

那么客户端如何找出哪些主题存在,他们有什么分区,以及这些分区被哪些broker存取,以便它可以直接将请求发送到所在的主机?这个信息是动态的,因此你不能只是提供每个客户端一些静态映射文件。所有的Kafka broker都可以回答这个描述集群的当前状态的数据请求:有哪些主题,这些主题都有多少分区,哪个broker是这些分区的Leader,以及这些broker主机的地址和端口信息。

换句话说,客户端只需要找到一个broker,broker将会告知客户端所有其他存在的broker,以及这些broker上面的所有分区。这个broker本身也可能会掉线,因此客户端实现的最佳做法是保存两个或三个broker地址,从而来引导列表。用户可以选择使用负载均衡器或只是静态地配置两个或三个客户的Kafka主机。

客户并不需要轮询地查看集群是否已经改变;它可以等到它接收到所用的元数据是过时的错误信息时一次性更新元数据。这中错误有两种形式:(1)一个套接字错误指示客户端不能与特定的broker进行通信,(2)请求响应表明该broker不再是其请求数据分区的Leader的错误。

  1. 轮询“起始”Kafka的URL列表,直到我们找到一个我们可以连接到的broker。获取集群元数据。
  2. 处理获取数据或者存储消息请求,根据这些请求所发送的主题和分区,将这些请求发送到合适的broker。
  3. 如果我们得到一个适当的错误(显示元数据已经过时时),刷新元数据,然后再试一次。

分区策略(Partitioning Strategies)

上面提到消息的分区分配是由生产者客户端控制,那么,为什么要把这个功能被暴露给最终用户?

在Kafka中,这样分区有两个目的:

  1. 它平衡了broker的数据和请求负载
  2. 它允许多个消费者之间处理分发消息的同时,能够维护本地状态,并且在分区中维持消息的顺序。我们称这种语义的分区(semantic partitioning)。

对于给定的使用场景下,你可能只关心其中的一个或两个。

为了实现简单的负载均衡,一个简单的策略是客户端发布消息是对所有broker进行轮询请求(round robin requests)。另一种选择,在那些生产者比消费者多的场景下,给每个客户机随机选择并发布消息到该分区。后一种的策略能够使用少得多的TCP连接。

语义分区是指使用关键字(key)来决定消息分配的分区。例如,如果你正在处理一个点击消息流时,可能需要通过用户ID来划分流,使得特定用户的所有数据会被单个消费者消费。要做到这一点,客户端可以采取与消息相关联的关键字,并使用关键字的某个Hash值来选择的传送的分区。

批处理(Batching)

我们的API鼓励将小的请求批量处理以提高效率。我们发现这能非常显著地提升性能。我们两个用来发送消息和获取消息的API,总是以一连串的消息工作,而不是单一的消息,从而鼓励批处理操作。聪明的客户端可以利用这一点,并支持“异步”操作模式,以此进行批处理哪些单独发送的消息,并把它们以较大的块进行发送。我们再进一步允许跨多个主题和分区的批处理,所以生产请求可能包含追加到许多分区的数据,一个读取请求可以一次性从多个分区提取数据的。

当然,如果他们喜欢,客户端实现者可以选择忽略这一点,所有消息一次都发送一个。

版本和兼容性(Versioning and Compatibility)

该协议的目的要达到在向后兼容的基础上渐进演化。我们的版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。

这样做的目的是允许客户端执行相应特定版本的请求。目标主要是为了在不允许停机的环境下进行更新,这种环境下,客户端和服务器不能一次性都切换所使用的API。

服务器将拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。预期的升级路径方式是,新功能将首先部署到服务器(老客户端无法完全利用他们的新功能),然后随着新的客户端的部署,这些新功能将逐步被利用。

目前,所有版本基线为0,当我们演进这些API时,我们将分别显示每个版本的格式。

4 通讯协议(The Protocol)

协议基本数据类型(Protocol Primitive Types)

The protocol is built out of the following primitive types. 
该协议是建立在下列基本类型之上。

  • 定长基本类型(Fixed Width Primitives) 
    int8, int16, int32, int64 – 不同精度(以bit数区分)的带符号整数,以大端(Big Endiam)方式存储.
  • 变长基本类型(Variable Length Primitives) 
    bytes, string – 这些类型由一个表示长度的带符号整数N以及后续N字节的内容组成。长度如果为-1表示空(null). string 使用int16表示长度,bytes使用int32.
  • 数组(Arrays) 
    这个类型用来处理重复的结构体数据。他们总是由一个代表元素个数int32整数N,以及后续的N个重复结构体组成,这些结构体自身是有其他的基本数据类型组成。我们后面会用BNF语法展示一个foo的结构体数组[foo]

请求格式语法要点(Notes on reading the request format grammars)

后面的BNF确切地以上下文无关的语法展示了请求和响应的二进制格式。每个API都会一起给出请求和响应,以及所有的子定义(sub-definitions)。BNF使用没有经过缩写的便于阅读的名称(比如我使用一个符号化了得名称来定义了一个生产者错误码,即便它只是int16整数)。一般在BNF中,一个序列表示一个连接,所以下面给出的MetadataRequest将是一个含有VersionId,然后clientId,然后TopicNames的阵列(每一个都有其自身的定义)。自定义类型一般使用驼峰法拼写,基本类型使用全小写方式乒协。当存在多中可能的自定义类型时,使用’|’符号分割,并且用括号表示分组。顶级定义不缩进,后续的子部分会被缩进。

一般的请求和响应格式(Common Request and Response Structure)

所有请求和响应都从以下语法起源,其余的会在本文剩下部分中进行增量描述:

  1. RequestOrResponse => Size (RequestMessage | ResponseMessage)
  2. Size => int32
域(FIELD) 描述
MessageSize MessageSize 域给出了 后续请求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节请求内容。

请求(Requests)

所有请求都具有以下格式:

  1. RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  2. ApiKey => int16
  3. ApiVersion => int16
  4. CorrelationId => int32
  5. ClientId => string
  6. RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
域(FIELD) 描述
ApiKey 这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等).
ApiVersion 这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式。目前所有API的支持版本为0。
CorrelationId 这是一个用户提供的整数。它将会被服务器原封不动地回传给客户端。用于匹配客户机和服务器之间的请求和响应。
ClientId 这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组。

下面我们就来描述各种请求和响应消息。

响应(Responses)

  1. Response => CorrelationId ResponseMessage
  2. CorrelationId => int32
  3. ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
域(FIELD) 描述
CorrelationId 服务器传回给客户端它所提供用作关联请求和响应消息的整数。

所有响应都是与请求成对匹配(例如,我们将发送回一个元数据请求,会得到一个元数据响应)。

消息集(Message sets)

生产和获取消息指令请求共享同一个消息集结构。在Kafka中,消息是由一个键值对以及少量相关的元数据组成。消息集知识一个有偏移量和大小信息的消息序列。这种格式正好即可用于在broker上的磁盘上存储,也可用在线上数据交换。

消息集也是Kafka中的压缩单元,我们也允许消息递归包含压缩消息从而允许批量压缩。

注意, 在通讯协议中,消息集之前没有类似的其他数组元素的int32。

  1. MessageSet => [Offset MessageSize Message]
  2. Offset => int64
  3. MessageSize => int32

消息格式

  1. Message => Crc MagicByte Attributes Key Value
  2. Crc => int32
  3. MagicByte => int8
  4. Attributes => int8
  5. Key => bytes
  6. Value => bytes
域(FIELD) 描述
Offset 这是在Kafka中作为日志序列号使用的偏移量。当生产者发送消息,实际上它并不知道偏移量的具体值,这时候它可以填写任意值。
Crc Crc是的剩余消息字节的CRC32值。broker和消费者可用来检查信息的完整性。
MagicByte 这是一个用于允许消息二进制格式的向后兼容演化的版本id。当前值是0。
Attributes 这个字节保存有关信息的元数据属性。最低的2位包含用于消息的压缩编解码器。其他位应该被设置为0。
Key Key是一个可选项,它主要用来进行指派分区。Key可以为null.
Value Value是消息的实际内容,类型是字节数组。Kafka支持本身递归包含,因此本身也可能是一个消息集。消息可以为null。

压缩(Compression)

Kafka支持压缩多条消息以提高效率,当然,这比压缩一条原始消息要来得复杂。因为单个消息可能没有足够的冗余信息以达到良好的压缩比,压缩的多条信息必须以特殊方式批量发送(当然,如果真的需要的话,你可以自己压缩批处理的一个消息)。要被发送的消息被包装(未压缩)在一个MessageSet结构中,然后将其压缩并存储在一个单一的“消息”中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。外层MessageSet应该只包含一个压缩的“消息”(详情见Kafka-1718)。

卡夫卡目前支持一下两种压缩编解码器编号:

压缩算法(COMPRESSION) 编码器编号(CODEC)
None 0
GZIP 1
Snappy 2

接口(The APIs)

本节将给出每个API的用法、二进制格式,以及它们的字段的含义的细节。

元数据接口(Metadata API)

这个API回答下列问题:

  • 存在哪些主题(Topic)?
  • 每个主题有几个分区(Partition)?
  • 每个分区的Leader分别是哪个broker?
  • 这些broker的地址和端口分别是什么?

这是唯一一个能发往集群中任意一个broker的请求消息。

因为可能有很多主题,客户端可以给一个的可选主题名列表,以便只返回主题元数据的一个子集。

返回的元数据是在分区级别,为了方便和以避免冗余,以主题为组集中在一起。每个分区的元数据中包含了leader以及所有副本以及正在同步的副本的信息。

注意: 如果broker配置中设置了”auto.create.topics.enable”, 主题元数据请求将会以默认的复制因子和默认的分区数为参数创建主题。

主题元数据请求(Topic Metadata Request)
  1. TopicMetadataRequest => [TopicName]
  2. TopicName => string
域(FIELD) 描述
TopicName 要获取元数据的主题数组。 如果为空,就返回所有主题的元数据
元数据反馈(Metadata Response)

响应包含的每个分区的元数据,这些分区元数据以主题为组组装在一起。该元数据以broker id来指向具体的broker。每个broker有一个地址和端口。

  1. MetadataResponse => [Broker][TopicMetadata]
  2. Broker => NodeId Host Port (any number of brokers may be returned)
  3. NodeId => int32
  4. Host => string
  5. Port => int32
  6. TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  7. TopicErrorCode => int16
  8. PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  9. PartitionErrorCode => int16
  10. PartitionId => int32
  11. Leader => int32
  12. Replicas => [int32]
  13. Isr => [int32]
域(FIELD) 描述
Leader 该分区作为Leader节点的Kafka broker id。如果在一个Leader选举过程中,没有Leader存在,这个id将是-1。
Replicas 该分区中,其他活着的作为slave的节点集合。
Isr 副本集合中,所有处在与Leader跟随(“caught up”,表示数据已经完全复制到这些节点)状态的子集
Broker kafka broker节点的id, 主机名, 端口信息
可能的错误码(Possible Error Codes)
  • UnknownTopic (3)
  • LeaderNotAvailable (5)
  • InvalidTopic (17)
  • TopicAuthorizationFailed (29)

生产者接口(Produce API)

生产者API用于将消息集发送到服务器。为了提高效率,它允许在单个请求中发送多个不同主题的不同分区的消息。

生产者API使用通用的消息集格式,但由于发送时还没有被分配偏移量,因此可以任意填写该值。

生产消息请求(Produce Request)
  1. ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  2. RequiredAcks => int16
  3. Timeout => int32
  4. Partition => int32
  5. MessageSetSize => int32
域(FIELD) 描述
RequiredAcks 这个值表示服务端收到多少确认后才发送反馈消息给客户端。如果设置为0,那么服务端将不发送反馈消息(这是唯一的服务端不发送反馈消息的情况)。如果这个值为1,那么服务器将等到数据写入到本地日之后发送反馈消息。如果这个值是-1,那么服务端将阻塞,知道这个消息被所有的同步副本写入后再反馈响应消息。对于其他大于1的值,服务端将会阻塞,直到收到这个数量的写入反馈后再反馈响应消息(但服务器不会等大于同步中副本的数量,即达到同步中复本个数后,会停止等待,即使所填的值大于这个副本个数)。
Timeout 这个值提供了以毫秒为单位的超时时间,服务器可以在这个时间内可以等待接收所需的Ack确认的数目。超时并非一个确切的限制,有以下原因:(1)不包括网络延迟,(2)计时器开始在这一请求的处理开始,所以如果有很多请求,由于服务器负载而导致的排队等待时间将不被包括在内,(3)如果本地写入时间超过超时,我们将不会终止本地写操作,这样这个超时时间就不会得到遵守。要使硬超时时间,客户端应该使用套接字超时。
TopicName 该数据将会发布到的主题名称
Partition 该数据将会发布到的分区
MessageSetSize 后续消息集的长度,字节为单位
MessageSet 上面描述的标准格式的消息集合
生产消息响应(Produce Response)
  1. ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  2. TopicName => string
  3. Partition => int32
  4. ErrorCode => int16
  5. Offset => int64
描述
Topic 此响应对应的主题。
Partition 此响应对应的分区。
ErrorCode 如果有,此分区对应的错误信息。错误以分区为单位提供,因为可能存在给定的分区不可用或者被其他的主机维护(非Leader),但是其他的分区的请求操作成功的情况
Offset 追加到该分区的消息集中的分配给第一个消息的偏移量。
可能的错误码(Possible Error Codes):(未完待续 TODO)

获取消息接口(Fetch API)

获取消息接口用于获取一些主题分区的一个或多个的日志块。逻辑上根据指定主题,分区和消息起始偏移量开始获取一批消息。在一般情况下,返回消息的偏移量将大于或等于开始偏移量。然而,如果是压缩消息,有可能返回的消息的偏移量比起始偏移量小。这类的消息的数量通常较少,并且调用者必须负责过滤掉这些消息。

获取数据指令请求遵循一个长轮询模型,如果没有足够数量的消息可用,它们可以阻塞一段时间。

作为优化,服务器被允许在消息集的末尾返回部分消息。客户应处理这种情况。

有一点要注意的是,获取消息API需要指定消费的分区。现在的问题是如何让消费者知道消费哪个分区?特别地,作为一组消费者,如何使得每个消费者获取分区的一个子集,并且平衡这些分区。我们使用zookeeper动态地为Scala和Java客户端完成这个任务。这种方法的缺点是,它需要一个相当胖的客户端并且需要客户端与zookeeper联系。我们尚未创建一个Kafka接口(API),允许该功能被移动到在服务器端并被更方便地访问。一个简单的消费者的客户端可以通过配置指定访问的分区,但这样将不能在某些消费者失效后做到分区的动态重新分配。我们希望能在下一个主要版本解决这一空白。

数据获取请求(Fetch Request)
  1. FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  2. ReplicaId => int32
  3. MaxWaitTime => int32
  4. MinBytes => int32
  5. TopicName => string
  6. Partition => int32
  7. FetchOffset => int64
  8. MaxBytes => int32
描述
ReplicaId 副本ID的是发起这个请求的副本节点ID。普通消费者客户端应该始终将其指定为-1,因为他们没有节点ID。其他broker设置他们自己的节点ID。基于调试目的,以非代理身份模拟副本broker发出获取数据指令请求时,这个值填-2。
MaxWaitTime 如果没有足够的数据可发送时,最大阻塞等待时间,以毫秒为单位。
MinBytes 返回响应消息的最小字节数目,必须设置。如果客户端将此值设为0,服务器将会立即返回,但如果没有新的数据,服务端会返回一个空消息集。如果它被设置为1,则服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达到。通过设置较高的值,结合超时设置,消费者可以在牺牲一点实时性能的情况下通过一次读取较大的字节的数据块从而提高的吞吐量(例如,设置MaxWaitTime至100毫秒,设置MinBytes为64K,将允许服务器累积数据达到64K前等待长达100ms再响应)。
TopicName 主题(topic)名称
Partition 获取数据的Partition id
FetchOffset 获取数据的起始偏移量
MaxBytes 此分区返回消息集所能包含的最大字节数。这有助于限制响应消息的大小。
获取消息响应(Fetch Response)
  1. FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  2. TopicName => string
  3. Partition => int32
  4. ErrorCode => int16
  5. HighwaterMarkOffset => int64
  6. MessageSetSize => int32
描述
TopicName 返回消息所对应的主题(Topic)名称。
Partition 返回消息所对应的分区id。
HighwaterMarkOffset 此分区日志中最末尾的偏移量。此信息可被客户端用来确定后面还有多少条消息。
MessageSetSize 此分区中消息集的字节长度
MessageSet 此分区获取到的消息集,格式与之前描述相同
可能的错误码(Possible Error Codes)
  • OFFSET_OUT_OF_RANGE (1)
  • UNKNOWN_TOPIC_OR_PARTITION (3)
  • NOT_LEADER_FOR_PARTITION (6)
  • REPLICA_NOT_AVAILABLE (9)
  • UNKNOWN (-1)

偏移量接口(又称ListOffset)(Offset API)

此API描述了一个主题分区的偏移量有效范围。生产者和获取数据API的请求必须发送到分区Leader所在的broker上,这需要通过使用元数据的API来确定。

响应包含分区的起始偏移量以及“日志末端偏移量”,即,将被追加到给定分区中的下一个消息的偏移量。

我们也觉得这个API是稍微有点时髦。

偏移量请求(Offset Request)
  1. OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  2. ReplicaId => int32
  3. TopicName => string
  4. Partition => int32
  5. Time => int64
  6. MaxNumberOfOffsets => int32
描述
Time 用来请求一定时间(毫秒)前的所有消息。这里有两个特殊取值:-1表示获取最后一个offset(也就是后面即将到来消息的offset值); -2表示获取最早的有效偏移量。注意,因为获取到偏移值都是降序排序,因此请求最早Offset的请求将总是返回一个值
偏移量响应(Offset Response)
  1. OffsetResponse => [TopicName [PartitionOffsets]]
  2. PartitionOffsets => Partition ErrorCode [Offset]
  3. Partition => int32
  4. ErrorCode => int16
  5. Offset => int64
可能的错误吗(Possible Error Codes)
    • UNKNOWN_TOPIC_OR_PARTITION (3)
  • NOT_LEADER_FOR_PARTITION (6)
  • UNKNOWN (-1)

偏移量提交/获取接口(Offset Commit/Fetch API)

这些API使得偏移量的能够集中管理。了解更多偏移量管理。按照Kafka-993的评论,直到Kafka 0.8.1.1,这些API调用无法完全正常使用,他们这将在0.8.2版本中提供。

消费者组协调员请求(Group Coordinator Request)

消费者组(Consumer Group)偏移量信息,由一个特定的broker维护,这个broker称为消费者组协调员。即消费者需要向从这个特定的broker提交和获取偏移量。可以通过发出一组协调员发现请求从而获得当前协调员信息。

  1. GroupCoordinatorRequest => GroupId
  2. GroupId => string
消费者组协调员响应(Group Coordinator Response)
  1. GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
  2. ErrorCode => int16
  3. CoordinatorId => int32
  4. CoordinatorHost => string
  5. CoordinatorPort => int32
可能的错误码(Possible Error Codes)
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • GROUP_AUTHORIZATION_FAILED (30)

偏移量提交请求(Offset Commit Request)

  1. v0 (在0.8.1及之后的版本中支持)
  2. OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]]
  3. ConsumerGroupId => string
  4. TopicName => string
  5. Partition => int32
  6. Offset => int64
  7. Metadata => string
  8. v1 (在0.8.2及之后的版本中支持)
  9. OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  10. ConsumerGroupId => string
  11. ConsumerGroupGenerationId => int32
  12. ConsumerId => string
  13. TopicName => string
  14. Partition => int32
  15. Offset => int64
  16. TimeStamp => int64
  17. Metadata => string
  18. v2 (在0.8.3及之后的版本中支持)
  19. OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]]
  20. ConsumerGroupId => string
  21. ConsumerGroupGenerationId => int32
  22. ConsumerId => string
  23. RetentionTime => int64
  24. TopicName => string
  25. Partition => int32
  26. Offset => int64
  27. Metadata => string

在V0和v1版本中,每个分区的时间戳作为提交时间戳定义,偏移量协调员将保存消费者所提交的偏移量,直到当前时间超过提交时间戳+偏移量保留时间,此偏移量保留时间在broker配置中指定;如果时间错域没有设值,那么broker会将此值设定为接收到提交偏移量请求的时间,用户可以通过设置这个提交时间戳达到延长偏移量保存时间的目的。

在v2版本中,我们移除了时间戳域,但是增加了一个全局保存时间域(详情参见KAFKA-1634);broker会设置提交时间戳为接收到请求的时间,但是提交的偏移量能被保存到提交请求中用户指定的保存时间,如果这个保存时间没有设值,那么broker会使用默认的保存时间。

偏移量提交响应(Offset Commit Response)
  1. v0, v1 and v2:
  2. OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
  3. TopicName => string
  4. Partition => int32
  5. ErrorCode => int16
可能的错误码(Possible Error Codes)
  • OFFSET_METADATA_TOO_LARGE (12)
  • GROUP_LOAD_IN_PROGRESS (14)
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • ILLEGAL_GENERATION (22)
  • UNKNOWN_MEMBER_ID (25)
  • REBALANCE_IN_PROGRESS (27)
  • INVALID_COMMIT_OFFSET_SIZE (28)
  • TOPIC_AUTHORIZATION_FAILED (29)
  • GROUP_AUTHORIZATION_FAILED (30)

偏移量获取请求(Offset Fetch Request)

根据KAFKA-1841的注释,V0和V1是在上是相同的,但V0(0.8.1或更高版本支持)从zookeeper读取的偏移量,而V1(0.8.2或更高版本支持)从卡夫卡读偏移。

  1. OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  2. ConsumerGroup => string
  3. TopicName => string
  4. Partition => int32
偏移量获取响应(Offset Fetch Response)
  1. OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  2. TopicName => string
  3. Partition => int32
  4. Offset => int64
  5. Metadata => string
  6. ErrorCode => int16

请注意,消费者组下一个主题的分区如果没有偏移量,broker不会设定一个错误码(因为它不是一个真正的错误),但会返回空的元数据并将偏移字段为-1。

可能的错误码(Possible Error Codes)
  • UNKNOWN_TOPIC_OR_PARTITION (3) <- 只在v0版本的请求中出现
  • GROUP_LOAD_IN_PROGRESS (14)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • ILLEGAL_GENERATION (22)
  • UNKNOWN_MEMBER_ID (25)
  • TOPIC_AUTHORIZATION_FAILED (29)
  • GROUP_AUTHORIZATION_FAILED (30)

组籍管理接口(Group Membership API)

这些请求用于客户端参加卡夫卡所管理的消费者组。从更高层次上看,集群中每个消费者组都会分配一个broker(及消费者组协调员),以简化消费者组管理。一旦得到了组协调员地址(使用上面的消费者组协调员请求),组成员可以加入该组,同步状态,然后用心跳消息保持在组中的活跃状态。当客户端关闭时,它会使用离开组请求从消费者组中注销。此协议的语义在Kafka客户端分配协议中有详细描述。

组建管理接口的主要使用场景是消费者组,但这些请求也尽量设计得一般化以便支持其他应用场景(例如,Kafka Connect组)。这种设计的带来的代价就是是一些特定的组语义(group semantics)被推到了客户端实现。例如,下面定义的JoinGroup和SyncGroup请求无明确定义的字段以支持消费者组分区分配。相反,它们在其中包含有一些通用的字节数组(byte arrays),用这些字节数组就可以使得分区分配切入在消费者客户端实现。

但是,虽然这种实现允许每个客户端来实现来定义分区方案,但是Kafka工具的兼容性要求这些客户端使用Kafka客户端使用的标准方案。例如,consumer-groups.sh这个应用程序会假定用这种格式来显示分区分配。因此,我们建议客户遵循相同的模式,使这些工具对所有客户端实现都可以正常工作。

加入组请求(Join Group Request)

加入组请求用于让客户端成为组的成员。当新成员加入一个现有组,之前加入大的所有的会员必须通过发送一个新加入组的要求来重新入组。当成员第一次加入该组,成员编号将是空的(即“”),但重新加入的成员都应该使用与之前生成的相同的会员ID。

  1. JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  2. GroupId => string
  3. SessionTimeout => int32
  4. MemberId => string
  5. ProtocolType => string
  6. GroupProtocols => [ProtocolName ProtocolMetadata]
  7. ProtocolName => string
  8. ProtocolMetadata => bytes

ProtocolType字段定义了该组实现的嵌入协议。组协调器确保该组中的所有成员都支持相同的协议类型。组中包含的协议(GroupProtocols)字段中的协议名称和元数据的含义取决于协议类型。请注意,加入群请求允许多协议/元数据对。这使得滚动升级时无需停机。协调器会选择所有成员支持的一种协议,升级后的成员既包括新版本和老版本的协议,一旦所有成员都升级,协调器将选择列在数组中最前面的组协议(GroupProtocol)。

消费者组: 下文我们定义了消费者组使用的嵌入协议。我们建议所有消费者客户端实现遵循这个格式,以便Kafka工具能够对所有的客户端正常工作

  1. ProtocolType => "consumer"
  2. ProtocolName => AssignmentStrategy
  3. AssignmentStrategy => string
  4. ProtocolMetadata => Version Subscription UserData
  5. Version => int16
  6. Subscription => [Topic]
  7. Topic => string
  8. UserData => bytes

UserData域的可以用来自定义分配策略。例如,在一个粘性分区策略实现中,这个字段可以包含之前的分配。在基于资源的分配策略,也可以包括每个运行消费者主机上的CPU个数等信息。

Kafka Connect使用“connect”的协议类型,和协议细节也是基于Connect的内部实现。

加入组响应(Join Group Response)

接收到来自该组中的所有成员组的加入组请求后,协调器将选择一个成员作为Leader,并且选择所有成员支持的协议。Leader将收到会员的完整列表与选择的协议相关的元数据。其他追随者成员,会收到一个空会员数组。Leader需要检查每个成员的元数据,并且使用下文中描述的SyncGroup请求来分配状态。

一旦加加入组阶段完成,协调器会增加该组的GenerationId,这个Id是发送给每个成员的响应中的一个域,同时也会在心跳和偏移量提交请求中。当协调器重新平衡(rebalance)了一个组,协调器将发送一个错误码,表示客户端成员需要重新加入组。如果重新平衡完成前成员未重入组(rejoin),那么它将有一个旧generationId,在新的请求使用这个旧Id时,这将导致ILLEGAL_GENERATION错误。

  1. JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members
  2. ErrorCode => int16
  3. GenerationId => int32
  4. GroupProtocol => string
  5. LeaderId => string
  6. MemberId => string
  7. Members => [MemberId MemberMetadata]
  8. MemberId => string
  9. MemberMetadata => bytes

消费者组: 协调器负责选择所有成员都兼容协议(即分区分配策略),Leader是实际执行分配的成员,加入群请求可以包含多个分配策略,从而支持现有版本升级或者更改不同的分配策略。

可能的错误码(Possible Error Codes):
  • GROUP_LOAD_IN_PROGRESS (14)
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • INCONSISTENT_GROUP_PROTOCOL (23)
  • UNKNOWN_MEMBER_ID (25)
  • INVALID_SESSION_TIMEOUT (26)
  • GROUP_AUTHORIZATION_FAILED (30)
同步组请求(SyncGroup Request)

组长(group leader)使用同步组请求用来向当前组中的所有成员进行状态分配(例如分区分配)。所有成员加入该组后,立即发送SyncGroup,但只有Leader承担这个工作。

  1. SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment
  2. GroupId => string
  3. GenerationId => int32
  4. MemberId => string
  5. GroupAssignment => [MemberId MemberAssignment]
  6. MemberId => string
  7. MemberAssignment => bytes

消费者组: 消费则组中MemberAssignment域的格式如下:

  1. MemberAssignment => Version PartitionAssignment
  2. Version => int16
  3. PartitionAssignment => [Topic [Partition]]
  4. Topic => string
  5. Partition => int32
  6. UserData => bytes

所有了“consumer”协议类型的客户端实现都需要支持这个方案

同步组响应(Sync Group Response)

组中的每个成员都会接收到leader发出的sync group 指令 
Each member in the group will receive the assignment from the leader in the sync group response.

  1. SyncGroupResponse => ErrorCode MemberAssignment
  2. ErrorCode => int16
  3. MemberAssignment => bytes
可能的错误代码(Possible Error Codes):
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • ILLEGAL_GENERATION (22)
  • UNKNOWN_MEMBER_ID (25)
  • REBALANCE_IN_PROGRESS (27)
  • GROUP_AUTHORIZATION_FAILED (30)
心跳请求(Heartbeat Request)

每当一个成员加入并同步完成,他将开始发送心跳请求使自己留在组里。当协调器在配置的会话超时时间内没有他的收到心跳请求,该成员会被踢出该组。

  1. HeartbeatRequest => GroupId GenerationId MemberId
  2. GroupId => string
  3. GenerationId => int32
  4. MemberId => string
心跳响应(Heartbeat Response)
  1. HeartbeatResponse => ErrorCode
  2. ErrorCode => int16
可能的错误码(Possible Error Codes):
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • ILLEGAL_GENERATION (22)
  • UNKNOWN_MEMBER_ID (25)
  • REBALANCE_IN_PROGRESS (27)
  • GROUP_AUTHORIZATION_FAILED (30)
退组请求(LeaveGroup Request)

当想要离开组群时,用户可以发送一个退组请求。这优先于会话超时,因为它能使该组快速再平衡,这对于消费者而言这意味着可以用更短的时间将分区分配到一个活动的成员。

  1. LeaveGroupRequest => GroupId MemberId
  2. GroupId => string
  3. MemberId => string
  4. LeaveGroup Response
  5. LeaveGroupResponse => ErrorCode
  6. ErrorCode => int16
可能的错误代码(Possible Error Codes):
  • GROUP_LOAD_IN_PROGRESS (14)
  • CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_CONSUMER (16)
  • UNKNOWN_CONSUMER_ID (25)
  • GROUP_AUTHORIZATION_FAILED (30)

管理接口(Administrative API)

组列表请求(ListGroups Request)

该API可用于找到当前被broker管理的组群。为了得到集群内的所有组列表,你必须向所有broker发送组列表请求。

  1. ListGroupsRequest =>
  2. ListGroups Response
  3. ListGroupsResponse => ErrorCode Groups
  4. ErrorCode => int16
  5. Groups => [GroupId ProtocolType]
  6. GroupId => string
  7. ProtocolType => string
可能的错误代码(Possible Error Codes):
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • AUTHORIZATION_FAILED (29)
组明细请求(DescribeGroups Request)
  1. DescribeGroupsRequest => [GroupId]
  2. GroupId => string
组明细反馈(DescribeGroups Response)
  1. DescribeGroupsResponse => [ErrorCode GroupId State ProtocolType Protocol Members]
  2. ErrorCode => int16
  3. GroupId => string
  4. State => string
  5. ProtocolType => string
  6. Protocol => string
  7. Members => [MemberId ClientId ClientHost MemberMetadata MemberAssignment]
  8. MemberId => string
  9. ClientId => string
  10. ClientHost => string
  11. MemberMetadata => bytes
  12. MemberAssignment => bytes
可能的错误码(Possible Error Codes):
  • GROUP_LOAD_IN_PROGRESS (14)
  • GROUP_COORDINATOR_NOT_AVAILABLE (15)
  • NOT_COORDINATOR_FOR_GROUP (16)
  • AUTHORIZATION_FAILED (29)

常量(Constants)

接口关键字(Api Keys)

下面是请求中ApiKey的数字值,用来表示上面所述的请求类型。

接口名称(API NAME) APIKEY值
ProduceRequest 0
FetchRequest 1
OffsetRequest 2
MetadataRequest 3
Non-user facing control APIs 4-7
OffsetCommitRequest 8
OffsetFetchRequest 9
GroupCoordinatorRequest 10
JoinGroupRequest 11
HeartbeatRequest 12
LeaveGroupRequest 13
SyncGroupRequest 14
DescribeGroupsRequest 15
ListGroupsRequest 16

错误代码(Error Codes)

我们用数字代码表示服务器发生的问题。这些可以由客户端转换成客户端中的异常(Exceptions)或者其他任何适当的错误处理机制。这里是当前正在使用的错误代码表:

错误名称(ERROR) 编码(CODE) 是否可重试(RETRIABLE) DESCRIPTION 描述
NoError 0 No error–it worked! 没有错误
Unknown -1 An unexpected server error 服务器未知错误
OffsetOutOfRange 1 The requested offset is outside the range of offsets maintained by the server for the given topic/partition. 请求的偏移量超过服务器维护的主题分区的偏移量。
InvalidMessage / CorruptMessage 2 Yes This indicates that a message contents does not match its CRC 这个错误表示消息的内容与它的CRC校验码不符合。
UnknownTopicOrPartition 3 Yes This request is for a topic or partition that does not exist on this broker. broker上不存在所请求的主题或者分区。
InvalidMessageSize 4 The message has a negative size 消息长度为负数。
LeaderNotAvailable 5 Yes This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes. 这个错误会在leader选举之间抛出,一样那位此时这个分区没有leader因此不能被写入。
NotLeaderForPartition 6 Yes This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date. 这个错误表示客户端正在把消息发送给副本,而不是分区的leader。这说明客户端的元数据已经过期。
RequestTimedOut 7 Yes This error is thrown if the request exceeds the user-specified time limit in the request. 当这个请求超过了用户自定义的请求时间限制抛出此错误
BrokerNotAvailable 8 This is not a client facing error and is used mostly by tools when a broker is not alive. 这个不是客户端所能接受到的错误,一般被工具用在broker没有活动的场合。
ReplicaNotAvailable 9 If replica is expected on a broker, but is not (this can be safely ignored). 当broker希望有副本而实际上并没有时抛出(这个错误可以被安全地忽略)。
MessageSizeTooLarge 10 The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum. 当服务器配置了一个最大消息长度以避免无限制的内存分配时,客户端产生了一个超过这个最大值的消息会抛出此错误。
StaleControllerEpochCode 11 Internal error code for broker-to-broker communication. broker之间内部通讯是的错误。
OffsetMetadataTooLargeCode 12 If you specify a string larger than configured maximum for offset metadata 如果你赋了一个超过所配置的最大偏移量元数据的字符串时触发。
GroupLoadInProgressCode 14 Yes The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition), or in response to group membership requests (such as heartbeats) when group metadata is being loaded by the coordinator. broker会在以下情况下返回这个错误:当broker人在加载偏移量时(主题分区的leader发生变化后)请求偏移量获取请求;或者正在反馈组成员请求(比如心跳)时,组的元数据正在被协调器加载。
GroupCoordinatorNotAvailableCode 15 Yes The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active. 组协调器请求,偏移量提交和大部分组管理请求时,偏移量主题还没有被建立或者组协调器还没有激活是broker会返回此错误。
NotCoordinatorForGroupCode 16 Yes The broker returns this error code if it receives an offset fetch or commit request for a group that it is not a coordinator for. 非该组协调器的broker接收到一个偏移量获取或提交请求时返回此错误。
InvalidTopicCode 17 For a request which attempts to access an invalid topic (e.g. one which has an illegal name), or if an attempt is made to write to an internal topic (such as the consumer offsets topic). 请求指令尝试访问一个非法的主题(例如,一个包含非法名称的主题),或者尝试写入一个内部主题(例如消费者偏移量主题)。
RecordListTooLargeCode 18 If a message batch in a produce request exceeds the maximum configured segment size. 批处理消息片段数组的长度超过了配置的最大消息片段数。
NotEnoughReplicasCode 19 Yes Returned from a produce request when the number of in-sync replicas is lower than the configured minimum and requiredAcks is -1. 当同步中的副本数量小于配置的最小数量,并且requiredAcks设置为-1时返回此错误
NotEnoughReplicasAfterAppendCode 20 Yes Returned from a produce request when the message was written to the log, but with fewer in-sync replicas than required. 消息已经写入日志文件,但是同步中的副本数量比请求中要求的数量少时返回此错误码
InvalidRequiredAcksCode 21 Returned from a produce request if the requested requiredAcks is invalid (anything other than -1, 1, or 0). 请求的requiredAcks非法(任何非-1,1或者0)时返回此错误码。
IllegalGenerationCode 22 Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation. 组籍管理请求(诸如心跳请求)时generation id不是与当前不一致时返回此错误码
InconsistentGroupProtocolCode 23 Returned in join group when the member provides a protocol type or set of protocols which is not compatible with the current group. 加入组请求时成员提供的协议类型或者协议类型组与当前组不兼容时返回。
InvalidGroupIdCode 24 Returned in join group when the groupId is empty or null. 加入组请求时groupId为空或者null是返回。
UnknownMemberIdCode 25 Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation. 组请求(偏移量提交/获取,心跳等)时memberId不在当前的generation。
InvalidSessionTimeoutCode 26 Return in join group when the requested session timeout is outside of the allowed range on the broker 加入组请求时请求的会话超时超过broker允许的限制。
RebalanceInProgressCode 27 Returned in heartbeat requests when the coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group. 心跳请求时协调器已经开始了组的再平衡,这意味着客户端必须重新加入组。
InvalidCommitOffsetSizeCode 28 This error indicates that an offset commit was rejected because of oversize metadata. 这个错意味着偏移量提交因为超过了元数据大小而被拒绝。
TopicAuthorizationFailedCode 29 Returned by the broker when the client is not authorized to access the requested topic. 客户端没有访问请求主题的权限时,broker返回此错误。
GroupAuthorizationFailedCode 30 Returned by the broker when the client is not authorized to access a particular groupId. 客户端没有访问特定groupId的权限时,broker返回此错误。
ClusterAuthorizationFailedCode 31 Returned by the broker when the client is not authorized to use an inter-broker or administrative API. 客户端没有权限访问broker之间的接口或者管理接口时,broker返回此错误。

5 一些常见的哲学问题(Some Common Philosophical Questions)

有些人问,为什么我们不使用HTTP。有许多原因,最主要的是客户端实现可以使用一些更高级的TCP特性–请求的多工(multiplex)能力(译者注:同一个TCP连接中同时发送多个请求,http长连接必须等到前一次请求结束才能发送后一个请求,否则需要多个http连接),同时轮询多个连接的能力,等等。我们还发现HTTP库在许多编程语言中非常是出奇地破旧(shabby -_-!)。

还有人问,也许我们可以支持许多不同的协议。此前的经验是,多协议支持的是很难添加和测试新功能,因为他们要被移植到许多协议实现中。我们感觉,大多数用户并不在乎支持多个协议这些特性,他们只是希望在自己选择的语言中实现了良好可靠的客户端。

另一个问题是,为什么我们不采用XMPP,STOMP,AMQP或现有的协议。这个问题的不同协议有不同答案,但在共通的问题是,这些协议的确确定了大部分实现,但如果我们没有协议的控制权,我们就实现不了我们的功能。我们相信,我们可以实现比现有消息系统更好的真正的分布式消息系统,但要做到这一点,我们需要建立不同的工作模式。

最后一个问题是,为什么我们不使用的Protocol Buffers或Thrift来定义我们的请求消息格式。这些库擅长帮助您管理非常多的序列化的消息。然而,我们只有几个消息。而且这些库跨语言的支持是有点参差不齐(取决于软件包)。最后,我们颇为谨慎地管理二进制日志格式和传输协议之间的映射,而用如果使用这些系统将变得不太可能。最后,我们比较喜欢让API有明确的版本并且通过检查版来引入原本为空的新值,因为它能更细致地控制兼容性。 


本文链接 http://tec./detail/105d0n2i586mj8s11.html

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多