vlambda博客
学习文章列表

.NET 通信传输利器Netty(Net is DotNetty)介绍

    昨天文章留言问有没有用过Netty,确实没有用过,今天转一篇文章跟大家一起学习学习。正文如下:

 (先埋怨一下微软大大)我们做NET开发,十分羡慕JAVA上能有NETTY, SPRING, STRUTS, DUBBO等等优秀框架,而我们NET就只有干瞪眼,哎,无赖之前生态圈没做好,恨铁不成钢啊。不过由于近来Net Core的发布,慢慢也拉回了一小部分属于微软的天下,打住,闲话扯到这儿。

传统通讯的问题:

  我们使用通用的应用程序或者类库来实现互相通讯,比如,我们经常使用一个 HTTP 客户端库来从 web 服务器上获取信息,或者通过 web 服务来执行一个远程的调用。

  然而,有时候一个通用的协议或他的实现并没有很好的满足需求。比如我们无法使用一个通用的 HTTP 服务器来处理大文件、电子邮件以及近实时消息,比如金融信息和多人游戏数据。我们需要一个高度优化的协议来处理一些特殊的场景。例如你可能想实现一个优化了的 Ajax 的聊天应用、媒体流传输或者是大文件传输器,你甚至可以自己设计和实现一个全新的协议来准确地实现你的需求。

  另一个不可避免的情况是当你不得不处理遗留的专有协议来确保与旧系统的互操作性。在这种情况下,重要的是我们如何才能快速实现协议而不牺牲应用的稳定性和性能。

解决:

  Netty 是一个提供 asynchronous event-driven (异步事件驱动)的网络应用框架,是一个用以快速开发高性能、可扩展协议的服务器和客户端。

  换句话说,Netty 是一个 NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。

“快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议.因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。

  有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是 Netty 和它们的不同之处。答案就是 Netty 的哲学设计理念。Netty 从开始就为用户提供了用户体验最好的 API 以及实现设计。正是因为 Netty 的哲学设计理念,才让您得以轻松地阅读本指南并使用 Netty。

(DotNetty的框架和实现是怎么回事,笔者不太清楚,但完全可参考Netty官方的文档来学习和使用DotNetty相关的API接口)

 

DotNetty中几个重要的库(程序集):

DotNetty.Buffers:对内存缓冲区管理的封装。

DotNetty.Codecs:对编解码是封装,包括一些基础基类的实现,我们在项目中自定义的协议,都要继承该项目的特定基类和实现。

DotNetty.Codecs.Mqtt:MQTT(消息队列遥测传输)编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.Protobuf:Protobuf 编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.ProtocolBuffers:ProtocolBuffers编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.Redis:Redis 协议编解码是封装,包括一些基础基类的实现。

DotNetty.Common:公共的类库项目,包装线程池,并行任务和常用帮助类的封装。

DotNetty.Handlers:封装了常用的管道处理器,比如Tls编解码,超时机制,心跳检查,日志等。

DotNetty.Transport:DotNetty核心的实现,Socket基础框架,通信模式:异步非阻塞。

DotNetty.Transport.Libuv:DotNetty自己实现基于Libuv (高性能的,事件驱动的I/O库) 核心的实现。

常用的库有Codecs, Common, Handlers, Buffers, Transport,目前Azure团队正在实现其他Netty中的API(包括非公共Netty的API),让我们拭目以待吧。

 

直接上点对点之间通讯的栗子

  DotNetty的Example文件夹下有许多官方提供的实例,有抛弃服务实例(Discard),有应答服务实例(echo),有Telnet服务实例等等,为了实现直接点对点通讯,笔者采用了Echo的demo,此后的RPC调用也会基于Echo而实现,注释详细,直接上接收端(Server)的代码:

/*

* Netty 是一个半成品,作用是在需要基于自定义协议的基础上完成自己的通信封装

* Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。

* “快速和简单”并不意味着应用程序会有难维护和性能低的问题,

* Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议。

* 因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。

*/

 

namespace Echo.Server

{

    using System;

    using System.Threading.Tasks;

    using DotNetty.Codecs;

    using DotNetty.Handlers.Logging;

    using DotNetty.Transport.Bootstrapping;

    using DotNetty.Transport.Channels;

    using DotNetty.Transport.Libuv;

    using Examples.Common;

 

    static class Program

    {

        static async Task RunServerAsync()

