Netty 源码分析之一 NioEventLoopGroup 初始化

June 15, 2024 作者: dyzmj 分类: 物联网源码 浏览: 4 评论: 0

Netty 源码分析之一 NioEventLoopGroup 初始化

一、Netty如何运行?

运行环境:

Windows10 12核 16G Dell台式机

netty版本:

4.1.65.Final

public class MyNettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
    				 .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new MyNettyServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(3247).sync();
            cf.addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("listen port 3247 success");
                } else {
                    System.out.println("listen port 3247 failure");
                }
            });
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

以上是一个常用的Netty启动模板,代码虽然不多,但已经搭起了一个强大健壮的服务器,下面我将按照上述代码梳理下Netty的启动流程,方便进一步理解Netty的运行机制和原理。

二、NioEventLoopGroup

先看下main方法中NioEventLoopGroup类的继承结构图:

image-20230531103317996

上图中上面部分是由Java提供的并发接口和迭代器,主要用于提交任务、调度任务这些,继承迭代器主要是为了统一外界遍历接口。下面部分就是Netty提供的,比如EventExecutorGroup事件执行器组,根据名称可以猜到里面应该定义了一些任务执行相关的方法,打开类结构可以看到里面的方法很多跟Java并发线程部分很多一样。

image-20230531103335984

下面EventLoopGroup继承了EventExecutorGroup接口,并且增加了一些方法,比如next()用来获取下一个EventLoop事件循环器。

image-20230531103421025

再向下由MultithreadEventLoopGroup扩展到了多线程事件循环组,继承了MultithreadEventExecutorGroup,实现EventLoopGroup,将两者结合,一方面是需要事件执行器,一方面又需要定义事件循环接口,也就是说我应该告诉执行器们它们应该做什么。最后NioEventLoopGroup只实现了很关键的newChild()方法:

image-20230531103435591

三、NioEventLoop

进入newChild()方法可以看到,最后返回的是一个NioEventLoop,我们先看下这个类的继承结构图:

image-20230531103455404

上图也是将继承结构归纳为JDK提供的和Netty提供的两类,不过先不用深究,只用看继承了哪些类的名称,最终继承了一个SinleThreadEventLoop单线程事件循环类,这时就可以知道NioEventLoop内有一个单线程循环,这里就不用再考虑线程安全问题了。

回过头来,我们已经可以看到NioEventLoopGroup和NioEventLoop的关系了:

image-20230531103513140

四、初始化事件循环组

1、创建事件循环组

现在,我们从main方法中的第一行来看看**new NioEventLoopGrop(1)**内究竟做了些什么。

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    // 若未指定线程数量的话,默认输入为0,此处会使用CPU核数*2作为默认线程数量
}

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

继续往里执行:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    // 这里创建了一个执行器选择工厂,就是如何选择执行器来做事,默认是可以从头到尾轮着选择,即取模的方式
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

最后到达终点:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");
    // 线程池
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 事件执行器
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 事件执行选择器
    chooser = chooserFactory.newChooser(children);
    // 终止事件
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

2、创建任务执行器new ThreadPerTaskExecutor(newDefaultThreadFactory())

newDefaultThreadFactory

首先创建线程工厂,类似于创建线程池的线程工厂,代码跟进去以后可以看到:

image-20230531103710919

此处可以看到前缀prefix变成了nioEventLoopGroup-2-,因为在前面初始化MultithreadEventExecutorGroup时,在MultithreadEventExecutorGroup中有个全局事件执行器GlobalEventExecutor的变量要初始化,根据类的加载流程,其会在构造方法执行前初始化。

image-20230531103725368

进入GlobalEventExecutor中可以看到构造方法已经创建了一个默认的线程工厂:

image-20230531103744973

进入默认线程工厂中可以看到,此时prefix前缀为globalEventExecutor-1-

image-20230531103757227

3、new ThreadPerTaskExecutor

我们再进入ThreadPerTaskExecutor的构造方法中可以看到,这个类就是设置了一个线程工厂,当有任务时就创建一个线程执行:

image-20230531103817705

4、初始化事件执行器

我们前面提到的nThreads线程数量就是用在此处,用来创建对应个数的EventExecutor事件执行器:

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        ...
    }
}

5、进入newChild创建NioEventLoop

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

