vlambda博客
学习文章列表

Netty中的序列化方式之protobuf

为什么要做序列化

  

    在网络通讯中,数据以二进制形式进行传输。假设在我们网络通讯业务中,客户端需要将Person对象传到服务端进行处理,如何传输呢?总不能直接发送Person对象实例吧?因此需要将Person对象实例经过序列化后进行传输。


序列化的方式有哪些

 

在Java应用程序中,可以通过实现Serializable接口声明类可以进行序列化。常用的序列化的方式有xml、JSON、Protobuf、avro等。这几种序列化的方式优缺点如下:

  • XML格式中定义多个对象存在大量重复的Scheme定义,对高效的传输而言,无疑增加了网络传输的开销。

  • JSON是一种比较通用的格式。能支持在不同语言中消息格式定义。传输数量比XML少很多

  • Protobuf是Google开源的一种开源、跨平台的用于做数据结果序列化的工具。效率比上面几种要优

  • avro 在大数据中应用比较多


Netty 中使用protobuf序列化消息

step1 :定义pb格式

syntax = "proto3";option java_package = "com.serilizer.pb";option java_outer_classname = "PersonProto";
message PersonProtoBean {string name = 1;int32 age = 2;string address = 3 ;}

step2 : 将pb生成Java文件代码

通过如下命令,用pb定义的格式,将其生成Java类(事先需要安装protobuf)。

protoc 需要3个参数:

  • 第1个参数:指定pb 文件所在的文件夹

  • 第2个参数:指定需要将Java类文件生成到哪个路径下

  • 第3个参数:指pb文件名


注意,这个地方的路径需要根据pb文件的路径以及需要将Java文件生成在哪个地方来定义。本示例仅仅是示例


示例如下:

protoc -I=. --java_out=../../ person.pb

然后就可以使用pb在Netty客户端与服务端之间传输了。示例如下。

服务端使用:

@Slf4jpublic class Server { public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { /*去掉消息长度部分,并且根据消息长度读取实际数据*/ channel.pipeline().addLast(new ProtobufVarint32FrameDecoder()); channel.pipeline().addLast(new ProtobufDecoder(PersonProto.PersonProtoBean.getDefaultInstance())); channel.pipeline().addLast(new ServerHandler()) ; } }); final ChannelFuture channelFuture = bootstrap.bind(9999); log.info("server started"); channelFuture.channel().closeFuture().sync(); }}

ServerHandler的实现:

package com.serilizer;
import com.serilizer.pb.PersonProto;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import lombok.extern.slf4j.Slf4j;
/** * @author tyb * @Description * @create 2022-05-05 20:35 */@Slf4jpublic class ServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { PersonProto.PersonProtoBean personProtoBean = (PersonProto.PersonProtoBean)msg; log.info("server received :[{}]", personProtoBean.getName()); }}

客户端:

package com.serilizer;
import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/** * @author tyb * @Description * @create 2022-05-05 20:43 */public class Client { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { /* 解决半包粘包问题*/ channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); /* 序列化*/ channel.pipeline().addLast(new ProtobufEncoder()); /* 实际业务处理*/ channel.pipeline().addLast(new ClientHandler()); } }); final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999); channelFuture.channel().closeFuture().sync();
}}

ClientHandler实现:

package com.serilizer;
import com.serilizer.pb.PersonProto;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;import io.netty.util.concurrent.EventExecutorGroup;import lombok.extern.slf4j.Slf4j;
/** * @author tyb * @Description * @create 2022-05-05 20:44 */@Slf4jpublic class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final PersonProto.PersonProtoBean.Builder builder = PersonProto.PersonProtoBean.newBuilder(); builder.setName("tao"); builder.setAge(36); builder.setAddress("wuhan"); ctx.writeAndFlush(builder.build()); log.info("sent over~"); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("exception occur",cause.getCause()); ctx.close(); }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { log.info("client received :【{}】",byteBuf.toString(CharsetUtil.UTF_8)); }}

客户端输出如下:

21:26:56.068 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 1621:26:56.068 [nioEventLoopGroup-2-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 821:26:56.075 [nioEventLoopGroup-2-1] INFO com.serilizer.ClientHandler - sent over~

服务端输出如下:

21:26:49.836 [main] INFO com.serilizer.Server - server started21:26:56.077 [nioEventLoopGroup-2-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 409621:26:56.077 [nioEventLoopGroup-2-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 221:26:56.077 [nioEventLoopGroup-2-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 1621:26:56.077 [nioEventLoopGroup-2-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 821:26:56.081 [nioEventLoopGroup-2-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true21:26:56.083 [nioEventLoopGroup-2-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3520bcf321:26:56.099 [nioEventLoopGroup-2-2] INFO com.serilizer.ServerHandler - server received :[tao]

这样就实现了Netty中pb的序列化。

最后,在服务端的使用中,通过Netty内置的ProtobufDecoder指定解码的Class。这个地方只能定义一个class。如果要用通用的格式,那就需要用到自定义消息格式了