kafka背景及架构介绍_第1页
kafka背景及架构介绍_第2页
kafka背景及架构介绍_第3页
kafka背景及架构介绍_第4页
kafka背景及架构介绍_第5页
已阅读5页,还剩35页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、Kafka背景及架构介绍目录1、使用消息系统的优点2、常用Message Queue介绍和对比3、选择Kafka的原因4、Kafka的架构及相关概念介绍5、Kafka启动和监控6、Kafka日志文件存储及生产、消费的原理7、Kafka稳定性、容错性、异常情况的介绍8、Kafka工具的使用9、Kafka项目中的应用为何使用消息系统1、数据持久化,数据缓冲,异步通信。保证数据不丢失,通过缓冲层来帮助任务最高效率的执行,可存储大量历史数据可以异步处理。2、扩展性,解耦处理过程。增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。3、灵活性,峰值处理能力。不会

2、因为突发的超负荷的请求而完全崩溃。4、可恢复性。系统的一部分组件失效时,不会影响到整个系统。加入队列中的消息仍然可以在系统恢复后被处理。5、时序性。在大多使用场景下,数据处理的顺序都很重要。常用Message Queue对比对比RabbitMQRocketMQkafka模式发布订阅发布订阅发布订阅持久化支持(内存/硬盘)支持(磁盘)支持(磁盘)事务支持支持不支持集群支持支持支持负载均衡支持支持支持堆积能力磁盘容量+内存磁盘容量磁盘容量(水平扩展,不同broker可以存储在不同磁盘上)消息丢失不会丢失理论上不会丢失理论上不会丢失消息重复可以使用消息确认等机制控制-理论上会有重复消息失败重试机制消

3、费失败支持定时重试消费失败支持定时重试,每次重试间隔时间顺延Kafka消费失败不支持重试客户端支持语言Java、C、C+、Python、PHP、Perl等Java、C+(不成熟)Java、C+等最大吞吐量时cpu、内存占用cpu占用非常高,内存占用根据配置(配置的太低对性能影响很大)-cpu占用不高,每个broker大概10%,内存1G左右Producer吞吐量1.8w/s(4个broker),消费者消费对其吞吐量影响很大-13w/s(4个broker,11个分区),消费者消费对其吞吐量影响很小Consumer吞吐量6千/s(3个消费者),峰值数据非常容易积压,一旦积压消费速度有很大影响,消费

4、者增加速度成倍下降-2.7w/s(3个消费者,11个分区),能累积很多数据,累积不会对消费速度有影响,增加消费者对消费速度的影响没有rabbitmq那么大。测试环境(203):Model: Dell Vostro 3800 CPU: Intel(R) Core(TM) i3-4170 CPU 3.70GHzRAM: 16 GbOS: CentOS 7.3.1611 x86_64rabbitmq1、RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。2、AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。3、AMQP协议更多用在企业

5、系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。RocketMQ结合部署结构图,描述集群工作流程:1、启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。2、Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。3、收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消

6、息时自动创建Topic。4、Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。5、Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。Kafka架构1、Kafka是Scala和Java编写的分布式发布-订阅消息系统。2、Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,采用消息的批量处理,zero-copy

7、机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度。3、Kafka对消息的重复、丢失、错误没有严格要求,需要开发者增加控制机制。1、KAFKA比较成熟,并且以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。2、高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率。3、支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输,采用消息的批量处理,zero-copy机制。4、同时支持离线数

8、据处理和实时数据处理。5、支持在线水平扩展,随时增加zookeeper节点,每个broker还可以分布在不同磁盘,增加处理性能。为何使用KAFKAKafka相关概念介绍zookeeper集群Kafka系统强依赖的组件。其存储了Kafka核心原数据 (如topic信息配置、broker信息、 消费分组等等,相当于DB充当了Kafka的配置管理中心) 。 Kafka的leader选举(如coordinator选举、controller选举、partition leader选举等等),同样也会借助于zookeeper。Broker消息缓存代理,Kafka集群包含一个或多个服务器,这些服务器被称为Br

