手写MQ框架(四)_第1页
已阅读5页,还剩11页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、手写mq框架(四)手写mq框架(四)-用法netty改造梳理一、背景书接上文手写mq框架(三)-客户端实现 ,前面通过web的形式实现了mq的服务端和客户端,现在方案用法netty来改造一下。前段时光学习了一下netty的用法。也许有一些主意。netty封装了socket的用法,我们通过容易的调用即可构建高性能的网络应用。我方案采纳以下例子来对gmq举行改造。二、netty是什么netty是由jboss提供的一个java开源框架。netty提供异步的、大事驱动的网络应用程序框架和工具,用以迅速开发高性能、高牢靠性的网络服务器和客户端程序。netty是一个java框架,是网络编程框架,支持异步、

2、大事驱动的特性,所以性能表现很好。 三、netty的容易实现1、服务端1)simpleserverhandlerhandler是处理器,handler 是由 netty 生成用来处理 i/o 大事的。package ty.learnw3c.mqdemo;import ty.channel.channel;import ty.channel.channelhandlercontext;import ty.channel.simplechannelinboundhandler;import ty.channel.group.channelgroup;import ty.channel.group.

3、defaultchannelgroup;import ty.util.concurrent.globaleventexecutor;public class simpleserverhandler extends simplechannelinboundhandler public static channelgroup channels = new defaultchannelgroup(globaleventexecutor.instance); override public void handleradded(channelhandlercontext ctx) throws exce

4、ption channel incoming = ctx.channel(); system.out.println("server - " + incoming.remoteaddress() + " 加入n"); channels.add(ctx.channel(); override public void handlerremoved(channelhandlercontext ctx) throws exception channel incoming = ctx.channel(); system.out.println("serv

5、er - " + incoming.remoteaddress() + " 离开n"); channels.remove(ctx.channel(); override protected void channelread0(channelhandlercontext ctx, string s) throws exception channel incoming = ctx.channel(); system.out.println("" + incoming.remoteaddress() + "" + s); if(s

6、 = null | s.length() = 0) incoming.writeandflush("消息是空的呀!n"); else / mqrouter mqrouter = jsonobject.parseobject(s, mqrouter.class);/ system.out.println(mqrouter.geturi(); string responsemsg = "收到了," + s + "n" incoming.writeandflush(responsemsg); override public void cha

7、nnelactive(channelhandlercontext ctx) throws exception channel incoming = ctx.channel(); system.out.println("simplechatclient:"+incoming.remoteaddress()+"在线"); override public void channelinactive(channelhandlercontext ctx) throws exception channel incoming = ctx.channel(); syste

8、m.out.println("simplechatclient:"+incoming.remoteaddress()+"掉线"); override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception channel incoming = ctx.channel(); system.out.println("simplechatclient:"+incoming.remoteaddress()+"异样

9、"); cause.printstacktrace(); ctx.close(); 2)simpleserverinitializersimpleserverinitializer 用来增强多个的处理类到 channelpipeline 上,包括编码、解码、simpleserverhandler 等。package ty.learnw3c.mqdemo;import ty.channel.channelinitializer;import ty.channel.channelpipeline;import ty.channel.socket.socketchannel;import

10、ty.handler.codec.delimiterbasedframedecoder;import ty.handler.codec.delimiters;import ty.handler.codec.string.stringdecoder;import ty.handler.codec.string.stringencoder;public class simpleserverinitializer extends channelinitializer override protected void initchannel(socketchannel ch) throws except

11、ion channelpipeline pipeline = ch.pipeline(); pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter(); pipeline.addlast("decoder", new stringdecoder(); pipeline.addlast("encoder", new stringencoder(); pipeline.addlast("handler&quo

12、t;, new simpleserverhandler(); system.out.println("simplechatclient:" + ch.remoteaddress() + "衔接上"); 3)simpleserverpackage ty.learnw3c.mqdemo;import ty.bootstrap.serverbootstrap;import ty.channel.channelfuture;import ty.channel.channeloption;import ty.channel.eventloopgroup;impor

13、t ty.channel.nio.nioeventloopgroup;import ty.channel.socket.nio.nioserversocketchannel;public class simpleserver private int port; public simpleserver(int port) this.port = port; public void run() throws exception eventloopgroup bossgroup = new nioeventloopgroup(); eventloopgroup workergroup = new n

14、ioeventloopgroup(); try serverbootstrap b = new serverbootstrap(); b.group(bossgroup, workergroup).channel(nioserversocketchannel.class) .childhandler(new simpleserverinitializer().option(channeloption.so_backlog, 128) .childoption(channeloption.so_keepalive, true); system.out.println("simplech

15、atserver 启动了"); channelfuture f = b.bind(port).sync(); f.channel().closefuture().sync(); finally workergroup.shutdowngracefully(); bossgroup.shutdowngracefully(); system.out.println("simplechatserver 关闭了"); public static void main(string args) throws exception int port; if (args.lengt

16、h 0) port = integer.parseint(args0); else port = 8080; new simpleserver(port).run(); 2、客户端 1)simpleclienthandlerpackage ty.learnw3c.mqdemo;import ty.channel.channelhandlercontext;import ty.channel.simplechannelinboundhandler;public class simpleclienthandler extends simplechannelinboundhandler overri

17、de protected void channelread0(channelhandlercontext ctx, string s) throws exception system.out.println("收到的信息:" + s); 2)simpleclientinitializerpackage ty.learnw3c.mqdemo;import ty.channel.channelinitializer;import ty.channel.channelpipeline;import ty.channel.socket.socketchannel;import ty

18、.handler.codec.delimiterbasedframedecoder;import ty.handler.codec.delimiters;import ty.handler.codec.string.stringdecoder;import ty.handler.codec.string.stringencoder;public class simpleclientinitializer extends channelinitializer override protected void initchannel(socketchannel ch) throws exceptio

19、n channelpipeline pipeline = ch.pipeline(); pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter(); pipeline.addlast("decoder", new stringdecoder(); pipeline.addlast("encoder", new stringencoder(); pipeline.addlast("handler"

20、, new simpleclienthandler(); 3)simpleclientpackage ty.learnw3c.mqdemo;import java.io.bufferedreader;import java.io.inputstreamreader;import ty.bootstrap.bootstrap;import ty.channel.channel;import ty.channel.eventloopgroup;import ty.channel.nio.nioeventloopgroup;import ty.channel.socket.nio.niosocket

21、channel;public class simpleclient private final string host; private final int port; public simpleclient(string host, int port) this.host = host; this.port = port; public static void main(string args) throws exception new simpleclient("localhost", 8080).run(); public void run() throws exce

22、ption eventloopgroup group = new nioeventloopgroup(); try bootstrap bootstrap = new bootstrap() .group(group) .channel(niosocketchannel.class) .handler(new simpleclientinitializer(); channel channel = bootstrap.connect(host, port).sync().channel(); bufferedreader in = new bufferedreader(new inputstreamreader(system.in); while(true) string line = in.readline

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论