博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第十一章 dubbo通信框架-netty4
阅读量:6909 次
发布时间:2019-06-27

本文共 18361 字,大约阅读时间需要 61 分钟。

netty4是2.5.6引入的,2.5.6之前的netty用的是netty3。在dubbo源码中相较于netty3,添加netty4主要仅仅改了两个类:NettyServer,NettyClient。还有就是编解码。

使用方式:

服务端:

1 

客户端:

1 

 

一、服务端 - NettyServer

1 package com.alibaba.dubbo.remoting.transport.netty4;  2   3 import com.alibaba.dubbo.common.Constants;  4 import com.alibaba.dubbo.common.URL;  5 import com.alibaba.dubbo.common.logger.Logger;  6 import com.alibaba.dubbo.common.logger.LoggerFactory;  7 import com.alibaba.dubbo.common.utils.ExecutorUtil;  8 import com.alibaba.dubbo.common.utils.NetUtils;  9 import com.alibaba.dubbo.remoting.Channel; 10 import com.alibaba.dubbo.remoting.ChannelHandler; 11 import com.alibaba.dubbo.remoting.RemotingException; 12 import com.alibaba.dubbo.remoting.Server; 13 import com.alibaba.dubbo.remoting.transport.AbstractServer; 14 import com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers; 15 import com.alibaba.dubbo.remoting.transport.netty4.logging.NettyHelper; 16  17 import io.netty.bootstrap.ServerBootstrap; 18 import io.netty.buffer.PooledByteBufAllocator; 19 import io.netty.channel.ChannelFuture; 20 import io.netty.channel.ChannelInitializer; 21 import io.netty.channel.ChannelOption; 22 import io.netty.channel.EventLoopGroup; 23 import io.netty.channel.nio.NioEventLoopGroup; 24 import io.netty.channel.socket.nio.NioServerSocketChannel; 25 import io.netty.channel.socket.nio.NioSocketChannel; 26 import io.netty.util.concurrent.DefaultThreadFactory; 27  28 import java.net.InetSocketAddress; 29 import java.util.Collection; 30 import java.util.HashSet; 31 import java.util.Map; 32  33 public class NettyServer extends AbstractServer implements Server { 34  35     private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); 36  37     private Map
channels; //
38 39 private ServerBootstrap bootstrap; 40 41 private io.netty.channel.Channel channel; 42 43 private EventLoopGroup bossGroup; 44 private EventLoopGroup workerGroup; 45 46 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { 47 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 48 } 49 50 @Override 51 protected void doOpen() throws Throwable { 52 NettyHelper.setNettyLoggerFactory(); 53 54 bootstrap = new ServerBootstrap(); 55 56 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); 57 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), 58 new DefaultThreadFactory("NettyServerWorker", true)); 59 60 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); 61 channels = nettyServerHandler.getChannels(); 62 63 bootstrap.group(bossGroup, workerGroup) 64 .channel(NioServerSocketChannel.class) 65 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) 66 .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) 67 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 68 .childHandler(new ChannelInitializer
() { 69 @Override 70 protected void initChannel(NioSocketChannel ch) throws Exception { 71 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 72 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug 73 .addLast("decoder", adapter.getDecoder()) 74 .addLast("encoder", adapter.getEncoder()) 75 .addLast("handler", nettyServerHandler); 76 } 77 }); 78 // bind 79 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); 80 channelFuture.syncUninterruptibly(); 81 channel = channelFuture.channel(); 82 83 } 84 85 @Override 86 protected void doClose() throws Throwable { 87 try { 88 if (channel != null) { 89 // unbind. 90 channel.close(); 91 } 92 } catch (Throwable e) { 93 logger.warn(e.getMessage(), e); 94 } 95 try { 96 Collection
channels = getChannels(); 97 if (channels != null && channels.size() > 0) { 98 for (com.alibaba.dubbo.remoting.Channel channel : channels) { 99 try {100 channel.close();101 } catch (Throwable e) {102 logger.warn(e.getMessage(), e);103 }104 }105 }106 } catch (Throwable e) {107 logger.warn(e.getMessage(), e);108 }109 try {110 if (bootstrap != null) {111 bossGroup.shutdownGracefully();112 workerGroup.shutdownGracefully();113 }114 } catch (Throwable e) {115 logger.warn(e.getMessage(), e);116 }117 try {118 if (channels != null) {119 channels.clear();120 }121 } catch (Throwable e) {122 logger.warn(e.getMessage(), e);123 }124 }125 126 public Collection
getChannels() {127 Collection
chs = new HashSet
();128 for (Channel channel : this.channels.values()) {129 if (channel.isConnected()) {130 chs.add(channel);131 } else {132 channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));133 }134 }135 return chs;136 }137 138 public Channel getChannel(InetSocketAddress remoteAddress) {139 return channels.get(NetUtils.toAddressString(remoteAddress));140 }141 142 public boolean isBound() {143 return channel.isActive();144 }145 }

