在RPC通信中,对于接收到的二进制字节流,我们必须要能确定每一条消息的边界,然后进行反序列化解析成相应的消息对象,去进行相应的业务处理。
常用的确定消息边界的方式有两种:
1、长度前缀法。就是在每一条消息的前面固定几个字节来表示当前这条消息的长度,以此来界定一条完整的消息;
2、字符分隔法。在一串二进制流中,每两条消息之间、每个字段之间以特殊的字符分隔开,来确定每条消息每个字段。
Netty实现了一些列开箱即用的消息处理器。本篇我们来重点看看基于长度前缀法的思路实现的LengthFieldBasedFrameDecoder。
image.png
处理器继承ChannelInboundHandlerAdapter,用来对接收到的字节流进行处理。
首先我们来看看使用示例:
new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
只需要指定三个参数,就能将字节流拆包成一条条完整的消息ByteBuf,简直太方便了。
maxFrameLength:最大消息长度,超过会抛出异常TooLongFrameException
lengthFieldOffset:长度域的起始偏移
lengthFieldLength:长度域占用字节数
lengthAdjustment:帧长度的调整字段,用来根据具体情况调整实际读取的帧字节数
initialBytesToStrip:如果业务中用不到消息长度,指定需要跳过的字节数可以直接截掉
核心代码逻辑如下:
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
//lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
//可读字节数小于长度域结束偏移,放弃处理
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
//ByteBuf中长度域起始偏移
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//未调整的帧长度,长度域中实际存储的值
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
//包含长度域在内的帧长度
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
if (initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than initialBytesToStrip: " + initialBytesToStrip);
}
//需要截掉的字节数
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
//实际读取的帧长度
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//有效消息体
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
//修改下标
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
最终的处理结果frame就是一条完整的消息帧。