ZMQ通信机制-BCC(强婕)_第1页
ZMQ通信机制-BCC(强婕)_第2页
ZMQ通信机制-BCC(强婕)_第3页
ZMQ通信机制-BCC(强婕)_第4页
ZMQ通信机制-BCC(强婕)_第5页
已阅读5页,还剩6页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、ZeroMQ使用说明1.ZeroMQ介绍ZMQ(0MQ、ZeroMQ,OMQ)套嵌入式的网络看起来像是一链接库,但工作起来更像是一个并发式的框架,介于应用层和传输层之间(按照TCP/IP划分)。它提供的套接字可以在多种协议中传输消息,如线程间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。一开始ZMQ代表零中间件、零延迟,同时,它又有了新的含义:零管理、零成本、零浪费。总的来说,零表示最小、最简,这是贯穿于该项目的哲理。传统的TCPSocket的连接是1对1的,可以认为“1个Socket=1个连接”,每一个线程独立的维护一个Socke

2、t。但是ZMQ摒弃了这种1对1的模式,ZMQ的Socket可以很轻松的实现1对N,N对1和N对N的连接模式,一个ZMQ的Socket可以自动的维护一组连接,用户无法操作这些连接,用户只能操作套接字,而不是连接本身,所以说ZMQ的世界里,连接是私有的。2.基础知识:套接字APIzmq建立在标准套接字api之上,用IMessageSocket类封装,其生命周期主要包含四个部分:创建和销毁套接字:IMessageSocket:IMessageSocket-zmq_socket(void*,inttype),IMessageSocket:close()-zmq_close()配置和读取套接字选项:zm

3、q_setsockopt(void*s,intoption,constvoid*optval,size_toptvallen),zmq_getsockopt(void*s,intoption,void*optval,size_t*optvallen)为套接字建立连接:IMessageSocket:Bind-zmq_bind(void*s,constchar*addr),IMessageSocket:Connect-zmq_connect(void*s,constchar*addr)发送和接收消息:zmq_send(void*s,constvoid*buf,size_tlen,intflags)

4、,zmq_recv(void*s,void*buf,size_tlen,intflags)3.ZMQ常用模式1)发布-订阅(PUB-SUB:1-N)PublisherPUBsueSiabscfiberCMmectbmdlCflllllflCtCMneGtSUBSubscriber2)请求-应答(REQ-REP)对于Request类型的socket,它是同步的,它一个时刻只能对一个连接进行操作,在一个连接上发送了数据之后,必须接着在这个连接上执行recv,也就是send与recv必须同时匹配出现。因此ZMQ在使用一个Socket处理请求的过程中,会阻塞同一个端口的其他请求。Response类型的

5、socket也是同步的,与Request的意思差不多,不过顺序是先recv再send。3)独立对模式(PAIR-PAIR):线程间l-t0-1队列的实现,采用了lockfree实现,所以速度很快。对于特定的线程PAIR是最好的选择。4)管道模式(PUSH-PULL):这种模式主要用于发布数据到由管道排列的节点上面,数据总是沿着管道流动。每个管道阶段连接了至少一个节点。push会负载均衡的将消息分发到pull端,worker可以随时自由加入。push端无法recv,pull无法send。PULLSXrtk信封机制(ROUTER-DEALER):Replyaddress4EnvelopeEmpty

6、messagepart*Oaia信封机制的根本作用是让ROUTER知道如何将消息递送给正确的应答目标。*从ROUTER中读取一条消息时,0MQ会包上一层信封,上面注明了消息的来源。Frame1Frame2Frarne3*向ROUTER写入一条消息时(包含信封),0MQ会将信封拆开,并将消息递送给相应的对象。作用:实现多线程的请求-应答。4.消息收发接口发送进程间请求消息:发送消息体:SccMqContext_T:instance()-GetContext()-SendMessage/GetContext()-SendSyncMessage组帧:BccDealerHander:OnDealMes

7、sage发送数据包:SccMqContext_T:instance()-GetMessagSender()-SendMessage()接收进程间请求消息:接收:SccMqContext_T:instance()-GetMessagQueue()-OnMessage()处理:SccMqContext_T:instance()-bccAsynHandler.onDealAsynMsg(pAppFrame)发送线程间请求消息:发送:SCCManager_T:instance()-GetDispatcher()-PushMsg接收处理线程间请求消息:接收:SccWorkTask:OnMessage处理

8、:session_op发送响应消息:发送:MsgDealWorker:SendMsg2Dispatcher转发:SccMqContext_T:instance()-GetContext()-SendAsynResponse接收并处理响应消息:分发:DealerTask:HandleRpnMsg接收:AsynReqWorker:RecvReponse处理:AsynMsgCallbackltf:OnRecvResponse()发送订阅消息:SccMqContext_T:instance()-GetContext()-Notify订阅消息处理:BccSubHandler:OnMessage()超时处

