目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

Netty in Order Service

编写网络应用程序基本步骤

编写网络应用程序基本步骤

上线之前复查

案例介绍

案例

  • AuthOperation: 授权操作
  • OrderOperation:点单操作
  • KeepaliveOperation:保持连接操作

数据结构设计

数据结构设计

Message Header

* version	协议版本
* opCode	不同的opCode,Message Body 对应的 operation/operation result 也是不同的,在JSON 解析时需要根据 opCode 解析到不同的 Message Body 类型。
* streamId	相当于Message ID,标识信息的唯一 ID

Message Body(JSON)

* operation	操作
* operation result	操作结果

length

* length	长度信息值得是整个Message(Header + Body)

 套接字(socket)是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的 IP 地址,本地进程的协议端口,远地主机的 IP 地址,远地进程的协议端口。

Netty 对三种常用封侦方式的支持
数据分层
Netty 对三种常用封侦方式的支持

* 选择协议	TCP	粘包、半包(封帧)

项目依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learning.netty</groupId>
    <artifactId>netty-research</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
    </dependencies>
</project>

Message

Message

import io.netty.buffer.ByteBuf;
import io.netty.example.study.util.JsonUtil;
import lombok.Data;

import java.nio.charset.Charset;

@Data
public abstract class Message<T extends MessageBody> {
    //MessageHeader
    private MessageHeader messageHeader;
  
    //MessageBody:Operation | OperationResult
    private T messageBody;

    /**
     * 获取 messageBody
     * @return
     */
    public T getMessageBody(){
        return messageBody;
    }

    /**
     * 抽象方法,获取 MessageBodyDecodeClass
     * @param opcode
     * @return
     */
    public abstract Class<T> getMessageBodyDecodeClass(int opcode);

    /**
     * 编码
     * @param byteBuf
     */
    public void encode(ByteBuf byteBuf) {
        //Message Header
        byteBuf.writeInt(messageHeader.getVersion());
        byteBuf.writeLong(messageHeader.getStreamId());
        byteBuf.writeInt(messageHeader.getOpCode());
        //Message Body
        byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes());
    }

    /**
     * 解码
     * @param msg
     */
    public void decode(ByteBuf msg) {
        //Message Header
        int version = msg.readInt();
        long streamId = msg.readLong();
        int opCode = msg.readInt();

        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setVersion(version);
        messageHeader.setOpCode(opCode);
        messageHeader.setStreamId(streamId);

        this.messageHeader = messageHeader;

        //Message Body
        Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
        T body = JsonUtil.fromJson(msg.toString(Charset.forName("UTF-8")), bodyClazz);
        this.messageBody = body;
    }
}

MessageHeader

@Data
public class MessageHeader {
    private int version = 1;
    private int opCode;
    private long streamId;
}

MessageBody

public abstract class MessageBody {
}

RequestMessage

public class RequestMessage extends Message<Operation>{
    @Override
    public Class getMessageBodyDecodeClass(int opcode) {
        //获取指定的 Operation
        return OperationType.fromOpCode(opcode).getOperationClazz();
    }

    public RequestMessage(){}

    /**
     * 构造方法:构造 RequestMessage
     * @param streamId
     * @param operation
     */
    public RequestMessage(Long streamId, Operation operation){
        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setStreamId(streamId);
        messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());

        //为 Message 设置 MessageHeader
        this.setMessageHeader(messageHeader);
        //为 Message 设置 MessageBody
        this.setMessageBody(operation);
    }
}

ResponseMessage

public class ResponseMessage extends Message <OperationResult>{
    @Override
    public Class getMessageBodyDecodeClass(int opcode) {
        //获取指定的 OperationResult
        return OperationType.fromOpCode(opcode).getOperationResultClazz();
    }
}

Operation / OperationResult

OperationType (枚举类)

