




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
Kafka中英文术语比照为避开歧义,大局部的英文术语找不到适宜中文对应时都保持英文原文,Kafka中一些根本术语也使用英文,其中一局部通过括号参与英文原文;另外,文中可能使用到的中英文术语包括但不限于:MetadataoffsetComsumer
ComsumerGroupTopic 主题API 接口Coordinator 协调器简介此文档涵盖了Kafka0.8一个包含的可恳求的协议及其二进制格式以及如何正确使用他们来实现一个客户端的通讯协议文档。本文假设您已经了解了Kafka根本的设计以及术语。0.7和更早的版本所使用的协议与此类似,但我们〔期望〕通过一次性地斩断兼容性,以便清理原有设计上的沉疴,并且泛化一些概念。假设遇到无法理解的状况,请参照英文原文概述卡夫卡协议是相当简洁的,只有六个核心的客户端恳求的API:元数据〔Metadata〕–描述可用的brokers,包括他们的主机broker发送〔Send〕–发送消息到broker;猎取〔Fetch〕–从brokertopic偏移量〔Offsets〕–猎取给定topic的分区的可用偏移量信息;偏移量提交〔OffsetCommit〕–提交消费者组〔ComsumerGroup〕的一组偏移量;偏移量猎取〔OffsetFetch〕–猎取一个消费者组的一组偏移量;上述的API都将在下面具体说明。此外,从0.9版本开头,Kafka支持为消费者和Kafka连接进展分组治理。客户端API包括五个恳求:分组协调者〔GroupCoordinator〕–用来定位一个分组当前的协调者。参与分组〔JoinGroup〕–成为某一个分组的一个成员,当分组不存在〔没有一个成员时〕创立分组。同步分组〔SyncGroup〕–同步分组中全部成员的状态〔例如分发分区安排信息(PartitionAssignments)到各个组员〕。心跳〔Heartbeat〕–保持组内成员的活泼状态。离开分组〔LeaveGroup〕–直接离开一个组。最终,有几个治理API,可用于监控/治理的卡夫卡集群〔KIP-4完成时,这个列表将增长〕:描述消费者组〔DescribeGroups〕–用于检查一组群体的当前状态〔如:查看消费者分区安排〕。1.列出组〔ListGroups〕–列出某一个broker组开头网络Kafka使用基于TCP的二进制协议。该协议定义了全部API的恳求及响应消息。全部消息都是有长度限制的,并且由后面描述的根本类型组成。客户端启动的socket连接,并且写入恳求的消息序列和读回相应的响应消息。连接和断开时均不需要握手消息。假设保持你保持长连接,那么TCP协议本身将会节约很多TCP握手时间,但假设真的重建立连接,那么代价也相当小。客户可能需要维持到多个broker的连接,由于数据是被分区的,而客户端需要和存储这些分区的broker效劳器进展通讯。固然,一般而言,不需要为单个效劳端和单个客户端间维护多个连接〔即连接池技术〕。效劳器的保证单一的TCP连接中,恳求将被挨次处理,响应也将按该挨次返回。为保证broker的处理恳求的挨次,单个连接同时也只会处理一个恳求指令。请留意,客户端可以〔也应当〕使用非堵塞IO实现恳求流水线,从而实现更高的吞吐量。也就是说,客户可以在等待上次恳求应答的同时发送下个恳求,由于待完成的恳求将会在底层操作系统套接字缓冲区进展缓冲。除非特别说明,全部的恳求是由客户端启动,并从效劳器猎取到相应的响应消息。效劳器能够配置恳求大小的最大限制,超过这个限制将导致socket分区和引导〔Partitioningandbootstrapping〕Kafka是一个分区系统,所以不是全部的效劳器都具有完整的数据集。主题(Topic)被分为P〔预先定义的分区数量〕个分区,每个分区被复制N〔复制因子〕份,TopicPartition依据挨次在“提交日志”0,1,…,P。全部具有这种特性的系统都有一个如何制定某个特定数据应当被安排给哪个特定的分区的问题。Kafka中它由客户端直接把握安排策略,broker则没有特别的语义来打算消息公布到哪个分区。相反,生产者直接将消息发送到一个特定的分区,提取消息时,消费者也直接从某个特定的分区猎取。假设两个生产者要使用一样的分区方案,那么他Key这些公布或猎取数据的恳求必需发送到指定分区中作为leader的broker。此条件同时也会由broker保证,发送到不正确的broker的恳求将会返回NotLeaderForPartition错误代码〔后文所描述的〕。那么客户端如何找出哪些主题存在,他们有什么分区,以及这些分区被哪些broker存取,以便它可以直接将恳求发送到所在的主机?这个信息是动态的,因此你不能只是供给每个客户端一些静态映射文件。全部的Kafkabroker都可以答复这个描述集群的当前状态的数据恳求:有哪些主题,这些主题都有多少分区,哪个broker是这些分区Leaderbroker换句话说,客户端只需要找到一个broker,broker将会告知客户端全部其他存在的broker,以及这些broker上面的全局部区。这个broker本身也可能会掉线,因此客户端实现的最正确做法是保存两个或三个broker地址,从而来引导列表。用户可以选择使用负载均衡器或Kafka客户并不需要轮询地查看集群是否已经转变;它可以等到它接收到所用的元数据是过时的错误信息时一次性更元数据。这中错误有两种形式:〔1〕一个套接字错误指示客户端不能与特定的broker进展通信,〔2〕恳求响应说明该 broker不再是其恳求数据分区的Leader的错误。轮询“起始”Kafka的URL列表,直到我们找到一个我们可以broker。猎取集群元数据。处理猎取数据或者存储消息恳求,依据这些恳求所发送的主题broker。假设我们得到一个适当的错误(显示元数据已经过时时),刷元数据,然后再试一次。分区策略〔PartitioningStrategies〕上面提到消息的分区安排是由生产者客户端把握,那么,为什么要把这个功能被暴露给最终用户?Kafka它平衡了broker的数据和恳求负载它允很多个消费者之间处理分发消息的同时,能够维护本地状态,并且在分区中维持消息的挨次。我们称这种语义的分区〔semanticpartitioning〕。对于给定的使用场景下,你可能只关心其中的一个或两个。为了实现简洁的负载均衡,一个简洁的策略是客户端公布消息是对全部broker进展轮询恳求(roundrobinrequests)。另一种选择,在那些生产者比消费者多的场景下,给每个客户机随机选择并公布消息到该分区。后一种的策略能够使用少得多的TCP语义分区是指使用关键字(key)来打算消息安排的分区。例如,假设你正在处理一个点击消息流时,可能需要通过用户ID来划分流,使得特定用户的全部数据会被单个消费者消费。要做到这一点,客户端可以实行与消息相关联的关键字,并使用关键字的某个Hash值来选择的传送的分区。批处理〔Batching〕我们的API鼓舞将小的恳求批量处理以提高效率。我们觉察这能格外显著地提升性能。我们两个用来发送消息和猎取消息的API,总是以一连串的消息工作,而不是单一的消息,从而鼓舞批处理操作。聪明的客户端可以利用这一点,并支持“异步”操作模式,以此进展批处理哪些单独发送的消息,并把它们以较大的块进展发送。我们再进一步允许跨多个主题和分区的批处理,所以生产恳求可能包含追加到很多分区的数据,一个读取恳求可以一次性从多个分区提取数据的。固然,假设他们宠爱,客户端实现者可以选择无视这一点,全部消息一次都发送一个。版本和兼容性〔VersioningandCompatibility〕该协议的目的要到达在向后兼容的根底上渐进演化。我们的版本是基于每个API根底之上,每个版本包括一个恳求和响应对。每个恳求APIKey,里面包含了被调用的API标识,以及表示这些恳求和响应格式的版本号。这样做的目的是允许客户端执行相应特定版本的恳求。目标主要是为了在不允许停机的环境下进展更,这种环境下,客户端和效劳API。效劳器将拒绝它不支持的版本的恳求,并始终返回它期望收到的能够完成恳求响应的版本的协议格式。预期的升级路径方式是,功能将首先部署到效劳器〔老客户端无法完全利用他们的功能〕,然后随着的客户端的部署,这些功能将逐步被利用。目前,全部版本基线为0,当我们演进这些API时,我们将分别显示每个版本的格式。通讯协议〔TheProtocol〕协议根本数据类型〔ProtocolPrimitiveTypes〕Theprotocolisbuiltoutofthefollowingprimitivetypes.该协议是建立在以下根本类型之上。定长根本类型〔FixedWidthPrimitives〕int8,int16,int32,int64–不同精度(以bit数区分)的带符号整数,以大端〔BigEndiam〕方式存储.变长根本类型〔VariableLengthPrimitives〕bytes,string–这些类型由一个表示长度的带符号整数N以及后续N字节的内容组成。长度假设为-1表示空〔null〕.string使用int16bytesint32.数组〔Arrays〕这个类型用来处理重复的构造体数据。他们总是由一个代表元素个数int32整数N,以及后续的N个重复构造体组成,这些构造体自身是有其他的根本数据类型组成。我们后面会用BNF语法呈现一个foo[foo]恳求格式语法要点〔Notesonreadingtherequestformatgrammars〕后面的BNF精准地以上下文无关的语法呈现了恳求和响应的二进制格式。每个API都会一起给出恳求和响应,以及全部的子定义〔sub-definitions〕。BNF使用没有经过缩写的便于阅读的名称〔比方我使用一个符号化了得名称来定义了一个生产者错误码,即便它只是int16整数〕。一般在BNF中,一个序列表示一个连接,所以下面给出的MetadataRequest将是一个含有VersionId,然后clientId,然后TopicNames的阵列〔每一个都有其自身的定义〕。自定义类型一般使用驼峰法拼写,根本类型使用全小写方式乒协。当存在多中可能的自定义类型时,使用’|’符号分割,并且用括号表示分组。顶级定义不缩进,后续的子局部会被缩进。一般的恳求和响应格式〔CommonRequestandResponseStructure〕全部恳求和响应都从以下语法起源,其余的会在本文剩下局部中进展增量描述:1.2.RequestOrResponseResponseMessage)3.4.5.Size=>int326.域〔FIELD〕
=> Size (RequestMessage |描述MessageSizeMessageSize域给出了 后续恳求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节恳求内容。恳求〔Requests〕全部恳求都具有以下格式:1.2.RequestMessage=> ApiKeyClientIdRequestMessage3.4.5.ApiKey=>int166.7.8.ApiVersion=>int169.10.11. CorrelationId=>int3212.
ApiVersionCorrelationId13.14.15.
ClientId=>string16.17. RequestMessage
=> MetadataRequest |ProduceRequest |
FetchRequest
| OffsetRequest |OffsetCommitRequest|OffsetFetchRequest18.ApiKeyApiVersion
描述id〔即它表示是一个元数据恳求?生产恳求?猎取恳求等〕.API号,该版本号允许效劳器依据版本号正确地解释恳求内容。响API0。客户端。用于匹配客户机和效劳器之间的恳求和响应。ClientId
场景。例如,你可能不仅想要监视每秒的总体恳求,还要依据客户端应用程序进展监视,那它就可以被用上〔其中每一个都将驻留在多个效劳器上〕。这个ID恳求的规律分组。下面我们就来描述各种恳求和响应消息。响应〔Responses〕1.2.Response=>CorrelationIdResponseMessage3.4.5.CorrelationId=>int326.7.8.ResponseMessage
=> MetadataResponse |ProduceResponse
| FetchResponse
| OffsetResponse |OffsetCommitResponse|OffsetFetchResponse9.域〔FIELD〕 描述数。全部响应都是与恳求成对匹配〔例如,我们将发送回一个元数据恳求,会得到一个元数据响应〕。消息集〔Messagesets〕生产和猎取消息指令恳求共享同一个消息集构造。在Kafka中,消息是由一个键值对以及少量相关的元数据组成。消息集学问一个有偏移量和大小信息的消息序列。这种格式正好即可用于在broker上的磁盘上存储,也可用在线上数据交换。消息集也是Kafka中的压缩单元,我们也允许消息递归包含压缩消息从而允许批量压缩。留意,在通讯协议中,消息集之前没有类似的其他数组元素的int32。1.2.MessageSet=>[OffsetMessageSizeMessage]3.4.5.Offset=>int646.7.8.MessageSize=>int329.消息格式1.2.Message=>CrcMagicByteAttributesKeyValue3.4.5.Crc=>int326.7.8.MagicByte=>int89.10.11. Attributes=>int812.13.14. Key=>bytes8.域〔FIELD〕OffsetCrcMagicByteAttributesKeyValue
Value=>bytes描述Kafka息,实际上它并不知道偏移量的具体值,这时候它可以填写任意值。CrcCRC32broker息的完整性。这是一个用于允许消息二进制格式的向后兼容演化的版本id。当0。这个字节保存有关信息的元数据属性。最低的20。KeyKeynull.Kafkanull。压缩〔Compression〕Kafka支持压缩多条消息以提高效率,固然,这比压缩一条原始消息要来得简洁。由于单个消息可能没有足够的冗余信息以到达良好的压缩比,压缩的多条信息必需以特别方式批量发送〔固然,假设真的需要的话,你可以自己压缩批处理的一个消息〕。要被发送的消息被包装〔未压缩〕在一个MessageSet构造中,然后将其压缩并存储在一个单一的“消息”中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。外层MessageSet应当只包含一个压缩的“消息”〔Kafka-1718〕。卡夫卡目前支持一下两种压缩编解码器编号:压缩算法〔COMPRESSION〕编码器编号〔CODEC〕None 0GZIP 1Snappy 2接口(TheAPIs)本节将给出每个API的用法、二进制格式,以及它们的字段的含义的细节。元数据接口〔MetadataAPI〕这个API答复以下问题:存在哪些主题〔Topic〕?每个主题有几个分区〔Partition〕?每个分区的Leader分别是哪个broker?这些broker的地址和端口分别是什么?这是唯一一个能发往集群中任意一个broker的恳求消息。由于可能有很多主题,客户端可以给一个的可选主题名列表,以便只返回主题元数据的一个子集。返回的元数据是在分区级别,为了便利和以避开冗余,以主题为组集中在一起。每个分区的元数据中包含了leader以及全部副本以及正在同步的副本的信息。留意:假设broker配置中设置了”auto.create.topics.enable”,主题元数据恳求将会以默认的复制因子和默认的分区数为参数创立主题。主题元数据恳求〔TopicMetadataRequest〕1.2.TopicMetadataRequest=>[TopicName]3.4.5.TopicName=>string6.域〔FIELD〕 描述TopicName 元数据反响〔MetadataResponse〕响应包含的每个分区的元数据,这些分区元数据以主题为组组装brokeridbroker。每个broker一个地址和端口。1.2.MetadataResponse=>[Broker][TopicMetadata]3.4.5.Broker=>NodeIdHostPort(anynumberofbrokersmaybereturned)6.7.8.NodeId=>int329.10.11. Host=>string12.13.14. Port=>int3215.16.17. TopicMetadata => TopicErrorCode TopicName[PartitionMetadata]1.
TopicErrorCode=>int1622.23. PartitionMetadata => PartitionErrorCodePartitionIdLeaderReplicasIsr24.25.26. PartitionErrorCode=>int1627.28.29. PartitionId=>int3230.31.32. Leader=>int3233.34.35. Replicas=>[int32]9.域〔FIELD〕Leader
Isr=>[int32]描述KafkabrokeridLeaderLeaderid-1。Replicas slaveIsrBroker
〔“caughtup”,表示数据已经完全复制到这些节点〕状态的子集kafkabrokerid,主机名,端口信息可能的错误码〔PossibleErrorCodes〕UnknownTopic(3)LeaderNotAvailable(5)InvalidTopic(17)TopicAuthorizationFailed(29)生产者接口〔ProduceAPI〕生产者API用于将消息集发送到效劳器。为了提高效率,它允许在单个恳求中发送多个不同主题的不同分区的消息。生产者API使用通用的消息集格式,但由于发送时还没有被安排偏移量,因此可以任意填写该值。生产消息恳求〔ProduceRequest〕1.2.ProduceRequest=>RequiredAcksTimeout[TopicName[PartitionMessageSetSizeMessageSet]]3.4.5.RequiredAcks=>int166.7.8.Timeout=>int329.10.11. Partition=>int3212.13.14. MessageSetSize=>int3215.域〔FIELD〕 描述这个值表示效劳端收到多少确认后才发送反响消息给客户RequiredAcks
0,那么效劳端将不发送反响消息〔这是唯一的效劳端不发送反响消息的状况〕1,那么效劳器将等到数据写入到本地日之后发送反响消息。假设这个TimeoutTopicNamePartition
值是-1,那么效劳端将堵塞,知道这个消息被全部的同步副1堵塞,直到收到这个数量的写入反响后再反响响应消息〔但效劳器不会等大于同步中副本的数量,即到达同步中复本个数后,会停顿等待,即使所填的值大于这个副本个数〕。这个值供给了以毫秒为单位的超时时间,效劳器可以在这个时间内可以等待接收所需的Ack的限制,有以下缘由:〔1〕不包括网络延迟,〔2〕计时器开头在这一恳求的处理开头,所以假设有很多恳求,由于效劳器负载而导致的排队等待时间将不被包括在内,〔3〕假设本地写入时间超过超时,我们将不会终止本地写操作,这样这个超时时间就不会得到遵守。要使硬超时时间,客户端应当使用套接字超时。该数据将会公布到的分区MessageSetSize后续消息集的长度,字节为单位MessageSet 上面描述的标准格式的消息集合生产消息响应〔ProduceResponse〕1.2.ProduceResponse=>[TopicName[PartitionErrorCodeOffset]]3.4.5.TopicName=>string6.7.8.Partition=>int329.10.11. ErrorCode=>int1612.13.14. Offset=>int6415.域 描述Topic 假设有,此分区对应的错误信息。错误以分区为单位供给,由于可〔Leader〕,但是其他的分区的恳求操作成功的状况Offset 追加到该分区的消息集中的安排给第一个消息的偏移量。可能的错误码〔PossibleErrorCodes〕:(未完待续TODO)猎取消息接口〔FetchAPI〕猎取消息接口用于猎取一些主题分区的一个或多个的日志块。规律上依据指定主题,分区和消息起始偏移量开头猎取一批消息。在一般状况下,返回消息的偏移量将大于或等于开头偏移量。然而,假设是压缩消息,有可能返回的消息的偏移量比起始偏移量小。这类的消息的数量通常较少,并且调用者必需负责过滤掉这些消息。猎取数据指令恳求遵循一个长轮询模型,假设没有足够数量的消息可用,它们可以堵塞一段时间。作为优化,效劳器被允许在消息集的末尾返回局部消息。客户应处理这种状况。有一点要留意的是,猎取消息API需要指定消费的分区。现在的问题是如何让消费者知道消费哪个分区?特别地,作为一组消费者,如何使得每个消费者猎取分区的一个子集,并且平衡这些分区。我们zookeeperScalaJava法的缺点是,它需要一个相当胖的客户端并且需要客户端与zookeeperKafka〔API〕,允许该功能被移动到在效劳器端并被更便利地访问。一个简洁的消费者的客户端可以通过配置指定访问的分区,但这样将不能在某些消费者失效后做到分区的动态重安排。我们期望能在下一个主要版本解决这一空白。数据猎取恳求〔FetchRequest〕1.2.FetchRequest => ReplicaId MaxWaitTime MinBytes[TopicName[PartitionFetchOffsetMaxBytes]]3.4.5.ReplicaId=>int326.7.8.MaxWaitTime=>int329.10.11. MinBytes=>int3212.13.14. TopicName=>string15.16.17. Partition=>int3218.19.20. FetchOffset=>int644.
MaxBytes=>int32域 描述IDID。一般消费者客户端应ReplicaId
该始终将其指定为-1,IDbroker他们自己的节点ID。基于调试目的,以非代理身份模拟副本broker-2。位。返回响应消息的最小字节数目,必需设置。假设客户端将此值设为0,效劳器将会马上返回,但假设没有的数据,效劳端会返回一个空消息集。假设它被设置为1,则效劳器将在至少一个分区收到一个字节的数据的状况下马上返回,或者等到超时时间达MinBytes 到。通过设置较高的值,结合超时设置,消费者可以在牺牲一点实时性能的状况下通过一次读取较大的字节的数据块从而提高的吞吐量〔MaxWaitTime100MinBytes64K,将允许效劳器累积数据到达64K前等待长达100ms再响应〕。TopicName主题〔topic〕名称idFetchOffsetMaxBytes 息的大小。猎取消息响应〔FetchResponse〕1.2.FetchResponse => [TopicName [Partition ErrorCodeHighwaterMarkOffsetMessageSetSizeMessageSet]]3.4.5.TopicName=>string6.7.8.Partition=>int329.10.11. ErrorCode=>int1612.13.14. HighwaterMarkOffset=>int6415.16.17. MessageSetSize=>int3218.域TopicNamePartition
描述〔Topic〕名称。id。此分区日志中最末尾的偏移量。此信息可被客户端用来确定后面还有多少条消息。MessageSetSizeMessageSet
此分区中消息集的字节长度此分区猎取到的消息集,格式与之前描述一样可能的错误码〔PossibleErrorCodes〕OFFSET_OUT_OF_RANGE(1)UNKNOWN_TOPIC_OR_PARTITION(3)NOT_LEADER_FOR_PARTITION(6)REPLICA_NOT_AVAILABLE(9)UNKNOWN(-1)偏移量接口〔又称ListOffset〕〔OffsetAPI〕此API描述了一个主题分区的偏移量有效范围。生产者和猎取数据API的恳求必需发送到分区Leader所在的broker上,这需要通过API响应包含分区的起始偏移量以及“日志末端偏移量”,即,将被追加到给定分区中的下一个消息的偏移量。我们也觉得这个API是略微有点时髦。偏移量恳求〔OffsetRequest〕1.2.OffsetRequest=>ReplicaId[TopicName[PartitionTimeMaxNumberOfOffsets]]3.4.5.ReplicaId=>int326.7.8.TopicName=>string9.10.11. Partition=>int3212.13.14. Time=>int6415.16.17. MaxNumberOfOffsets=>int3218.域 描述-1表示offset〕;-2示猎取最早的有效偏移量。留意,由于猎取到偏移值都是降序排序,因此恳求最早Offset的恳求将总是返回一个值偏移量响应〔OffsetResponse〕1.2.OffsetResponse=>[TopicName[PartitionOffsets]]3.4.5.PartitionOffsets=>PartitionErrorCode[Offset]6.7.8.Partition=>int329.10.11. ErrorCode=>int1612.13.14. Offset=>int6415.可能的错误吗〔PossibleErrorCodes〕o UNKNOWN_TOPIC_OR_PARTITION(3)NOT_LEADER_FOR_PARTITION(6)UNKNOWN(-1)偏移量提交/猎取接口〔OffsetCommit/FetchAPI〕这些API使得偏移量的能够集中治理。了解更多偏移量治理。依据Kafka-993的评论,直到Kafka,这些API调用无法完全正0.8.2消费者组协调员恳求〔GroupCoordinatorRequest〕消费者组〔ConsumerGroup〕偏移量信息,由一个特定的broker维护,这个broker称为消费者组协调员。即消费者需要向从这个特定的broker提交和猎取偏移量。可以通过发出一组协调员觉察恳求从而获得当前协调员信息。1.2.GroupCoordinatorRequest=>GroupId3.4.5.GroupId=>string6.消费者组协调员响应〔GroupCoordinatorResponse〕1.2.GroupCoordinatorResponse=>ErrorCodeCoordinatorIdCoordinatorHostCoordinatorPort3.4.5.ErrorCode=>int166.7.8.CoordinatorId=>int329.10.11. CoordinatorHost=>string12.13.14. CoordinatorPort=>int3215.可能的错误码〔PossibleErrorCodes〕GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)GROUP_AUTHORIZATION_FAILED(30)偏移量提交恳求〔OffsetCommitRequest〕1.2.v0(在.5.OffsetCommitRequest=>ConsumerGroupId[TopicName[PartitionOffsetMetadata]]6.7.8.ConsumerGroupId=>string9.10.11. TopicName=>string12.13.14. Partition=>int3215.16.17. Offset=>int6418.19.20. Metadata=>string7.28.29. v1(在1.32. OffsetCommitRequest=>ConsumerGroupIdConsumerGroupGenerationIdConsumerId[TopicName[PartitionOffsetTimeStampMetadata]]33.34.35. ConsumerGroupId=>string36.37.38. ConsumerGroupGenerationId=>int3239.40.41. ConsumerId=>string42.43.44. TopicName=>string45.46.47. Partition=>int3248.49.50. Offset=>int6451.52.53. TimeStamp=>int6454.55.56. Metadata=>string0.61.62. v2(在0.8.363.64.65. OffsetCommitRequest => ConsumerGroupConsumerGroupGenerationId
ConsumerId
RetentionTime[TopicName[PartitionOffsetMetadata]]66.67.68. ConsumerGroupId=>string69.70.71. ConsumerGroupGenerationId=>int3272.73.74. ConsumerId=>string75.76.77. RetentionTime=>int6478.79.80. TopicName=>string81.82.83. Partition=>int3284.85.86. Offset=>int6487.88.89. Metadata=>string90.在V0和v1版本中,每个分区的时间戳作为提交时间戳定义,偏移量协调员将保存消费者所提交的偏移量,直到当前时间超过提交时间戳+偏移量保存时间,此偏移量保存时间在broker配置中指定;假设时间错域没有设值,那么broker会将此值设定为接收到提交偏移量恳求的时间,用户可以通过设置这个提交时间戳到达延长偏移量保存时间的目的。在v2版本中,我们移除了时间戳域,但是增加了一个全局保存时间域〔详情参见KAFKA-1634〕;broker会设置提交时间戳为接收到恳求的时间,但是提交的偏移量能被保存到提交恳求中用户指定的保存时间,假设这个保存时间没有设值,那么broker会使用默认的保存时间。偏移量提交响应〔OffsetCommitResponse〕1.2.v0,v1andv2:3.4.5.OffsetCommitResponseErrorCode]]]6.7.8.TopicName=>string9.
=> [TopicName [Partition10.11. Partition=>int3212.13.14. ErrorCode=>int1615.可能的错误码〔PossibleErrorCodes〕OFFSET_METADATA_TOO_LARGE(12)GROUP_LOAD_IN_PROGRESS(14)GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)ILLEGAL_GENERATION(22)UNKNOWN_MEMBER_ID(25)REBALANCE_IN_PROGRESS(27)INVALID_COMMIT_OFFSET_SIZE(28)TOPIC_AUTHORIZATION_FAILED(29)GROUP_AUTHORIZATION_FAILED(30)偏移量猎取恳求〔OffsetFetchRequest〕依据KAFKA-1841的注释,V0和V1是在上是一样的,但V0〔0.8.1或更高版本支持〕从zookeeper读取的偏移量,而V1〔0.8.2或更高版本支持〕从卡夫卡读偏移。1.2.OffsetFetchRequest => ConsumerGroup [TopicName[Partition]]3.4.5.ConsumerGroup=>string6.7.8.TopicName=>string2.
Partition=>int32偏移量猎取响应〔OffsetFetchResponse〕1.2.OffsetFetchResponse=>[TopicName[PartitionOffsetMetadataErrorCode]]3.4.5.TopicName=>string6.7.8.Partition=>int329.10.11. Offset=>int6412.13.14. Metadata=>string8.
ErrorCode=>int16请留意,消费者组下一个主题的分区假设没有偏移量,broker不会设定一个错误码〔由于它不是一个真正的错误〕,但会返回空的元数据并将偏移字段为-1。可能的错误码〔PossibleErrorCodes〕UNKNOWN_TOPIC_OR_PARTITION(3)<-只在求中消灭GROUP_LOAD_IN_PROGRESS(14)NOT_COORDINATOR_FOR_GROUP(16)ILLEGAL_GENERATION(22)UNKNOWN_MEMBER_ID(25)TOPIC_AUTHORIZATION_FAILED(29)GROUP_AUTHORIZATION_FAILED(30)组籍治理接口〔GroupMembershipAPI〕
v0版本的请这些恳求用于客户端参与卡夫卡所治理的消费者组。从更高层次上看,集群中每个消费者组都会安排一个broker〔及消费者组协调员〕,以简化消费者组治理。一旦得到了组协调员地址〔使用上面的消费者组协调员恳求〕,组成员可以参与该组,同步状态,然后认真跳消息保持在组中的活泼状态。当客户端关闭时,它会使用离开组请求从消费者组中注销。此协议的语义在Kafka客户端安排协议中有具体描述。组建治理接口的主要使用场景是消费者组,但这些恳求也尽量设计得一般化以便支持其他应用场景〔KafkaConnect组〕。这种设计的带来的代价就是是一些特定的组语义(groupsemantics)被推到了客户端实现。例如,下面定义的JoinGroup和SyncGroup恳求无明确定义的字段以支持消费者组分区安排。相反,它们在其中包含有一些通用的字节数组〔bytearrays〕,用这些字节数组就可以使得分区安排切入在消费者客户端实现。但是,虽然这种实现允许每个客户端来实现来定义分区方案,但是Kafka工具的兼容性要求这些客户端使用Kafka客户端使用的标准consumer-groups.sh这个应用程序会假定用这种格式来显示分区安排。因此,我们建议客户遵循一样的模式,使这些工具对全部客户端实现都可以正常工作。参与组恳求〔JoinGroupRequest〕参与组恳求用于让客户端成为组的成员。当成员参与一个现有组,之前参与大的全部的会员必需通过发送一个参与组的要求来重入组。当成员第一次参与该组,成员编号将是空的〔即“”〕,但重参与的成员都应当使用与之前生成的一样的会员ID。1.2.JoinGroupRequest
=> GroupId SessionTimeoutMemberIdProtocolTypeGroupProtocols3.4.5.GroupId=>string6.7.8.SessionTimeout=>int329.10.11. MemberId=>string12.13.14. ProtocolType=>string15.16.17. GroupProtocols =>ProtocolMetadata]18.19.20. ProtocolName=>string21.
[ProtocolName22.23.24.
ProtocolMetadata=>bytesProtocolType字段定义了该组实现的嵌入协议。组协调器确保该组中的全部成员都支持一样的协议类型。组中包含的协议〔GroupProtocols〕字段中的协议名称和元数据的含义取决于协议类型。请留意,参与群恳求允很多协议/元数据对。这使得滚动升级时无需停机。协调器会选择全部成员支持的一种协议,升级后的成员既包括版本和老版本的协议,一旦全部成员都升级,协调器将选择列在数组中最前面的组协议〔GroupProtocol〕。消费者组:下文我们定义了消费者组使用的嵌入协议。我们建议全部消费者客户端实现遵循这个格式,以便Kafka工具能够对全部的客户端正常工作1.2.ProtocolType=>“consumer“.7.8.ProtocolName=>AssignmentStrategy5.16.17.UserData18.19.
AssignmentStrategy=>stringProtocolMetadata => Version Subscription20. Version=>int1621.22.23. Subscription=>[Topic]24.25.26. Topic=>string27.28.29. UserData=>bytes30.UserData域的可以用来自定义安排策略。例如,在一个粘性分区策略实现中,这个字段可以包含之前的安排。在基于资源的安排策略,CPUKafkaConnect“connect”的协议类型,和协议细节也是基Connect参与组响应〔JoinGroupResponse〕接收到来自该组中的全部成员组的参与组恳求后,协调器将选择一个成员作为Leader,并且选择全部成员支持的协议。Leader将收到会员的完整列表与选择的协议相关的元数据。其他追随者成员,会收到一个空会员数组。Leader需要检查每个成员的元数据,并且使用下SyncGroup一旦加参与组阶段完成,协调器会增加该组的GenerationId,这个Id是发送给每个成员的响应中的一个域,同时也会在心跳和偏移量提交恳求中。当协调器重平衡〔rebalance〕了一个组,协调器将发送一个错误码,表示客户端成员需要重参与组。假设重平衡完成前成员未重入组〔rejoin〕,那么它将有一个旧generationId,在IdILLEGAL_GENERATION1.2.JoinGroupResponse
=> ErrorCode GenerationIdGroupProtocolLeaderIdMemberIdMembers3.4.5.ErrorCode=>int166.7.8.GenerationId=>int329.10.11. GroupProtocol=>string12.13.14. LeaderId=>string15.16.17. MemberId=>string18.19.20. Members=>[MemberIdMemberMetadata]21.22.23. MemberId=>string7.
MemberMetadata=>bytes消费者组:协调器负责选择全部成员都兼容协议〔即分区安排策略〕,Leader是实际执行安排的成员,参与群恳求可以包含多个安排策略,从而支持现有版本升级或者更改不同的安排策略。可能的错误码〔PossibleErrorCodes〕:GROUP_LOAD_IN_PROGRESS(14)GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)INCONSISTENT_GROUP_PROTOCOL(23)UNKNOWN_MEMBER_ID(25)INVALID_SESSION_TIMEOUT(26)GROUP_AUTHORIZATION_FAILED(30)同步组恳求〔SyncGroupRequest〕组长〔groupleader〕使用同步组恳求用来向当前组中的全部成员进展状态安排〔例如分区安排〕。全部成员参与该组后,马上发送SyncGroupLeader1.2.SyncGroupRequest=>GroupIdGenerationIdMemberIdGroupAssignment3.4.5.GroupId=>string6.7.8.GenerationId=>int329.10.11. MemberId=>string12.13.14. GroupAssignment =>MemberAssignment]15.16.17. MemberId=>string
[MemberId1.
MemberAssignment=>bytes消费者组:消费则组中MemberAssignment域的格式如下:1.2.MemberAssignment=>VersionPartitionAssignment3.4.5.Version=>int166.7.8.PartitionAssignment=>[Topic[Partition]]9.10.11. Topic=>string12.13.14. Partition=>int3215.16.17. UserData=>bytes18.全部了“consumer”协议类型的客户端实现都需要支持这个方案同步组响应〔SyncGroupResponse〕组中的每个成员都会接收到leader发出的syncgroup指令Eachmemberinthegroupwillreceivetheassignmentfromtheleaderinthesyncgroupresponse.1.2.SyncGroupResponse=>ErrorCodeMemberAssignment3.4.5.ErrorCode=>int166.7.8.MemberAssignment=>bytes9.可能的错误代码〔PossibleErrorCodes〕:GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)ILLEGAL_GENERATION(22)UNKNOWN_MEMBER_ID(25)REBALANCE_IN_PROGRESS(27)GROUP_AUTHORIZATION_FAILED(30)心跳恳求〔HeartbeatRequest〕每当一个成员参与并同步完成,他将开头发送心跳恳求使自己留在组里。当协调器在配置的会话超时时间内没有他的收到心跳恳求,该成员会被踢出该组。1.2.HeartbeatRequest=>GroupIdGenerationIdMemberId3.4.5.GroupId=>string6.7.8.GenerationId=>int32.
MemberId=>string心跳响应〔HeartbeatResponse〕1.2.HeartbeatResponse=>ErrorCode3.4.5.ErrorCode=>int166.可能的错误码〔PossibleErrorCodes〕:GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)ILLEGAL_GENERATION(22)UNKNOWN_MEMBER_ID(25)REBALANCE_IN_PROGRESS(27)GROUP_AUTHORIZATION_FAILED(30)退组恳求〔LeaveGroupRequest〕当想要离开组群时,用户可以发送一个退组恳求。这优先于会话超时,由于它能使该组快速再平衡,这对于消费者而言这意味着可以用更短的时间将分区安排到一个活动的成员。1.2.LeaveGroupRequest=>GroupIdMemberId3.4.5.GroupId=>string6.7.8.MemberId=>string9.10.11. LeaveGroupResponse12.13.14. LeaveGroupResponse=>ErrorCode15.16.17. ErrorCode=>int1618.可能的错误代码〔PossibleErrorCodes〕:GROUP_LOAD_IN_PROGRESS(14)CONSUMER_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_CONSUMER(16)UNKNOWN_CONSUMER_ID(25)GROUP_AUTHORIZATION_FAILED(30)治理接口〔AdministrativeAPI〕组列表恳求〔ListGroupsRequest〕该API可用于找到当前被broker治理的组群。为了得到集群内的broker1.2.ListGroupsRequest=>3.4.5.ListGroupsResponse6.7.8.ListGroupsResponse=>ErrorCodeGroups9.10.11. ErrorCode=>int1612.13.14. Groups=>[GroupIdProtocolType]15.16.17. GroupId=>string18.19.20. ProtocolType=>string21.可能的错误代码〔PossibleErrorCodes〕:GROUP_COORDINATOR_NOT_AVAILABLE(15)AUTHORIZATION_FAILED(29)组明细恳求〔DescribeGroupsRequest〕1.2.DescribeGroupsRequest=>[GroupId]3.4.5.GroupId=>string6.组明细反响〔DescribeGroupsResponse〕1.2.DescribeGroupsResponse=>[ErrorCodeGroupIdStateProtocolTypeProtocolMembers]3.4.5.ErrorCode=>int166.7.8.GroupId=>string9.10.11. State=>string12.13.14. ProtocolType=>string15.16.17. Protocol=>string18.19.20. Members => [MemberIdMemberMetadataMemberAssignment]21.
ClientId ClientHost22.23. MemberId=>string24.25.26. ClientId=>string27.28.29. ClientHost=>string30.31.32. MemberMetadata=>bytes33.34.35. MemberAssignment=>bytes36.可能的错误码〔PossibleErrorCodes〕:GROUP_LOAD_IN_PROGRESS(14)GROUP_COORDINATOR_NOT_AVAILABLE(15)NOT_COORDINATOR_FOR_GROUP(16)AUTHORIZATION_FAILED(29)常量〔Constants〕接口关键字〔ApiKeys〕下面是恳求中ApiKey的数字值,用来表示上面所述的恳求类型。接口名称〔APINAME〕APIKEYProduceRequest0FetchRequest1OffsetRequest2MetadataRequest3Non-userfacingcontrolAPIs4-7OffsetCommitRequest8OffsetFetchRequest9GroupCoordinatorRequest10JoinGroupRequest11HeartbeatRequest12LeaveGroupRequest13SyncGroupRequest14DescribeGroupsRequest15ListGroupsRequest16错误代码〔ErrorCodes〕我们用数字代码表示效劳器发生的问题。这些可以由客户端转换成客户端中的特别(Exceptions)或者其他任何适当的错误处理机制。这里是当前正在使用的错误代码表:编码是否可重试NoError
〔CODE〔RETRIABLDESCRIPTION 描述〕 E〕0 Noerror–itUnknown -1OffsetOutOfRange 1
worked!servererror知错误Therequestedoffset isoutside range offsets the forthegiven移量。topic/partition.Thisindicates
InvalidMessage /
that aCorruptMessage
Yes
messagecontentsdoesnotmatchitsCRCThisrequest
它的CRC合。UnknownTopicOrPartitionInvalidMessageSize
Yes4
is for topic partition existonthis区。broker.The has a为负数。negativesizeThiserroristhrownifweare in the会 在middleofaleader选leadership electionandLeaderNotAvailable5Yesthere iscurrentlynoleaderfor有thisleader因partitionand被henceitisunavailableforwrites.Thiserroristhrowniftheclient
attempts tosendmessages
NotLeaderForPartition
Yes
区的forsomeleader。partition.Itindicates
that theclientsmetadata outofdate.Thiserroristhrowniftherequest
过期。RequestTimedOut
Yes
exceeds theuser-specifiedtimelimitintherequest.
误BrokerNotAvailable 8ReplicaNotAvailable 9MessageSizeTooLarge 10
具用在没场合。当broker时抛出略〕。The serverStaleControllerEpochCode 11OffsetMetadataTooLargeCode12GroupLoadInProgressCode 14
has maximum to unbounded memory allocation. client attempt produce a误。messagelarger thismaximum.Internal之communication.offset串时触metadata发。会这个错误:当人Yes offsets 移量时区的发生变化后15leCode
response group membership requests 员请求(such heartbeats) when metadata by the载。coordinator.The returnsthis还没有被活是会coordinator误。is notactive.The brokerreturnsthiserrorcodeif
调器的NotCoordinatorForGroupCode16
it 接an Yes fetch orcommitrequestforagroupthati
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 网络切片智能化的NFV功能虚拟化创新应用-洞察阐释
- 航空货运信息安全保障-洞察阐释
- 神经康复在SIRS中的作用-洞察阐释
- 2025年基础教育改革远程学习心得体会
- 外脚手架作业分包合同
- 六年级健康知识普及计划
- 家委会监督与反馈机制计划
- 肠道菌群体外发酵模型构建及多糖类益生元调控菌群的构效关系探究
- 医院消防安全设施施工方案
- 职业安全与健康心得体会
- 高中生物必修一实验通知单
- 运动员健康证明表
- 课件:第四章 社会工作项目的执行(《社会工作项目策划与评估》课程)
- 冷库施工组织设计施工方案
- 咯血诊断与治疗课件
- 医学影像专业个人简历
- 检验科 医院感染管理质量督查评分表
- 独立性检验 公开课比赛一等奖-完整版获奖课件
- 网络信息系统瘫痪演练PDCA改进
- 高分子材料成型加工基础添加剂及配方设计课件
- 水泥水化热实验原始记录
评论
0/150
提交评论