netty4的写法与netty3有很大不同,下面是netty3:()

1 /** 2      * 启动netty服务,监听客户端连接 3      */ 4     @Override 5     protected void doOpen() throws Throwable { 6         NettyHelper.setNettyLoggerFactory(); 7         ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); 8         ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); 9         ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));10         bootstrap = new ServerBootstrap(channelFactory);11 12         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);13         channels = nettyHandler.getChannels();14         // https://issues.jboss.org/browse/NETTY-36515         // https://issues.jboss.org/browse/NETTY-37916         // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));17         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {18             public ChannelPipeline getPipeline() {19                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);20                 ChannelPipeline pipeline = Channels.pipeline();21                 /*int idleTimeout = getIdleTimeout();22                 if (idleTimeout > 10000) {23                     pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));24                 }*/25                 pipeline.addLast("decoder", adapter.getDecoder());26                 pipeline.addLast("encoder", adapter.getEncoder());27                 pipeline.addLast("handler", nettyHandler);28                 return pipeline;29             }30         });31         // bind32         channel = bootstrap.bind(getBindAddress());33     }

 

二、客户端 - NettyClient

1 package com.alibaba.dubbo.remoting.transport.netty4;  2   3 import com.alibaba.dubbo.common.Constants;  4 import com.alibaba.dubbo.common.URL;  5 import com.alibaba.dubbo.common.Version;  6 import com.alibaba.dubbo.common.logger.Logger;  7 import com.alibaba.dubbo.common.logger.LoggerFactory;  8 import com.alibaba.dubbo.common.utils.NetUtils;  9 import com.alibaba.dubbo.remoting.ChannelHandler; 10 import com.alibaba.dubbo.remoting.RemotingException; 11 import com.alibaba.dubbo.remoting.transport.AbstractClient; 12 import com.alibaba.dubbo.remoting.transport.netty4.logging.NettyHelper; 13  14 import io.netty.bootstrap.Bootstrap; 15 import io.netty.buffer.PooledByteBufAllocator; 16 import io.netty.channel.Channel; 17 import io.netty.channel.ChannelFuture; 18 import io.netty.channel.ChannelInitializer; 19 import io.netty.channel.ChannelOption; 20 import io.netty.channel.nio.NioEventLoopGroup; 21 import io.netty.channel.socket.nio.NioSocketChannel; 22 import io.netty.util.concurrent.DefaultThreadFactory; 23  24 import java.util.concurrent.TimeUnit; 25  26 public class NettyClient extends AbstractClient { 27  28     private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); 29  30     private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true)); 31  32     private Bootstrap bootstrap; 33  34     private volatile Channel channel; // volatile, please copy reference to use 35  36     public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { 37         super(url, wrapChannelHandler(url, handler)); 38     } 39  40     @Override 41     protected void doOpen() throws Throwable { 42         NettyHelper.setNettyLoggerFactory(); 43         final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); 44         bootstrap = new Bootstrap(); 45         bootstrap.group(nioEventLoopGroup) 46                 .option(ChannelOption.SO_KEEPALIVE, true) 47                 .option(ChannelOption.TCP_NODELAY, true) 48                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 49                 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) 50                 .channel(NioSocketChannel.class); 51  52         if (getTimeout() < 3000) { 53             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); 54         } else { 55             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()); 56         } 57  58         bootstrap.handler(new ChannelInitializer() { 59             protected void initChannel(Channel ch) throws Exception { 60                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); 61                 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug 62                         .addLast("decoder", adapter.getDecoder()) 63                         .addLast("encoder", adapter.getEncoder()) 64                         .addLast("handler", nettyClientHandler); 65             } 66         }); 67     } 68  69     protected void doConnect() throws Throwable { 70         long start = System.currentTimeMillis(); 71         ChannelFuture future = bootstrap.connect(getConnectAddress()); 72         try { 73             boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); 74  75             if (ret && future.isSuccess()) { 76                 Channel newChannel = future.channel(); 77                 try { 78                     // Close old channel 79                     Channel oldChannel = NettyClient.this.channel; // copy reference 80                     if (oldChannel != null) { 81                         try { 82                             if (logger.isInfoEnabled()) { 83                                 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); 84                             } 85                             oldChannel.close(); 86                         } finally { 87                             NettyChannel.removeChannelIfDisconnected(oldChannel); 88                         } 89                     } 90                 } finally { 91                     if (NettyClient.this.isClosed()) { 92                         try { 93                             if (logger.isInfoEnabled()) { 94                                 logger.info("Close new netty channel " + newChannel + ", because the client closed."); 95                             } 96                             newChannel.close(); 97                         } finally { 98                             NettyClient.this.channel = null; 99                             NettyChannel.removeChannelIfDisconnected(newChannel);100                         }101                     } else {102                         NettyClient.this.channel = newChannel;103                     }104                 }105             } else if (future.cause() != null) {106                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "107                         + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());108             } else {109                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "110                         + getRemoteAddress() + " client-side timeout "111                         + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "112                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());113             }114         } finally {115             if (!isConnected()) {116                 //future.cancel(true);117             }118         }119     }120 121     @Override122     protected void doDisConnect() throws Throwable {123         try {124             NettyChannel.removeChannelIfDisconnected(channel);125         } catch (Throwable t) {126             logger.warn(t.getMessage());127         }128     }129 130     @Override131     protected void doClose() throws Throwable {132         //can't shutdown nioEventLoopGroup133         //nioEventLoopGroup.shutdownGracefully();134     }135 136     @Override137     protected com.alibaba.dubbo.remoting.Channel getChannel() {138         Channel c = channel;139         if (c == null || !c.isActive())140             return null;141         return NettyChannel.getOrAddChannel(c, getUrl(), this);142     }143 }

