目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Netty in Order Service

编写网络应用程序基本步骤

编写网络应用程序基本步骤

上线之前复查

案例介绍

案例

  • AuthOperation: 授权操作
  • OrderOperation:点单操作
  • KeepaliveOperation:保持连接操作

数据结构设计

数据结构设计

Message Header

1* version	协议版本
2* opCode	不同的opCode,Message Body 对应的 operation/operation result 也是不同的,在JSON 解析时需要根据 opCode 解析到不同的 Message Body 类型。
3* streamId	相当于Message ID,标识信息的唯一 ID

Message Body(JSON)

1* operation	操作
2* operation result	操作结果

length

1* length	长度信息值得是整个Message(Header + Body)

 套接字(socket)是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的 IP 地址,本地进程的协议端口,远地主机的 IP 地址,远地进程的协议端口。

Netty 对三种常用封侦方式的支持
数据分层
Netty 对三种常用封侦方式的支持

1* 选择协议	TCP	粘包、半包(封帧)

项目依赖

pom.xml

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0"
 3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5    <modelVersion>4.0.0</modelVersion>
 6
 7    <groupId>com.learning.netty</groupId>
 8    <artifactId>netty-research</artifactId>
 9    <version>1.0-SNAPSHOT</version>
10    <build>
11        <plugins>
12            <plugin>
13                <groupId>org.apache.maven.plugins</groupId>
14                <artifactId>maven-compiler-plugin</artifactId>
15                <configuration>
16                    <source>8</source>
17                    <target>8</target>
18                </configuration>
19            </plugin>
20        </plugins>
21    </build>
22
23    <dependencies>
24        <dependency>
25            <groupId>io.netty</groupId>
26            <artifactId>netty-all</artifactId>
27            <version>4.1.39.Final</version>
28        </dependency>
29        <dependency>
30            <groupId>org.projectlombok</groupId>
31            <artifactId>lombok</artifactId>
32            <version>1.16.18</version>
33        </dependency>
34        <dependency>
35            <groupId>com.google.code.gson</groupId>
36            <artifactId>gson</artifactId>
37            <version>2.8.5</version>
38        </dependency>
39        <dependency>
40            <groupId>com.google.guava</groupId>
41            <artifactId>guava</artifactId>
42            <version>19.0</version>
43        </dependency>
44    </dependencies>
45</project>

Message

Message

 1import io.netty.buffer.ByteBuf;
 2import io.netty.example.study.util.JsonUtil;
 3import lombok.Data;
 4
 5import java.nio.charset.Charset;
 6
 7@Data
 8public abstract class Message<T extends MessageBody> {
 9    //MessageHeader
10    private MessageHeader messageHeader;
11  
12    //MessageBody:Operation | OperationResult
13    private T messageBody;
14
15    /**
16     * 获取 messageBody
17     * @return
18     */
19    public T getMessageBody(){
20        return messageBody;
21    }
22
23    /**
24     * 抽象方法,获取 MessageBodyDecodeClass
25     * @param opcode
26     * @return
27     */
28    public abstract Class<T> getMessageBodyDecodeClass(int opcode);
29
30    /**
31     * 编码
32     * @param byteBuf
33     */
34    public void encode(ByteBuf byteBuf) {
35        //Message Header
36        byteBuf.writeInt(messageHeader.getVersion());
37        byteBuf.writeLong(messageHeader.getStreamId());
38        byteBuf.writeInt(messageHeader.getOpCode());
39        //Message Body
40        byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes());
41    }
42
43    /**
44     * 解码
45     * @param msg
46     */
47    public void decode(ByteBuf msg) {
48        //Message Header
49        int version = msg.readInt();
50        long streamId = msg.readLong();
51        int opCode = msg.readInt();
52
53        MessageHeader messageHeader = new MessageHeader();
54        messageHeader.setVersion(version);
55        messageHeader.setOpCode(opCode);
56        messageHeader.setStreamId(streamId);
57
58        this.messageHeader = messageHeader;
59
60        //Message Body
61        Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
62        T body = JsonUtil.fromJson(msg.toString(Charset.forName("UTF-8")), bodyClazz);
63        this.messageBody = body;
64    }
65}

MessageHeader

1@Data
2public class MessageHeader {
3    private int version = 1;
4    private int opCode;
5    private long streamId;
6}

MessageBody

