Netty
剖析:启动服务
1# 服务启动主线
2* Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector()
3* ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel()
4* selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
5* javaChannel().bind(localAddress, config.getBacklog());
6* selectionKey.interestOps(OP_ACCEPT);
• Selector 是在 new NioEventLoopGroup()(创建一批 NioEventLoop)时创建。
• 第一次 Register 并不是监听 OP_ACCEPT,而是 0: selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 。
• 最终监听 OP_ACCEPT 是通过 bind 完成后的 fireChannelActive() 来触发的。
• NioEventLoop 是通过 Register 操作的执行来完成启动的。
• 类似 ChannelInitializer,一些 Hander 可以设计成一次性的,用完就移除,例如授权。
剖析:构建连接
1# 接受连接本质:selector.select()/selectNow()/select(timeoutMillis) 发现 OP_ACCEPT 事件,处理:
2* SocketChannel socketChannel = serverSocketChannel.accept()
3* selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
4* selectionKey.interestOps(OP_READ);
• 创建连接的初始化和注册是通过 pipeline.fireChannelRead 在 ServerBootstrapAcceptor 中完成的。
• 第一次 Register 并不是监听 OP_READ ,而是 0 :selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 。
• 最终监听 OP_READ 是通过“Register”完成后的 fireChannelActive(io.netty.channel.AbstractChannel.AbstractUnsafe#register0 中)来触发的
• Worker’s NioEventLoop 是通过 Register 操作执行来启动。
• 接受连接的读操作,不会尝试读取更多次(16 次)。
剖析:接收数据
读数据技巧
- 自适应数据大小的分配器(AdaptiveRecvByteBufAllocator):发放东西时,拿多大的桶去装?小了不够,大了浪费,所以会自己根据实际装的情况猜一猜下次情况,从而决定下次带多大的桶。
- 连续读(defaultMaxMessagesPerRead):发放东西时,假设拿的桶装满了,这个时候,你会觉得可能还有东西发放,所以直接拿个新桶等着装,而不是回家,直到后面出现没有装上的情况或者装了很多次需要给别人一点机会等原因才停止,回家。
• 读取数据本质:sun.nio.ch.SocketChannelImpl#read(java.nio.ByteBuffer)
• NioSocketChannel read() 是读数据, NioServerSocketChannel read() 是创建连接
• pipeline.fireChannelReadComplete(); 一次读事件处理完成
pipeline.fireChannelRead(byteBuf); 一次读数据完成,一次读事件处理可能会包含多次读数据操作。
• 为什么最多只尝试读取 16 次?“雨露均沾”
• AdaptiveRecvByteBufAllocator 对 bytebuf 的猜测:放大果断,缩小谨慎(需要连续 2 次判断)
剖析:业务处理
• 处理业务本质:数据在 pipeline 中所有的 handler 的 channelRead() 执行过程。(Handler 要实现 io.netty.channel.ChannelInboundHandler#channelRead(ChannelHandlerContextctx, Object msg),且不能加注解 @Skip 才能被执行到。中途可退出,不保证执行到 TailHandler。)
• 默认处理线程就是 Channel 绑定的 NioEventLoop 线程,也可以设置其他:pipeline.addLast(new UnorderedThreadPoolEventExecutor(10), serverHandler)
剖析:发送数据
写数据三种的方式
写数据的要点
- 对方仓库爆仓时,送不了的时候,会停止送,协商等电话通知什么时候好了,再送。
Netty写数据,写不进去时,会停止写,然后注册一个OP_WRITE 事件,来通知什么时候可以写进去了再写。
- 发送快递时,对方仓库都直接收下,这个时候再发送快递时,可以尝试发送更多的快递试试,这样效果更好。
Netty批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整maxBytesPerGatheringWrite)
- 发送快递时,发到某个地方的快递特别多,我们会连续发,但是快递车毕竟有限,也会考虑下其他地方。
Netty只要有数据要写,且能写的出去,则一直尝试,直到写不出去或者满16 次(writeSpinCount)。
- 揽收太多,发送来不及时,爆仓,这个时候会出个告示牌:收不下了,最好过 2 天再来邮寄吧。
Netty待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成false ,让应用端自己做决定要不要发送数据了。
发送数据主线
• 写的本质:
1* Single write: sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
2* gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[], int, int)
• 写数据写不进去时,会停止写,注册一个 OP_WRITE 事件,来通知什么时候可以写进去了。
• OP_WRITE 不是说有数据可写,而是说可以写进去,所以正常情况,不能注册,否则一直触发。
• 批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多(maxBytesPerGatheringWrite)。
• 只要有数据要写,且能写,则一直尝试,直到 16 次(writeSpinCount),写 16 次还没有写完,就直接 schedule 一个 task 来继续写,而不是用注册写事件来触发,更简洁有力。
• 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成 false ,让应用端自己做决定要不要继续写。
• channelHandlerContext.channel().write() :从 TailContext 开始执行;channelHandlerContext.write() : 从当前的 Context 开始。
剖析:断开连接
断开连接主线
• 断开连接的本质是多路复用器(Selector)接收到 OP_READ 事件 :
• 处理 OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read():
1* 接受数据
2* 判断接受的数据大小是否 < 0 , 如果是,说明是关闭,开始执行关闭:
3 1. 关闭 channel(包含 cancel 多路复用器的 key)。
4 2. 清理消息:不接受新信息,fail 掉所有 queue 中消息。
5 3. 触发 fireChannelInactive 和 fireChannelUnregistered 。
知识点
• 关闭连接本质
1* java.nio.channels.spi.AbstractInterruptibleChannel#close
2* java.nio.channels.SelectionKey#cancel
• 要点
1* 关闭连接,会触发 OP_READ 方法。读取字节数是 -1 代表关闭。
2* 数据读取进行时,强行关闭,触发 IO Exception,进而执行关闭。
3* Channel 的关闭包含了 SelectionKey 的 cancel 。
剖析:关闭服务
关闭服务的本质
1* 关闭所有连接及 Selector :
2 java.nio.channels.Selector#keys
3 java.nio.channels.spi.AbstractInterruptibleChannel#close
4 java.nio.channels.SelectionKey#cancel
5 selector.close()
6* 关闭所有线程:退出循环体 for (;;)
关闭服务要点
1* 优雅(DEFAULT_SHUTDOWN_QUIET_PERIOD)
2* 可控(DEFAULT_SHUTDOWN_TIMEOUT)
3* 先不接活,后尽量干完手头的活(先关 boss 后关 worker:不是100%保证)
Echo 服务
一个应答服务(回显服务器),客户端发送什么数据,服务端就响应的对应的数据是一个非常有的用于调试和检测的服务的吞吐量和负载能力。
引入 Netty 依赖
1<dependencies>
2 <dependency>
3 <groupId>io.netty</groupId>
4 <artifactId>netty-all</artifactId>
5 <version>4.1.32.Final</version>
6 </dependency>
7 </dependencies>
EchoServer
1import io.netty.bootstrap.ServerBootstrap;
2import io.netty.channel.ChannelFuture;
3import io.netty.channel.ChannelInitializer;
4import io.netty.channel.EventLoopGroup;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.SocketChannel;
7import io.netty.channel.socket.nio.NioServerSocketChannel;
8
9public class EchoServer {
10 private int port;
11 public EchoServer(int port){
12 this.port = port;
13 }
14
15 /**
16 * 启动流程
17 */
18 public void run() throws InterruptedException {
19 //配置服务端线程组:两个线程组
20 EventLoopGroup bossGroup = new NioEventLoopGroup();
21 EventLoopGroup workGroup = new NioEventLoopGroup();
22
23 try{
24 //创建服务启动类:并将两个线程组交给ServerBootstrap
25 ServerBootstrap serverBootstrap = new ServerBootstrap();
26 //创建 server socket channel 并初始化,为其从 bossGroup 中选择一个 NioEventLoop
27 serverBootstrap.group(bossGroup, workGroup)
28 .channel(NioServerSocketChannel.class) //设置Channel类型为NIO
29 .childHandler(new ChannelInitializer<SocketChannel>() { //绑定业务处理类EchoServerHandler
30 protected void initChannel(SocketChannel ch) throws Exception {
31 ch.pipeline().addLast(new EchoServerHandler());
32 }
33 });
34
35 System.out.println("Echo 服务器启动ing");
36
37 //绑定端口,同步等待成功
38 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
39 //等待服务端监听端口关闭
40 channelFuture.channel().closeFuture().sync();
41 }finally {
42 //优雅退出,释放线程池
43 workGroup.shutdownGracefully();
44 bossGroup.shutdownGracefully();
45 }
46 }
47
48 public static void main(String [] args) throws InterruptedException {
49 int port = 8080;
50 if(args.length > 0){
51 port = Integer.parseInt(args[0]);
52 }
53 new EchoServer(port).run();
54 }
55}
EchoClientHandler
1import io.netty.buffer.ByteBuf;
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.channel.ChannelInboundHandlerAdapter;
4import io.netty.util.CharsetUtil;
5
6public class EchoServerHandler extends ChannelInboundHandlerAdapter {
7
8 @Override
9 //读取到消息
10 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
11 //类型转换
12 ByteBuf data = (ByteBuf) msg;
13 System.out.println("服务端收到数据: "+ data.toString(CharsetUtil.UTF_8));
14 //将接收到的消息进行回写
15 ctx.writeAndFlush(data);
16 }
17
18 @Override
19 //读取完毕
20 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
21 System.out.println("EchoServerHandle channelReadComplete");
22 }
23
24 @Override
25 //出现异常
26 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
27 cause.printStackTrace();
28 ctx.close();
29 }
30}
EchoClient
1import io.netty.bootstrap.Bootstrap;
2import io.netty.channel.ChannelFuture;
3import io.netty.channel.ChannelInitializer;
4import io.netty.channel.EventLoopGroup;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.SocketChannel;
7import io.netty.channel.socket.nio.NioSocketChannel;
8import java.net.InetSocketAddress;
9
10public class EchoClient {
11
12 private String host;
13 private int port;
14
15 public EchoClient(String host, int port){
16 this.host = host;
17 this.port = port;
18 }
19
20 public void start() throws InterruptedException {
21 //一个线程即可
22 EventLoopGroup group = new NioEventLoopGroup();
23
24 try {
25 //创建启动类:并将线程组交给Bootstrap
26 Bootstrap bootstrap = new Bootstrap();
27 bootstrap.group(group)
28 .channel(NioSocketChannel.class) //设置Channel类型为NIO
29 .remoteAddress(new InetSocketAddress(host, port)) //绑定Server段的IP地址和端口
30 .handler(new ChannelInitializer<SocketChannel>() {
31
32 protected void initChannel(SocketChannel ch) throws Exception { //绑定业务处理类ClientHandler
33 ch.pipeline().addLast(new EchoClientHandler());
34 }
35 });
36
37 //连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功
38 ChannelFuture channelFuture = bootstrap.connect().sync();
39 //阻塞直到客户端通道关闭
40 channelFuture.channel().closeFuture().sync();
41
42 }finally {
43 //优雅退出,释放NIO线程组
44 group.shutdownGracefully();
45 }
46 }
47
48 public static void main(String []args) throws InterruptedException {
49 new EchoClient("127.0.0.1",8080).start();
50 }
51}
EchoClientHandler
1import io.netty.buffer.ByteBuf;
2import io.netty.buffer.Unpooled;
3import io.netty.channel.ChannelHandlerContext;
4import io.netty.channel.SimpleChannelInboundHandler;
5import io.netty.util.CharsetUtil;
6
7public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
8 //管道读取数据:第二
9 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
10 System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
11 }
12
13 @Override
14 //管道建立成功:第一
15 public void channelActive(ChannelHandlerContext ctx) throws Exception {
16 System.out.println("Active");
17 ctx.writeAndFlush(Unpooled.copiedBuffer("This is a test!",CharsetUtil.UTF_8));
18 }
19
20 @Override
21 //管道消息读取完毕:第三
22 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
23 System.out.println("EchoClientHandler channelReadComplete");
24 }
25
26 @Override
27 //异常
28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
29 cause.printStackTrace();
30 ctx.close();
31 }
32}
高性能 RPC 框架的 3 个要素
- IO 模型(Linux 网络 I/O 模型:五种)
- 数据协议(http/protobuf/Thrift)
- 线程模型(1:1 / 1:N / select / epoll)
Reactor 模式和 Netty 线程模型
设计模式——Reactor 模式(反应器设计模式),是一种基于事件驱动的设计模式,在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求 一般出现在高并发系统中,比如 Netty,Redis 等
Reactor 模式基于事件驱动,适合处理海量的 I/O 事件,属于同步非阻塞 IO(NIO)
优点
- 响应快,不会因为单个同步而阻塞,虽然 Reactor 本身依然是同步的。
- 编程相对简单,最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
- 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
缺点
- 相比传统的简单模型,Reactor 增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
- Reactor 模式需要系统底层的的支持,比如 Java 中的 Selector 支持,操作系统的 select 系统调用支持
Netty 线程模型
Reactor 单线程模型(比较少用)
1* 对应小业务则适合,编码简单;对于高负载、大并发的应用场景不适合,一个 NIO 线程处理太多请求,则负载过高,并且可能响应变慢,导致大量请求超时,而且万一线程挂了,则不可用了
Reactor 多线程模型
1* 满足目前的大部分场景,也是 Netty 推荐使用的线程模型
EventLoop、EventLoopGroup
EventLoop
EventLoop 好比一个线程,1 个 EventLoop 可以服务多个 Channel,1 个 Channel 只有一个 EventLoop。
可以创建多个 EventLoop 来优化资源利用(多核 CPU),也就是 EventLoopGroup。一个 EventLoop 独占一个 CPU 核心。
EventLoopGroup
EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个 EventLoop。
仅用单个线程来处理多个 Channels 的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和 CPU 在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个 CPU 有多个内核,不使用多任务可能是在浪费 CPU 能力。
- EventLoopGroup 中有多个 EventLoop
- 每个 EventLoop 维护一个 Selector(也就是一个 EventLoop 可以维护多个 Channel)
- 一个 Selector(一个线程) 可以处理多个 Channel。
- EventLoopGroup 源码中使用 Runtime.getRuntime().availableProcessors()获取当前主机的 CPU 核心数,来决定默认创建 EventLoopGroup 中 EventLoop 的数量(默认是 CPU 核心数 * 2)。
1//手动调整EventLoopGroup 中EventLoop的数量。根据线程服务器的核心数调整。
2EventLoopGroup bossGroup = new NioEventLoopGroup(8);
ServerBootstrap
group
- 单线程
1EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2try{
3 ServerBootstrap serverBootstrap = new ServerBootstrap();
4 serverBootstrap.group(bossGroup);
5}
- 多线程
1EventLoopGroup bossGroup = new NioEventLoopGroup();
2try{
3 ServerBootstrap serverBootstrap = new ServerBootstrap();
4 serverBootstrap.group(bossGroup);
5}
- 主从线程
1EventLoopGroup bossGroup = new NioEventLoopGroup();
2EventLoopGroup workGroup = new NioEventLoopGroup();
3try{
4 ServerBootstrap serverBootstrap = new ServerBootstrap();
5 serverBootstrap.group(bossGroup, workGroup);
6}
channel
设置 channel 通道类型 NioServerSocketChannel、OioServerSocketChannel。
1 bootstrap.channel(NioSocketChannel.class)
option
作用于每个新建立的 channel,设置 TCP 连接中的一些参数,如下
1* ChannelOption.SO_BACKLOG: 存放已完成三次握手的请求的等待队列的最大长度; (accept queue:全连接队列, net.core.somaxconn)
2 //Linux 服务器 TCP 连接底层知识:系统默认的 somaxconn 参数要足够大 ,如果 SO_BACKLOG 比 somaxconn 大,则会优先用 somaxconn,所以只有 SO_BACKLOG 小于 somaxconn 的时候,Netty 的 .option(SO_BACKLOG,1024) 才会生效。
3 syn queue:半连接队列,洪水攻击,tcp_max_syn_backlog
4 accept queue:全连接队列, net.core.somaxconn
5 .option(SO_BACKLOG,1024)
6
7* ChannelOption.TCP_NODELAY: 为了解决 Nagle 的算法问题,默认是 false, **要求高实时性,有数据时马上发送,就将该选项设置为 true**。关闭 Nagle 算法,如果要减少发送次数,就设置为 false,会累积一定大小后再发送;
8 .option(ChannelOption.TCP_NODELAY,true)
11、client发送SYN到server,将状态修改为SYN_SEND,如果server收到请求,则将状态修改为SYN_RCVD,并把该请求放到syns queue队列中。
22、server回复SYN+ACK给client,如果client收到请求,则将状态修改为ESTABLISHED,并发送ACK给server。
33、server收到ACK,将状态修改为ESTABLISHED,并把该请求从syns queue中放到accept queue。
4
5
6在linux系统内核中维护了两个队列:syns queue和accept queue
7
8syns queue
9 用于保存半连接状态的请求,其大小通过/proc/sys/net/ipv4/tcp_max_syn_backlog指定,一般默认值是512,不过这个设置有效的前提是系统的syncookies功能被禁用。互联网常见的TCP SYN FLOOD恶意DOS攻击方式就是建立大量的半连接状态的请求,然后丢弃,导致syns queue不能保存其它正常的请求。
10
11accept queue
12 用于保存全连接状态的请求,其大小通过/proc/sys/net/core/somaxconn指定,在使用listen函数时,内核会根据传入的backlog参数与系统参数somaxconn,取二者的较小值。
13 如果accpet queue队列满了,server将发送一个ECONNREFUSED错误信息Connection refused到client。
14
15ChannelOption.SO_BACKLOG
16 对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
childOption
作用于被 accept 之后的连接。
1.childOption()
childHandler
用于对每个通道里面的数据处理。
1.childHandler(new ChannelInitializer<SocketChannel>() { //绑定业务处理类EchoServerHandler
2 protected void initChannel(SocketChannel ch) throws Exception {
3 ch.pipeline().addLast(new EchoServerHandler());
4 }
5 });
客户端启动引导类 Bootstrap
- remoteAddress: 服务端地址
- handler:和服务端通信的处理器
Channel
一个 Channel 包含一个 ChannelPipeline,所有 ChannelHandler 都会顺序加入到 ChannelPipeline 中。创建 Channel 时会自动创建一个 ChannelPipeline,每个 Channel 都有一个管理它的 pipeline,这关联是永久性的。
- Channel: 客户端和服务端建立的一个连接通道
- ChannelHandler: 负责 Channel 的逻辑处理
- ChannelPipeline: 负责管理 ChannelHandler 的有序容器
Channel 声明周期
Channel 当状态出现变化,就会触发对应的事件。
channelRegistered --> channelActive --> channelInactive --> channelUnregistered
11. channelRegistered: channel 注册到一个 EventLoop。
22. channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据。
33. channelInactive: channel 处于非活跃状态,没有连接到远程主机。
44. channelUnregistered: channel 已经创建,但是未注册到一个 EventLoop 里面,也就是没有和 Selector 绑定。
ChannelHandler
ChannelHandler 的基本方法
1* handlerAdded : 当 ChannelHandler 添加到 ChannelPipeline 调用
2* handlerRemoved : 当 ChannelHandler 从 ChannelPipeline 移除时调用
3* exceptionCaught : 执行抛出异常时调用
ChannelHandler 下两个子接口
ChannelInboundHandler:(入站)
1处理输入数据和Channel状态类型改变;适配器 ChannelInboundHandlerAdapter(适配器设计模式:帮你实现接口中的默认方法)
2如果不使用`适配器设计模式`默认需要实现所有的方法。
3而使用`适配器设计模式`,只需要 @Override 的相应的方法即可。
4常用的:SimpleChannelInboundHandler
ChannelOutboundHandler:(出站)
1处理输出数据,适配器 ChannelOutboundHandlerAdapter
ChannelPipeline
就像是厂里的流水线一样,可以在上面添加多个 ChannelHanler,也可看成是一串 ChannelHandler 实例,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及 ChannelHanler 之间交互获得完全的控制权
ChannelHandlerContext
ChannelHandlerContext 是连接 ChannelHandler 和 ChannelPipeline 的桥梁
ChannelHandlerContext 部分方法和 Channel 及 ChannelPipeline 重合,好比调用 write 方法;
Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者都会触发一个事件在整个 ChannelPipeline 中传播,而 ChannelHandlerContext 就只会在后续的 Handler 里面传播。
1@Override
2 //读取到消息
3 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
4 //第一种
5 //Channel channel = ctx.channel();
6 //channel.writeAndFlush(Unpooled.copiedBuffer("This is a test!", CharsetUtil.UTF_8));
7
8 //第二种
9 //ChannelPipeline channelPipeline = ctx.pipeline();
10 //channelPipeline.writeAndFlush(Unpooled.copiedBuffer("This is a test!", CharsetUtil.UTF_8));
11
12 //第三种
13 //类型转换
14 ByteBuf data = (ByteBuf) msg;
15 System.out.println("服务端收到数据: " + data.toString(CharsetUtil.UTF_8));
16 //将接收到的消息进行回写
17 ctx.writeAndFlush(data);
18 }
AbstractChannelHandlerContext 类
双向链表结构,next/prev 分别是后继节点,和前驱节点。
DefaultChannelHandlerContext
是实现类,但是大部分都是父类那边完成,这个只是简单的实现一些方法主要就是判断 Handler 的类型。ChannelInboundHandler 之间的传递,主要通过调用 ctx 里面的 FireXXX()方法来实现下个 handler 的调用。
1 ctx.fireChannelRead(data); //调用下一个handler
入站出站 Handler 执行顺序
一般的项目中,inboundHandler 和 outboundHandler 有多个,在 Pipeline 中的执行顺序?
InboundHandler 顺序执行,OutboundHandler 逆序执行。
1# pipeline 添加顺序一
2ch.pipeline().addLast(new InboundHandler1());
3ch.pipeline().addLast(new OutboundHandler1());
4ch.pipeline().addLast(new OutboundHandler2());
5ch.pipeline().addLast(new InboundHandler2());
6
7# pipeline 添加顺序二
8ch.pipeline().addLast(new OutboundHandler1());
9ch.pipeline().addLast(new OutboundHandler2());
10ch.pipeline().addLast(new InboundHandler1());
11ch.pipeline().addLast(new InboundHandler2());
12
13# 结果:执行顺序
14InboundHandler1 channelRead
15InboundHandler2 channelRead
16OutboundHandler2 write
17OutboundHandler1 write
结论
1 1)InboundHandler顺序执行,OutboundHandler逆序执行 。
2 2)InboundHandler之间传递数据,通过ctx.fireChannelRead(msg) 。
3 3)InboundHandler通过 Channel、ChannelPipeline 的 ctx.write(msg),则会传递到 OutboundHandler (Channel、ChannelPipeline 都会触发一个事件在整个 ChannelPipeline 中传播)。
4 4) 使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行。但是使用channel.write(msg)、pipline.write(msg)情况会不一致,都会执行 。
5 5) Outbound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先Outbound再Inbound,服务端则相反,先Inbound再Outbound。
ChannelFuture
Netty 中的所有 I/O 操作都是异步的,这意味着任何 I/O 调用都会立即返回,而 ChannelFuture 会提供有关的信息 I/O 操作的结果或状态。
ChannelFuture 状态
注意:不要在 IO 线程内调用 future 对象的 sync 或者 await 方法;不能在 channelHandler 中调用 sync 或者 await 方法(会阻塞住服务于本 Channel 的 EvenLoop)
。
1* 未完成:当 I/O 操作开始时,将创建一个新的对象,新的最初是未完成的 - 它既没有成功,也没有成功,也没有被取消,因为 I/O 操作尚未完成。
2* 已完成:当 I/O 操作完成,不管是成功、失败还是取消,Future 都是标记为已完成的, 失败的时候也有具体的信息,例如原因失败,但请注意,即使失败和取消属于完成状态。
ChannelPromise
继承于 ChannelFuture,进一步拓展用于设置 IO 操作的结果。
网络数据传输编解码
Java 自带序列化的缺点
- 无法跨语言
- 与通用二进制编码相比,JDK 序列化之后的码流太大。
- 与通用二进制编码相比,JDK 序列化的性能很差。
业界常见编码框架
Google 的 Protobuf(PB)、Facebook 的 Trift、Jboss 的 Marshalling、Kyro 等。
Netty 里面的编解码
- 解码器:负责处理 “入站 InboundHandler” 数据
- 编码器:负责处理 “出站 OutboundHandler” 数据
Netty 里面提供默认的编解码器,也支持自定义编解码器
- Encoder:编码器
- Decoder:解码器
- Codec:编解码器
解码器 Decoder
Decoder 对应的就是 ChannelInboundHandler,主要就是字节数组转换为消息对象。
主要是两个方法
- decode:通常情况下。
- decodeLast:用于最后的几个字节处理,也就是 channel 关闭的时候,产生的最后一个消息。
抽象解码器
1ByteToMessageDecoder
2 用于将字节转为消息,需要检查缓冲区是否有足够的字节
3
4ReplayingDecoder
5 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,不是所有的ByteBuf都支持
6 选择:项目复杂性高则使用ReplayingDecoder,否则使用 ByteToMessageDecoder
7
8MessageToMessageDecoder
9 用于从一种消息解码为另外一种消息(例如POJO到POJO)
示例
1import io.netty.buffer.ByteBuf;
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.handler.codec.ByteToMessageDecoder;
4import java.util.List;
5
6public class CustomDecoder extends ByteToMessageDecoder {
7 @Override
8 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
9 // int 是4个字节
10 if(in.readableBytes()>=4){
11 //添加到解码信息里面去
12 out.add(in.readInt());
13 }
14 }
15}
解码器具体的实现类
用的比较多的是(更多是为了解决 TCP 底层的粘包和拆包问题)
1DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
2LineBasedFrameDecoder: 以换行符为结束标志的解码器
3FixedLengthFrameDecoder:固定长度解码器
4LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
5StringDecoder:文本解码器,将接收到的对象转化为字符串,一般会与上面的进行配合,然后在后面添加业务handler
编码器 Encoder
Encoder 对应的就是 ChannelOutboundHandler,消息对象转换为字节数组。
Netty 本身未提供和解码一样的编码器,是因为场景不同,两者非对等的。
抽象编码器
1MessageToByteEncoder
2 消息转为字节数组,调用write方法,会先判断当前编码器是否支持需要发送的消息类型,如果不支持,则透传;
3
4MessageToMessageEncoder
5 用于从一种消息编码为另外一种消息(例如POJO到POJO)
示例
1import io.netty.buffer.ByteBuf;
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.handler.codec.MessageToByteEncoder;
4
5public class CustomEncoder extends MessageToByteEncoder<Integer> {
6 @Override
7 protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
8 out.writeInt(msg);
9 }
10}
编解码器类 Codec
组合解码器和编码器,以此提供对于字节和消息都相同的操作。
- 优点:成对出现,编解码都是在一个类里面完成
- 缺点:耦合在一起,拓展性不佳
- 局限性:Byte[] => User User => Byte[]
1Codec:组合编解码
2 1)ByteToMessageCodec
3 2)MessageToMessageCodec
4
5 decoder:解码
6 1)ByteToMessageDecoder
7 2)MessageToMessageDecoder
8
9 encoder:编码
10 1)MessageToByteEncoder
11 2)MessageToMessageEncoder
TCP 粘包、半包
| | send | receive|
| --- | --- |
| 粘包 | ABC DEF | ABCDEF |
| 半包 | ABC DEF | AB CD EF |
粘包的主要原因
1* sender 每次写入数据 < 套接字缓冲区大小
2* receiver 读取套接字缓冲区数据不够及时
半包的主要原因
1* sender 写入数据 > 套接字缓冲区大小
2* sender 数据大于协议的 MTU(Maximum Transmission Unit,最大传输),必须拆包。
从收发和传输角度看
- 收发:一个发送可能被多次接收(半包),多个发送可能被一次接收(粘包)
- 传输:一个发送可能占用多个传输包(半包),多个发送可能公用一个传输包(粘包)。
根本原因
TCP 是流式协议,消息无边界。协议本身无法避免粘包,半包读写的发生需要在应用层进行处理。
Tips:UDP 像邮寄的包裹,虽然一次运输多个,但每个包裹都有 “界限”,一个一个签收,故不存在:粘包、半包问题。
应用层解决问题的根本手段:找出消息的边界
Netty 对三种常用封侦方式的支持
1* DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
2* LineBasedFrameDecoder: 以换行符为结束标志的解码器
3* FixedLengthFrameDecoder:固定长度解码器
4* LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
半包读写问题演示
EchoServerHandler
1public class ServerHandler extends ChannelInboundHandlerAdapter {
2
3 private int counter;
4
5 @Override
6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
7 ByteBuf buf = (ByteBuf)msg;
8 //获取可读取的字节大小
9 byte[] bytes = new byte[buf.readableBytes()];
10 //读取
11 buf.readBytes(bytes);
12
13 //根据换行符解码
14 String body = new String(bytes,"UTF-8").
15 substring(0,bytes.length - System.getProperty("line.separator").length());
16 System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);
17 }
18
19 @Override
20 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
21 cause.printStackTrace();
22 ctx.close();
23 }
24
25}
ClientHandler
1public class ClientHandler extends ChannelInboundHandlerAdapter {
2
3 @Override
4 public void channelActive(ChannelHandlerContext ctx) throws Exception {
5
6 ByteBuf mes = null;
7 byte [] req = ("test.net"+System.getProperty("line.separator")).getBytes();
8 //连续发送 10 次
9 for(int i=0; i< 10; i++){
10 mes = Unpooled.buffer(req.length);
11 mes.writeBytes(req);
12 ctx.writeAndFlush(mes);
13 }
14 }
15
16 @Override
17 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
18 cause.printStackTrace();
19 ctx.close();
20 }
21}
控制台输出(客户端发送 10 次,服务端显示只接收到 1 次)
1Echo 服务器启动
2服务端收到消息内容为:test.net
3test.net
4test.net
5test.net
6test.net
7test.net
8test.net
9test.net
10test.net
11test.net,收到消息次数: 1
LineBasedFrameDecoder 解决 TCP 半包读写问题
LineBasedFrameDecoder: 以换行符为结束标志的解码器
StringDecoder:解码器将对象转成字符串
EchoClient
1public class EchoClient {
2
3 private String host;
4 private int port;
5
6 public EchoClient(String host, int port) {
7 this.host = host;
8 this.port = port;
9 }
10
11 public void start() throws Exception {
12 EventLoopGroup group = new NioEventLoopGroup();
13 try {
14 Bootstrap bootstrap = new Bootstrap();
15 bootstrap.group(group)
16 .channel(NioSocketChannel.class)
17 .remoteAddress(new InetSocketAddress(host, port))
18 .handler(new ChannelInitializer<SocketChannel>() {
19 @Override
20 public void initChannel(SocketChannel ch)throws Exception {
21 ch.pipeline().addLast(new ClientHandler());
22 } });
23 //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功
24 ChannelFuture f = bootstrap.connect().sync();
25 //阻塞直到客户端通道关闭
26 f.channel().closeFuture().sync();
27 } finally {
28 //优雅退出,释放NIO线程组
29 group.shutdownGracefully().sync();
30 }
31 }
32
33 public static void main(String[] args) throws Exception {
34 new EchoClient("127.0.0.1", 8080).start();
35 }
36}
ClientHandler
1public class ClientHandler extends ChannelInboundHandlerAdapter {
2 @Override
3 public void channelActive(ChannelHandlerContext ctx) throws Exception {
4 ByteBuf mes = null;
5 //添加换行符
6 byte [] req = ("test.net"+System.getProperty("line.separator")).getBytes();
7 //连续发送
8 for(int i=0; i< 10; i++){
9 mes = Unpooled.buffer(req.length);
10 mes.writeBytes(req);
11 ctx.writeAndFlush(mes);
12 }
13 }
14
15 @Override
16 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
17 cause.printStackTrace();
18 ctx.close();
19 }
EchoServer
1public class EchoServer {
2
3 private int port;
4
5 public EchoServer(int port){
6 this.port = port;
7 }
8
9 public void run() throws Exception{
10
11 //配置服务端的线程组
12 EventLoopGroup bossGroup = new NioEventLoopGroup();
13 EventLoopGroup workGroup = new NioEventLoopGroup();
14
15 try{
16 ServerBootstrap serverBootstrap = new ServerBootstrap();
17 serverBootstrap.group(bossGroup,workGroup)
18
19 .channel(NioServerSocketChannel.class)
20 .option(ChannelOption.SO_BACKLOG,128)
21 .childHandler(new ChannelInitializer<SocketChannel>() {
22 @Override
23 protected void initChannel(SocketChannel ch) throws Exception {
24 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //第一次解码(可以指定最大长度,如果1024字节还没有找到分隔符就抛出异常:TooLongFrameException)
25 ch.pipeline().addLast(new StringDecoder()); //二次解码:解码器将对象转成字符串
26 ch.pipeline().addLast(new ServerHandler()); //添加Handler
27 }
28 });
29
30 System.out.println("Echo 服务器启动");
31 //绑定端口,同步等待成功
32 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
33 //等待服务端监听端口关闭
34 channelFuture.channel().closeFuture().sync();
35 }finally {
36 //优雅退出,释放线程池
37 workGroup.shutdownGracefully();
38 bossGroup.shutdownGracefully();
39 }
40 }
41
42 public static void main(String[] args) throws Exception {
43 int port = 8080;
44 if (args.length > 0) {
45 port = Integer.parseInt(args[0]);
46 }
47 new EchoServer(port).run();
48 }
49}
ServerHandler
1public class ServerHandler extends ChannelInboundHandlerAdapter {
2
3 private int counter;
4
5 @Override
6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
7 String body = (String) msg;
8 System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);
9 }
10
11 @Override
12 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
13 cause.printStackTrace();
14 ctx.close();
15 }
16}
控制台输出
1Echo 服务器启动
2服务端收到消息内容为:test.net, 收到消息次数:1
3服务端收到消息内容为:test.net, 收到消息次数:2
4服务端收到消息内容为:test.net, 收到消息次数:3
5服务端收到消息内容为:test.net, 收到消息次数:4
6服务端收到消息内容为:test.net, 收到消息次数:5
7服务端收到消息内容为:test.net, 收到消息次数:6
8服务端收到消息内容为:test.net, 收到消息次数:7
9服务端收到消息内容为:test.net, 收到消息次数:8
10服务端收到消息内容为:test.net, 收到消息次数:9
11服务端收到消息内容为:test.net, 收到消息次数:10
DelimiterBasedFrameDecoder 解决 TCP 半包读写问题
EchoClient
1public class EchoClient {
2
3 private String host;
4 private int port;
5
6 public EchoClient(String host, int port) {
7 this.host = host;
8 this.port = port;
9 }
10
11 public void start() throws Exception {
12 EventLoopGroup group = new NioEventLoopGroup();
13 try {
14 Bootstrap bootstrap = new Bootstrap();
15 bootstrap.group(group)
16 .channel(NioSocketChannel.class)
17 .remoteAddress(new InetSocketAddress(host, port))
18 .handler(new ChannelInitializer<SocketChannel>() {
19 @Override
20 public void initChannel(SocketChannel ch)throws Exception {
21 ch.pipeline().addLast(new ClientHandler());
22 } });
23 //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功
24 ChannelFuture f = bootstrap.connect().sync();
25 //阻塞直到客户端通道关闭
26 f.channel().closeFuture().sync();
27 } finally {
28 //优雅退出,释放NIO线程组
29 group.shutdownGracefully().sync();
30 }
31 }
32
33 public static void main(String[] args) throws Exception {
34 new EchoClient("127.0.0.1", 8080).start();
35 }
36}
ClientHandler
1public class ClientHandler extends ChannelInboundHandlerAdapter {
2
3 @Override
4 public void channelActive(ChannelHandlerContext ctx) throws Exception {
5
6 String message = "Netty is a NIO client server framework which enables quick&_" +
7 "and easy development of network applications&_ " +
8 "such as protocol servers and clients.&_" +
9 " It greatly simplifies and streamlines&_" +
10 "network programming such as TCP and UDP socket server.&_";
11
12 ByteBuf mes = null;
13 mes = Unpooled.buffer(message.getBytes().length);
14 mes.writeBytes(message.getBytes());
15 ctx.writeAndFlush(mes);
16
17 }
18
19 @Override
20 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
21
22 cause.printStackTrace();
23 ctx.close();
24 }
25}
EchoServer
1public class EchoServer {
2
3 private int port;
4
5 public EchoServer(int port){
6 this.port = port;
7 }
8
9 public void run() throws Exception{
10
11 //配置服务端的线程组
12 EventLoopGroup bossGroup = new NioEventLoopGroup();
13
14 EventLoopGroup workGroup = new NioEventLoopGroup();
15
16 try{
17 ServerBootstrap serverBootstrap = new ServerBootstrap();
18 serverBootstrap.group(bossGroup,workGroup)
19
20 .channel(NioServerSocketChannel.class)
21 .option(ChannelOption.SO_BACKLOG,128)
22 .childHandler(new ChannelInitializer<SocketChannel>() {
23 @Override
24 protected void initChannel(SocketChannel ch) throws Exception {
25
26 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); //将分隔符转换成 ByteBuf
27 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false, delimiter)); //设置最大长度、false代表不去除delimiter(true 代表去除delimiter)、设置分隔符
28 ch.pipeline().addLast(new StringDecoder()); //将消息对象转换成字符串
29 ch.pipeline().addLast(new ServerHandler());
30 }
31 });
32
33 System.out.println("Echo 服务器启动");
34 //绑定端口,同步等待成功
35 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
36 //等待服务端监听端口关闭
37 channelFuture.channel().closeFuture().sync();
38
39 }finally {
40 //优雅退出,释放线程池
41 workGroup.shutdownGracefully();
42 bossGroup.shutdownGracefully();
43 }
44
45 }
46
47 public static void main(String[] args) throws Exception {
48 int port = 8080;
49
50 if (args.length > 0) {
51 port = Integer.parseInt(args[0]);
52 }
53 new EchoServer(port).run();
54 }
55}
ServerHandler
1public class ServerHandler extends ChannelInboundHandlerAdapter {
2 private int counter;
3
4 @Override
5 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
6
7 String body = (String) msg;
8 System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);
9
10 }
11
12 @Override
13 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
14 cause.printStackTrace();
15 ctx.close();
16 }
17
18}
控制台输出
1Echo 服务器启动
2服务端收到消息内容为:Netty is a NIO client server framework which enables quick&_, 收到消息次数:1
3服务端收到消息内容为:and easy development of network applications&_, 收到消息次数:2
4服务端收到消息内容为: such as protocol servers and clients.&_, 收到消息次数:3
5服务端收到消息内容为: It greatly simplifies and streamlines&_, 收到消息次数:4
6服务端收到消息内容为:network programming such as TCP and UDP socket server.&_, 收到消息次数:5
LengthFieldBasedFrameDecoder 解决 TCP 半包读写问题
LengthFieldBasedFrameDecoder 常用选项
1maxFrameLength 数据包的最大长度
2lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段
3lengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度
4lengthAdjustment 一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字
5initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包
6failFast 是否快速失败
Message
1public class Message {
2
3 //可以指整个消息的长度,也可以指消息体的长度
4 private int length;
5
6 private String body;
7
8 public int getLength() {
9 return length;
10 }
11
12 public void setLength(int length) {
13 this.length = length;
14 }
15
16 public String getBody() {
17 return body;
18 }
19
20 public void setBody(String body) {
21 this.body = body;
22 }
23}
ByteBuf
JDK ByteBuffer:共用读写索引,每次读写操作都需要 Flip() 复位索引,扩容麻烦,而且扩容后容易造成浪费。
Netty ByteBuf:是传递字节数据的容器。读写使用不同的索引,所以操作便捷自动扩容,使用便捷。
ByteBuf 创建方法
11. ByteBufAllocator
2 池化 PooledByteBufAllocator :Netty4.x版本后默认使用池化,提高性能,并且最大程度减少内存碎片
3 非池化 UnpooledByteBufAllocator: 每次返回新的实例
4
52. Unpooled: 提供静态方法创建未池化的ByteBuf,可以创建堆内存和直接内存缓冲区
ByteBuf 使用模式
选择原则:大量 IO 数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”
- 堆缓存区 HEAP BUFFER
1优点:存储在JVM的堆空间中,可以快速的分配和释放
2缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)
- 直接缓存区 DIRECR BUFFER
1优点:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
2缺点:内存的分配和释放,比在堆缓冲区更复杂
- 复合缓冲区 COMPOSITE BUFFER
1可以创建多个不同的ByteBuf,然后放在一起,但是只是一个视图
用法示例
1//ByteBufAllocator
2 PooledByteBufAllocator pooledByteBufAllocator = new PooledByteBufAllocator();
3 UnpooledByteBufAllocator unpooledByteBufAllocator = new UnpooledByteBufAllocator();
4
5//Unpooled
6 //创建一个堆缓冲区
7 ByteBuf heapBuffer = Unpooled.buffer(16);
8
9 //创建一个堆外缓冲区,直接内存
10 ByteBuf directBuffer = Unpooled.directBuffer(16);
11
12 //组合缓冲区
13 CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
14 compositeByteBuf.addComponents(heapBuffer,directBuffer);
Netty 中采用的设计模式
1* Builder构造器模式:ServerBootstap
2
3* 责任链设计模式:pipeline的事件传播
4
5* 工厂模式: 创建Channel
6
7* 适配器模式:HandlerAdapter
Netty 搭建单机百万连接服务器
必备知识
1* 网络 IO 模型
2* Linux 文件描述符(每一个 tcp 连接都要占一个文件描述符)
3 单进程文件句柄数(默认 1024,不同系统不一样,每个进程都有最大的文件描述符限制)
4 全局文件句柄数(也有默认值 65535,不同系统不一样)
5* 如何确定一个唯一的 TCP 连接:TCP 四元组:源 IP 地址、源端口、目的 ip、目的端口
每个进程开启一个 socket 连接,都会占用一个文件描述符。
在 Linux 系统中一切皆可以看成是文件,文件又可分为:普通文件、目录文件、链接文件和设备文件。
文件描述符(file descriptor)是内核为了高效管理已被打开的文件所创建的索引,其是一个非负整数(通常是小整数),用于指代被打开的文件,所有执行 I/O 操作(包括网络 socket 操作)的系统调用都通过文件描述符。
套接字(socket)是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的 IP 地址,本地进程的协议端口,远地主机的 IP 地址,远地进程的协议端口。
由于进程级文件描述符表的存在,不同的进程中会出现相同的文件描述符,它们可能指向同一个文件,也可能指向不同的文件。
Pom.xml
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6
7 <groupId>pub.soulboy</groupId>
8 <artifactId>millionServer</artifactId>
9 <version>1.0-SNAPSHOT</version>
10
11
12
13 <dependencies>
14 <dependency>
15 <groupId>io.netty</groupId>
16 <artifactId>netty-all</artifactId>
17 <version>4.1.32.Final</version>
18 </dependency>
19 <dependency>
20 <groupId>junit</groupId>
21 <artifactId>junit</artifactId>
22 <version>4.12</version>
23 <scope>test</scope>
24 </dependency>
25
26
27 </dependencies>
28
29 <build>
30 <plugins>
31 <!--maven的默认编译使用的jdk版本貌似很低,使用maven-compiler-plugin插件可以指定项目源码的jdk版本-->
32 <plugin>
33 <groupId>org.apache.maven.plugins</groupId>
34 <artifactId>maven-compiler-plugin</artifactId>
35 <version>3.7.0</version>
36 <configuration>
37 <source>1.8</source>
38 <target>1.8</target>
39 <encoding>utf-8</encoding>
40 </configuration>
41 </plugin>
42
43 <!--将依赖的jar包打包到当前jar包,常规打包是不会将所依赖jar包打进来的-->
44 <plugin>
45 <groupId>org.apache.maven.plugins</groupId>
46 <artifactId>maven-shade-plugin</artifactId>
47 <version>1.2.1</version>
48 <executions>
49 <execution>
50 <phase>package</phase>
51 <goals>
52 <goal>shade</goal>
53 </goals>
54 <configuration>
55 <transformers>
56 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
57 <!-- 打包指定启动类 -->
58 <!--<mainClass>pub.soulboy.NettyServer</mainClass>-->
59 <mainClass>pub.soulboy.NettyClient</mainClass>
60 </transformer>
61 </transformers>
62 </configuration>
63 </execution>
64 </executions>
65 </plugin>
66
67 </plugins>
68 </build>
69</project>
Config
1public class Config {
2 public static final int BEGIN_PORT = 8000;
3 public static final int END_PORT = 8050;
4}
NettyServer
1import io.netty.bootstrap.ServerBootstrap;
2import io.netty.channel.ChannelFutureListener;
3import io.netty.channel.ChannelOption;
4import io.netty.channel.EventLoopGroup;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.nio.NioServerSocketChannel;
7
8public class NettyServer {
9 public static void main(String [] args){
10 new NettyServer().run(Config.BEGIN_PORT, Config.END_PORT);
11 }
12
13 public void run(int beginPort, int endPort){
14 System.out.println("服务端启动中");
15
16 //配置服务端线程组
17 EventLoopGroup bossGroup = new NioEventLoopGroup();
18 EventLoopGroup workGroup = new NioEventLoopGroup();
19
20 ServerBootstrap serverBootstrap = new ServerBootstrap();
21 serverBootstrap.group(bossGroup, workGroup)
22 .channel(NioServerSocketChannel.class)
23 .childOption(ChannelOption.SO_REUSEADDR, true) //快速复用端口号,避免关闭的TCP端口号需要被内核持有一段时间才能被设置为可用
24 .childHandler( new TcpCountHandler());
25
26 for(; beginPort < endPort; beginPort++){
27 int port = beginPort;
28 serverBootstrap.bind(port).addListener((ChannelFutureListener) future->{
29 System.out.println("服务端成功绑定端口 port = "+port);
30 });
31 }
32 }
33}
TcpCountHandler
1import io.netty.channel.ChannelHandler;
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.channel.ChannelInboundHandlerAdapter;
4
5import java.util.concurrent.Executors;
6import java.util.concurrent.TimeUnit;
7import java.util.concurrent.atomic.AtomicInteger;
8
9@ChannelHandler.Sharable //多个线程共享此Handler
10public class TcpCountHandler extends ChannelInboundHandlerAdapter {
11
12 //计数当前服务器的连接数
13 private AtomicInteger atomicInteger = new AtomicInteger();
14
15 /**
16 * 构造函数: 添加定时任务,每三秒打印一下当前服务器的连接数
17 */
18 public TcpCountHandler(){
19 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()->{
20 System.out.println("当前连接数为 = "+ atomicInteger.get());
21 }, 0, 3, TimeUnit.SECONDS);
22 }
23
24 @Override
25 public void channelActive(ChannelHandlerContext ctx) throws Exception {
26 atomicInteger.incrementAndGet();
27 }
28
29 @Override
30 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
31 atomicInteger.decrementAndGet();
32 }
33
34 @Override
35 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
36 System.out.println("TcpCountHandler exceptionCaught");
37 cause.printStackTrace();
38 }
39}
NettyClient
1import io.netty.bootstrap.Bootstrap;
2import io.netty.channel.*;
3import io.netty.channel.nio.NioEventLoopGroup;
4import io.netty.channel.socket.SocketChannel;
5import io.netty.channel.socket.nio.NioSocketChannel;
6
7
8public class NettyClient {
9
10 private static final String SERVER = "127.0.0.1";
11
12 public static void main(String [] args){
13 new NettyClient().run(Config.BEGIN_PORT, Config.END_PORT);
14 }
15
16 public void run(int beginPort, int endPort){
17
18 System.out.println("客户端启动中");
19 EventLoopGroup group = new NioEventLoopGroup();
20 Bootstrap bootstrap = new Bootstrap();
21
22 bootstrap.group(group)
23 .channel(NioSocketChannel.class)
24 .option(ChannelOption.SO_REUSEADDR, true)//快速复用端口号,避免关闭的TCP端口号需要被内核持有一段时间才能被设置为可用
25 .handler(new ChannelInitializer<SocketChannel>() {
26 @Override
27 protected void initChannel(SocketChannel ch) throws Exception {
28 }
29 });
30
31
32 int index = 0 ;
33 int finalPort ;
34
35 while (true){
36 //TCP连接服务端的端口
37 finalPort = beginPort + index;
38 try {
39 bootstrap.connect(SERVER, finalPort).addListener((ChannelFutureListener)future ->{
40 if(!future.isSuccess()){
41 System.out.println("创建连接失败" );
42 }
43 }).get();
44
45 System.out.println("port = " + finalPort);
46 } catch (Exception e) {
47 //e.printStackTrace();
48 }
49 ++index;
50 if(index == (endPort - beginPort)){
51 index = 0 ;
52 }
53 }
54 }
55}
测试
1# 启动服务端 <!-- 打包指定启动类做区分 -->
2java -jar millionServer-1.0-SNAPSHOT.jar
3或者 java -jar millionServer-1.0-SNAPSHOT.jar -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g
4
5
6当前连接数为 = 65455 //受限于全局文件句柄数
7
8
9# 启动客户端 <!-- 打包指定启动类做区分 -->
10java -jar millionServer-1.0-SNAPSHOT.jar
11
12port = 8026
13...
14port = 8028
15创建连接失败
Linux 核心参数优化
Linux TCP 链接 socket 数据数
进行 TCP 连接时,系统为每个 TCP 连接创建一个 socket 句柄,也就是一个文件句柄,但是 Linux 对每个进程打开的文件句柄数量做了限制,如果超出:报错 “Too many open file”。
局部文件句柄限制
单个进程最大文件打开数
1# 查看命令:一个进程最大打开的文件数 fd 不同系统有不同的默认值
2[root@master ~]# ulimit -n
31024
4
5# 临时修改(系统重启或者用户退出登录都会失效)
6[root@master tmp]# ulimit -n 1000000
7
8# 永久修改 (需要重启)
9[root@master tmp]# vim /etc/security/limits.conf
10root soft nofile 1000000
11root hard nofile 1000000
12* soft nofile 1000000
13* hard nofile 1000000
全局文件句柄限制
所有进程最大打开的文件数,不同系统是不一样,可以直接 echo 临时修改。
1# 查看命令
2[root@master tmp]# cat /proc/sys/fs/file-max
3177072
4
5# 临时修改
6[root@master tmp]# echo 1000000 > /proc/sys/fs/file-max
7
8# 永久修改 (永久修改全局文件句柄, 修改后生效 sysctl -p)
9[root@master tmp]# vim /etc/sysctl.conf
10fs.file-max = 1000000
SocketChannel 与 ServerSocketChannel 的区别
1* Socket 和 ServerSocket 是一对 他们是 java.net 下面实现 socket 通信的类
2* SocketChannel 和 ServerSocketChannel 是一对 他们是 java.nio 下面实现通信的类 支持异步通信
3* 服务器必须先建立 ServerSocket 或者 ServerSocketChannel 来等待客户端的连接
4* 客户端必须建立相对应的 Socket 或者 SocketChannel 来与服务器建立连接
5* 服务器接受到客户端的连接受,再生成一个 Socket 或者 SocketChannel 与此客户端通信
SocketChannel(7 个: childOption)
ServerSocketChannel(3 个: option )
1 serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
2 serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
3 serverBootstrap.option(ChannelOption.SO_REUSEADDR, true) //快速复用端口号,避免关闭的TCP端口号需要被内核持有一段时间才能被设置为可用
Netty 核心参数优化
ChannelOption (非系统相关:共 11 个)
1ChannelOption
2* childOption(ChannelOption.[XXX], [YYY])
3* option(ChannelOption.[XXX], [YYY])
System property (-Dio.netty.xxx,50+ )
1* 多种实现的切换:-Dio.netty.noJdkZlibDecoder
2* 参数的调优: -Dio.netty.eventLoopThreads
3* 功能的开启关闭: -Dio.netty.noKeySetOptimization
1 bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000);
图解三个抽象的参数
SO_REUSEADDR
缩短端口的可用等待时间。(要根据网络实际情况设置:很大程度上取决于网络环境)
SO_LINGER
前者可以更快的关闭,后者为了安全性会比前者多阻塞一会儿。根据需求取舍。
ALLOW_HALF_CLOSURE
前者关闭双线通道。
后者关闭其中一个通道。
1# channel.shutdownOutput()
2* 关闭之后不能 send (在channel.shutdownOutput()之后,如果发数据会抛出异常),但是可以 receive。
3
4# channel.shutdownInput()
5* 关闭之后不能 read (在channel.shutdownInput()之后,如果读取据会抛出异常),但是可以 send。
数据链路分析
1输入域名-》浏览器内核调度-》本地DNS解析-》远程DNS解析-》ip -》路由多层调转-》目的服务器
2 服务器内核-》代理服务器 nginx/ 网关 / 负载均衡设备-》目的服务器
3 服务器内核-》 应用程序(springboot)-》Redis-》Mysql