Netty新连接接入
引言
在本文开始之前,可以带着这两个问题来看
- Netty是在哪里检测有新连接接入的?
- 新连接是怎么样注册到NioEventLoop线程的?
首先来看新连接接入的四个处理逻辑步骤,接下来分析这四个步骤。
新连接通过服务端channel绑定的selector轮询出accept事件,检测出新连接之后,基于JDKNIO的channel创建一个netty的NioSocketChannel,也就是客户端channel,接着netty给客户端channel分配一个NioEventLoop,并且把该channel注册到此NioEventLoop对应的selector上,至此,这条channel后续相关的读写都由此NioEventLoop进行管理,最后向这个channel对应的selector注册读事件,注册的过程和服务端启动注册accept事件复用同一段逻辑,下面来详细分析这四个处理逻辑。
1.检测新连接
原理流程
开始之前,我们先回顾上两章内容,netty服务端启动之后会绑定一个boss线程,这个boss线程就是NioEventLoop
,bossNioEventLoop
在调用bind方法之后启动,它的run方法会干两件事情,第一,select IO事件,那么这里对应的IO事件就是accept事件,也就是新连接接入的一个事件,在拿到accept事件之后,第二个过程就是处理这个accept事件,也就是本小节要分析的内容。新连接检测从服务端NioEventLoop.run
方法第二个过程,也就是processSelectedKey开始,接下来调用NioMessageUnsafe.read()
方法,然后通过一个while循环,循环调用doReadMessages()
来创建新连接对象,而创建新连接对象最核心的方法就是调用javaChannel().accept()
方法创建jdkchannel,与服务端启动的思路一样,netty会把jdk的channel封装成netty自定义的一个channel,下面我们按照这个流程来看一遍源码。
代码实现
首先启动服务端,然后在NioEventLoop的processSelectedKey()
方法打一个断点,用telnet 建立一个新连接
telnet 127.0.0.1 8888
代码执行进入processSelectedKey()
方法
接下来往下走,获取readyOps
会运行到判断这是一个accept事件,调用unsafe.read()方法,可以看到这个unsafe是NioMessageUnsafe
首先看一下这个read方法的大概逻辑
@Override
public void read() {
assert eventLoop().inEventLoop();//首先确定是在NioEventLoop线程里面执行,如果是外部线程调用,则下面的逻辑就不进行了
final ChannelConfig config = config();//服务端channel的config
final ChannelPipeline pipeline = pipeline();//服务端pipeline
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//这个handle主要是处理服务端接收的速率
allocHandle.reset(config);//重置配置
boolean closed = false;
Throwable exception = null;
try {
try {
do {//do while循环
int localRead = doReadMessages(readBuf);//读连接
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);//这个分配器会把读到的连接作为计数
} while (allocHandle.continueReading());//是否继续读
} catch (Throwable t) {
exception = t;
}
...//拿到连接后接下来的流程
}
}
代码执行过程
进入doReadMessages()方法,通过拿到服务端channel对应的一个底层的jdk channel,也就是调用javaChannel().accept()拿到,调用accept方法去创建一个jdk底层的channel。
拿到这个channel,可以看到是java.nio.channels.SocketChannel的channel,接下来把这个channel封装成 NioSocketChannel,放到buf里面,这个buf是上一层方法传进来的一个容器的对象。
创建channel完成,return 1,回到这里说明已经创建完了一个channel
这里的readBuf是服务端channel的NioMessageUnsafe的一个filter,是一个ArrayList,主要就是用来临时存载读到的连接。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
...
}
continueReading()方法
@Override
public boolean continueReading() {
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
totalMessages < maxMessagePerRead &&//判断当前的连接数是否超过最大连接数
totalBytesRead < Integer.MAX_VALUE;
}
小结
总结一下,在服务端channel NioEventLoop第二个过程processSelectKey检测出accept事件之后,通过jdk的accept方法去创建一个jdk的channel,然后把它包装成netty自定义的channel,在连接过程中通过一个Handle对象去控制连接接入的速率,默认情况下,一次性读取16个连接。
2.创建NioSocketChannel
原理流程
在上一小节检测新连接的时候,最后一步是调用javaChannel.accept创建jdkchannel。创建完成之后,把服务端channel和客户端channel当做参数传到NioSocketChannel的构造函数里面,接下来进行一系列的创建过程。NioSocketChannel构造函数是入口,和创建服务端channel不一样的是,这里是直接调用new关键字,而服务端channel是通过反射的方式创建,大家可以思考一下netty为什么要这样设计。NioSocketChannel主要会做两件事,第一回逐层调用父类构造函数来做一些事情,第二是创建一个和NioSocketChannel绑定的配置类NioSocketChannelConfig()。
第一个过程里面首先把此channel配置为非阻塞,然后将感兴趣的读事件op
保存到成员变量方便后续注册到selector上。接着创建和此channel相关的一些组件,id作为唯一标识,unsafe作为底层的读写,pipeline作为业务逻辑的载体。第二个过程设置此channelTcpNotDelay为true,也就是禁止Nagle算法,这样小的数据包就会尽可能的发送出去,降低延时。接下来按照此流程来看一遍源码。
代码实现
来看NioServerSocketChannel的doReadMessages方法
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();//这里建立一个客户端channel
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));//创建一个netty自定义的客户端channel,把this服务端channel和ch客户端channel传入构造函数里面
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
接下来NioSocketChannel会逐层调用父类构造函数和创建config
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);//调用父类构造函数
config = new NioSocketChannelConfig(this, socket.socket());//创建config,通过jdk channel的socket方法拿到一个底层的socket
}
先来看父类构造函数
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);//这里传进去一个OP_READ,这个OP_READ就是客户端channel的一个read事件,后续将此channel绑定到selector上去的时候就表示我对这个读事件比较感兴趣,后面如果有读事件,请告诉我
}
继续进入super
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;//对channel进行保存,这个就是创建完成的jdkchannel
this.readInterestOp = readInterestOp;//这是刚刚传进来的读事件
try {
ch.configureBlocking(false);//设置此channel为非阻塞模式
} catch (IOException e) {//如果出现异常就关闭
....
}
}
继续跟super,这里创建相关组件
protected AbstractChannel(Channel parent) {
this.parent = parent;//创建此客户端channel的服务端channel,也就是我们在服务端启动过程中提到的用反射创建的channel
id = newId();//创建组件
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
接下来看创建config过程,跟进NioSocketChannelConfig
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
}
跟进super
/**
* Creates a new instance.
*/
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
this.javaSocket = javaSocket;//保存
// Enable TCP_NODELAY by default if possible.
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {//判断一下默认情况下是不是要EnableTcpNoDelay,这里默认返回true
try {
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
跟进canEnableTcpNoDelayByDefault(),返回了一个常量
/**
* Returns {@code true} if and only if it is fine to enable TCP_NODELAY socket option by default.
*/
public static boolean canEnableTcpNoDelayByDefault() {
return CAN_ENABLE_TCP_NODELAY_BY_DEFAULT;
}
运行在Linux服务端isAndroid()返回false,总体返回true
private static final boolean CAN_ENABLE_TCP_NODELAY_BY_DEFAULT = !isAndroid();//
接下来看setTcpNoDelay()
@Override
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
try {
javaSocket.setTcpNoDelay(tcpNoDelay);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
跟进javaSocket.setTcpNoDelay()方法,这里已经到达java.net.socket包
/**
* Enable/disable {@link SocketOptions#TCP_NODELAY TCP_NODELAY}
* (disable/enable Nagle's algorithm).
*
* @param on {@code true} to enable TCP_NODELAY,
* {@code false} to disable.
*
* @exception SocketException if there is an error
* in the underlying protocol, such as a TCP error.
*
* @since JDK1.1
*
* @see #getTcpNoDelay()
*/
public void setTcpNoDelay(boolean on) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
}
小结
创建NioSocketChannel可以分成两个部分,第一部分逐层调用父类函数,设置该channel的阻塞模式为false,保存读事件,创建一系列组件。第二部分是创建config,默认情况下设置setTcpNoDelay为true,这样小的数据包可以尽快地发出去,降低延时。