NioEventLoop
Netty version: 4.1.6.Final
In the previous chapter on server startup we met an important component, NioEventLoop. This chapter analyzes that component.
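Keep three questions in mind while reading:
1. By default, how many threads does a Netty server start, and when are they started?
2. How does Netty work around the JDK empty-polling (epoll busy-spin) bug?
3. How does Netty achieve asynchronous, serialized, lock-free execution?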
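Analyzing NioEventLoop in three steps
1. NioEventLoop creation
- How it is created and what happens during creation
2. NioEventLoop startup
- The startup flow of NioEventLoop
3. NioEventLoop execution logic
- What NioEventLoop does under the hood and how it stays efficient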
NioEventLoop creation
Code call path
Everything starts from the user code that calls new NioEventLoopGroup():
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Entering the NioEventLoopGroup constructor, the call goes through a chain of overloaded constructors:
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
The last overload calls super(), i.e. the constructor of the parent class MultithreadEventLoopGroup:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); // calls the parent class MultithreadEventLoopGroup
}
MultithreadEventLoopGroup.MultithreadEventLoopGroup()
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // nThreads == 0 means "use the default"; continues to the parent MultithreadEventExecutorGroup
}
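The nThreads == 0 branch above is where the default thread count comes from. The snippet below is a minimal, JDK-only sketch of that resolution, assuming (as I read the 4.1 sources) that DEFAULT_EVENT_LOOP_THREADS is max(1, 2 * available processors) and can be overridden with the io.netty.eventLoopThreads system property. ThreadCount is a hypothetical helper, not a Netty class.

```java
// Hypothetical helper mirroring how MultithreadEventLoopGroup resolves nThreads.
// Assumption: default = max(1, io.netty.eventLoopThreads or 2 * available processors).
final class ThreadCount {
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger reads a system property as an int, returning fallback if absent or invalid.
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // nThreads == 0 means "use the default", exactly like the constructor above.
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }
}
```

Netty computes the default once, in a static initializer; the sketch re-reads the property on every call only so that it can be tested.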
MultithreadEventExecutorGroup.MultithreadEventExecutorGroup()
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
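This calls the MultithreadEventExecutorGroup() constructor below, which creates the NioEventLoops in three steps:
- create a thread executor (ThreadPerTaskExecutor)
- create the NioEventLoops in a for loop
- create the thread chooser with chooserFactory.newChooser()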
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // pass in newDefaultThreadFactory() to build the thread executor
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { // create the NioEventLoops in a for loop
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children); // create the thread chooser via chooserFactory.newChooser()
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
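The try/finally inside the loop is an all-or-nothing guard: if creating child i fails, the children already created are shut down before the exception propagates. A simplified sketch of just that rollback, using a hypothetical Child class in place of EventExecutor and leaving out the awaitTermination loop:

```java
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical stand-in for an EventExecutor: it only records whether it was shut down.
final class Child {
    final int index;
    boolean shutdown;
    Child(int index) { this.index = index; }
    void shutdownGracefully() { shutdown = true; }
}

final class ChildArray {
    // Creates n children; if child i fails, shuts down children 0..i-1 and rethrows,
    // mirroring the try/finally rollback in MultithreadEventExecutorGroup.
    static Child[] create(int n, IntFunction<Child> factory, List<Child> created) {
        Child[] children = new Child[n];
        for (int i = 0; i < n; i++) {
            boolean success = false;
            try {
                children[i] = factory.apply(i);
                created.add(children[i]);
                success = true;
            } catch (RuntimeException e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j++) {
                        children[j].shutdownGracefully();
                    }
                }
            }
        }
        return children;
    }
}
```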
The three steps of creating NioEventLoop in detail
1. ThreadPerTaskExecutor
- Every task it executes gets a brand-new thread
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start(); // create a thread via threadFactory, hand it the task, and start it
}
}
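Because execute() always starts a new thread, each call yields a distinct thread. The sketch below copies the class shape with JDK types only (PerTaskExecutor and distinctThreads are illustrative names) and counts the threads used. In NioEventLoop this executor is called once per event loop, when the loop starts, so each NioEventLoop ends up with exactly one thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Same shape as Netty's ThreadPerTaskExecutor: every execute() starts a brand-new thread.
final class PerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) throw new NullPointerException("threadFactory");
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

    // Runs n tasks and returns how many distinct threads executed them.
    static int distinctThreads(int n) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        Executor executor = new PerTaskExecutor(Thread::new);
        for (int i = 0; i < n; i++) {
            executor.execute(() -> { seen.add(Thread.currentThread()); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }
}
```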
- NioEventLoop thread naming rule: nioEventLoopGroup-<poolId>-<threadId>
How the ThreadFactory is created:
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass()); // the pool name is derived from the simple class name of getClass()
}
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
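toPoolName turns the simple class name into the pool name: if the name starts with an uppercase letter followed by a lowercase one, only the first letter is lowercased (a one-letter name is lowercased entirely):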
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
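The rule can be checked in isolation. This sketch takes the simple class name as a String (instead of calling StringUtil.simpleClassName) and otherwise applies the same switch; PoolNames is a hypothetical holder class.

```java
import java.util.Locale;

final class PoolNames {
    // Same rules as toPoolName above, applied to an already-computed simple class name.
    static String toPoolName(String poolName) {
        switch (poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
            default:
                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                    // Lowercase only the first character, e.g. NioEventLoopGroup -> nioEventLoopGroup.
                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                }
                return poolName; // e.g. acronyms such as URLPool are left untouched
        }
    }
}
```

For NioEventLoopGroup this yields nioEventLoopGroup, which becomes the first part of every event loop thread's name.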
Building the name prefix:
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) { // validate the arguments
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
newThread()
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); // the id is incremented for every new thread
try {
...
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
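Netty wraps its own FastThreadLocalThread, which extends Thread and optimizes some operations: it is designed to work with FastThreadLocal, which looks values up by an index into an array instead of the hash map used by the JDK ThreadLocal.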
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
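Putting the prefix and the counter together: the static poolId distinguishes factories and nextId numbers the threads within one factory, which gives names of the form poolName-poolId-threadId. A trimmed-down, hypothetical NamingThreadFactory that keeps only this naming logic and returns a plain Thread instead of a FastThreadLocalThread:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, trimmed-down DefaultThreadFactory: only the naming scheme is kept.
final class NamingThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolId = new AtomicInteger(); // shared by all factories
    private final AtomicInteger nextId = new AtomicInteger();         // one counter per factory
    private final String prefix;

    NamingThreadFactory(String poolName) {
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        // Netty creates a FastThreadLocalThread here; a plain Thread is enough to show the names.
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}
```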
2. newChild()
- saves the thread executor ThreadPerTaskExecutor
- creates an MpscQueue
- creates a selector
Back to the for loop in MultithreadEventExecutorGroup:
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
....
newChild() receives an executor, which is the ThreadPerTaskExecutor created earlier. Here is newChild:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
It calls a NioEventLoop constructor, so let's go straight there:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); // call the parent constructor
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector(); // open a selector; each selector is bound to exactly one NioEventLoop
selectStrategy = strategy;
}
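Since every NioEventLoop opens its own selector in its constructor, a group of N event loops holds N independent selectors. A minimal JDK-only illustration with a hypothetical SelectorOwner class; where Netty wraps the IOException in a ChannelException, the sketch uses UncheckedIOException instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical minimal "loop" that, like NioEventLoop, opens its own Selector when constructed.
final class SelectorOwner implements AutoCloseable {
    final Selector selector;

    SelectorOwner(SelectorProvider provider) {
        try {
            selector = provider.openSelector(); // the same JDK call used by openSelector()
        } catch (IOException e) {
            throw new UncheckedIOException("failed to open a new selector", e);
        }
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException ignored) {
            // nothing useful to do in a sketch
        }
    }
}
```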
Following the parent constructor leads to SingleThreadEventExecutor:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor"); // keep the thread executor; it is used later to create the NioEventLoop's underlying thread
taskQueue = newTaskQueue(this.maxPendingTasks); // when a thread other than this NioEventLoop's own thread submits work, the task goes into this queue and is later run by the NioEventLoop's thread
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
newTaskQueue is implemented with newMpscQueue (a multi-producer, single-consumer queue):
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
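The task queue is written by many producer threads (anyone calling execute()) but read by a single consumer, the event loop thread, which is exactly the MPSC shape. The JDK has no dedicated MPSC queue, so this sketch assumes ConcurrentLinkedQueue (a multi-producer, multi-consumer queue) as a stand-in; MpscDemo is a hypothetical name.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the producer/consumer pattern behind taskQueue.
// Assumption: ConcurrentLinkedQueue stands in for Netty's MPSC queue.
final class MpscDemo {
    // Several producer threads offer tasks; one consumer (the calling thread) drains and runs them.
    static int produceAndDrain(int producers, int tasksPerProducer) {
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        int[] executed = {0}; // only touched by the single consumer thread
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    taskQueue.offer(() -> executed[0]++);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run(); // single consumer: no lock around executed[0]
        }
        return executed[0];
    }
}
```

Because only one thread ever polls, the counter the tasks increment needs no lock; a real MPSC queue can exploit the same fact to make the consumer side cheaper than in a general-purpose concurrent queue.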
openSelector() opens a selector through the selectorProvider:
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
.....
}
3. newChooser()
After the first two steps, MultithreadEventExecutorGroup creates the chooser:
chooser = chooserFactory.newChooser(children);
The chooser binds each new connection to a NioEventLoop; this is done by the next() method:
@Override
public EventExecutor next() {
return chooser.next();
}
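next() is a simple round robin: it starts at index 0 and, after the last of the N event loops, wraps back to 0.
Netty optimizes this operation; let's see how.
In DefaultEventExecutorChooserFactory.newChooser(), the chooser type depends on whether the number of executors is a power of two: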
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
First, how PowerOfTowEventExecutorChooser works (the misspelled class name is Netty's own):
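Suppose length is 16; length - 1 is 15, which is 1111 in binary. Suppose idx is 111010 in binary (58). Because length is a power of two, length - 1 is a mask of all 1 bits, so idx & (length - 1) keeps only the low four bits of idx: 111010 & 1111 = 1010, i.e. 10, which is exactly 58 % 16. As idx keeps incrementing, the masked value cycles 0, 1, ..., 15, 0, and so on. A bitwise & is much cheaper than the integer division behind %, so this is a worthwhile optimization.
Implementation: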
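The arithmetic can be checked directly. This sketch (MaskVsMod is a hypothetical name) compares idx & (length - 1) with idx % length for length 16, using 0b111010 (58) from the example above.

```java
final class MaskVsMod {
    // For a power-of-two length, (idx & (length - 1)) keeps the low bits of idx,
    // which equals idx % length for any non-negative idx.
    static int byMask(int idx, int length) {
        return idx & (length - 1);
    }

    static int byMod(int idx, int length) {
        return idx % length;
    }
}
```

A side benefit of the mask: when the AtomicInteger counter overflows past Integer.MAX_VALUE, the index still goes from 15 to 0 without a jump, because Integer.MAX_VALUE & 15 = 15 and Integer.MIN_VALUE & 15 = 0.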
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1]; // & is much faster than %
}
}
GenericEventExecutorChooser is a plain modulo:
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)]; // modulo
}
}
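Finally, the isPowerOfTwo check, which is also quite clever: it relies on two's complement and a bitwise &.
Q: Why must the length be a power of two for the & trick to work?
A: Only then is length - 1 a mask of all 1 bits, so that & is equivalent to %. Otherwise some bits of the mask are 0, & can never produce certain indices, and the executors would not all be visited in turn.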
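Both points can be made concrete. The check below is (val & -val) == val, which is, as far as I can tell, how DefaultEventExecutorChooserFactory implements isPowerOfTwo: in two's complement, val & -val isolates the lowest set bit, so it equals val only when exactly one bit is set. The hypothetical reachableByMask then shows the counterexample: with 6 executors the mask is 5 (101), so idx & 5 only ever yields 0, 1, 4 and 5, and executors 2 and 3 would never be chosen.

```java
import java.util.Set;
import java.util.TreeSet;

final class PowerOfTwoCheck {
    // -val is ~val + 1, so (val & -val) keeps only the lowest set bit of val.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // All indices that idx & (length - 1) can produce for the given length.
    static Set<Integer> reachableByMask(int length) {
        Set<Integer> reachable = new TreeSet<>();
        for (int idx = 0; idx < 4 * length; idx++) {
            reachable.add(idx & (length - 1));
        }
        return reachable;
    }
}
```

The check also returns true for 0, which does not matter here because nThreads must be positive.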
NioEventLoop startup
1. Startup flow
2. Code call path
The bind() method first submits a task; here is the corresponding source:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() { // submit a task that performs the actual port binding
@Override
public void run() {
...
}
});
}
Let's step into SingleThreadEventExecutor.execute():
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); // is the current thread this NioEventLoop's thread? During startup this returns false
if (inEventLoop) {
addTask(task);
} else {
startThread(); // actually start the thread
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Look at inEventLoop(): the event loop's thread has not been created yet, so the two threads differ and it returns false.
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread()); // the current thread; during startup this is the main thread, not the event loop's IO thread
}
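It compares the current thread with the IO thread bound to this NioEventLoop. If they are the same, the caller is the bound IO thread and may perform reads and writes directly; if not, the caller is some other thread, so the read or write is wrapped into a task and put into the queue, to be run later by the bound IO thread. This is Netty's asynchronous, serialized, lock-free design: threads in Netty do not need to synchronize with each other, yet everything stays thread-safe.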
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread; // is it the same thread?
}
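The routing described above can be sketched in a few lines. ConfinedCounter is a hypothetical class: it records its single loop thread, runs increment() directly when called on that thread and otherwise resubmits it as a task, so the plain long field is only ever touched by one thread and needs no lock. A JDK single-thread executor stands in for the NioEventLoop here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class illustrating inEventLoop()-style routing; not Netty code.
final class ConfinedCounter {
    private volatile Thread loopThread; // the single "IO thread", recorded when it is created
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "confined-loop");
        loopThread = t;
        return t;
    });
    private long count; // only ever touched by loopThread, so no lock is needed

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void increment() {
        if (inEventLoop()) {
            count++;                       // already on the loop thread: do it directly
        } else {
            loop.execute(this::increment); // another thread: hand the work over as a task
        }
    }

    // Reads the counter on the loop thread, after every previously queued task has run.
    long awaitCount() {
        try {
            return loop.submit(() -> count).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        loop.shutdown();
    }
}
```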
Next, startThread():
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { // has the event loop thread not been started yet?
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread(); // actually start the thread
}
}
}
doStartThread() uses the executor, here the ThreadPerTaskExecutor, to create and start a new thread:
private void doStartThread() {
assert thread == null; // no thread is bound yet
executor.execute(new Runnable() { // executor is the ThreadPerTaskExecutor
@Override
public void run() {
thread = Thread.currentThread(); // save the current thread, binding this NioEventLoop to exactly one thread
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // call run() to actually start the event loop
success = true;
} catch (Throwable t) {
...
}
...
}
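The lazy start can be reproduced with a CAS on a state field: however many threads call execute() concurrently, only the one that wins compareAndSet(ST_NOT_STARTED, ST_STARTED) creates the thread, and the new thread binds itself in run(), as doStartThread() does. LazyLoop is a hypothetical, simplified class (no wakeup, no shutdown; the thread is a daemon so the JVM can exit).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-thread loop whose thread is created by the first execute(),
// guarded by a CAS on the state, like startThread()/doStartThread() above.
final class LazyLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsCreated = new AtomicInteger();
    private volatile Thread thread;

    void execute(Runnable task) {
        if (Thread.currentThread() != thread) {
            startThread();
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsCreated.incrementAndGet();
            Thread t = new Thread(this::run, "lazy-loop");
            t.setDaemon(true); // this sketch has no shutdown logic
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread(); // bind this loop to exactly one thread
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int threadsCreated() {
        return threadsCreated.get();
    }
}
```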
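3. Summary
A quick recap: the main thread eventually calls bind(), which wraps the actual binding work in a task and calls execute() on the server channel's event loop. Netty sees that the caller of execute() is not the NIO thread, so it calls startThread() -> doStartThread() to create the thread. Creating it is simply ThreadPerTaskExecutor.execute(), which creates a new thread on every call; that thread becomes the NioEventLoop's underlying thread. Inside it, NioEventLoop first saves the current thread and then calls run(), which starts the NioEventLoop.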
NioEventLoop execution
1. Execution flow
2. Code call path
Here is NioEventLoop.run():
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // select: poll for IO events
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // 50 by default
if (ioRatio == 100) {
try {
processSelectedKeys(); // process IO events
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // with ioRatio = 50, task time equals IO time (1:1)
}
...
}
}
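The time budget passed to runAllTasks() is plain arithmetic on ioRatio. A small sketch (IoRatio is a hypothetical helper; the range check is a guard added only for the sketch): with the default ioRatio of 50 the tasks get as much time as the IO took, and a higher ioRatio shrinks the task budget.

```java
final class IoRatio {
    // Budget passed to runAllTasks(...) in run(): ioTime * (100 - ioRatio) / ioRatio, in nanoseconds.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With ioRatio == 100 the run loop above does not use this formula at all and calls runAllTasks() without a budget.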
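The select method performs three pieces of logic:
- deadline handling and interleaving with tasks
- blocking select
- avoiding the JDK empty-polling bug
All three are implemented inside the for loop of the select method.
First, the select call itself: wakenUp indicates whether the loop is in the woken-up state. Before every select, wakenUp is set to false, marking that a select is about to happen and that the loop has not been woken up.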
case SelectStrategy.SELECT:
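select(wakenUp.getAndSet(false)); // set to false
Step into select(). First note that NioEventLoop has an underlying scheduled task queue ordered by deadline; delayNanos(currentTimeNanos) computes how much time remains until the deadline of the first task in that queue.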
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime(); // current system time
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // select must not run past selectDeadLineNanos
for (;;) {
// ============ deadline handling and interleaving with tasks ============
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // has the deadline passed?
if (selectCnt == 0) { // deadline passed without a single select so far
selector.selectNow(); // do one non-blocking select
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) { // not timed out, but tasks are pending: do a non-blocking select and stop
selector.selectNow();
selectCnt = 1;
break;
}
// ============ blocking select ============
int selectedKeys = selector.select(timeoutMillis); // not timed out and the task queue is empty: block
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // end this round of polling if any of these holds
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
// reaching here means at least one blocking select has been performed
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // the full timeout elapsed without anything selected
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector(); // build a new selector and re-register all old keys on it
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
...
}
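Two pieces of the loop above can be isolated: the conversion of the remaining time to milliseconds (the + 500000L rounds to the nearest millisecond, so less than 0.5 ms left counts as timed out), and the premature-return counter. SelectMath and SpinDetector are hypothetical names; 512 is Netty's default for SELECTOR_AUTO_REBUILD_THRESHOLD, which I believe can be changed with the io.netty.selectorAutoRebuildThreshold system property. The sketch simplifies the bookkeeping: in Netty, selectCnt is a local variable of a single select() call.

```java
import java.util.concurrent.TimeUnit;

final class SelectMath {
    // Remaining nanoseconds rounded to the nearest millisecond, as in the select() loop above.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    // True when a select(timeoutMillis) started at startNanos really waited the whole timeout.
    static boolean waitedFullTimeout(long endNanos, long startNanos, long timeoutMillis) {
        return endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos;
    }
}

// Hypothetical counter reproducing the decision at the bottom of the select() loop.
final class SpinDetector {
    private final int threshold; // SELECTOR_AUTO_REBUILD_THRESHOLD, 512 by default in Netty
    private int selectCnt;

    SpinDetector(int threshold) {
        this.threshold = threshold;
    }

    // Records one blocking select that returned with nothing to do; true means "rebuild the selector".
    boolean onEmptySelect(boolean waitedFullTimeout) {
        selectCnt++;
        if (waitedFullTimeout) {
            selectCnt = 1; // a genuine timeout, not a spin: reset the counter
            return false;
        }
        if (threshold > 0 && selectCnt >= threshold) {
            selectCnt = 1; // the caller rebuilds the selector and starts counting again
            return true;
        }
        return false;
    }
}
```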
rebuildSelector() source:
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector(); // create a new Selector
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) { // re-register every old key on the new selector
Object a = key.attachment(); // the attachment is Netty's wrapped channel
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps(); // the registered interest ops
key.cancel(); // cancel the old registration
SelectionKey newKey = key.channel().register(newSelector, interestOps, a); // register again on the new selector
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey; // update the selectionKey
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
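The core of rebuildSelector() only needs public JDK APIs, so it can be tried without Netty. A minimal sketch (SelectorMigration is a hypothetical name) that moves every valid key, with its interest ops and attachment, to a fresh selector and closes the old one; Netty's retry on ConcurrentModificationException, its error handling and the selectionKey update on AbstractNioChannel are left out. The test uses a Pipe so that no network socket is needed.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// JDK-only version of the migration loop in rebuildSelector().
final class SelectorMigration {
    static int migrate(Selector oldSelector, Selector newSelector) throws IOException {
        int nChannels = 0;
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // already cancelled, or already moved
            }
            Object attachment = key.attachment();
            int interestOps = key.interestOps();
            key.cancel(); // cancelling does not modify keys(), so iterating is safe
            key.channel().register(newSelector, interestOps, attachment);
            nChannels++;
        }
        oldSelector.close(); // everything now lives on the new selector
        return nChannels;
    }
}
```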
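3. Summary
select() first handles the deadline; if tasks are pending, it ends the current select. If the deadline has not been reached and the task queue is empty, it performs a blocking select. After the blocking select returns, it checks whether it really blocked for the whole timeout. If it did not, the JDK NIO empty-polling bug may have been triggered; Netty then checks whether the number of such premature returns has reached a threshold (512 by default). If it has, Netty replaces the selector through rebuildSelector(), which sidesteps the bug.
So far we have covered the first stage of NioEventLoop execution, detecting IO events, i.e. calling the JDK selector in blocking or non-blocking mode. The second stage is processing those IO events, which the next part analyzes through processSelectedKeys().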
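processSelectedKeys() execution logic
- selected keySet optimization
Every select adds the ready keys to an underlying HashSet; Netty uses reflection to replace that HashSet with an array-backed set. Adding a ready key then becomes a plain array append with no hashing, and iterating is a simple array scan, both cheaper than going through a HashSet.
- processSelectedKeysOptimized()
This method does the actual processing of the IO events.
Here is the source of the selected keySet optimization: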
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector(); // create the selector with the JDK API
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) { // optimization disabled: return the plain selector (it is enabled by default)
return selector;
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // Netty replaces the HashSet with this structure
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl", // load this class via reflection
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
return e;
} catch (SecurityException e) {
return e;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) || // did we get the class, and is the selector an instance of it?
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
if (maybeSelectorImplClass instanceof Exception) {
Exception e = (Exception) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
}
return selector; // if not, return the plain selector
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; // if so, use this class
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); // get both fields via reflection; both are HashSets by default
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet); // replace them here
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
} catch (RuntimeException e) {
// JDK 9 can throw an inaccessible object exception here; since Netty compiles
// against JDK 7 and this exception was only added in JDK 9, we have to weakly
// check the type
if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
return e;
} else {
throw e;
}
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
} else {
selectedKeys = selectedKeySet; // keep selectedKeySet in a NioEventLoop field
logger.trace("instrumented a special java.util.Set into: {}", selector);
}
return selector;
}
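The reflection trick itself is ordinary Java. Since sun.nio.ch.SelectorImpl may be inaccessible on newer JDKs (hence the InaccessibleObjectException handling above), this sketch swaps a private Set field on a hypothetical FakeSelector for an array-backed set (AppendOnlySet, also hypothetical); after the swap, every add() made by the fake selector lands in the array.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical stand-in for sun.nio.ch.SelectorImpl: a private Set field we want to replace.
final class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();

    void markReady(String key) {
        selectedKeys.add(key); // the "JDK" side only ever calls add()
    }
}

// Append-only replacement in the spirit of SelectedSelectionKeySet (fixed size to keep it short).
final class AppendOnlySet extends AbstractSet<String> {
    final String[] keys = new String[16];
    int size;

    @Override public boolean add(String s) { keys[size++] = s; return true; }
    @Override public int size() { return size; }
    @Override public Iterator<String> iterator() { throw new UnsupportedOperationException(); }
}

final class FieldSwap {
    // Replaces a private field of obj via reflection, as openSelector() does for the selector.
    static void replace(Object obj, String fieldName, Object value) throws ReflectiveOperationException {
        Field f = obj.getClass().getDeclaredField(fieldName);
        f.setAccessible(true);
        f.set(obj, value);
    }
}
```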
SelectedSelectionKeySet.add(); the class does not support remove, contains and similar methods:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
....
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o; // O(1) append
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
Next, the IO processing in processSelectedKeysOptimized(). Back in NioEventLoop.run(), step into processSelectedKeys():
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip()); // flip() returns the array of selected keys collected so far
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
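selectedKeys.flip() relies on SelectedSelectionKeySet keeping two arrays, A and B. The sketch below is a hypothetical, Object-based reconstruction, assuming flip() works as I read the 4.1.6 source: it null-terminates the array being filled, switches add() to the other array and returns the filled one. Because add() doubles an array as soon as it is full, there is always room for the terminating null, which is why processSelectedKeysOptimized() can simply loop until it hits null.

```java
import java.util.Arrays;

// Hypothetical reconstruction of SelectedSelectionKeySet's A/B arrays and flip().
final class FlipBuffer {
    private Object[] keysA = new Object[4];
    private Object[] keysB = new Object[4];
    private int keysASize;
    private int keysBSize;
    private boolean isA = true;

    void add(Object o) {
        if (isA) {
            keysA[keysASize++] = o;
            if (keysASize == keysA.length) keysA = Arrays.copyOf(keysA, keysA.length << 1); // keep a free slot
        } else {
            keysB[keysBSize++] = o;
            if (keysBSize == keysB.length) keysB = Arrays.copyOf(keysB, keysB.length << 1);
        }
    }

    // Null-terminates the array being filled, switches add() to the other array, returns the filled one.
    Object[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    // Consumer loop in the style of processSelectedKeysOptimized(): stop at the first null and
    // clear each slot so the referenced object can be garbage-collected.
    static int drain(Object[] keys) {
        int processed = 0;
        for (int i = 0; ; i++) {
            Object k = keys[i];
            if (k == null) break;
            keys[i] = null;
            processed++;
        }
        return processed;
    }
}
```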
Step into processSelectedKeysOptimized():
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i]; // iterate over the selected keys
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null; // clear the array slot once the key has been taken
final Object a = k.attachment(); // the wrapped channel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); // the IO is processed here
} else {
...
}
...
}
}
Step into processSelectedKey():
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // the channel's unsafe
if (!k.isValid()) { // an invalid key means the connection may be broken: close the channel
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps(); // the ready IO events
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
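The order of the checks in processSelectedKey() can be summarized as a pure function of readyOps. ReadyOps is a hypothetical decoder that lists, in order, the actions processSelectedKey() would take, using only the JDK SelectionKey bit constants.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder of the branches in processSelectedKey(); it only manipulates bits.
final class ReadyOps {
    static List<String> actions(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // after OP_CONNECT has been removed from interestOps
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is treated as readable too, to avoid a possible JDK spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    // The interest-set update performed before finishConnect().
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```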
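runAllTasks() execution logic
- task types and how tasks are added
- task aggregation
- task execution
First, task types and adding tasks. By default Netty has two task queues: the normal task queue, an MpscQueue created together with the NioEventLoop, and the scheduled task queue.
Creating the normal task queue and adding tasks to it: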
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks); // the MpscQueue is created here
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
SingleThreadEventExecutor.execute():
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); // analyzed earlier: is the caller the event loop thread or an external thread?
if (inEventLoop) {
addTask(task);
} else {
startThread(); // external thread: start the NioEventLoop thread first
addTask(task); // then call addTask
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
addTask():
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task); // add the task to taskQueue
}
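When offer() fails, the task is rejected. A sketch with a bounded queue, assuming the capacity follows Math.max(16, maxPendingTasks) from the constructor above and that rejection throws RejectedExecutionException, which is what RejectedExecutionHandlers.reject() (the handler NioEventLoopGroup passes in) does as far as I know. BoundedTasks is a hypothetical class, and ArrayBlockingQueue stands in for the MPSC queue.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of addTask()/offerTask() over a bounded queue.
final class BoundedTasks {
    private final Queue<Runnable> taskQueue;

    BoundedTasks(int maxPendingTasks) {
        taskQueue = new ArrayBlockingQueue<>(Math.max(16, maxPendingTasks));
    }

    void addTask(Runnable task) {
        if (task == null) throw new NullPointerException("task");
        if (!taskQueue.offer(task)) { // offer() returns false instead of blocking when full
            throw new RejectedExecutionException("task queue full");
        }
    }

    int pending() {
        return taskQueue.size();
    }
}
```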
Now the scheduled task queue. It is used when schedule() is called; schedule() is another method NioEventLoop exposes to callers.
AbstractScheduledEventExecutor.schedule():
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable"); // the callable is the task itself
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>( // wrap the callable into a ScheduledFutureTask and call schedule()
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
Step into schedule():
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) { // again: was schedule() called by an external thread or by the NioEventLoop?
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() { // external thread: scheduledTaskQueue() is a non-thread-safe PriorityQueue, so the add itself is submitted as a task
@Override
public void run() {
scheduledTaskQueue().add(task); // adding a scheduled task is itself a normal task, so the scheduled task queue is only ever touched on the NioEventLoop thread
}
});
}
return task;
}
Next, task aggregation. Back in NioEventLoop.run():
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // pass in a time budget: running tasks must not take longer than timeoutNanos
}
Step into runAllTasks():
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); // aggregation first: move due scheduled tasks into the normal task queue
Runnable task = pollTask(); // execution: all due tasks are now in taskQueue; take one
...
}
fetchFromScheduledTaskQueue():
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
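Runnable scheduledTask = pollScheduledTask(nanoTime); // take a scheduled task whose deadline is at or before nanoTime
while (scheduledTask != null) { // a due task exists: move it into the normal task queue
if (!taskQueue.offer(scheduledTask)) { // could not add it: put it back into the scheduled task queue, otherwise it would be lost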
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime); // continue with the next due task
}
return true;
}
ScheduledFutureTask has a compareTo() method:
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos(); // compare deadlines: the earlier one comes first
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
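In other words, the scheduled task queue is a priority queue ordered by deadline, and the id breaks ties so that tasks with the same deadline run in submission order. A sketch of that ordering with a hypothetical TimedTask; unlike Netty's compareTo, it uses Long.compare instead of subtracting deadlines and does not throw an Error for equal ids.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical scheduled task ordered like ScheduledFutureTask.compareTo():
// earlier deadline first; for equal deadlines, the earlier-created task first.
final class TimedTask implements Comparable<TimedTask> {
    private static final AtomicLong nextId = new AtomicLong();
    final long id = nextId.getAndIncrement();
    final long deadlineNanos;
    final String name;

    TimedTask(String name, long deadlineNanos) {
        this.name = name;
        this.deadlineNanos = deadlineNanos;
    }

    @Override
    public int compareTo(TimedTask that) {
        if (this == that) return 0;
        int byDeadline = Long.compare(deadlineNanos, that.deadlineNanos);
        return byDeadline != 0 ? byDeadline : Long.compare(id, that.id);
    }
}
```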
pollScheduledTask():
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; // the scheduled task queue
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); // peek at the first task
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) { // its deadline has been reached
scheduledTaskQueue.remove();
return scheduledTask;
}
return null; // no scheduled task is due yet
}
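fetchFromScheduledTaskQueue() and pollScheduledTask() together move every due task into the normal queue and put a task back when the normal queue is full. A hypothetical sketch (TaskAggregation) in which tasks are represented only by their deadlines (Long values in a PriorityQueue) and an ArrayBlockingQueue of capacity 2 plays the role of taskQueue in the test:

```java
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical sketch of fetchFromScheduledTaskQueue() + pollScheduledTask(),
// with each task represented by its deadline.
final class TaskAggregation {
    // pollScheduledTask(): remove and return the head only if its deadline has passed.
    static Long pollDue(PriorityQueue<Long> scheduled, long nanoTime) {
        Long head = scheduled.peek();
        if (head == null || head > nanoTime) {
            return null;
        }
        scheduled.remove();
        return head;
    }

    // Moves due tasks into taskQueue; returns false if taskQueue filled up first.
    static boolean fetch(PriorityQueue<Long> scheduled, Queue<Long> taskQueue, long nanoTime) {
        Long task = pollDue(scheduled, nanoTime);
        while (task != null) {
            if (!taskQueue.offer(task)) {
                scheduled.add(task); // no space left: put it back so it is picked up next time
                return false;
            }
            task = pollDue(scheduled, nanoTime);
        }
        return true;
    }
}
```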
Aggregation is done: the due scheduled tasks are now in the normal task queue. Next comes task execution. Back to runAllTasks():
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask(); // all due tasks are now in taskQueue; take one
if (task == null) { // the queue is empty
afterRunningAllTasks(); // wrap-up work
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // running tasks must not go past this deadline
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task); // run the tasks one by one
runTasks ++; // count them
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
safeExecute():
/**
* Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
*/
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t); // exceptions are only logged; the loop keeps going
}
}
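runAllTasks() reads the clock only every 64 tasks ((runTasks & 0x3F) == 0), so a time budget can be overshot by up to 63 tasks, and safeExecute() keeps one failing task from stopping the rest. A simplified sketch (BudgetedRunner is a hypothetical name; System.nanoTime() replaces ScheduledFutureTask.nanoTime(), System.err replaces the logger, and afterRunningAllTasks() is omitted):

```java
import java.util.Queue;

// Hypothetical version of runAllTasks(timeoutNanos) + safeExecute().
final class BudgetedRunner {
    static int runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        int runTasks = 0;
        for (;;) {
            safeExecute(task);
            runTasks++;
            // 0x3F == 63, so the clock is only read after every 64th task.
            if ((runTasks & 0x3F) == 0 && System.nanoTime() >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    // A failing task is reported and skipped instead of killing the loop thread.
    static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            System.err.println("A task raised an exception. Task: " + task + " " + t);
        }
    }
}
```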
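NioEventLoop summary
NioEventLoops are created when the user creates the bossGroup and workerGroup; without an argument, twice the number of CPU cores are created. Each NioEventLoopGroup has a chooser that assigns a NioEventLoop to each new connection, and the chooser is optimized according to the number of NioEventLoops. When a NioEventLoop is created, it creates a selector and a task queue (MpscQueue); while creating the selector, Netty uses reflection to replace the selector's two HashSet fields with an array-backed implementation. A NioEventLoop starts its thread the first time execute() is called, and that thread is a FastThreadLocalThread. Once the thread is started, Netty saves it in a field, so it can tell whether code running NioEventLoop logic is on that thread. The NioEventLoop execution logic lives in run() and has three stages: detecting IO events, processing them, and finally running the task queue.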
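Finally, back to the three questions from the beginning:
1. By default, how many threads does a Netty server start, and when are they started?
- Without an argument, twice the number of CPU cores.
- When execute() is called, Netty checks whether the caller is the NioEventLoop thread. If it is, the task is simply added to the queue; if not, startThread() is called first to start the NioEventLoop thread.
2. How does Netty work around the JDK empty-polling bug?
- Netty counts premature returns: if a blocking select returns without having blocked for its full timeout and with nothing to do, the empty-polling bug may have been triggered. By default, once this happens 512 times in a row, Netty opens a new selector and moves all the old keys onto it.
3. How does Netty achieve asynchronous, serialized, lock-free execution?
When an external thread calls a method of the EventLoop or the Channel, Netty detects through inEventLoop() that the caller is an external thread, wraps the operation in a task and puts it into the MpscQueue; these tasks are then run one by one in the third stage of the NioEventLoop execution logic.
So once you have a channel, multiple threads can read from and write to it concurrently without synchronizing on it: all of its operations end up serialized on its event loop thread and are thread-safe.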