消息中间件原理与实现_第1页
消息中间件原理与实现_第2页
消息中间件原理与实现_第3页
消息中间件原理与实现_第4页
消息中间件原理与实现_第5页
已阅读5页,还剩11页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、消息中间件原理与实现10748206桂勇哲10748210 胡栋梁10712059 穆斌摘要: 现今,越来越多的企业面临着各种各样的数据集成和系统整合,CORBA、DCOM、RMI等RPC中间件技术也应运而生,但由于采用RPC同 步处理技术,在性能、健壮性、可扩展性上都存在着诸多缺点。而基于消息的异步处理模型采用非阻塞的调用特性,发送者将消息发送给消息服务器,消息服务器在合适的时候再将消息转发给接收者;发送和接收是异步的,发送者无需等待,二者的生命周期也可以不必相同,而且发送者可以将消息间接传给多个接收者,大大提 高了程序的性能、可扩展性及健壮性,这使得异步处理模型在分布式应用上比起同步处理模

2、型更具有吸引力。 本文首先介绍了消息中间件的原理,然后实现消息中间件的一些最重要的功能, 并说明了实现方法,以及相应功能的应用,最后介绍消息中间件还可以添加哪些重要性质,以更好的进行消息服务,保证消息的一致异步有效的技术。关键字:消息中间件,实现,点对点,发布/订阅,持久消息一、中间件简介1.1 中间件的定义中间件(middleware)是基础软件的一大类,属于可复用的软件范畴。中间件在操作系统软件,网络和数据库之上,应用软件之下,总的作用是为处于自己上层的应用软件提供运行于开发的环境,帮助用户灵活、高效的开发和集成复杂的应用软件。中间件是位于平台(硬件和操作系统)和应用之间的通用服务,这些服

3、务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口和协议规范的多种实现。 也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点: 满足大量应用的需要 运行于多种硬件和OS平台 支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互 支持标准的协议 支持标准的接口 IDC对中间件的定义为:中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源,中间件定位于客户机服务器的操作系统之上,管理计算机资源和网络通信。因而中间件是指一类软件,是基于分布式处理的软件,最突出的特点是其网络通信功能。也可认为中间件是位于平台和应用之

4、间的通用服务,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,可以有符合接口和协议的多种实现。1.2 中间件的分类按照IDC的分类方法,中间件可分为六类:1) 终端仿真/屏幕转换2) 数据访问中间件(UDA)3) 远程过程调用中间件(RPC)4) 消息中间件(MOM)5) 交易中间件(TPM)6) 对象中间件在实际应用中,一般将中间件分为两大类:一类是底层 中间件,用于支撑单个应用系统或解决一类问题,包括交易中间件、应用服务器、消息中间件、数据访问中间件等;另一类是高层中间件,更多的用于系统整合,包 括企业应用集成中间件、工作流中间件、门户中间件等,他们通常会与多个应用系统打交

5、道,在系统中层次较高,并大多基于前一类的底层中间件运行。二、面向消息的中间件2.1 消息中间件的功能 面向消息的中间件:MOM指的是利用高效可靠的消息传递机制进行平台无关的数据交 流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可在分布环境下扩展进程间的通信,并支持多通讯协议、语言、应用程序、硬 件和软件平台。目前流行的MOM中间件产品有IBM的MQSeries、BEA的MessageQ等。主要特点: 通讯程序可在不同的时间运行 程序不在网络上直接相互通话,而是间接地将消息放入消息队列,因为程序间没有直接的联系。所以它们不必同时运行。消息放入适当的队列时,目标程序甚

6、至根本不需要正在运行;即使目标程序在运行,也不意味着要立即处理该消息。 对应用程序的结构没有约束 在复杂的应用场合中,通讯程序之间不仅可以是一对一的关系,还可以进行一对多和多对一方式,甚至是上述多种方式的组合。多种通讯方式的构造并没有增加应用程序的复杂性。 程序将消息放入消息队列或从消息队列中取出消息来进行通讯,与此关联的全部活动,比如维护消息队列、维护程序和队列之间的关系、处理网络的重新启动和在网络中移动消息等是MOM的任务,程序不直接与其它程序通话,并且它们不涉及网络通讯的复杂性2.3 消息中间件的传递模型消息中间件一般有两种传递模型:点对点模型(PTP)和发布-订阅模型(Pub/Sub)

