Java 远程通讯_MINA_第1页
Java 远程通讯_MINA_第2页
Java 远程通讯_MINA_第3页
Java 远程通讯_MINA_第4页
Java 远程通讯_MINA_第5页
已阅读5页,还剩33页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、第一章 MINA前述1.1线程模型MINA线程模型采用了Reactors in threads模型,即Main Reactor + Sub Reactors的模式。由main reactor处理连接相关的任务:accept、connect等,当连接处理完毕并建立一个socket连接(称之为session)后,给每个session分配一个sub reactor,之后该session的所有IO、业务逻辑处理均交给了该sub reactor。每个reactor均是一个线程,sub reactor中只靠内核调度,没有任何通信且互不打扰。现在来讲讲我对线程模型演进的一些理解:Thread per Con

2、nection: 在没有nio之前,这是传统的java网络编程方案所采用的线程模型。即有一个主循环,socket.accept阻塞等待,当建立连接后,创建新的线程/从线程池中取一个,把该socket连接交由新线程全权处理。这种方案优缺点都很明显,优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。Reactor in Single Thread: 有了nio后,可以采用IO多路复用机制了。我们抽取出一个单线程版的reactor模型,时序图见下文,该方案只有一个线程,所有的socket连接均注册在了该reactor上,由一个线程全权负责所有的任务。它实现简单,且不受线程数的

3、限制。这种方案受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用,且适合于CPU资源紧张的应用上。Reactor + Thread Pool: 方案2由于受限于使用场景,但为了可以更充分的使用CPU资源,抽取出一个逻辑处理线程池。reactor仅负责IO任务,线程池负责所有其它逻辑的处理。虽然该方案可以充分利用CPU资源,但是这个方案多了进出thread pool的两次上下文切换。Reactors in threads: 基于方案3缺点的考虑,将reactor分成两个部分。main reactor负责连接任务(accept、connect等),sub reac

4、tor负责IO、逻辑任务,即mina与netty的线程模型。该方案适应性十分强,可以调整sub reactor的数量适应CPU资源紧张的应用;同时CPU密集型任务时,又可以在业务处理逻辑中将任务交由线程池处理,如方案5。该方案有一个不太明显的缺点,即session没有分优先级,所有session平等对待均分到所有的线程中,这样可能会导致优先级低耗资源的session堵塞高优先级的session,但似乎netty与mina并没有针对这个做优化。Reactors in threads + Threads pool: 这也是我所在公司应用框架采用的模型,可以更为灵活的适应所有的应用场景:调

5、整reactor数量、调整thread pool大小等。1.2 任务粒度任务从逻辑上我给它分为成三种类型:连接相关的任务(bind、connect等)、写任务(write、flush)、调度任务(延迟、定时等),读任务则由selector加循环时间控制了。mina任务调度的趋势是逐渐变小,从session级别的调度 -> 类型级别任务的调度 -> 任务的调度。mina-1.1.7: SocketIoProcessor$Worker.runmina-2.0.4: AbstractPollingIoProcessor$Processor.runmina-3.0.0.

6、M3-SNAPSHOT: AbstractNioScessWritemina1、2的任务调度粒度为session。mina会将有IO任务的的session写入队列中,当循环执行任务时,则会轮询所有的session,并依次把session中的所有任务取出来运行。这样粗粒度的调度是不公平调度,会导致某些请求的延迟很高。mina3的模型改动比较大,代码相对就比较难看了,我仅是随便扫了一下,它仅提炼出writeQueue。1.3 数据如何Read/writejava nio如果是non-blocking的话,在每次write(bytesN)的时候,并不会将N字节全部wri

7、te出去,每次write仅一部分(具体大小和tcp_write_buffer有关)。那么,mina是怎么处理这种情况的呢?mina-1.1.7: SocketIoProcessor.doFlushmina-2.0.4: AbstractPollingIoProcessor.flushNowmina-3.0.0.M3-SNAPSHOT: AbstractNioScessWritemina1、2的方式基本一致。 在发送端每个session均有一个writeBufferQueue,有这样一个队列,可以保证写入与写出均有序。在真正write时