1public abstract class MessageBody {
2}

RequestMessage

 1public class RequestMessage extends Message<Operation>{
 2    @Override
 3    public Class getMessageBodyDecodeClass(int opcode) {
 4        //获取指定的 Operation
 5        return OperationType.fromOpCode(opcode).getOperationClazz();
 6    }
 7
 8    public RequestMessage(){}
 9
10    /**
11     * 构造方法:构造 RequestMessage
12     * @param streamId
13     * @param operation
14     */
15    public RequestMessage(Long streamId, Operation operation){
16        MessageHeader messageHeader = new MessageHeader();
17        messageHeader.setStreamId(streamId);
18        messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());
19
20        //为 Message 设置 MessageHeader
21        this.setMessageHeader(messageHeader);
22        //为 Message 设置 MessageBody
23        this.setMessageBody(operation);
24    }
25}

ResponseMessage

1public class ResponseMessage extends Message <OperationResult>{
2    @Override
3    public Class getMessageBodyDecodeClass(int opcode) {
4        //获取指定的 OperationResult
5        return OperationType.fromOpCode(opcode).getOperationResultClazz();
6    }
7}

Operation / OperationResult

OperationType (枚举类)

 1import io.netty.example.study.common.order.OrderOperation;
 2import io.netty.example.study.common.order.OrderOperationResult;
 3import io.netty.example.study.common.auth.AuthOperation;
 4import io.netty.example.study.common.auth.AuthOperationResult;
 5import io.netty.example.study.common.keepalive.KeepaliveOperation;
 6import io.netty.example.study.common.keepalive.KeepaliveOperationResult;
 7
 8import java.util.function.Predicate;
 9
10public enum OperationType {
11    //定义枚举
12    AUTH(1, AuthOperation.class, AuthOperationResult.class),
13    KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
14    ORDER(3, OrderOperation.class, OrderOperationResult.class);
15
16    //枚举类无法显式为枚举中的常量显式赋值
17    private int opCode;
18    private Class<? extends Operation> operationClazz;
19    private Class<? extends OperationResult> operationResultClazz;
20
21    /**
22     * 枚举类的构造方法 :通过构造方法为枚举中的常量显式赋值
23     * @param opCode
24     * @param operationClazz
25     * @param responseClass
26     */
27    OperationType(int opCode, Class<? extends Operation> operationClazz, Class<? extends OperationResult> responseClass) {
28        this.opCode = opCode;
29        this.operationClazz = operationClazz;
30        this.operationResultClazz = responseClass;
31    }
32
33    //获取 opCode
34    public int getOpCode(){
35        return opCode;
36    }
37
38    //获取 operationClazz
39    public Class<? extends Operation> getOperationClazz() {
40        return operationClazz;
41    }
42
43    //获取 operationResultClazz
44    public Class<? extends OperationResult> getOperationResultClazz() {
45        return operationResultClazz;
46    }
47
48    /**
49     * 根据 opCode 获取对应的枚举
50     * @param type
51     * @return
52     */
53    public static OperationType fromOpCode(int type){
54        return getOperationType(requestType -> requestType.opCode == type);
55    }
56
57    /**
58     * 根据 operation 获取对应的枚举
59     * @param operation
60     * @return
61     */
62    public static OperationType fromOperation(Operation operation){
63        //当前遍历的 OperationType 实例的 operationClazz 属性等于  入参的操作类型 (OrderOperation | KeepaliveOperation | AuthOperation)
64        return getOperationType(requestType -> requestType.operationClazz == operation.getClass());
65    }
66
67    /**
68     * 函数式编程
69     * @param predicate  接收⼀个参数,⽤于判断是否满⾜⼀定的条件,过滤数据.
70     * @return
71     */
72    private static OperationType getOperationType(Predicate<OperationType> predicate){
73        OperationType[] values = values();
74        for (OperationType operationType : values) {
75            if(predicate.test(operationType)){
76                return operationType;
77            }
78        }
79        //遍历找不到就退出
80        throw new AssertionError("no found type");
81    }
82
83}

Operation

1public abstract class Operation extends MessageBody{
2
3    public abstract OperationResult execute();
4
5}

OperationResult

1import lombok.Data;
2
3@Data
4public abstract class OperationResult extends MessageBody{
5
6}