        {

            ExampleHelper.SetConsoleLogger();

            

            // 申明一个主回路调度组

            var dispatcher = new DispatcherEventLoopGroup();

 

            /*

             Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。

             在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。

             第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。

             如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 IEventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。

             */

 

            // 主工作线程组,设置为1个线程

            IEventLoopGroup bossGroup = dispatcher; // (1)

            // 子工作线程组,设置为1个线程

            IEventLoopGroup workerGroup = new WorkerEventLoopGroup(dispatcher);

 

            try

            {

                // 声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,通过链式的方式组装需要的参数

                var serverBootstrap = new ServerBootstrap(); // (2)

                // 设置主和工作线程组

                serverBootstrap.Group(bossGroup, workerGroup);

 

                if (ServerSettings.UseLibuv)

                {

                    // 申明服务端通信通道为TcpServerChannel

                    serverBootstrap.Channel<TcpServerChannel>(); // (3)

                }

 

                serverBootstrap

                    // 设置网络IO参数等

                    .Option(ChannelOption.SoBacklog, 100) // (5)

 

                    // 在主线程组上设置一个打印日志的处理器

                    .Handler(new LoggingHandler("SRV-LSTN"))

 

                    // 设置工作线程参数

                    .ChildHandler(

                        /*

                         * ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。

                         * 也许你想通过增加一些处理类比如DiscardServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。

                         * 当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。

                         */

                        new ActionChannelInitializer<IChannel>( // (4)

                            channel =>

                            {

                                /*

                                 * 工作线程连接器是设置了一个管道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输,

                                 * 同时所有出栈的消息 也要这个管道的所有处理器进行一步步处理。

                                 */

                                IChannelPipeline pipeline = channel.Pipeline;

 

                                // 添加日志拦截器

                                pipeline.AddLast(new LoggingHandler("SRV-CONN"));

 

                                // 添加出栈消息,通过这个handler在消息顶部加上消息的长度。

                                // LengthFieldPrepender(2):使用2个字节来存储数据的长度。

                                pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));

 

                                /*

                                  入栈消息通过该Handler,解析消息的包长信息,并将正确的消息体发送给下一个处理Handler

                                  1,InitialBytesToStrip = 0,       //读取时需要跳过的字节数

                                  2,LengthAdjustment = -5,         //包实际长度的纠正,如果包长包括包头和包体,则要减去Length之前的部分

                                  3,LengthFieldLength = 4,         //长度字段的字节数 整型为4个字节

                                  4,LengthFieldOffset = 1,         //长度属性的起始(偏移)位

                                  5,MaxFrameLength = int.MaxValue, //最大包长

                                 */

                                pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

 

                                // 业务handler

                                pipeline.AddLast("echo", new EchoServerHandler());

                            }));

 

                // bootstrap绑定到指定端口的行为就是服务端启动服务,同样的Serverbootstrap可以bind到多个端口

                IChannel boundChannel = await serverBootstrap.BindAsync(ServerSettings.Port); // (6)

 

                Console.WriteLine("wait the client input");

                Console.ReadLine();

 

                // 关闭服务

                await boundChannel.CloseAsync();

            }

            finally

            {

                // 释放指定工作组线程

                await Task.WhenAll( // (7)

                    bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),

                    workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))

                );

            }

        }

 

        static void Main() => RunServerAsync().Wait();

    }

  1. IEventLoopGroup 是用来处理I/O操作的多线程事件循环器,DotNetty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 IEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。

  2. ServerBootstrap 是一个启动 Transport 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。

  3. 这里我们指定使用 TcpServerChannel类来举例说明一个新的 Channel 如何接收进来的连接。

  4. ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel,当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。

  5. 你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。

  6. 使用完成后,优雅的释放掉指定的工作组线程,当然,你可以选择关闭程序,但这并不推荐。

 

Server端的事件处理代码:

上一部分代码中加粗地方的实现

namespace Echo.Server

{

    using System;

    using System.Text;

    using DotNetty.Buffers;

    using DotNetty.Transport.Channels;

 

    /// <summary>

    /// 服务端处理事件函数

    /// </summary>

    public class EchoServerHandler : ChannelHandlerAdapter // ChannelHandlerAdapter 业务继承基类适配器 // (1)

    {

        /// <summary>

        /// 管道开始读

        /// </summary>

        /// <param name="context"></param>

        /// <param name="message"></param>

        public override void ChannelRead(IChannelHandlerContext context, object message) // (2)

        {

            if (message is IByteBuffer buffer)    // (3)

            {

                Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));

            }

 