9、理:AsynMsgCallbackltf:OnTimeout()5.消息通信流程1)进程间通信消息发送SccMqContext:StartNotify()订阅消息SccMqContext_T:instance()-GetContext()-SendMessage/SendSyncMessage进程间通信请求消息ptrReq-、endMessageGetContext()-NotifyclientPairSocket-SendPtdINDINGASYNREQTiyEOUTSTRtimeoutSocket_接收超时任务请求,并添加至timeoutReqMapasynReqWorkerSocket_

10、发送超时任务请求,TimeoutApynRequestPUSH起任务定时器ptrPublisher-Notify、PAIRTAIR用serverPairSocket接收请求消息AsynReqWorker:RecvPairy器时./定起ptrReq_-pTimeoutTask_-HandlerTimeout()/时1同:internalSocket-SendUBAsynReqWorker:RegAsynRemoteService注册一个DEALER保存至到AsynReqMap、用AsynSocketInfo-pSocket转发请求;并保存至msgMap_遍历timeoutReqMap_判断超时后

11、由pushWorkSockyt发送超吋消息PUSHBINDING_ASYNPULLREQ_WORKER_TIMEOUT_STRtimeoutRecvSocket接收消息msgMap_中找sequence寸应的AsynMsgCallbackItf消息接收SUBConsumerRecvTask*pRecvTask_-pSocke接收转BccSubHandler:OnMessag处理(通过RegMsgConsume绑定IMessageConsume,bccSubHandler)-DEALERROUTERcallback-OnTimeoutpDealTask_-dealerSocket接收消息转Bcc

12、DealerHander:OnDealMessag处理通过ptrDealer_-RegMsgDeale绑定msgid,bccDea1erHand1erpQueue-OPULL)nMessage()g.pSender-SendMessage()发送至主线程PUSHbccAsynHandler.onDealAsynMsg在OnAsynMsgMap找消息处理函数pDealTask_-dealerSocket接收消息Task:异步/同步消息处理过程DealerRecvTDealerTask::HandleCallback.同步消息DealerTask:ReponseMsg异步消息DealerTask:

13、DealerMsgmsgDealerMapj找pRpnHandlermsgHandleMapj找pRpnHandler:起响应定时器reqMsgSocket发送给piDeabr-AplRpuL、timeoutRegSocket发送时)响应超时请求和PUSHPULL;rpnSocket_收ReponseTask:Rec处理*I.BiNDING_DPULLI异步:pRpnHandler-OnDealMessage!同步:pRpnHandlerpOnReponseMessage:同步消息Iu:IMessageDealen:SendAsynResponse响应超时请求TimeoutAsynReques

14、tPUSH)LE_TIMEOUTreponseTimeoutSocket接收响应超时任务请求,并添口至pnTimeoutMap停定时器从rpnTimeoutMapf删除2)线程间通信消息发送消息接收SccMqContext_T:instance()-GetContext()-SendAsynResponseSCCManager_T:instance()-GetDispatcher()-PushMsg定Workerid)线程间通信(指MsgDispatcher-OnMessage(20)用pPushSockeL获取待发送的响应消息clientPairSocket-SendPtr1PAIRrMgD

15、ispatchePAIRrOnMessaoe1rMsgDispatcher:RecvPair用serverPairSocket_接收并由pPushSocket转发出去ROUTERROUTERSCCManager_T:instance()-DEALER例如:MsgDealWorker的子类::SccWorkTask:OnMessage(调用:!session_op中消息对应的处理函数)GetDispatcher()SccMqContext:ReceiveZmqLoopDealer在MsgDispatcher的taskMap_中找到workerid对应MsgDealWorker,用pDealSoc

16、ket_接收消息,并调OnMessgeO处理用pDealSocket发送消息若需要回处理结果响应I处理完成后发送响应结果MIsgDealWorker:SendMsg2Dispatcher.回结果响应3)响应机制响应发送III:ptrDealer_-SendAsynResponse(sequence,msgbody)asynRpnSender2AsynRpnServerSocket_发送响应消息,c;/ubp.mq.dealer.asyn.rpn_*PUSHPULLptrDealer_-pDealTask_-asynRpnServerSocket_接收响应消息pDealTask-HandleRp

17、nMsg停响应定时器pDealTask-dealerSocket发送响应ROUTERDEALERptrReq_-workTask_-asynReqMap_AsynSocketInfo-pSocket_接收消息AsynReqWorker:RecvReponse在msgMap_中找到原始请求,并调用对应handler处理响应AsynMsgCallbackItf:OnRecvResponse(同步消息:AsynMsgRpnHandler:OnRecvResponse,把结果冋灌到promise)6.关键类SccMqContextclassSccMqContext:privateNonCopyable