AuthOperation

 1import io.netty.example.study.common.Operation;
 2import lombok.Data;
 3import lombok.extern.java.Log;
 4
 5@Data
 6@Log
 7public class AuthOperation extends Operation {
 8
 9    private final String userName;
10    private final String password;
11
12    @Override
13    public AuthOperationResult execute() {
14        if("admin".equalsIgnoreCase(this.userName)){
15            AuthOperationResult orderResponse = new AuthOperationResult(true);
16            return orderResponse;
17        }
18
19        return new AuthOperationResult(false);
20    }
21}

AuthOperationResult

1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class AuthOperationResult extends OperationResult {
6
7    private final boolean passAuth;
8
9}

KeepaliveOperation

 1import io.netty.example.study.common.Operation;
 2import lombok.Data;
 3import lombok.extern.java.Log;
 4
 5@Data
 6@Log
 7public class KeepaliveOperation extends Operation {
 8
 9    private long time ;
10
11    public KeepaliveOperation() {
12        this.time = System.nanoTime();
13    }
14
15    @Override
16    public KeepaliveOperationResult execute() {
17        KeepaliveOperationResult orderResponse = new KeepaliveOperationResult(time);
18        return orderResponse;
19    }
20}

KeepaliveOperationResult

1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class KeepaliveOperationResult extends OperationResult {
6    private final long time;
7}

OrderOperation

 1import io.netty.example.study.common.Operation;
 2import lombok.Data;
 3
 4@Data
 5public class OrderOperation extends Operation {
 6
 7    private int tableId;
 8    private String dish;
 9
10    public OrderOperation(int tableId, String dish) {
11        this.tableId = tableId;
12        this.dish = dish;
13    }
14
15    @Override
16    public OrderOperationResult execute() {
17        System.out.println("order's executing startup with orderRequest: " + toString());
18        //execute order logic
19        System.out.println("order's executing complete");
20        OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
21        return orderResponse;
22    }
23}

OrderOperationResult

1import io.netty.example.study.common.OperationResult;
2import lombok.Data;
3
4@Data
5public class OrderOperationResult extends OperationResult {
6    private final int tableId;
7    private final String dish;
8    private final boolean complete;
9}

IdUtil
IdUtil

 1import java.util.concurrent.atomic.AtomicLong;
 2
 3public final class IdUtil {
 4
 5    private static final AtomicLong IDX = new AtomicLong();
 6
 7    private IdUtil(){
 8        //no instance
 9    }
10
11    /**
12     * 生成 streamId 自增
13     * @return
14     */
15    public static long nextId(){
16        return IDX.incrementAndGet();
17    }
18
19}

JsonUtil

 1public final class JsonUtil {
 2
 3    private static final Gson GSON = new Gson();
 4
 5    private JsonUtil() {
 6        //no instance
 7    }
 8
 9    public static <T> T fromJson(String jsonStr, Class<T> clazz){
10        return GSON.fromJson(jsonStr, clazz);
11    }
12
13    public static String toJson(Object object){
14        return GSON.toJson(object);
15    }
16
17}

服务端

OrderFrameDecoder (一次解码:得出一个没有粘包、半包问题的 ByteBuf,ByteBuf => ByteBuf)

1import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2
3public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
4    public OrderFrameDecoder() {
5        super(Integer.MAX_VALUE, 0, 2, 0, 2);
6    }
7}

OrderProtocolDecoder (二次解码:ByteBuf => RequestMessage)

1public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
2    @Override
3    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
4        RequestMessage requestMessage = new RequestMessage();
5        requestMessage.decode(byteBuf);
6	//将解码后的数据传递出去
7        out.add(requestMessage);
8    }
9}

OrderServerProcessHandler

 1import io.netty.channel.ChannelHandlerContext;
 2import io.netty.channel.SimpleChannelInboundHandler;
 3import io.netty.example.study.common.Operation;
 4import io.netty.example.study.common.OperationResult;
 5import io.netty.example.study.common.RequestMessage;
 6import io.netty.example.study.common.ResponseMessage;
 7
 8/**
 9 *  SimpleChannelInboundHandler VS ChannelInboundHandlerAdapter
10 *  前者会自动释放自动释放 解码阶段使用的 ByteBuf(内存池 Or 堆外内存)
11 */
12public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
13    @Override
14    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
15        //Operation = OrderOperation  (MessageBody => Operation)
16        Operation operation = requestMessage.getMessageBody();
17        //OperationResult = OrderOperationResult (执行 Operation 得到 OperationResult)
18        OperationResult operationResult = operation.execute();
19
20        //构建 ResponseMessage = MessageHead + MessageResult
21        ResponseMessage responseMessage = new ResponseMessage();
22        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
23        responseMessage.setMessageBody(operationResult);
24
25        //将业务处理后的结果(RequestMessage)返回客户端 (进入 OrderProtocolEncoder 、OrderFrameEncoder)
26        ctx.writeAndFlush(responseMessage);
27    }
28}

