开发网站的目标,做网站商家,福州市建设厅网站,wordpress主题模板源码操作系统实现的传输层tcp协议中#xff0c;向上层的应用保证尽最大可能的(best effort delivery)、可靠的传输字节流#xff0c;但并不关心实际传输的数据包是否总是符合应用层的要求。应用层有时候会在短时间内向对端发送N个业务逻辑上独立的请求#xff0c;而操作系统tcp层…操作系统实现的传输层tcp协议中向上层的应用保证尽最大可能的(best effort delivery)、可靠的传输字节流但并不关心实际传输的数据包是否总是符合应用层的要求。应用层有时候会在短时间内向对端发送N个业务逻辑上独立的请求而操作系统tcp层面出于效率的考虑并不会按照应用层的逻辑划分一个一个独立的进行消息的发送而是会基于当前的网络负载尽可能的多的将消息数据批量发送。这使得我们在EventLoop事件循环中read读取到的数据并不总是独立、完整的符合应用层逻辑划分的消息数据。黏包问题 假设应用层发送的一次请求数据量比较小(比如0.01kb)tcp层可能不会在接到应用请求后立即进行传输而是会稍微等待一小会。这样如果应用层在短时间内需要传输多次小数据量的请求就可以攒在一起批量传输传输效率会高很多。但这带来的问题就是接收端一次接受到的数据包内应用程序逻辑上的多次请求黏连在了一起需要通过一些方法来将其拆分还原为一个个独立的信息给应用层。拆包问题 假设应用层发送的一次请求数据量比较大(比如10Mb)而tcp层的数据包容量的最大值是有限的所以应用层较大的一次请求数据会被拆分为多个包分开发送。这就导致接收端接受到的某个数据包其实并不是完整的应用层请求数据没法直接交给应用程序去使用而必须等待后续对应请求的所有数据包都接受完成后才能组装成完整的请求对象再交给应用层处理。当然导致黏包拆包的场景远不止上述的那么单一整体的网络负载变化等都可能导致黏包/拆包的现象。可以说黏包/拆包问题并不能看做是tcp自身的问题而是应用层最终需求与tcp传输层功能不匹配导致的问题。tcp出于传输效率的考虑无法很好的解决这个问题所以黏包拆包问题最终只能在更上面的应用层自己来处理。黏包拆包示意图接受到的一个数据包中可能同时存在黏包问题和拆包问题(如下图所示)img黏包/拆包问题解决方案解决黏包/拆包问题最核心的思路是如何确定一个应用层完整消息的边界。对于黏包问题基于边界可以独立的拆分出每一个消息对于拆包问题如果发现收到的数据包末尾没有边界则继续等待新的数据包逐渐累积直到发现边界后再一并上交给应用程序。主流的解决黏包拆包的应用层协议设计方案有三种介绍 优点 缺点1.基于固定长度的协议 每个消息都是固定的大小如果实际上小于固定值则需要填充 简单;易于实现 固定值过大填充会浪费大量传输带宽固定值过小则限制了可用的消息体大小2.基于特殊分隔符的协议 约定一个特殊的分隔符以这个分割符为消息边界 简单;且消息体长度是可变的性能好 消息体的业务数据不允许包含这个特殊分隔符否则会错误的拆分数据包。因此兼容性较差3.基于业务数据长度编码的协议 设计一个固定大小的消息请求头(比如固定16字节、20字节大小)在消息请求头中包含实际的业务消息体长度 消息体长度可变性能好对业务数据内容无限制兼容性也好 实现起来稍显复杂上述这段关于黏包/拆包问题的内容基本copy自我2年前的关于手写简易rpc框架的博客自己动手实现rpc框架(一) 实现点对点的rpc通信。只是当时我仅仅是一个对Netty不甚了解的使用者简单的使用Netty来实现rpc框架中基本的网络通信功能并通过MessageToByteEncoder/ByteToMessageDecoder来实现通信协议处理黏包拆包问题。而现在却尝试着参考Netty的源码通过自己亲手实现这些编解码器的核心逻辑来进一步加深对Netty的理解这种感觉还是挺奇妙的。2. Netty解决黏包/拆包问题的通用编解码器Netty的设计者希望用户在pipeline中添加各式各样的入站和出站的Handler组合起来共同完成复杂的业务逻辑。对发送的消息进行编码、将接收到的消息进行解码毫无疑问也是业务逻辑的一部分所以Netty编解码器是以Handler的形式存在的。Netty中解决黏包/拆包问题的编解码器是通用的在实现基本功能的前提下也要给使用者一定的灵活性来定制自己的功能。因此Netty提供了一些基础的父类Handler完成通用的处理逻辑并同时留下一些抽象的方法交给用户实现的子类去实现自定义的编解码业务逻辑。下面我们通过一个简单但又不失一般性的例子来展示Netty的通用编解码器的用法并结合源码分析其解决黏包/拆包的具体原理。我们首先设计一个基于业务数据长度编码的、非常简单的通信协议MySimpleProtocol消息帧共分为3个部分其中前4个字节是一个int类型的魔数0x2233用于解码时校验协议是否匹配再往后的4个字节则用于标识消息体的长度最后就是消息体的内容消息体的内容是EchoMessageFrame对象的json字符串。MySimpleProtocol协议示意图img_1public class EchoMessageFrame {/*** 协议魔数随便取的* */public static final int MAGIC 0x2233;/*** 消息内容实际消息体的json字符串* */private String messageContent;/*** 用于校验解码是否成功的属性* */private Integer msgLength;}客户端/服务端public class ClientDemo {public static void main(String[] args) throws IOException {DefaultChannelConfig defaultChannelConfig new DefaultChannelConfig();defaultChannelConfig.setInitialReceiveBufferSize(1024); // 设置小一点方便测试defaultChannelConfig.setAllocator(new MyPooledByteBufAllocator()); // 测试池化ByteBuf功能MyNioClientBootstrap myNioClientBootstrap new MyNioClientBootstrap(new InetSocketAddress(8080),new MyChannelPipelineSupplier() {Overridepublic MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {MyChannelPipeline myChannelPipeline new MyChannelPipeline(myNioChannel);// 解码器解决拆包、黏包问题myChannelPipeline.addLast(new MyLengthFieldBasedFrameDecoder(1024 * 1024, 4, 4));// 注册自定义的EchoClientEventHandlermyChannelPipeline.addLast(new EchoMessageEncoderV2());myChannelPipeline.addLast(new EchoMessageDecoderV2());myChannelPipeline.addLast(new EchoClientEventHandlerV2());return myChannelPipeline;}}, defaultChannelConfig);myNioClientBootstrap.start();}}public class ServerDemo {public static void main(String[] args) throws IOException {DefaultChannelConfig defaultChannelConfig new DefaultChannelConfig();defaultChannelConfig.setInitialReceiveBufferSize(16); // 设置小一点方便测试defaultChannelConfig.setAllocator(new MyPooledByteBufAllocator()); // 测试池化ByteBuf功能MyNioServerBootstrap myNioServerBootstrap new MyNioServerBootstrap(new InetSocketAddress(8080),// 先简单一点只支持childEventGroup自定义配置pipelinenew MyChannelPipelineSupplier() {Overridepublic MyChannelPipeline buildMyChannelPipeline(MyNioChannel myNioChannel) {MyChannelPipeline myChannelPipeline new MyChannelPipeline(myNioChannel);// 解码器解决拆包、黏包问题myChannelPipeline.addLast(new MyLengthFieldBasedFrameDecoder(1024 * 1024, 4, 4));// 注册自定义的EchoServerEventHandlermyChannelPipeline.addLast(new EchoMessageEncoderV2());myChannelPipeline.addLast(new EchoMessageDecoderV2());myChannelPipeline.addLast(new EchoServerEventHandlerV2());return myChannelPipeline;}},1,5, defaultChannelConfig);myNioServerBootstrap.start();LockSupport.park();}}编解码流程图img_2Netty通用编码器Encoder原理解析编码器Encoder简单理解就是将逻辑上的一个数据对象从一种格式转换成另一种格式。Netty作为一个网络通信框架其中最典型的场景就是将内存中的一个消息对象转换成二进制的ByteBuf对象发送到对端所对应的便是MessageToByteEncoder。MessageToByteEncoder是一个抽象类重写了ChannelEventHandlerAdapter的write方法。由于Netty其底层出站时只会处理ByteBuf类型对象(以及FileRegion类型)MessageToByteEncoder作为一个出站处理器用于拦截出站的消息将匹配条件的对象按照一定的规则转换成ByteBuf对象。MyNetty MyMessageToByteEncoder实现/*** 基本copy自Netty的MessageToByteEncoder类但做了一些简化* */public abstract class MyMessageToByteEncoderI extends MyChannelEventHandlerAdapter {private final TypeParameterMatcher matcher;public MyMessageToByteEncoder(Class? extends I clazz) {this.matcher TypeParameterMatcher.get(clazz);}Overridepublic void write(MyChannelHandlerContext ctx, Object msg, boolean doFlush, CompletableFutureMyNioChannel completableFuture) throws Exception {MyByteBuf buf null;try {// 判断当前msg的类型和当前Encoder是否匹配if (acceptOutboundMessage(msg)) {// 类型匹配说明该msg需要由当前Encoder来编码将msg转化成ByteBuf用于输出SuppressWarnings(unchecked)I cast (I) msg;// 先分配一个ByteBuf出来buf ctx.alloc().heapBuffer();try {// 由子类实现的自定义逻辑进行编码将msg写入到buf中encode(ctx, cast, buf);} finally {// 编码完成尝试将当前被编码完成的消息释放掉MyReferenceCountUtil.release(cast);}// 将编码后的buf传到后续的outBoundHandler中(比起netty少了一个空buf的优化逻辑)ctx.write(buf, doFlush, completableFuture);buf null;} else {// 不匹配跳过当前的outBoundHandler直接交给后续的handler处理ctx.write(msg, doFlush, completableFuture);}} catch (Throwable e) {throw new RuntimeException(e);} finally {if (buf ! null) {// buf不为null说明编码逻辑有异常提前release掉buf.release();}}}protected abstract void encode(MyChannelHandlerContext ctx, I msg, MyByteBuf out) throws Exception;private boolean acceptOutboundMessage(Object msg) {return matcher.match(msg);}}public class EchoMessageEncoderV2 extends MyMessageToByteEncoderEchoMessageFrame {private static final Logger logger LoggerFactory.getLogger(EchoMessageEncoderV2.class);public EchoMessageEncoderV2() {super(EchoMessageFrame.class);}Overrideprotected void encode(MyChannelHandlerContext ctx, EchoMessageFrame msg, MyByteBuf out) {// 写事件从tail向head传播msg一定是EchoMessage类型String messageJson JsonUtil.obj2Str(msg);byte[] bytes messageJson.getBytes(StandardCharsets.UTF_8);// 写入魔数确保协议是匹配的out.writeInt(EchoMessageFrame.MAGIC);// LengthFieldBased协议先写入消息帧的长度out.writeInt(bytes.length);// 再写入消息体out.writeBytes(bytes);logger.info(EchoMessageEncoder message to byteBuffer, messageJson.length{}, myByteBuf{},messageJson.length(),out.toString(Charset.defaultCharset()));}}MessageToByteEncoder中有一个TypeParameterMatcher成员变量其用于判断write方法所接受到的msg对象是否是所匹配的类型。对于复杂的业务可以同时在流水线中设置针对不同类型消息对象的多个MessageToByteEncoder。MessageToByteEncoder中通过allocateBuffer方法基于要编码的消息对象创建出所需的ByteBuf对象用于承接编码后的二进制数据(allocateBuffer方法可以由子类覆盖)。然后调用子类实现的自定义encode方法进行实际的解码操作。encode方法返回之后如果ByteBuf对象不为空则会通过write方法将编码后的ByteBuf对象传递给pipeline中的下一个出站处理器。而在我们自己定义的EchoMessageEncoderV2中构造方法中设置为只处理EchoMessageFrame类型的对象。同时在重写的decode方法中将参数EchoMessageFrame消息对象按照我们自己约定的协议进行了编码。首先在消息头中写入固定的4字节协议魔数然后再接着写入消息体的长度(messageJson.length)最后将json字符串作为消息体整个写入bytebuf。在Netty提供了通用的编码器MessageToByteEncoder后用户在绝大多数情况下仅需要聚焦于如何将一个消息对象按照既定的协议转换成二进制的ByteBuf对象而不太需要关注ByteBuf对象的创建/释放也不用考虑消息在pipeline中是如何传递的。总之tcp层面的网络应用程序对消息按照特定协议进行编码是不可或缺的而通用的编码器屏蔽掉了底层的细节一定程度上简化了Netty使用者在实现编码逻辑时的复杂度。Netty通用编码器Decoder原理解析相比于编码器解决拆包/黏包问题的核心是更为复杂的解码器因为发送消息时按照约定进行编码的二进制数据包并不总是能恰好被对端完整且独立的接收到。因此Netty提供了通用的解码器来解决这个问题。解决黏包/拆包问题核心的解决思路是暂存累积所有读取到的数据包并在每次读取到新数据时按照所约定的协议尝试对所累积的数据整体进行decode解码每次decode解码时可能出现以下几种情况当前所暂存累积的数据不足以完整的解析出一个完整的消息说明出现了拆包现象。不需要做任何事情只需等待新数据到来继续尝试decode。当前所暂存累积的数据恰好能解析出一个完整的消息一个字节都不多。这是理想情况一般只在流量很小时出现。当前所暂存积累的数据能解析出一个完整的消息但解析完成后数据还有剩余。对剩余的数据继续进行decode还能解析出更多的消息说明出现了黏包现象。对剩余数据持续不断的decode尽可能的解码出更多的消息直到剩余的数据不足以解析完整的数据包。完整的解码出消息后需要及时的将对应的二进制数据释放掉避免累积暂存的数据占用过多的内存。要解决黏包/拆包问题解码器中最核心的两个功能就是对读取到的数据进行累积以及实际的消息解码。对数据进行暂存累积、并在解码后及时释放的功能是通用的在Netty的抽象类MessageToByteEncoder中实现了这个功能。而对消息实际的解码方式则是多种多样的取决于具体的协议因此decode操作需要交由子类去具体实现。Netty ByteToMessageDecoder原理MyNetty MyByteToMessageDecoder实现源码/*** 基本copy自Netty的ByteToMessageDecoder类但做了一些简化* */public abstract class MyByteToMessageDecoder extends MyChannelEventHandlerAdapter {/*** Byte累积容器用于积攒无法解码的半包Byte数据*/private MyByteBuf cumulation;/*** 累积器*/private Cumulator cumulator MERGE_CUMULATOR;private boolean first;private int numReads;private int discardAfterReads 16;/*** 将新接受到的ByteBuf in合并到cumulation中* 除此之外netty中还有另一种累积器COMPOSITE_CUMULATOR基于更复杂的ByteBuf容器CompositeByteBuf所以MyNetty中没有实现* MERGE_CUMULATOR很好理解就是把后面来的ByteBuf中的数据写入之前已有的ByteBuf中这里需要进行内存数据的复制。* 而COMPOSITE_CUMULATOR中使用CompositeByteBuf可以做到几乎没有内存数据的复制。因为CompositeByteBuf通过一系列巧妙的映射计算将实际上内存空间不连续的N个ByteBuf转换为了逻辑上连续的一个ByteBuf。* 因此MERGE_CUMULATOR合并时性能较差但实际解码读取数据时性能更好。而COMPOSITE_CUMULATOR在合并时性能较好而实际解码时性能较差。Netty中默认使用MERGE_CUMULATOR作为累加器。*/public static final Cumulator MERGE_CUMULATOR new Cumulator() {Overridepublic MyByteBuf cumulate(MyByteBufAllocator alloc, MyByteBuf cumulation, MyByteBuf in) {if (!cumulation.isReadable()// in.isContiguous()) {// 已有的cumulation已经被读取完毕了清理掉直接用新来的in代替之前的cumulation// If cumulation is empty and input buffer is contiguous, use it directly// 目前已实现的MyByteBuf都是contiguoustrue的cumulation.release();return in;}try {// 新来的ByteBuf一共有多少字节可供读取final int required in.readableBytes();// 需要合并的字节数超过了cumulation的当前的最大可写阈值需要令cumulation扩容if (required cumulation.maxWritableBytes() ||required cumulation.maxFastWritableBytes() cumulation.refCnt() 1) {return expandCumulation(alloc, cumulation, in);} else {// cumulation的空间足够能够存放in中的数据直接将in中的内容写入cumulation的尾部即可// 因为目前只支持基于数组的heapByteBuf所以直接in.array()cumulation.writeBytes(in, in.readerIndex(), required);// 将in设置为已读完(读指针等于写指针)in.readerIndex(in.writerIndex());return cumulation;}} finally {// in中的内容被写入到cumulation后in需要被release回收掉避免内存泄露in.release();}}};public MyByteToMessageDecoder() {// 解码器必须是channel级别的不能channel间共享, 因为内部暂存的流数据是channel级别的共享的话就全乱套了这里做个校验ensureNotSharable();}Overridepublic void channelRead(MyChannelHandlerContext ctx, Object msg) throws Exception {// 顾名思义ByteToMessageDecoder只处理Byte类型的数据if (msg instanceof MyByteBuf) {// 从Byte累积器中解码成功后的消息集合MyCodecOutputList out MyCodecOutputList.newInstance();// 是否是第一次执行解码this.first (this.cumulation null);MyByteBufAllocator alloc ctx.alloc();MyByteBuf targetCumulation;if (first) {// netty里使用Unpooled.EMPTY_BUFFER性能更好targetCumulation alloc.heapBuffer(0, 0);} else {targetCumulation this.cumulation;}// 通过累积器将之前已经收到的Byte消息与新收到的ByteBuf进行合并得到一个合并后的新ByteBuf(合并的逻辑中涉及到诸如ByteBuf扩容、msg回收等操作)this.cumulation this.cumulator.cumulate(alloc, targetCumulation, (MyByteBuf) msg);try {// 读取cumulation中的数据进行解码callDecode(ctx, cumulation, out);} finally {try {if (cumulation ! null !cumulation.isReadable()) {// 及时清理掉已经读取完成的累积器bufnumReads 0;cumulation.release();cumulation null;} else if (numReads discardAfterReads) {// We did enough reads already try to discard some bytes, so we not risk to see a OOME.// See https://github.com/netty/netty/issues/4275// 当前已读取的次数超过了阈值尝试对cumulation进行缩容(已读的内容清除掉)numReads 0;discardSomeReadBytes();}int size out.size();fireChannelRead(ctx, out, size);} finally {// 后续的handler在处理完成消息后将out列表回收掉out.recycle();}}} else {// 非ByteBuf类型直接跳过该handlerctx.fireChannelRead(msg);}}Overridepublic void channelReadComplete(MyChannelHandlerContext ctx) {numReads 0;// 完成了一次read操作对cumulation进行缩容discardSomeReadBytes();// 省略了一些autoRead相关的逻辑ctx.fireChannelReadComplete();}protected final void discardSomeReadBytes() {if (cumulation ! null !first cumulation.refCnt() 1) {cumulation.discardSomeReadBytes();}}/*** 对oldCumulation进行扩容并且将in中的数据写入到扩容后的ByteBuf中** return 返回扩容后并且合并完成后的ByteBuf*/static MyByteBuf expandCumulation(MyByteBufAllocator alloc, MyByteBuf oldCumulation, MyByteBuf in) {int oldBytes oldCumulation.readableBytes();int newBytes in.readableBytes();// 老的和新的可读字节总数int totalBytes oldBytes newBytes;// 基于totalBytes分配出新的cumulationMyByteBuf newCumulation alloc.heapBuffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));MyByteBuf toRelease newCumulation;try {// This avoids redundant checks and stack depth compared to calling writeBytes(...)// 用setBytes代替writeBytes性能好一点但是需要自己设置正确的写指针(因为setBytes不会自动推进写指针)newCumulation// 先写入oldCumulation的内容.setBytes(0, oldCumulation.array(), oldCumulation.readerIndex(), oldBytes)// 再写入in中的内容.setBytes(oldBytes, in.array(), in.readerIndex(), newBytes)// 再推进写指针.writerIndex(totalBytes);in.readerIndex(in.writerIndex());toRelease oldCumulation;return newCumulation;} finally {toRelease.release();}}/*** 将ByteBuf in中的数据按照既定的规则进行decode解码操作解码成功后的消息加入out列表* p* MyNetty暂不支持handler被remove省略了判断当前handler是否已经被remove的逻辑(ctx.isRemoved()、decodeRemovalReentryProtection等)*/protected void callDecode(MyChannelHandlerContext ctx, MyByteBuf in, ListObject out) {try {while (in.isReadable()) {final int outSize out.size();if (outSize 0) {// 当decode逻辑中成功解码了至少一个完整消息触发fireChannelRead将消息向后面的handler传递fireChannelRead(ctx, out, outSize);// 处理完成后将out列表及时清理掉out.clear();}int oldInputLength in.readableBytes();// 调用子类实现的自定义解码逻辑decode(ctx, in, out);if (out.isEmpty()) {if (oldInputLength in.readableBytes()) {break;} else {continue;}}if (oldInputLength in.readableBytes()) {throw new RuntimeException(getClass() .decode() did not read anything but decoded a message.);}}} catch (Exception cause) {throw new RuntimeException(cause);}}protected abstract void decode(MyChannelHandlerContext ctx, MyByteBuf in, ListObject out) throws Exception;static void fireChannelRead(MyChannelHandlerContext ctx, ListObject msgs, int numElements) {if (msgs instanceof MyCodecOutputList) {fireChannelRead(ctx, (MyCodecOutputList) msgs, numElements);} else {for (int i 0; i numElements; i) {ctx.fireChannelRead(msgs.get(i));}}}static void fireChannelRead(MyChannelHandlerContext ctx, MyCodecOutputList msgs, int numElements) {for (int i 0; i numElements; i) {ctx.fireChannelRead(msgs.getUnsafe(i));}}}ByteToMessageDecoder是一个入站处理器其拦截所有的read读操作将读取到的ByteBuf二进制数据按照协议设定尝试解码为独立的消息传递给流水线上更后面的入站处理器。每一个ByteToMessageDecoder中都持有一个ByteBuf对象(cumulation成员变量)来承载暂存的待解码数据而每次从事件循环中读取新数据时也会有一个新的Bytebuf而累积就需要将之前已经暂存的数据和新读取的数据进行合并方便后续的decode。对于数据合并Netty中默认提供了两种累积策略。一个是比较好理解的基于内存复制的累积策略即MERGE_CUMULATOR。其将当前新读取数据的ByteBuf中的数据复制移动到cumulation容器中。而另一个是较为复杂的基于内存映射的累积策略即COMPOSITE_CUMULATOR。其将新读取数据的ByteBuf作为CompositeByteBuf组合容器的一部分进行合并而不进行实际的内存复制操作。MERGE_CUMULATOR由于涉及到内存复制操作所以在累积时性能较差但由于合并后数据是连续存放的因此在实际解码访问时decode性能较好。而COMPOSITE_CUMULATOR则由于在累积时不进行数据复制操作仅仅做了一个容器的索引映射所以累积时性能较好。但在实际解码时由于内存数据不连续需要进行复杂的映射偏移计算所以decode性能较差。个人理解是绝大多数场景下消息体不大时MERGE_CUMULATOR累积性能总体更好只有在消息体特别大时COMPOSITE_CUMULATOR才会有性能优势。所以Netty中默认使用的是MERGE_CUMULATOR。在将新读取到的消息完毕后会调用callDecode方法进行解码。默认的callDecode方法中会调用子类自定义的decode方法进行解码成功解码完成的消息对象会被放入out列表中并通过fireChannelRead触发读事件传递给后续的入站处理器。Netty LengthFieldBasedFrameDecoder原理除了基础的ByteToMessageDecoder完成了基础的暂存累积能力之外Netty还提供了很多常用的子类通用解码器。在第一节中提到的三种协议设计方式Netty中都提供了与之对应的、开箱即用的通用解码器分别是FixedLengthFrameDecoder(基于固定长度的协议)、DelimiterBasedFrameDecoder(以特定符号作为分隔符的协议)、LineBasedFrameDecoder(以换行符作为分隔符的协议)以及LengthFieldBasedFrameDecoder(基于业务数据长度编码的协议)。限于篇幅我们这里只分析相对复杂的LengthFieldBasedFrameDecoder原理起到一个抛砖引玉的作用。MyNetty MyLengthFieldBasedFrameDecoder实现源码/*** 基本copy自Netty的LengthFieldBasedFrameDecoder但做了一些简化* */public class MyLengthFieldBasedFrameDecoder extends MyByteToMessageDecoder {private final ByteOrder byteOrder;private final int maxFrameLength;private final int lengthFieldOffset;private final int lengthFieldLength;private final int lengthFieldEndOffset;private final int lengthAdjustment;private final int initialBytesToStrip;private final boolean failFast;private boolean discardingTooLongFrame;private long tooLongFrameLength;private long bytesToDiscard;private int frameLengthInt -1;public MyLengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength) {this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);}public MyLengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {this(maxFrameLength,lengthFieldOffset, lengthFieldLength, lengthAdjustment,initialBytesToStrip, true);}public MyLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast) {this(ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,lengthAdjustment, initialBytesToStrip, failFast);}public MyLengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast) {this.byteOrder ObjectUtil.checkNotNull(byteOrder, byteOrder);ObjectUtil.checkPositive(maxFrameLength, maxFrameLength);ObjectUtil.checkPositiveOrZero(lengthFieldOffset, lengthFieldOffset);ObjectUtil.checkPositiveOrZero(initialBytesToStrip, initialBytesToStrip);if (lengthFieldOffset maxFrameLength - lengthFieldLength) {throw new IllegalArgumentException(maxFrameLength ( maxFrameLength ) must be equal to or greater than lengthFieldOffset ( lengthFieldOffset ) lengthFieldLength ( lengthFieldLength ).);}this.maxFrameLength maxFrameLength;this.lengthFieldOffset lengthFieldOffset;this.lengthFieldLength lengthFieldLength;this.lengthAdjustment lengthAdjustment;this.lengthFieldEndOffset lengthFieldOffset lengthFieldLength;this.initialBytesToStrip initialBytesToStrip;this.failFast failFast;}Overrideprotected final void decode(MyChannelHandlerContext ctx, MyByteBuf in, ListObject out) throws Exception {Object decoded decode(ctx, in);if (decoded ! null) {out.add(decoded);}}private void discardingTooLongFrame(MyByteBuf in) {long bytesToDiscard this.bytesToDiscard;int localBytesToDiscard (int) Math.min(bytesToDiscard, in.readableBytes());in.skipBytes(localBytesToDiscard);bytesToDiscard - localBytesToDiscard;this.bytesToDiscard bytesToDiscard;failIfNecessary(false);}private static void failOnNegativeLengthField(MyByteBuf in, long frameLength, int lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new MyNettyException(negative pre-adjustment length field: frameLength);}private static void failOnFrameLengthLessThanLengthFieldEndOffset(MyByteBuf in,long frameLength,int lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new MyNettyException(Adjusted frame length ( frameLength ) is less than lengthFieldEndOffset: lengthFieldEndOffset);}private void exceededFrameLength(MyByteBuf in, long frameLength) {long discard frameLength - in.readableBytes();tooLongFrameLength frameLength;if (discard 0) {// buffer contains more bytes then the frameLength so we can discard all nowin.skipBytes((int) frameLength);} else {// Enter the discard mode and discard everything received so far.discardingTooLongFrame true;bytesToDiscard discard;in.skipBytes(in.readableBytes());}failIfNecessary(true);}