懒人入门网络编程(四):实现一个socket长连接没那么简单!
本篇你会学到:
1、socket的基本用法,以及socket编程中各个socket的区别
2、如何实现一个socket长连接,偏实战。
3、讨论我们在进行socket的过程中需要考虑哪些问题。
socket的基本用法
socket是TCP层的封装,通过socket我们就可以进行TCP通信。
在Java的SDK中,socket共有两个接口,一个是用于监听链接请求的ServerSocket和用于端与端通信的sockect,至于这两个socket的区别在后面解释。
使用socket的步骤如下:
创建 ServerSocket 并监听客户连接;
使用 Socket 连接服务端;
通过 Socket.getInputStream()/getOutputStream() 获取输入输出流进行通信。
下面我们按照这个步骤实现一个简单的Echo服务器来学习回顾一下socket编程的步骤。也由此引出我们如何实现一个socket长连接。所谓Echo服务器,顾名思义就是一个回射服务器,客户端给服务器发送消息,服务器收到并做出响应,将客户端发来的数据原封不动的返回给客户端。我们采用的是一个BIO(block input output)的模式。
public static void main(String[] args)throws IOException {
ServerSocket server = null;
try {
//1. 创建一个 ServerSocket 并监听端口 port
server = new ServerSocket(PORT);
System.out.println("the time server is start in port :"+PORT);
Socket socket = null;
while (true){
// 2. 开始接受客户连接
socket = server.accept();
// 3. 连接建立后数据处理
new Thread(new TimeServerHandler(socket)).start();
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(server != null){
System.out.println("the time server close");
server.close();
}
}
}
public class BioClient {
private static final int PORT = 8080;
private static final String HOST = "127.0.0.1";
public static void main(String[] args)throws IOException {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
// 创建socket并请求连接
socket = new Socket(HOST, PORT);
//收发数据,跟服务器进行通信
...
}catch (Exception e){
e.printStackTrace();
}finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (out != null) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
输入/输出流方法:
socket.getInputStream()
socket.getOutputStream()
首先我们来看服务器的实现:
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader( new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(),true);
String body = null;
while (( body = in.readLine())!= null && body.length()!=0){
System.out.println("the time server receive msg :"+body);
out.println(body);
}
} catch (Exception e){
e.printStackTrace();
} finally {
//关闭输入输出流
//关闭socket
}
}
可以看到,服务端的实现其实很简单,我们不停地读取输入数据,然后写回给客户端。
来看下客户端的写法:
//收发数据,跟服务器进行通信
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("i am client");
String resp = in.readLine();
System.out.println(resp);
客户端接收服务器的消息,自己发送一个i am client给服务器,并把服务器回射的消息打印出来,其中PrintWriter这个方法会自动刷新缓冲区的数据。
我这个实现非常简单,但是有个问题是客户端在收数据的时候无法写入,写入的时候无法收数据。单独开一个线程专门处理服务端数据的接收即可,你如果想进一步完善这个功能,可以自己尝试一下,或者后面我要说的socket长连接实现了这一点你可以进行参考。值得关注的是我们在进行网络编程的过程中要特别注意异常的处理,当发生异常的时候,我们要关闭socket。
ServerSocket和socket你分清楚了吗
不妨思考一个问题:我们运行上面的echo的服务器和客户端建立连接以后,一共产生了多少个socket呢?
答案是3个socket:服务器2个,客户端一个。
直接引出这样一个问题:
Socket 和 ServerSocket 的区别是什么?
从之前我的这篇文章《》中不知道你有没有找到这个问题的答案,你可以记住这句话:在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket。你要注意的是ServerSocket我只说是用于监听客户连接,而真正用于通信的是socket。
注:以下描述使用的是 UNIX/Linux 系统的 API。
首先,我们创建 ServerSocket 后,内核会创建一个 socket。这个 socket 既可以拿来监听客户连接,也可以连接远端的服务。由于 ServerSocket 是用来监听客户连接的,紧接着它就会对内核创建的这个 socket 调用 listen 函数。这样一来,这个 socket 就成了所谓的 listening socket,它开始监听客户的连接。
接下来,我们的客户端创建一个 Socket,同样的,内核也创建一个 socket 实例。内核创建的这个 socket 跟 ServerSocket 一开始创建的那个没有什么区别。不同的是,接下来 Socket 会对它执行 connect,发起对服务端的连接。前面我们说过,socket API 其实是 TCP 层的封装,所以 connect 后,内核会发送一个 SYN 给服务端。
现在,我们切换角色到服务端。服务端的主机在收到这个 SYN 后,会创建一个新的 socket,这个新创建的 socket 跟客户端继续执行三次握手过程。
三次握手完成后,我们执行的 serverSocket.accept() 会返回一个 Socket 实例,这个 socket 就是上一步内核自动帮我们创建的。
所以说:在一个客户端连接的情况下,其实有 3 个 socket。关于内核自动创建的这个 socket,还有一个很有意思的地方。它的端口号跟 ServerSocket 是一毛一样的。咦!!不是说,一个端口只能绑定一个 socket 吗?其实这个说法并不够准确。
前面我说的TCP 通过端口号来区分数据属于哪个进程的说法,在 socket 的实现里需要改一改。Socket 并不仅仅使用端口号来区别不同的 socket 实例,而是使用 <peer addr:peer port, local addr:local port> 这个四元组。
在上面的例子中,我们的 ServerSocket 长这样:<*:*, *:9877>。意思是,可以接受任何的客户端,和本地任何 IP。accept 返回的 Socket 则是这样:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中,xxxx 是客户端的端口号。如果数据是发送给一个已连接的 socket,内核会找到一个完全匹配的实例,所以数据准确发送给了对端,如果是客户端要发起连接,这时候只有 <*:*, *:9877> 会匹配成功,所以 SYN 也准确发送给了监听套接字。
实现一个socket长连接的思路
熟悉socket编程的读者可能会熟悉下面的这个方法:
public void setKeepAlive(boolean on) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
}
setKeepAlive 字面意思是“保持活着”,这个方法也确实是提供用来保持一个TCP连接的,但是为什么我们在设计一个通信系统的时候往往不会直接使用这个方法呢,而是自己实现一个“保活”机制,不推荐使用SO_KEEPALIVE为什么呢?
我们通过了解TCPKeepAlive的原理,来找到这个问题的答案。
TCP内嵌有心跳包,以服务端为例,当server检测到超过一定时间(/proc/sys/net/ipv4/tcp_keepalive_time 7200 即2小时)没有数据传输,那么会向client端发送一个keepalive packet,此时client端有三种反应:
client端连接正常,返回一个ACK.server端收到ACK后重置计时器,在2小时后在发送探测.如果2小时内连接上有数据传输,那么在该时间的基础上向后推延2小时发送探测包;
客户端异常关闭,或网络断开。client无响应,server收不到ACK,在一定时间(/proc/sys/net/ipv4/tcp_keepalive_intvl 75 即75秒)后重发keepalive packet, 并且重发一定次数(/proc/sys/net/ipv4/tcp_keepalive_probes 9 即9次);
客户端曾经崩溃,但已经重启.server收到的探测响应是一个复位,server端终止连接。
注意:两个小时才会发一次。也就是说,在没有实际数据通信的时候,我把网线拔了,你的应用程序要经过两个小时才会知道。
假定现在有一对已经连接的 socket,在以下情况发生时候,socket 将不再可用:
1)某一端关闭是 socket(这不是废话吗):主动关闭的一方会发送 FIN,通知对方要关闭 TCP 连接。在这种情况下,另一端如果去读 socket,将会读到 EoF(End of File)。于是我们知道对方关闭了 socket;
2)应用程序奔溃:此时 socket 会由内核关闭,结果跟情况1一样;
3)系统奔溃:这时候系统是来不及发送 FIN 的,因为它已经跪了。此时对方无法得知这一情况。对方在尝试读取数据时,最后会返回 read time out。如果写数据,则是 host unreachable 之类的错误。
4)电缆被挖断、网线被拔:跟情况3差不多,如果没有对 socket 进行读写,两边都不知道发生了事故。跟情况3不同的是,如果我们把网线接回去,socket 依旧可以正常使用。
在上面的几种情形中,有一个共同点就是,只要去读、写 socket,只要 socket 连接不正常,我们就能够知道。基于这一点,要实现一个 socket 长连接,我们需要做的就是不断地给对方写数据,然后读取对方的数据,也就是所谓的心跳。只要心还在跳,socket 就是活的。写数据的间隔,需要根据实际的应用需求来决定。
心跳包不是实际的业务数据,根据通信协议的不同,需要做不同的处理。
比方说,我们使用 JSON 进行通信,那么,可以为协议包加一个 type 字段,表面这个 JSON 是心跳还是业务数据:
{
"msgType": 0, // 0 表示心跳 // 1 表示真实的通信数据
// ...
}
使用二进制协议的情况类似。要求就是,我们能够区别一个数据包是心跳还是真实数据。这样,我们便实现了一个 socket 长连接。
来看一个socket长连接的实现案例
这个案例可以说是通俗易懂的,提供了编写一个socket长连接的思路,值得借鉴。目录结构比较简单,作者的目的应该也是提供一个解决思路,简单的demo更容易帮助读者理解。
四个文件,第一眼就关注到了LongLiveSocket类,“长的活着的socket”没错了,这个类就是长连接保活的类。也是实现长连接的一个核心类。我们就来看看这个类的实现:
private final Runnable mHeartBeatTask = new Runnable() {
private byte[] mHeartBeat = new byte[0];
public void run() {
// no need to be atomic
// noinspection NonAtomicOperationOnVolatileField
++mSeqNumHeartBeatSent;
// 我们使用长度为 0 的数据作为 heart beat
write(mHeartBeat, new WritingCallback() {
public void onSuccess() {
// 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次
mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
// At this point, the heart-beat might be received and handled
if (mSeqNumHeartBeatRecv < mSeqNumHeartBeatSent) {
mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
// double check
if (mSeqNumHeartBeatRecv == mSeqNumHeartBeatSent) {
mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
}
}
}
public void onFail(byte[] data, int offset, int len) {
// nop
// write() 方法会处理失败
}
});
}
};
private final Runnable mHeartBeatTimeoutTask = () -> {
Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
closeSocket();
};
可以看出来这个发送心跳频率包的核心方法的实现是:
1、有两个心跳包计数器,mSeqNumHeartBeatSent 发送心跳包计数器,每次发送一个心跳包,mSeqNumHeartBeatSent +1 。mSeqNumHeartBeatRecv 接收的心跳包计数器,每次接收客户端发来的心跳包mSeqNumHeartBeatRecv +1。
2、这个心跳包的大小是0个字节, rivate byte[] mHeartBeat = new byte[0];
3、如果mSeqNumHeartBeatRecv < mSeqNumHeartBeatSent 则认为对端断开连接,关闭socket。
客户端client改造后的代码如下,可供参考:
public class EchoClient {
private static final String TAG = "EchoClient";
private final LongLiveSocket mLongLiveSocket;
public EchoClient(String host, int port) {
mLongLiveSocket = new LongLiveSocket(
host, port,
(data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)),
// 返回 true,所以只要出错,就会一直重连
() -> true);
}
public void send(String msg) {
mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() {
public void onSuccess() {
Log.d(TAG, "onSuccess: ");
}
public void onFail(byte[] data, int offset, int len) {
Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len));
// 连接成功后,还会发送这个消息
mLongLiveSocket.write(data, offset, len, this);
}
});
}
}
就这样,一个带 socket 长连接的客户端就完成了。剩余代码跟我们这里的主题没有太大关系,感兴趣的读者可以看看文末附件里的源码或者自己完成这个例子。
下面是一些输出示例:
03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello
03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo
03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
最后需要说明的是,如果想节省资源,在有客户发送数据的时候可以省略 heart beat。
我们对读出错时候的处理,可能也存在一些争议。读出错后,我们只是关闭了 socket。socket 需要等到下一次写动作发生时,才会重新连接。实际应用中,如果这是一个问题,在读出错后可以直接开始重连。这种情况下,还需要一些额外的同步,避免重复创建 socket。heart beat timeout 的情况类似。
socket编程中还有哪些问题需要注意呢?
画了个脑图更直观一点:
cizikeshafd
小码逆袭
原文链接(https://jekton.github.io/2018/06/23/socket-intro/ 有改动)。