8、,大致逻辑均是一一将队列中的writeBuffer取出,写入socket,但有一些不同的是,mina1是每次peek一次,当该buffer全部写出之后再poll(mina3也是这种机制);而mina2则是直接poll第一个,将其存为currentWriteRequest,直到currentWriteRequest全部写出之后,才会poll下一个。这样的做法是为了省几次peek的时间么?同时mina在write时,有一种spin write的机制,即循环write多次。mina1的spin write count为256,写死在代码里了,表示256有点大;mina2这个机制废除但代码保留;每次w

9、rite只是输出了一部分数据,read同理,也有可能只会读入部分数据,这样就是导致读入的数据是残缺的。而mina默认不会理会这种由于nio导致的数据分片,需要由业务层自己额外做配置或者处理。 nfs-rpc: ProtocolUtils.decodemina-1.1.7: SocketIoProcessor.read, CumulativeProtocolDecoder.decodemina-2.0.4: AbstractPollingIoProcessor.read,CumulativeProtocolDecoder.decodemina-3.0.0.M3-SN

10、APSHOT: NioSelectorLoop.readBuffernfs-rpc在协议反序列化的过程中,就会考虑这个的问题,依次读入每个字节,当发现当前字节或者剩余字节数不够时,会将buf的readerIndex设置为初始状态。具体的实现,有兴趣的同学可以学习nfs-rpc:ProtocolUtils.decodenfs-rpc在decode时,出现错误就会将buf的readerIndex设为0,把readerIndex设置为0就必须要有个前提假设:每次decode时buf是同一个,即该buf是复用的。那么,具体情况是怎样呢?mina1、2的读buffer创建方式比较土,在每次re

11、ad之前,会重新allocate一个新的buf对象,该buf对象的大小是根据读入数据大小动态调整。当本次读入数据等于该buf大小,下一次allocate的buf对象大小会翻倍;当本次读入数据不足该buf大小的二分之一,下一次allocate的buf对象同样会缩小至一半。需要注意的是,*2与/2的代码都可以用位运算,但是mina1竟没用位运算,有意思。mina1、2处理数据分片可以继承CumulativeProtocolDecoder,该decoder会在session中存入(BUFFER, cumulativeBuffer)。decode过程为:1)先将message追加至cumulative

12、Buffer;2)调用具体的decode逻辑;3)判断cumulativeBuffer.hasRemaining(),为true则压缩cumulativeBuffer,为false则直接删除(BUFFER, cumulativeBuffer)。实现业务的decode逻辑可以参考nfs-rpc中MinaProtocolDecoder的代码。mina3在处理读buffer的创建与数据分片比较巧妙,它所有的读buffer共用一个buffer对象(默认64kb),每次均会将读入的数据追加至该buffer中,这样即省去了buffer的创建与销毁事件,也省去了cumulativeDecoder的处理逻辑,

13、让代码很清爽啊!ByteBuffer设计需要说明的是,只有mina1、2才有自己的buffer类,mina3内部只用nio的原生ByteBuffer类(提供了一个组合buffer的代理类-IoBuffer)。mina1、2自建buffer的原因如下:It doesnt provide useful getters and putters such as fill,get/putString, and get/putAsciiInt()enough.It is difficult to write variable-length data due to its fixed capacity第一条

14、比较好理解,即提供了更为方便的方法用以操作buffer。第二条则是觉得nio的ByteBuffer是定长的,无法自动扩容或者缩容,所以提供了自动扩/缩容的方法:IoBuffer.setAutoExpand, IoBuffer.setAutoShrink。但是扩/缩容的实现,也是基于nio的ByteBuffer,重新ByteBuffer.allocate(capacity),再把原有的数据拷贝过去。第二章 MINA框架与应用Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于 TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供 JAVA 对象的序列化服

15、务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步 IO 默认使用的是 JAVA NIO 作为底层支持)操作的编程模型。从官网文档“MINA based Application Architecture”中可以看到Mina作为一个通信层框架,在实际应用所处的位置,如图所示:Mina位于用户应用程序和底层Java网络API(和in-VM通信)之间,我们开发基于Mina的网络应用程序,就无需关心复杂的通信细节。再看一下,Mina架构,如图所示:通信过程:1.通过SocketConnector同服务器端建立连接2

16、.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的 3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议 4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程 5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/