OrderProtocolEncoder (编码:RequestMessage => ByteBuf)

 1import java.util.List;
 2
 3import io.netty.buffer.ByteBuf;
 4import io.netty.channel.ChannelHandlerContext;
 5import io.netty.example.study.common.ResponseMessage;
 6import io.netty.handler.codec.MessageToMessageEncoder;
 7
 8/**
 9 * 泛型 代表 Input 类型ResponseMessage
10 */
11public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
12    @Override
13    protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
14        //先申请(分配)一个 ByteBuf
15        ByteBuf buffer = ctx.alloc().buffer();
16        //将 ResponseMessage encode 到 ByteBuf
17        responseMessage.encode(buffer);
18        //完成到 ResponseMessage => ByteBuf
19        out.add(buffer);
20    }
21}

OrderFrameEncoder (编码:得出一个没有粘包、半包问题的 ByteBuf (方便客户端处理 TCP 半包和粘包) ByteBuf => ByteBuf)

 1import io.netty.handler.codec.LengthFieldPrepender;
 2
 3/**
 4 *
 5 */
 6public class OrderFrameEncoder extends LengthFieldPrepender {
 7    public OrderFrameEncoder() {
 8        super(2);
 9    }
10}

Server

 1import io.netty.bootstrap.ServerBootstrap;
 2import io.netty.channel.ChannelFuture;
 3import io.netty.channel.ChannelInitializer;
 4import io.netty.channel.ChannelPipeline;
 5import io.netty.channel.nio.NioEventLoopGroup;
 6import io.netty.channel.socket.nio.NioServerSocketChannel;
 7import io.netty.channel.socket.nio.NioSocketChannel;
 8import io.netty.example.study.server.codec.OrderFrameDecoder;
 9import io.netty.example.study.server.codec.OrderFrameEncoder;
10import io.netty.example.study.server.codec.OrderProtocolDecoder;
11import io.netty.example.study.server.codec.OrderProtocolEncoder;
12import io.netty.example.study.server.handler.OrderServerProcessHandler;
13import io.netty.handler.logging.LogLevel;
14import io.netty.handler.logging.LoggingHandler;
15
16import java.util.concurrent.ExecutionException;
17
18public class Server {
19
20    public static void main(String[] args) throws InterruptedException, ExecutionException {
21
22        //启动类
23        ServerBootstrap serverBootstrap = new ServerBootstrap();
24        //设置 NioServerSocketChannel 模式
25        serverBootstrap.channel(NioServerSocketChannel.class);
26        //打印日志
27        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
28        //创建 NioEventLoopGroup
29        NioEventLoopGroup group = new NioEventLoopGroup();
30        try{
31            //绑定 NioEventLoopGroup
32            serverBootstrap.group(group);
33            //handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。
34            //如果需要在客户端连接前的请求进行handler处理,则需要配置handler()
35            //如果是处理客户端连接之后的handler,则需要配置在childHandler()。
36            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
37                @Override
38                protected void initChannel(NioSocketChannel ch) throws Exception {
39                    ChannelPipeline pipeline = ch.pipeline();
40
41                    //解码正序,编码逆序
42                    pipeline.addLast(new OrderFrameDecoder()); //first decode
43                    pipeline.addLast(new OrderFrameEncoder()); //first encode
44                    pipeline.addLast(new OrderProtocolEncoder()); //seconds encode
45                    pipeline.addLast(new OrderProtocolDecoder()); //seconds decode
46                    pipeline.addLast(new OrderServerProcessHandler()); //Handler
47
48                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
49                }
50            });
51
52            //启动并同步等待完成
53            ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
54
55            channelFuture.channel().closeFuture().sync();
56        } finally {
57            group.shutdownGracefully();
58        }
59    }
60}