import io.netty.example.study.common.order.OrderOperation;
import io.netty.example.study.common.order.OrderOperationResult;
import io.netty.example.study.common.auth.AuthOperation;
import io.netty.example.study.common.auth.AuthOperationResult;
import io.netty.example.study.common.keepalive.KeepaliveOperation;
import io.netty.example.study.common.keepalive.KeepaliveOperationResult;

import java.util.function.Predicate;

public enum OperationType {
    //定义枚举
    AUTH(1, AuthOperation.class, AuthOperationResult.class),
    KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
    ORDER(3, OrderOperation.class, OrderOperationResult.class);

    //枚举类无法显式为枚举中的常量显式赋值
    private int opCode;
    private Class<? extends Operation> operationClazz;
    private Class<? extends OperationResult> operationResultClazz;

    /**
     * 枚举类的构造方法 :通过构造方法为枚举中的常量显式赋值
     * @param opCode
     * @param operationClazz
     * @param responseClass
     */
    OperationType(int opCode, Class<? extends Operation> operationClazz, Class<? extends OperationResult> responseClass) {
        this.opCode = opCode;
        this.operationClazz = operationClazz;
        this.operationResultClazz = responseClass;
    }

    //获取 opCode
    public int getOpCode(){
        return opCode;
    }

    //获取 operationClazz
    public Class<? extends Operation> getOperationClazz() {
        return operationClazz;
    }

    //获取 operationResultClazz
    public Class<? extends OperationResult> getOperationResultClazz() {
        return operationResultClazz;
    }

    /**
     * 根据 opCode 获取对应的枚举
     * @param type
     * @return
     */
    public static OperationType fromOpCode(int type){
        return getOperationType(requestType -> requestType.opCode == type);
    }

    /**
     * 根据 operation 获取对应的枚举
     * @param operation
     * @return
     */
    public static OperationType fromOperation(Operation operation){
        //当前遍历的 OperationType 实例的 operationClazz 属性等于  入参的操作类型 (OrderOperation | KeepaliveOperation | AuthOperation)
        return getOperationType(requestType -> requestType.operationClazz == operation.getClass());
    }

    /**
     * 函数式编程
     * @param predicate  接收⼀个参数,⽤于判断是否满⾜⼀定的条件,过滤数据.
     * @return
     */
    private static OperationType getOperationType(Predicate<OperationType> predicate){
        OperationType[] values = values();
        for (OperationType operationType : values) {
            if(predicate.test(operationType)){
                return operationType;
            }
        }
        //遍历找不到就退出
        throw new AssertionError("no found type");
    }

}

Operation

public abstract class Operation extends MessageBody{

    public abstract OperationResult execute();

}

OperationResult

import lombok.Data;

@Data
public abstract class OperationResult extends MessageBody{

}

AuthOperation

import io.netty.example.study.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;

@Data
@Log
public class AuthOperation extends Operation {

    private final String userName;
    private final String password;

    @Override
    public AuthOperationResult execute() {
        if("admin".equalsIgnoreCase(this.userName)){
            AuthOperationResult orderResponse = new AuthOperationResult(true);
            return orderResponse;
        }

        return new AuthOperationResult(false);
    }
}

AuthOperationResult

import io.netty.example.study.common.OperationResult;
import lombok.Data;

@Data
public class AuthOperationResult extends OperationResult {

    private final boolean passAuth;

}

KeepaliveOperation

import io.netty.example.study.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;

@Data
@Log
public class KeepaliveOperation extends Operation {

    private long time ;

    public KeepaliveOperation() {
        this.time = System.nanoTime();
    }

    @Override
    public KeepaliveOperationResult execute() {
        KeepaliveOperationResult orderResponse = new KeepaliveOperationResult(time);
        return orderResponse;
    }
}

KeepaliveOperationResult

import io.netty.example.study.common.OperationResult;
import lombok.Data;

@Data
public class KeepaliveOperationResult extends OperationResult {
    private final long time;
}

OrderOperation

import io.netty.example.study.common.Operation;
import lombok.Data;

@Data
public class OrderOperation extends Operation {

    private int tableId;
    private String dish;

