Netty in Order Service
编写网络应用程序基本步骤
案例介绍
- AuthOperation: 授权操作
- OrderOperation:点单操作
- KeepaliveOperation:保持连接操作
数据结构设计
Message Header
* version 协议版本
* opCode 不同的opCode,Message Body 对应的 operation/operation result 也是不同的,在JSON 解析时需要根据 opCode 解析到不同的 Message Body 类型。
* streamId 相当于Message ID,标识信息的唯一 ID
Message Body(JSON)
* operation 操作
* operation result 操作结果
length
* length 长度信息值得是整个Message(Header + Body)
套接字(socket)是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的 IP 地址,本地进程的协议端口,远地主机的 IP 地址,远地进程的协议端口。
Netty 对三种常用封侦方式的支持
* 选择协议 TCP 粘包、半包(封帧)
项目依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learning.netty</groupId>
<artifactId>netty-research</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>
</project>
Message
Message
import io.netty.buffer.ByteBuf;
import io.netty.example.study.util.JsonUtil;
import lombok.Data;
import java.nio.charset.Charset;
@Data
public abstract class Message<T extends MessageBody> {
//MessageHeader
private MessageHeader messageHeader;
//MessageBody:Operation | OperationResult
private T messageBody;
/**
* 获取 messageBody
* @return
*/
public T getMessageBody(){
return messageBody;
}
/**
* 抽象方法,获取 MessageBodyDecodeClass
* @param opcode
* @return
*/
public abstract Class<T> getMessageBodyDecodeClass(int opcode);
/**
* 编码
* @param byteBuf
*/
public void encode(ByteBuf byteBuf) {
//Message Header
byteBuf.writeInt(messageHeader.getVersion());
byteBuf.writeLong(messageHeader.getStreamId());
byteBuf.writeInt(messageHeader.getOpCode());
//Message Body
byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes());
}
/**
* 解码
* @param msg
*/
public void decode(ByteBuf msg) {
//Message Header
int version = msg.readInt();
long streamId = msg.readLong();
int opCode = msg.readInt();
MessageHeader messageHeader = new MessageHeader();
messageHeader.setVersion(version);
messageHeader.setOpCode(opCode);
messageHeader.setStreamId(streamId);
this.messageHeader = messageHeader;
//Message Body
Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
T body = JsonUtil.fromJson(msg.toString(Charset.forName("UTF-8")), bodyClazz);
this.messageBody = body;
}
}
MessageHeader
@Data
public class MessageHeader {
private int version = 1;
private int opCode;
private long streamId;
}
MessageBody
public abstract class MessageBody {
}
RequestMessage
public class RequestMessage extends Message<Operation>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
//获取指定的 Operation
return OperationType.fromOpCode(opcode).getOperationClazz();
}
public RequestMessage(){}
/**
* 构造方法:构造 RequestMessage
* @param streamId
* @param operation
*/
public RequestMessage(Long streamId, Operation operation){
MessageHeader messageHeader = new MessageHeader();
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());
//为 Message 设置 MessageHeader
this.setMessageHeader(messageHeader);
//为 Message 设置 MessageBody
this.setMessageBody(operation);
}
}
ResponseMessage
public class ResponseMessage extends Message <OperationResult>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
//获取指定的 OperationResult
return OperationType.fromOpCode(opcode).getOperationResultClazz();
}
}
Operation / OperationResult
OperationType (枚举类)
import io.netty.example.study.common.order.OrderOperation;
import io.netty.example.study.common.order.OrderOperationResult;
import io.netty.example.study.common.auth.AuthOperation;
import io.netty.example.study.common.auth.AuthOperationResult;
import io.netty.example.study.common.keepalive.KeepaliveOperation;
import io.netty.example.study.common.keepalive.KeepaliveOperationResult;
import java.util.function.Predicate;
public enum OperationType {
//定义枚举
AUTH(1, AuthOperation.class, AuthOperationResult.class),
KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
ORDER(3, OrderOperation.class, OrderOperationResult.class);
//枚举类无法显式为枚举中的常量显式赋值
private int opCode;
private Class<? extends Operation> operationClazz;
private Class<? extends OperationResult> operationResultClazz;
/**
* 枚举类的构造方法 :通过构造方法为枚举中的常量显式赋值
* @param opCode
* @param operationClazz
* @param responseClass
*/
OperationType(int opCode, Class<? extends Operation> operationClazz, Class<? extends OperationResult> responseClass) {
this.opCode = opCode;
this.operationClazz = operationClazz;
this.operationResultClazz = responseClass;
}
//获取 opCode
public int getOpCode(){
return opCode;
}
//获取 operationClazz
public Class<? extends Operation> getOperationClazz() {
return operationClazz;
}
//获取 operationResultClazz
public Class<? extends OperationResult> getOperationResultClazz() {
return operationResultClazz;
}
/**
* 根据 opCode 获取对应的枚举
* @param type
* @return
*/
public static OperationType fromOpCode(int type){
return getOperationType(requestType -> requestType.opCode == type);
}
/**
* 根据 operation 获取对应的枚举
* @param operation
* @return
*/
public static OperationType fromOperation(Operation operation){
//当前遍历的 OperationType 实例的 operationClazz 属性等于 入参的操作类型 (OrderOperation | KeepaliveOperation | AuthOperation)
return getOperationType(requestType -> requestType.operationClazz == operation.getClass());
}
/**
* 函数式编程
* @param predicate 接收⼀个参数,⽤于判断是否满⾜⼀定的条件,过滤数据.
* @return
*/
private static OperationType getOperationType(Predicate<OperationType> predicate){
OperationType[] values = values();
for (OperationType operationType : values) {
if(predicate.test(operationType)){
return operationType;
}
}
//遍历找不到就退出
throw new AssertionError("no found type");
}
}
Operation
public abstract class Operation extends MessageBody{
public abstract OperationResult execute();
}
OperationResult
import lombok.Data;
@Data
public abstract class OperationResult extends MessageBody{
}
AuthOperation
import io.netty.example.study.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;
@Data
@Log
public class AuthOperation extends Operation {
private final String userName;
private final String password;
@Override
public AuthOperationResult execute() {
if("admin".equalsIgnoreCase(this.userName)){
AuthOperationResult orderResponse = new AuthOperationResult(true);
return orderResponse;
}
return new AuthOperationResult(false);
}
}
AuthOperationResult
import io.netty.example.study.common.OperationResult;
import lombok.Data;
@Data
public class AuthOperationResult extends OperationResult {
private final boolean passAuth;
}
KeepaliveOperation
import io.netty.example.study.common.Operation;
import lombok.Data;
import lombok.extern.java.Log;
@Data
@Log
public class KeepaliveOperation extends Operation {
private long time ;
public KeepaliveOperation() {
this.time = System.nanoTime();
}
@Override
public KeepaliveOperationResult execute() {
KeepaliveOperationResult orderResponse = new KeepaliveOperationResult(time);
return orderResponse;
}
}
KeepaliveOperationResult
import io.netty.example.study.common.OperationResult;
import lombok.Data;
@Data
public class KeepaliveOperationResult extends OperationResult {
private final long time;
}
OrderOperation
import io.netty.example.study.common.Operation;
import lombok.Data;
@Data
public class OrderOperation extends Operation {
private int tableId;
private String dish;
public OrderOperation(int tableId, String dish) {
this.tableId = tableId;
this.dish = dish;
}
@Override
public OrderOperationResult execute() {
System.out.println("order's executing startup with orderRequest: " + toString());
//execute order logic
System.out.println("order's executing complete");
OrderOperationResult orderResponse = new OrderOperationResult(tableId, dish, true);
return orderResponse;
}
}
OrderOperationResult
import io.netty.example.study.common.OperationResult;
import lombok.Data;
@Data
public class OrderOperationResult extends OperationResult {
private final int tableId;
private final String dish;
private final boolean complete;
}
IdUtil
IdUtil
import java.util.concurrent.atomic.AtomicLong;
public final class IdUtil {
private static final AtomicLong IDX = new AtomicLong();
private IdUtil(){
//no instance
}
/**
* 生成 streamId 自增
* @return
*/
public static long nextId(){
return IDX.incrementAndGet();
}
}
JsonUtil
public final class JsonUtil {
private static final Gson GSON = new Gson();
private JsonUtil() {
//no instance
}
public static <T> T fromJson(String jsonStr, Class<T> clazz){
return GSON.fromJson(jsonStr, clazz);
}
public static String toJson(Object object){
return GSON.toJson(object);
}
}
服务端
OrderFrameDecoder (一次解码:得出一个没有粘包、半包问题的 ByteBuf,ByteBuf => ByteBuf)
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}
OrderProtocolDecoder (二次解码:ByteBuf => RequestMessage)
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(byteBuf);
//将解码后的数据传递出去
out.add(requestMessage);
}
}
OrderServerProcessHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.example.study.common.Operation;
import io.netty.example.study.common.OperationResult;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.common.ResponseMessage;
/**
* SimpleChannelInboundHandler VS ChannelInboundHandlerAdapter
* 前者会自动释放自动释放 解码阶段使用的 ByteBuf(内存池 Or 堆外内存)
*/
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
//Operation = OrderOperation (MessageBody => Operation)
Operation operation = requestMessage.getMessageBody();
//OperationResult = OrderOperationResult (执行 Operation 得到 OperationResult)
OperationResult operationResult = operation.execute();
//构建 ResponseMessage = MessageHead + MessageResult
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setMessageHeader(requestMessage.getMessageHeader());
responseMessage.setMessageBody(operationResult);
//将业务处理后的结果(RequestMessage)返回客户端 (进入 OrderProtocolEncoder 、OrderFrameEncoder)
ctx.writeAndFlush(responseMessage);
}
}
OrderProtocolEncoder (编码:RequestMessage => ByteBuf)
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.ResponseMessage;
import io.netty.handler.codec.MessageToMessageEncoder;
/**
* 泛型 代表 Input 类型ResponseMessage
*/
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
//先申请(分配)一个 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
//将 ResponseMessage encode 到 ByteBuf
responseMessage.encode(buffer);
//完成到 ResponseMessage => ByteBuf
out.add(buffer);
}
}
OrderFrameEncoder (编码:得出一个没有粘包、半包问题的 ByteBuf (方便客户端处理 TCP 半包和粘包) ByteBuf => ByteBuf)
import io.netty.handler.codec.LengthFieldPrepender;
/**
*
*/
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.server.codec.OrderFrameDecoder;
import io.netty.example.study.server.codec.OrderFrameEncoder;
import io.netty.example.study.server.codec.OrderProtocolDecoder;
import io.netty.example.study.server.codec.OrderProtocolEncoder;
import io.netty.example.study.server.handler.OrderServerProcessHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.concurrent.ExecutionException;
public class Server {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//设置 NioServerSocketChannel 模式
serverBootstrap.channel(NioServerSocketChannel.class);
//打印日志
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//创建 NioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
try{
//绑定 NioEventLoopGroup
serverBootstrap.group(group);
//handler()是发生在初始化的时候,childHandler()是发生在客户端连接之后。
//如果需要在客户端连接前的请求进行handler处理,则需要配置handler()
//如果是处理客户端连接之后的handler,则需要配置在childHandler()。
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//解码正序,编码逆序
pipeline.addLast(new OrderFrameDecoder()); //first decode
pipeline.addLast(new OrderFrameEncoder()); //first encode
pipeline.addLast(new OrderProtocolEncoder()); //seconds encode
pipeline.addLast(new OrderProtocolDecoder()); //seconds decode
pipeline.addLast(new OrderServerProcessHandler()); //Handler
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
//启动并同步等待完成
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端
OrderProtocolEncoder (RequestMessage => ByteBuf)
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.RequestMessage;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage, List<Object> out) throws Exception {
//分配一个 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
//将 RequestMessage encode 到 ByteBuf 中
requestMessage.encode(buffer);
out.add(buffer);
}
}
OrderFrameEncoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))
import io.netty.handler.codec.LengthFieldPrepender;
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}
OrderFrameDecoder (ByteBuf => ByteBuf (解决 TCP 粘包、半包))
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}
OrderProtocolDecoder (ByteBuf => ResponseMessage)
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.ResponseMessage;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* 泛型 代表 input 类型 ByteBuf
*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
ResponseMessage responseMessage = new ResponseMessage();
//将 ByteBuf => ResponseMessage
responseMessage.decode(byteBuf);
out.add(responseMessage);
}
}
OperationToRequestMessageEncoder (负责)
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.study.common.Operation;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.util.IdUtil;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* 泛型 代表 input 类型 Operation
* 负责 OrderOperation => RequestMessage
*/
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder <Operation> {
@Override
protected void encode(ChannelHandlerContext ctx, Operation operation, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), operation);
out.add(requestMessage);
}
}
ClientV1 (无法能到响应结果,只能从日志中打印看到)
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.client.codec.*;
import io.netty.example.study.common.order.OrderOperation;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.concurrent.ExecutionException;
public class ClientV1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
NioEventLoopGroup group = new NioEventLoopGroup();
try{
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OperationToRequestMessageEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
channelFuture.sync();
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
channelFuture.channel().writeAndFlush(orderOperation);
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
完善客户端(响应分发)
通过 streamId 来实现第二种响应分发的方式
* 发起请求,立刻返回 Future
* 将 Future 存入 Map 中,Key 为 streamId
* Response 到达客户端先去 Map 中根据 streamID 查找对应的 Future 对象,将 Response 设置到 Future 中
* 调用该 Future 对象的 get() 方法 :future.get()
OperationResultFuture
import io.netty.example.study.common.OperationResult;
import io.netty.util.concurrent.DefaultPromise;
/**
* JDK Future不好用,使用 Netty 的 DefaultPromise
* V 代表 OperationResult
*/
public class OperationResultFuture extends DefaultPromise<OperationResult> {
}
RequestPendingCenter
import io.netty.example.study.common.OperationResult;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 请求等待中心
*/
public class RequestPendingCenter {
private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();
/**
* 将 Future 存入 Map 中,Key 为 streamId;
* @param streamId
* @param future
*/
public void add(Long streamId, OperationResultFuture future){
this.map.put(streamId, future);
}
/**
* 根据 streamID 查找对应的 Future 对象,将 Response(OperationResultFuture) 设置到 Future 中
* @param streamId
* @param operationResult
*/
public void set(Long streamId, OperationResult operationResult){
OperationResultFuture operationResultFuture = this.map.get(streamId);
if (operationResultFuture != null) { //判断 OperationResultFuture 存在才设置
operationResultFuture.setSuccess(operationResult);
this.map.remove(streamId); //从 map 中移除此 key-value
}
}
}
ResponseDispatcherHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.example.study.common.ResponseMessage;
/**
* 响应分发Handler
*/
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private RequestPendingCenter requestPendingCenter;
/**
* 构造方法注入 RequestPendingCenter
* @param requestPendingCenter
*/
public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
this.requestPendingCenter = requestPendingCenter;
}
/**
* 解析 ResponseMessage 根据 streamId 找到 Map 指定 key 对应的 Future 对象
* 调用
* @param ctx
* @param responseMessage
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
requestPendingCenter.set(responseMessage.getMessageHeader().getStreamId(), responseMessage.getMessageBody());
}
}
ClientV2
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.study.client.codec.*;
import io.netty.example.study.client.handler.dispatcher.OperationResultFuture;
import io.netty.example.study.client.handler.dispatcher.RequestPendingCenter;
import io.netty.example.study.client.handler.dispatcher.ResponseDispatcherHandler;
import io.netty.example.study.common.OperationResult;
import io.netty.example.study.common.RequestMessage;
import io.netty.example.study.common.order.OrderOperation;
import io.netty.example.study.util.IdUtil;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.concurrent.ExecutionException;
public class ClientV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
NioEventLoopGroup group = new NioEventLoopGroup();
try{
bootstrap.group(group);
//请求等待中心
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
// ResponseDispatcherHandler 需要注入 RequestPendingCenter
// 调用 RequestPendingCenter.set(){ operationResultFuture.setSuccess(operationResult); }
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast(new OperationToRequestMessageEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090);
channelFuture.sync();
//生成唯一 streamID
long streamId = IdUtil.nextId();
//请求数据 RequestMessage
RequestMessage requestMessage = new RequestMessage(
streamId, new OrderOperation(1001, "tudou"));
//操作结果占位符 OperationResultFuture
OperationResultFuture operationResultFuture = new OperationResultFuture();
//发送数据之前将数据,将请求根据 streamId 存入 Map中
requestPendingCenter.add(streamId, operationResultFuture);
//发送数据
channelFuture.channel().writeAndFlush(requestMessage);
//Future 阻塞直到
OperationResult operationResult = operationResultFuture.get();
//打印结果
System.out.println(operationResult);
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
控制台输出
# 客户端
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xd3e0b866] REGISTERED
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0xd3e0b866] CONNECT: /127.0.0.1:8090
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] ACTIVE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] FLUSH
OrderOperationResult(tableId=1001, dish=tudou, complete=true)
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd3e0b866, L:/127.0.0.1:5564 - R:/127.0.0.1:8090] READ COMPLETE
# 服务器端
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf9e86570] REGISTERED
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf9e86570] BIND: 0.0.0.0/0.0.0.0:8090
四月 12, 2020 5:20:40 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] ACTIVE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564]
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xf9e86570, L:/0:0:0:0:0:0:0:0:8090] READ COMPLETE
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] REGISTERED
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] ACTIVE
order's executing startup with orderRequest: OrderOperation(tableId=1001, dish=tudou)
order's executing complete
四月 12, 2020 5:20:58 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x69f4fe2d, L:/127.0.0.1:8090 - R:/127.0.0.1:5564] READ COMPLETE
Netty 日志的原理以使用
Netty 日志框架原理
增加所有log依赖(option),但是没有Jar包,动态判断具体使用哪个Log。
修改 JDK logger 级别
E:\Tools\JDK8\jre\lib\logging.properties
.level= FINE //JDK没有DEBUG级别 只有FINE
使用 slf4j + log4j 示例
加上 pom.xml 依赖就可以使用具体的log日志框架。
&emps;还需要添加 log4j 配置。
pom.xml
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.22</version>
</dependency>
log4j.properties
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} [%t] %C{1}: %m%n
衡量好 logging handler 的位置和级别
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
......
让应用易追踪
完善 “线程名”
//创建 NioEventLoopGroup
NioEventLoopGroup boss = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
NioEventLoopGroup work = new NioEventLoopGroup(0,new DefaultThreadFactory("work"));
try{
//绑定 NioEventLoopGroup
serverBootstrap.group(boss,work);
....
}
完善 “Handler” 名称
pipeline.addLast("frameDecoder",new OrderFrameDecoder()); //first decode
使用好 Netty 的日志
跟踪诊断:可视化
示例:统计并展示当前那系统连接数
* Console 日志定时输出
* JMX 实时展示
* ELKK、TIG、etc (复杂,需要各种软件配合)
引入依赖 (度量)
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.1</version>
</dependency>
MetricsHandler
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ChannelHandler.Sharable //多个线程共享
/**
* ChannelDuplexHandler 同时支持 Input和 Output 的Handler
*/
public class MetricsHandler extends ChannelDuplexHandler {
//记录当前连接数
private AtomicLong totalConnectionNumber = new AtomicLong();
//代码块
{
MetricRegistry metricRegistry = new MetricRegistry();
//name可以采用同名,定义 metrics 类型 Long
metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
@Override
public Long getValue() {
return totalConnectionNumber.longValue();
}
});
//console 方式展示数据
ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
consoleReporter.start(10, TimeUnit.SECONDS);
//jmx 格式展示数据
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
jmxReporter.start();
}
/**
* 创建连接 ++totalConnectionNumber
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.incrementAndGet();
super.channelActive(ctx);
}
/**
* 释放连接 --totalConnectionNumber
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.decrementAndGet();
super.channelInactive(ctx);
}
}
添加 Handler(度量)
pipeline.addLast("metricHandler",new MetricsHandler());
Netty 值得可视化的数据 - “外在”
Netty 值得可视化的数据 –“内在”
跟踪诊断:让应用内存不“泄露”
Netty 内存泄漏是指什么?
原因:“忘记”release
ByteBuf buffer = ctx.alloc().buffer();
buffer.release() / ReferenceCountUtil.release(buffer) //忘记 release
后果:资源未释放 -> OOM
* 堆外:未 free(PlatformDependent.freeDirectBuffer(buffer));
* 池化:未归还 (recyclerHandle.recycle(this))
Netty 内存泄漏检测核心思路
Netty 内存泄漏检测核心思路:引用计数(buffer.refCnt())+ 弱引用(Weak reference)
• 引用计数
+1, 过 -1, = 0 时:尘归尘,土归土,资源也该释放了
那什么时候判断?“盖棺定论”时 -> 对象被 GC 后
• 强引用与弱引用
#强引用
String 我是战斗英雄型强保镖 = new String(我是主人));
# 弱引用
WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人));
只有一个爱写作的保镖(弱引用)守护(引用)时:刺客(GC)来袭,主人(referent)必挂(GC掉)。
不过主人挂掉的(被 GC 掉)时候,我还是可以发挥写作特长:把我自己记到“小本本(ReferenceQueue)”上去。
# WeakReference<String> 我是爱写作的弱保镖 = new WeakReference<String>(new String(我是主人),我的小本本ReferenceQueue);
示例:用 Netty 内存泄漏检测工具做检测