客户端

OrderProtocolEncoder (RequestMessage => ByteBuf)

 1import io.netty.buffer.ByteBuf;
 2import io.netty.channel.ChannelHandlerContext;
 3import io.netty.example.study.common.RequestMessage;
 4import io.netty.handler.codec.MessageToMessageEncoder;
 5
 6import java.util.List;
 7
 8public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
 9    @Override
10    protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage, List<Object> out) throws Exception {
11        //分配一个 ByteBuf
12        ByteBuf buffer = ctx.alloc().buffer();
13        //将 RequestMessage encode 到 ByteBuf 中
14        requestMessage.encode(buffer);
15        out.add(buffer);
16    }
17}

OrderFrameEncoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))

1import io.netty.handler.codec.LengthFieldPrepender;
2
3public class OrderFrameEncoder extends LengthFieldPrepender {
4    public OrderFrameEncoder() {
5        super(2);
6    }
7}

OrderFrameDecoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))

1public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
2    public OrderFrameDecoder() {
3        super(Integer.MAX_VALUE, 0, 2, 0, 2);
4    }
5}

OrderProtocolDecoder (ByteBuf => ResponseMessage)

 1import java.util.List;
 2
 3import io.netty.buffer.ByteBuf;
 4import io.netty.channel.ChannelHandlerContext;
 5import io.netty.example.study.common.ResponseMessage;
 6import io.netty.handler.codec.MessageToMessageDecoder;
 7
 8/**
 9 * 泛型 代表 input 类型 ByteBuf
10 */
11public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
12    @Override
13    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
14        ResponseMessage responseMessage = new ResponseMessage();
15        //将 ByteBuf => ResponseMessage
16        responseMessage.decode(byteBuf);
17        out.add(responseMessage);
18    }
19}
20

OperationToRequestMessageEncoder (负责)

 1import io.netty.channel.ChannelHandlerContext;
 2import io.netty.example.study.common.Operation;
 3import io.netty.example.study.common.RequestMessage;
 4import io.netty.example.study.util.IdUtil;
 5import io.netty.handler.codec.MessageToMessageEncoder;
 6
 7import java.util.List;
 8
 9/**
10 * 泛型 代表 input 类型 Operation
11 * 负责 OrderOperation  =>  RequestMessage
12 */
13public class OperationToRequestMessageEncoder extends MessageToMessageEncoder <Operation> {
14    @Override
15    protected void encode(ChannelHandlerContext ctx, Operation operation, List<Object> out) throws Exception {
16          RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), operation);
17
18          out.add(requestMessage);
19     }
20}

ClientV1 (无法能到响应结果,只能从日志中打印看到)

 1import io.netty.bootstrap.Bootstrap;
 2import io.netty.channel.ChannelFuture;
 3import io.netty.channel.ChannelInitializer;
 4import io.netty.channel.ChannelPipeline;
 5import io.netty.channel.nio.NioEventLoopGroup;
 6import io.netty.channel.socket.nio.NioSocketChannel;
 7import io.netty.example.study.client.codec.*;
 8import io.netty.example.study.common.order.OrderOperation;
 9import io.netty.handler.logging.LogLevel;
10import io.netty.handler.logging.LoggingHandler;
11
12import java.util.concurrent.ExecutionException;
13
14public class ClientV1 {
15
16    public static void main(String[] args) throws InterruptedException, ExecutionException {
17
18        Bootstrap bootstrap = new Bootstrap();
19        bootstrap.channel(NioSocketChannel.class);
20
21        NioEventLoopGroup group = new NioEventLoopGroup();
22        try{
23            bootstrap.group(group);
24
25            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
26                @Override
27                protected void initChannel(NioSocketChannel ch) throws Exception {
28                    ChannelPipeline pipeline = ch.pipeline();
29                    pipeline.addLast(new OrderFrameDecoder());
30                    pipeline.addLast(new OrderFrameEncoder());
31
32                    pipeline.addLast(new OrderProtocolEncoder());
33                    pipeline.addLast(new OrderProtocolDecoder());
34
35                    pipeline.addLast(new OperationToRequestMessageEncoder());
36
37                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
38                }
39            });
40
41            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
42
43            channelFuture.sync();
44
45            OrderOperation orderOperation = new OrderOperation(1001, "tudou");
46
47            channelFuture.channel().writeAndFlush(orderOperation);
48
49            channelFuture.channel().closeFuture().sync();
50
51        } finally {
52            group.shutdownGracefully();
53        }
54    }
55
56}