9、oker,负责消息的存储于转发,作为代理对外提供生产和消费服务。Topic消息主题(类别),逻辑上的概念,特指Kafka处理的消息源的不同分类,用户可以根据自己的业务形态将不同业务类别的消息分别存储到不同Topic。用户生产和消费时只需指定所关注的topic即可,不用关注该topic的数据存放的具体位置。PartitionTopic物理上的分组,在创建Topic时可以指定分区的数量,每个partition是一个有序的队列,按生产顺序存储着每条消息,而且每条消息都会分配一个64bit的自增长的有序offset(相当于消息id)。Partition是整个Kafka可以平行扩展的关键因素。Repli

10、cation副本,topic级别的配置,可以理解为topic消息的副本数。Kafka 0.8版本加入的概念,主要目的就是提高系统的可用性。防止broker意外崩溃导致部分partition不可以服务。Leaderreplica 中的一个角色, producer 和 consumer 只跟 leader 交互。 Followerreplica 中的一个角色,从 leader 中复制数据。ISRIn-Sync Replicas ,Kafka用来维护跟上leader数据的broker列表,当leader崩溃后,优先从该列中选举leader。ProducerProducer 生产者,采用Push方式进

11、行消息发布生产。 Producer可以通过与zookeeper连接获取broker信息, topic信息等等元数据,然后再与broker交互进行消息发布。Consumer消费者,采用Pull方式,从Broker端拉取消息并进行处理。当采用订阅方式订阅感兴趣的Topic时,Consumer必须属于一个消费分组,而且Kafka保证同一个Topic的一条消息只能被同一个消费分组(GroupId)中的一个Consumer消费,但多个消费分组可以同时消费这一条消息。Kafka启动和监控管理1、Zookeeper启动 cd /root/kafka/kafka-src/bin zkServer.sh sta

12、rt /root/kafka/kafka-src/config/perties zkServer.sh start /root/kafka/kafka-src/config/perties2、Kafka的brokers启动 cd /root/kafka/kafka-src/bin cat config/perties kafka-server-start.sh -daemon /root/kafka/kafka-src/config/perties & kafka-server-start1.sh -daemon /root/kafka/kafka-src/config/perties & k

13、afka-server-start2.sh -daemon /root/kafka/kafka-src/config/perties & kafka-server-start3.sh -daemon /root/kafka/kafka-src/config/perties &3、创建topic kafka-topics.sh -create -zookeeper 32:2182,32:2183 -replication-factor 2 -partitions 11 -topic mapgoo.gps4、启动监控 cd /rootkafka/kafka-manager-3 cat conf/a

14、pplication.conf bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &5、登录监控 网页输入03:9001broker.id=0listeners=PLAINTEXT:/32:9092log.dirs=/opt/kafka/kafka-src/bin/kafka-logszookeeper.connect=:2182,:2183kafka-manager.zkhosts=:2182, :2183web监控的zookeepers和brokers的展示web监控的topic和partition

15、的展示Kafka日志文件的存储同一个topic下有多个不同的partition,每个partition为一个目录,partition命名的规则是topic的名称加上一个序号,序号从0开始。 每一个partition目录下的文件被平均切割成大小相等的数据文件,和一一对应的索引文件。每一个数据文件都被称为一个段(segment file),但每个段消息数量不一定相等,这种特性能够使得老的segment可以被快速清除。默认保留7天的数据。Segment文件命名的规则:partition全局的第一个segment从0(20个0)开始,后续的每一个segment文件名是上一个segment文件中最后一条

16、消息的offset值。那么这样命令有什么好处呢?假如有个消费者已经消费到了368776(offset值为368776),那么现在要继续消费的话,怎么做?分2个步骤,第1步是从所有文件log文件的的文件名中找到对应的log文件,第368776条数据位于上图中的“00000000000000368769.log”这个文件中,这一步涉及到一个常用的算法叫做“二分查找法”(假如我现在给你一个offset值让你去找,你首先是将所有的log的文件名进行排序,然后通过二分查找法进行查找,很快就能定位到某一个文件,紧接着拿着这个offset值到其索引文件中找这条数据究竟存在哪里);第2步是到index文件中去