7、。1. 点对点模型(PTP)点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发动到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传动给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。2发布-订阅模型(Pub/Sub)发布-订阅模型用称为主题(topic)的内容分层结构代替了PTP模型中的惟一目的地,发送应用程序发布自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。希望接收这些消息的应用程序订阅了这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的

8、所有消息。 下图展示了发布和订阅模型:多个应用程序可以就一个主题发布和订阅消息,而应用程序对其他人仍然是匿名的。MOM 起着代理(broker)的作用,将一个主题已发表的消息路由给该主题的所有订阅者。2.3 消息中间件产品从上个世纪90年代初,随着不同厂商消息中间件大量上市,消息中间件技术得到了长足的发展。目前,IBM和BEA的中间件产品在银行、证券、电信等高端行业,以及IT等行业中得到广泛应用。IBM凭借其在1999年推出的应用服务器WebSphere,扎根金融、证券等行业,在超大型以及系统整合型应用方面优势突出;BEA则是专门从事中间件开发的公司,它的应用服务器WebLogic在美国市场占

9、有率超过60,在国内电信及证券行业占据主要地位;Sun、Oracle、Sybase和Borland等厂商也都有自己的应用服务器;近年来,以金蝶、东方通等公司为代表的国产中间件产品也发展迅速。由于没有统一的规范和标准,基于消息中间件的应用不可移植,不同的消息中间件也不能互操作,这大大阻碍了消息中间件的发展。Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范。它定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是允许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间

10、所使用的通讯底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。目前许多厂商采用并实现了JMS API,现在,JMS产品能够为企业提供一套完整的消息传递功能,下面是一些比较流行的JMS商业软件和开源产品。1IBM MQSeriesIBM MQ系列产品提供的服务使得应用程序可以使用消息队列进行相互交流,通过一系列基于Java的API,提供了MQSeries在Java中应用开发的方法。它支持点到点和发布/订阅两种消息模式,在基本消息服务的基础上增加了结构化消息类,通过工作单元提供数据整合等内容。2WebLogicWebLog

11、ic是BEA公司实现的基于工业标准的J2EE应用服务器,支持大多数企业级JavaAPI,它完全兼容JMS规范,支持点到点和发布/订阅消息模式,它具有以下一些特点:1) 通过使用管理控制台设置JMS配置信息;2) 支持消息的多点广播;3) 支持持久消息存储的文件和数据库;4) 支持XML消息,动态创建持久队列和主题。3SonicMQSonicMQ是Progress公司实现的JMS产品。除了提供基本的消息驱动服务之外,SonicMQ也提供了很多额外的企业级应用开发工具包,它具有以下一些基本特征:1) 提供JMS规范的完全实现,支持点到点消息模式和发布/订阅消息模式;2) 支持层次安全管理;3) 确

12、保消息在Internet上的持久发送;4) 动态路由构架(DRA)使企业能够通过单个消息服务器动态的交换消息;5) 支持消息服务器的集群。4Active MQActive MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:1) 提供点到点消息模式和发布/订阅消息模式;2) 支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;3) 新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;4) 拥有消息持久化、事务、集群支持等JMS基础设施服务。5OpenJMSOpenJMS是一个开源的JMS规范的实现,它包含以下几

13、个特征:1) 它支持点到点模型和发布/订阅模型;2) 支持同步与异步消息发送;3) 可视化管理界面,支持Applet;4) 能够与Jakarta Tomcat这样的Servlet容器结合;5) 支持RMI、TCP、HTTP与SSL协议。三、面向消息的中间件的实现3.1基本体系结构主要组件有: MS servers(用于消息通信),客户端(用于发送消息和接收消息), 后备存储(用于持久消息存储,基于文件或者数据库).3.2MYMQ特性1. 消息通信模型MYMQ 支持两种消息通信模型:点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。除了下列不同之外,这两种消

