上一节说了TCP的粘包和拆包,用一个实例的方式做了说明,那么在netty里面是怎么解决粘包和拆包问题呢,这就需要编解码器,我们写一个简单的自动以协议的demo,说明一下编解码器在解决tcp粘包和拆包的解决方式。 先罗列一下服务端的代码:
MyServer负责服务端的启动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerInitializer 负责加入编解码器和handler,包括我们自己定义的编解码器(MyPersonDecoder、MyPersonEncoder):
1 2 3 4 5 6 7 8 9 public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new MyPersonDecoder()); channelPipeline.addLast(new MyPersonEncoder()); channelPipeline.addLast(new MyServerHandler()); } }
MyServerHandler,接受客户端发来的信息并打印调用次数,之后向客户端写入一个uuid:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyServerHandler extends SimpleChannelInboundHandler<PersonProtocal> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, PersonProtocal msg) throws Exception { System.out.println("服务端接受到的数据:"); System.out.println("数据长度:"+msg.getLength()); System.out.println("数据内容:"+ new String(msg.getContent(), Charset.forName("utf-8")) ); System.out.println("服务端接收到的消息数量:"+(++count)); String responseMessage = UUID.randomUUID().toString(); int responseLength = responseMessage.getBytes(Charset.forName("utf-8")).length; byte[] responseContent = responseMessage.getBytes(Charset.forName("utf-8")); PersonProtocal personProtocal = new PersonProtocal(); personProtocal.setLength(responseLength); personProtocal.setContent(responseContent); ctx.writeAndFlush(personProtocal); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
解码器,将字节数组解码成我们自定义的协议PersonProtocal类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class MyPersonDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyPersonDecoder decode invoked "); int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); PersonProtocal personProtocal = new PersonProtocal(); personProtocal.setLength(length); personProtocal.setContent(content); out.add(personProtocal); } }
PersonProtocal 自定义的协议的封装,其实很简单,只有消息的长度和消息内容,先读取消息长度,再读取消息内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class PersonProtocal { private int length; private byte[] content; public int getLength() { return length; } public void setLength(int length) { this.length = length; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
解码器MyPersonDecoder ,我们继承了ReplayingDecoder无需关注粘包问题,将字节转换为协议实体(PersonProtocal):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class MyPersonDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyPersonDecoder decode invoked "); int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); PersonProtocal personProtocal = new PersonProtocal(); personProtocal.setLength(length); personProtocal.setContent(content); out.add(personProtocal); } }
编码器MyPersonEncoder实现很简单,这是将数据(PersonProtocal)写入到网络:
1 2 3 4 5 6 7 8 9 10 public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocal> { @Override protected void encode(ChannelHandlerContext ctx, PersonProtocal msg, ByteBuf out) throws Exception { System.out.println("MyPersonEncoder encode invoked"); out.writeInt(msg.getLength()); out.writeBytes(msg.getContent()); } }
最后是客户端的程序MyClientHandler,启动的时候向服务端发送十条消息,再者接受服务端回执的uuid:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class MyClientHandler extends SimpleChannelInboundHandler<PersonProtocal> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for(int i=0;i<10;i++){ String messaage = "sent from client"; int length = messaage.getBytes(Charset.forName("utf-8")).length; byte[] content = messaage.getBytes(Charset.forName("utf-8")); PersonProtocal personProtocal = new PersonProtocal(); personProtocal.setLength(length); personProtocal.setContent(content); ctx.writeAndFlush(personProtocal); } } @Override protected void channelRead0(ChannelHandlerContext ctx, PersonProtocal msg) throws Exception { int length = msg.getLength(); byte[] content = msg.getContent(); System.out.println("客户端接收到的消息"); System.out.println("长度:"+length); System.out.println("消息内容:"+new String(content,Charset.forName("utf-8"))); System.out.println("客户端接收到的消息数量:"+(++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端启动程序Myclient:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Myclient { public static void main(String[] args) throws InterruptedException { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new MyPersonDecoder()); channelPipeline.addLast(new MyPersonEncoder()); channelPipeline.addLast(new MyClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync(); channelFuture.channel().writeAndFlush("hello"); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } }
整个流程: 客户端启动之后,向服务端发送了十条“sent from client”客户端经过编码器MyPersonEncoder发送出去, 然后服务端接收到之后先经过解码器MyPersonDecoder,解码成PersonProtocal实体,然后打印消息内容,服务端每接收到一条“sent from client”,紧接着向客户端发送一个uuid(讲过编码器MyPersonEncoder),之后客户端收到uuid,经过解码器MyPersonDecoder转换成PersonProtocal实体。 服务端打印结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:1 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:2 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:3 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:4 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:5 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:6 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:7 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:8 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:9 MyPersonEncoder encode invoked MyPersonDecoder decode invoked 服务端接受到的数据: 数据长度:16 数据内容:sent from client 服务端接收到的消息数量:10 MyPersonEncoder encode invoked
客户端打印结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonEncoder encode invoked MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:aeb03767-5b48-4c1b-ae08-2c643fa511f1 客户端接收到的消息数量:1 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:22a19d24-3a53-4954-af51-6d8a47a31412 客户端接收到的消息数量:2 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:0bb4cbff-8725-4aa1-b431-81d185639dd0 客户端接收到的消息数量:3 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:4770c9ec-8868-4253-a6be-632105bc677c 客户端接收到的消息数量:4 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:07562dc9-bde7-42bf-91b5-a964d320a4ab 客户端接收到的消息数量:5 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:08955003-5487-47a4-92c0-3fd6d636abbd 客户端接收到的消息数量:6 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:4e35d4d9-5780-46df-a78a-8bc0bf293e03 客户端接收到的消息数量:7 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:051d7e80-8e1c-4b0a-979e-9ab390c9d139 客户端接收到的消息数量:8 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:a220680e-852f-4525-9ec4-06f7912dcabe 客户端接收到的消息数量:9 MyPersonDecoder decode invoked 客户端接收到的消息 长度:36 消息内容:64bf6571-e246-4f4e-89f5-afe609ccc4ed 客户端接收到的消息数量:10