17、O Processor将数据写出到socket通道 IoFilterChain作为消息过滤链 1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程 2.写入的时候一般是从业务对象到字节byte的过程IoSession贯穿整个通信过程的始终MINA2工作流程:一个Acceptor线程有一个自己的Selector,这个Selector专门监听端口上的connect事件。当有客户端连接事件发生,那么Acceptor线程就会见建一个Socket线程,并把其封装成IoSession。后有一个分配器,分配给NIOProcessor。NIO

18、Processor会建立自己的Selector来监听分配的Socket。主要监听的是Socket的IO读写事件。(注意:NIOProcessor会有多个,一般有CPU个数+1,CPU个数也可以是核数,有些事双核,四核,每个NIOProcessor都有自己的Selector)。当监听到Socket上的IO读写事件时,交给IOProcessor线程池,以任务的形式来出来。IOProcessor线程操作完IO后,交给IOFilterChain ,做过滤,这里IOFilterChain可以定义自己的过滤器做一些特殊处理,可以编码,解码等。IOFilterChain做完后,会交个IoHander来做业务

19、逻辑处理。同样客户端的Connector也是一样的。也是一个Connector线程有一个自己的Selector,监听端口上的connect事件,当有连接建立后,就只有一个Socket线程,并封装成IOSession。后面的IOProcessor跟服务端一样的处理。MINA可以分为Acceport/Connector、NIoProcessor/IoProcessor线程池 、IoFilterChain过滤器链 、IoHandler业务处理等几部分。看一下NioSocketAcceptor的内部结构概念图:在NioSocketAcceptor中有个Acceptor线程,它负责将一个ServerSo

20、cketChannel注册到主Selector中,注册事件为OP_ACCEPT,当有OP_ACCEPT响应时,取出相应的socket,封装在NioSocketSession中,并给这个NioSocketSession对象分配一个NioProcessor。第三章 Acceptor/Connector3.1 IoServiceIoService是一个接口,有两种实现:IoAcceptor和IoConnector;其中IoAcceptor是针对Server端的实现,IoConnector是针对Client端的实现。AbstractIoAcceptor定义了所有的public接口,并定义了子类需要实现

21、的bindInternal函数,AbstractPollingIoAcceptor<S extends AbstractIoSession, H>作为它的一个派生类,主要就是实现bindInternal函数, AbstractPollingIoAcceptor<S extends AbstractIoSession, H>类定义了bindInternal的实现框架,NioSocketAcceptor使用selector实现了它需要的接口,比如select,wakeup,open等函数。 总体来说,bindInternal实现的功能就是开启一个新的线程

22、,在这个线程中绑定监听的地址,并接受客户端请求。看如下两个函数:IoService定义根据上图中IoService接口定义,我们给出接口中定义的方法,如下所示:我们可以看到,IoService主要定义了两类服务,一类是提供I/O操作相关服务,另一类是会话 (IoSession)相关服务,这两类服务,无论是在服务端还是在客户端,都会提供,以此来保证双方通信。那么,具体地这两类服务中都包括哪些内容,我们总结如下:· 管理IoService元数据,描述IoService本身,这些元数据都封装在TransportMetadata中,例如I/O 服务类型(如NIO,APR或RXTX),连接类型

23、(如无连接接),地址类型等。· 管理IoServiceListener,它是用来监听与一个IoService服务相关的事件的,比如服务的激活、会话的建立等 等,当然,这些监听服务不是提供给外部进行开发使用的,而是Mina内部使用的。· 管理IoHandler,从Mina框架的架构我们知道,IoHandler的具体实现是与业务逻辑处理相关的,也是最靠近应用层的。· 管理IoSession,即管理与一个IoService服务交互的会话对象,可以有一组会话同时使用该IoService服务。· 管理IoFilter链,IoFilter链基于事件拦截模式,它位于I

24、oHandler与IoService两层之间,Mina为 了方便使用IoFilter链,直接内置了一个IoFilterChainBuilder(具体实现为 DefaultIoFilterChainBuilder)。· 管理一些相关的统计信息,如读写字节数、读写消息数、读写时间等。现在来看看Acceptor线程 3.2 IoAcceptor从IoAcceptor接口定义,可以很好地看出它具有的一些基本操作,如下所示:可以看到上面定义的方法中,主要是与IP地址相关的操作,主要包括绑定和解绑定,这些操作的实现是在该接口的抽象实现类AbstractIoAcceptor中给予实现的,

