Netty in Order Service
编写网络应用程序基本步骤
案例介绍
- AuthOperation: 授权操作
- OrderOperation:点单操作
- KeepaliveOperation:保持连接操作
数据结构设计
Message Header
1* version 协议版本
2* opCode 不同的opCode,Message Body 对应的 operation/operation result 也是不同的,在JSON 解析时需要根据 opCode 解析到不同的 Message Body 类型。
3* streamId 相当于Message ID,标识信息的唯一 ID
Message Body(JSON)
1* operation 操作
2* operation result 操作结果
length
1* length 长度信息指的是整个 Message(Header + Body)的长度
套接字(socket)是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的 IP 地址,本地进程的协议端口,远地主机的 IP 地址,远地进程的协议端口。
Netty 对三种常用封帧方式的支持
1* 选择协议 TCP 粘包、半包(封帧)
项目依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learning.netty</groupId>
    <artifactId>netty-research</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Sources contain non-ASCII comments; pin the encoding so the build
             does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <!-- Annotation processor only: needed at compile time, not at runtime. -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
    </dependencies>
</project>
Message
Message
1import io.netty.buffer.ByteBuf;
2import io.netty.example.study.util.JsonUtil;
3import lombok.Data;
4
5import java.nio.charset.Charset;
6
7@Data
8public abstract class Message<T extends MessageBody> {
9 //MessageHeader
10 private MessageHeader messageHeader;
11
12 //MessageBody:Operation | OperationResult
13 private T messageBody;
14
15 /**
16 * 获取 messageBody
17 * @return
18 */
19 public T getMessageBody(){
20 return messageBody;
21 }
22
23 /**
24 * 抽象方法,获取 MessageBodyDecodeClass
25 * @param opcode
26 * @return
27 */
28 public abstract Class<T> getMessageBodyDecodeClass(int opcode);
29
30 /**
31 * 编码
32 * @param byteBuf
33 */
34 public void encode(ByteBuf byteBuf) {
35 //Message Header
36 byteBuf.writeInt(messageHeader.getVersion());
37 byteBuf.writeLong(messageHeader.getStreamId());
38 byteBuf.writeInt(messageHeader.getOpCode());
39 //Message Body
40 byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes());
41 }
42
43 /**
44 * 解码
45 * @param msg
46 */
47 public void decode(ByteBuf msg) {
48 //Message Header
49 int version = msg.readInt();
50 long streamId = msg.readLong();
51 int opCode = msg.readInt();
52
53 MessageHeader messageHeader = new MessageHeader();
54 messageHeader.setVersion(version);
55 messageHeader.setOpCode(opCode);
56 messageHeader.setStreamId(streamId);
57
58 this.messageHeader = messageHeader;
59
60 //Message Body
61 Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
62 T body = JsonUtil.fromJson(msg.toString(Charset.forName("UTF-8")), bodyClazz);
63 this.messageBody = body;
64 }
65}
MessageHeader
1@Data
2public class MessageHeader {
3 private int version = 1;
4 private int opCode;
5 private long streamId;
6}
MessageBody
/** Marker base type for everything that can be carried as a message body. */
public abstract class MessageBody {
}
RequestMessage
1public class RequestMessage extends Message<Operation>{
2 @Override
3 public Class getMessageBodyDecodeClass(int opcode) {
4 //获取指定的 Operation
5 return OperationType.fromOpCode(opcode).getOperationClazz();
6 }
7
8 public RequestMessage(){}
9
10 /**
11 * 构造方法:构造 RequestMessage
12 * @param streamId
13 * @param operation
14 */
15 public RequestMessage(Long streamId, Operation operation){
16 MessageHeader messageHeader = new MessageHeader();
17 messageHeader.setStreamId(streamId);
18 messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());
19
20 //为 Message 设置 MessageHeader
21 this.setMessageHeader(messageHeader);
22 //为 Message 设置 MessageBody
23 this.setMessageBody(operation);
24 }
25}
ResponseMessage
1public class ResponseMessage extends Message <OperationResult>{
2 @Override
3 public Class getMessageBodyDecodeClass(int opcode) {
4 //获取指定的 OperationResult
5 return OperationType.fromOpCode(opcode).getOperationResultClazz();
6 }
7}
Operation / OperationResult
OperationType (枚举类)
1import io.netty.example.study.common.order.OrderOperation;
2import io.netty.example.study.common.order.OrderOperationResult;
3import io.netty.example.study.common.auth.AuthOperation;
4import io.netty.example.study.common.auth.AuthOperationResult;
5import io.netty.example.study.common.keepalive.KeepaliveOperation;
6import io.netty.example.study.common.keepalive.KeepaliveOperationResult;
7
8import java.util.function.Predicate;
9
10public enum OperationType {
11 //定义枚举
12 AUTH(1, AuthOperation.class, AuthOperationResult.class),
13 KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
14 ORDER(3, OrderOperation.class, OrderOperationResult.class);
15
16 //枚举类无法显式为枚举中的常量显式赋值
17 private int opCode;
18 private Class<? extends Operation> operationClazz;
19 private Class<? extends OperationResult> operationResultClazz;
20
21 /**
22 * 枚举类的构造方法 :通过构造方法为枚举中的常量显式赋值
23 * @param opCode
24 * @param operationClazz
25 * @param responseClass
26 */
27 OperationType(int opCode, Class<? extends Operation> operationClazz, Class<? extends OperationResult> responseClass) {
28 this.opCode = opCode;
29 this.operationClazz = operationClazz;
30 this.operationResultClazz = responseClass;
31 }
32
33 //获取 opCode
34 public int getOpCode(){
35 return opCode;
36 }
37
38 //获取 operationClazz
39 public Class<? extends Operation> getOperationClazz() {
40 return operationClazz;
41 }
42
43 //获取 operationResultClazz
44 public Class<? extends OperationResult> getOperationResultClazz() {
45 return operationResultClazz;
46 }
47
48 /**
49 * 根据 opCode 获取对应的枚举
50 * @param type
51 * @return
52 */
53 public static OperationType fromOpCode(int type){
54 return getOperationType(requestType -> requestType.opCode == type);
55 }
56
57 /**
58 * 根据 operation 获取对应的枚举
59 * @param operation
60 * @return
61 */
62 public static OperationType fromOperation(Operation operation){
63 //当前遍历的 OperationType 实例的 operationClazz 属性等于 入参的操作类型 (OrderOperation | KeepaliveOperation | AuthOperation)
64 return getOperationType(requestType -> requestType.operationClazz == operation.getClass());
65 }
66
67 /**
68 * 函数式编程
69 * @param predicate 接收⼀个参数,⽤于判断是否满⾜⼀定的条件,过滤数据.
70 * @return
71 */
72 private static OperationType getOperationType(Predicate<OperationType> predicate){
73 OperationType[] values = values();
74 for (OperationType operationType : values) {
75 if(predicate.test(operationType)){
76 return operationType;
77 }
78 }
79 //遍历找不到就退出
80 throw new AssertionError("no found type");
81 }
82
83}
Operation
1public abstract class Operation extends MessageBody{
2
3 public abstract OperationResult execute();
4
5}
OperationResult
1import lombok.Data;
2
3@Data
4public abstract class OperationResult extends MessageBody{
5
6}
AuthOperation
1import io.netty.example.study.common.Operation;
2import lombok.Data;
3import lombok.extern.java.Log;
4
5@Data
6@Log
7public class AuthOperation extends Operation {
8
9 private final String userName;
10 private final String password;
11
12 @Override
13 public AuthOperationResult execute() {
14 if("admin".equalsIgnoreCase(this.userName)){
15 AuthOperationResult orderResponse = new AuthOperationResult(true);
16 return orderResponse;
17 }
18
19 return new AuthOperationResult(false);
20 }
21}
AuthOperationResult
1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class AuthOperationResult extends OperationResult {
6
7 private final boolean passAuth;
8
9}
KeepaliveOperation
1import io.netty.example.study.common.Operation;
2import lombok.Data;
3import lombok.extern.java.Log;
4
5@Data
6@Log
7public class KeepaliveOperation extends Operation {
8
9 private long time ;
10
11 public KeepaliveOperation() {
12 this.time = System.nanoTime();
13 }
14
15 @Override
16 public KeepaliveOperationResult execute() {
17 KeepaliveOperationResult orderResponse = new KeepaliveOperationResult(time);
18 return orderResponse;
19 }
20}
KeepaliveOperationResult
1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class KeepaliveOperationResult extends OperationResult {
6 private final long time;
7}
OrderOperation
1import io.netty.example.study.common.Operation;
2import lombok.Data;
3
4@Data
5public class OrderOperation extends Operation {
6
7 private int tableId;
8 private String dish;
9
10 public OrderOperation(int tableId, String dish) {
11 this.tableId = tableId;
12 this.dish = dish;
13 }
14
15 @Override
16 public OrderOperationResult execute() {
17 System.out.println("order's executing startup with orderRequest: " + toString());
18 //execute order logic
19 System.out.println("order's executing complete");
20 OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
21 return orderResponse;
22 }
23}
OrderOperationResult
1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class OrderOperationResult extends OperationResult {
6 private final int tableId;
7 private final String dish;
8 private final boolean complete;
9}
IdUtil
IdUtil
1import java.util.concurrent.atomic.AtomicLong;
2
/** Generates process-wide, monotonically increasing stream ids. */
public final class IdUtil {

