Netty 源码分析之二 ServerBootstrap 服务引导器
Netty 源码分析之二 ServerBootstrap 服务引导器
ServerBootstrap类结构
我们继续上一章的例子,上一章已经创建了bossGroup和workerGroup,再往下就是创建ServerBootstrap服务启动器了,先看下这个类的继承结构图:
可以看到服务启动器和其父类中都是一些基本的属性的设置,构造方法中并没有做太多事情,只初始化了一些属性选项及配置:
// EventLoopGroup对象 -- bossGroup
volatile EventLoopGroup group;
// channel工厂用于创建channel
@SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
// 本地地址
private volatile SocketAddress localAddress;
// The order in which ChannelOptions are applied is important they may depend on each other for validation
// purposes.
// 可选项集合
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
// 集合属性
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
// 启动器启动的时候设置的处理器, 如 .handler(new LoggingHandler(LogLevel.INFO))设置进来的
private volatile ChannelHandler handler;
AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
group = bootstrap.group;
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
synchronized (bootstrap.options) {
options.putAll(bootstrap.options);
}
attrs.putAll(bootstrap.attrs);
}
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
// 构造方法
public ServerBootstrap() { }
bossGroup 和 workerGroup
然后就是将之前创建的两个Group设置一下:
bootstrap.group(bossGroup, workerGroup)
其中bossGroup也就是parentGroup,主要是负责处理TCP/IP连接的,而workerGroup也就是childGroup,主要是负责Channel(通道)的I/O事件处理。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
ServerBootstrap 参数设置
再往下看:
.channel(NioServerSocketChannel.class)
这里就比较讲究了,传入的对象是一个Class对象,应该可以想到里面使用的可能是反射,进入到源码里面可以看到:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
上面代码中首先通过new ReflectiveChannelFactory 创建了一个反射通道工厂,在内部经过反射获取无参的构造方法,当调用工厂中的newChannel() 方法时就可以创建通道实例对象了:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
// 在反射通道工厂中取得构造方法,在后面可以直接创建实例对象
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 无参构造方法
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
// 创建实例对象
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
继续进入channelFactory方法中,主要就是设置下通道工厂,返回自身对象,使其能够继续进行链式调用。
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
继续往下,设置通道的可选项参数,其中option是针对parentGroup的,childOption是针对childGroup的
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
ChannelOption配置
ChannelOption.SO_BACKLOG
BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。ChannelOption.SO_BACKLOG对应的是TCP/IP协议listen函数中的backlog参数(在linux服务器中输入 man listen可查看详细),函数listen(int socketfd, int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端连接的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。
ChannelOption.SO_REUSEADDR
ChannelOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。比如,某个服务器进程占用了TCP的3247端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中会经常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程中使用,而且程序死掉以后,内核也需要一定的时间才能够是否此端口,不设置SO_REUSEADDR就无法正常使用该端口。(参考:SO_REUSEPORT)
ChannelOption.SO_KEEPALIVE
ChannelOption.SO_KEEPALIVE参数对应于套接字中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,系统会测试连接的状态,这个选项用于可能长时间没有数据交流的连接。如果在两个小时内没有数据的通信时,服务端会自动发送一个活动探测数据报文。如果客户端因为断电、网路异常或客户端异常时,那么服务端的连接可以关闭,释放资源。
ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF分别对应套接字选项中SO_SNDBUF和SO_RCVBUF参数,这两个参数用于操作发送缓冲区和接收缓冲区的大小。发送缓冲区用于保存发送数据,直到发送成功;接收缓冲区用于保存网络协议站内接收到的数据,直到应用程序读取成功。
ChannelOption.SO_LINGER
ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发送完剩余的数据,造成的数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。
ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数是的作用就是禁止使用Nagle算法,适用于小数据即时传输。与TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。在实际使用中,通常希望服务是低延时的,因此建议将TCP_NODELAY设置为true。
ChannelOption.ALLOCATOR
Netty4.x版本中使用对象池,重用缓冲区
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Java支持的套接字选项
TCP_NODELAY
用于设置是否禁用Nagle算法.Nagle算法用于自动合并大量小的缓冲区消息;这个过程(称为nagling)通过减少必须发送的包的数量来提高网络应用程序系统的性能.
仅用于TCP: SocketImpl
SO_BINDADDR
套接字绑定的本地地址,类型为 INADDR_ANY.套接字创建时绑定的地址无法在之后修改.
可用于: SocketImpl, DatagramSocketImpl
SO_REUSEADDR
此选项在Java中仅用于标识多播套接字.多播套接字默认配置该选项.
可用于: DatagramSocketImpl
SO_BROADCAST
用于关闭或开启数据报套接字的广播功能
IP_MULTICAST_IF
设置发送多播数据包的传出接口.用于多个网络接口的主机希望使用系统默认值以外的其他值.可用于广播: DatagramSocketImpl
IP_MULTICAST_IF2
和IP_MULTICAST_IF一样,但是可以支持IPV4和IPV6.
IP_MULTICAST_LOOP
此选项启用或禁用多播数据报的loopback. 多播套接字默认启用此选项.
IP_TOS
此选项在IP头为TCP或UDP套接字设置服务类型(type-of-service)或流量类(traffic class)字段
SO_LINGER
指定延迟关闭超时. 此选项禁用/启用从TCP套接字的close()立即返回. 使用非零整数超时启用此选项,意味着close()将阻塞,等待向对等方发送和确认所有写入的数据, 之后套接字将被优雅地关闭. 在到达延迟超时时, 使用TCP RST强制关闭套接字. 启用超时为0的选项会立即强制关闭. 如果指定的超时值超过65,535, 则会减少到65,535.
仅对TCP: SocketImpl有效
SO_TIMEOUT
设置阻塞套接字操作的超时:ServerSocket.accept();SocketInputStream.read();DatagramSocket.receive();必须在进入阻塞操作之前设置, 该选项才能生效. 如果超时过期, 操作将继续阻塞,
java.io.InterruptedIOException. 在这种情况下, 套接字不会关闭.
适用于所有的套接字实现:SocketImpl, DatagramSocketImpl
SO_SNDBUF
设置平台底层I/O发送缓冲区大小.当setSendBufferSize时, 这是应用程序向内核提供的关于通过套接字发送数据时使用的缓冲区大小的建议.当getSendBufferSize时, 这必须返回平台在此套接字上发送数据时实际使用的缓冲区的大小.
适用于所有的套接字实现:SocketImpl, DatagramSocketImpl.
SO_RCVBUF
设置平台底层I/O接收缓冲区大小.当setReceiveBufferSize时, 这是应用程序向内核提供的关于通过套接字接收数据时使用的缓冲区大小的建议.当getReceiveBufferSize时, 这必须返回平台在此套接字上接收数据时实际使用的缓冲区的大小.
适用于所有的套接字实现:SocketImpl, DatagramSocketImpl.
SO_KEEPALIVE
当为一个TCP套接字设置了keepalive选项,并且在两个小时内套接字之间没有任何数据交换(注意:实际的值取决于实现),TCP会自动向对等端发送一个keepalive探测。这个探测是一个TCP段,对等端必须响应它。预期会有三种响应:
- 对等端响应期望的ACK. 应用程序不会被通知(因为一切正常). TCP将在下一个2小时无活动后再次发送一个探测.
- 对等端响应RST, 这表示对端主机已崩溃并重启. 套接字将被关闭.
- 对等端无响应. 套接字将被关闭.
- 此选项的目的时检测对端主机是否已崩溃.
- 仅适用于 TCP socket: SocketImpl
SO_OOBINLINE
当设置OOBINLINE选项时,在套接字上接收到的任何TCP紧急数据(TCP URG)都将通过套接字输入流接收. 当该选项被禁用时(这是默认值), 紧急数据将被悄悄地丢弃(不通知应用程序).
SO_BACKLOG
backlog参数是套接字上挂起连接的最大数量. 它的确切语义是特定于实现的. 具体地说, 实现可以设置最大长度, 也可以选择忽略参数. 如果backlog参数的值为0或负值, 则使用特定于实现的缺省值
ChannelHandler配置
现在回到代码中继续往下看,设置通道处理器,其中handler是针对parentGroup的,childHandler是针对childGroup的。
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new MyNettyServerHandler());
}
});
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
Netty的日志处理
在hander方法内我们首先定义了一个日志处理器,用于设置netty在通讯过程中记录日志的等级。至于ChannelHandler通道处理器后面会单独详谈,此处我们先来研究一下netty是如何处理日志的:
首先我们先看下LoggingHandler类的信息:
@Sharable
@SuppressWarnings({ "StringConcatenationInsideStringBufferAppend", "StringBufferReplaceableByString" })
public class LoggingHandler extends ChannelDuplexHandler {
// 默认日志级别为 Debug
private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG;
// 实际使用的日志框架,如 slf4j、log4j等
protected final InternalLogger logger;
// 日志框架使用的日志级别
protected final InternalLogLevel internalLevel;
// Netty 使用的日志级别
private final LogLevel level;
// 用于控制ByteBuf和ByteBufHolder的日志记录格式和详细程度
private final ByteBufFormat byteBufFormat;
}
@Sharable
:表示同一实例的ChannelHandler实例可以多次添加到一个或多个ChannelPipeline中,没有竞争条件。如果未指定此注释,则每次将其添加到管道时都必须创建一个新的处理程序实例,因为它具有非共享状态 (例如成员变量)。
此处使用 @Sharable
说明 LoggingHandler没有状态相关变量,所有 Channel 都可以使用一个实例。继承自 ChannelDuplexHandler 表示对出入站事件都进行日志记录。
日志等级:
public enum LogLevel {
TRACE(InternalLogLevel.TRACE),
DEBUG(InternalLogLevel.DEBUG),
INFO(InternalLogLevel.INFO),
WARN(InternalLogLevel.WARN),
ERROR(InternalLogLevel.ERROR);
}
看下 LoggingHandler 的构造方法:
public LoggingHandler(LogLevel level) {
// ByteBufFormat.HEX_DUMP: 表示将指定的ByteBuf格式化打印,便于人工阅读
this(level, ByteBufFormat.HEX_DUMP);
}
public LoggingHandler(LogLevel level, ByteBufFormat byteBufFormat) {
this.level = ObjectUtil.checkNotNull(level, "level");
this.byteBufFormat = ObjectUtil.checkNotNull(byteBufFormat, "byteBufFormat");
// 设置实际的日志框架
logger = InternalLoggerFactory.getInstance(getClass());
// 设置框架日志级别
internalLevel = level.toInternalLevel();
}
既然 LoggingHandler 继承了 ChannelDuplexHandler 类,那它一定实现了很多其中的方法,如 channelActive、channelRead、close等方法,看下这些方法中的日志是如何打印的,以 channelRead为例
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 根据日志等级打印输入日志内容,并格式化显示 msg
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "READ", msg));
}
ctx.fireChannelRead(msg);
}
// 格式话输入的 msg
protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
if (arg instanceof ByteBuf) {
return formatByteBuf(ctx, eventName, (ByteBuf) arg);
} else if (arg instanceof ByteBufHolder) {
return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg);
} else {
return formatSimple(ctx, eventName, arg);
}
}
// 根据 byteBufFormat 判断使用何种类型格式化日志内容
private String formatByteBuf(ChannelHandlerContext ctx, String eventName, ByteBuf msg) {
String chStr = ctx.channel().toString();
int length = msg.readableBytes();
if (length == 0) {
StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 4);
buf.append(chStr).append(' ').append(eventName).append(": 0B");
return buf.toString();
} else {
int outputLength = chStr.length() + 1 + eventName.length() + 2 + 10 + 1;
if (byteBufFormat == ByteBufFormat.HEX_DUMP) {
int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4;
int hexDumpLength = 2 + rows * 80;
outputLength += hexDumpLength;
}
StringBuilder buf = new StringBuilder(outputLength);
buf.append(chStr).append(' ').append(eventName).append(": ").append(length).append('B');
if (byteBufFormat == ByteBufFormat.HEX_DUMP) {
buf.append(NEWLINE);
appendPrettyHexDump(buf, msg);
}
return buf.toString();
}
}
childHandler处理
我们继续回到 .childHandler(new ChannelInitializer<SocketChannel>() {})
方法中,其实也就是设置了 childHandler
,此时设置的就是通道初始化的 ChannelInitializer
:
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
ChannelInitializer 通道初始化器
ChannelInitializer 是一种特殊的 ChannelInboundHandler,它提供了在通道注册到 eventLoop 后初始化通道的简单方法,用于在某个 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。ChannelInitializer 虽然会在一开始被注册到 Channel 相关的 pipeline 里,但是在初始化完成之后,ChannelInitializer 会将自己从 pipeline 中移除,不会影响后续的操作。
ChannelInitializer类图
可以看到 ChannelInitializer 继承自ChannelInboundHandler 接口,且为抽象类,不能直接使用。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
ChannelInitializer 声明了一个抽象方法 initChannel(C ch)
,要想实例化 ChannelInitializer,就需要实现这个方法。
由于示例代码中启动的是 TCP 的服务,所以在初始化的 ChannelInitializer 时,我们使用 SocketChannel
,即TCP 通道。
继续看 initChannel 抽象方法的实现:
#Netty(2)#NIO(3)评论