    public OrderOperation(int tableId, String dish) {
        this.tableId = tableId;
        this.dish = dish;
    }

    @Override
    public OrderOperationResult execute() {
        System.out.println("order's executing startup with orderRequest: " + toString());
        //execute order logic
        System.out.println("order's executing complete");
        OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
        return orderResponse;
    }
}

OrderOperationResult

import io.netty.example.study.common.OperationResult;
import lombok.Data;

@Data
public class OrderOperationResult extends OperationResult {
    private final int tableId;
    private final String dish;
    private final boolean complete;
}

IdUtil
IdUtil

import java.util.concurrent.atomic.AtomicLong;

public final class IdUtil {

    private static final AtomicLong IDX = new AtomicLong();

    private IdUtil(){
        //no instance
    }

    /**
     * 生成 streamId 自增
     * @return
     */
    public static long nextId(){
        return IDX.incrementAndGet();
    }

}

JsonUtil

public final class JsonUtil {

    private static final Gson GSON = new Gson();

    private JsonUtil() {
        //no instance
    }

    public static <T> T fromJson(String jsonStr, Class<T> clazz){
        return GSON.fromJson(jsonStr, clazz);
    }

    public static String toJson(Object object){
        return GSON.toJson(object);
    }

}

服务端

OrderFrameDecoder (一次解码:得出一个没有粘包、半包问题的 ByteBuf,ByteBuf => ByteBuf)

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
    public OrderFrameDecoder() {
        super(Integer.MAX_VALUE, 0, 2, 0, 2);
    }
}

OrderProtocolDecoder (二次解码:ByteBuf => RequestMessage)

public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.decode(byteBuf);
	//将解码后的数据传递出去
        out.add(requestMessage);
    }
}

OrderServerProcessHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.example.study.common.Operation;
import io.netty.example.study.common.OperationResult;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.common.ResponseMessage;

/**
 *  SimpleChannelInboundHandler VS ChannelInboundHandlerAdapter
 *  前者会自动释放自动释放 解码阶段使用的 ByteBuf(内存池 Or 堆外内存)
 */
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        //Operation = OrderOperation  (MessageBody => Operation)
        Operation operation = requestMessage.getMessageBody();
        //OperationResult = OrderOperationResult (执行 Operation 得到 OperationResult)
        OperationResult operationResult = operation.execute();

        //构建 ResponseMessage = MessageHead + MessageResult
        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
        responseMessage.setMessageBody(operationResult);

        //将业务处理后的结果(RequestMessage)返回客户端 (进入 OrderProtocolEncoder 、OrderFrameEncoder)
        ctx.writeAndFlush(responseMessage);
    }
}

OrderProtocolEncoder (编码:RequestMessage => ByteBuf)

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.ResponseMessage;
import io.netty.handler.codec.MessageToMessageEncoder;

/**
 * 泛型 代表 Input 类型ResponseMessage
 */
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
        //先申请(分配)一个 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        //将 ResponseMessage encode 到 ByteBuf
        responseMessage.encode(buffer);
        //完成到 ResponseMessage => ByteBuf
        out.add(buffer);
    }
}

OrderFrameEncoder (编码:得出一个没有粘包、半包问题的 ByteBuf (方便客户端处理 TCP 半包和粘包) ByteBuf => ByteBuf)

import io.netty.handler.codec.LengthFieldPrepender;

/**
 *
 */
public class OrderFrameEncoder extends LengthFieldPrepender {
    public OrderFrameEncoder() {
        super(2);
    }
}

Server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.server.codec.OrderFrameDecoder;
import io.netty.example.study.server.codec.OrderFrameEncoder;
import io.netty.example.study.server.codec.OrderProtocolDecoder;
import io.netty.example.study.server.codec.OrderProtocolEncoder;
import io.netty.example.study.server.handler.OrderServerProcessHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.util.concurrent.ExecutionException;