25、在AbstractIoAcceptor中并没有涉及到有关SocketChannel的I/O操作,有关如何基于轮询的策略去检查SocketChannel是否有相应的事件被触发,这些I/O相关的操作被封装到AbstractPollingIoAcceptor类中。以基于TCP的NIO通信为例,具体接收客户端到来的连接请求,这些逻辑是在AbstractPollingIoAcceptor的实现类NioSocketAcceptor中实现的,这里创建了用来管理与客户端通信的NioSocketSession对象(它是IoSession的NIO实现)。3.3 IoConnectorIoConnector的接口定

26、义,如下所示:IoConnector定义的操作基本是与连接到服务端的。同样,AbstractIoConnector实现了Connector接口定义的基本操作。以基于TCP的NIO通信为例,客户端和服务端有部分操作非常类似,如轮询SocketChannel检查是否有事件触发,读写请求等,所以,客户端在AbstractIoConnector的抽象实现类AbstractPollingIoConnector中处理于此相关的逻辑。与NioSocketAcceptor对应,客户端有一个NioSocketConnector实现类。通过上面IoAcceptor和IoConnector的说明,我们还不知道具体I

27、/O操作是由谁来处理的。实际上,无论是服务端还是客户端,在处理轮询通道的抽象服务中,封装了一个IoProcessor抽象,它才是实际处理I/O操作的抽象部分。为了将通信的宏观抽象过程与通信过程中的处理细节分开,将IoProcessor独立出来,与宏观通信过程的逻辑解耦合。以基于TCP的NIO通信为例,在AbstractPollingIoAcceptor和AbstractPollingIoConnector中都有一个IoProcessor实例(这里是实现类NioProcessor的实例),通过调用它提供的处理操作来完成实际的I/O操作。3.4 IoSession对应的状态迁移图,如图所示:Abs

28、tractIoSession实现了Session完成的主要功能,所谓Session其实是一个物理连接的逻辑抽象,所以在NioSession这一层,它与Nio的channel是相关的,Session需要通过这个底层连接实现它的逻辑功能。 Session的主要功能,包括,关闭连接,在连接上进行read,write、设置和读取Session级别的属性,进行流量控制等。为了深入理解Session,我们需要了解如下几个问题: 1. Session是在何时,由谁创建的?在前文分析中,我们知道在Acceptor线程中,在接收用户请求,并创建Session,具体来说,这个Session是在

29、NioSocketAcceptor.accept方法中创建的。而该函数又是在AbstractPollingIoAcceptor<S extends AbstractIoSession, H>.processHandles中调用的。我们看到在创建session之后,会调用initSession对session进行初始化,然后把它加入到Processor中。2. Session是如何进行读写的?Session创建之后是如何收到对端数据,如何提供发送数据的接口的呢?先来看写操作,我们看看AbstractIoSession.write方法该方法中,要被write的message,被包装在W

30、riteRequest中,并且返回的是一个writeFuture,这就是说,我们可以使用这个future不被中断的等待写操作完成。同时,该方法中调用了filterChain的fireFilterWrite函数,它的作用是遍历filterChain中的所有filter,触发他们的fireFilterWrite方法。AbstractIoSession.getFilterChain只是一个接口,需要在派生类中实现。在其派生类NioSession中,我们可以看到这个filterChain是DefaultIoFilterChain实例。它的fireFilterWrite方法实际上是,从tail到head

31、遍历链表,既然是反向遍历,那么Head是最后一个被遍历到的filter,这个head是一个HeadFilter实例从这个类,我们可以清楚的看到我们请求写的writeRequest被写入到了session的写队列中了。那么问题来了,这个写队列是从哪冒出来的,它是如何创建,又是谁从这个队列中把写请求取出来,发送出去的呢? 从AbstractIoSession.getWriteRequestQueue方法,我们知道其中的WriteRequestQueue实例,是被set进去的,到底是在哪里被set进去的呢,我们看前面提到的方法AbstractPollingIoAcceptor<S e

