vlambda博客
学习文章列表

利用CountDownLatch实现基于netty的"BIO"

Netty是一个NIO客户端-服务器框架,在NIO模式下,线程直接返回结果,当缓冲区准备好数据之后再异步线程回调客户端消息处理。不会像BIO一样阻塞线程(I/O)并持续等待被调用方准备好响应数据才返回。

但是在某些使用socket通讯的场景下,使用了NIO的netty,但是调用方必须在当前线程等待服务返回业务处理状态之后才能进行后续的操作,那么就需要用到BIO模式,那么除了使用java原生的BIO,这里介绍一种使用CountDownLatch+Netty来模拟BIO的实现。

  • 环境:netty4.1.27、eclipse,jdk1.8;

  • 流程设计



  • 原理说明

  • 1、在客户端发送数据给服务端时(本实例服务端采用java原生socket简单实现),通过uuid生成本次发送的请求的唯一标识(sn),并将此sn附在发送数据的头部位置(服务端返回时需要带上此sn),同时客户端产生一个CountDownLatch和当前sn绑定。

  • 2、发送成功后,通过CountDownLatch.await阻塞当前线程。

  • 3、服务端接收数据并处理相关完后,封装含客户端sn的返回数据发送给客户端,客户端接收到服务端返回的信息后,根据返回的sn获取到对应的CountDownLatch,并调用countDown使得上一步阻塞的线程能够继续进行后续业务操作。

  • 关键代码片段

  • 发送消息

 /*** 发送消息* @param msg*/public void sendMsg(String ip,int port,String msg) {  try {     //1 连接到服务端,如果长连接可以修改此逻辑    connectToServer(ip, port);    //2生成阻塞计数器    String sn = UUID.randomUUID().toString();    Receiver.addWait(sn);     //3 发送数据    client.channel().writeAndFlush(sn+"@"+msg+"END").sync();//加end防止粘包    LOG.info("发送消息成功,msg:"+msg);     //4 等待接接收到消息后,计数器countdown。    Receiver.block(sn);//可以设置等待时间,以防止线程长时间阻塞。    //5处理服务端返回    handlerServerAnswer(sn);  } catch (Exception e) { LOG.error("处理消息失败.",e); }}


  • 接收消息

/**  * 处理服务端返回消息 */@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   try {     String dataFromServer = (String) msg;     String[] snData = dataFromServer.split("@");     String sn = snData[0];     String data = snData[1];     response.put(sn, data);//等待业务逻辑消费     if(!waitLatch.containsKey(sn)) {       LOG.error("非法sn:"+sn);       return;     }     waitLatch.get(sn).countDown();//取消阻塞,让发送线程后续逻辑运行。     waitLatch.remove(sn); } catch (Exception e) { LOG.error("接收到非法数据:"+msg); } }


  • 服务端代码

/*** 模拟服务端* @param args* @throws Exception*/public static void main( String[] args ) throws Exception  {     serverSocket = new ServerSocket(PORT);     LOG.info("服务监听启动,端口:"+PORT);     while(true) {       try {      Socket socket = serverSocket.accept(); InputStream in = socket.getInputStream(); int len; StringBuffer sb = new StringBuffer(); byte[] bytes = new byte[1024]; while((len = in.read(bytes))!=-1) { String bytesData = new String(bytes,0,len,"utf-8"); sb.append(bytesData); if(bytesData.endsWith("END")) { //处理业务逻辑并返回消息 handMsgAndResponse(socket,sb.toString()); sb = null; } } } catch (Exception e) { LOG.error("处理客户端消息失败.",e); } Thread.sleep(1000); }  }
  • 源码下载

末尾推荐一本书,介绍I/O和netty相关知识的,还不错。