14、息通信模型非常地相似:PTP 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。2. 消息组成消息传递系统的中心就是消息。一条 Message 分为三个组成部分: 消息头(header):消息头包含了许多字段,为消息确定路由。属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。消息的主体(body)包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。 3. 消息确认模式采取自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。4. 消息的一次性传送3.3MYMQ的类3.3.1MYM

15、Q的message bool _isPersistent; bool _toDelete; bool _isMessage; int _id; std:string _fromName; Destination _dest; std:string _content; 在mymq中,发送的每个消息均为message类型,对于不同目的发送的message消息,需要指定该消息的目的地,以及相关属性。事实上,我们有一个类, Destination 用来表示消息的目的地,实际上,它指向了在server上的一个队列,或者一个主题。表明了该消息将被发送到那个队列中。isPersistent用来说明,该消息需

16、要持久保存。即当服务器意外出错时候,消息仍然能够得到恢复,需要时刻保持该消息保存在了persistent message storage当中。message中的isCommand用来说明,这个消息是一个信息,还是一个控制指令,当为控制指令时,服务器会根据消息的不同内容,进行不同的操作。id是一个消息的标识,用来保证同一个消息只被接收一次。fromName 说明消息是从什么地方发送来的,同样用来保证同一个消息的一次接收。3.3.2MYMQ的queue,topicMYMQ的queue ,实际上是一个对stl的queue的封装,对应每一个queue,有一个 Destination目的地对应的名字,当

17、server收到一个消息后,找到对应的queue, 将消息保存在queue当中。当收到接收消息的命令时,则到相应的队列中去取出消息 。在这些简单的过程中,需要时刻注意的问题有, 需要根据消息的类型判断,是否需要进行持久存储,即保持消息在硬盘当中。 根据收到的消息的源fromName 以及消息的id值, 从自己的数据库中查找是否有接收到过该消息, 如果没有,则处理该消息, 否则则要回应一个错误回复。当消息的消费者需要从队列中取出消息的时候, 还应该注意的是,如果消息的队列中目前没有消息,那么也应该给以消息消费者相应的正确答复。 这里需要区别的是, 根据消息的消费者的同步或异步请求,会有不同的结果

18、 。MYMQ的topic, 不光要维护一个消息的队列,而且还需要维护更多的信息 。因为有不只一个消息的消费者要消费该主题中的信息,所以, topic类型中还需要维护多个连接。当收到一个消息的时候, 向所有的消息消费者发送该消息。对应每个消息消费者, server维护了一个 connection, 用来表示彼此之间的连接。当connection,因为意外断开时候, connection能够检测到网络的错误,并进行相应的处理 。此外,由于topic还需要支持持久订阅的功能,所以还需要维护另外的一些messageStorage,以保证消息的传送, 在后边还会讲到。3.3.3MYMQ的connecti

19、onMYMQ中的connection 是对 socket的一个封装。在connection中,记录了该连接,以及该连接对应的消息消费者的名字。connection会在检测到连接出现错误的时候,作出处理。删除当前连接,或者维护当前连接等待网络回复正常,再继续用来发送信息。当我们需要进行持久订阅时,使用的是durableConnection , 将一直维护连接,并保存发送失败的消息,等待网络恢复。 而使用connection时,如果网络出现错误,则认为该消费者已经离开,会删除该connection3.3.4MYMQ的serverserver中维护了当前服务器的名字, 并且维护了所有的queue,

20、topic。当然,server还需要维护一个表, 记录从不同的producer那里接收到的信息,以判断消息是否在之前被接收过。最后,增加的Master Slave 特性, 也需要server进行些信息记录, 当有备份存在时候, 要先对消息进行备份,保持两者间的一致性。最后, server里维护了对message persistent storage 的引用,用来对消息进行持久存储。3.3.5MYMQ的producer, consumer对消息的生产者和消费者提供了两个类,以简化操作, 对producer, 提供了一个sendMessage 方法, 当对producer初始化后, 调用sendM

21、essage 即可以发送消息到消息指定的目的地。而对consumer 提供了, receive , subscribe, durablesubscribe的方法, 分别支持, 点点通信, 主题订阅模式, 以及持久订阅的功能。3.4MYMQ的特性实现3.4.1点对点的通讯点点通信, 是通过queue 来实现的。当producer发送消息到某个queue, consumer从相应的queue中recieve该消息的时候, 则实现了两者之间的消息通讯。producer 的代码:#include "producer.h" #include <iostream> int

