四vip课程分布式框架专题32api源码分析rocketmq原理解析_第1页
四vip课程分布式框架专题32api源码分析rocketmq原理解析_第2页
四vip课程分布式框架专题32api源码分析rocketmq原理解析_第3页
四vip课程分布式框架专题32api源码分析rocketmq原理解析_第4页
四vip课程分布式框架专题32api源码分析rocketmq原理解析_第5页
已阅读5页,还剩51页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

前 第一章: 第二章 三:长轮 六:pull消息消 七 第三章: 三 四:HA&master HA异步 :Broker 第四章: 一:Namesrv功 二:Namesrv启动流程 三 一: Server 公用抽象 invokeSyncImpl同步调用实 invokeAsyncImpl异步调用实 invokeOnewayImpl单向请 mand接收请求处 mand接收响应处 二:NettyRemotingServerRemoting服务端实 前此文档是从学习mq源码过程中的笔记中整理出来的,由于时间及能力原因,理 mq有所帮助。mq是阿里基于开源思一款产品,代码托管于上,要想学好用mq请从 MQ获取最的文档、问题解答、原理介第一章一:Producer启动流Producerproducer信息(procduergroup)定时发送到,brokerAddrTable集合中列出的broker上去Producer发送消息只发送到master的broker机器,在通过broker的主从机制拷贝二 如何发送消Broker1的队列为queue0,queue1Broker3当然一般情况下的brokerbroker1_queue0,broker1_queue1,> queue都会通过sendWhichQueuequeue的轮询lastBrokerNamequeue发送失败,这次选择应该避开同一个queue发送失败后,重试几次retryTimesWhenSendFailed=-- -- //代表发送消息到达的 向指定broker的指定topic的指定queue发送消息发送失败(1)重试次数不到两次(2)3000(毫秒),换 发送普通消顺序消息发mq能够保证消息严格顺序,但是mqproducer保证顺序消息按顺序发送到同一个queue中,比如流程(1)下单(2)支付(3)支付成功,这三个消息需要根据特定规则将这个三个消息按顺序发送到一个queue一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号,messageQueueList分布式事物消一阶段,向broker发送一条prepared的消息,返回消息的offsetLocalTransactionExecuter二阶段,处理完本地事物务得到事物状态,根据offset查找到commitLog中的prepared消息,设置消息状态commitType或者rollbackType,让后将信息添加到commitLog中,其实二阶段生成了两条消息三:Broker落地消普通消息落Broker根据producer请求的 ode.SEND_MESSAGE选择对应的处理分布式事物消息落commitLog20位开始的八位记录是的消息在逻辑队列中的queueoffset,但是针对事物消息为preparedType和rollbackType的的是事物状态ConsumeQueuepreparedType和rollbackType的消息不会将请求分发到ConsumeQueue中去,即不处理,所以不会被消息transactionstabletableprepared消息记,通过TransactionStateService服务将消息加到事务状态的表格tranStateTable的文件中;如果是commitType和rollbackType消息,修改事物状态表格tranStateTable中的消息状态。记录TransactionRedoLog日志:记录了commitLogOffset,msgSize,preapredTransactionOffset,storeTimestamp。事物状态表是有MapedFileQueue将多个文件组成续的队列,它的单元是定长为24个字节的数据,tranStateTableOffset可以认为是事物状态消息的个数,索引偏移量,它的值是定时回查线程会定时扫描(默认每分钟)每个事务状态的表格文件,遍历事prepared状态的如果消息小于事务回查至少间隔时间(默认是一分钟)跳出终根据group随机选择一台producer建一个Runnable任务异步执行producer的回调接口,处理回调,在调endTransactionOneway向broker发送请求更新事物消息的最终状态事物消息的()tranRedoLog文件的消息DispatchRequest,更新TransactionState记录TransactionRedo删除事物状态表重建 将上面过滤出的prepared消息,添加到事物状态表文件redologcommitlog来做的第二拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。 mq的一 启动流消费端Start向mq Factory本消费者启动端通信定时清理下线的borker定时持久化Consumer消费进度(广播到本地,集群到Broker)启动拉消息服务namesrv更新topic路由信息唤醒Rebalance服务线程二:消费端负载均消费端会通过RebalanceService线程,10topic下的所有队列负载消费端遍历自己的所有topic,依次调rebalanceByTopic选择一台broker获取基于group的所有消费端(有心跳向所有broker客户端信息)选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法获取队列集合Set<MessageQueue>mqSet将所有queue排好序类似于记录3)按照机房来配置队列获取指定机房的queue,置ProcessQueue的droped属性为true构建这个队列的ProcessQueue注:ProcessQueue正在被消费的队列,长轮询拉取到消息都会先到ProcessQueue的TreeMap<Long,的processqueue才能被执行消费rollback:将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费mq的消息是由consumer端主动到broker拉取的consumerbroker发送拉消息PullMessageServiceLinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息2000,从内存中获取 //TODO这个值跟pullRequest.getNextOffset区构建 pull接口用到的消息中放入队列最大最小offset,方便应用来感知消息堆积度pullInterval0(拉消息间隔,如果为了降低拉取速度,0的值pullInterval0立刻在执行拉四 消息—并发消费消通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBat axSize=1,默认批量个数为一个ConsumeRequest任务run给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到

