vlambda博客
学习文章列表

Netty源码剖析四:解码和编码



一)先看2个问题,文尾有解答

1.netty解码器的抽象解码过程是怎样的?

2.netty如何把对象变成字节流,最终写入到socket里的?


二)netty中用于解码的顶层的抽象框架类是ByteToMessageDecoder类。类里面提供了用于解码的抽象框架,其中的decode()方法由子类去实现。


ByteToMessageDecoder类先通过Cumulator类将客户端传过来的数据进行汇集,然后在channelRead方法里调用子类的decode()方法将数据解码成用户定义的格式,最后将解析出来的对象,放入到out的list中,然后继续向下传递。


        public interface Cumulator {
        ByteBuf cumulate(ByteBufAllocator var1, ByteBuf var2, ByteBuf var3);
    }


    public static final ByteToMessageDecoder.Cumulator MERGE_CUMULATOR = new ByteToMessageDecoder.Cumulator() {
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            if (cumulation.writerIndex() <= cumulation.maxCapacity() - in.readableBytes() && cumulation.refCnt() <= 1) {
                buffer = cumulation;
            } else {
                buffer = ByteToMessageDecoder.expandCumulation(alloc, cumulation, in.readableBytes());
            }

            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };


    protected abstract void decode(ChannelHandlerContext var1, ByteBuf var2, List<Object> var3) throws Exception;


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            boolean var10 = false;

            try {
                var10 = true;
                ByteBuf data = (ByteBuf)msg;
                this.first = this.cumulation == null;
                if (this.first) {
                    this.cumulation = data;
                } else {
                    this.cumulation = this.cumulator.cumulate(ctx.alloc(), this.cumulation, data);
                }
                //将汇总后的cumulator数据传递给方法,进行解码
                this.callDecode(ctx, this.cumulation, out);//开始解码
                var10 = false;
            } catch (DecoderException var11) {
                throw var11;
            } catch (Throwable var12) {
                throw new DecoderException(var12);
            } finally {
                if (var10) {


protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while(true) {
                if (in.isReadable()) {
                    int outSize = out.size();
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
                        if (ctx.isRemoved()) {
                            return;
                        }

                        outSize = 0;
                    }

                    int oldInputLength = in.readableBytes();
                    this.decode(ctx, in, out);//抽象的由子类实现的decode方法
                    if (!ctx.isRemoved()) {
                        if (outSize == out.size()) {
                            if (oldInputLength != in.readableBytes()) {
                                continue;
                            }
                        } else {
                            if (oldInputLength == in.readableBytes()) {
                                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                            }

                            if (!this.isSingleDecode()) {
                                continue;
                            }
                        }
                    }
                }

                return;
            }
        } catch (DecoderException var6) {
            throw var6;
        } catch (Throwable var7) {
            throw new DecoderException(var7);
        }
    }


三)Netty中常见的几种解码类:

1.固定长度的解码器-FixedLengthFrameDecoder

2.行解码器-LineBasedFrameDecoder

3.基于分隔符的解码器-DelimiterBasedFrameDecoder

4基于长度域的解码器-LengthFieldBaseFrameDecoder


四)Netty中的编码通过MessageToByteEncoder类进行顶层的编码抽象。

通过继承MessageToByteEncoder类,然后复写他的write()方法,把一个对象转换成字节,然后调用writeAndFlush方法,将数据写入到底层的java的scoket中进行传播。


writeAndFlush()方法实际上是进行了两步操作,这两步操作是分开的。第一步是write方法,write是从尾部TailContext通过pipline一直向Head传播,复写write方法会将对象转换成字节,不写write方法会调用父类默认的方法会默认向前一直传播,一直到HeadContext那,HeadContext的处理:最终调用Unsafede write方法。分配一个对外直接内存,然后把数据放到里面,作为数据的底层传输介质;然后把数据封装成Entity, 写入到缓冲区里(这个缓冲区的结构是一个链表),这里有个最高水位64K, 超过这个,write不会被写入,还有一个最低水位32K,低于这个又变成可写


flush方法也会一直传播到HeadContext那,做的处理是:最终调用Unsafe的flush方法。调用jdk底层的api进行自旋写入。Jdk底层的标志:javachannel(),    ByteBuffer.write


//HeadContext里的write方法

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            this.unsafe.write(msg, promise);
        }


//Unsafe里的write方法

        public final void write(Object msg, ChannelPromise promise) {
            this.assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                this.safeSetFailure(promise, AbstractChannel.WRITE_CLOSED_CHANNEL_EXCEPTION);
                ReferenceCountUtil.release(msg);
            } else {
                int size;
                try {
                    msg = AbstractChannel.this.filterOutboundMessage(msg);
                    size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable var6) {
                    this.safeSetFailure(promise, var6);
                    ReferenceCountUtil.release(msg);
                    return;
                }

                outboundBuffer.addMessage(msg, size, promise);
            }
        }


//将ByteBuf对象转换成DirectBuffer

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf)msg;
            return buf.isDirect() ? msg : this.newDirectBuffer(buf);
        } else if (msg instanceof FileRegion) {
            return msg;
        } else {
            throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
        }
    }


//将DirectBuffer封装成一个Entity放入到缓冲区中(插入写队列)

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        ChannelOutboundBuffer.Entry entry = ChannelOutboundBuffer.Entry.newInstance(msg, size, total(msg), promise);
        if (this.tailEntry == null) {
            this.flushedEntry = null;
            this.tailEntry = entry;
        } else {
            ChannelOutboundBuffer.Entry tail = this.tailEntry;
            tail.next = entry;//是一个链表的结构
            this.tailEntry = entry;
        }

        if (this.unflushedEntry == null) {
            this.unflushedEntry = entry;
        }

        this.incrementPendingOutboundBytes((long)size, false);//判断最高水位是否还可写
    }


//判断最高水位,如果超过最高水位,则标志成不可写状态

        private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size != 0L) {
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
                this.setUnwritable(invokeLater);
            }
        }
    }


//默认的最高水位是64K

public final class WriteBufferWaterMark {
    private static final int DEFAULT_LOW_WATER_MARK = 32768;
    private static final int DEFAULT_HIGH_WATER_MARK = 65536;
    public static final WriteBufferWaterMark DEFAULT = new WriteBufferWaterMark(3276865536false);
    private final int low;
    private final int high;


五)文首的2个问题

1.netty解码器的抽象解码过程是怎样的?

通过 ByteToMessageDecoder顶层的抽象框架类来实现的。具体步骤。1.累加器Cumulate将字节累加起来。2调用子类的decode()具体实现3将解析到的byteBuf对象放到out的那个List里,并会向下传播。


2.netty如何把对象变成字节流,最终写入到socket里的?

编码通过继承MessageToByteEncoder类,复写他的write方法来吧一个对象转换成字节。

然后调用writeAndFlush方法,这个方法先write, 后flush


Netty源码剖析四:解码和编码

免费领取程序员赚钱技术教程

- 长按识别关注 -

技术,职场,产品,思维

行业观察