public class Server {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        //启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //设置 NioServerSocketChannel 模式
        serverBootstrap.channel(NioServerSocketChannel.class);
        //打印日志
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
        //创建 NioEventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        try{
            //绑定 NioEventLoopGroup
            serverBootstrap.group(group);
            //handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。
            //如果需要在客户端连接前的请求进行handler处理,则需要配置handler()
            //如果是处理客户端连接之后的handler,则需要配置在childHandler()。
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    //解码正序,编码逆序
                    pipeline.addLast(new OrderFrameDecoder()); //first decode
                    pipeline.addLast(new OrderFrameEncoder()); //first encode
                    pipeline.addLast(new OrderProtocolEncoder()); //seconds encode
                    pipeline.addLast(new OrderProtocolDecoder()); //seconds decode
                    pipeline.addLast(new OrderServerProcessHandler()); //Handler

                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            //启动并同步等待完成
            ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();

            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端

OrderProtocolEncoder (RequestMessage => ByteBuf)

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.RequestMessage;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage, List<Object> out) throws Exception {
        //分配一个 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        //将 RequestMessage encode 到 ByteBuf 中
        requestMessage.encode(buffer);
        out.add(buffer);
    }
}

OrderFrameEncoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))

import io.netty.handler.codec.LengthFieldPrepender;

public class OrderFrameEncoder extends LengthFieldPrepender {
    public OrderFrameEncoder() {
        super(2);
    }
}

OrderFrameDecoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))

public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
    public OrderFrameDecoder() {
        super(Integer.MAX_VALUE, 0, 2, 0, 2);
    }
}

OrderProtocolDecoder (ByteBuf => ResponseMessage)

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.ResponseMessage;
import io.netty.handler.codec.MessageToMessageDecoder;

/**
 * 泛型 代表 input 类型 ByteBuf
 */
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        ResponseMessage responseMessage = new ResponseMessage();
        //将 ByteBuf => ResponseMessage
        responseMessage.decode(byteBuf);
        out.add(responseMessage);
    }
}

OperationToRequestMessageEncoder (负责)

import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.Operation;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.util.IdUtil;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

/**
 * 泛型 代表 input 类型 Operation
 * 负责 OrderOperation  =>  RequestMessage
 */
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder <Operation> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Operation operation, List<Object> out) throws Exception {
          RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), operation);

          out.add(requestMessage);
     }
}

ClientV1 (无法能到响应结果,只能从日志中打印看到)

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.client.codec.*;
import io.netty.example.study.common.order.OrderOperation;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.util.concurrent.ExecutionException;

public class ClientV1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);

        NioEventLoopGroup group = new NioEventLoopGroup();
        try{
            bootstrap.group(group);

            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new OrderFrameDecoder());
                    pipeline.addLast(new OrderFrameEncoder());

                    pipeline.addLast(new OrderProtocolEncoder());
                    pipeline.addLast(new OrderProtocolDecoder());

                    pipeline.addLast(new OperationToRequestMessageEncoder());

                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);

            channelFuture.sync();

            OrderOperation orderOperation = new OrderOperation(1001, "tudou");

            channelFuture.channel().writeAndFlush(orderOperation);

            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }
    }

}

完善客户端(响应分发)

响应分发
 通过 streamId 来实现第二种响应分发的方式
streamId

* 发起请求,立刻返回 Future
* 将 Future 存入 Map 中,Key 为 streamId
* Response 到达客户端先去 Map 中根据 streamID 查找对应的 Future 对象,将 Response 设置到 Future 中
* 调用该 Future 对象的 get() 方法 :future.get()

OperationResultFuture

import io.netty.example.study.common.OperationResult;
import io.netty.util.concurrent.DefaultPromise;

/**
 * JDK Future不好用,使用 Netty 的 DefaultPromise
 * V 代表 OperationResult
 */
public class OperationResultFuture extends DefaultPromise<OperationResult> {
}

RequestPendingCenter

import io.netty.example.study.common.OperationResult;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 请求等待中心
 */