完善客户端(响应分发)

响应分发
 通过 streamId 来实现第二种响应分发的方式
streamId

1* 发起请求,立刻返回 Future
2* 将 Future 存入 Map 中,Key 为 streamId
3* Response 到达客户端先去 Map 中根据 streamID 查找对应的 Future 对象,将 Response 设置到 Future 中
4* 调用该 Future 对象的 get() 方法 :future.get()

OperationResultFuture

1import io.netty.example.study.common.OperationResult;
2import io.netty.util.concurrent.DefaultPromise;
3
4/**
5 * JDK Future不好用,使用 Netty 的 DefaultPromise
6 * V 代表 OperationResult
7 */
8public class OperationResultFuture extends DefaultPromise<OperationResult> {
9}

RequestPendingCenter

 1import io.netty.example.study.common.OperationResult;
 2
 3import java.util.Map;
 4import java.util.concurrent.ConcurrentHashMap;
 5
 6/**
 7 * 请求等待中心
 8 */
 9public class RequestPendingCenter {
10
11    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();
12
13    /**
14     * 将 Future 存入 Map 中,Key 为 streamId;
15     * @param streamId
16     * @param future
17     */
18    public void add(Long streamId, OperationResultFuture future){
19        this.map.put(streamId, future);
20    }
21
22    /**
23     * 根据 streamID 查找对应的 Future 对象,将 Response(OperationResultFuture) 设置到 Future 中
24     * @param streamId
25     * @param operationResult
26     */
27    public void set(Long streamId, OperationResult operationResult){
28        OperationResultFuture operationResultFuture = this.map.get(streamId);
29        if (operationResultFuture != null) { //判断 OperationResultFuture 存在才设置
30            operationResultFuture.setSuccess(operationResult);
31            this.map.remove(streamId); //从 map 中移除此 key-value
32        }
33     }
34}

ResponseDispatcherHandler

 1import io.netty.channel.ChannelHandlerContext;
 2import io.netty.channel.SimpleChannelInboundHandler;
 3import io.netty.example.study.common.ResponseMessage;
 4
 5/**
 6 * 响应分发Handler
 7 */
 8public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
 9
10    private RequestPendingCenter requestPendingCenter;
11
12    /**
13     * 构造方法注入  RequestPendingCenter
14     * @param requestPendingCenter
15     */
16    public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
17        this.requestPendingCenter = requestPendingCenter;
18    }
19
20    /**
21     * 解析 ResponseMessage 根据 streamId 找到 Map 指定 key 对应的 Future 对象
22     * 调用
23     * @param ctx
24     * @param responseMessage
25     * @throws Exception
26     */
27    @Override
28    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
29        requestPendingCenter.set(responseMessage.getMessageHeader().getStreamId(), responseMessage.getMessageBody());
30    }
31}

ClientV2

 1import io.netty.bootstrap.Bootstrap;
 2import io.netty.channel.ChannelFuture;
 3import io.netty.channel.ChannelInitializer;
 4import io.netty.channel.ChannelPipeline;
 5import io.netty.channel.nio.NioEventLoopGroup;
 6import io.netty.channel.socket.nio.NioSocketChannel;
 7import io.netty.example.study.client.codec.*;
 8import io.netty.example.study.client.handler.dispatcher.OperationResultFuture;
 9import io.netty.example.study.client.handler.dispatcher.RequestPendingCenter;
