Netty - 核心编解码器
netty 最核心的部分就是掌握 ByteBuf 的用法原理,业务逻辑主要是写ChannelHandler,以及各种编解码器。
1、ChannelInboundHandlerAdapter & ChannelOutboundHandlerAdapter - 核心实现类
这俩实现类都继承了 io.netty.channel.ChannelHandlerAdapter
类,基本实现都需要用户去实现,所以他基本上对于用户来说是透明的,我们开发可以控制我们全部的输入输出,对象进行释放等等。
API 方法
// 判断是否和其他管道共享数据 ,每一个客户端连接会申请一个管道, 就和别人共享
isSharable
// 每一个客户端只会注册一次handler
handlerAdded
// 每一个客户端只会一次注册
channelRegistered
// 活跃
channelActive
// 接收客户端信息
channelRead -> write/flush操作
// out输出流处理
out : write
out : flush
// 已经写出去了 ....
channelReadComplete
// 断开 ....
channelInactive
channelUnregistered
handlerRemoved
1. exceptionCaught() 和 handlerRemoved() 事件何时触发
- exceptionCaught():异常关闭,比如关闭客户端或者服务端手动关闭
- handlerRemoved():正常关闭执行,比如执行
ctx.close()
所以一般场景是:
- 服务端执行了
ctx.close()
,此时客户端应当在handlerRemoved()
中执行被关闭的业务逻辑,同时服务器端也是,当服务器异常关闭的时候,客户端的exceptionCaught()
事件会触发 - 当客户端执行了
ctx.close()
,服务器端的handlerRemoved()
事件就会触发,当客户端异常关闭,服务器端的exceptionCaught()
事件会触发
所以,异常关闭事件时exceptionCaught()
,正常关闭时handlerRemoved()
2. ctx.writeAndFlush(msg) 与 ct.channel().writeAndFlush(msg) 区别
ChannelHandlerContext.writeAndFlush(msg);
比如目前的
pipeline.addlast(out1,in,out2)
,此时比如in执行了ChannelHandlerContext.writeAndFlush(msg);
,此时解码器只会走out1 , 就是从开头到in, 也就是执行out1.write -> ou1.flush
ChannelHandlerContext.channel().writeAndFlush(msg);
而这个呢, 还是上面种情况 , 此时我in输出改成了
ChannelHandlerContext.channel().writeAndFlush(msg);
此时会从开头到最后走一遍 , 也就是会从out1.write -> out2.write -> ou1.flush -> ou2.flush
3. ctx.write() 和 ctx.flush() 和 ctx.writeAndFlush()
write就是写 ,flush就是推到缓冲区发出去
ctx.write()
会调用输出流的 write
方法 , 同时 flush也是 , 然后 writeAndFlush
是两者的结合体,俩都执行
2、MessageToByteEncoder 与 ByteToMessageDecoder 编解码器
这俩类实现了上面我们刚刚提到的俩编解码器的实现, 所以他基本上封装了他主要的方法 , 所以我们主要关注io.netty.handler.codec.ByteToMessageDecoder#channelRead
这个方法。都是抽象类,所以需要我们继承,重写对应的encode() 和 decode() 方法
1. MessageToByteEncoder\<I\> 编码器
将 发送出去的信息 转换成 ByteBuf
对象。这个泛型参数I,就是输入的消息。
由于我们执行写出操作,比如 ctx.write() 或者 writeAndFlush操作,会调用输出流的write方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 我们发送的消息
I cast = (I) msg;
// 分配内存
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 解码-> 交给我们去写
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
//ChannelOutboundInvoker 处理链
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 他会帮助我们释放一次,所以不需要我们手动释放我们输出的对象
if (buf != null) {
buf.release();
}
}
}
// 比如:自定义实现这个类,会让我们实现encode方法
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}
2. ByteToMessageDecoder 解码器(重点)
他重写了 ChannelInboundHandlerAdapter
类 , 重写了父类好多方法 ,我们主要关注 io.netty.handler.codec.ByteToMessageDecoder#channelRead
这个方法
基本核心的文档内容在下面 :
ChannelInboundHandlerAdapter which decodes bytes in a stream-like fashion from one ByteBuf to an other Message type.
就是将输入进来的
ByteBuf
转换成我们想要的数据类型对象 , 添加到他的集合中 Generally frame detection should be handled earlier in the pipeline by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder, or LineBasedFrameDecoder.
通常,通过添加DelimiterBasedFrameDecoder、FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder或LineBasedFrameDecoder,可以在管道中更早地处理帧检测。
io.netty.handler.codec.ByteToMessageDecoder#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断 , 他是不是ByteBuf 类型
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// first来判断 cumulation(buf)里面是否缓存了前面的数据
first = cumulation == null;
// 这里就会将 前面的累积的数据 与新传进来的数据合并在一起,然后 cumulation 里面 存取当前的所有数据
// 这里使用了策略模式:cumulator。默认:内存复制积累器
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 这里调用解码方法
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally { // 上面的 callDecode()方法里面如果执行了break、等,就会执行这里finally
// 读完就要释放,所以一般不需要我们手动释放
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
...
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
// 向下传递数据
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
解码中两种数据积累器(Cumulator) 的区别。
使用了策略模式
MERGE_CUMULATOR (
默认方式
)使用了内存复制 (using memory copies)
COMPOSITE_CUMULATOR
组合。对外提供一个逻辑的统一视图
io.netty.handler.codec.ByteToMessageDecoder#callDecode
方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 当可读,就一直执行,就是读指针 < 写指针
while (in.isReadable()) {
int outSize = out.size();
// 一般不会走这里
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
...
outSize = 0;
}
// 记录可读长度
int oldInputLength = in.readableBytes();
// 调用这里。这里面就包括了调用自己实现的decode()方法
// 在decode中时,不能执行handler remove清理操作
// 那decode完之后,需要清理数据
decodeRemovalReentryProtection(ctx, in, out);
...
// 这里就说明没有往out里面添加数据
if (outSize == out.size()) {
// 如果读指针没有移动过,
if (oldInputLength == in.readableBytes()) {
break;
} else {
//
continue;
}
}
...
}
} catch (DecoderException e) {
...
}
}
io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection
方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// decodeState状态:处理handler 被remove的情况
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 这里调用的是个抽象方法,实际调用的就是自己实现这个类时,具体实现的方法内容
// 运用了模板模式
decode(ctx, in, out);
} finally {
...
}
}
io.netty.handler.codec.ByteToMessageDecoder#decode
抽象方法,需要我们去实现
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
3、MessageToMessageDecoder 和 MessageToMessageEncoder 编解码器
1. MessageToMessageEncoder\<I\> 编码器
他继承了
ChannelOutboundHandlerAdapter
类 , 所以是一个输出流 编码器 ,这里的泛型指的是输入端 , out里添加输出 , 这个不管你是啥, 一般情况都是ByteBuf对象.
这里通常就是将Java对象变成 ByteBuf对象。泛型I就是Java对象
public class IntegerToStringEncoder extends
MessageToMessageEncoder<Integer> {
// 差不多,很简单
@Override
public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
throws Exception {
out.add(message.toString());
}
}
2. MessageToMessageDecoder\<I\> 解码器
这里的泛型指的是输入端 , 输出中添加你想添加的对象。
这里通常就是将 ByteBuf对象 变成Java对象。泛型I就是ByteBuf对象
重写下面的方法就行 ,下面这个例子表示的是输入是String, 输出是Int 类型
public class StringToIntegerDecoder extends
MessageToMessageDecoder<String> {
@Override
public void decode(ChannelHandlerContext ctx, String message,
List<Object> out) throws Exception {
out.add(message.length());
}
}
4、SimpleChannelInboundHandler\<I\>
与我们直接继承
ChannelInboundHandlerAdapter
相比的优势:
- acceptInboundMessage方法可以判断当前传递的msg是否可以被当前Handler处理
- 他可以帮助我们自动释放内存
泛型I
是我们已经解码后的类型,他会将解码后的类型传递给我们,同时他并不需要我们手动释放,他实现了ChannelInboundHandlerAdapter
类,
基本上他是不需要我们做类型转换的。开发时,只需要专注实现抽象方法 channelRead0
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 每次都会release初始化为true
boolean release = true;
try {
// 判断类型,因为SimpleChannelInboundHandler有一个泛型I, 就是判断和他记录的类型是否相同
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
// 这个就是我们写的业务逻辑
channelRead0(ctx, imsg);
} else {
// 如果不是 I这个类型,当前handler就不会处理
release = false;
// 直接像后面的Hanler传递就行
ctx.fireChannelRead(msg);
}
} finally {
// 最后会自动给我们释放 ,所以我们不需要人工去释放,会造成不必要的浪费
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
io.netty.channel.SimpleChannelInboundHandler#channelRead0
抽象方法
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
5、一般用“两层”编解码器
为什么需要“二次”解码
把解决粘包和半包问题的常用三种解码器叫一次解码器
因为一次解码的结果是字节,所以需要将其转换为对象。称为“二次解码器”
。
反之,编码器也是类似。
一次解码器:ByteToMessageDecoder
ByteBuf(原始数据流) -> ByteBuf(用户数据) --> 这里处理了粘包和半包问题
二次解码器:MessageToMessageDecoder\<I\>
ByteBuf(用户数据) -> Java Object
可以合并成一步到位?
可以,但不建议。原因:没有分层,不够清晰;耦合性高,不容易置换方案
常用的“二次编解码”方式
Java序列化、XML、JSON、MessagePack、Protobuf、其他
Netty 对二次编解码的支持
例子:io.netty.example.worldclock.WorldClockClientInitializer#initChannel