public class RequestPendingCenter {

    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    /**
     * 将 Future 存入 Map 中,Key 为 streamId;
     * @param streamId
     * @param future
     */
    public void add(Long streamId, OperationResultFuture future){
        this.map.put(streamId, future);
    }

    /**
     * 根据 streamID 查找对应的 Future 对象,将 Response(OperationResultFuture) 设置到 Future 中
     * @param streamId
     * @param operationResult
     */
    public void set(Long streamId, OperationResult operationResult){
        OperationResultFuture operationResultFuture = this.map.get(streamId);
        if (operationResultFuture != null) { //判断 OperationResultFuture 存在才设置
            operationResultFuture.setSuccess(operationResult);
            this.map.remove(streamId); //从 map 中移除此 key-value
        }
     }
}

ResponseDispatcherHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.example.study.common.ResponseMessage;

/**
 * 响应分发Handler
 */
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {

    private RequestPendingCenter requestPendingCenter;

    /**
     * 构造方法注入  RequestPendingCenter
     * @param requestPendingCenter
     */
    public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
        this.requestPendingCenter = requestPendingCenter;
    }

    /**
     * 解析 ResponseMessage 根据 streamId 找到 Map 指定 key 对应的 Future 对象
     * 调用
     * @param ctx
     * @param responseMessage
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
        requestPendingCenter.set(responseMessage.getMessageHeader().getStreamId(), responseMessage.getMessageBody());
    }
}

ClientV2

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.client.codec.*;
import io.netty.example.study.client.handler.dispatcher.OperationResultFuture;
import io.netty.example.study.client.handler.dispatcher.RequestPendingCenter;
import io.netty.example.study.client.handler.dispatcher.ResponseDispatcherHandler;
import io.netty.example.study.common.OperationResult;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.common.order.OrderOperation;
import io.netty.example.study.util.IdUtil;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.util.concurrent.ExecutionException;

public class ClientV2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);

        NioEventLoopGroup group = new NioEventLoopGroup();
        try{
            bootstrap.group(group);
            //请求等待中心
            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();

            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new OrderFrameDecoder());
                    pipeline.addLast(new OrderFrameEncoder());

                    pipeline.addLast(new OrderProtocolEncoder());
                    pipeline.addLast(new OrderProtocolDecoder());

                    // ResponseDispatcherHandler 需要注入  RequestPendingCenter
                    // 调用 RequestPendingCenter.set(){ operationResultFuture.setSuccess(operationResult);  }
                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));

                    pipeline.addLast(new OperationToRequestMessageEncoder());

                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);

            channelFuture.sync();

            //生成唯一 streamID
            long streamId = IdUtil.nextId();

            //请求数据 RequestMessage
            RequestMessage requestMessage = new RequestMessage(
                    streamId, new OrderOperation(1001, "tudou"));

            //操作结果占位符 OperationResultFuture
            OperationResultFuture operationResultFuture = new OperationResultFuture();

            //发送数据之前将数据,将请求根据 streamId 存入 Map中
            requestPendingCenter.add(streamId, operationResultFuture);

            //发送数据
            channelFuture.channel().writeAndFlush(requestMessage);

            //Future 阻塞直到
            OperationResult operationResult = operationResultFuture.get();

            //打印结果
            System.out.println(operationResult);

            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

控制台输出

# 客户端
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xd3e0b866] REGISTERED
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0xd3e0b866] CONNECT: /127.0.0.1:8090
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] ACTIVE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] FLUSH
OrderOperationResult(tableId=1001, dish=tudou, complete=true)
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] READ COMPLETE


# 服务器端
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf9e86570] REGISTERED
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf9e86570] BIND: 0.0.0.0/0.0.0.0:8090
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] ACTIVE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564]
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ COMPLETE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] REGISTERED
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] ACTIVE
order's executing startup with orderRequest: OrderOperation(tableId=1001, dish=tudou)
order's executing complete
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] READ COMPLETE

Netty 日志的原理以使用

Netty 日志框架原理
 增加所有log依赖(option),但是没有Jar包,动态判断具体使用哪个Log。

修改 JDK logger 级别
E:\Tools\JDK8\jre\lib\logging.properties

.level= FINE		//JDK没有DEBUG级别 只有FINE

使用 slf4j + log4j 示例
 加上 pom.xml 依赖就可以使用具体的log日志框架。
&emps;还需要添加 log4j 配置。

pom.xml

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>

log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} [%t] %C{1}: %m%n

衡量好 logging handler 的位置和级别

	ChannelPipeline pipeline = ch.pipeline();
	pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
	......

让应用易追踪

完善 “线程名”

        //创建 NioEventLoopGroup
        NioEventLoopGroup boss = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
        NioEventLoopGroup work = new NioEventLoopGroup(0,new DefaultThreadFactory("work"));
        try{
            //绑定 NioEventLoopGroup
            serverBootstrap.group(boss,work);
	....
	}

完善 “Handler” 名称

pipeline.addLast("frameDecoder",new OrderFrameDecoder()); //first decode

使用好 Netty 的日志

跟踪诊断:可视化

示例:统计并展示当前那系统连接数

* Console 日志定时输出
* JMX 实时展示
* ELKK、TIG、etc (复杂,需要各种软件配合)

引入依赖 (度量)

        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>4.1.1</version>
        </dependency>
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-jmx</artifactId>
            <version>4.1.1</version>
        </dependency>

MetricsHandler

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


@ChannelHandler.Sharable //多个线程共享
/**
 * ChannelDuplexHandler 同时支持 Input和 Output 的Handler
 */