17、找第368776条数据所在的位置。index文件中并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。Kafka Replication的数据流producer 发布消息的过程Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader

18、 pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(俗称高水位)并且向Producer发送ACK。kafka副本同步队列(ISR)副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值小设置为大于1,比如3。 所有的副本(replic

19、as)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。kafka数据可靠性和持久性保证当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的

20、。-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。接下来对acks=1和-1的两种情况进行详细分析:request.required.acks=1 producer发送数据到leader,leader写本地日志成功,返回客户端成功;此时ISR中的副本还没有来得及拉取该消息,leader就宕机了,那么此次发送的消息就会丢失。注:LEO(log end

21、 offset)。request.required.acks=-1同步(Kafka默认为同步,即producer.type=sync)的发送模式,replication.factor=2且min.insync.replicas=2的情况下,不会丢失数据。有两种典型情况。acks=-1的情况下,数据发送到leader, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。acks=-1的情况下,数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1h和follower2都有可能变成新的leader,

22、 producer端会得到返回异常,producer端会重新发送数据,follower1被选举为新的leader的话,数据可能会重复。当然如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,这样消息就不会重复。上图中,还有另一种情况,如果在Leader挂掉的时候,follower1同步了消息4,5,follower2同步了消息4,与此同时follower2被选举为leader,那么此时follower1中的多出的消息5该做如何处理呢? 这里就需要HW和LEO的协同配合了。如前所述,一个partition中的ISR列表

23、中,leader的HW是所有ISR列表里副本中最小的那个的LEO。类似于木桶原理,水位取决于最低那块短板。下面看看Leader挂掉,新的Leader产生的现象。主题iteblog的分布如下:手动停止部分Broker1/2/4,Partition 0的Leader由原来的1变为3,Partition 1的Leader由原来的2变为5,Partition 2的Leader由原来的3变为6,Partition 3的Leader由原来的4变为7。主题iteblog的分布如下:consumer 消费消息kafka 提供了两套 consumer API:The high-level consumer AP

24、I high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。该接口支持负载均衡,当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance,即重新负载所有的consumer。The low-level consumer API 使用Low Level Consumer (Simple Consumer)的主要原因是,用户

25、希望比Consumer Group更好的控制数据的消费,吞吐量可以达到更高,比如: 同一条消息读多次; 只读取某个Topic的部分Partition; 管理事务,从而确保每条消息被处理一次,且仅被处理一次。与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息(可以使用redis或者文件存储offset) 应用程序需要通过程序获知每个Partition的Leader是谁 必须处理Leader的变化 必须自己开发消费者的监控(可以定时查询redis 存储的偏移量来监控)使用Low Lev

26、el Consumer的一般流程如下 查找到一个“活着”的Broker,并且找出每个Partition的Leader 找出每个Partition的Follower 定义好请求,该请求应该能描述应用程序需要哪些数据 Fetch数据 识别Leader的变化,并对之作出必要的响应kafka 在 zookeeper 中的存储结构Kafka创建topicKafka创建topic命令很简单,一条命令足矣:bin/kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 3 -partitions 8 -topic itebl

27、og命令行部分 我们发起topic创建命令之后,Kafka会做一些基本的校验,比如是否同时指定了分区数、副本因子或是topic名字中是否含有非法字符等。 做完基本的校验之后,Kafka会从zookeeper的/brokers/ids下获取集群当前存活broker列表然后开始执行副本的分配工作。首先,分区副本的分配有以下2个目标: 尽可能地在各个broker之间均匀地分配副本; 如果分区的某个副本被分配到了一个broker,那么要尽可能地让该分区的其他副本均匀地分配到其他broker上;后台逻辑部分 所谓的后台逻辑其实是由Kafka的controller负责提供的。Kafka的controlle