            context.WriteAsync(message); // (4)

        }

 

        /// <summary>

        /// 管道读取完成

        /// </summary>

        /// <param name="context"></param>

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); // (5)

 

        /// <summary>

        /// 出现异常

        /// </summary>

        /// <param name="context"></param>

        /// <param name="exception"></param>

        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)

        {

            Console.WriteLine("Exception: " + exception);

            context.CloseAsync();

        }

    }

}

  1. DiscardServerHandler 继承自 ChannelInboundHandlerAdapter,这个类实现了IChannelHandler接口,IChannelHandler提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 ChannelInboundHandlerAdapter 类而不是你自己去实现接口方法。

  2. 这里我们覆盖了 chanelRead() 事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是 ByteBuf。

  3. 为了响应或显示客户端发来的信息,为此,我们将在控制台中打印出客户端传来的数据。

  4. 然后,我们将客户端传来的消息通过context.WriteAsync写回到客户端。

  5. 当然,步骤4只是将流缓存到上下文中,并没执行真正的写入操作,通过执行Flush将流数据写入管道,并通过context传回给传来的客户端。

 

 Client端代码:

 重点看注释的地方,其他地方跟Server端没有任何区别

namespace Echo.Client

{

    using System;

    using System.Net;

    using System.Text;

    using System.Threading.Tasks;

    using DotNetty.Buffers;

    using DotNetty.Codecs;

    using DotNetty.Handlers.Logging;

    using DotNetty.Transport.Bootstrapping;

    using DotNetty.Transport.Channels;

    using DotNetty.Transport.Channels.Sockets;

    using Examples.Common;

 

    static class Program

    {

        static async Task RunClientAsync()

        {

            ExampleHelper.SetConsoleLogger();

            

            var group = new MultithreadEventLoopGroup();

 

            try

            {

                var bootstrap = new Bootstrap();

                bootstrap

                    .Group(group)

                    .Channel<TcpSocketChannel>()

                    .Option(ChannelOption.TcpNodelay, true)

                    .Handler(

                        new ActionChannelInitializer<ISocketChannel>(

                            channel =>

                            {

                                IChannelPipeline pipeline = channel.Pipeline;

                                pipeline.AddLast(new LoggingHandler());

                                pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));

                                pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

 

                                pipeline.AddLast("echo", new EchoClientHandler());

                            }));

 

                IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));

                

                // 建立死循环,类同于While(true)

                for (;;) // (4)

                {

                    Console.WriteLine("input you data:");

                    // 根据设置建立缓存区大小

                    IByteBuffer initialMessage = Unpooled.Buffer(ClientSettings.Size); // (1)

                    string r = Console.ReadLine();

                    // 将数据流写入缓冲区

                    initialMessage.WriteBytes(Encoding.UTF8.GetBytes(r ?? throw new InvalidOperationException())); // (2)

                    // 将缓冲区数据流写入到管道中

                    await clientChannel.WriteAndFlushAsync(initialMessage); // (3)

                    if(r.Contains("bye"))

                        break;

                }

 

                Console.WriteLine("byebye");

                

 

                await clientChannel.CloseAsync();

            }

            finally

            {

                await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));

            }

        }

 

        static void Main() => RunClientAsync().Wait();

    }

}

  1. 初始化一个缓冲区的大小。

  2. 默认缓冲区接受的数据类型为bytes[],当然这样也更加便于序列化成流。

  3. 将缓冲区的流直接数据写入到Channel管道中。该管道一般为链接通讯的另一端(C端)。

  4. 建立死循环,这样做的目的是为了测试每次都必须从客户端输入的数据,通过服务端回路一次后,再进行下一次的输入操作。

 

Client端的事件处理代码:

namespace Echo.Client

{

    using System;

    using System.Text;

    using DotNetty.Buffers;

    using DotNetty.Transport.Channels;

 

    public class EchoClientHandler : ChannelHandlerAdapter

    {

        readonly IByteBuffer initialMessage;

 

        public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);

 

        public override void ChannelRead(IChannelHandlerContext context, object message)

        {

            if (message is IByteBuffer byteBuffer)

            {

                Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));

            }

        }

 

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

 

        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)

        {

            Console.WriteLine("Exception: " + exception);

            context.CloseAsync();

        }

    }

}

非常简单,将数据流显示到控制台。

 

实现结果

  至此,我们使用DotNetty框架搭建简单的应答服务器就这样做好了,很简单,实现效果如下:

  C端主动向S端主动发送数据后,S端收到数据,在控制台打印出数据,并回传给C端,当然,S端还可以做很多很多的事情。

 

 

DotNetty内部调试记录分析

  虽然DotNetty官方没有提供任何技术文档,但官方却提供了详细的调试记录,很多时候,我们学习者其实也可以通过调试记录来分析某一个功能的实现流程。我们可以通过将DotNetty的内部输入输出记录打印到控制台上。

InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));

  可以看到服务端的打印记录一下多出来了许多许多,有大部分是属于DotNetty内部调试时的打印记录,我们只着重看如下的部分。

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1] HANDLER_ADDED

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1] REGISTERED (1)

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1] BIND: 0.0.0.0:8007 (2)