32、xtends AbstractIoSession, H>.processHandles,在这个方法中调用了initSession方法,它是用来初始化session,按理说应该在这里,该方法是在AbstractIoService类中定义的. 从该方法中可以看到,session的WriteRequestQueue实例,实际上就是session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session)。好吧,我们还得接着寻找,session.getService是谁呢?这里的session显

33、然是NioSession,service是NioSocketAcceptor,getSessionDataStructureFactory方法是在基类AbstractIoService中定义的,在默认情况下,get出来的实例,是DefaultIoSessionDataStructureFactory的类实例,我们再来看这个类 最终我们看到session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session),实际上得到的是 new DefaultWriteRequestQueue

34、(),这个类实际上是对ConcurrentLinkedQueue实例的封装,也就是说我们所添加的WriteRequest都是添加到ConcurrentLinkedQueue这个实例中的。 综上所述,在初始化session的时候,把IoService都会new一个WriteRequestQueue实例赋值给session,同时,为了防止多个线程在读写这个Queue的时候发生竞争,这里使用了ConcurrentLinkedQueue。我们返回来再看HeadFilter.filterWrite方法,其中:s.getProcessor().write(s, writeRequest);

35、60;其中的write方法,对应AbstractPollingIoProcessor.write 这里可以看到WriteRequest被添加到session的WriteRequestQueue中,然后调用了AbstractPollingIoProcessor.flush方法,这里的flush,只是把session加入到flushingSessions队列中。在IoProcessor的分析中,我们会知道有一个processor的线程,专门会从session中读取WriteRequest,然后通过session的Channel把数据发送出去。至此,我们来回顾整个发送数据的过程,首先是在I

36、oService中创建IoSession的时候,会给它创建一个写队列,其次IoSession的写操作,都是放入到这个写队列中的,最后,IoProccessor的线程会去读这个写队列最终通过底层Channel把数据发送出去。下面我们还需要分析读操作是如何处理的,既然是读数据,必然是从网络中获取数据,这就着落在processor线程中了,在AbstractPollingIoPcessor类中调用了process方法,这个方法在判断session可读的情况下回调用read方法,read方法会从session的channel中读取数据,然后触发session的MessageRe

37、ceived事件,如果session结束了,还会去触发InputClosed事件,当然,如果session出现了异常,会触发ExceptionCaught事件,这里的事件也是通过filterChain触发,前面分析过,这个filterChain实例是DefaultIoFilterChain,它的fireMessageReceived方法是从head到tail遍历链表,在它的tailFilter.messageReceived方法中触发了handler.messageReceived方法,也就是说,这个事件传递首先是传递给各个filter,最终再传递给handler的,这是符合我们要求filte

38、r先进行各种处理,最终交给handler来处理的需求。 同理,Inputclosed和ExceptionCaught这两个事件,也是从head到tail遍历的,最终交给handler处理。 3. Session是如何读取和设置属性的?最后,我们再来看看Session是如何存取属xìng的,经过前面的分析,我们看到在初始化Session的时候initSession,除了给这个Session初始化了WriteRequestQueue,同时也初始化了AttributeMap (AbstractIoSession) session).setAttributeMa