22、main() Producer p("","7777"); /指明服务器的ip, 端口 p.setName("producer"); /producer的名字 Destination dest; dest.setName("queue"); /指明消息发送到哪个队列 Message msg; msg.setDestination(dest); for(int i=0;i<100;i+) /发送100个消息 char tmp256; sprintf(tmp,"the %dth mes

23、sage for queue",i); msg.setContent(tmp); std:cout<<"send msg is "<<msg.getContent()<<std:endl; p.sendMessage(msg); consumer的代码:#include "destination.h" #include "message.h" #include "consumer.h" #include <iostream> int main() Consu

24、mer p("","7777"); /指明服务器的地址 Destination dest; dest.setName("queue"); for(int i=0;i<100;i+) /收取100 个消息 Message recv=p.recieveMessage(dest); std:cout<<"recv msg is "<<recv.getContent()<<std:endl; return 0; 3.4.2主题/订阅在每个消息的消费者对某个主题进行

25、订阅的时候, 就会在该主题中增加一个新的connection. 在topic的类中, 有一个 std:vector<Connection> _connections; 保存了该topic对应的所有connection 。 当topic中有消息的时候, 就将该详细发送到所有的connection。而当有一个消费者对该主题进行订阅的时候,会调用consumer的subscribe方法, server会在connections里查找, 看是否在之前该消费者进行过订阅。 这是通过该消费者的名字进行查找的 。如果有, 则修改原来的connection, 因为名字相同, 说明该消费者之前可能出

26、现了错误 ,现在进行了新的连接。如果没有找到相同的, 说明这是一个新的消费者, 增加一个新连接。当connection出现错误的时候, 向该connection发送消息会失败, 当出现这样的情况时, server认为该消费者出现了故障,或者网络出现了问题。 会删除该connection。3.4.3持久订阅在某些时候,我们需要持久订阅。 持久订阅的含义是, 当我们的消费者出现故障,或者是暂时离开的情况下, 如果订阅的主题中有了新的消息不能简单的丢弃, 而是应该在消费者重新恢复正常工作的时候,可以再次拿到这些消息。为了实现持久订阅,我们的consumer首先要调用durableSubscribe方

27、法,发送命令给server。server会对应的为该消费者建立一个 durableConnection 的连接。于connection不同的地方是, durableConnection自己本身也维护了一个 message Store , 当durableConnection发送某个消息失败的情况下, 不像connection丢弃该消息, 而是将该消息放到messageStore中,从而当消费者恢复的时候, 仍然可以接收到之前没有收到的消息 。3.4.4connection的维护当connection建立之后, 由于断电或网络错误, 该连接有可能中断。因此,server需要维护它的连接是可以使用

28、的。connection会发送ping测试的指令,等待接收回复。如果在一定时间内没有收到客户端的回复,则说明该连接目前出现了错误,对该连接进行修改,标识该连接目前不可用。当我们进行对主题的订阅时,要维持一个时间很久的连接。此时,有可能server出现了故障而坏掉了, 当server重新启动后, consumer并不知道之前的连接也出现了错误。因此, 这就需要consumer每隔一端时间对server进行一此ping检测连接。如果连接出现了问题, 则重新连接server。3.4.5消息不被丢失的保证在任何情况下, 消息都不应该丢失, 这是设计的目的。因此,消息只有在得到了最终确认后, 才会被删除

29、掉。当发送消息到目的地后, 目的地处理消息,发回确认信息。只有收到确认信息后,确认该消息已经成功发送,才会删除该消息,保证了消息不会因为断电等意外, 没有得到处理就被删除了。我们可以将这理解为一此事务,最后对msg的del相当于事务的commit。对于持久订阅, 在最后得到确认之后, 才会从磁盘中删除该文件, 保证了消息不被丢失。但是这就引入了一个问题:当消息已经被收到处理之后, 如果确认消息到达前, server重新启动, 那么因为没有收到确认信息, 之前的消息将被重新发送, 因此, 就需要有机制保证消息的一次性。3.4.6消息的一次性保证在server和consumer中,我们都维护了一个

30、结构, 记录了从某个源发送来的消息的最大id值。如果, 从某个源,发送来的消息的id值小于我们记录的最大id值, 那么说明我们已经在之前接收过消息, 因此,不会重复处理该消息。于consumer不同的是, 当server收到producer发送的消息时,会修改msg的内容, 将msg的fromName修改为server的 name,将msg的id,修改为server目前的id值。 server和producer中都会自动维护一个id值, 用来唯一的标识每一个msg。3.4.7消息的持久存储对于设置了 persistent的 msg,server在收到消息的时候, 将消息放到内存的队列当中,同时

31、, 将消息保存到磁盘上。 当发送消息的时候, 会从磁盘文件上将该msg删除。磁盘记录的文件, 通过berkeley db实现。 对每一个消息, 在文件中记录了一个 id, msg 的对。因为我们保证了对每一个msg, id是唯一的 :当server收到每一个msg时,重新设置该msg, 给该msg一个唯一的id标识。 当我们server向consumer发送该消息的时候, 就会通过发送消息的id值, 从磁盘文件中查找到该msg, 并删除它。 磁盘存储使用了 b-tree 建立索引, 可以保证添加删除的高效率。3.4.8备份机制由于所有的消息都需要通过server,因此当server出现故障的时

32、候, 消息的发送就无法进行了。因此,增加了Master Slave机制。 对一个message service服务, 同时维护两个服务器, 一个为主,进行消息的接收处理发送。另一个为备份服务器,当主服务器出现故障时, 客户端直接连接到备份服务器,就可以继续进行工作, 而不需要等待主服务器恢复,从而使得服务的性能得到了保证。备份服务器会得到主服务器所得到的所有消息。主服务器接收到消息, 只有当该消息转发给备份服务器,并得到确认后, 才会回复消息, 给producer以回复。当主服务器向消费者发送了一个消息, 并打算删除该消息时,会通知备份服务器删除该消息, 保持两者间的一致。当主服务出现故障时,

33、 producer和consumer会根据自己记录的对应备份服务器地址,转而去连接备份服务器, 备份服务器替代主服务器进行服务。 bool _isBackup; std:string _primaryName; bool _hasBackup; char _backupIp16; char _backupPort5; 服务器维护了isBackup, 说明自身是否是backup服务器 , hasBackup 说明是否有备份服务器,如果有备份, 则会先发送消息给备份服务器 。3.4.9cluster 机制当有多个服务器计划共同工作的时候,在server中设置它的next server。当serve

34、r收到消息, 并处理消息后, 会将该消息发送给 它的next server。比如我们有 A B C 三个服务器共同工作, A 的next server 是B, B的 next server 是C当我们的 producer发送了消息到 A , consumer从C 取消息时, A处理过消息后, 会发送给 B, B会发送给 C,从而consumer可以得到该消息。四、测试4.1点对点的通讯首先运行server, ./server 7777 端口为7777 ./producer 发送100 个消息到server./state 查看目前server的状态为:the oldest msg from pr

35、oducer is 99 Queue size is 1 100 messages in queue0 queue : 说明消息已经发送到了服务器。./consumer 消费这100个消息。4.2主题订阅./server 7777./subscribe1./subscribe2/建立两个对主题的订阅./publish 0/发送一个消息, 可以看到该消息被两个订阅者接收到了 4.3主题持久订阅./server 7777./durablesubscribe1./durablesubscribe2/建立两个持久连接./publish 0 发送一个消息, 可以看到两个订阅多收到了该消息kill durablesubscribe2/ 使一个订阅离开./publish 1发送一个消息, 可以看到 durablesubscribe 1 收到了该消息 重新启动durablesubscribe2可以看到durablesubscribe2也接收到了该消息4.4消息的持久存储./server 7777./persistentProducer 发送100个需要持久存储的消息./state可以看到server中目前有这100个消息kill server./server 7777restart server 可以看到server中仍然有这100 个消息./consumer消费这100个

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论