28、r内部保存了很多信息,其中有一个分区状态机,用于记录topic各个分区的状态。这个状态机内部注册了一些zookeeper监听器。监听器一方面会更新controller的缓存信息(比如更新集群当前所有的topic列表以及更新新增topic的分区副本分配方案缓存等),另一方面就是创建对应的分区及其副本对象并为每个分区确定leader副本及ISR。有了Replication机制后,每个Partition可能有多个备份。某个Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一个Replica即为“Preferred Replica”。创建一个新的Topic

29、或者给已有Topic增加Partition时,Kafka保证Preferred Replica被均匀分布到集群中的所有Broker上。理想情况下,Preferred Replica会被选为Leader。以上两点保证了所有Partition的Leader被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由Leader完成,若Leader分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为Broker的宕机而被打破,该工具就是用来帮助恢复Leader分配的平衡。假设有0-7个Broker,Broker 1/2/4都被停止, Replica列表分布如下:Preferr

30、edReplica Leader Election Tool再启动Broker 1后, Broker 1 不会再成为Leader ,这时负载不均匀, Isr变化如下:再启动Broker 1后, 运行该工具后, Leader变化如下:该工具的设计目标与Preferred Replica Leader Election Tool有些类似,都旨在促进Kafka集群的负载均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范围内调整其Leader,使Leader分布均匀,而该工具还可以调整Partition的AR。 该工具有三种使用模式 (1

31、)generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行) $KAFKA_HOME/bin/kafka-reassign-partitions.sh -zookeeper :2181 -topics-to-move-json-file /tmp/topics-to-move.json -broker-list 2,3 -generate (2)execute模式,根据指定的reassign plan重新分配Partition $KAFKA_HOME/bin/kafka-reassign-partitions.sh -zookeeper :2181 -r

32、eassignment-json-file /tmp/reassign-plan.json -execute (3)verify模式,验证重新分配Partition是否成功 $KAFKA_HOME/bin/kafka-reassign-partitions.sh -zookeeper :2181 -reassignment-json-file /tmp/reassign-plan.json verify 接下来用Topic Tool再次验证: bin/kafka-topics.sh -zookeeper :2181 -describe -topic iteblogKafka Reassign

33、Partitions Tool该工具旨在从整个集群的Broker上收集状态改变日志,并生成一个集中的格式化的日志以帮助诊断状态改变相关的故障。每个Broker都会将其收到的状态改变相关的的指令存于名为state-change.log的日志文件中。某些情况下,Partition的Leader election可能会出现问题,此时我们需要对整个集群的状态改变有个全局的了解从而诊断故障并解决问题。该工具将集群中相关的state-change.log日志按时间顺序合并,同时支持用户输入时间范围和目标Topic及Partition作为过滤条件,最终将格式化的结果输出。 用法 bin/kafka-run-

34、class.sh kafka.tools.StateChangeLogMerger -logs /opt/kafka_2.11-/logs/state-change.log -topic iteblog -partitions 0,1,2,3,4,5,6,7State Change Log Merge ToolKafka项目中的部署框架MRS的数据按objectid对最大分区ID进行取模push到不同的partition; MRS可以多线程或者多进程push。SAS、VTS和RNS必须使用不同的组ID(GroupId),这样才能拉取全量的同分数据; 比如SAS设置GroupId为0,VTS设置

35、GroupId为1 , RNS设置GroupId为2,在同一个GroupId中,每个partition需对应一个线程(或者一个进程),或者一个线程轮询读取多个partition(这种方法会影响吞吐量),如果多个线程消费同一个partition,那么有的线程永远读不到数据,会造成线程浪费。 使用librdkafka时,一个进程消费partition不宜过多,主要还是librdkafka给每个partition创建多个线程, partition越多创建的线程越多,比如,一个线程消费51个分区,创建的线程数有300个左右。 Producer的接口 CProducerKafka生产者 类方法:int init(const std:string& brokers, const std:string& groupid, const std:string& topicname, const int maxpartition, CMsgDeliveredCb* pMsgDelivered = NULL); templa

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论