    /** Last id handed out; starts at 0 so the first id is 1. */
    private static final AtomicLong SEQUENCE = new AtomicLong();

    private IdUtil() {
        // utility class, never instantiated
    }

    /**
     * Returns the next id; safe to call from several threads.
     *
     * @return the previous id plus one
     */
    public static long nextId() {
        return SEQUENCE.addAndGet(1L);
    }
}
JsonUtil
1public final class JsonUtil {
2
3 private static final Gson GSON = new Gson();
4
5 private JsonUtil() {
6 //no instance
7 }
8
9 public static <T> T fromJson(String jsonStr, Class<T> clazz){
10 return GSON.fromJson(jsonStr, clazz);
11 }
12
13 public static String toJson(Object object){
14 return GSON.toJson(object);
15 }
16
17}
服务端
OrderFrameDecoder (一次解码:得出一个没有粘包、半包问题的 ByteBuf,ByteBuf => ByteBuf)
1import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2
3public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
4 public OrderFrameDecoder() {
5 super(Integer.MAX_VALUE, 0, 2, 0, 2);
6 }
7}
OrderProtocolDecoder (二次解码:ByteBuf => RequestMessage)
1public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
2 @Override
3 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
4 RequestMessage requestMessage = new RequestMessage();
5 requestMessage.decode(byteBuf);
6 //将解码后的数据传递出去
7 out.add(requestMessage);
8 }
9}
OrderServerProcessHandler
1import io.netty.channel.ChannelHandlerContext;
2import io.netty.channel.SimpleChannelInboundHandler;
3import io.netty.example.study.common.Operation;
4import io.netty.example.study.common.OperationResult;
5import io.netty.example.study.common.RequestMessage;
6import io.netty.example.study.common.ResponseMessage;
7
8/**
9 * SimpleChannelInboundHandler VS ChannelInboundHandlerAdapter
10 * 前者会自动释放自动释放 解码阶段使用的 ByteBuf(内存池 Or 堆外内存)
11 */
12public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
13 @Override
14 protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
15 //Operation = OrderOperation (MessageBody => Operation)
16 Operation operation = requestMessage.getMessageBody();
17 //OperationResult = OrderOperationResult (执行 Operation 得到 OperationResult)
18 OperationResult operationResult = operation.execute();
19
20 //构建 ResponseMessage = MessageHead + MessageResult
21 ResponseMessage responseMessage = new ResponseMessage();
22 responseMessage.setMessageHeader(requestMessage.getMessageHeader());
23 responseMessage.setMessageBody(operationResult);
24
25 //将业务处理后的结果(RequestMessage)返回客户端 (进入 OrderProtocolEncoder 、OrderFrameEncoder)
26 ctx.writeAndFlush(responseMessage);
27 }
28}
OrderProtocolEncoder (编码:ResponseMessage => ByteBuf)
1import java.util.List;
2
3import io.netty.buffer.ByteBuf;
4import io.netty.channel.ChannelHandlerContext;
5import io.netty.example.study.common.ResponseMessage;
6import io.netty.handler.codec.MessageToMessageEncoder;
7
8/**
9 * 泛型 代表 Input 类型ResponseMessage
10 */
11public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
12 @Override
13 protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
14 //先申请(分配)一个 ByteBuf
15 ByteBuf buffer = ctx.alloc().buffer();
16 //将 ResponseMessage encode 到 ByteBuf
17 responseMessage.encode(buffer);
18 //完成到 ResponseMessage => ByteBuf
19 out.add(buffer);
20 }
21}
OrderFrameEncoder (编码:得出一个没有粘包、半包问题的 ByteBuf (方便客户端处理 TCP 半包和粘包) ByteBuf => ByteBuf)
1import io.netty.handler.codec.LengthFieldPrepender;
2
3/**
4 *
5 */
6public class OrderFrameEncoder extends LengthFieldPrepender {
7 public OrderFrameEncoder() {
8 super(2);
9 }
10}
Server
1import io.netty.bootstrap.ServerBootstrap;
2import io.netty.channel.ChannelFuture;
3import io.netty.channel.ChannelInitializer;
4import io.netty.channel.ChannelPipeline;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.nio.NioServerSocketChannel;
7import io.netty.channel.socket.nio.NioSocketChannel;
8import io.netty.example.study.server.codec.OrderFrameDecoder;
9import io.netty.example.study.server.codec.OrderFrameEncoder;
10import io.netty.example.study.server.codec.OrderProtocolDecoder;
11import io.netty.example.study.server.codec.OrderProtocolEncoder;
12import io.netty.example.study.server.handler.OrderServerProcessHandler;
13import io.netty.handler.logging.LogLevel;
14import io.netty.handler.logging.LoggingHandler;
15
16import java.util.concurrent.ExecutionException;
17
18public class Server {
19
20 public static void main(String[] args) throws InterruptedException, ExecutionException {
21
22 //启动类
23 ServerBootstrap serverBootstrap = new ServerBootstrap();
24 //设置 NioServerSocketChannel 模式
25 serverBootstrap.channel(NioServerSocketChannel.class);
26 //打印日志
27 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
28 //创建 NioEventLoopGroup
29 NioEventLoopGroup group = new NioEventLoopGroup();
30 try{
31 //绑定 NioEventLoopGroup
32 serverBootstrap.group(group);
33 //handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。
34 //如果需要在客户端连接前的请求进行handler处理,则需要配置handler()
35 //如果是处理客户端连接之后的handler,则需要配置在childHandler()。
36 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
37 @Override
38 protected void initChannel(NioSocketChannel ch) throws Exception {
39 ChannelPipeline pipeline = ch.pipeline();
40
41 //解码正序,编码逆序
42 pipeline.addLast(new OrderFrameDecoder()); //first decode
43 pipeline.addLast(new OrderFrameEncoder()); //first encode
44 pipeline.addLast(new OrderProtocolEncoder()); //seconds encode
45 pipeline.addLast(new OrderProtocolDecoder()); //seconds decode
46 pipeline.addLast(new OrderServerProcessHandler()); //Handler
47
48 pipeline.addLast(new LoggingHandler(LogLevel.INFO));
49 }
50 });
51
52 //启动并同步等待完成
53 ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
54
55 channelFuture.channel().closeFuture().sync();
56 } finally {
57 group.shutdownGracefully();
58 }
59 }
60}
客户端
OrderProtocolEncoder (RequestMessage => ByteBuf)
1import io.netty.buffer.ByteBuf;
2import io.netty.channel.ChannelHandlerContext;
3import io.netty.example.study.common.RequestMessage;
4import io.netty.handler.codec.MessageToMessageEncoder;
5
6import java.util.List;
7
8public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
9 @Override
10 protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage, List<Object> out) throws Exception {
11 //分配一个 ByteBuf
12 ByteBuf buffer = ctx.alloc().buffer();
13 //将 RequestMessage encode 到 ByteBuf 中
14 requestMessage.encode(buffer);
15 out.add(buffer);
16 }
17}
OrderFrameEncoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))
1import io.netty.handler.codec.LengthFieldPrepender;
2
3public class OrderFrameEncoder extends LengthFieldPrepender {
4 public OrderFrameEncoder() {
5 super(2);
6 }
7}
OrderFrameDecoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))
1public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
2 public OrderFrameDecoder() {
3 super(Integer.MAX_VALUE, 0, 2, 0, 2);
4 }
5}
OrderProtocolDecoder (ByteBuf => ResponseMessage)
1import java.util.List;
2
3import io.netty.buffer.ByteBuf;
4import io.netty.channel.ChannelHandlerContext;
5import io.netty.example.study.common.ResponseMessage;
6import io.netty.handler.codec.MessageToMessageDecoder;
7
8/**
9 * 泛型 代表 input 类型 ByteBuf
10 */
11public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
12 @Override
13 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
14 ResponseMessage responseMessage = new ResponseMessage();
15 //将 ByteBuf => ResponseMessage
16 responseMessage.decode(byteBuf);
17 out.add(responseMessage);
18 }
19}
20
OperationToRequestMessageEncoder (负责 Operation => RequestMessage)
1import io.netty.channel.ChannelHandlerContext;
2import io.netty.example.study.common.Operation;
3import io.netty.example.study.common.RequestMessage;
4import io.netty.example.study.util.IdUtil;
5import io.netty.handler.codec.MessageToMessageEncoder;
6
7import java.util.List;
8
9/**
10 * 泛型 代表 input 类型 Operation
11 * 负责 OrderOperation => RequestMessage
12 */
13public class OperationToRequestMessageEncoder extends MessageToMessageEncoder <Operation> {
14 @Override
15 protected void encode(ChannelHandlerContext ctx, Operation operation, List<Object> out) throws Exception {
16 RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), operation);
17
18 out.add(requestMessage);
19 }
20}
ClientV1 (无法拿到响应结果,只能从打印的日志中看到)
1import io.netty.bootstrap.Bootstrap;
2import io.netty.channel.ChannelFuture;
3import io.netty.channel.ChannelInitializer;
4import io.netty.channel.ChannelPipeline;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.nio.NioSocketChannel;
7import io.netty.example.study.client.codec.*;
8import io.netty.example.study.common.order.OrderOperation;
9import io.netty.handler.logging.LogLevel;
10import io.netty.handler.logging.LoggingHandler;
11
12import java.util.concurrent.ExecutionException;
13
14public class ClientV1 {
15
16 public static void main(String[] args) throws InterruptedException, ExecutionException {
17
18 Bootstrap bootstrap = new Bootstrap();
19 bootstrap.channel(NioSocketChannel.class);
20
21 NioEventLoopGroup group = new NioEventLoopGroup();
22 try{
23 bootstrap.group(group);
24
25 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
26 @Override
27 protected void initChannel(NioSocketChannel ch) throws Exception {
28 ChannelPipeline pipeline = ch.pipeline();
29 pipeline.addLast(new OrderFrameDecoder());
30 pipeline.addLast(new OrderFrameEncoder());
31
32 pipeline.addLast(new OrderProtocolEncoder());
33 pipeline.addLast(new OrderProtocolDecoder());
34
35 pipeline.addLast(new OperationToRequestMessageEncoder());
36
37 pipeline.addLast(new LoggingHandler(LogLevel.INFO));
38 }
39 });
40
41 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
42
43 channelFuture.sync();
44
45 OrderOperation orderOperation = new OrderOperation(1001, "tudou");
46
47 channelFuture.channel().writeAndFlush(orderOperation);
48
49 channelFuture.channel().closeFuture().sync();
50
51 } finally {
52 group.shutdownGracefully();
53 }
54 }
55
56}
完善客户端(响应分发)
通过 streamId 来实现第二种响应分发的方式
1* 发起请求,立刻返回 Future
2* 将 Future 存入 Map 中,Key 为 streamId
3* Response 到达客户端先去 Map 中根据 streamID 查找对应的 Future 对象,将 Response 设置到 Future 中
4* 调用该 Future 对象的 get() 方法 :future.get()
OperationResultFuture
1import io.netty.example.study.common.OperationResult;
2import io.netty.util.concurrent.DefaultPromise;
3
4/**
5 * JDK Future不好用,使用 Netty 的 DefaultPromise
6 * V 代表 OperationResult
7 */
8public class OperationResultFuture extends DefaultPromise<OperationResult> {
9}
RequestPendingCenter
1import io.netty.example.study.common.OperationResult;
2
3import java.util.Map;
4import java.util.concurrent.ConcurrentHashMap;
5
6/**
7 * 请求等待中心
8 */
9public class RequestPendingCenter {
10
11 private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();
12
13 /**
14 * 将 Future 存入 Map 中,Key 为 streamId;
15 * @param streamId
16 * @param future
17 */
18 public void add(Long streamId, OperationResultFuture future){
19 this.map.put(streamId, future);
20 }
21
22 /**
23 * 根据 streamID 查找对应的 Future 对象,将 Response(OperationResultFuture) 设置到 Future 中
24 * @param streamId
25 * @param operationResult
26 */
27 public void set(Long streamId, OperationResult operationResult){
28 OperationResultFuture operationResultFuture = this.map.get(streamId);
29 if (operationResultFuture != null) { //判断 OperationResultFuture 存在才设置
30 operationResultFuture.setSuccess(operationResult);
31 this.map.remove(streamId); //从 map 中移除此 key-value
32 }
33 }
34}
ResponseDispatcherHandler
1import io.netty.channel.ChannelHandlerContext;
2import io.netty.channel.SimpleChannelInboundHandler;
3import io.netty.example.study.common.ResponseMessage;
4
5/**
6 * 响应分发Handler
7 */
8public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
9
10 private RequestPendingCenter requestPendingCenter;
11
12 /**
13 * 构造方法注入 RequestPendingCenter
14 * @param requestPendingCenter
15 */
16 public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
17 this.requestPendingCenter = requestPendingCenter;
18 }
19
20 /**
21 * 解析 ResponseMessage 根据 streamId 找到 Map 指定 key 对应的 Future 对象
22 * 调用
23 * @param ctx
24 * @param responseMessage
25 * @throws Exception
26 */
27 @Override
28 protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
29 requestPendingCenter.set(responseMessage.getMessageHeader().getStreamId(), responseMessage.getMessageBody());
30 }
31}
ClientV2
1import io.netty.bootstrap.Bootstrap;
2import io.netty.channel.ChannelFuture;
3import io.netty.channel.ChannelInitializer;
4import io.netty.channel.ChannelPipeline;
5import io.netty.channel.nio.NioEventLoopGroup;
6import io.netty.channel.socket.nio.NioSocketChannel;
7import io.netty.example.study.client.codec.*;
8import io.netty.example.study.client.handler.dispatcher.OperationResultFuture;
9import io.netty.example.study.client.handler.dispatcher.RequestPendingCenter;
10import io.netty.example.study.client.handler.dispatcher.ResponseDispatcherHandler;
11import io.netty.example.study.common.OperationResult;
12import io.netty.example.study.common.RequestMessage;
13import io.netty.example.study.common.order.OrderOperation;
14import io.netty.example.study.util.IdUtil;
15import io.netty.handler.logging.LogLevel;
16import io.netty.handler.logging.LoggingHandler;
17
18import java.util.concurrent.ExecutionException;
19
20public class ClientV2 {
21
22 public static void main(String[] args) throws InterruptedException, ExecutionException {
23
24 Bootstrap bootstrap = new Bootstrap();
25 bootstrap.channel(NioSocketChannel.class);
26
27 NioEventLoopGroup group = new NioEventLoopGroup();
28 try{
29 bootstrap.group(group);
30 //请求等待中心
31 RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
32
33 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
34 @Override
35 protected void initChannel(NioSocketChannel ch) throws Exception {
36 ChannelPipeline pipeline = ch.pipeline();
37 pipeline.addLast(new OrderFrameDecoder());
38 pipeline.addLast(new OrderFrameEncoder());
39
40 pipeline.addLast(new OrderProtocolEncoder());
41 pipeline.addLast(new OrderProtocolDecoder());
42
43 // ResponseDispatcherHandler 需要注入 RequestPendingCenter
44 // 调用 RequestPendingCenter.set(){ operationResultFuture.setSuccess(operationResult); }
45 pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
46
47 pipeline.addLast(new OperationToRequestMessageEncoder());
48
49 pipeline.addLast(new LoggingHandler(LogLevel.INFO));
50 }
51 });
52
53 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
54
55 channelFuture.sync();
56
57 //生成唯一 streamID
58 long streamId = IdUtil.nextId();
59
60 //请求数据 RequestMessage
61 RequestMessage requestMessage = new RequestMessage(
62 streamId, new OrderOperation(1001, "tudou"));
63
64 //操作结果占位符 OperationResultFuture
65 OperationResultFuture operationResultFuture = new OperationResultFuture();
66
67 //发送数据之前将数据,将请求根据 streamId 存入 Map中
68 requestPendingCenter.add(streamId, operationResultFuture);
69
70 //发送数据
71 channelFuture.channel().writeAndFlush(requestMessage);
72
73 //Future 阻塞直到
74 OperationResult operationResult = operationResultFuture.get();
75
76 //打印结果
77 System.out.println(operationResult);
78
79 channelFuture.channel().closeFuture().sync();
80 } finally {
81 group.shutdownGracefully();
82 }
83 }
84}
控制台输出
1# 客户端
2四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
3信息: [id: 0xd3e0b866] REGISTERED
4四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler connect
5信息: [id: 0xd3e0b866] CONNECT: /127.0.0.1:8090
6四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
7信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] ACTIVE
8四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler write
9信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
10四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler flush
11信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] FLUSH
12OrderOperationResult(tableId=1001, dish=tudou, complete=true)
13四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
14信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] READ COMPLETE
15
16
17# 服务器端
18四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelRegistered
19信息: [id: 0xf9e86570] REGISTERED
20四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler bind
21信息: [id: 0xf9e86570] BIND: 0.0.0.0/0.0.0.0:8090
22四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelActive
23信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] ACTIVE
24四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRead
25信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564]
26四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
27信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ COMPLETE
28四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
29信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] REGISTERED
30四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
31信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] ACTIVE
32order's executing startup with orderRequest: OrderOperation(tableId=1001, dish=tudou)
33order's executing complete
34四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
35信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] READ COMPLETE
Netty 日志的原理与使用
Netty 日志框架原理
增加所有 log 依赖(option),但是没有 Jar 包,动态判断具体使用哪个 Log。
修改 JDK logger 级别
E:\Tools\JDK8\jre\lib\logging.properties
1.level= FINE //JDK没有DEBUG级别 只有FINE
使用 slf4j + log4j 示例
加上 pom.xml 依赖就可以使用具体的 log 日志框架。
还需要添加 log4j 配置。
pom.xml
1 <dependency>
2 <groupId>org.slf4j</groupId>
3 <artifactId>slf4j-log4j12</artifactId>
4 <version>1.7.22</version>
5 </dependency>
log4j.properties
1log4j.rootLogger=info,console
2log4j.appender.console=org.apache.log4j.ConsoleAppender
3log4j.appender.console.layout=org.apache.log4j.PatternLayout
4log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} [%t] %C{1}: %m%n
衡量好 logging handler 的位置和级别
1 ChannelPipeline pipeline = ch.pipeline();
2 pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
3 ......
让应用易追踪
完善 “线程名”
1 //创建 NioEventLoopGroup
2 NioEventLoopGroup boss = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
3 NioEventLoopGroup work = new NioEventLoopGroup(0,new DefaultThreadFactory("work"));
4 try{
5 //绑定 NioEventLoopGroup
6 serverBootstrap.group(boss,work);
7 ....
8 }
完善 “Handler” 名称
1pipeline.addLast("frameDecoder",new OrderFrameDecoder()); //first decode
使用好 Netty 的日志
跟踪诊断:可视化
示例:统计并展示当前系统连接数
1* Console 日志定时输出
2* JMX 实时展示
3* ELKK、TIG、etc (复杂,需要各种软件配合)
引入依赖 (度量)
1 <dependency>
2 <groupId>io.dropwizard.metrics</groupId>
3 <artifactId>metrics-core</artifactId>
4 <version>4.1.1</version>
5 </dependency>
6 <dependency>
7 <groupId>io.dropwizard.metrics</groupId>
8 <artifactId>metrics-jmx</artifactId>
9 <version>4.1.1</version>
10 </dependency>
MetricsHandler
1import com.codahale.metrics.ConsoleReporter;
2import com.codahale.metrics.Gauge;
3import com.codahale.metrics.MetricRegistry;
4import com.codahale.metrics.jmx.JmxReporter;
5import io.netty.channel.ChannelDuplexHandler;
6import io.netty.channel.ChannelHandler;
7import io.netty.channel.ChannelHandlerContext;
8
9import java.util.concurrent.TimeUnit;
10import java.util.concurrent.atomic.AtomicLong;
11
12
13@ChannelHandler.Sharable //多个线程共享
14/**
15 * ChannelDuplexHandler 同时支持 Input和 Output 的Handler
16 */
17public class MetricsHandler extends ChannelDuplexHandler {
18
19 //记录当前连接数
20 private AtomicLong totalConnectionNumber = new AtomicLong();
21
22 //代码块
23 {
24 MetricRegistry metricRegistry = new MetricRegistry();
25 //name可以采用同名,定义 metrics 类型 Long
26 metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
27 @Override
28 public Long getValue() {
29 return totalConnectionNumber.longValue();
30 }
31 });
32
33 //console 方式展示数据
34 ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
35 consoleReporter.start(10, TimeUnit.SECONDS);
36
37 //jmx 格式展示数据
38 JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
39 jmxReporter.start();
40 }
41
42 /**
43 * 创建连接 ++totalConnectionNumber
44 * @param ctx
45 * @throws Exception
46 */
47 @Override
48 public void channelActive(ChannelHandlerContext ctx) throws Exception {
49 totalConnectionNumber.incrementAndGet();
50 super.channelActive(ctx);
51 }
52
53 /**
54 * 释放连接 --totalConnectionNumber
55 * @param ctx
56 * @throws Exception
57 */
58 @Override
59 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
60 totalConnectionNumber.decrementAndGet();
61 super.channelInactive(ctx);
62 }
63}
添加 Handler(度量)
1pipeline.addLast("metricHandler",new MetricsHandler());
Netty 值得可视化的数据 - “外在”
Netty 值得可视化的数据 –“内在”
跟踪诊断:让应用内存不“泄露”
Netty 内存泄漏是指什么?
原因:“忘记”release
1ByteBuf buffer = ctx.alloc().buffer();
2buffer.release() / ReferenceCountUtil.release(buffer) //忘记 release
后果:资源未释放 -> OOM
1* 堆外:未 free(PlatformDependent.freeDirectBuffer(buffer));
2* 池化:未归还 (recyclerHandle.recycle(this))
Netty 内存泄漏检测核心思路
Netty 内存泄漏检测核心思路:引用计数(buffer.refCnt())+ 弱引用(Weak reference)
• 引用计数
1 +1, 过 -1, = 0 时:尘归尘,土归土,资源也该释放了
2 那什么时候判断?“盖棺定论”时 -> 对象被 GC 后
• 强引用与弱引用
1#强引用
2 String 我是战斗英雄型强保镖 = new String(我是主人);
3
4# 弱引用
5 WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人));
6 只有一个爱写作的保镖(弱引用)守护(引用)时:刺客(GC)来袭,主人(referent)必挂(GC掉)。
7 不过主人挂掉的(被 GC 掉)时候,我还是可以发挥写作特长:把我自己记到“小本本(ReferenceQueue)”上去。
8
9# WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人),我的小本本ReferenceQueue);
示例:用 Netty 内存泄漏检测工具做检测