wait the client input

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1, 0.0.0.0:8007] ACTIVE (3)

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1, 0.0.0.0:8007] READ (4)

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED: [id: 0x7bac2775, 127.0.0.1:64073 :> 127.0.0.1:8007] (5)

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED_COMPLETE (6)

dbug: SRV-LSTN[0]

      [id: 0x3e8afca1, 0.0.0.0:8007] READ (7)

dbug: SRV-CONN[0]

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] HANDLER_ADDED (8)

dbug: SRV-CONN[0]

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] REGISTERED (9)

dbug: SRV-CONN[0]

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] ACTIVE (10)

dbug: SRV-CONN[0]

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ (11)

dbug: DotNetty.Buffers.AbstractByteBuffer[0]    (12)

      -Dio.netty.buffer.bytebuf.checkAccessible: True

dbug: SRV-CONN[0]

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED: 14B (13)

               +-------------------------------------------------+

               |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |

      +--------+-------------------------------------------------+----------------+

      |100000000| 00 0C 68 65 6C 6C 6F 20 77 6F 72 6C 64 21      |..hello world!  |

      +--------+-------------------------------------------------+----------------+

Received from client: hello world!

dbug: SRV-CONN[0]    (14)

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 2B

               +-------------------------------------------------+

               |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |

      +--------+-------------------------------------------------+----------------+

      |100000000| 00 0C                                          |..              |

      +--------+-------------------------------------------------+----------------+

dbug: SRV-CONN[0] (15)

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 12B

               +-------------------------------------------------+

               |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |

      +--------+-------------------------------------------------+----------------+

      |100000000| 68 65 6C 6C 6F 20 77 6F 72 6C 64 21            |hello world!    |

      +--------+-------------------------------------------------+----------------+

dbug: SRV-CONN[0] (16)

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED_COMPLETE

dbug: SRV-CONN[0] (17)

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] FLUSH

dbug: SRV-CONN[0] (18)

      [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ

咋一看,有18个操作,好像有点太多了,其实不然,还有很多很多的内部调试细节并没打印到控制台上。

  1. 通过手动建立的工作线程组,并将这组线程注册到管道中,这个管道可以是基于SOCKER,可以基于IChannel(1);

  2. 激活自定义管道(3);

  3. 开始读取(其实也是开始监听)(4);

  4. 收到来自id为0x7bac2775的客户端连接请求,建立连接,并继续开始监听(5)(6)(7);

  5. 从第8步开始,日志已经变成id为0x7bac2775的记录了,当然一样包含注册管道,激活管道,开始监听等等与S端一模一样的操作(8)(9)(10)(11)

  6. 当笔者输入一条"hello world!"数据后,DotNetty.Buffers.AbstractByteBuffer会进行数据类型检查,以便确认能将数据放入到管道中。(12)

  7. 将数据发送到S端,数据大小为14B,hello world前有两个点,代表这是数据头,紧接着再发送两个点,但没有任何数据,代表数据已经结束。DotNetty将数据的十六进制存储位用易懂的方式表现了出来,很人性化。(13)(14)

  8. S端收到数据没有任何加工和处理,马上将数据回传到C端。(15)(16)

  9. 最后,当这个过程完成后,需要将缓存区的数据强制写入到管道中,所以会执行一次Flush操作,整个传输完成。接下来,不管是C端还是S端,继续将自己的状态改成READ,用于监听管道中的各种情况,比如连接状态,数据传输等等(17)。

 

总结

  对于刚开始接触Socket编程的朋友而言,这是个噩梦,因为Socket编程的复杂性不会比多线程容易,甚至会更复杂。协议,压缩,传输,多线程,监听,流控制等等一系列问题摆在面前,因此而诞生了Netty这样优秀的开源框架,但是Netty是个半成品,因为你需要基于他来实现自己想要的协议,传输等等自定义操作,而底层的内容,你完全不用关心。不像某些框架,比如Newtonsoft.Json这样的功能性框架,不用配置,不用自定义,直接拿来用就可以了。

  虽然DotNetty帮我们实现了底层大量的操作,但如果不熟悉或者一点也不懂网络通信,同样对上面的代码是一头雾水,为何?行情需要,我们程序员天天都在赶业务,哪有时间去了解和学习更多的细节...通过将调试记录打印出来,并逐行挨个的对照代码进行分析,就会慢慢开始理解最简单的通信流程了。

  本篇只是实现了基于DotNetty最简单的通讯过程,也只是将数据做了一下回路,并没做到任何与RPC有关的调用,下一篇我们开始讲这个例子深入,介绍基于DotNetty的RPC调用。

出处:https://www.cnblogs.com/lhxsoft/p/10783073.html