18、,publicubp:platform:thread:WorkerRequestpublic:BccAsynMsgHandlerbccAsynHandler;/处理pMsgQueue里的消息(OnAsynMsgMap)private:IMessageContext*contextPtr_;/绑定上下文BccSubHandlerbccSubHandler_;消息订阅handler,SUB注册到contextPtr_-consumerList_(msgTopic_,pHandler_)BccSubHandlerbccSubStatusHandler_;状态订阅handler,SUBBccDeale

19、rHanderbccDealerHandler_;请求消息处理handler,注册到contextPtr_-ptrDealer_-MsgDealerMapmsgId,IMessageDealerHandler*IMessageQueue*pMsgQueue_;/PULL:接收队列IMessageSender*pMsgSender_;/PUSH:发送队列;IMessageContextlmpl:classMQ_IMPORT_EXPORTIMessageContextImpl:private:void*zmqContext_;/ZMQcontextstd:auto_ptrptrReq_;/std:

20、auto_ptrptrDealer_;/std:auto_ptrptrPublisher_;/订阅消息发布者,PUBMsgConsumerListconsumerList_;DefaultReponseHander*pDftRpnHandler_;响应消息处理handler注册至UcontextPtr_-ptrDealer_-msgHandleMap_msgId,IMessageReponseHandler*;IMessageContextlmpl-ptrReq_:std:auto_ptr-ptrDealer_:std:auto_ptr-ptrPublisher_:std:auto_ptr-c

21、onsumerList_:MsgConsumerList-pDftRpnHandler:DefaultReponseHander+SendSyncMessage(destSvcKey:std:string,message:【Message,timeout:ACE_UINT32,retry:ACE_UINT32):IMessage+SendMessage(destSvcKey:std:string,msgbody:IMessage,cb:AsynMsgCallbackltf,timeout:ACE_UINT32):ACE_INT32+SendAsynResponse(sequenee:ACE_U

22、INT64,msgbody:IMessage):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+Notify(header:IMessageHeader,msgBody:IMessage,topic:std:string,filter:std:string):ACE_INT32+RegMsgConsumer(destSvcKey:std:string,to

23、pic:std:string,cb:IMessageConsumerHandler,filter:std:string):ACE_INT32,IMessageAsynRequestIMessageReponseIMessageDealerIMessagePublisherIMessageAsynRequest:AsynReqWorkerIMessageAsynRequest-mqContext_:IMessageContextlmpl-clientPairSocket_:std:auto_ptr-serverPairSocket_:std:auto_ptr-workTask_:AsynReqW

24、orker-pTimeoutTask:RequestTimeoutTaskge+SendSyncMessage(destSvcKey:std:string,header:IMessageHeader,message:【Message,timeout:ACE_UINT32,tryTimes:ACE_UINT32):IMessa+SendMessage(promise:IMessagePromise,destSvcKey:std:string,message:IMessage,timeout:ACE_UINT32):ACE_INT32AsynReqWorker-context_:IMessageC

25、ontextImpl-pAsynReq_:IMessageAsynRequest-msgMap_:MsgCallbackMap-socketList_:std:vector-asynReqMap_:AsynReqMap-timeoutRecvSocket_:std:auto_ptr-asynReqWorkerSocket_:std:auto_ptr-asynpn_wait_timeout_:ACE_UINT64-maxidletime:ACEUINT64+Run(:void):ACE_INT32#FindMsgCallback(seq:ACE_UINT64):AsynMsgCallbackIt

26、f#RefreshPollSockets():zmq_pollitem_t#RecvReponse(:IMessageSocket):void#RecvPair(:IMessageSocket):void#RecvTimeout(:IMessageSocket):void#RegAsynRemoteService(endpoint:std:string,destSvcKey:std:string):AsynSocketInfo#DealIdleSocket():voidIMessageDealer:DealerTaskIMessageDealer-mqContext_:IMessageCont

27、extImpl-pMsgTimeoutTask_:TimeoutMsgTask-pDealTask_:DealerTask-msgDealerMap_:MsgDealerMap-msgHandleMap_:SyncMsgHandleMap-asynRpnSender2AsynRpnServerSocket:std:autoptr+SendAsynResponse(sequenee:ACE_UINT64/msgbody:IMessage):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32DealerTask-pDealer_:IMessageDealer-dealerSocket_:std:auto_ptr-asynRpnServerSocket_:std:auto_ptr-dealTaskRegMsg2TimeoutSocket_:std:auto_ptr-timeout2DealTaskServerSocket_:std:auto_ptr-ptrRpn_:st

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论