netty3:

1 protected void doOpen() throws Throwable { 2         NettyHelper.setNettyLoggerFactory(); 3         bootstrap = new ClientBootstrap(channelFactory); 4         // config 5         // @see org.jboss.netty.channel.socket.SocketChannelConfig 6         bootstrap.setOption("keepAlive", true); 7         bootstrap.setOption("tcpNoDelay", true); 8         bootstrap.setOption("connectTimeoutMillis", getTimeout()); 9         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);10         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {11             public ChannelPipeline getPipeline() {12                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);13                 ChannelPipeline pipeline = Channels.pipeline();14                 pipeline.addLast("decoder", adapter.getDecoder());15                 pipeline.addLast("encoder", adapter.getEncoder());16                 pipeline.addLast("handler", nettyHandler);17                 return pipeline;18             }19         });20     }21 22 protected void doConnect() throws Throwable {23         long start = System.currentTimeMillis();24         ChannelFuture future = bootstrap.connect(getConnectAddress());25         try {26             boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);27 28             if (ret && future.isSuccess()) {29                 Channel newChannel = future.getChannel();30                 newChannel.setInterestOps(Channel.OP_READ_WRITE);31                 try {32                     // 关闭旧的连接33                     Channel oldChannel = NettyClient.this.channel; // copy reference34                     if (oldChannel != null) {35                         try {36                             if (logger.isInfoEnabled()) {37                                 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);38                             }39                             oldChannel.close();40                         } finally {41                             NettyChannel.removeChannelIfDisconnected(oldChannel);42                         }43                     }44                 } finally {45                     if (NettyClient.this.isClosed()) {46                         try {47                             if (logger.isInfoEnabled()) {48                                 logger.info("Close new netty channel " + newChannel + ", because the client closed.");49                             }50                             newChannel.close();51                         } finally {52                             NettyClient.this.channel = null;53                             NettyChannel.removeChannelIfDisconnected(newChannel);54                         }55                     } else {56                         NettyClient.this.channel = newChannel;57                     }58                 }59             } else if (future.getCause() != null) {60                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "61                         + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());62             } else {63                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "64                         + getRemoteAddress() + " client-side timeout "65                         + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "66                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());67             }68         } finally {69             if (!isConnected()) {70                 future.cancel();71             }72         }73     }

还有就是编解码。

后续会做netty4源码阅读计划。

 

转载地址:http://yyfcl.baihongyu.com/

你可能感兴趣的文章
ADB am 命令详细参数
查看>>
NOIp 数学基础
查看>>
nginx支持ipv6
查看>>
点名器
查看>>
Codeforces Problems-122A. Lucky Division
查看>>
移动端适配代码
查看>>
Js设置所有连接是触发/swt/的代码
查看>>
JS高级程序设计2nd部分知识要点1
查看>>
mac10.8 更新系统出错
查看>>
'-[UITableViewController loadView] loaded the "XXX" nib but didn't get a UITableView.'
查看>>
ARM裸板开发:07_IIC 通过IIC总线接口读写时钟芯片时间参数实现的总结
查看>>
C# 笔记 如何调用一个有返回值的方法
查看>>
加载静态文件,父模板的继承和扩展
查看>>
高科技犯罪:东欧ATM取款机惊现木马!
查看>>
黑客大赛苹果及微软操作系统均被攻破
查看>>
自由职业者和外包接单项目分析
查看>>
一起谈.NET技术,HTTP协议及POST与GET操作差异,C#中如何使用POST、GET等
查看>>
文明重启怎么做RUST堡垒_文明重启怎么地下建房-地下建房技巧
查看>>
用python画微信捂脸_用 Python 画一个捂脸表情
查看>>
mysql alter float_mysql-数据类型
查看>>