public class MetricsHandler extends ChannelDuplexHandler {

    //记录当前连接数
    private AtomicLong totalConnectionNumber = new AtomicLong();

    //代码块
    {
        MetricRegistry metricRegistry = new MetricRegistry();
        //name可以采用同名,定义 metrics 类型 Long
        metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return totalConnectionNumber.longValue();
            }
        });

        //console 方式展示数据
        ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
        consoleReporter.start(10, TimeUnit.SECONDS);

        //jmx 格式展示数据
        JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
        jmxReporter.start();
    }

    /**
     * 创建连接  ++totalConnectionNumber
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.incrementAndGet();
        super.channelActive(ctx);
    }

    /**
     * 释放连接 --totalConnectionNumber
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.decrementAndGet();
        super.channelInactive(ctx);
    }
}

添加 Handler(度量)

pipeline.addLast("metricHandler",new MetricsHandler());

Netty 值得可视化的数据 - “外在”
外在

Netty 值得可视化的数据 –“内在”
内在

跟踪诊断:让应用内存不“泄露”

Netty 内存泄漏是指什么?
原因:“忘记”release

ByteBuf buffer = ctx.alloc().buffer();
buffer.release() / ReferenceCountUtil.release(buffer)  //忘记 release

后果:资源未释放 -> OOM

* 堆外:未 free(PlatformDependent.freeDirectBuffer(buffer));
* 池化:未归还 (recyclerHandle.recycle(this))

Netty 内存泄漏检测核心思路
 Netty 内存泄漏检测核心思路:引用计数(buffer.refCnt())+ 弱引用(Weak reference)

• 引用计数

	+1, 过 -1, = 0 时:尘归尘,土归土,资源也该释放了
	那什么时候判断?“盖棺定论”时 -> 对象被 GC 后

• 强引用与弱引用

#强引用
	String 我是战斗英雄型强保镖 = new String(我是主人));

# 弱引用
	WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人));
	只有一个爱写作的保镖(弱引用)守护(引用)时:刺客(GC)来袭,主人(referent)必挂(GC掉)。
	不过主人挂掉的(被 GC 掉)时候,我还是可以发挥写作特长:把我自己记到“小本本(ReferenceQueue)”上去。

# WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人),我的小本本ReferenceQueue);

Netty 内存泄漏检测核心思路

示例:用 Netty 内存泄漏检测工具做检测


作者:Soulboy