进入newChild方法中,首先判断args可变参数是否存在4个,若存在则直接取出EventLoopTaskQueueFactory类型的参数,不存在直接为空。调试环境下可以看到我们实际输入的参数只有3个:

image-20230531104112991

打开当前类NioEventLoopGroup的构造方法可以看到第四个参数,NioEventLoopGroup支持使用指定的任务队列工厂来替换默认的(参考:#9247

image-20230531104148836

6、NioEventLoop构造方法

接下来进入NioEventLoop的构造方法中:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    // 选择器提供器
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    // 选择器策略
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // 获取选择器元组
    final SelectorTuple selectorTuple = openSelector();
    // 包装后的选择器
    this.selector = selectorTuple.selector;
    // 未包装的选择器(JDK提供的原始的NIO选择器)
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

首先调用了父类的构造方法,可以看到内部调用了两次newTaskQueue,创建了两个任务队列:

private static Queue<Runnable> newTaskQueue(
        EventLoopTaskQueueFactory queueFactory) {
    // 由上面的分析可知queueFactory队列工厂默认输入为空
    if (queueFactory == null) {
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}

然后进入newTaskQueue0方法中:

private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

PlatformDependent类就是根据不同的操作系统创建不同的数据,此处创建的是MpscChunkedArrayQueue队列,它是JCTools下的一个高性能队列,这里只要知道是个队列就行,后面会单独列一章说明。

然后进入NioEventLoop的父类SingleThreadEventLoop构造方法中:

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    // 尾任务队列,此处暂不太了解有什么用处
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

再进入SingleThreadEventLoop的父类SingleThreadEventExecutor的构造方法中:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    // 添加任务唤醒,默认false
    this.addTaskWakesUp = addTaskWakesUp;
    // 最大等待任务数量
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    // 封装过的执行器
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 任务队列
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    // 拒绝策略
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

最后是把NIO事件循环组放入AbstractEventExecutor抽象事件执行器中:

image-20230531104333539

最后再看下类的层次结构(见下图),NioEventLoop的继承关系就很清晰了:

image-20230531104350050

让我们回到SingleThreadEventExecutor的构造方法中继续往下执行,可以看到这一行:

this.executor = ThreadExecutorMap.apply(executor, this);

这里面是将上面创建好的ThreadPerTaskExecutorNioEventLoop再包装一下,返回的是ThreadExecutorMap中的匿名内部对象executor,只是这里面使用ThreadPerTaskExecutor执行匿名内部类中的任务:

image-20230531104420323

在匿名内部类中还有一个apply()方法,任务真正运行之前需要设置setCurrentEventExecutor中当前的事件执行器,也就是NioEventLoop,这里面使用的是Netty提供的FastThreadLocal,当前线程独占,任务执行完后就会置空,具体里面也比较复杂,暂不深入,后续会单独列一章进行说明:

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}

到现在我们知道了,

1、SingleThreadEventExecutor的构造函数设置了执行器 ThreadExecutorMap中内部executor,这里面是封装了ThreadPerTaskExecutorNioEventLoop,设置了taskQueue任务队列和rejectedExecutionHandler拒绝处理器。

2、在SingleThreadEventLoop中设置了尾任务tailTasks****。

3、最后回到NioEventLoop的构造函数中,设置provider选择器提供器,selectStrategy选择器策略,然后通过openSelector()获取选择器元组,这里面包含了未封装的unwrappedSelector原始的JDK的选择器和封装的SelectedSelectionKeySetSelector选择器。如果创建失败,则调用**shutdownGracefully()**进行优雅关闭。

完成NioEventLoop实例化后,我们重新回到MultithreadEventExecutorGroup的构造方法中继续往下走。

7、chooserFactory.newChooser(children)创建选择器

chooser = chooserFactory.newChooser(children);

这里其实就是怎么选择执行器,默认采用从头到尾轮询的方式:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 判断执行器的长度是否为2的次幂
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

根据执行器的长度是否为2的次幂选择不同的计算方式。首先判断一个数是否是2的整数次幂:

思路一

1、首先把2的整数次幂换成二进制数,十进制数2转换为二进制数是10B,4转换为二进制数是100B...

十进制二进制是否为2的整数次幂
210B
4100B
81000B
1610000B
32100000B
1001100100B

2、如果一个数是2的整数次幂,那么当其转换为二进制时,最高位为1,其余位都是0,如果把这些2的整数次幂各自减一,再转为二进制数

十进制二进制原值-1是否为2的整数次幂
210B1B
4100B11B
81000B111B
1610000B1111B
32100000B11111B
1001100100B1100011B

3、此时将2的整数次幂原值和其减一的结果进行按位与运算,即计算 n & (n-1) ,得到的结果为 0

十进制二进制原值减一n & (n-1)是否为2的整数次幂
210B1B0
4100B11B0
81000B111B0
1610000B1111B0
32100000B11111B0
1001100100B1100011B1100000

**结论:**0和1按位运算的结果为0,因此所有的2的整数次幂和它自身减一的值进行按位与运算后,得到的结果都是0,反之,如果一个整数不是2的整数次幂时,得到的结果就不会是0。

public static boolean isPowerOfTwo(int val) {
    return (val & val - 1) == 0;
}

思路二

在开始前,我们先熟悉下负数转二进制的一些操作,首先定义一个Integer类型的数字比如32768,由于Integer类型占用了32个bit位,因此转为二进制的完整形式为:

00000000000000001000000000000000

计算机中对有符号数字的表示有三种方法:原码、反码和补码,补码=反码+1,二进制数据中最高位表示符号位,最高位为0表示整数,最高位为1表示负数。

将-32768转化为二进制数据:

①首先将-32768的绝对值转换为二进制可得:0000 0000 0000 0000 1000 0000 0000 0000

00000000000000001000000000000000

②求该二进制的反码:

11111111111111110111111111111111

③最后将反码加1可得补码即为原值的二进制数据:

11111111111111111000000000000000

对应算法:

1、将2的整数次幂转换为二进制数据

十进制二进制是否为2的整数次幂
800000000000000000000000000001000
1600000000000000000000000000010000
3200000000000000000000000000100000
6400000000000000000000000001000000
12800000000000000000000000010000000
25700000000000000000000000100000001

2、取2的整数次幂的负数,并通过取反加一操作获取对应的二进制数

十进制二进制原值的负数是否为2的整数次幂
80000000000000000000000000000100011111111111111111111111111111000
160000000000000000000000000001000011111111111111111111111111110000
320000000000000000000000000010000011111111111111111111111111100000
640000000000000000000000000100000011111111111111111111111111000000
1280000000000000000000000001000000011111111111111111111111110000000
2570000000000000000000000010000000111111111111111111111111011111111

3、此时将2的整数次幂原值与2的整数次幂负值进行按位与运算,即计算 n & (-n) ,得到的结果为 n

十进制二进制原值的负数n & (-n)是否为2的整数次幂
8000000000000000000000000000010001111111111111111111111111111100000000000000000000000000000001000
16000000000000000000000000000100001111111111111111111111111111000000000000000000000000000000010000
32000000000000000000000000001000001111111111111111111111111110000000000000000000000000000000100000
64000000000000000000000000010000001111111111111111111111111100000000000000000000000000000001000000
128000000000000000000000000100000001111111111111111111111111000000000000000000000000000000010000000
257000000000000000000000001000000011111111111111111111111101111111100000000000000000000000000000001

结论:

2的整数次幂和其对应的负数进行按位与运算后,得到的结果都是其本身。

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

现在我们重新回到源码中可以看到,根据执行器的长度是否为2的整数次幂后选择不同的计算方式,一种是位运算,另一种就是常规取模的方式:

image-20230531104951021

8、terminationListener设置终止监听器

创建完选择器后,在MultithreadEventExecutorGroup中设置终止监听器,就是如果要终止的时候,此处会有回调:

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    @Override
    public void operationComplete(Future<Object> future) throws Exception {
        if (terminatedChildren.incrementAndGet() == children.length) {
            terminationFuture.setSuccess(null);
        }
    }
};
for (EventExecutor e: children) {
    // 添加终止事件监听器
    e.terminationFuture().addListener(terminationListener);
}

9、Collections.unmodifiableSet(childrenSet)将执行器存入一个不可变集合

将执行器添加到LinkedHashSet中,然后再将存入一个不可变集合,此处我们暂时不知道为什么要这样存放,暂不深究继续往下执行。

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);

至此NioEventLoopGroup基本上完成初始化,当然还很多细节,我们继续往下学习。

#Netty(2)#Java(6)#NIO(3)

评论