10import io.netty.example.study.client.handler.dispatcher.ResponseDispatcherHandler;
11import io.netty.example.study.common.OperationResult;
12import io.netty.example.study.common.RequestMessage;
13import io.netty.example.study.common.order.OrderOperation;
14import io.netty.example.study.util.IdUtil;
15import io.netty.handler.logging.LogLevel;
16import io.netty.handler.logging.LoggingHandler;
17
18import java.util.concurrent.ExecutionException;
19
20public class ClientV2 {
21
22    public static void main(String[] args) throws InterruptedException, ExecutionException {
23
24        Bootstrap bootstrap = new Bootstrap();
25        bootstrap.channel(NioSocketChannel.class);
26
27        NioEventLoopGroup group = new NioEventLoopGroup();
28        try{
29            bootstrap.group(group);
30            //请求等待中心
31            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
32
33            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
34                @Override
35                protected void initChannel(NioSocketChannel ch) throws Exception {
36                    ChannelPipeline pipeline = ch.pipeline();
37                    pipeline.addLast(new OrderFrameDecoder());
38                    pipeline.addLast(new OrderFrameEncoder());
39
40                    pipeline.addLast(new OrderProtocolEncoder());
41                    pipeline.addLast(new OrderProtocolDecoder());
42
43                    // ResponseDispatcherHandler 需要注入  RequestPendingCenter
44                    // 调用 RequestPendingCenter.set(){ operationResultFuture.setSuccess(operationResult);  }
45                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
46
47                    pipeline.addLast(new OperationToRequestMessageEncoder());
48
49                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
50                }
51            });
52
53            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
54
55            channelFuture.sync();
56
57            //生成唯一 streamID
58            long streamId = IdUtil.nextId();
59
60            //请求数据 RequestMessage
61            RequestMessage requestMessage = new RequestMessage(
62                    streamId, new OrderOperation(1001, "tudou"));
63
64            //操作结果占位符 OperationResultFuture
65            OperationResultFuture operationResultFuture = new OperationResultFuture();
66
67            //发送数据之前将数据,将请求根据 streamId 存入 Map中
68            requestPendingCenter.add(streamId, operationResultFuture);
69
70            //发送数据
71            channelFuture.channel().writeAndFlush(requestMessage);
72
73            //Future 阻塞直到
74            OperationResult operationResult = operationResultFuture.get();
75
76            //打印结果
77            System.out.println(operationResult);
78
79            channelFuture.channel().closeFuture().sync();
80        } finally {
81            group.shutdownGracefully();
82        }
83    }
84}

控制台输出

 1# 客户端
 2四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
 3信息: [id: 0xd3e0b866] REGISTERED
 4四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler connect
 5信息: [id: 0xd3e0b866] CONNECT: /127.0.0.1:8090
 6四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
 7信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] ACTIVE
 8四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler write
 9信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
10四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler flush
11信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] FLUSH
12OrderOperationResult(tableId=1001, dish=tudou, complete=true)
13四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
14信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] READ COMPLETE
15
16
17# 服务器端
18四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelRegistered
19信息: [id: 0xf9e86570] REGISTERED
20四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler bind
21信息: [id: 0xf9e86570] BIND: 0.0.0.0/0.0.0.0:8090
22四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelActive
23信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] ACTIVE
24四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRead
25信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564]
26四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
27信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ COMPLETE
28四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
29信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] REGISTERED
30四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
31信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] ACTIVE
32order's executing startup with orderRequest: OrderOperation(tableId=1001, dish=tudou)
33order's executing complete
34四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
35信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] READ COMPLETE

Netty 日志的原理以使用

Netty 日志框架原理
 增加所有 log 依赖(option),但是没有 Jar 包,动态判断具体使用哪个 Log。

修改 JDK logger 级别
E:\Tools\JDK8\jre\lib\logging.properties

1.level= FINE		//JDK没有DEBUG级别 只有FINE

使用 slf4j + log4j 示例
 加上 pom.xml 依赖就可以使用具体的 log 日志框架。
&emps;还需要添加 log4j 配置。

pom.xml

1        <dependency>
2            <groupId>org.slf4j</groupId>
3            <artifactId>slf4j-log4j12</artifactId>
4            <version>1.7.22</version>
5        </dependency>

log4j.properties

1log4j.rootLogger=info,console
2log4j.appender.console=org.apache.log4j.ConsoleAppender
3log4j.appender.console.layout=org.apache.log4j.PatternLayout
4log4j.appender.console.layout.ConversionPattern=%d{HH🇲🇲ss} [%t] %C{1}: %m%n

衡量好 logging handler 的位置和级别

1	ChannelPipeline pipeline = ch.pipeline();
2	pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
3	......

让应用易追踪

完善 “线程名”

1        //创建 NioEventLoopGroup
2        NioEventLoopGroup boss = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
3        NioEventLoopGroup work = new NioEventLoopGroup(0,new DefaultThreadFactory("work"));
4        try{
5            //绑定 NioEventLoopGroup
6            serverBootstrap.group(boss,work);
7	....
8	}