39、p(session.getService().getSessionDataStructureFactory().getAttributeMap(session);同样的,在DefaultIoSessionDataStructureFactory中,也为每个session都生成了一个DefaultIoSessionAttributeMap的实例,这个实例封装了一个ConcurrentHashMap实例,这同样是为了在多线程读取该实例的时候,能够正常访问数据。4. IoSession内存数据结构每当有一个新的会话被创建,及使用了IoService提供的服务,就对应创建了一个IoSession实例,

40、而且,与IoSession 相关的一些实时数据需要在内存中保存,以便IoService实例能够随时访问并对该会话实例提供需要的I/O读写服务。Mina定义了 IoSessionDataStructureFactory,来保存会话相关数据,这个结构提供了如下两个方法:可以看出,上面方法中的IoSessionAttributeMap和WriteRequestQueue都是与一个IoSession相 关的数据对象,我们可以看一下,这几个类之间的关系,如图所示:与一个IoSession有关的数据,都在上面的结构中保存着。其中主要包含两类:一类是用户在启动会话时定义的属性集合,另一类是会话期 间可能需要

41、进行读写操作。每个IoSession实例调用write方法的时候,都会对应这一个WriteRequest对象,封装了写请求数据。而提供I/O服务的IoService实例在运行时会把对应的WriteRequest对象放入/移出IoSessionDataStructureFactory 结构所持有的队列。· Executor:处理I/O事件的执行每个IoService都对应这一个Executor,用来处理被触发的I/O事件。第四章 NioProcessor/IoProcessor线程池4.1 IoProcessorIoProcessor及其附属类是一个很重要的类,它们是真正进行读写数据的

42、类,在AbstractPollingIoProcessor类,要想深入了解IoProcess,需要回答以下两个问题: 1. 谁创建了IoProcessor?在AbstractPollingIoAcceptor的构造函数中,需要指明IoProcessor的类,在其派生类NioSocketAcceptor类中指明使用NioProcessor.class。在AbstractPollingIoAcceptor的构造函数中,是这样使用这个类的, new SimpleIoProcessorPool<S>(processorClass) SimpleIoProces

43、sorPool类在构造函数中使用class.newInstance,创建了若干个IoProcessor,个数可以是通过参数指定的,也可以使用默认的,即CPU核数+1。SimpleIoProcessorPool本身也是一个IoProcessor,它实际上对外提供了IoProcessor的接口,实现上是根据Session,在它的pool中选择一个Processor,然后设置给Session,后续的操作,如add,remove都是在这个特定的processor上执行的。 2. IoProcessor的运行机制 在前面分析IoService的时候,我们知道在acceptor线程ac

44、cept一个新的session的时候,会把这个session加入到它的processor中,也就是会调用AbstractPollingIoProcessor.add方法,它实际上只是把session加入到newSessions队列中,并启动了一个新的线程processor,当然,此时是运行在acceptor线程上的。具体的读写数据是在processor线程上执行的。当然为了线程间的竞争,newSessions也是用了ConcurrentLinkedQueue类。我们来看AbstractPollingIoProcessor.Processor 根据以上对IoService,IoSess

45、ion,IoProcessor的分析,我们知道对于服务器端的程序,在用户程序的主线程中调用acceptor的bind方法,实际上启动了一个acceptor线程用来accept新的session,如果有新的session到来,会有在session加入到processor的过程中,会启动一个processor线程,如果当前CPU是多核的话,下一个sesion的到来,会启动另外一个processor线程。这些processor线程是用来检查是否有读写事件的。用户添加到filterChain的filter都是在这个线程中执行的,最后会把事件传递给handler进行最终的处理。也就是说,当有多个sess

46、ion的时候,会有多个processor线程,session的个数是大于等于processor的个数的。同时,一个processor会对应多个session,单一个session只对应一个processor线程。Mina的线程模型被称为所谓的reactors in threads,即一个线程负责接收用户请求,即acceptor线程,另外几个线程负责处理session的读写,注意线程之间是通过共享Concurrent的队列来实现请求的移交的,除此之外,他们并没有消息的交互,它们完全靠系统的线程切换来运行,这就降低了编程的复杂性。 第五章 IoFilterChain过滤器链5.1 概述I

47、/O Filter Chain层是介于I/O Service层与I/O Handler层之间的一层,从它的命名上可以看出,这个层可以根据实际应用的需要,设置一组IoFilter来对I/O Service层与I/O Handler层之间传输数据进行过滤,任何需要在这两层之间进行处理的逻辑都可以放到IoFilter中。我们看一下IoFilter的抽象层次设计,如图所示:通过上述类图可见,要实现一个自定义的IoFilter,一般是直接实现IoFilterAdapter类。同时,Mina也给出了几类常用的开发IoFilter的实现类,如下所示:· LoggingFilter记录所有事件和请求

48、· ProtocolCodecFilter将到来的ByteBuffer转换成消息对象(POJO)· CompressionFilter压缩数据· SSLFilter增加SSL TLS StartTLS支持想要实现一个自定义的IoFilter实现类,只需要基于上述给出的几个实现类即可。如果想要实现自己的IoFilter,可以参考如下例子:下面通过一个例子来说明,如何使用IoFilter的实现类。5.2 ProtocolCodecFilter下面是Mina自带的例子,使用了ProtocolCodecFilter类:上面设置了两个IoFilter,关键是看如果基于文本行

49、的消息,使用一ProtocolCodecFilter包裹了一TextLineCodecFactory类的实例,使用起来非常容易。构造一个ProtocolCodecFilter实例,需要实现一个ProtocolCodecFactory实例,一个ProtocolCodecFactory包含了对消息进行编解码(Codec)的逻辑,这样实现的好处是将编解码的逻辑和IoFilter解耦合。下面看一下类图:LoggingFilter如果需要记录通信过程中的事件以及请求,则可以直接使用LoggingFilter类,使用方法可以参考上面的例子。CompressionFilterCompressionFilte

50、r是与压缩/解压缩数据相关的IoFilter,我们可以看一下该类的构造方法,如下所示:基本上就构造方法参数中指定的3个参数与压缩/解压缩相关:· compressionLevel· compressInbound· compressOutbound使用的时候也比较简单,只需要创建一个CompressionFilter实例,加入到Filter Chain中即可。DefaultIoFilterChainBuilderMina自带的DefaultIoFilterChainBuilder可以非常容易就可以构建一个Filter Chain,默认在创建IoAcceptor和I

51、oConnector的时候,可以直接通过他们获取到一个DefaultIoFilterChainBuilder的实例,然后调用add*方法设置IoFilter链,如下面代码中示例:下面看一下来自Mina官网的表格,Mina框架也给出了一些典型的IoFilter的实现,引用如下所示:5.3 IoBufferIoBuffer是MINA内部使用的一个byte buffer,MINA并没有直接使用NIO 的ByteBuffer。不过IoBuffer 是对 ByteBuffer 的一个封装。IoBuffer 中的很多方法都是对 ByteBuffer 的直接继承。只是对 ByteBuffer 添加了一些扩展

52、了更加实用的方法。基本用法由于IoBuffer是对Nio的ByteBuffer 的封装,所以基本概念还是相同的,下面简单介绍一下:1、capacity:该属性描述这个缓冲区最多能缓冲多少个元素,也是Buffer最大存储元素数,这个值是在创建Buffer的时候指定的,且不能修改。2、Limit:在从Buffer中向Channel中写数据时,limit变量指示了还剩多少数据可以读取,在从Channel中读取数据到Buffer中时,limit变量指示了还剩多少空间可供存放数据。position正常情况下小于或者等于limit。3、Position:Buffer实际上也就是个array。当你从Chan

53、nel中读数据时,你把从Channel中读出来的数据放进底层array,position变量用来跟踪截止目前为止已经写了多少数据。更精确的讲,它指示如果下次写Buffer时数据应该进入array的哪个位置。因此如果已经从Channel中读出了3个字节,Buffer的position会被置为3,指向array中第四个位置。4、Mark:一个可以记忆的Position位置的值,在调用reset()方法时会将缓冲区的Position重置为该索引,并非总是需要定义Mark,但是在定义Mark时,不能将其定义为负数,并且不能让它大于Position,如果定义了Mark,则在该Position或Limit

54、调整为小于该Mark值时,该Mark将被丢弃。下面通过一个例子来说明:i、初始状态下:此时position为0,limit和capacity都被设为9;ii、从Channel中读入4个字节数据到Buffer,这时position指向4(第5个):iii、在做写操作之前,我们必须调用一次flip()方法,这个方法做了两件重要的事情: 1. 将limit设置到当前的position处。 2. 设置position为0。iiii、执行写操作后;iv、执行clear后,position设为0,limit设为capition,mark则丢弃;因为IoBuffer是一个抽象类,不能直接

55、实例化,所有使用的时候需要调用allocate方法来进行内存分配;allocate有两种定义:这里:capacity:buffer的大小;direct:如果为true,则得到direct buffer,如果为false,则得到heap bufferdirect buffer和heap buffer的区别分析:Direct Buffer不是分配在堆上的,它不被GC直接管理(但Direct Buffer的JAVA对象是归GC管理的,只要GC回收了它的JAVA对象,操作系统才会释放Direct Buffer所申请的空间),它似乎给人感觉是“内核缓冲区(buffer in kernel)”。Heap Buffer则是分配在堆上的,或者我们可以简单理解为Heap Buffer就是byte数组的一种封装形式。当我们把一个Heap Buffer写入Channel的时候,实际上底层实现会先构建一个临时的D

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论