Netty 源码分析之一 NioEventLoopGroup 初始化
Netty 源码分析之一 NioEventLoopGroup 初始化
一、Netty如何运行?
运行环境:
Windows10 12核 16G Dell台式机
netty版本:
4.1.65.Final
public class MyNettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyNettyServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(3247).sync();
cf.addListener(future -> {
if (future.isSuccess()) {
System.out.println("listen port 3247 success");
} else {
System.out.println("listen port 3247 failure");
}
});
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上是一个常用的Netty启动模板,代码虽然不多,但已经搭起了一个强大健壮的服务器,下面我将按照上述代码梳理下Netty的启动流程,方便进一步理解Netty的运行机制和原理。
二、NioEventLoopGroup
先看下main方法中NioEventLoopGroup类的继承结构图:
上图中上面部分是由Java提供的并发接口和迭代器,主要用于提交任务、调度任务这些,继承迭代器主要是为了统一外界遍历接口。下面部分就是Netty提供的,比如EventExecutorGroup事件执行器组,根据名称可以猜到里面应该定义了一些任务执行相关的方法,打开类结构可以看到里面的方法很多跟Java并发线程部分很多一样。
下面EventLoopGroup继承了EventExecutorGroup接口,并且增加了一些方法,比如next()用来获取下一个EventLoop事件循环器。
再向下由MultithreadEventLoopGroup扩展到了多线程事件循环组,继承了MultithreadEventExecutorGroup,实现EventLoopGroup,将两者结合,一方面是需要事件执行器,一方面又需要定义事件循环接口,也就是说我应该告诉执行器们它们应该做什么。最后NioEventLoopGroup只实现了很关键的newChild()方法:
三、NioEventLoop
进入newChild()方法可以看到,最后返回的是一个NioEventLoop,我们先看下这个类的继承结构图:
上图也是将继承结构归纳为JDK提供的和Netty提供的两类,不过先不用深究,只用看继承了哪些类的名称,最终继承了一个SinleThreadEventLoop单线程事件循环类,这时就可以知道NioEventLoop内有一个单线程循环,这里就不用再考虑线程安全问题了。
回过头来,我们已经可以看到NioEventLoopGroup和NioEventLoop的关系了:
四、初始化事件循环组
1、创建事件循环组
现在,我们从main方法中的第一行来看看**new NioEventLoopGrop(1)**内究竟做了些什么。
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
// 若未指定线程数量的话,默认输入为0,此处会使用CPU核数*2作为默认线程数量
}
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
继续往里执行:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 这里创建了一个执行器选择工厂,就是如何选择执行器来做事,默认是可以从头到尾轮着选择,即取模的方式
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
最后到达终点:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
// 线程池
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 事件执行器
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 事件执行选择器
chooser = chooserFactory.newChooser(children);
// 终止事件
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
2、创建任务执行器new ThreadPerTaskExecutor(newDefaultThreadFactory())
newDefaultThreadFactory
首先创建线程工厂,类似于创建线程池的线程工厂,代码跟进去以后可以看到:
此处可以看到前缀prefix变成了nioEventLoopGroup-2-,因为在前面初始化MultithreadEventExecutorGroup时,在MultithreadEventExecutorGroup中有个全局事件执行器GlobalEventExecutor的变量要初始化,根据类的加载流程,其会在构造方法执行前初始化。
进入GlobalEventExecutor中可以看到构造方法已经创建了一个默认的线程工厂:
进入默认线程工厂中可以看到,此时prefix前缀为globalEventExecutor-1-
3、new ThreadPerTaskExecutor
我们再进入ThreadPerTaskExecutor的构造方法中可以看到,这个类就是设置了一个线程工厂,当有任务时就创建一个线程执行:
4、初始化事件执行器
我们前面提到的nThreads线程数量就是用在此处,用来创建对应个数的EventExecutor事件执行器:
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
...
}
}
5、进入newChild创建NioEventLoop
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
进入newChild方法中,首先判断args可变参数是否存在4个,若存在则直接取出EventLoopTaskQueueFactory类型的参数,不存在直接为空。调试环境下可以看到我们实际输入的参数只有3个:
打开当前类NioEventLoopGroup的构造方法可以看到第四个参数,NioEventLoopGroup支持使用指定的任务队列工厂来替换默认的(参考:#9247)
6、NioEventLoop构造方法
接下来进入NioEventLoop的构造方法中:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// 选择器提供器
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// 选择器策略
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 获取选择器元组
final SelectorTuple selectorTuple = openSelector();
// 包装后的选择器
this.selector = selectorTuple.selector;
// 未包装的选择器(JDK提供的原始的NIO选择器)
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
首先调用了父类的构造方法,可以看到内部调用了两次newTaskQueue,创建了两个任务队列:
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
// 由上面的分析可知queueFactory队列工厂默认输入为空
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
然后进入newTaskQueue0方法中:
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
PlatformDependent类就是根据不同的操作系统创建不同的数据,此处创建的是MpscChunkedArrayQueue队列,它是JCTools下的一个高性能队列,这里只要知道是个队列就行,后面会单独列一章说明。
然后进入NioEventLoop的父类SingleThreadEventLoop构造方法中:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 尾任务队列,此处暂不太了解有什么用处
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
再进入SingleThreadEventLoop的父类SingleThreadEventExecutor的构造方法中:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
// 添加任务唤醒,默认false
this.addTaskWakesUp = addTaskWakesUp;
// 最大等待任务数量
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 封装过的执行器
this.executor = ThreadExecutorMap.apply(executor, this);
// 任务队列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
// 拒绝策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
最后是把NIO事件循环组放入AbstractEventExecutor抽象事件执行器中:
最后再看下类的层次结构(见下图),NioEventLoop的继承关系就很清晰了:
让我们回到SingleThreadEventExecutor的构造方法中继续往下执行,可以看到这一行:
this.executor = ThreadExecutorMap.apply(executor, this);
这里面是将上面创建好的ThreadPerTaskExecutor和NioEventLoop再包装一下,返回的是ThreadExecutorMap中的匿名内部对象executor,只是这里面使用ThreadPerTaskExecutor执行匿名内部类中的任务:
在匿名内部类中还有一个apply()方法,任务真正运行之前需要设置setCurrentEventExecutor中当前的事件执行器,也就是NioEventLoop,这里面使用的是Netty提供的FastThreadLocal,当前线程独占,任务执行完后就会置空,具体里面也比较复杂,暂不深入,后续会单独列一章进行说明:
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
到现在我们知道了,
1、SingleThreadEventExecutor的构造函数设置了执行器 ThreadExecutorMap中内部executor,这里面是封装了ThreadPerTaskExecutor和NioEventLoop,设置了taskQueue任务队列和rejectedExecutionHandler拒绝处理器。
2、在SingleThreadEventLoop中设置了尾任务tailTasks****。
3、最后回到NioEventLoop的构造函数中,设置provider选择器提供器,selectStrategy选择器策略,然后通过openSelector()获取选择器元组,这里面包含了未封装的unwrappedSelector原始的JDK的选择器和封装的SelectedSelectionKeySetSelector选择器。如果创建失败,则调用**shutdownGracefully()**进行优雅关闭。
完成NioEventLoop实例化后,我们重新回到MultithreadEventExecutorGroup的构造方法中继续往下走。
7、chooserFactory.newChooser(children)创建选择器
chooser = chooserFactory.newChooser(children);
这里其实就是怎么选择执行器,默认采用从头到尾轮询的方式:
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 判断执行器的长度是否为2的次幂
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
根据执行器的长度是否为2的次幂选择不同的计算方式。首先判断一个数是否是2的整数次幂:
思路一:
1、首先把2的整数次幂换成二进制数,十进制数2转换为二进制数是10B,4转换为二进制数是100B...
十进制 | 二进制 | 是否为2的整数次幂 |
---|---|---|
2 | 10B | 是 |
4 | 100B | 是 |
8 | 1000B | 是 |
16 | 10000B | 是 |
32 | 100000B | 是 |
100 | 1100100B | 否 |
2、如果一个数是2的整数次幂,那么当其转换为二进制时,最高位为1,其余位都是0,如果把这些2的整数次幂各自减一,再转为二进制数
十进制 | 二进制 | 原值-1 | 是否为2的整数次幂 |
---|---|---|---|
2 | 10B | 1B | 是 |
4 | 100B | 11B | 是 |
8 | 1000B | 111B | 是 |
16 | 10000B | 1111B | 是 |
32 | 100000B | 11111B | 是 |
100 | 1100100B | 1100011B | 否 |
3、此时将2的整数次幂原值和其减一的结果进行按位与运算,即计算 n & (n-1) ,得到的结果为 0。
十进制 | 二进制 | 原值减一 | n & (n-1) | 是否为2的整数次幂 |
---|---|---|---|---|
2 | 10B | 1B | 0 | 是 |
4 | 100B | 11B | 0 | 是 |
8 | 1000B | 111B | 0 | 是 |
16 | 10000B | 1111B | 0 | 是 |
32 | 100000B | 11111B | 0 | 是 |
100 | 1100100B | 1100011B | 1100000 | 否 |
**结论:**0和1按位运算的结果为0,因此所有的2的整数次幂和它自身减一的值进行按位与运算后,得到的结果都是0,反之,如果一个整数不是2的整数次幂时,得到的结果就不会是0。
public static boolean isPowerOfTwo(int val) {
return (val & val - 1) == 0;
}
思路二:
在开始前,我们先熟悉下负数转二进制的一些操作,首先定义一个Integer类型的数字比如32768,由于Integer类型占用了32个bit位,因此转为二进制的完整形式为:
0000 | 0000 | 0000 | 0000 | 1000 | 0000 | 0000 | 0000 |
---|
计算机中对有符号数字的表示有三种方法:原码、反码和补码,补码=反码+1,二进制数据中最高位表示符号位,最高位为0表示整数,最高位为1表示负数。
将-32768转化为二进制数据:
①首先将-32768的绝对值转换为二进制可得:0000 0000 0000 0000 1000 0000 0000 0000
0000 | 0000 | 0000 | 0000 | 1000 | 0000 | 0000 | 0000 |
---|
②求该二进制的反码:
1111 | 1111 | 1111 | 1111 | 0111 | 1111 | 1111 | 1111 |
---|
③最后将反码加1可得补码即为原值的二进制数据:
1111 | 1111 | 1111 | 1111 | 1000 | 0000 | 0000 | 0000 |
---|
对应算法:
1、将2的整数次幂转换为二进制数据
十进制 | 二进制 | 是否为2的整数次幂 |
---|---|---|
8 | 00000000000000000000000000001000 | 是 |
16 | 00000000000000000000000000010000 | 是 |
32 | 00000000000000000000000000100000 | 是 |
64 | 00000000000000000000000001000000 | 是 |
128 | 00000000000000000000000010000000 | 是 |
257 | 00000000000000000000000100000001 | 否 |
2、取2的整数次幂的负数,并通过取反加一操作获取对应的二进制数
十进制 | 二进制 | 原值的负数 | 是否为2的整数次幂 |
---|---|---|---|
8 | 00000000000000000000000000001000 | 11111111111111111111111111111000 | 是 |
16 | 00000000000000000000000000010000 | 11111111111111111111111111110000 | 是 |
32 | 00000000000000000000000000100000 | 11111111111111111111111111100000 | 是 |
64 | 00000000000000000000000001000000 | 11111111111111111111111111000000 | 是 |
128 | 00000000000000000000000010000000 | 11111111111111111111111110000000 | 是 |
257 | 00000000000000000000000100000001 | 11111111111111111111111011111111 | 否 |
3、此时将2的整数次幂原值与2的整数次幂负值进行按位与运算,即计算 n & (-n) ,得到的结果为 n
十进制 | 二进制 | 原值的负数 | n & (-n) | 是否为2的整数次幂 |
---|---|---|---|---|
8 | 00000000000000000000000000001000 | 11111111111111111111111111111000 | 00000000000000000000000000001000 | 是 |
16 | 00000000000000000000000000010000 | 11111111111111111111111111110000 | 00000000000000000000000000010000 | 是 |
32 | 00000000000000000000000000100000 | 11111111111111111111111111100000 | 00000000000000000000000000100000 | 是 |
64 | 00000000000000000000000001000000 | 11111111111111111111111111000000 | 00000000000000000000000001000000 | 是 |
128 | 00000000000000000000000010000000 | 11111111111111111111111110000000 | 00000000000000000000000010000000 | 是 |
257 | 00000000000000000000000100000001 | 11111111111111111111111011111111 | 00000000000000000000000000000001 | 否 |
结论:
2的整数次幂和其对应的负数进行按位与运算后,得到的结果都是其本身。
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
现在我们重新回到源码中可以看到,根据执行器的长度是否为2的整数次幂后选择不同的计算方式,一种是位运算,另一种就是常规取模的方式:
8、terminationListener设置终止监听器
创建完选择器后,在MultithreadEventExecutorGroup中设置终止监听器,就是如果要终止的时候,此处会有回调:
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
// 添加终止事件监听器
e.terminationFuture().addListener(terminationListener);
}
9、Collections.unmodifiableSet(childrenSet)将执行器存入一个不可变集合
将执行器添加到LinkedHashSet中,然后再将存入一个不可变集合,此处我们暂时不知道为什么要这样存放,暂不深究继续往下执行。
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
至此NioEventLoopGroup基本上完成初始化,当然还很多细节,我们继续往下学习。
#Netty(2)#Java(6)#NIO(3)评论