完善 “Handler” 名称

1pipeline.addLast("frameDecoder",new OrderFrameDecoder()); //first decode

使用好 Netty 的日志

跟踪诊断:可视化

示例:统计并展示当前那系统连接数

1* Console 日志定时输出
2* JMX 实时展示
3* ELKK、TIG、etc (复杂,需要各种软件配合)

引入依赖 (度量)

 1        <dependency>
 2            <groupId>io.dropwizard.metrics</groupId>
 3            <artifactId>metrics-core</artifactId>
 4            <version>4.1.1</version>
 5        </dependency>
 6        <dependency>
 7            <groupId>io.dropwizard.metrics</groupId>
 8            <artifactId>metrics-jmx</artifactId>
 9            <version>4.1.1</version>
10        </dependency>

MetricsHandler

 1import com.codahale.metrics.ConsoleReporter;
 2import com.codahale.metrics.Gauge;
 3import com.codahale.metrics.MetricRegistry;
 4import com.codahale.metrics.jmx.JmxReporter;
 5import io.netty.channel.ChannelDuplexHandler;
 6import io.netty.channel.ChannelHandler;
 7import io.netty.channel.ChannelHandlerContext;
 8
 9import java.util.concurrent.TimeUnit;
10import java.util.concurrent.atomic.AtomicLong;
11
12
13@ChannelHandler.Sharable //多个线程共享
14/**
15 * ChannelDuplexHandler 同时支持 Input和 Output 的Handler
16 */
17public class MetricsHandler extends ChannelDuplexHandler {
18
19    //记录当前连接数
20    private AtomicLong totalConnectionNumber = new AtomicLong();
21
22    //代码块
23    {
24        MetricRegistry metricRegistry = new MetricRegistry();
25        //name可以采用同名,定义 metrics 类型 Long
26        metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
27            @Override
28            public Long getValue() {
29                return totalConnectionNumber.longValue();
30            }
31        });
32
33        //console 方式展示数据
34        ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
35        consoleReporter.start(10, TimeUnit.SECONDS);
36
37        //jmx 格式展示数据
38        JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
39        jmxReporter.start();
40    }
41
42    /**
43     * 创建连接  ++totalConnectionNumber
44     * @param ctx
45     * @throws Exception
46     */
47    @Override
48    public void channelActive(ChannelHandlerContext ctx) throws Exception {
49        totalConnectionNumber.incrementAndGet();
50        super.channelActive(ctx);
51    }
52
53    /**
54     * 释放连接 --totalConnectionNumber
55     * @param ctx
56     * @throws Exception
57     */
58    @Override
59    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
60        totalConnectionNumber.decrementAndGet();
61        super.channelInactive(ctx);
62    }
63}

添加 Handler(度量)

1pipeline.addLast("metricHandler",new MetricsHandler());

Netty 值得可视化的数据 - “外在”
外在

Netty 值得可视化的数据 –“内在”
内在

跟踪诊断:让应用内存不“泄露”

Netty 内存泄漏是指什么?
原因:“忘记”release

1ByteBuf buffer = ctx.alloc().buffer();
2buffer.release() / ReferenceCountUtil.release(buffer)  //忘记 release

后果:资源未释放 -> OOM

1* 堆外:未 free(PlatformDependent.freeDirectBuffer(buffer));
2* 池化:未归还 (recyclerHandle.recycle(this))

Netty 内存泄漏检测核心思路
 Netty 内存泄漏检测核心思路:引用计数(buffer.refCnt())+ 弱引用(Weak reference)

• 引用计数

1	+1, 过 -1, = 0 时:尘归尘,土归土,资源也该释放了
2	那什么时候判断?“盖棺定论”时 -> 对象被 GC 后

• 强引用与弱引用

1#强引用
2	String 我是战斗英雄型强保镖 = new String(我是主人));
3
4# 弱引用
5	WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人));
6	只有一个爱写作的保镖(弱引用)守护(引用)时:刺客(GC)来袭,主人(referent)必挂(GC掉)。
7	不过主人挂掉的(被 GC 掉)时候,我还是可以发挥写作特长:把我自己记到“小本本(ReferenceQueue)”上去。
8
9# WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人),我的小本本ReferenceQueue);

Netty 内存泄漏检测核心思路

示例:用 Netty 内存泄漏检测工具做检测


作者:Soulboy