ackIndex来标记成ackIndex设置为-秒钟以后重试重新消费消息,在走一次上面的消费流程。offsetTable的更新,后面有定时任务定时更新到broker上去五:push消费-顺序消费消构建一个线程池来接收消费请求ConsumeRequest构建一个单线程的本地线程,ConsumeRequest,用来执行,Map<brokernameSet<MessageQueue>>brokernamebrokermaster机器地址brokerNameSet<MessageQueue>发送broker请求锁定这些队列。brokerbrokerqueuequeue被某个BrokerMessageQueue,设置对应的正在处理队列ProccessQueuelocked属性为true没有锁定设置为false通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService,交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。messagequeueconsumer内,同一个队列串行truelock放到本地线程稍后锁定在消费。如果locktrue且没有过期,开始消费消息计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue,proccessqueue获取批次消息,processqueue.takeMessags(batchSize从msgTreeMap mit方新到broker上去六:pull消息消列七 利用DefaultMQPushConsumerImpl 获取ProcessQueueTable<MessageQueue,ProcessQueue>的keyset的messagequeue去获取offsetTable中的messagequeue的值,在update的时候如果没有对应的Messagequeue会构建,但是也会rebalance的时候将没有分配到的messagequeue删除offsetbrokerUnregiser客户端 第三章一:brker的启BrokernamesrvTopic在broker文件上的json格式 }Namesrv接收Broker的topic信息,namesrv只存内存,但是broker有任务定时推 Mq通过ConfigMananger来管理配置加载以及持久{"%RETRY%group_name":{ //100读权限,10写权 6是110读写权 }}}加载消费进度偏移量{"%RETRYgroup_name@group_name":{0:0重试队列消费group_name":{//队列 //队列 }}}{" "groupName":"group_name",}}}二:消mq的消息的是由consumequeue和commitLog配合完成consumequeue消息的逻辑队列,相当于字典的 文件commitLog上的位置,文件地址:${user.home}\store\consumequeue\${ consumequeue中单元是一个20字节定长的数据,是顺序写顺序读commitLogOffset是指这条消息在commitLog成一个ConsumeQueue,TopicAQueue=1组成一个另一个ConsumeQueue. //消息体BODY 这个标志 //QUEUEOFFSETconsumequeue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consumequeue中数据,QUEUEOFFSET*20才是偏移地址+8+4+8+8 +8+8+8//RECONSUMETIMES消息被某个订阅组重新消费了几次(立计数),topic名字为%retry%groupNamequeueId=0的队列 //4个字节存放消息体大小值,后bodylength大小 //topictopic + //2个字节(short)存放属性值大小,后存放MapedFilePageCache文件封装,操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k,消息刷盘根据参数FileChannelfileChannel=newRandomAccessFile(file,fileSize:filename:filename是消息在此文件的中初始偏移量,排好序后组成了续的消息队getLastMapedFile获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创一个MapedFile文件地址,通过预分配服务AllocateMapedFileService异步预创建下一个MapedFile1G的文mitWhereDefaultMessageStore消息层实<1>从mapedFileQueue获取的映射文 <5>三LoadLoadcommit遍历出${user.homestore\${commitlog}commitLog文件,按文件名(文件名就是文件的初始偏移量MapedFile对象,在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列Loadconsume遍历${user.homestore\consumequeue下的所有文件夹(topic就是一个文件}\queueId,文件来构建ConsueQueue对象DefaultMessageStore中结构Map<topic,Map<queueId,ConsumequeueMapedFileQueuemapedFile组成一个逻辑上连续的队加载${user.home}\store\checkpoint这个文件了3个long类型的值来记录模型最终一致的时间点,这个3个long的值为physicMsgTimestamp为commitLog最后刷盘的时间indexMsgTimestamp为索引最终刷盘时间abort,linuxviabort文件存在,系统认为是异常恢为什么说先正常恢复异常恢复在哪呢?broker是异常启动时候异常恢复commitLog时会重新构建请到DispatessageService服务,来重新生成ConsumeQueue数据,索引以及事物消息的redolog,确,现在文件写到哪了(wrotePositionFlush到了什么位置(committedPosition)?ConsumeQueuemapedFiles集合中,从倒数第三个文件开始恢复(为什元是20字节的定长数据,所以是依次分别取了Offsetlong类型了commitLog的数据偏移量 int类型了在commitLog上消息大小tagcodetag的哈希值目前 mq判断的consumequeue数据是否有效的方式为判断offset>=0&&size>0设置 所在文件 的 commitedPosition消息大小值msgSize大于 文件错误恢复结束等于0 当${user.home}\store\abort文件存在,代表异常恢复${user.homestore\checkpoint获取最终一致的时间点说明checkpoint在此mapedfile文件中从checkpoint所在mapedFile开始恢复数据,它的整体过程跟正常恢复(1)消息后派送到分发消息服务Dispat 建ConsumeQueue以及索引 四:HA&masterbroker启动的时候BrokerControllerslavemaster地址更新,没有配置所有broker会想namesrv,从namesrv获取haServerAddr,然后更新到HA当HA 的Mas ddress不为空的时(因为brokermaster和slave都构建了HA 会主动连接master获取SocketChannelMasterSlave请求的端口,默认为服务端口+1接收slave上传的offsetlong类型intposthis.byteBufferRead.position()(this.byteBufferRead.position(8)//没有理解意图longreadOffset=this.byteBufferRead.getLong(pos-8); 同步topic的配置信息 Master通过AcceptSocketServiceslave的连接,每个masterslave连接都会构建一个HAConnectionmasterslave部署结构的会有多个HAConnection实例,MasterHAConnectionslaveWriteSocketService对象和Slave反馈服务线程对象ReadSocketServiceslaveRequestOffsetslave同步完数据都会向masterack表示下次同步的数据的offset。slaveslaveRequestOffset=0mastercommitLog文件开始同步(如果要把master上的所有commitLog文件同步到slave的话,把masterOffset值赋为minOffset)socket<PhyOffset><BodySize>Bodyslave通过HA 通知前端线程,如果是同步的话通知是否成功Slave通过 masterslavecommitlog的数据,masterslave写入数据的格式是Slave初始化DefaultMessageStore时候会构建ReputMessageService服务线程并在启动服务的start方法中被启动HA同步,当msg写入master的commitlog文件后,判断maser的角色如果是同步双SYNC_MASTER,等待master同步到slave在返回结果HA异步五:刷盘策,六:索引服索引结索引文件由索引文件头IndexHeader,Slot: 8位long类型,索引文件构建第一个索引的消息落在broker的时间 8位long类型,索引文件构建最后一个索引消息落broker时间 8位long类型,索引文件构建第一个索引的消息commitLog偏移量 8位long类型,索引文件构建最后一个索引消息commitLog偏移量 4位int类型,构建索引占用的槽位数(这个值貌似没有具体作用) 4位int类型,索引文件中构建的索引个数计算消息的对应的slotPos=Math.abs(keyHash)%hashSlotNum消息在IndexFile中的偏移量absSlotPos=IndexHeader.INDEX_HEADER_SIZE+slotPos*4位int值,的是key的hash8位long值的是消息在commitlog的物理偏移量4位int 4位int :索引服IndexService文件indexFile的路径queryOffsettopickey时间跨度来查询消息:构建索引服分发消息索引服务将消息位置分发到ConsumeQueue中后,加入IndexService构建索引key(topic+"#"+key)计算槽位在indexfile的具体偏移量位置:Broker (comsumer,producer)之间的心跳 ChannelInfo的时间戳,来表 , HouseKeeService线程定时清除不活动的连接 :Broker与namesrv之间的(1)broker的topic信构建或者更新第四章接收broker的请求broker路由信息(包括master和slave) 的请求根据某个topic获取所有到broker的路由信息二 启动流程三它的多个slave id表示是master还是slaveid=0为 大于0为, BrokerLiveInfo代表一个活的broker由最后更新时间,一个channel,数据版本和Ha地址组成四:Namesrv与broker间的心也是topic信息到namesrvNamesrvControllerRouteInfoMangerscanNotActiveBroker方法来定时清理不活动的broker(默认两分钟没有向namesrv发送心跳更新BrokerLiveInfo时间戳的BrokerLiveInfo的时间戳,如果过期关闭channel连接第五Remoting通信层一: Server 公用抽象过opaque从缓存查找对应的ResponseFuture对象invokeSyncImpl同步调用应过来,就用不到缓存中的ResponseFuturel)invokeAsyncImpl异步调用限默认2048个发送成功 dRequestOK(false),信号量通过once释放, 单向请mand接收请求根据请求code查找对应的处理器线程池pair,没有用默认的 若不是o

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论